package storage import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "io/ioutil" "os" "path" "path/filepath" "strings" "sync" "time" apexlog "github.com/apex/log" guuid "github.com/gofrs/uuid" "github.com/minio/sha256-simd" "github.com/notaryproject/notation-go-lib" notreg "github.com/notaryproject/notation/pkg/registry" godigest "github.com/opencontainers/go-digest" ispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/opencontainers/umoci" "github.com/opencontainers/umoci/oci/casext" artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" "github.com/rs/zerolog" zerr "zotregistry.io/zot/errors" "zotregistry.io/zot/pkg/extensions/monitoring" zlog "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/test" ) const ( // BlobUploadDir defines the upload directory for blob uploads. BlobUploadDir = ".uploads" SchemaVersion = 2 gcDelay = 1 * time.Hour DefaultFilePerms = 0o600 DefaultDirPerms = 0o700 RLOCK = "RLock" RWLOCK = "RWLock" ) // BlobUpload models and upload request. type BlobUpload struct { StoreName string ID string } type StoreController struct { DefaultStore ImageStore SubStore map[string]ImageStore } // ImageStoreFS provides the image storage operations. type ImageStoreFS struct { rootDir string lock *sync.RWMutex blobUploads map[string]BlobUpload cache *Cache gc bool dedupe bool commit bool log zerolog.Logger metrics monitoring.MetricServer } func (is *ImageStoreFS) RootDir() string { return is.rootDir } func (is *ImageStoreFS) DirExists(d string) bool { return DirExists(d) } func getRoutePrefix(name string) string { names := strings.SplitN(name, "/", 2) //nolint:gomnd if len(names) != 2 { //nolint:gomnd // it means route is of global storage e.g "centos:latest" if len(names) == 1 { return "/" } } return fmt.Sprintf("/%s", names[0]) } func (sc StoreController) GetImageStore(name string) ImageStore { if sc.SubStore != nil { // SubStore is being provided, now we need to find equivalent image store and this will be found by splitting name prefixName := getRoutePrefix(name) imgStore, ok := sc.SubStore[prefixName] if !ok { imgStore = sc.DefaultStore } return imgStore } return sc.DefaultStore } // NewImageStore returns a new image store backed by a file storage. func NewImageStore(rootDir string, gc, dedupe, commit bool, log zlog.Logger, metrics monitoring.MetricServer) ImageStore { if _, err := os.Stat(rootDir); os.IsNotExist(err) { if err := os.MkdirAll(rootDir, DefaultDirPerms); err != nil { log.Error().Err(err).Str("rootDir", rootDir).Msg("unable to create root dir") return nil } } imgStore := &ImageStoreFS{ rootDir: rootDir, lock: &sync.RWMutex{}, blobUploads: make(map[string]BlobUpload), gc: gc, dedupe: dedupe, commit: commit, log: log.With().Caller().Logger(), metrics: metrics, } if dedupe { imgStore.cache = NewCache(rootDir, "cache", log) } if gc { // we use umoci GC to perform garbage-collection, but it uses its own logger // - so capture those logs, could be useful apexlog.SetLevel(apexlog.DebugLevel) apexlog.SetHandler(apexlog.HandlerFunc(func(entry *apexlog.Entry) error { e := log.Debug() for k, v := range entry.Fields { e = e.Interface(k, v) } e.Msg(entry.Message) return nil })) } return imgStore } // RLock read-lock. func (is *ImageStoreFS) RLock(lockStart *time.Time) { *lockStart = time.Now() is.lock.RLock() } // RUnlock read-unlock. func (is *ImageStoreFS) RUnlock(lockStart *time.Time) { is.lock.RUnlock() lockEnd := time.Now() latency := lockEnd.Sub(*lockStart) monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RLOCK) // histogram } // Lock write-lock. func (is *ImageStoreFS) Lock(lockStart *time.Time) { *lockStart = time.Now() is.lock.Lock() } // Unlock write-unlock. func (is *ImageStoreFS) Unlock(lockStart *time.Time) { is.lock.Unlock() lockEnd := time.Now() latency := lockEnd.Sub(*lockStart) monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RWLOCK) // histogram } func (is *ImageStoreFS) initRepo(name string) error { repoDir := path.Join(is.rootDir, name) // create "blobs" subdir err := ensureDir(path.Join(repoDir, "blobs"), is.log) if err != nil { is.log.Error().Err(err).Msg("error creating blobs subdir") return err } // create BlobUploadDir subdir err = ensureDir(path.Join(repoDir, BlobUploadDir), is.log) if err != nil { is.log.Error().Err(err).Msg("error creating blob upload subdir") return err } // "oci-layout" file - create if it doesn't exist ilPath := path.Join(repoDir, ispec.ImageLayoutFile) if _, err := os.Stat(ilPath); err != nil { il := ispec.ImageLayout{Version: ispec.ImageLayoutVersion} buf, err := json.Marshal(il) if err != nil { is.log.Panic().Err(err).Msg("unable to marshal JSON") } if err := is.writeFile(ilPath, buf); err != nil { is.log.Error().Err(err).Str("file", ilPath).Msg("unable to write file") return err } } // "index.json" file - create if it doesn't exist indexPath := path.Join(repoDir, "index.json") if _, err := os.Stat(indexPath); err != nil { index := ispec.Index{} index.SchemaVersion = 2 buf, err := json.Marshal(index) if err != nil { is.log.Panic().Err(err).Msg("unable to marshal JSON") } if err := is.writeFile(indexPath, buf); err != nil { is.log.Error().Err(err).Str("file", indexPath).Msg("unable to write file") return err } } return nil } // InitRepo creates an image repository under this store. func (is *ImageStoreFS) InitRepo(name string) error { var lockLatency time.Time is.Lock(&lockLatency) defer is.Unlock(&lockLatency) return is.initRepo(name) } // ValidateRepo validates that the repository layout is complaint with the OCI repo layout. func (is *ImageStoreFS) ValidateRepo(name string) (bool, error) { // https://github.com/opencontainers/image-spec/blob/master/image-layout.md#content // at least, expect at least 3 entries - ["blobs", "oci-layout", "index.json"] // and an additional/optional BlobUploadDir in each image store dir := path.Join(is.rootDir, name) if !is.DirExists(dir) { return false, zerr.ErrRepoNotFound } files, err := ioutil.ReadDir(dir) if err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("unable to read directory") return false, zerr.ErrRepoNotFound } if len(files) < 3 { // nolint:gomnd return false, zerr.ErrRepoBadVersion } found := map[string]bool{ "blobs": false, ispec.ImageLayoutFile: false, "index.json": false, } for _, file := range files { if file.Name() == "blobs" && !file.IsDir() { return false, nil } found[file.Name()] = true } for k, v := range found { if !v && k != BlobUploadDir { return false, nil } } buf, err := ioutil.ReadFile(path.Join(dir, ispec.ImageLayoutFile)) if err != nil { return false, err } var il ispec.ImageLayout if err := json.Unmarshal(buf, &il); err != nil { return false, err } if il.Version != ispec.ImageLayoutVersion { return false, zerr.ErrRepoBadVersion } return true, nil } // GetRepositories returns a list of all the repositories under this store. func (is *ImageStoreFS) GetRepositories() ([]string, error) { var lockLatency time.Time dir := is.rootDir is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) _, err := ioutil.ReadDir(dir) if err != nil { is.log.Error().Err(err).Msg("failure walking storage root-dir") return nil, err } stores := make([]string, 0) err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { return nil } rel, err := filepath.Rel(is.rootDir, path) if err != nil { return nil // nolint:nilerr // ignore paths not relative to root dir } if ok, err := is.ValidateRepo(rel); !ok || err != nil { return nil // nolint:nilerr // ignore invalid repos } // is.log.Debug().Str("dir", path).Str("name", info.Name()).Msg("found image store") stores = append(stores, rel) return nil }) return stores, err } // GetImageTags returns a list of image tags available in the specified repository. func (is *ImageStoreFS) GetImageTags(repo string) ([]string, error) { var lockLatency time.Time dir := path.Join(is.rootDir, repo) if !is.DirExists(dir) { return nil, zerr.ErrRepoNotFound } is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") return nil, zerr.ErrRepoNotFound } var index ispec.Index if err := json.Unmarshal(buf, &index); err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") return nil, zerr.ErrRepoNotFound } tags := make([]string, 0) for _, manifest := range index.Manifests { v, ok := manifest.Annotations[ispec.AnnotationRefName] if ok { tags = append(tags, v) } } return tags, nil } // GetImageManifest returns the image manifest of an image in the specific repository. func (is *ImageStoreFS) GetImageManifest(repo string, reference string) ([]byte, string, string, error) { var lockLatency time.Time dir := path.Join(is.rootDir, repo) if !is.DirExists(dir) { return nil, "", "", zerr.ErrRepoNotFound } is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") if os.IsNotExist(err) { return nil, "", "", zerr.ErrRepoNotFound } return nil, "", "", err } var index ispec.Index if err := json.Unmarshal(buf, &index); err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") return nil, "", "", err } found := false var digest godigest.Digest mediaType := "" for _, manifest := range index.Manifests { if reference == manifest.Digest.String() { digest = manifest.Digest mediaType = manifest.MediaType found = true break } v, ok := manifest.Annotations[ispec.AnnotationRefName] if ok && v == reference { digest = manifest.Digest mediaType = manifest.MediaType found = true break } } if !found { return nil, "", "", zerr.ErrManifestNotFound } p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded()) buf, err = ioutil.ReadFile(p) if err != nil { is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest") if os.IsNotExist(err) { return nil, "", "", zerr.ErrManifestNotFound } return nil, "", "", err } var manifest ispec.Manifest if err := json.Unmarshal(buf, &manifest); err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") return nil, "", "", err } monitoring.IncDownloadCounter(is.metrics, repo) return buf, digest.String(), mediaType, nil } func (is *ImageStoreFS) validateOCIManifest(repo string, reference string, manifest *ispec.Manifest) (string, error) { if manifest.SchemaVersion != SchemaVersion { is.log.Error().Int("SchemaVersion", manifest.SchemaVersion).Msg("invalid manifest") return "", zerr.ErrBadManifest } // validate image config config := manifest.Config if config.MediaType != ispec.MediaTypeImageConfig { return "", zerr.ErrBadManifest } digest := config.Digest blobPath := is.BlobPath(repo, digest) if _, err := os.Stat(blobPath); err != nil { is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to find blob") return digest.String(), zerr.ErrBlobNotFound } blobFile, err := os.Open(blobPath) if err != nil { is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to find blob") return digest.String(), zerr.ErrBlobNotFound } defer blobFile.Close() dec := json.NewDecoder(blobFile) var cspec ispec.Image if err := dec.Decode(&cspec); err != nil { return "", zerr.ErrBadManifest } // validate the layers for _, l := range manifest.Layers { digest = l.Digest blobPath = is.BlobPath(repo, digest) is.log.Info().Str("blobPath", blobPath).Str("reference", reference).Msg("manifest layers") if _, err := os.Stat(blobPath); err != nil { is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to find blob") return digest.String(), zerr.ErrBlobNotFound } } return "", nil } // PutImageManifest adds an image manifest to the repository. func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaType string, body []byte) (string, error) { if err := is.InitRepo(repo); err != nil { is.log.Debug().Err(err).Msg("init repo") return "", err } // validate the manifest if !IsSupportedMediaType(mediaType) { is.log.Debug().Interface("actual", mediaType). Interface("expected", ispec.MediaTypeImageManifest).Msg("bad manifest media type") return "", zerr.ErrBadManifest } if len(body) == 0 { is.log.Debug().Int("len", len(body)).Msg("invalid body length") return "", zerr.ErrBadManifest } if mediaType == ispec.MediaTypeImageManifest { var manifest ispec.Manifest if err := json.Unmarshal(body, &manifest); err != nil { is.log.Error().Err(err).Msg("unable to unmarshal JSON") return "", zerr.ErrBadManifest } if manifest.Config.MediaType == ispec.MediaTypeImageConfig { digest, err := is.validateOCIManifest(repo, reference, &manifest) if err != nil { is.log.Error().Err(err).Msg("invalid oci image manifest") return digest, err } } } else if mediaType == artifactspec.MediaTypeArtifactManifest { var m notation.Descriptor if err := json.Unmarshal(body, &m); err != nil { is.log.Error().Err(err).Msg("unable to unmarshal JSON") return "", zerr.ErrBadManifest } } mDigest := godigest.FromBytes(body) refIsDigest := false d, err := godigest.Parse(reference) if err == nil { if d.String() != mDigest.String() { is.log.Error().Str("actual", mDigest.String()).Str("expected", d.String()). Msg("manifest digest is not valid") return "", zerr.ErrBadManifest } refIsDigest = true } var lockLatency time.Time is.Lock(&lockLatency) defer is.Unlock(&lockLatency) dir := path.Join(is.rootDir, repo) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") return "", err } var index ispec.Index if err := json.Unmarshal(buf, &index); err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") return "", zerr.ErrRepoBadVersion } updateIndex := true // create a new descriptor desc := ispec.Descriptor{ MediaType: mediaType, Size: int64(len(body)), Digest: mDigest, Platform: &ispec.Platform{Architecture: "amd64", OS: "linux"}, } if !refIsDigest { desc.Annotations = map[string]string{ispec.AnnotationRefName: reference} } for midx, manifest := range index.Manifests { if reference == manifest.Digest.String() { // nothing changed, so don't update desc = manifest updateIndex = false break } v, ok := manifest.Annotations[ispec.AnnotationRefName] if ok && v == reference { if manifest.Digest.String() == mDigest.String() { // nothing changed, so don't update desc = manifest updateIndex = false break } // manifest contents have changed for the same tag, // so update index.json descriptor is.log.Info(). Int64("old size", desc.Size). Int64("new size", int64(len(body))). Str("old digest", desc.Digest.String()). Str("new digest", mDigest.String()). Msg("updating existing tag with new manifest contents") desc = manifest desc.Size = int64(len(body)) desc.Digest = mDigest index.Manifests = append(index.Manifests[:midx], index.Manifests[midx+1:]...) break } } if !updateIndex { return desc.Digest.String(), nil } // write manifest to "blobs" dir = path.Join(is.rootDir, repo, "blobs", mDigest.Algorithm().String()) _ = ensureDir(dir, is.log) file := path.Join(dir, mDigest.Encoded()) if err := is.writeFile(file, body); err != nil { is.log.Error().Err(err).Str("file", file).Msg("unable to write") return "", err } // now update "index.json" index.Manifests = append(index.Manifests, desc) dir = path.Join(is.rootDir, repo) file = path.Join(dir, "index.json") buf, err = json.Marshal(index) if err != nil { is.log.Error().Err(err).Str("file", file).Msg("unable to marshal JSON") return "", err } if err := is.writeFile(file, buf); err != nil { is.log.Error().Err(err).Str("file", file).Msg("unable to write") return "", err } if is.gc { oci, err := umoci.OpenLayout(dir) if err != nil { return "", err } defer oci.Close() if err := oci.GC(context.Background(), ifOlderThan(is, repo, gcDelay)); err != nil { return "", err } } monitoring.SetStorageUsage(is.metrics, is.rootDir, repo) monitoring.IncUploadCounter(is.metrics, repo) return desc.Digest.String(), nil } // DeleteImageManifest deletes the image manifest from the repository. func (is *ImageStoreFS) DeleteImageManifest(repo string, reference string) error { var lockLatency time.Time dir := path.Join(is.rootDir, repo) if !is.DirExists(dir) { return zerr.ErrRepoNotFound } isTag := false // as per spec "reference" can be a digest and a tag digest, err := godigest.Parse(reference) if err != nil { is.log.Debug().Str("invalid digest: ", reference).Msg("storage: assuming tag") isTag = true } is.Lock(&lockLatency) defer is.Unlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") return err } var index ispec.Index if err := json.Unmarshal(buf, &index); err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") return err } found := false var manifest ispec.Descriptor // we are deleting, so keep only those manifests that don't match outIndex := index outIndex.Manifests = []ispec.Descriptor{} for _, manifest = range index.Manifests { if isTag { tag, ok := manifest.Annotations[ispec.AnnotationRefName] if ok && tag == reference { is.log.Debug().Str("deleting tag", tag).Msg("") digest = manifest.Digest found = true continue } } else if reference == manifest.Digest.String() { is.log.Debug().Str("deleting reference", reference).Msg("") found = true continue } outIndex.Manifests = append(outIndex.Manifests, manifest) } if !found { return zerr.ErrManifestNotFound } // now update "index.json" dir = path.Join(is.rootDir, repo) file := path.Join(dir, "index.json") buf, err = json.Marshal(outIndex) if err != nil { return err } if err := is.writeFile(file, buf); err != nil { return err } if is.gc { oci, err := umoci.OpenLayout(dir) if err != nil { return err } defer oci.Close() if err := oci.GC(context.Background(), ifOlderThan(is, repo, gcDelay)); err != nil { return err } } // Delete blob only when blob digest not present in manifest entry. // e.g. 1.0.1 & 1.0.2 have same blob digest so if we delete 1.0.1, blob should not be removed. toDelete := true for _, manifest = range outIndex.Manifests { if digest.String() == manifest.Digest.String() { toDelete = false break } } if toDelete { p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded()) _ = os.Remove(p) } monitoring.SetStorageUsage(is.metrics, is.rootDir, repo) return nil } // BlobUploadPath returns the upload path for a blob in this store. func (is *ImageStoreFS) BlobUploadPath(repo string, uuid string) string { dir := path.Join(is.rootDir, repo) blobUploadPath := path.Join(dir, BlobUploadDir, uuid) return blobUploadPath } // NewBlobUpload returns the unique ID for an upload in progress. func (is *ImageStoreFS) NewBlobUpload(repo string) (string, error) { if err := is.InitRepo(repo); err != nil { is.log.Error().Err(err).Msg("error initializing repo") return "", err } uuid, err := guuid.NewV4() if err != nil { return "", err } uid := uuid.String() blobUploadPath := is.BlobUploadPath(repo, uid) file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, DefaultFilePerms) if err != nil { return "", zerr.ErrRepoNotFound } defer file.Close() return uid, nil } // GetBlobUpload returns the current size of a blob upload. func (is *ImageStoreFS) GetBlobUpload(repo string, uuid string) (int64, error) { blobUploadPath := is.BlobUploadPath(repo, uuid) binfo, err := os.Stat(blobUploadPath) if err != nil { if os.IsNotExist(err) { return -1, zerr.ErrUploadNotFound } return -1, err } return binfo.Size(), nil } // PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns // the number of actual bytes to the blob. func (is *ImageStoreFS) PutBlobChunkStreamed(repo string, uuid string, body io.Reader) (int64, error) { if err := is.InitRepo(repo); err != nil { return -1, err } blobUploadPath := is.BlobUploadPath(repo, uuid) _, err := os.Stat(blobUploadPath) if err != nil { return -1, zerr.ErrUploadNotFound } file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_CREATE, DefaultFilePerms) if err != nil { is.log.Error().Err(err).Msg("failed to open file") return -1, err } defer func() { if is.commit { _ = file.Sync() } _ = file.Close() }() if _, err := file.Seek(0, io.SeekEnd); err != nil { is.log.Error().Err(err).Msg("failed to seek file") return -1, err } n, err := io.Copy(file, body) return n, err } // PutBlobChunk writes another chunk of data to the specified blob. It returns // the number of actual bytes to the blob. func (is *ImageStoreFS) PutBlobChunk(repo string, uuid string, from int64, to int64, body io.Reader) (int64, error) { if err := is.InitRepo(repo); err != nil { return -1, err } blobUploadPath := is.BlobUploadPath(repo, uuid) binfo, err := os.Stat(blobUploadPath) if err != nil { return -1, zerr.ErrUploadNotFound } if from != binfo.Size() { is.log.Error().Int64("expected", from).Int64("actual", binfo.Size()). Msg("invalid range start for blob upload") return -1, zerr.ErrBadUploadRange } file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_CREATE, DefaultFilePerms) if err != nil { is.log.Error().Err(err).Msg("failed to open file") return -1, err } defer func() { if is.commit { _ = file.Sync() } _ = file.Close() }() if _, err := file.Seek(from, io.SeekStart); err != nil { is.log.Error().Err(err).Msg("failed to seek file") return -1, err } n, err := io.Copy(file, body) return n, err } // BlobUploadInfo returns the current blob size in bytes. func (is *ImageStoreFS) BlobUploadInfo(repo string, uuid string) (int64, error) { blobUploadPath := is.BlobUploadPath(repo, uuid) binfo, err := os.Stat(blobUploadPath) if err != nil { is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob") return -1, err } size := binfo.Size() return size, nil } // FinishBlobUpload finalizes the blob upload and moves blob the repository. func (is *ImageStoreFS) FinishBlobUpload(repo string, uuid string, body io.Reader, digest string) error { dstDigest, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") return zerr.ErrBadBlobDigest } src := is.BlobUploadPath(repo, uuid) _, err = os.Stat(src) if err != nil { is.log.Error().Err(err).Str("blob", src).Msg("failed to stat blob") return zerr.ErrUploadNotFound } blobFile, err := os.Open(src) if err != nil { is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") return zerr.ErrUploadNotFound } defer blobFile.Close() digester := sha256.New() _, err = io.Copy(digester, blobFile) if err != nil { is.log.Error().Err(err).Str("repo", repo).Str("blob", src).Str("digest", digest).Msg("unable to compute hash") return err } srcDigest := godigest.NewDigestFromEncoded(godigest.SHA256, fmt.Sprintf("%x", digester.Sum(nil))) if srcDigest != dstDigest { is.log.Error().Str("srcDigest", srcDigest.String()). Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest") return zerr.ErrBadBlobDigest } dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) var lockLatency time.Time is.Lock(&lockLatency) defer is.Unlock(&lockLatency) err = ensureDir(dir, is.log) if err != nil { is.log.Error().Err(err).Msg("error creating blobs/sha256 dir") return err } dst := is.BlobPath(repo, dstDigest) if is.dedupe && is.cache != nil { if err := is.DedupeBlob(src, dstDigest, dst); err != nil { is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). Str("dst", dst).Msg("unable to dedupe blob") return err } } else { if err := os.Rename(src, dst); err != nil { is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). Str("dst", dst).Msg("unable to finish blob") return err } } return nil } // FullBlobUpload handles a full blob upload, and no partial session is created. func (is *ImageStoreFS) FullBlobUpload(repo string, body io.Reader, digest string) (string, int64, error) { if err := is.InitRepo(repo); err != nil { return "", -1, err } dstDigest, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") return "", -1, zerr.ErrBadBlobDigest } u, err := guuid.NewV4() if err != nil { return "", -1, err } uuid := u.String() src := is.BlobUploadPath(repo, uuid) blobFile, err := os.Create(src) if err != nil { is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") return "", -1, zerr.ErrUploadNotFound } defer func() { if is.commit { _ = blobFile.Sync() } _ = blobFile.Close() }() digester := sha256.New() mw := io.MultiWriter(blobFile, digester) nbytes, err := io.Copy(mw, body) if err != nil { return "", -1, err } srcDigest := godigest.NewDigestFromEncoded(godigest.SHA256, fmt.Sprintf("%x", digester.Sum(nil))) if srcDigest != dstDigest { is.log.Error().Str("srcDigest", srcDigest.String()). Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest") return "", -1, zerr.ErrBadBlobDigest } dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) var lockLatency time.Time is.Lock(&lockLatency) defer is.Unlock(&lockLatency) _ = ensureDir(dir, is.log) dst := is.BlobPath(repo, dstDigest) if is.dedupe && is.cache != nil { if err := is.DedupeBlob(src, dstDigest, dst); err != nil { is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). Str("dst", dst).Msg("unable to dedupe blob") return "", -1, err } } else { if err := os.Rename(src, dst); err != nil { is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). Str("dst", dst).Msg("unable to finish blob") return "", -1, err } } return uuid, nbytes, nil } func (is *ImageStoreFS) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error { retry: is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: ENTER") dstRecord, err := is.cache.GetBlob(dstDigest.String()) if err != nil && !errors.Is(err, zerr.ErrCacheMiss) { is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to lookup blob record") return err } if dstRecord == "" { // cache record doesn't exist, so first disk and cache entry for this diges if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil { is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record") return err } // move the blob from uploads to final dest if err := os.Rename(src, dst); err != nil { is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob") return err } is.log.Debug().Str("src", src).Str("dst", dst).Msg("dedupe: rename") } else { // cache record exists, but due to GC and upgrades from older versions, // disk content and cache records may go out of sync dstRecord = path.Join(is.rootDir, dstRecord) dstRecordFi, err := os.Stat(dstRecord) if err != nil { is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat") // the actual blob on disk may have been removed by GC, so sync the cache if err := is.cache.DeleteBlob(dstDigest.String(), dstRecord); err != nil { // nolint:lll is.log.Error().Err(err).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: unable to delete blob record") return err } goto retry } dstFi, err := os.Stat(dst) if err != nil && !os.IsNotExist(err) { is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat") return err } if !os.SameFile(dstFi, dstRecordFi) { // blob lookup cache out of sync with actual disk contents if err := os.Remove(dst); err != nil && !os.IsNotExist(err) { is.log.Error().Err(err).Str("dst", dst).Msg("dedupe: unable to remove blob") return err } is.log.Debug().Str("blobPath", dst).Msg("dedupe: creating hard link") if err := os.Link(dstRecord, dst); err != nil { is.log.Error().Err(err).Str("blobPath", dst).Str("link", dstRecord).Msg("dedupe: unable to hard link") return err } } if err := os.Remove(src); err != nil { is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob") return err } is.log.Debug().Str("src", src).Msg("dedupe: remove") } return nil } // DeleteBlobUpload deletes an existing blob upload that is currently in progress. func (is *ImageStoreFS) DeleteBlobUpload(repo string, uuid string) error { blobUploadPath := is.BlobUploadPath(repo, uuid) if err := os.Remove(blobUploadPath); err != nil { is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload") return err } return nil } // BlobPath returns the repository path of a blob. func (is *ImageStoreFS) BlobPath(repo string, digest godigest.Digest) string { return path.Join(is.rootDir, repo, "blobs", digest.Algorithm().String(), digest.Encoded()) } // CheckBlob verifies a blob and returns true if the blob is correct. func (is *ImageStoreFS) CheckBlob(repo string, digest string) (bool, int64, error) { var lockLatency time.Time parsedDigest, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") return false, -1, zerr.ErrBadBlobDigest } blobPath := is.BlobPath(repo, parsedDigest) if is.dedupe && is.cache != nil { is.Lock(&lockLatency) defer is.Unlock(&lockLatency) } else { is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) } binfo, err := os.Stat(blobPath) if err == nil { is.log.Debug().Str("blob path", blobPath).Msg("blob path found") return true, binfo.Size(), nil } is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") // Check blobs in cache dstRecord, err := is.checkCacheBlob(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found") return false, -1, zerr.ErrBlobNotFound } // If found copy to location blobSize, err := is.copyBlob(repo, blobPath, dstRecord) if err != nil { return false, -1, zerr.ErrBlobNotFound } if err := is.cache.PutBlob(digest, blobPath); err != nil { is.log.Error().Err(err).Str("blobPath", blobPath).Msg("dedupe: unable to insert blob record") return false, -1, err } return true, blobSize, nil } func (is *ImageStoreFS) checkCacheBlob(digest string) (string, error) { if !is.dedupe || is.cache == nil { return "", zerr.ErrBlobNotFound } dstRecord, err := is.cache.GetBlob(digest) if err != nil { return "", err } dstRecord = path.Join(is.rootDir, dstRecord) is.log.Debug().Str("digest", digest).Str("dstRecord", dstRecord).Msg("cache: found dedupe record") return dstRecord, nil } func (is *ImageStoreFS) copyBlob(repo string, blobPath string, dstRecord string) (int64, error) { if err := is.initRepo(repo); err != nil { is.log.Error().Err(err).Str("repo", repo).Msg("unable to initialize an empty repo") return -1, err } _ = ensureDir(filepath.Dir(blobPath), is.log) if err := os.Link(dstRecord, blobPath); err != nil { is.log.Error().Err(err).Str("blobPath", blobPath).Str("link", dstRecord).Msg("dedupe: unable to hard link") return -1, zerr.ErrBlobNotFound } binfo, err := os.Stat(blobPath) if err == nil { return binfo.Size(), nil } return -1, zerr.ErrBlobNotFound } // GetBlob returns a stream to read the blob. // blob selector instead of directly downloading the blob. func (is *ImageStoreFS) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) { var lockLatency time.Time parsedDigest, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") return nil, -1, zerr.ErrBadBlobDigest } blobPath := is.BlobPath(repo, parsedDigest) is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) binfo, err := os.Stat(blobPath) if err != nil { is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") return nil, -1, zerr.ErrBlobNotFound } blobReader, err := os.Open(blobPath) if err != nil { is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") return nil, -1, err } return blobReader, binfo.Size(), nil } func (is *ImageStoreFS) GetBlobContent(repo string, digest string) ([]byte, error) { blob, _, err := is.GetBlob(repo, digest, ispec.MediaTypeImageManifest) if err != nil { return []byte{}, err } buf := new(bytes.Buffer) _, err = buf.ReadFrom(blob) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to read blob") return []byte{}, err } return buf.Bytes(), nil } func (is *ImageStoreFS) GetIndexContent(repo string) ([]byte, error) { dir := path.Join(is.rootDir, repo) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { if os.IsNotExist(err) { is.log.Error().Err(err).Str("dir", dir).Msg("index.json doesn't exist") return []byte{}, zerr.ErrRepoNotFound } is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") return []byte{}, err } return buf, nil } // DeleteBlob removes the blob from the repository. func (is *ImageStoreFS) DeleteBlob(repo string, digest string) error { var lockLatency time.Time dgst, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") return zerr.ErrBlobNotFound } blobPath := is.BlobPath(repo, dgst) is.Lock(&lockLatency) defer is.Unlock(&lockLatency) _, err = os.Stat(blobPath) if err != nil { is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") return zerr.ErrBlobNotFound } if is.cache != nil { if err := is.cache.DeleteBlob(digest, blobPath); err != nil { is.log.Error().Err(err).Str("digest", digest).Str("blobPath", blobPath).Msg("unable to remove blob path from cache") return err } } if err := os.Remove(blobPath); err != nil { is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path") return err } return nil } func (is *ImageStoreFS) GetReferrers(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error) { var lockLatency time.Time dir := path.Join(is.rootDir, repo) if !is.DirExists(dir) { return nil, zerr.ErrRepoNotFound } gdigest, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") return nil, zerr.ErrBadBlobDigest } is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") if os.IsNotExist(err) { return nil, zerr.ErrRepoNotFound } return nil, err } var index ispec.Index if err := json.Unmarshal(buf, &index); err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") return nil, err } found := false result := []artifactspec.Descriptor{} for _, manifest := range index.Manifests { if manifest.MediaType != artifactspec.MediaTypeArtifactManifest { continue } p := path.Join(dir, "blobs", manifest.Digest.Algorithm().String(), manifest.Digest.Encoded()) buf, err = ioutil.ReadFile(p) if err != nil { is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest") if os.IsNotExist(err) { return nil, zerr.ErrManifestNotFound } return nil, err } var artManifest artifactspec.Manifest if err := json.Unmarshal(buf, &artManifest); err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") return nil, err } if mediaType != artManifest.ArtifactType || gdigest != artManifest.Subject.Digest { continue } result = append(result, artifactspec.Descriptor{ MediaType: manifest.MediaType, ArtifactType: notreg.ArtifactTypeNotation, Digest: manifest.Digest, Size: manifest.Size, Annotations: manifest.Annotations, }) found = true } if !found { return nil, zerr.ErrManifestNotFound } return result, nil } func (is *ImageStoreFS) writeFile(filename string, data []byte) error { if !is.commit { return ioutil.WriteFile(filename, data, DefaultFilePerms) } fhandle, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, DefaultFilePerms) if err != nil { return err } _, err = fhandle.Write(data) if err1 := test.Error(fhandle.Sync()); err1 != nil && err == nil { err = err1 is.log.Error().Err(err).Str("filename", filename).Msg("unable to sync file") } if err1 := test.Error(fhandle.Close()); err1 != nil && err == nil { err = err1 } return err } func IsSupportedMediaType(mediaType string) bool { return mediaType == ispec.MediaTypeImageManifest || mediaType == artifactspec.MediaTypeArtifactManifest } // utility routines func ValidateHardLink(rootDir string) error { if err := os.MkdirAll(rootDir, DefaultDirPerms); err != nil { return err } err := ioutil.WriteFile(path.Join(rootDir, "hardlinkcheck.txt"), []byte("check whether hardlinks work on filesystem"), DefaultFilePerms) if err != nil { return err } err = os.Link(path.Join(rootDir, "hardlinkcheck.txt"), path.Join(rootDir, "duphardlinkcheck.txt")) if err != nil { // Remove hardlinkcheck.txt if hardlink fails zerr := os.RemoveAll(path.Join(rootDir, "hardlinkcheck.txt")) if zerr != nil { return zerr } return err } err = os.RemoveAll(path.Join(rootDir, "hardlinkcheck.txt")) if err != nil { return err } return os.RemoveAll(path.Join(rootDir, "duphardlinkcheck.txt")) } func ensureDir(dir string, log zerolog.Logger) error { if err := os.MkdirAll(dir, DefaultDirPerms); err != nil { log.Error().Err(err).Str("dir", dir).Msg("unable to create dir") return err } return nil } func ifOlderThan(imgStore *ImageStoreFS, repo string, delay time.Duration) casext.GCPolicy { return func(ctx context.Context, digest godigest.Digest) (bool, error) { blobPath := imgStore.BlobPath(repo, digest) fi, err := os.Stat(blobPath) if err != nil { return false, err } if fi.ModTime().Add(delay).After(time.Now()) { return false, nil } imgStore.log.Info().Str("digest", digest.String()).Str("blobPath", blobPath).Msg("perform GC on blob") return true, nil } } func DirExists(d string) bool { fi, err := os.Stat(d) if err != nil && os.IsNotExist(err) { return false } if !fi.IsDir() { return false } return true }