0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-06 22:40:28 -05:00
zot/pkg/storage/local/local.go
peusebiu fd87a22829
fix(storage): resolve cache/storage inconsistencies on HEAD request (#794)
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
2022-10-11 10:05:39 -07:00

1627 lines
38 KiB
Go

package local
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"sync"
"syscall"
"time"
"unicode/utf8"
apexlog "github.com/apex/log"
guuid "github.com/gofrs/uuid"
"github.com/minio/sha256-simd"
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/umoci"
"github.com/opencontainers/umoci/oci/casext"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"github.com/rs/zerolog"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/extensions/monitoring"
zlog "zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/test"
)
const (
DefaultFilePerms = 0o600
DefaultDirPerms = 0o700
)
// ImageStoreLocal provides the image storage operations.
type ImageStoreLocal struct {
rootDir string
lock *sync.RWMutex
blobUploads map[string]storage.BlobUpload
cache *storage.Cache
gc bool
dedupe bool
commit bool
gcDelay time.Duration
log zerolog.Logger
metrics monitoring.MetricServer
linter storage.Lint
}
func (is *ImageStoreLocal) RootDir() string {
return is.rootDir
}
func (is *ImageStoreLocal) DirExists(d string) bool {
return DirExists(d)
}
// NewImageStore returns a new image store backed by a file storage.
func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commit bool,
log zlog.Logger, metrics monitoring.MetricServer, linter storage.Lint,
) storage.ImageStore {
if _, err := os.Stat(rootDir); os.IsNotExist(err) {
if err := os.MkdirAll(rootDir, DefaultDirPerms); err != nil {
log.Error().Err(err).Str("rootDir", rootDir).Msg("unable to create root dir")
return nil
}
}
imgStore := &ImageStoreLocal{
rootDir: rootDir,
lock: &sync.RWMutex{},
blobUploads: make(map[string]storage.BlobUpload),
gc: gc,
gcDelay: gcDelay,
dedupe: dedupe,
commit: commit,
log: log.With().Caller().Logger(),
metrics: metrics,
linter: linter,
}
if dedupe {
imgStore.cache = storage.NewCache(rootDir, "cache", true, log)
}
if gc {
// we use umoci GC to perform garbage-collection, but it uses its own logger
// - so capture those logs, could be useful
apexlog.SetLevel(apexlog.DebugLevel)
apexlog.SetHandler(apexlog.HandlerFunc(func(entry *apexlog.Entry) error {
e := log.Debug()
for k, v := range entry.Fields {
e = e.Interface(k, v)
}
e.Msg(entry.Message)
return nil
}))
}
return imgStore
}
// RLock read-lock.
func (is *ImageStoreLocal) RLock(lockStart *time.Time) {
*lockStart = time.Now()
is.lock.RLock()
}
// RUnlock read-unlock.
func (is *ImageStoreLocal) RUnlock(lockStart *time.Time) {
is.lock.RUnlock()
lockEnd := time.Now()
latency := lockEnd.Sub(*lockStart)
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storage.RLOCK) // histogram
}
// Lock write-lock.
func (is *ImageStoreLocal) Lock(lockStart *time.Time) {
*lockStart = time.Now()
is.lock.Lock()
}
// Unlock write-unlock.
func (is *ImageStoreLocal) Unlock(lockStart *time.Time) {
is.lock.Unlock()
lockEnd := time.Now()
latency := lockEnd.Sub(*lockStart)
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storage.RWLOCK) // histogram
}
func (is *ImageStoreLocal) initRepo(name string) error {
repoDir := path.Join(is.rootDir, name)
if !utf8.ValidString(name) {
is.log.Error().Msg("input is not valid UTF-8")
return zerr.ErrInvalidRepositoryName
}
// create "blobs" subdir
err := ensureDir(path.Join(repoDir, "blobs"), is.log)
if err != nil {
is.log.Error().Err(err).Msg("error creating blobs subdir")
return err
}
// create BlobUploadDir subdir
err = ensureDir(path.Join(repoDir, storage.BlobUploadDir), is.log)
if err != nil {
is.log.Error().Err(err).Msg("error creating blob upload subdir")
return err
}
// "oci-layout" file - create if it doesn't exist
ilPath := path.Join(repoDir, ispec.ImageLayoutFile)
if _, err := os.Stat(ilPath); err != nil {
il := ispec.ImageLayout{Version: ispec.ImageLayoutVersion}
buf, err := json.Marshal(il)
if err != nil {
is.log.Panic().Err(err).Msg("unable to marshal JSON")
}
if err := is.writeFile(ilPath, buf); err != nil {
is.log.Error().Err(err).Str("file", ilPath).Msg("unable to write file")
return err
}
}
// "index.json" file - create if it doesn't exist
indexPath := path.Join(repoDir, "index.json")
if _, err := os.Stat(indexPath); err != nil {
index := ispec.Index{}
index.SchemaVersion = 2
buf, err := json.Marshal(index)
if err != nil {
is.log.Panic().Err(err).Msg("unable to marshal JSON")
}
if err := is.writeFile(indexPath, buf); err != nil {
is.log.Error().Err(err).Str("file", indexPath).Msg("unable to write file")
return err
}
}
return nil
}
// InitRepo creates an image repository under this store.
func (is *ImageStoreLocal) InitRepo(name string) error {
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
return is.initRepo(name)
}
// ValidateRepo validates that the repository layout is complaint with the OCI repo layout.
func (is *ImageStoreLocal) ValidateRepo(name string) (bool, error) {
// https://github.com/opencontainers/image-spec/blob/master/image-layout.md#content
// at least, expect at least 3 entries - ["blobs", "oci-layout", "index.json"]
// and an additional/optional BlobUploadDir in each image store
dir := path.Join(is.rootDir, name)
if !is.DirExists(dir) {
return false, zerr.ErrRepoNotFound
}
files, err := os.ReadDir(dir)
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("unable to read directory")
return false, zerr.ErrRepoNotFound
}
if len(files) < 3 { //nolint:gomnd
return false, zerr.ErrRepoBadVersion
}
found := map[string]bool{
"blobs": false,
ispec.ImageLayoutFile: false,
"index.json": false,
}
for _, file := range files {
if file.Name() == "blobs" && !file.IsDir() {
return false, nil
}
found[file.Name()] = true
}
for k, v := range found {
if !v && k != storage.BlobUploadDir {
return false, nil
}
}
buf, err := os.ReadFile(path.Join(dir, ispec.ImageLayoutFile))
if err != nil {
return false, err
}
var il ispec.ImageLayout
if err := json.Unmarshal(buf, &il); err != nil {
return false, err
}
if il.Version != ispec.ImageLayoutVersion {
return false, zerr.ErrRepoBadVersion
}
return true, nil
}
// GetRepositories returns a list of all the repositories under this store.
func (is *ImageStoreLocal) GetRepositories() ([]string, error) {
var lockLatency time.Time
dir := is.rootDir
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
_, err := os.ReadDir(dir)
if err != nil {
is.log.Error().Err(err).Msg("failure walking storage root-dir")
return nil, err
}
stores := make([]string, 0)
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
return nil
}
rel, err := filepath.Rel(is.rootDir, path)
if err != nil {
return nil //nolint:nilerr // ignore paths not relative to root dir
}
if ok, err := is.ValidateRepo(rel); !ok || err != nil {
return nil //nolint:nilerr // ignore invalid repos
}
// is.log.Debug().Str("dir", path).Str("name", info.Name()).Msg("found image store")
stores = append(stores, rel)
return nil
})
return stores, err
}
// GetNextRepository returns next repository under this store.
func (is *ImageStoreLocal) GetNextRepository(repo string) (string, error) {
var lockLatency time.Time
dir := is.rootDir
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
_, err := os.ReadDir(dir)
if err != nil {
is.log.Error().Err(err).Msg("failure walking storage root-dir")
return "", err
}
found := false
store := ""
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
return nil
}
rel, err := filepath.Rel(is.rootDir, path)
if err != nil {
return nil //nolint:nilerr // ignore paths not relative to root dir
}
ok, err := is.ValidateRepo(rel)
if !ok || err != nil {
return nil //nolint:nilerr // ignore invalid repos
}
if repo == "" && ok && err == nil {
store = rel
return io.EOF
}
if found {
store = rel
return io.EOF
}
if rel == repo {
found = true
}
return nil
})
return store, err
}
// GetImageTags returns a list of image tags available in the specified repository.
func (is *ImageStoreLocal) GetImageTags(repo string) ([]string, error) {
dir := path.Join(is.rootDir, repo)
if !is.DirExists(dir) {
return nil, zerr.ErrRepoNotFound
}
index, err := storage.GetIndex(is, repo, is.log)
if err != nil {
return nil, err
}
return storage.GetTagsByIndex(index), nil
}
// GetImageManifest returns the image manifest of an image in the specific repository.
func (is *ImageStoreLocal) GetImageManifest(repo, reference string) ([]byte, string, string, error) {
dir := path.Join(is.rootDir, repo)
if !is.DirExists(dir) {
return nil, "", "", zerr.ErrRepoNotFound
}
index, err := storage.GetIndex(is, repo, is.log)
if err != nil {
return nil, "", "", err
}
manifestDesc, found := storage.GetManifestDescByReference(index, reference)
if !found {
return nil, "", "", zerr.ErrManifestNotFound
}
buf, err := is.GetBlobContent(repo, manifestDesc.Digest.String())
if err != nil {
if errors.Is(err, zerr.ErrBlobNotFound) {
return nil, "", "", zerr.ErrManifestNotFound
}
return nil, "", "", err
}
var manifest ispec.Manifest
if err := json.Unmarshal(buf, &manifest); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return nil, "", "", err
}
monitoring.IncDownloadCounter(is.metrics, repo)
return buf, manifestDesc.Digest.String(), manifestDesc.MediaType, nil
}
// PutImageManifest adds an image manifest to the repository.
func (is *ImageStoreLocal) PutImageManifest(repo, reference, mediaType string, //nolint: gocyclo
body []byte,
) (string, error) {
if err := is.InitRepo(repo); err != nil {
is.log.Debug().Err(err).Msg("init repo")
return "", err
}
dig, err := storage.ValidateManifest(is, repo, reference, mediaType, body, is.log)
if err != nil {
return dig, err
}
refIsDigest := true
mDigest, err := storage.GetAndValidateRequestDigest(body, reference, is.log)
if err != nil {
if errors.Is(err, zerr.ErrBadManifest) {
return mDigest.String(), err
}
refIsDigest = false
}
index, err := storage.GetIndex(is, repo, is.log)
if err != nil {
return "", err
}
// create a new descriptor
desc := ispec.Descriptor{
MediaType: mediaType, Size: int64(len(body)), Digest: mDigest,
}
if !refIsDigest {
desc.Annotations = map[string]string{ispec.AnnotationRefName: reference}
}
updateIndex, oldDgst, err := storage.CheckIfIndexNeedsUpdate(&index, &desc, is.log)
if err != nil {
return "", err
}
if !updateIndex {
return desc.Digest.String(), nil
}
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
// write manifest to "blobs"
dir := path.Join(is.rootDir, repo, "blobs", mDigest.Algorithm().String())
_ = ensureDir(dir, is.log)
file := path.Join(dir, mDigest.Encoded())
// in case the linter will not pass, it will be garbage collected
if err := is.writeFile(file, body); err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to write")
return "", err
}
is.Unlock(&lockLatency)
err = storage.UpdateIndexWithPrunedImageManifests(is, &index, repo, desc, oldDgst, is.log)
if err != nil {
return "", err
}
is.Lock(&lockLatency)
// now update "index.json"
index.Manifests = append(index.Manifests, desc)
dir = path.Join(is.rootDir, repo)
file = path.Join(dir, "index.json")
buf, err := json.Marshal(index)
if err := test.Error(err); err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to marshal JSON")
return "", err
}
is.Unlock(&lockLatency)
// apply linter only on images, not signatures
pass, err := storage.ApplyLinter(is, is.linter, repo, desc)
is.Lock(&lockLatency)
if !pass {
is.log.Error().Err(err).Str("repo", repo).Str("reference", reference).Msg("linter didn't pass")
return "", err
}
err = is.writeFile(file, buf)
if err := test.Error(err); err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to write")
return "", err
}
if is.gc {
if err := is.garbageCollect(dir, repo); err != nil {
return "", err
}
}
monitoring.SetStorageUsage(is.metrics, is.rootDir, repo)
monitoring.IncUploadCounter(is.metrics, repo)
return desc.Digest.String(), nil
}
// DeleteImageManifest deletes the image manifest from the repository.
func (is *ImageStoreLocal) DeleteImageManifest(repo, reference string) error {
var lockLatency time.Time
dir := path.Join(is.rootDir, repo)
if !is.DirExists(dir) {
return zerr.ErrRepoNotFound
}
index, err := storage.GetIndex(is, repo, is.log)
if err != nil {
return err
}
manifestDesc, found := storage.RemoveManifestDescByReference(&index, reference)
if !found {
return zerr.ErrManifestNotFound
}
err = storage.UpdateIndexWithPrunedImageManifests(is, &index, repo, manifestDesc, manifestDesc.Digest, is.log)
if err != nil {
return err
}
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
// now update "index.json"
dir = path.Join(is.rootDir, repo)
file := path.Join(dir, "index.json")
buf, err := json.Marshal(index)
if err != nil {
return err
}
if err := is.writeFile(file, buf); err != nil {
return err
}
if is.gc {
if err := is.garbageCollect(dir, repo); err != nil {
return err
}
}
// Delete blob only when blob digest not present in manifest entry.
// e.g. 1.0.1 & 1.0.2 have same blob digest so if we delete 1.0.1, blob should not be removed.
toDelete := true
for _, manifest := range index.Manifests {
if manifestDesc.Digest.String() == manifest.Digest.String() {
toDelete = false
break
}
}
if toDelete {
p := path.Join(dir, "blobs", manifestDesc.Digest.Algorithm().String(), manifestDesc.Digest.Encoded())
_ = os.Remove(p)
}
monitoring.SetStorageUsage(is.metrics, is.rootDir, repo)
return nil
}
// BlobUploadPath returns the upload path for a blob in this store.
func (is *ImageStoreLocal) BlobUploadPath(repo, uuid string) string {
dir := path.Join(is.rootDir, repo)
blobUploadPath := path.Join(dir, storage.BlobUploadDir, uuid)
return blobUploadPath
}
// NewBlobUpload returns the unique ID for an upload in progress.
func (is *ImageStoreLocal) NewBlobUpload(repo string) (string, error) {
if err := is.InitRepo(repo); err != nil {
is.log.Error().Err(err).Msg("error initializing repo")
return "", err
}
uuid, err := guuid.NewV4()
if err != nil {
return "", err
}
uid := uuid.String()
blobUploadPath := is.BlobUploadPath(repo, uid)
file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, DefaultFilePerms)
if err != nil {
return "", zerr.ErrRepoNotFound
}
defer file.Close()
return uid, nil
}
// GetBlobUpload returns the current size of a blob upload.
func (is *ImageStoreLocal) GetBlobUpload(repo, uuid string) (int64, error) {
blobUploadPath := is.BlobUploadPath(repo, uuid)
if !utf8.ValidString(blobUploadPath) {
is.log.Error().Msg("input is not valid UTF-8")
return -1, zerr.ErrInvalidRepositoryName
}
binfo, err := os.Stat(blobUploadPath)
if err != nil {
if os.IsNotExist(err) {
return -1, zerr.ErrUploadNotFound
}
return -1, err
}
return binfo.Size(), nil
}
// PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns
// the number of actual bytes to the blob.
func (is *ImageStoreLocal) PutBlobChunkStreamed(repo, uuid string, body io.Reader) (int64, error) {
if err := is.InitRepo(repo); err != nil {
return -1, err
}
blobUploadPath := is.BlobUploadPath(repo, uuid)
_, err := os.Stat(blobUploadPath)
if err != nil {
return -1, zerr.ErrUploadNotFound
}
file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_CREATE, DefaultFilePerms)
if err != nil {
is.log.Error().Err(err).Msg("failed to open file")
return -1, err
}
defer func() {
if is.commit {
_ = file.Sync()
}
_ = file.Close()
}()
if _, err := file.Seek(0, io.SeekEnd); err != nil {
is.log.Error().Err(err).Msg("failed to seek file")
return -1, err
}
n, err := io.Copy(file, body)
return n, err
}
// PutBlobChunk writes another chunk of data to the specified blob. It returns
// the number of actual bytes to the blob.
func (is *ImageStoreLocal) PutBlobChunk(repo, uuid string, from, to int64,
body io.Reader,
) (int64, error) {
if err := is.InitRepo(repo); err != nil {
return -1, err
}
blobUploadPath := is.BlobUploadPath(repo, uuid)
binfo, err := os.Stat(blobUploadPath)
if err != nil {
return -1, zerr.ErrUploadNotFound
}
if from != binfo.Size() {
is.log.Error().Int64("expected", from).Int64("actual", binfo.Size()).
Msg("invalid range start for blob upload")
return -1, zerr.ErrBadUploadRange
}
file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_CREATE, DefaultFilePerms)
if err != nil {
is.log.Error().Err(err).Msg("failed to open file")
return -1, err
}
defer func() {
if is.commit {
_ = file.Sync()
}
_ = file.Close()
}()
if _, err := file.Seek(from, io.SeekStart); err != nil {
is.log.Error().Err(err).Msg("failed to seek file")
return -1, err
}
n, err := io.Copy(file, body)
return n, err
}
// BlobUploadInfo returns the current blob size in bytes.
func (is *ImageStoreLocal) BlobUploadInfo(repo, uuid string) (int64, error) {
blobUploadPath := is.BlobUploadPath(repo, uuid)
binfo, err := os.Stat(blobUploadPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob")
return -1, err
}
size := binfo.Size()
return size, nil
}
// FinishBlobUpload finalizes the blob upload and moves blob the repository.
func (is *ImageStoreLocal) FinishBlobUpload(repo, uuid string, body io.Reader, digest string) error {
dstDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return zerr.ErrBadBlobDigest
}
src := is.BlobUploadPath(repo, uuid)
_, err = os.Stat(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to stat blob")
return zerr.ErrUploadNotFound
}
blobFile, err := os.Open(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
return zerr.ErrUploadNotFound
}
defer blobFile.Close()
digester := sha256.New()
_, err = io.Copy(digester, blobFile)
if err != nil {
is.log.Error().Err(err).Str("repo", repo).Str("blob", src).Str("digest", digest).Msg("unable to compute hash")
return err
}
srcDigest := godigest.NewDigestFromEncoded(godigest.SHA256, fmt.Sprintf("%x", digester.Sum(nil)))
if srcDigest != dstDigest {
is.log.Error().Str("srcDigest", srcDigest.String()).
Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest")
return zerr.ErrBadBlobDigest
}
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
err = ensureDir(dir, is.log)
if err != nil {
is.log.Error().Err(err).Msg("error creating blobs/sha256 dir")
return err
}
dst := is.BlobPath(repo, dstDigest)
if is.dedupe && is.cache != nil {
err = is.DedupeBlob(src, dstDigest, dst)
if err := test.Error(err); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
return err
}
} else {
if err := os.Rename(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to finish blob")
return err
}
}
return nil
}
// FullBlobUpload handles a full blob upload, and no partial session is created.
func (is *ImageStoreLocal) FullBlobUpload(repo string, body io.Reader, digest string) (string, int64, error) {
if err := is.InitRepo(repo); err != nil {
return "", -1, err
}
dstDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return "", -1, zerr.ErrBadBlobDigest
}
u, err := guuid.NewV4()
if err != nil {
return "", -1, err
}
uuid := u.String()
src := is.BlobUploadPath(repo, uuid)
blobFile, err := os.Create(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
return "", -1, zerr.ErrUploadNotFound
}
defer func() {
if is.commit {
_ = blobFile.Sync()
}
_ = blobFile.Close()
}()
digester := sha256.New()
mw := io.MultiWriter(blobFile, digester)
nbytes, err := io.Copy(mw, body)
if err != nil {
return "", -1, err
}
srcDigest := godigest.NewDigestFromEncoded(godigest.SHA256, fmt.Sprintf("%x", digester.Sum(nil)))
if srcDigest != dstDigest {
is.log.Error().Str("srcDigest", srcDigest.String()).
Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest")
return "", -1, zerr.ErrBadBlobDigest
}
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
_ = ensureDir(dir, is.log)
dst := is.BlobPath(repo, dstDigest)
if is.dedupe && is.cache != nil {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
return "", -1, err
}
} else {
if err := os.Rename(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to finish blob")
return "", -1, err
}
}
return uuid, nbytes, nil
}
func (is *ImageStoreLocal) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error {
retry:
is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: enter")
dstRecord, err := is.cache.GetBlob(dstDigest.String())
if err != nil && !errors.Is(err, zerr.ErrCacheMiss) {
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to lookup blob record")
return err
}
if dstRecord == "" {
// cache record doesn't exist, so first disk and cache entry for this diges
if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil {
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record")
return err
}
// move the blob from uploads to final dest
if err := os.Rename(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob")
return err
}
is.log.Debug().Str("src", src).Str("dst", dst).Msg("dedupe: rename")
} else {
// cache record exists, but due to GC and upgrades from older versions,
// disk content and cache records may go out of sync
dstRecord = path.Join(is.rootDir, dstRecord)
dstRecordFi, err := os.Stat(dstRecord)
if err != nil {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
// the actual blob on disk may have been removed by GC, so sync the cache
if err := is.cache.DeleteBlob(dstDigest.String(), dstRecord); err != nil {
//nolint:lll // gofumpt conflicts with lll
is.log.Error().Err(err).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: unable to delete blob record")
return err
}
goto retry
}
dstFi, err := os.Stat(dst)
if err != nil && !os.IsNotExist(err) {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
return err
}
if !os.SameFile(dstFi, dstRecordFi) {
// blob lookup cache out of sync with actual disk contents
if err := os.Remove(dst); err != nil && !os.IsNotExist(err) {
is.log.Error().Err(err).Str("dst", dst).Msg("dedupe: unable to remove blob")
return err
}
is.log.Debug().Str("blobPath", dst).Str("dstRecord", dstRecord).Msg("dedupe: creating hard link")
if err := os.Link(dstRecord, dst); err != nil {
is.log.Error().Err(err).Str("blobPath", dst).Str("link", dstRecord).Msg("dedupe: unable to hard link")
return err
}
}
if err := os.Remove(src); err != nil {
is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob")
return err
}
is.log.Debug().Str("src", src).Msg("dedupe: remove")
}
return nil
}
// DeleteBlobUpload deletes an existing blob upload that is currently in progress.
func (is *ImageStoreLocal) DeleteBlobUpload(repo, uuid string) error {
blobUploadPath := is.BlobUploadPath(repo, uuid)
if err := os.Remove(blobUploadPath); err != nil {
is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload")
return err
}
return nil
}
// BlobPath returns the repository path of a blob.
func (is *ImageStoreLocal) BlobPath(repo string, digest godigest.Digest) string {
return path.Join(is.rootDir, repo, "blobs", digest.Algorithm().String(), digest.Encoded())
}
// CheckBlob verifies a blob and returns true if the blob is correct.
func (is *ImageStoreLocal) CheckBlob(repo, digest string) (bool, int64, error) {
var lockLatency time.Time
parsedDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return false, -1, zerr.ErrBadBlobDigest
}
blobPath := is.BlobPath(repo, parsedDigest)
if is.dedupe && is.cache != nil {
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
} else {
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
}
binfo, err := os.Stat(blobPath)
if err == nil {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
return true, binfo.Size(), nil
}
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
// Check blobs in cache
dstRecord, err := is.checkCacheBlob(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found")
return false, -1, zerr.ErrBlobNotFound
}
// If found copy to location
blobSize, err := is.copyBlob(repo, blobPath, dstRecord)
if err != nil {
return false, -1, zerr.ErrBlobNotFound
}
if err := is.cache.PutBlob(digest, blobPath); err != nil {
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("dedupe: unable to insert blob record")
return false, -1, err
}
return true, blobSize, nil
}
func (is *ImageStoreLocal) checkCacheBlob(digest string) (string, error) {
if !is.dedupe || is.cache == nil {
return "", zerr.ErrBlobNotFound
}
dstRecord, err := is.cache.GetBlob(digest)
if err != nil {
return "", err
}
dstRecord = path.Join(is.rootDir, dstRecord)
if _, err := os.Stat(dstRecord); err != nil {
is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob")
// the actual blob on disk may have been removed by GC, so sync the cache
if err := is.cache.DeleteBlob(digest, dstRecord); err != nil {
is.log.Error().Err(err).Str("digest", digest).Str("blobPath", dstRecord).Msg("unable to remove blob path from cache")
return "", err
}
return "", zerr.ErrBlobNotFound
}
is.log.Debug().Str("digest", digest).Str("dstRecord", dstRecord).Msg("cache: found dedupe record")
return dstRecord, nil
}
func (is *ImageStoreLocal) copyBlob(repo, blobPath, dstRecord string) (int64, error) {
if err := is.initRepo(repo); err != nil {
is.log.Error().Err(err).Str("repo", repo).Msg("unable to initialize an empty repo")
return -1, err
}
_ = ensureDir(filepath.Dir(blobPath), is.log)
if err := os.Link(dstRecord, blobPath); err != nil {
is.log.Error().Err(err).Str("blobPath", blobPath).Str("link", dstRecord).Msg("dedupe: unable to hard link")
return -1, zerr.ErrBlobNotFound
}
binfo, err := os.Stat(blobPath)
if err == nil {
return binfo.Size(), nil
}
return -1, zerr.ErrBlobNotFound
}
// blobStream is using to serve blob range requests.
type blobStream struct {
reader io.Reader
closer io.Closer
}
func newBlobStream(blobPath string, from, to int64) (io.ReadCloser, error) {
blobFile, err := os.Open(blobPath)
if err != nil {
return nil, err
}
if from > 0 {
_, err = blobFile.Seek(from, io.SeekStart)
if err != nil {
return nil, err
}
}
if from < 0 || to < from {
return nil, zerr.ErrBadRange
}
blobstrm := blobStream{reader: blobFile, closer: blobFile}
blobstrm.reader = io.LimitReader(blobFile, to-from+1)
return &blobstrm, nil
}
func (bs *blobStream) Read(buf []byte) (int, error) {
return bs.reader.Read(buf)
}
func (bs *blobStream) Close() error {
return bs.closer.Close()
}
// GetBlobPartial returns a partial stream to read the blob.
// blob selector instead of directly downloading the blob.
func (is *ImageStoreLocal) GetBlobPartial(repo, digest, mediaType string, from, to int64,
) (io.ReadCloser, int64, int64, error) {
var lockLatency time.Time
parsedDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return nil, -1, -1, zerr.ErrBadBlobDigest
}
blobPath := is.BlobPath(repo, parsedDigest)
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
binfo, err := os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return nil, -1, -1, zerr.ErrBlobNotFound
}
if to < 0 || to >= binfo.Size() {
to = binfo.Size() - 1
}
blobReadCloser, err := newBlobStream(blobPath, from, to)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
return nil, -1, -1, err
}
// The caller function is responsible for calling Close()
return blobReadCloser, to - from + 1, binfo.Size(), nil
}
// GetBlob returns a stream to read the blob.
// blob selector instead of directly downloading the blob.
func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) {
var lockLatency time.Time
parsedDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return nil, -1, zerr.ErrBadBlobDigest
}
blobPath := is.BlobPath(repo, parsedDigest)
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
binfo, err := os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return nil, -1, zerr.ErrBlobNotFound
}
blobReadCloser, err := os.Open(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
return nil, -1, err
}
// The caller function is responsible for calling Close()
return blobReadCloser, binfo.Size(), nil
}
func (is *ImageStoreLocal) GetBlobContent(repo, digest string) ([]byte, error) {
blob, _, err := is.GetBlob(repo, digest, ispec.MediaTypeImageManifest)
if err != nil {
return []byte{}, err
}
defer blob.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(blob)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to read blob")
return []byte{}, err
}
return buf.Bytes(), nil
}
func (is *ImageStoreLocal) GetIndexContent(repo string) ([]byte, error) {
var lockLatency time.Time
dir := path.Join(is.rootDir, repo)
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
buf, err := os.ReadFile(path.Join(dir, "index.json"))
if err != nil {
if os.IsNotExist(err) {
is.log.Error().Err(err).Str("dir", dir).Msg("index.json doesn't exist")
return []byte{}, zerr.ErrRepoNotFound
}
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
return []byte{}, err
}
return buf, nil
}
// DeleteBlob removes the blob from the repository.
func (is *ImageStoreLocal) DeleteBlob(repo, digest string) error {
var lockLatency time.Time
dgst, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return zerr.ErrBlobNotFound
}
blobPath := is.BlobPath(repo, dgst)
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
_, err = os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return zerr.ErrBlobNotFound
}
if is.cache != nil {
if err := is.cache.DeleteBlob(digest, blobPath); err != nil {
is.log.Error().Err(err).Str("digest", digest).Str("blobPath", blobPath).Msg("unable to remove blob path from cache")
return err
}
}
if err := os.Remove(blobPath); err != nil {
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path")
return err
}
return nil
}
func (is *ImageStoreLocal) GetReferrers(repo, digest, artifactType string) ([]artifactspec.Descriptor, error) {
var lockLatency time.Time
dir := path.Join(is.rootDir, repo)
if !is.DirExists(dir) {
return nil, zerr.ErrRepoNotFound
}
gdigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return nil, zerr.ErrBadBlobDigest
}
index, err := storage.GetIndex(is, repo, is.log)
if err != nil {
return nil, err
}
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
found := false
result := []artifactspec.Descriptor{}
for _, manifest := range index.Manifests {
if manifest.MediaType != artifactspec.MediaTypeArtifactManifest {
continue
}
p := path.Join(dir, "blobs", manifest.Digest.Algorithm().String(), manifest.Digest.Encoded())
buf, err := os.ReadFile(p)
if err != nil {
is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest")
if os.IsNotExist(err) {
return nil, zerr.ErrManifestNotFound
}
return nil, err
}
var artManifest artifactspec.Manifest
if err := json.Unmarshal(buf, &artManifest); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return nil, err
}
if artifactType != artManifest.ArtifactType || gdigest != artManifest.Subject.Digest {
continue
}
result = append(result, artifactspec.Descriptor{
MediaType: manifest.MediaType,
ArtifactType: artManifest.ArtifactType,
Digest: manifest.Digest,
Size: manifest.Size,
Annotations: manifest.Annotations,
})
found = true
}
if !found {
return nil, zerr.ErrManifestNotFound
}
return result, nil
}
func (is *ImageStoreLocal) writeFile(filename string, data []byte) error {
if !is.commit {
return os.WriteFile(filename, data, DefaultFilePerms)
}
fhandle, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, DefaultFilePerms)
if err != nil {
return err
}
_, err = fhandle.Write(data)
if err1 := test.Error(fhandle.Sync()); err1 != nil && err == nil {
err = err1
is.log.Error().Err(err).Str("filename", filename).Msg("unable to sync file")
}
if err1 := test.Error(fhandle.Close()); err1 != nil && err == nil {
err = err1
}
return err
}
// utility routines
func ValidateHardLink(rootDir string) error {
if err := os.MkdirAll(rootDir, DefaultDirPerms); err != nil {
return err
}
err := os.WriteFile(path.Join(rootDir, "hardlinkcheck.txt"),
[]byte("check whether hardlinks work on filesystem"), DefaultFilePerms)
if err != nil {
return err
}
err = os.Link(path.Join(rootDir, "hardlinkcheck.txt"), path.Join(rootDir, "duphardlinkcheck.txt"))
if err != nil {
// Remove hardlinkcheck.txt if hardlink fails
zerr := os.RemoveAll(path.Join(rootDir, "hardlinkcheck.txt"))
if zerr != nil {
return zerr
}
return err
}
err = os.RemoveAll(path.Join(rootDir, "hardlinkcheck.txt"))
if err != nil {
return err
}
return os.RemoveAll(path.Join(rootDir, "duphardlinkcheck.txt"))
}
func ensureDir(dir string, log zerolog.Logger) error {
if err := os.MkdirAll(dir, DefaultDirPerms); err != nil {
log.Error().Err(err).Str("dir", dir).Msg("unable to create dir")
return err
}
return nil
}
func (is *ImageStoreLocal) garbageCollect(dir string, repo string) error {
oci, err := umoci.OpenLayout(dir)
if err := test.Error(err); err != nil {
return err
}
defer oci.Close()
err = oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay))
if err := test.Error(err); err != nil {
return err
}
return nil
}
func ifOlderThan(imgStore *ImageStoreLocal, repo string, delay time.Duration) casext.GCPolicy {
return func(ctx context.Context, digest godigest.Digest) (bool, error) {
blobPath := imgStore.BlobPath(repo, digest)
fi, err := os.Stat(blobPath)
if err != nil {
return false, err
}
if fi.ModTime().Add(delay).After(time.Now()) {
return false, nil
}
imgStore.log.Info().Str("digest", digest.String()).Str("blobPath", blobPath).Msg("perform GC on blob")
return true, nil
}
}
func DirExists(d string) bool {
if !utf8.ValidString(d) {
return false
}
fileInfo, err := os.Stat(d)
if err != nil {
if e, ok := err.(*fs.PathError); ok && errors.Is(e.Err, syscall.ENAMETOOLONG) || //nolint: errorlint
errors.Is(e.Err, syscall.EINVAL) {
return false
}
}
if err != nil && os.IsNotExist(err) {
return false
}
if !fileInfo.IsDir() {
return false
}
return true
}
func (is *ImageStoreLocal) gcRepo(repo string) error {
dir := path.Join(is.RootDir(), repo)
var lockLatency time.Time
is.Lock(&lockLatency)
err := is.garbageCollect(dir, repo)
is.Unlock(&lockLatency)
if err != nil {
return err
}
return nil
}
func (is *ImageStoreLocal) RunGCRepo(repo string) error {
is.log.Info().Msg(fmt.Sprintf("executing GC of orphaned blobs for %s", path.Join(is.RootDir(), repo)))
if err := is.gcRepo(repo); err != nil {
errMessage := fmt.Sprintf("error while running GC for %s", path.Join(is.RootDir(), repo))
is.log.Error().Err(err).Msg(errMessage)
is.log.Info().Msg(fmt.Sprintf("GC unsuccessfully completed for %s", path.Join(is.RootDir(), repo)))
return err
}
is.log.Info().Msg(fmt.Sprintf("GC successfully completed for %s", path.Join(is.RootDir(), repo)))
return nil
}
func (is *ImageStoreLocal) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) {
generator := &taskGenerator{
imgStore: is,
}
sch.SubmitGenerator(generator, interval, scheduler.MediumPriority)
}
type taskGenerator struct {
imgStore *ImageStoreLocal
lastRepo string
done bool
}
func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) {
repo, err := gen.imgStore.GetNextRepository(gen.lastRepo)
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}
if repo == "" {
gen.done = true
return nil, nil
}
gen.lastRepo = repo
return newGCTask(gen.imgStore, repo), nil
}
func (gen *taskGenerator) IsDone() bool {
return gen.done
}
func (gen *taskGenerator) Reset() {
gen.lastRepo = ""
gen.done = false
}
type gcTask struct {
imgStore *ImageStoreLocal
repo string
}
func newGCTask(imgStore *ImageStoreLocal, repo string) *gcTask {
return &gcTask{imgStore, repo}
}
func (gcT *gcTask) DoWork() error {
return gcT.imgStore.RunGCRepo(gcT.repo)
}