diff --git a/.github/workflows/nightly.yaml b/.github/workflows/nightly.yaml new file mode 100644 index 00000000..b384335d --- /dev/null +++ b/.github/workflows/nightly.yaml @@ -0,0 +1,57 @@ +name: 'Nightly jobs' +on: + schedule: + - cron: '30 1 * * *' + workflow_dispatch: + +permissions: read-all + +# Here we are running two tests: +# 1. run zot with local storage and dedupe disabled, push images, restart zot with dedupe enabled +# task scheduler will start a dedupe all blobs process at zot startup and it shouldn't interfere with clients. +# 2. run zot with s3 storage and dynamodb and dedupe enabled, push images, restart zot with dedupe false and no cache +# task scheduler will start a restore all blobs process at zot startup, after it finishes all blobs should be restored to their original state (have content) +jobs: + client-tools: + name: Dedupe/restore blobs + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version: 1.19.x + - name: Install dependencies + run: | + cd $GITHUB_WORKSPACE + go install github.com/swaggo/swag/cmd/swag + go mod download + sudo apt-get update + sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap + # install skopeo + git clone -b v1.9.0 https://github.com/containers/skopeo.git + cd skopeo + make bin/skopeo + sudo cp bin/skopeo /usr/bin + skopeo -v + - name: Install localstack + run: | + pip install --upgrade pyopenssl + pip install localstack awscli-local[ver1] # install LocalStack cli and awslocal + docker pull localstack/localstack # Make sure to pull the latest version of the image + localstack start -d # Start LocalStack in the background + + echo "Waiting for LocalStack startup..." # Wait 30 seconds for the LocalStack container + localstack wait -t 30 # to become ready before timing out + echo "Startup complete" + - name: Run restore s3 blobs after cache is deleted + run: | + make test-restore-s3-blobs + env: + AWS_ACCESS_KEY_ID: fake + AWS_SECRET_ACCESS_KEY: fake + - name: Run dedupe filesystem blobs after switching dedupe to enable. + run: | + make test-push-pull-running-dedupe + env: + AWS_ACCESS_KEY_ID: fake + AWS_SECRET_ACCESS_KEY: fake diff --git a/Makefile b/Makefile index 6534baf1..adf07d81 100644 --- a/Makefile +++ b/Makefile @@ -307,6 +307,22 @@ test-push-pull: binary check-skopeo $(BATS) $(REGCLIENT) $(ORAS) $(HELM) test-push-pull-verbose: binary check-skopeo $(BATS) $(BATS) --trace --verbose-run --print-output-on-failure --show-output-of-passing-tests test/blackbox/pushpull.bats +.PHONY: test-push-pull-running-dedupe +test-push-pull-running-dedupe: binary check-skopeo $(BATS) $(REGCLIENT) $(ORAS) $(HELM) + $(BATS) --trace --print-output-on-failure test/blackbox/pushpull_running_dedupe.bats + +.PHONY: test-push-pull-running-dedupe-verbose +test-push-pull-running-dedupe-verbose: binary check-skopeo $(BATS) $(REGCLIENT) $(ORAS) $(HELM) + $(BATS) --trace --verbose-run --print-output-on-failure --show-output-of-passing-tests test/blackbox/pushpull_running_dedupe.bats + +.PHONY: test-restore-s3-blobs +test-restore-s3-blobs: binary check-skopeo $(BATS) $(REGCLIENT) $(ORAS) $(HELM) + $(BATS) --trace --print-output-on-failure test/blackbox/restore_s3_blobs.bats + +.PHONY: test-restore-s3-blobs-verbose +test-restore-s3-blobs-verbose: binary check-skopeo $(BATS) $(REGCLIENT) $(ORAS) $(HELM) + $(BATS) --trace --verbose-run --print-output-on-failure --show-output-of-passing-tests test/blackbox/restore_s3_blobs.bats + .PHONY: test-bats-referrers test-bats-referrers: EXTENSIONS=search test-bats-referrers: binary check-skopeo $(BATS) $(ORAS) diff --git a/errors/errors.go b/errors/errors.go index d8ffcaa3..ef6cd843 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -81,4 +81,5 @@ var ( ErrMediaTypeNotSupported = errors.New("repodb: media type is not supported") ErrTimeout = errors.New("operation timeout") ErrNotImplemented = errors.New("not implemented") + ErrDedupeRebuild = errors.New("dedupe: couldn't rebuild dedupe index") ) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 92bc6057..405d8b2d 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -451,11 +451,16 @@ func getUseRelPaths(storageConfig *config.StorageConfig) bool { } func CreateCacheDatabaseDriver(storageConfig config.StorageConfig, log log.Logger) cache.Cache { - if storageConfig.Dedupe { + if storageConfig.Dedupe || storageConfig.StorageDriver != nil { if !storageConfig.RemoteCache { params := cache.BoltDBDriverParameters{} params.RootDir = storageConfig.RootDirectory params.Name = constants.BoltdbName + + if storageConfig.StorageDriver != nil { + params.Name = s3.CacheDBName + } + params.UseRelPaths = getUseRelPaths(&storageConfig) driver, _ := storage.Create("boltdb", params, log) @@ -548,6 +553,9 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { c.StoreController.DefaultStore.RunGCPeriodically(c.Config.Storage.GCInterval, taskScheduler) } + // Enable running dedupe blobs both ways (dedupe or restore deduped blobs) + c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // Enable extensions if extension config is provided for DefaultStore if c.Config != nil && c.Config.Extensions != nil { ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory) @@ -565,6 +573,12 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { if c.Config != nil && c.Config.Extensions != nil { ext.EnableMetricsExtension(c.Config, c.Log, storageConfig.RootDirectory) } + + // Enable running dedupe blobs both ways (dedupe or restore deduped blobs) for subpaths + substore := c.StoreController.SubStore[route] + if substore != nil { + substore.RunDedupeBlobs(time.Duration(0), taskScheduler) + } } } diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 2a321dae..618a5734 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -3345,10 +3345,10 @@ func TestCrossRepoMount(t *testing.T) { dir := t.TempDir() ctlr := makeController(conf, dir, "../../test/data") ctlr.Config.Storage.RemoteCache = false + ctlr.Config.Storage.Dedupe = false - cm := test.NewControllerManager(ctlr) + cm := test.NewControllerManager(ctlr) //nolint: varnamelen cm.StartAndWait(port) - defer cm.StopServer() params := make(map[string]string) @@ -3449,6 +3449,14 @@ func TestCrossRepoMount(t *testing.T) { // in cache, now try mount blob request status and it should be 201 because now blob is present in cache // and it should do hard link. + // restart server with dedupe enabled + cm.StopServer() + ctlr.Config.Storage.Dedupe = true + cm.StartAndWait(port) + + // wait for dedupe task to run + time.Sleep(15 * time.Second) + params["mount"] = string(manifestDigest) postResponse, err = client.R(). SetBasicAuth(username, passphrase).SetQueryParams(params). @@ -6479,7 +6487,7 @@ func TestPeriodicGC(t *testing.T) { ctlr := api.NewController(conf) dir := t.TempDir() ctlr.Config.Storage.RootDirectory = dir - + ctlr.Config.Storage.Dedupe = false ctlr.Config.Storage.GC = true ctlr.Config.Storage.GCInterval = 1 * time.Hour ctlr.Config.Storage.GCDelay = 1 * time.Second @@ -6518,8 +6526,8 @@ func TestPeriodicGC(t *testing.T) { subPaths := make(map[string]config.StorageConfig) - subPaths["/a"] = config.StorageConfig{RootDirectory: subDir, GC: true, GCDelay: 1 * time.Second, GCInterval: 24 * time.Hour, RemoteCache: false} //nolint:lll // gofumpt conflicts with lll - + subPaths["/a"] = config.StorageConfig{RootDirectory: subDir, GC: true, GCDelay: 1 * time.Second, GCInterval: 24 * time.Hour, RemoteCache: false, Dedupe: false} //nolint:lll // gofumpt conflicts with lll + ctlr.Config.Storage.Dedupe = false ctlr.Config.Storage.SubPaths = subPaths cm := test.NewControllerManager(ctlr) @@ -6552,8 +6560,8 @@ func TestPeriodicGC(t *testing.T) { ctlr := api.NewController(conf) dir := t.TempDir() ctlr.Config.Storage.RootDirectory = dir + ctlr.Config.Storage.Dedupe = false - ctlr.Config.Storage.RootDirectory = dir ctlr.Config.Storage.GC = true ctlr.Config.Storage.GCInterval = 1 * time.Hour ctlr.Config.Storage.GCDelay = 1 * time.Second diff --git a/pkg/common/common.go b/pkg/common/common.go index df1f809f..7ba87d6e 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -16,6 +16,8 @@ import ( "time" "unicode/utf8" + "github.com/opencontainers/go-digest" + "zotregistry.io/zot/pkg/log" ) @@ -233,3 +235,13 @@ func MarshalThroughStruct(obj interface{}, throughStruct interface{}) ([]byte, e return toJSON, nil } + +func DContains(slice []digest.Digest, item digest.Digest) bool { + for _, v := range slice { + if item == v { + return true + } + } + + return false +} diff --git a/pkg/extensions/extensions_test.go b/pkg/extensions/extensions_test.go index a838376e..28d909f5 100644 --- a/pkg/extensions/extensions_test.go +++ b/pkg/extensions/extensions_test.go @@ -91,7 +91,10 @@ func TestMetricsExtension(t *testing.T) { ctlrManager := test.NewControllerManager(ctlr) subPaths := make(map[string]config.StorageConfig) - subPaths["/a"] = config.StorageConfig{} + subPaths["/a"] = config.StorageConfig{ + Dedupe: false, + RootDirectory: t.TempDir(), + } ctlr.Config.Storage.RootDirectory = globalDir ctlr.Config.Storage.SubPaths = subPaths diff --git a/pkg/extensions/scrub/scrub_test.go b/pkg/extensions/scrub/scrub_test.go index a64230d7..e4713040 100644 --- a/pkg/extensions/scrub/scrub_test.go +++ b/pkg/extensions/scrub/scrub_test.go @@ -45,6 +45,8 @@ func TestScrubExtension(t *testing.T) { subdir := t.TempDir() conf.Storage.RootDirectory = dir + conf.Storage.Dedupe = false + substore := config.StorageConfig{RootDirectory: subdir} conf.Storage.SubPaths = map[string]config.StorageConfig{"/a": substore} conf.Log.Output = logFile.Name() @@ -86,6 +88,8 @@ func TestScrubExtension(t *testing.T) { dir := t.TempDir() conf.Storage.RootDirectory = dir + conf.Storage.Dedupe = false + conf.Log.Output = logFile.Name() trueValue := true scrubConfig := &extconf.ScrubConfig{ @@ -132,6 +136,8 @@ func TestScrubExtension(t *testing.T) { dir := t.TempDir() conf.Storage.RootDirectory = dir + conf.Storage.Dedupe = false + conf.Log.Output = logFile.Name() trueValue := true scrubConfig := &extconf.ScrubConfig{ diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 20c37d41..2a7580b8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -52,7 +52,11 @@ func (pq *generatorsPriorityQueue) Pop() any { return item } -const rateLimiterScheduler = 400 +const ( + rateLimiterScheduler = 400 + rateLimit = 5 * time.Second + numWorkers = 3 +) type Scheduler struct { tasksQLow chan Task @@ -63,6 +67,7 @@ type Scheduler struct { generatorsLock *sync.Mutex log log.Logger stopCh chan struct{} + RateLimit time.Duration } func NewScheduler(logC log.Logger) *Scheduler { @@ -82,14 +87,11 @@ func NewScheduler(logC log.Logger) *Scheduler { generatorsLock: new(sync.Mutex), log: log.Logger{Logger: sublogger}, stopCh: make(chan struct{}), + // default value + RateLimit: rateLimit, } } -const ( - rateLimit = 5 * time.Second - numWorkers = 3 -) - func (scheduler *Scheduler) poolWorker(numWorkers int, tasks chan Task) { for i := 0; i < numWorkers; i++ { go func(workerID int) { @@ -120,6 +122,8 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { close(tasksWorker) close(scheduler.stopCh) + scheduler.log.Debug().Msg("scheduler: received stop signal, exiting...") + return default: i := 0 diff --git a/pkg/storage/cache/boltdb.go b/pkg/storage/cache/boltdb.go index b9c68a86..ec6bc35d 100644 --- a/pkg/storage/cache/boltdb.go +++ b/pkg/storage/cache/boltdb.go @@ -22,8 +22,9 @@ type BoltDBDriver struct { } type BoltDBDriverParameters struct { - RootDir, Name string - UseRelPaths bool + RootDir string + Name string + UseRelPaths bool } func NewBoltDBCache(parameters interface{}, log zlog.Logger) Cache { @@ -206,10 +207,17 @@ func (d *BoltDBDriver) HasBlob(digest godigest.Digest, blob string) bool { return errors.ErrCacheMiss } - if origin.Get([]byte(blob)) == nil { + deduped := bucket.Bucket([]byte(constants.DuplicatesBucket)) + if deduped == nil { return errors.ErrCacheMiss } + if origin.Get([]byte(blob)) == nil { + if deduped.Get([]byte(blob)) == nil { + return errors.ErrCacheMiss + } + } + return nil }); err != nil { return false diff --git a/pkg/storage/common.go b/pkg/storage/common.go index 5953e95a..8f0174ae 100644 --- a/pkg/storage/common.go +++ b/pkg/storage/common.go @@ -16,6 +16,7 @@ import ( "github.com/sigstore/cosign/pkg/oci/remote" zerr "zotregistry.io/zot/errors" + "zotregistry.io/zot/pkg/scheduler" storageConstants "zotregistry.io/zot/pkg/storage/constants" ) @@ -789,3 +790,88 @@ func CheckIsImageSignature(repoName string, manifestBlob []byte, reference strin return false, "", "", nil } + +/* + DedupeTaskGenerator takes all blobs paths found in the imagestore and groups them by digest + +for each digest and based on the dedupe value it will dedupe or restore deduped blobs to the original state(undeduped)\ +by creating a task for each digest and pushing it to the task scheduler. +*/ +type DedupeTaskGenerator struct { + ImgStore ImageStore + // storage dedupe value + Dedupe bool + // store blobs paths grouped by digest + digest godigest.Digest + duplicateBlobs []string + /* store processed digest, used for iterating duplicateBlobs one by one + and generating a task for each unprocessed one*/ + lastDigests []godigest.Digest + done bool + Log zerolog.Logger +} + +func (gen *DedupeTaskGenerator) GenerateTask() (scheduler.Task, error) { + var err error + + // get all blobs from imageStore and group them by digest + gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.lastDigests) + if err != nil { + gen.Log.Error().Err(err).Msg("dedupe rebuild: failed to get next digest") + + return nil, err + } + + // if no digests left, then mark the task generator as done + if gen.digest == "" { + gen.Log.Info().Msg("dedupe rebuild: finished") + + gen.done = true + + return nil, nil + } + + // mark digest as processed before running its task + gen.lastDigests = append(gen.lastDigests, gen.digest) + + // generate rebuild dedupe task for this digest + return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil +} + +func (gen *DedupeTaskGenerator) IsDone() bool { + return gen.done +} + +func (gen *DedupeTaskGenerator) Reset() { + gen.lastDigests = []godigest.Digest{} + gen.duplicateBlobs = []string{} + gen.digest = "" + gen.done = false +} + +type dedupeTask struct { + imgStore ImageStore + // digest of duplicateBLobs + digest godigest.Digest + // blobs paths with the same digest ^ + duplicateBlobs []string + dedupe bool + log zerolog.Logger +} + +func newDedupeTask(imgStore ImageStore, digest godigest.Digest, dedupe bool, + duplicateBlobs []string, log zerolog.Logger, +) *dedupeTask { + return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log} +} + +func (dt *dedupeTask) DoWork() error { + // run task + err := dt.imgStore.RunDedupeForDigest(dt.digest, dt.dedupe, dt.duplicateBlobs) + if err != nil { + // log it + dt.log.Error().Err(err).Msgf("rebuild dedupe: failed to rebuild digest %s", dt.digest.String()) + } + + return err +} diff --git a/pkg/storage/local/local.go b/pkg/storage/local/local.go index 07fc5f6d..372c028c 100644 --- a/pkg/storage/local/local.go +++ b/pkg/storage/local/local.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "io/fs" "os" "path" "path/filepath" @@ -1776,3 +1777,166 @@ func newGCTask(imgStore *ImageStoreLocal, repo string) *gcTask { func (gcT *gcTask) DoWork() error { return gcT.imgStore.RunGCRepo(gcT.repo) } + +func (is *ImageStoreLocal) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest, +) (godigest.Digest, []string, error) { + var lockLatency time.Time + + dir := is.rootDir + + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + + var duplicateBlobs []string + + var digest godigest.Digest + + err := filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error { + if err != nil { + is.log.Warn().Err(err).Msg("unable to walk dir, skipping it") + // skip files/dirs which can't be walked + return filepath.SkipDir + } + + if info.IsDir() { + return nil + } + + blobDigest := godigest.NewDigestFromEncoded("sha256", info.Name()) + if err := blobDigest.Validate(); err != nil { + return nil //nolint:nilerr // ignore files which are not blobs + } + + if digest == "" && !common.DContains(lastDigests, blobDigest) { + digest = blobDigest + } + + if blobDigest == digest { + duplicateBlobs = append(duplicateBlobs, path) + } + + return nil + }) + + return digest, duplicateBlobs, err +} + +func (is *ImageStoreLocal) dedupeBlobs(digest godigest.Digest, duplicateBlobs []string) error { + if fmt.Sprintf("%v", is.cache) == fmt.Sprintf("%v", nil) { + is.log.Error().Err(zerr.ErrDedupeRebuild).Msg("no cache driver found, can not dedupe blobs") + + return zerr.ErrDedupeRebuild + } + + is.log.Info().Str("digest", digest.String()).Msgf("rebuild dedupe: deduping blobs for digest") + + var originalBlob string + + var originalBlobFi fs.FileInfo + + var err error + // rebuild from dedupe false to true + for _, blobPath := range duplicateBlobs { + /* for local storage, because we use hard links, we can assume that any blob can be original + so we skip the first one and hard link the rest of them with the first*/ + if originalBlob == "" { + originalBlob = blobPath + + originalBlobFi, err = os.Stat(originalBlob) + if err != nil { + is.log.Error().Err(err).Str("path", originalBlob).Msg("rebuild dedupe: failed to stat blob") + + return err + } + + // cache it + if ok := is.cache.HasBlob(digest, blobPath); !ok { + if err := is.cache.PutBlob(digest, blobPath); err != nil { + return err + } + } + + continue + } + + binfo, err := os.Stat(blobPath) + if err != nil { + is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: failed to stat blob") + + return err + } + + // dedupe blob + if !os.SameFile(originalBlobFi, binfo) { + // we should link to a temp file instead of removing blob and then linking + // to make this more atomic + uuid, err := guuid.NewV4() + if err != nil { + return err + } + + // put temp blob in /.uploads dir + tempLinkBlobDir := path.Join(strings.Replace(blobPath, path.Join("blobs/sha256", binfo.Name()), "", 1), + storageConstants.BlobUploadDir) + + if err := os.MkdirAll(tempLinkBlobDir, DefaultDirPerms); err != nil { + is.log.Error().Err(err).Str("dir", tempLinkBlobDir).Msg("rebuild dedupe: unable to mkdir") + + return err + } + + tempLinkBlobPath := path.Join(tempLinkBlobDir, uuid.String()) + + if err := os.Link(originalBlob, tempLinkBlobPath); err != nil { + is.log.Error().Err(err).Str("src", originalBlob). + Str("dst", tempLinkBlobPath).Msg("rebuild dedupe: unable to hard link") + + return err + } + + if err := os.Rename(tempLinkBlobPath, blobPath); err != nil { + is.log.Error().Err(err).Str("blobPath", blobPath).Msg("rebuild dedupe: unable to rename temp link") + + return err + } + + // cache it + if ok := is.cache.HasBlob(digest, blobPath); !ok { + if err := is.cache.PutBlob(digest, blobPath); err != nil { + return err + } + } + } + } + + is.log.Info().Str("digest", digest.String()).Msgf("rebuild dedupe: deduping blobs for digest finished successfully") + + return nil +} + +func (is *ImageStoreLocal) RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error { + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) + + if dedupe { + return is.dedupeBlobs(digest, duplicateBlobs) + } + + // otherwise noop + return nil +} + +func (is *ImageStoreLocal) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) { + // for local storage no need to undedupe blobs + if is.dedupe { + generator := &storage.DedupeTaskGenerator{ + ImgStore: is, + Dedupe: is.dedupe, + Log: is.log, + } + + sch.SubmitGenerator(generator, interval, scheduler.HighPriority) + } +} diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index c30c6f9a..3c17200c 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -2,6 +2,7 @@ package local_test import ( "bytes" + "context" "crypto/rand" _ "crypto/sha256" "encoding/json" @@ -28,11 +29,13 @@ import ( "zotregistry.io/zot/pkg/common" "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/scheduler" "zotregistry.io/zot/pkg/storage" "zotregistry.io/zot/pkg/storage/cache" storageConstants "zotregistry.io/zot/pkg/storage/constants" "zotregistry.io/zot/pkg/storage/local" "zotregistry.io/zot/pkg/test" + "zotregistry.io/zot/pkg/test/mocks" ) const ( @@ -40,6 +43,18 @@ const ( repoName = "test" ) +var errCache = errors.New("new cache error") + +func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { + taskScheduler := scheduler.NewScheduler(log.Logger{}) + taskScheduler.RateLimit = 50 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + taskScheduler.RunScheduler(ctx) + + return taskScheduler, cancel +} + func TestStorageFSAPIs(t *testing.T) { dir := t.TempDir() @@ -1066,155 +1081,98 @@ func FuzzRunGCRepo(f *testing.F) { } func TestDedupeLinks(t *testing.T) { - dir := t.TempDir() + testCases := []struct { + dedupe bool + expected bool + }{ + { + dedupe: true, + expected: true, + }, + { + dedupe: false, + expected: false, + }, + } log := log.Logger{Logger: zerolog.New(os.Stdout)} metrics := monitoring.NewMetricsServer(false, log) - cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{ - RootDir: dir, - Name: "cache", - UseRelPaths: true, - }, log) - imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, - true, true, log, metrics, nil, cacheDriver) - Convey("Dedupe", t, func(c C) { - // manifest1 - upload, err := imgStore.NewBlobUpload("dedupe1") - So(err, ShouldBeNil) - So(upload, ShouldNotBeEmpty) + for _, testCase := range testCases { + dir := t.TempDir() - content := []byte("test-data3") - buf := bytes.NewBuffer(content) - buflen := buf.Len() - digest := godigest.FromBytes(content) - blob, err := imgStore.PutBlobChunkStreamed("dedupe1", upload, buf) - So(err, ShouldBeNil) - So(blob, ShouldEqual, buflen) - blobDigest1 := strings.Split(digest.String(), ":")[1] - So(blobDigest1, ShouldNotBeEmpty) + cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{ + RootDir: dir, + Name: "cache", + UseRelPaths: true, + }, log) - err = imgStore.FinishBlobUpload("dedupe1", upload, buf, digest) - So(err, ShouldBeNil) - So(blob, ShouldEqual, buflen) + imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, + testCase.dedupe, true, log, metrics, nil, cacheDriver) - _, _, err = imgStore.CheckBlob("dedupe1", digest) - So(err, ShouldBeNil) + Convey(fmt.Sprintf("Dedupe %t", testCase.dedupe), t, func(c C) { + // manifest1 + upload, err := imgStore.NewBlobUpload("dedupe1") + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) - blobrc, _, err := imgStore.GetBlob("dedupe1", digest, "application/vnd.oci.image.layer.v1.tar+gzip") - So(err, ShouldBeNil) - err = blobrc.Close() - So(err, ShouldBeNil) + content := []byte("test-data3") + buf := bytes.NewBuffer(content) + buflen := buf.Len() + digest := godigest.FromBytes(content) + blob, err := imgStore.PutBlobChunkStreamed("dedupe1", upload, buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + blobDigest1 := strings.Split(digest.String(), ":")[1] + So(blobDigest1, ShouldNotBeEmpty) - cblob, cdigest := test.GetRandomImageConfig() - _, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest) - So(err, ShouldBeNil) - So(clen, ShouldEqual, len(cblob)) - hasBlob, _, err := imgStore.CheckBlob("dedupe1", cdigest) - So(err, ShouldBeNil) - So(hasBlob, ShouldEqual, true) + err = imgStore.FinishBlobUpload("dedupe1", upload, buf, digest) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) - manifest := ispec.Manifest{ - Config: ispec.Descriptor{ - MediaType: "application/vnd.oci.image.config.v1+json", - Digest: cdigest, - Size: int64(len(cblob)), - }, - Layers: []ispec.Descriptor{ - { - MediaType: "application/vnd.oci.image.layer.v1.tar", - Digest: digest, - Size: int64(buflen), - }, - }, - } - manifest.SchemaVersion = 2 - manifestBuf, err := json.Marshal(manifest) - So(err, ShouldBeNil) - digest = godigest.FromBytes(manifestBuf) - _, err = imgStore.PutImageManifest("dedupe1", digest.String(), - ispec.MediaTypeImageManifest, manifestBuf) - So(err, ShouldBeNil) - - _, _, _, err = imgStore.GetImageManifest("dedupe1", digest.String()) - So(err, ShouldBeNil) - - // manifest2 - upload, err = imgStore.NewBlobUpload("dedupe2") - So(err, ShouldBeNil) - So(upload, ShouldNotBeEmpty) - - content = []byte("test-data3") - buf = bytes.NewBuffer(content) - buflen = buf.Len() - digest = godigest.FromBytes(content) - blob, err = imgStore.PutBlobChunkStreamed("dedupe2", upload, buf) - So(err, ShouldBeNil) - So(blob, ShouldEqual, buflen) - blobDigest2 := strings.Split(digest.String(), ":")[1] - So(blobDigest2, ShouldNotBeEmpty) - - err = imgStore.FinishBlobUpload("dedupe2", upload, buf, digest) - So(err, ShouldBeNil) - So(blob, ShouldEqual, buflen) - - _, _, err = imgStore.CheckBlob("dedupe2", digest) - So(err, ShouldBeNil) - - blobrc, _, err = imgStore.GetBlob("dedupe2", digest, "application/vnd.oci.image.layer.v1.tar+gzip") - So(err, ShouldBeNil) - err = blobrc.Close() - So(err, ShouldBeNil) - - cblob, cdigest = test.GetRandomImageConfig() - _, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest) - So(err, ShouldBeNil) - So(clen, ShouldEqual, len(cblob)) - hasBlob, _, err = imgStore.CheckBlob("dedupe2", cdigest) - So(err, ShouldBeNil) - So(hasBlob, ShouldEqual, true) - - manifest = ispec.Manifest{ - Config: ispec.Descriptor{ - MediaType: "application/vnd.oci.image.config.v1+json", - Digest: cdigest, - Size: int64(len(cblob)), - }, - Layers: []ispec.Descriptor{ - { - MediaType: "application/vnd.oci.image.layer.v1.tar", - Digest: digest, - Size: int64(buflen), - }, - }, - } - manifest.SchemaVersion = 2 - manifestBuf, err = json.Marshal(manifest) - So(err, ShouldBeNil) - digest = godigest.FromBytes(manifestBuf) - _, err = imgStore.PutImageManifest("dedupe2", "1.0", ispec.MediaTypeImageManifest, manifestBuf) - So(err, ShouldBeNil) - - _, _, _, err = imgStore.GetImageManifest("dedupe2", digest.String()) - So(err, ShouldBeNil) - - // verify that dedupe with hard links happened - fi1, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest1)) - So(err, ShouldBeNil) - fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) - So(err, ShouldBeNil) - So(os.SameFile(fi1, fi2), ShouldBeTrue) - - Convey("storage and cache inconsistency", func() { - // delete blobs - err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) + _, _, err = imgStore.CheckBlob("dedupe1", digest) So(err, ShouldBeNil) - err := os.Remove(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) + blobrc, _, err := imgStore.GetBlob("dedupe1", digest, "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + err = blobrc.Close() So(err, ShouldBeNil) - // now cache is inconsistent with storage (blobs present in cache but not in storage) - upload, err = imgStore.NewBlobUpload("dedupe3") + cblob, cdigest := test.GetRandomImageConfig() + _, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest) + So(err, ShouldBeNil) + So(clen, ShouldEqual, len(cblob)) + hasBlob, _, err := imgStore.CheckBlob("dedupe1", cdigest) + So(err, ShouldBeNil) + So(hasBlob, ShouldEqual, true) + + manifest := ispec.Manifest{ + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(buflen), + }, + }, + } + manifest.SchemaVersion = 2 + manifestBuf, err := json.Marshal(manifest) + So(err, ShouldBeNil) + digest = godigest.FromBytes(manifestBuf) + _, err = imgStore.PutImageManifest("dedupe1", digest.String(), + ispec.MediaTypeImageManifest, manifestBuf) + So(err, ShouldBeNil) + + _, _, _, err = imgStore.GetImageManifest("dedupe1", digest.String()) + So(err, ShouldBeNil) + + // manifest2 + upload, err = imgStore.NewBlobUpload("dedupe2") So(err, ShouldBeNil) So(upload, ShouldNotBeEmpty) @@ -1222,17 +1180,213 @@ func TestDedupeLinks(t *testing.T) { buf = bytes.NewBuffer(content) buflen = buf.Len() digest = godigest.FromBytes(content) - blob, err = imgStore.PutBlobChunkStreamed("dedupe3", upload, buf) + blob, err = imgStore.PutBlobChunkStreamed("dedupe2", upload, buf) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) blobDigest2 := strings.Split(digest.String(), ":")[1] So(blobDigest2, ShouldNotBeEmpty) - err = imgStore.FinishBlobUpload("dedupe3", upload, buf, digest) + err = imgStore.FinishBlobUpload("dedupe2", upload, buf, digest) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) + + _, _, err = imgStore.CheckBlob("dedupe2", digest) + So(err, ShouldBeNil) + + blobrc, _, err = imgStore.GetBlob("dedupe2", digest, "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + err = blobrc.Close() + So(err, ShouldBeNil) + + cblob, cdigest = test.GetRandomImageConfig() + _, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest) + So(err, ShouldBeNil) + So(clen, ShouldEqual, len(cblob)) + hasBlob, _, err = imgStore.CheckBlob("dedupe2", cdigest) + So(err, ShouldBeNil) + So(hasBlob, ShouldEqual, true) + + manifest = ispec.Manifest{ + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(buflen), + }, + }, + } + manifest.SchemaVersion = 2 + manifestBuf, err = json.Marshal(manifest) + So(err, ShouldBeNil) + digest = godigest.FromBytes(manifestBuf) + _, err = imgStore.PutImageManifest("dedupe2", "1.0", ispec.MediaTypeImageManifest, manifestBuf) + So(err, ShouldBeNil) + + _, _, _, err = imgStore.GetImageManifest("dedupe2", digest.String()) + So(err, ShouldBeNil) + + // verify that dedupe with hard links happened + fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + So(os.SameFile(fi1, fi2), ShouldEqual, testCase.expected) + + if !testCase.dedupe { + Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { + for i := 0; i < 10; i++ { + taskScheduler, cancel := runAndGetScheduler() + // rebuild with dedupe true + imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, + true, true, log, metrics, nil, cacheDriver) + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + sleepValue := i * 50 + time.Sleep(time.Duration(sleepValue) * time.Millisecond) + + cancel() + } + + taskScheduler, cancel := runAndGetScheduler() + + // rebuild with dedupe true + imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, + true, true, log, metrics, nil, cacheDriver) + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(10 * time.Second) + + cancel() + + fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + So(os.SameFile(fi1, fi2), ShouldEqual, true) + }) + + Convey("rebuild dedupe index error cache nil", func() { + // switch dedupe to true from false + taskScheduler, cancel := runAndGetScheduler() + + imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, + true, true, log, metrics, nil, nil) + + // rebuild with dedupe true + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // wait until rebuild finishes + + time.Sleep(3 * time.Second) + + cancel() + + fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + + So(os.SameFile(fi1, fi2), ShouldEqual, false) + }) + + Convey("rebuild dedupe index cache error on original blob", func() { + // switch dedupe to true from false + taskScheduler, cancel := runAndGetScheduler() + + imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, + true, true, log, metrics, nil, &mocks.CacheMock{ + HasBlobFn: func(digest godigest.Digest, path string) bool { + return false + }, + PutBlobFn: func(digest godigest.Digest, path string) error { + return errCache + }, + }) + // rebuild with dedupe true, should have samefile blobs + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // wait until rebuild finishes + + time.Sleep(10 * time.Second) + + cancel() + + fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + + So(os.SameFile(fi1, fi2), ShouldEqual, false) + }) + + Convey("rebuild dedupe index cache error on duplicate blob", func() { + // switch dedupe to true from false + taskScheduler, cancel := runAndGetScheduler() + + imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, + true, true, log, metrics, nil, &mocks.CacheMock{ + HasBlobFn: func(digest godigest.Digest, path string) bool { + return false + }, + PutBlobFn: func(digest godigest.Digest, path string) error { + if strings.Contains(path, "dedupe2") { + return errCache + } + + return nil + }, + }) + // rebuild with dedupe true, should have samefile blobs + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // wait until rebuild finishes + + time.Sleep(15 * time.Second) + + cancel() + + fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + + // deduped happened, but didn't cached + So(os.SameFile(fi1, fi2), ShouldEqual, true) + }) + } + + Convey("storage and cache inconsistency", func() { + // delete blobs + err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + + err := os.Remove(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + + // now cache is inconsistent with storage (blobs present in cache but not in storage) + upload, err = imgStore.NewBlobUpload("dedupe3") + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) + + content = []byte("test-data3") + buf = bytes.NewBuffer(content) + buflen = buf.Len() + digest = godigest.FromBytes(content) + blob, err = imgStore.PutBlobChunkStreamed("dedupe3", upload, buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + blobDigest2 := strings.Split(digest.String(), ":")[1] + So(blobDigest2, ShouldNotBeEmpty) + + err = imgStore.FinishBlobUpload("dedupe3", upload, buf, digest) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + }) }) - }) + } } func TestDedupe(t *testing.T) { diff --git a/pkg/storage/s3/s3.go b/pkg/storage/s3/s3.go index a449dcc9..c26adbaa 100644 --- a/pkg/storage/s3/s3.go +++ b/pkg/storage/s3/s3.go @@ -24,6 +24,7 @@ import ( "github.com/rs/zerolog" zerr "zotregistry.io/zot/errors" + "zotregistry.io/zot/pkg/common" "zotregistry.io/zot/pkg/extensions/monitoring" zlog "zotregistry.io/zot/pkg/log" zreg "zotregistry.io/zot/pkg/regexp" @@ -1191,7 +1192,7 @@ func (is *ObjectStorage) GetBlob(repo string, digest godigest.Digest, mediaType } // is a 'deduped' blob? - if binfo.Size() == 0 && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { + if binfo.Size() == 0 { // Check blobs in cache dstRecord, err := is.checkCacheBlob(digest) if err != nil { @@ -1244,7 +1245,7 @@ func (is *ObjectStorage) GetBlobContent(repo string, digest godigest.Digest) ([] } // is a 'deduped' blob? - if binfo.Size() == 0 && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { + if binfo.Size() == 0 { // Check blobs in cache dstRecord, err := is.checkCacheBlob(digest) if err != nil { @@ -1395,3 +1396,230 @@ func writeFile(store driver.StorageDriver, filepath string, buf []byte) (int, er return n, nil } + +func (is *ObjectStorage) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error) { + var lockLatency time.Time + + dir := is.rootDir + + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + + var duplicateBlobs []string + + var digest godigest.Digest + + err := is.store.Walk(context.Background(), dir, func(fileInfo driver.FileInfo) error { + if fileInfo.IsDir() { + return nil + } + + blobDigest := godigest.NewDigestFromEncoded("sha256", path.Base(fileInfo.Path())) + if err := blobDigest.Validate(); err != nil { + return nil //nolint:nilerr // ignore files which are not blobs + } + + if digest == "" && !common.DContains(lastDigests, blobDigest) { + digest = blobDigest + } + + if blobDigest == digest { + duplicateBlobs = append(duplicateBlobs, fileInfo.Path()) + } + + return nil + }) + + // if the root directory is not yet created + var perr driver.PathNotFoundError + + if errors.As(err, &perr) { + return digest, duplicateBlobs, nil + } + + return digest, duplicateBlobs, err +} + +func (is *ObjectStorage) getOriginalBlobFromDisk(duplicateBlobs []string) (string, error) { + for _, blobPath := range duplicateBlobs { + binfo, err := is.store.Stat(context.Background(), blobPath) + if err != nil { + is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: failed to stat blob") + + return "", zerr.ErrBlobNotFound + } + + if binfo.Size() > 0 { + return blobPath, nil + } + } + + return "", zerr.ErrBlobNotFound +} + +func (is *ObjectStorage) getOriginalBlob(digest godigest.Digest, duplicateBlobs []string) (string, error) { + originalBlob := "" + + var err error + + originalBlob, err = is.checkCacheBlob(digest) + if err != nil && !errors.Is(err, zerr.ErrBlobNotFound) && !errors.Is(err, zerr.ErrCacheMiss) { + is.log.Error().Err(err).Msg("rebuild dedupe: unable to find blob in cache") + + return originalBlob, err + } + + // if we still don't have, search it + if originalBlob == "" { + is.log.Warn().Msg("rebuild dedupe: failed to find blob in cache, searching it in s3...") + // a rebuild dedupe was attempted in the past + // get original blob, should be found otherwise exit with error + + originalBlob, err = is.getOriginalBlobFromDisk(duplicateBlobs) + if err != nil { + return originalBlob, err + } + } + + is.log.Info().Msgf("rebuild dedupe: found original blob %s", originalBlob) + + return originalBlob, nil +} + +func (is *ObjectStorage) dedupeBlobs(digest godigest.Digest, duplicateBlobs []string) error { + if fmt.Sprintf("%v", is.cache) == fmt.Sprintf("%v", nil) { + is.log.Error().Err(zerr.ErrDedupeRebuild).Msg("no cache driver found, can not dedupe blobs") + + return zerr.ErrDedupeRebuild + } + + is.log.Info().Str("digest", digest.String()).Msgf("rebuild dedupe: deduping blobs for digest") + + var originalBlob string + + // rebuild from dedupe false to true + for _, blobPath := range duplicateBlobs { + binfo, err := is.store.Stat(context.Background(), blobPath) + if err != nil { + is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: failed to stat blob") + + return err + } + + if binfo.Size() == 0 { + is.log.Warn().Msg("rebuild dedupe: found file without content, trying to find the original blob") + // a rebuild dedupe was attempted in the past + // get original blob, should be found otherwise exit with error + if originalBlob == "" { + originalBlob, err = is.getOriginalBlob(digest, duplicateBlobs) + if err != nil { + is.log.Error().Err(err).Msg("rebuild dedupe: unable to find original blob") + + return zerr.ErrDedupeRebuild + } + + // cache original blob + if ok := is.cache.HasBlob(digest, originalBlob); !ok { + if err := is.cache.PutBlob(digest, originalBlob); err != nil { + return err + } + } + } + + // cache dedupe blob + if ok := is.cache.HasBlob(digest, blobPath); !ok { + if err := is.cache.PutBlob(digest, blobPath); err != nil { + return err + } + } + } else { + // cache it + if ok := is.cache.HasBlob(digest, blobPath); !ok { + if err := is.cache.PutBlob(digest, blobPath); err != nil { + return err + } + } + + // if we have an original blob cached then we can safely dedupe the rest of them + if originalBlob != "" { + if err := is.store.PutContent(context.Background(), blobPath, []byte{}); err != nil { + is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: unable to dedupe blob") + + return err + } + } + + // mark blob as preserved + originalBlob = blobPath + } + } + + is.log.Info().Str("digest", digest.String()).Msgf("rebuild dedupe: deduping blobs for digest finished successfully") + + return nil +} + +func (is *ObjectStorage) restoreDedupedBlobs(digest godigest.Digest, duplicateBlobs []string) error { + is.log.Info().Str("digest", digest.String()).Msgf("rebuild dedupe: restoring deduped blobs for digest") + + // first we need to find the original blob, either in cache or by checking each blob size + originalBlob, err := is.getOriginalBlob(digest, duplicateBlobs) + if err != nil { + is.log.Error().Err(err).Msg("rebuild dedupe: unable to find original blob") + + return zerr.ErrDedupeRebuild + } + + for _, blobPath := range duplicateBlobs { + binfo, err := is.store.Stat(context.Background(), blobPath) + if err != nil { + is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: failed to stat blob") + + return err + } + + // if we find a deduped blob, then copy original blob content to deduped one + if binfo.Size() == 0 { + // move content from original blob to deduped one + buf, err := is.store.GetContent(context.Background(), originalBlob) + if err != nil { + is.log.Error().Err(err).Str("path", originalBlob).Msg("rebuild dedupe: failed to get original blob content") + + return err + } + + _, err = writeFile(is.store, blobPath, buf) + if err != nil { + return err + } + } + } + + is.log.Info().Str("digest", digest.String()). + Msgf("rebuild dedupe: restoring deduped blobs for digest finished successfully") + + return nil +} + +func (is *ObjectStorage) RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error { + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) + + if dedupe { + return is.dedupeBlobs(digest, duplicateBlobs) + } + + return is.restoreDedupedBlobs(digest, duplicateBlobs) +} + +func (is *ObjectStorage) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) { + generator := &storage.DedupeTaskGenerator{ + ImgStore: is, + Dedupe: is.dedupe, + Log: is.log, + } + + sch.SubmitGenerator(generator, interval, scheduler.HighPriority) +} diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index e399e66e..e1512703 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -30,11 +30,13 @@ import ( "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/scheduler" "zotregistry.io/zot/pkg/storage" "zotregistry.io/zot/pkg/storage/cache" storageConstants "zotregistry.io/zot/pkg/storage/constants" "zotregistry.io/zot/pkg/storage/s3" "zotregistry.io/zot/pkg/test" + "zotregistry.io/zot/pkg/test/mocks" ) //nolint:gochecknoglobals @@ -44,6 +46,7 @@ var ( fileInfoSize = 10 errorText = "new s3 error" errS3 = errors.New(errorText) + errCache = errors.New("new cache error") zotStorageTest = "zot-storage-test" s3Region = "us-east-2" ) @@ -85,11 +88,20 @@ func createMockStorage(rootDir string, cacheDir string, dedupe bool, store drive return il } -func createObjectsStore(rootDir string, cacheDir string, dedupe bool) ( - driver.StorageDriver, - storage.ImageStore, - error, -) { +func createMockStorageWithMockCache(rootDir string, dedupe bool, store driver.StorageDriver, + cacheDriver cache.Cache, +) storage.ImageStore { + log := log.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + + il := s3.NewImageStore(rootDir, "", false, storage.DefaultGCDelay, + dedupe, false, log, metrics, nil, store, cacheDriver, + ) + + return il +} + +func createStoreDriver(rootDir string) driver.StorageDriver { bucket := zotStorageTest endpoint := os.Getenv("S3MOCK_ENDPOINT") storageDriverParams := map[string]interface{}{ @@ -117,19 +129,33 @@ func createObjectsStore(rootDir string, cacheDir string, dedupe bool) ( panic(err) } + return store +} + +func createObjectsStore(rootDir string, cacheDir string, dedupe bool) ( + driver.StorageDriver, + storage.ImageStore, + error, +) { + store := createStoreDriver(rootDir) + log := log.Logger{Logger: zerolog.New(os.Stdout)} metrics := monitoring.NewMetricsServer(false, log) var cacheDriver cache.Cache + var err error + // from pkg/cli/root.go/applyDefaultValues, s3 magic - if _, err := os.Stat(path.Join(cacheDir, "s3_cache.db")); dedupe || (!dedupe && err == nil) { + s3CacheDBPath := path.Join(cacheDir, s3.CacheDBName+storageConstants.DBExtensionName) + if _, err = os.Stat(s3CacheDBPath); dedupe || (!dedupe && err == nil) { cacheDriver, _ = storage.Create("boltdb", cache.BoltDBDriverParameters{ RootDir: cacheDir, Name: "s3_cache", UseRelPaths: false, }, log) } + il := s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay, dedupe, false, log, metrics, nil, store, cacheDriver) @@ -141,32 +167,7 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl storage.ImageStore, error, ) { - bucket := zotStorageTest - endpoint := os.Getenv("S3MOCK_ENDPOINT") - storageDriverParams := map[string]interface{}{ - "rootDir": rootDir, - "name": "s3", - "region": s3Region, - "bucket": bucket, - "regionendpoint": endpoint, - "accesskey": "minioadmin", - "secretkey": "minioadmin", - "secure": false, - "skipverify": false, - } - - storeName := fmt.Sprintf("%v", storageDriverParams["name"]) - - store, err := factory.Create(storeName, storageDriverParams) - if err != nil { - panic(err) - } - - // create bucket if it doesn't exists - _, err = resty.R().Put("http://" + endpoint + "/" + bucket) - if err != nil { - panic(err) - } + store := createStoreDriver(rootDir) log := log.Logger{Logger: zerolog.New(os.Stdout)} metrics := monitoring.NewMetricsServer(false, log) @@ -174,6 +175,7 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl var cacheDriver cache.Cache // from pkg/cli/root.go/applyDefaultValues, s3 magic + tableName = strings.ReplaceAll(tableName, "/", "") cacheDriver, _ = storage.Create("dynamodb", cache.DynamoDBDriverParameters{ Endpoint: os.Getenv("DYNAMODBMOCK_ENDPOINT"), @@ -181,11 +183,10 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl TableName: tableName, }, log) - tableName = strings.ReplaceAll(tableName, "/", "") //nolint:errcheck cacheDriverDynamo, _ := cacheDriver.(*cache.DynamoDBDriver) - err = cacheDriverDynamo.NewTable(tableName) + err := cacheDriverDynamo.NewTable(tableName) if err != nil { panic(err) } @@ -196,12 +197,27 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl return store, il, err } +func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { + taskScheduler := scheduler.NewScheduler(log.Logger{}) + taskScheduler.RateLimit = 50 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + taskScheduler.RunScheduler(ctx) + + return taskScheduler, cancel +} + type FileInfoMock struct { IsDirFn func() bool SizeFn func() int64 + PathFn func() string } func (f *FileInfoMock) Path() string { + if f != nil && f.PathFn != nil { + return f.PathFn() + } + return "" } @@ -1558,6 +1574,59 @@ func TestS3Dedupe(t *testing.T) { // the new blob with dedupe false should be equal with the origin blob from dedupe1 So(fi1.Size(), ShouldEqual, fi3.Size()) + + Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl + taskScheduler, cancel := runAndGetScheduler() + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) + defer cleanupStorage(storeDriver, testDir) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // wait until rebuild finishes + + time.Sleep(10 * time.Second) + + cancel() + + fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", + blobDigest1.Encoded())) + So(fi1.Size(), ShouldBeGreaterThan, 0) + So(err, ShouldBeNil) + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + blobDigest2.Encoded())) + So(err, ShouldBeNil) + So(fi2.Size(), ShouldEqual, fi1.Size()) + + blobContent, err := imgStore.GetBlobContent("dedupe2", blobDigest2) + So(err, ShouldBeNil) + So(len(blobContent), ShouldEqual, fi1.Size()) + + Convey("rebuild s3 dedupe index from false to true", func() { + taskScheduler, cancel := runAndGetScheduler() + + defer cancel() + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) + defer cleanupStorage(storeDriver, testDir) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // wait until rebuild finishes + + time.Sleep(10 * time.Second) + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + blobDigest2.Encoded())) + So(err, ShouldBeNil) + So(fi2.Size(), ShouldEqual, 0) + + blobContent, err := imgStore.GetBlobContent("dedupe2", blobDigest2) + So(err, ShouldBeNil) + So(len(blobContent), ShouldBeGreaterThan, 0) + }) + }) }) }) @@ -1718,6 +1787,59 @@ func TestS3Dedupe(t *testing.T) { // deduped blob should be of size 0 So(fi2.Size(), ShouldEqual, 0) + Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl + taskScheduler, cancel := runAndGetScheduler() + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) + defer cleanupStorage(storeDriver, testDir) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // wait until rebuild finishes + + time.Sleep(10 * time.Second) + + cancel() + + fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", + blobDigest1.Encoded())) + So(fi1.Size(), ShouldBeGreaterThan, 0) + So(err, ShouldBeNil) + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + blobDigest2.Encoded())) + So(err, ShouldBeNil) + So(fi2.Size(), ShouldEqual, fi1.Size()) + + blobContent, err := imgStore.GetBlobContent("dedupe2", blobDigest2) + So(err, ShouldBeNil) + So(len(blobContent), ShouldEqual, fi1.Size()) + + Convey("rebuild s3 dedupe index from false to true", func() { + taskScheduler, cancel := runAndGetScheduler() + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) + defer cleanupStorage(storeDriver, testDir) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // wait until rebuild finishes + + time.Sleep(10 * time.Second) + + cancel() + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + blobDigest2.Encoded())) + So(err, ShouldBeNil) + So(fi2.Size(), ShouldEqual, 0) + + blobContent, err := imgStore.GetBlobContent("dedupe2", blobDigest2) + So(err, ShouldBeNil) + So(len(blobContent), ShouldBeGreaterThan, 0) + }) + }) + Convey("Check that delete blobs moves the real content to the next contenders", func() { // if we delete blob1, the content should be moved to blob2 err = imgStore.DeleteBlob("dedupe1", blobDigest1) @@ -1745,6 +1867,900 @@ func TestS3Dedupe(t *testing.T) { }) } +func TestRebuildDedupeIndex(t *testing.T) { + skipIt(t) + + Convey("Push images with dedupe true", t, func() { + uuid, err := guuid.NewV4() + if err != nil { + panic(err) + } + + testDir := path.Join("/oci-repo-test", uuid.String()) + + tdir := t.TempDir() + + storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) + defer cleanupStorage(storeDriver, testDir) + + // push image1 + content := []byte("test-data3") + buf := bytes.NewBuffer(content) + buflen := buf.Len() + digest := godigest.FromBytes(content) + + blobDigest1 := digest + + _, blen, err := imgStore.FullBlobUpload("dedupe1", buf, digest) + So(err, ShouldBeNil) + So(blen, ShouldEqual, buflen) + + hasBlob, blen1, err := imgStore.CheckBlob("dedupe1", digest) + So(blen1, ShouldEqual, buflen) + So(hasBlob, ShouldEqual, true) + So(err, ShouldBeNil) + + cblob, cdigest := test.GetRandomImageConfig() + _, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest) + So(err, ShouldBeNil) + So(clen, ShouldEqual, len(cblob)) + + hasBlob, clen, err = imgStore.CheckBlob("dedupe1", cdigest) + So(err, ShouldBeNil) + So(hasBlob, ShouldEqual, true) + So(clen, ShouldEqual, len(cblob)) + + manifest := ispec.Manifest{ + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(buflen), + }, + }, + } + + manifest.SchemaVersion = 2 + manifestBuf, err := json.Marshal(manifest) + So(err, ShouldBeNil) + digest = godigest.FromBytes(manifestBuf) + _, err = imgStore.PutImageManifest("dedupe1", digest.String(), + ispec.MediaTypeImageManifest, manifestBuf) + So(err, ShouldBeNil) + + _, _, _, err = imgStore.GetImageManifest("dedupe1", digest.String()) + So(err, ShouldBeNil) + + content = []byte("test-data3") + buf = bytes.NewBuffer(content) + buflen = buf.Len() + digest = godigest.FromBytes(content) + + blobDigest2 := digest + + _, blen, err = imgStore.FullBlobUpload("dedupe2", buf, digest) + So(err, ShouldBeNil) + So(blen, ShouldEqual, buflen) + + hasBlob, blen1, err = imgStore.CheckBlob("dedupe2", digest) + So(blen1, ShouldEqual, buflen) + So(hasBlob, ShouldEqual, true) + So(err, ShouldBeNil) + + _, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest) + So(err, ShouldBeNil) + So(clen, ShouldEqual, len(cblob)) + + hasBlob, clen, err = imgStore.CheckBlob("dedupe2", cdigest) + So(err, ShouldBeNil) + So(hasBlob, ShouldEqual, true) + So(clen, ShouldEqual, len(cblob)) + + digest = godigest.FromBytes(manifestBuf) + _, err = imgStore.PutImageManifest("dedupe2", digest.String(), + ispec.MediaTypeImageManifest, manifestBuf) + So(err, ShouldBeNil) + + _, _, _, err = imgStore.GetImageManifest("dedupe2", digest.String()) + So(err, ShouldBeNil) + + configFi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", + cdigest.Encoded())) + So(err, ShouldBeNil) + + configFi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + cdigest.Encoded())) + So(err, ShouldBeNil) + + fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", + blobDigest1.Encoded())) + So(err, ShouldBeNil) + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + blobDigest2.Encoded())) + So(err, ShouldBeNil) + + // original blob should have the real content of blob + So(fi1.Size(), ShouldNotEqual, fi2.Size()) + So(fi1.Size(), ShouldBeGreaterThan, 0) + // deduped blob should be of size 0 + So(fi2.Size(), ShouldEqual, 0) + + So(configFi1.Size(), ShouldNotEqual, configFi2.Size()) + So(configFi1.Size(), ShouldBeGreaterThan, 0) + // deduped blob should be of size 0 + So(configFi2.Size(), ShouldEqual, 0) + + Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { + for i := 0; i < 10; i++ { + taskScheduler := scheduler.NewScheduler(log.Logger{}) + taskScheduler.RateLimit = 1 * time.Millisecond + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + + taskScheduler.RunScheduler(ctx) + + storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), false) + defer cleanupStorage(storeDriver, testDir) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + sleepValue := i * 100 + time.Sleep(time.Duration(sleepValue) * time.Millisecond) + + cancel() + } + + taskScheduler, cancel := runAndGetScheduler() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(5 * time.Second) + + cancel() + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + blobDigest2.Encoded())) + So(err, ShouldBeNil) + So(fi2.Size(), ShouldEqual, fi1.Size()) + + configFi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + cdigest.Encoded())) + So(err, ShouldBeNil) + So(configFi2.Size(), ShouldEqual, configFi1.Size()) + + // now from dedupe false to true + for i := 0; i < 10; i++ { + taskScheduler := scheduler.NewScheduler(log.Logger{}) + taskScheduler.RateLimit = 1 * time.Millisecond + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + taskScheduler.RunScheduler(ctx) + + storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), true) + defer cleanupStorage(storeDriver, testDir) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + sleepValue := i * 100 + time.Sleep(time.Duration(sleepValue) * time.Millisecond) + + cancel() + } + + taskScheduler, cancel = runAndGetScheduler() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(5 * time.Second) + + cancel() + + fi2, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + blobDigest2.Encoded())) + So(err, ShouldBeNil) + So(fi2.Size(), ShouldNotEqual, fi1.Size()) + So(fi2.Size(), ShouldEqual, 0) + + configFi2, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + cdigest.Encoded())) + So(err, ShouldBeNil) + So(configFi2.Size(), ShouldNotEqual, configFi1.Size()) + So(configFi2.Size(), ShouldEqual, 0) + }) + + Convey("Trigger ErrDedupeRebuild because cache is nil", func() { + storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) + defer cleanupStorage(storeDriver, testDir) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(3 * time.Second) + }) + + Convey("Rebuild dedupe index already rebuilt", func() { + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) + defer cleanupStorage(storeDriver, testDir) + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(3 * time.Second) + }) + + Convey("Trigger Stat error while getting original blob", func() { + tdir := t.TempDir() + storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, false) + defer cleanupStorage(storeDriver, testDir) + + // remove original blob + err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) + So(err, ShouldBeNil) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(5 * time.Second) + }) + + Convey("Trigger ErrDedupeRebuild while statting original blob", func() { + // remove original blob + err := storeDriver.Delete(context.Background(), fi1.Path()) + So(err, ShouldBeNil) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) + defer cleanupStorage(storeDriver, testDir) + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(5 * time.Second) + }) + + Convey("Trigger ErrDedupeRebuild when original blob has 0 size", func() { + // remove original blob + err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) + So(err, ShouldBeNil) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) + defer cleanupStorage(storeDriver, testDir) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(3 * time.Second) + }) + + Convey("Trigger GetNextDigestWithBlobPaths path not found err", func() { + tdir := t.TempDir() + storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) + defer cleanupStorage(storeDriver, testDir) + + // remove rootDir + err := storeDriver.Delete(context.Background(), imgStore.RootDir()) + So(err, ShouldBeNil) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(3 * time.Second) + }) + + Convey("Rebuild from true to false", func() { + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) + defer cleanupStorage(storeDriver, testDir) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(10 * time.Second) + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", + blobDigest2.Encoded())) + So(err, ShouldBeNil) + So(fi2.Size(), ShouldEqual, fi1.Size()) + }) + }) +} + +func TestRebuildDedupeMockStoreDriver(t *testing.T) { + skipIt(t) + + uuid, err := guuid.NewV4() + if err != nil { + panic(err) + } + + testDir := path.Join("/oci-repo-test", uuid.String()) + + tdir := t.TempDir() + + validDigest := godigest.FromString("digest") + + Convey("Trigger Stat error in getOriginalBlobFromDisk()", t, func() { + imgStore := createMockStorage(testDir, tdir, false, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return &FileInfoMock{}, errS3 + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + return walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + Convey("Trigger GetContent error in restoreDedupedBlobs()", t, func() { + imgStore := createMockStorage(testDir, tdir, false, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(0) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + GetContentFn: func(ctx context.Context, path string) ([]byte, error) { + return []byte{}, errS3 + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + Convey("Trigger GetContent error in restoreDedupedBlobs()", t, func() { + imgStore := createMockStorage(testDir, tdir, false, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(0) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { + return &FileWriterMock{}, errS3 + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + Convey("Trigger Stat() error in restoreDedupedBlobs()", t, func() { + imgStore := createMockStorage(testDir, tdir, false, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, errS3 + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + + Convey("Trigger Stat() error in dedupeBlobs()", func() { + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + imgStore := createMockStorage(testDir, t.TempDir(), true, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, errS3 + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + }) + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(500 * time.Millisecond) + }) + }) + + Convey("Trigger PutContent() error in dedupeBlobs()", t, func() { + tdir := t.TempDir() + imgStore := createMockStorage(testDir, tdir, true, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(0) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + PutContentFn: func(ctx context.Context, path string, content []byte) error { + return errS3 + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + //nolint: dupl + Convey("Trigger getOriginalBlob() error in dedupeBlobs()", t, func() { + tdir := t.TempDir() + imgStore := createMockStorage(testDir, tdir, true, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(0) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(0) + }, + }, nil + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + //nolint: dupl + Convey("Trigger Stat() error in dedupeBlobs()", t, func() { + tdir := t.TempDir() + imgStore := createMockStorage(testDir, tdir, true, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, errS3 + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + Convey("Trigger getNextDigestWithBlobPaths err", t, func() { + tdir := t.TempDir() + imgStore := createMockStorage(testDir, tdir, true, &StorageDriverMock{ + WalkFn: func(ctx context.Context, path string, f driver.WalkFn) error { + return errS3 + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + Convey("Trigger cache errors", t, func() { + storageDriverMockIfBranch := &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(0) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + } + + storageDriverMockElseBranch := &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + } + + return &FileInfoMock{ + SizeFn: func() int64 { + return int64(10) + }, + }, nil + }, + WalkFn: func(ctx context.Context, path string, walkFn driver.WalkFn) error { + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/%s", validDigest.Encoded()) + }, + }) + _ = walkFn(&FileInfoMock{ + IsDirFn: func() bool { + return false + }, + PathFn: func() string { + return fmt.Sprintf("path/to/second/%s", validDigest.Encoded()) + }, + }) + + return nil + }, + } + + Convey("on original blob", func() { + imgStore := createMockStorageWithMockCache(testDir, true, storageDriverMockIfBranch, + &mocks.CacheMock{ + HasBlobFn: func(digest godigest.Digest, path string) bool { + return false + }, + PutBlobFn: func(digest godigest.Digest, path string) error { + return errCache + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + Convey("on dedupe blob", func() { + imgStore := createMockStorageWithMockCache(testDir, true, storageDriverMockIfBranch, + &mocks.CacheMock{ + HasBlobFn: func(digest godigest.Digest, path string) bool { + return false + }, + PutBlobFn: func(digest godigest.Digest, path string) error { + if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { + return errCache + } + + return nil + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + + Convey("on else branch", func() { + imgStore := createMockStorageWithMockCache(testDir, true, storageDriverMockElseBranch, + &mocks.CacheMock{ + HasBlobFn: func(digest godigest.Digest, path string) bool { + return false + }, + PutBlobFn: func(digest godigest.Digest, path string) error { + return errCache + }, + }) + + taskScheduler, cancel := runAndGetScheduler() + defer cancel() + + // rebuild with dedupe false, should have all blobs with content + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + // wait until rebuild finishes + time.Sleep(200 * time.Millisecond) + }) + }) +} + func TestS3PullRange(t *testing.T) { skipIt(t) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 15c3acf4..342c2f2a 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -53,4 +53,7 @@ type ImageStore interface { //nolint:interfacebloat GetOrasReferrers(repo string, digest godigest.Digest, artifactType string) ([]artifactspec.Descriptor, error) RunGCRepo(repo string) error RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) + RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) + RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error + GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error) } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index ad983929..09392952 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -84,6 +84,7 @@ func createObjectsStore(rootDir string, cacheDir string) (driver.StorageDriver, Name: "s3_cache", UseRelPaths: false, }, log) + il := s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay, true, false, log, metrics, nil, store, cacheDriver, ) diff --git a/pkg/test/mocks/cache_mock.go b/pkg/test/mocks/cache_mock.go new file mode 100644 index 00000000..1d4e95e8 --- /dev/null +++ b/pkg/test/mocks/cache_mock.go @@ -0,0 +1,60 @@ +package mocks + +import godigest "github.com/opencontainers/go-digest" + +type CacheMock struct { + // Returns the human-readable "name" of the driver. + NameFn func() string + + // Retrieves the blob matching provided digest. + GetBlobFn func(digest godigest.Digest) (string, error) + + // Uploads blob to cachedb. + PutBlobFn func(digest godigest.Digest, path string) error + + // Check if blob exists in cachedb. + HasBlobFn func(digest godigest.Digest, path string) bool + + // Delete a blob from the cachedb. + DeleteBlobFn func(digest godigest.Digest, path string) error +} + +func (cacheMock CacheMock) Name() string { + if cacheMock.NameFn != nil { + return cacheMock.NameFn() + } + + return "mock" +} + +func (cacheMock CacheMock) GetBlob(digest godigest.Digest) (string, error) { + if cacheMock.GetBlobFn != nil { + return cacheMock.GetBlobFn(digest) + } + + return "", nil +} + +func (cacheMock CacheMock) PutBlob(digest godigest.Digest, path string) error { + if cacheMock.PutBlobFn != nil { + return cacheMock.PutBlobFn(digest, path) + } + + return nil +} + +func (cacheMock CacheMock) HasBlob(digest godigest.Digest, path string) bool { + if cacheMock.HasBlobFn != nil { + return cacheMock.HasBlobFn(digest, path) + } + + return true +} + +func (cacheMock CacheMock) DeleteBlob(digest godigest.Digest, path string) error { + if cacheMock.DeleteBlobFn != nil { + return cacheMock.DeleteBlobFn(digest, path) + } + + return nil +} diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index a9eb7f23..31e3aedc 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -36,15 +36,19 @@ type MockedImageStore struct { CheckBlobFn func(repo string, digest godigest.Digest) (bool, int64, error) GetBlobPartialFn func(repo string, digest godigest.Digest, mediaType string, from, to int64, ) (io.ReadCloser, int64, int64, error) - GetBlobFn func(repo string, digest godigest.Digest, mediaType string) (io.ReadCloser, int64, error) - DeleteBlobFn func(repo string, digest godigest.Digest) error - GetIndexContentFn func(repo string) ([]byte, error) - GetBlobContentFn func(repo string, digest godigest.Digest) ([]byte, error) - GetReferrersFn func(repo string, digest godigest.Digest, artifactTypes []string) (ispec.Index, error) - GetOrasReferrersFn func(repo string, digest godigest.Digest, artifactType string) ([]artifactspec.Descriptor, error) - URLForPathFn func(path string) (string, error) - RunGCRepoFn func(repo string) error - RunGCPeriodicallyFn func(interval time.Duration, sch *scheduler.Scheduler) + GetBlobFn func(repo string, digest godigest.Digest, mediaType string) (io.ReadCloser, int64, error) + DeleteBlobFn func(repo string, digest godigest.Digest) error + GetIndexContentFn func(repo string) ([]byte, error) + GetBlobContentFn func(repo string, digest godigest.Digest) ([]byte, error) + GetReferrersFn func(repo string, digest godigest.Digest, artifactTypes []string) (ispec.Index, error) + GetOrasReferrersFn func(repo string, digest godigest.Digest, artifactType string, + ) ([]artifactspec.Descriptor, error) + URLForPathFn func(path string) (string, error) + RunGCRepoFn func(repo string) error + RunGCPeriodicallyFn func(interval time.Duration, sch *scheduler.Scheduler) + RunDedupeBlobsFn func(interval time.Duration, sch *scheduler.Scheduler) + RunDedupeForDigestFn func(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error + GetNextDigestWithBlobPathsFn func(lastDigests []godigest.Digest) (godigest.Digest, []string, error) } func (is MockedImageStore) Lock(t *time.Time) { @@ -332,3 +336,26 @@ func (is MockedImageStore) RunGCPeriodically(interval time.Duration, sch *schedu is.RunGCPeriodicallyFn(interval, sch) } } + +func (is MockedImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) { + if is.RunDedupeBlobsFn != nil { + is.RunDedupeBlobsFn(interval, sch) + } +} + +func (is MockedImageStore) RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error { + if is.RunDedupeForDigestFn != nil { + return is.RunDedupeForDigestFn(digest, dedupe, duplicateBlobs) + } + + return nil +} + +func (is MockedImageStore) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest, +) (godigest.Digest, []string, error) { + if is.GetNextDigestWithBlobPathsFn != nil { + return is.GetNextDigestWithBlobPathsFn(lastDigests) + } + + return "", []string{}, nil +} diff --git a/test/blackbox/helpers_cloud.bash b/test/blackbox/helpers_cloud.bash index 5daaff40..1b450408 100644 --- a/test/blackbox/helpers_cloud.bash +++ b/test/blackbox/helpers_cloud.bash @@ -1,8 +1,10 @@ ROOT_DIR=$(git rev-parse --show-toplevel) +TEST_DATA_DIR=${ROOT_DIR}/test/data/ OS="${OS:-linux}" ARCH="${ARCH:-amd64}" ZOT_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH} +mkdir -p ${TEST_DATA_DIR} function verify_prerequisites { if [ ! -f ${ZOT_PATH} ]; then @@ -28,17 +30,35 @@ function zot_serve_strace() { strace -o "strace.txt" -f -e trace=openat ${ZOT_PATH} serve ${config_file} & } +function zot_serve() { + local config_file=${1} + ${ZOT_PATH} serve ${config_file} & +} + function zot_stop() { pkill zot } +function wait_for_string() { + string=$1 + filepath=$2 + + while [ ! -f $filepath ] + do sleep 2; + done + + while ! grep "${string}" $filepath + do sleep 10; + done +} + function wait_zot_reachable() { zot_url=${1} curl --connect-timeout 3 \ - --max-time 3 \ + --max-time 10 \ --retry 10 \ --retry-delay 0 \ - --retry-max-time 60 \ + --retry-max-time 120 \ --retry-connrefused \ ${zot_url} } diff --git a/test/blackbox/pushpull_running_dedupe.bats b/test/blackbox/pushpull_running_dedupe.bats new file mode 100644 index 00000000..c06437f6 --- /dev/null +++ b/test/blackbox/pushpull_running_dedupe.bats @@ -0,0 +1,288 @@ +load helpers_pushpull + +function setup_file() { + # Verify prerequisites are available + if ! verify_prerequisites; then + exit 1 + fi + # Download test data to folder common for the entire suite, not just this file + skopeo --insecure-policy copy --format=oci docker://ghcr.io/project-zot/golang:1.20 oci:${TEST_DATA_DIR}/golang:1.20 + # Setup zot server + local zot_root_dir=${BATS_FILE_TMPDIR}/zot + local zot_config_file=${BATS_FILE_TMPDIR}/zot_config.json + local oci_data_dir=${BATS_FILE_TMPDIR}/oci + mkdir -p ${zot_root_dir} + mkdir -p ${oci_data_dir} + cat > ${zot_config_file}<&3 + + run curl http://127.0.0.1:8080/v2/_catalog + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.repositories[]') = '"golang"' ] + run curl http://127.0.0.1:8080/v2/golang/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"1.20"' ] +} + +@test "pull image - dedupe not running" { + local oci_data_dir=${BATS_FILE_TMPDIR}/oci + start=`date +%s` + run skopeo --insecure-policy copy --src-tls-verify=false \ + docker://127.0.0.1:8080/golang:1.20 \ + oci:${oci_data_dir}/golang:1.20 + [ "$status" -eq 0 ] + end=`date +%s` + + runtime=$((end-start)) + echo "pull image exec time: $runtime sec" >&3 + run cat ${BATS_FILE_TMPDIR}/oci/golang/index.json + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.manifests[].annotations."org.opencontainers.image.ref.name"') = '"1.20"' ] +} + +@test "push 50 images with dedupe disabled" { + for i in {1..50} + do + run skopeo --insecure-policy copy --dest-tls-verify=false \ + oci:${TEST_DATA_DIR}/golang:1.20 \ + docker://127.0.0.1:8080/golang${i}:1.20 + [ "$status" -eq 0 ] + done +} + +@test "restart zot with dedupe enabled" { + local zot_config_file=${BATS_FILE_TMPDIR}/zot_config.json + + # stop server + teardown_zot_file_level + + # enable dedupe + sed -i 's/false/true/g' ${zot_config_file} + + setup_zot_file_level ${zot_config_file} + wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog" + # deduping will now run in background (task scheduler) while we push images, shouldn't interfere +} + +@test "push image - dedupe running" { + start=`date +%s` + run skopeo --insecure-policy copy --dest-tls-verify=false \ + oci:${TEST_DATA_DIR}/golang:1.20 \ + docker://127.0.0.1:8080/dedupe/golang:1.20 + [ "$status" -eq 0 ] + end=`date +%s` + + runtime=$((end-start)) + echo "push image exec time: $runtime sec" >&3 +} + +@test "pull image - dedupe running" { + local oci_data_dir=${BATS_FILE_TMPDIR}/oci + + mkdir -p ${oci_data_dir}/dedupe/ + + start=`date +%s` + run skopeo --insecure-policy copy --src-tls-verify=false \ + docker://127.0.0.1:8080/dedupe/golang:1.20 \ + oci:${oci_data_dir}/dedupe/golang:1.20 + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "pull image exec time: $runtime sec" >&3 +} + +@test "pull deduped image - dedupe running" { + local oci_data_dir=${BATS_FILE_TMPDIR}/oci + + mkdir -p ${oci_data_dir}/dedupe/ + + start=`date +%s` + run skopeo --insecure-policy copy --src-tls-verify=false \ + docker://127.0.0.1:8080/golang2:1.20 \ + oci:${oci_data_dir}/dedupe/golang2:1.20 + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "pull image exec time: $runtime sec" >&3 +} + +@test "push image index - dedupe running" { + # --multi-arch below pushes an image index (containing many images) instead + # of an image manifest (single image) + start=`date +%s` + run skopeo --insecure-policy copy --format=oci --dest-tls-verify=false --multi-arch=all \ + docker://public.ecr.aws/docker/library/busybox:latest \ + docker://127.0.0.1:8080/busybox:latest + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "push image index exec time: $runtime sec" >&3 + run curl http://127.0.0.1:8080/v2/_catalog + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.repositories[0]') = '"busybox"' ] + run curl http://127.0.0.1:8080/v2/busybox/tags/list + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.tags[]') = '"latest"' ] +} + +@test "pull image index - dedupe running" { + local oci_data_dir=${BATS_FILE_TMPDIR}/oci + start=`date +%s` + run skopeo --insecure-policy copy --src-tls-verify=false --multi-arch=all \ + docker://127.0.0.1:8080/busybox:latest \ + oci:${oci_data_dir}/busybox:latest + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "pull image index exec time: $runtime sec" >&3 + run cat ${BATS_FILE_TMPDIR}/oci/busybox/index.json + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.manifests[].annotations."org.opencontainers.image.ref.name"') = '"latest"' ] + run skopeo --insecure-policy --override-arch=arm64 --override-os=linux copy --src-tls-verify=false --multi-arch=all \ + docker://127.0.0.1:8080/busybox:latest \ + oci:${oci_data_dir}/busybox:latest + [ "$status" -eq 0 ] + run cat ${BATS_FILE_TMPDIR}/oci/busybox/index.json + [ "$status" -eq 0 ] + [ $(echo "${lines[-1]}" | jq '.manifests[].annotations."org.opencontainers.image.ref.name"') = '"latest"' ] + run curl -X DELETE http://127.0.0.1:8080/v2/busybox/manifests/latest + [ "$status" -eq 0 ] +} + +@test "push oras artifact - dedupe running" { + echo "{\"name\":\"foo\",\"value\":\"bar\"}" > config.json + echo "hello world" > artifact.txt + start=`date +%s` + run oras push --plain-http 127.0.0.1:8080/hello-artifact:v2 \ + --config config.json:application/vnd.acme.rocket.config.v1+json artifact.txt:text/plain -d -v + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "push oras artifact exec time: $runtime sec" >&3 + rm -f artifact.txt + rm -f config.json +} + +@test "pull oras artifact - dedupe running" { + start=`date +%s` + run oras pull --plain-http 127.0.0.1:8080/hello-artifact:v2 -d -v + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "pull oras artifact exec time: $runtime sec" >&3 + grep -q "hello world" artifact.txt + rm -f artifact.txt +} + +@test "attach oras artifacts - dedupe running" { + # attach signature + echo "{\"artifact\": \"\", \"signature\": \"pat hancock\"}" > signature.json + start=`date +%s` + run oras attach --plain-http 127.0.0.1:8080/golang:1.20 --artifact-type 'signature/example' ./signature.json:application/json + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "attach signature exec time: $runtime sec" >&3 + # attach sbom + echo "{\"version\": \"0.0.0.0\", \"artifact\": \"'127.0.0.1:8080/golang:1.20'\", \"contents\": \"good\"}" > sbom.json + start=`date +%s` + run oras attach --plain-http 127.0.0.1:8080/golang:1.20 --artifact-type 'sbom/example' ./sbom.json:application/json + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "attach sbom exec time: $runtime sec" >&3 +} + +@test "discover oras artifacts - dedupe running" { + start=`date +%s` + run oras discover --plain-http -o json 127.0.0.1:8080/golang:1.20 + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "discover oras artifacts exec time: $runtime sec" >&3 + [ $(echo "$output" | jq -r ".manifests | length") -eq 2 ] +} + +@test "push helm chart - dedupe running" { + run helm package ${BATS_FILE_TMPDIR}/helm-charts/charts/zot + [ "$status" -eq 0 ] + local chart_version=$(awk '/version/{printf $2}' ${BATS_FILE_TMPDIR}/helm-charts/charts/zot/Chart.yaml) + start=`date +%s` + run helm push zot-${chart_version}.tgz oci://localhost:8080/zot-chart + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "helm push exec time: $runtime sec" >&3 +} + +@test "pull helm chart - dedupe running" { + local chart_version=$(awk '/version/{printf $2}' ${BATS_FILE_TMPDIR}/helm-charts/charts/zot/Chart.yaml) + start=`date +%s` + run helm pull oci://localhost:8080/zot-chart/zot --version ${chart_version} + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "helm pull exec time: $runtime sec" >&3 +} + +@test "push image with regclient - dedupe running" { + run regctl registry set localhost:8080 --tls disabled + [ "$status" -eq 0 ] + start=`date +%s` + run regctl image copy ocidir://${TEST_DATA_DIR}/golang:1.20 localhost:8080/test-regclient + [ "$status" -eq 0 ] + end=`date +%s` + runtime=$((end-start)) + + echo "regclient push exec time: $runtime" >&3 +} diff --git a/test/blackbox/restore_s3_blobs.bats b/test/blackbox/restore_s3_blobs.bats new file mode 100644 index 00000000..9711e969 --- /dev/null +++ b/test/blackbox/restore_s3_blobs.bats @@ -0,0 +1,140 @@ +load helpers_cloud + +function setup_file() { + # Verify prerequisites are available + if ! verify_prerequisites; then + exit 1 + fi + + # Download test data to folder common for the entire suite, not just this file + skopeo --insecure-policy copy --format=oci docker://ghcr.io/project-zot/golang:1.20 oci:${TEST_DATA_DIR}/golang:1.20 + # Setup zot server + local zot_root_dir=${BATS_FILE_TMPDIR}/zot + local zot_config_file_dedupe=${BATS_FILE_TMPDIR}/zot_config_dedupe.json + local zot_config_file_nodedupe=${BATS_FILE_TMPDIR}/zot_config_nodedupe.json + local ZOT_LOG_FILE=${zot_root_dir}/zot-log.json + mkdir -p ${zot_root_dir} + + cat > ${zot_config_file_dedupe}< ${zot_config_file_nodedupe}<&3 + wait_for_string "dedupe rebuild: finished" ${ZOT_LOG_FILE} + end=`date +%s` + + runtime=$((end-start)) + echo "restoring blobs finished in $runtime sec" >&3 + sleep 10 # wait a bit more because dedupe runs in background. +} + +@test "pulling a previous deduped image should work" { + # golang1 should have original blobs already + echo "pulling first image" >&3 + run skopeo --insecure-policy copy --src-tls-verify=false \ + docker://127.0.0.1:8080/golang1:1.20 \ + oci:${TEST_DATA_DIR}/golang1:1.20 + [ "$status" -eq 0 ] + + echo "pulling second image" >&3 + # golang2 should have original blobs after restoring blobs + run skopeo --insecure-policy copy --src-tls-verify=false \ + docker://127.0.0.1:8080/golang2:1.20 \ + oci:${TEST_DATA_DIR}/golang2:1.20 + [ "$status" -eq 0 ] +} + + diff --git a/test/blackbox/scrub.bats b/test/blackbox/scrub.bats index b6de71ac..01f8f111 100644 --- a/test/blackbox/scrub.bats +++ b/test/blackbox/scrub.bats @@ -23,7 +23,8 @@ function setup() { { "distSpecVersion": "1.1.0", "storage": { - "rootDirectory": "${ZOT_ROOT_DIR}" + "rootDirectory": "${ZOT_ROOT_DIR}", + "dedupe": false }, "http": { "address": "0.0.0.0",