diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index e3728885..4c730034 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand" "path" + "strconv" "strings" "time" @@ -149,6 +150,11 @@ func (gc GarbageCollect) cleanRepo(ctx context.Context, repo string) error { return err } + // gc old blob uploads + if err := gc.removeBlobUploads(repo, gc.opts.Delay); err != nil { + return err + } + return nil } @@ -546,6 +552,52 @@ func (gc GarbageCollect) identifyManifestsReferencedInIndex(index ispec.Index, r return nil } +// removeBlobUploads gc all temporary uploads which are past their gc delay. +func (gc GarbageCollect) removeBlobUploads(repo string, delay time.Duration) error { + gc.log.Debug().Str("module", "gc").Str("repository", repo).Msg("cleaning unclaimed blob uploads") + + if dir := path.Join(gc.imgStore.RootDir(), repo); !gc.imgStore.DirExists(dir) { + // The repository was already cleaned up by a different codepath + return nil + } + + blobUploads, err := gc.imgStore.ListBlobUploads(repo) + if err != nil { + gc.log.Error().Err(err).Str("module", "gc").Str("repository", repo).Msg("failed to get list of blob uploads") + + return err + } + + var aggregatedErr error + + for _, uuid := range blobUploads { + _, size, modtime, err := gc.imgStore.StatBlobUpload(repo, uuid) + if err != nil { + gc.log.Error().Err(err).Str("module", "gc").Str("repository", repo).Str("blobUpload", uuid). + Msg("failed to stat blob upload") + + aggregatedErr = errors.Join(aggregatedErr, err) + + continue + } + + if modtime.Add(delay).After(time.Now()) { + // Do not delete blob uploads which have been updated recently + continue + } + + err = gc.imgStore.DeleteBlobUpload(repo, uuid) + if err != nil { + gc.log.Error().Err(err).Str("module", "gc").Str("repository", repo).Str("blobUpload", uuid). + Str("size", strconv.FormatInt(size, 10)).Str("modified", modtime.String()).Msg("failed to delete blob upload") + + aggregatedErr = errors.Join(aggregatedErr, err) + } + } + + return aggregatedErr +} + // removeUnreferencedBlobs gc all blobs which are not referenced by any manifest found in repo's index.json. func (gc GarbageCollect) removeUnreferencedBlobs(repo string, delay time.Duration, log zlog.Logger, ) error { diff --git a/pkg/storage/gc/gc_test.go b/pkg/storage/gc/gc_test.go index 20ee464d..b43da73c 100644 --- a/pkg/storage/gc/gc_test.go +++ b/pkg/storage/gc/gc_test.go @@ -1,6 +1,7 @@ package gc_test import ( + "bytes" "context" "fmt" "os" @@ -32,7 +33,9 @@ import ( ) const ( - region = "us-east-2" + region = "us-east-2" + s3TestName = "S3APIs" + localTestName = "LocalAPIs" ) //nolint:gochecknoglobals @@ -41,17 +44,17 @@ var testCases = []struct { storageType string }{ { - testCaseName: "S3APIs", + testCaseName: s3TestName, storageType: storageConstants.S3StorageDriverName, }, { - testCaseName: "LocalAPIs", + testCaseName: localTestName, storageType: storageConstants.LocalStorageDriverName, }, } func TestGarbageCollectAndRetention(t *testing.T) { - log := zlog.NewLogger("info", "/dev/null") + log := zlog.NewLogger("debug", "") audit := zlog.NewAuditLogger("debug", "/dev/null") metrics := monitoring.NewMetricsServer(false, log) @@ -887,6 +890,105 @@ func TestGarbageCollectAndRetention(t *testing.T) { err := gc.CleanRepo(ctx, "gc-test1") So(err, ShouldNotBeNil) }) + + Convey("should gc only stale blob uploads", func() { + gcDelay := 1 * time.Second + repoName := "gc-test1" + + gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{ + Delay: gcDelay, + ImageRetention: config.ImageRetention{ + Delay: storageConstants.DefaultRetentionDelay, + Policies: []config.RetentionPolicy{ + { + Repositories: []string{"**"}, + DeleteReferrers: true, + DeleteUntagged: &trueVal, + KeepTags: []config.KeepTagsPolicy{ + {}, + }, + }, + }, + }, + }, audit, log) + + blobUploadID, err := imgStore.NewBlobUpload(repoName) + So(err, ShouldBeNil) + + content := []byte("test-data3") + buf := bytes.NewBuffer(content) + _, err = imgStore.PutBlobChunkStreamed(repoName, blobUploadID, buf) + So(err, ShouldBeNil) + + // Blob upload should be there + uploads, err := imgStore.ListBlobUploads(repoName) + So(err, ShouldBeNil) + + if testcase.testCaseName == s3TestName { + // Remote sorage is written to only after the blob upload is finished, + // there should be no space used by blob uploads + So(uploads, ShouldEqual, []string{}) + } else { + // Local storage is used right away + So(uploads, ShouldEqual, []string{blobUploadID}) + } + + isPresent, _, _, err := imgStore.StatBlobUpload(repoName, blobUploadID) + + if testcase.testCaseName == s3TestName { + // Remote sorage is written to only after the blob upload is finished, + // there should be no space used by blob uploads + So(err, ShouldNotBeNil) + So(isPresent, ShouldBeFalse) + } else { + // Local storage is used right away + So(err, ShouldBeNil) + So(isPresent, ShouldBeTrue) + } + + err = gc.CleanRepo(ctx, repoName) + So(err, ShouldBeNil) + + // Blob upload is recent it should still be there + uploads, err = imgStore.ListBlobUploads(repoName) + So(err, ShouldBeNil) + + if testcase.testCaseName == s3TestName { + // Remote sorage is written to only after the blob upload is finished, + // there should be no space used by blob uploads + So(uploads, ShouldEqual, []string{}) + } else { + // Local storage is used right away + So(uploads, ShouldEqual, []string{blobUploadID}) + } + + isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID) + + if testcase.testCaseName == s3TestName { + // Remote sorage is written to only after the blob upload is finished, + // there should be no space used by blob uploads + So(err, ShouldNotBeNil) + So(isPresent, ShouldBeFalse) + } else { + // Local storage is used right away + So(err, ShouldBeNil) + So(isPresent, ShouldBeTrue) + } + + time.Sleep(gcDelay + 1*time.Second) + + err = gc.CleanRepo(ctx, repoName) + So(err, ShouldBeNil) + + // Blob uploads should be GCed + uploads, err = imgStore.ListBlobUploads(repoName) + So(err, ShouldBeNil) + So(uploads, ShouldBeEmpty) + + isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID) + So(err, ShouldNotBeNil) + So(isPresent, ShouldBeFalse) + }) }) }) } diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index be4b79d9..64f0f5d6 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -691,6 +691,42 @@ func (is *ImageStore) BlobUploadPath(repo, uuid string) string { return blobUploadPath } +/* +ListBlobUploads returns all blob uploads present in the repository. The caller function MUST lock from outside. +*/ +func (is *ImageStore) ListBlobUploads(repo string) ([]string, error) { + blobUploadPaths, err := is.storeDriver.List(path.Join(is.RootDir(), repo, storageConstants.BlobUploadDir)) + if err != nil { + if errors.As(err, &driver.PathNotFoundError{}) { + // blobs uploads folder does not exist + return []string{}, nil + } + + is.log.Debug().Str("repository", repo).Msg("failed to list .uploads/ dir") + } + + blobUploads := []string{} + for _, blobUploadPath := range blobUploadPaths { + blobUploads = append(blobUploads, path.Base(blobUploadPath)) + } + + return blobUploads, err +} + +// StatBlobUpload verifies if a blob upload is present inside a repository. The caller function MUST lock from outside. +func (is *ImageStore) StatBlobUpload(repo, uuid string) (bool, int64, time.Time, error) { + blobUploadPath := is.BlobUploadPath(repo, uuid) + + binfo, err := is.storeDriver.Stat(blobUploadPath) + if err != nil { + is.log.Error().Err(err).Str("blobUpload", blobUploadPath).Msg("failed to stat blob upload") + + return false, -1, time.Time{}, err + } + + return true, binfo.Size(), binfo.ModTime(), nil +} + // NewBlobUpload returns the unique ID for an upload in progress. func (is *ImageStore) NewBlobUpload(repo string) (string, error) { if err := is.InitRepo(repo); err != nil { @@ -1572,10 +1608,7 @@ func (is *ImageStore) CleanupRepo(repo string, blobs []godigest.Digest, removeRe } } - blobUploads, err := is.storeDriver.List(path.Join(is.RootDir(), repo, storageConstants.BlobUploadDir)) - if err != nil { - is.log.Debug().Str("repository", repo).Msg("failed to list .uploads/ dir") - } + blobUploads, _ := is.ListBlobUploads(repo) // if removeRepo flag is true and we cleanup all blobs and there are no blobs currently being uploaded. if removeRepo && count == len(blobs) && count > 0 && len(blobUploads) == 0 { diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 44579f79..ce603fef 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -1970,7 +1970,7 @@ func TestGarbageCollectForImageStore(t *testing.T) { }, log) imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) - repoName := "gc-all-repos-short" + repoName := "gc-all-repos-short" //nolint:goconst // test data gc := gc.NewGarbageCollect(imgStore, mocks.MetaDBMock{}, gc.Options{ Delay: 1 * time.Second, @@ -2046,11 +2046,7 @@ func TestGarbageCollectForImageStore(t *testing.T) { }) Convey("Garbage collect - the manifest which the reference points to can be found", func() { - logFile, _ := os.CreateTemp("", "zot-log*.txt") - - defer os.Remove(logFile.Name()) // clean up - - log := zlog.NewLogger("debug", logFile.Name()) + log := zlog.NewLogger("debug", "") audit := zlog.NewAuditLogger("debug", "") metrics := monitoring.NewMetricsServer(false, log) @@ -2121,6 +2117,78 @@ func TestGarbageCollectForImageStore(t *testing.T) { err = gc.CleanRepo(ctx, repoName) So(err, ShouldBeNil) }) + + Convey("Garbage collect error - not enough permissions to access blob upload", func() { + logFile, _ := os.CreateTemp("", "zot-log*.txt") + + defer os.Remove(logFile.Name()) // clean up + + log := zlog.NewLogger("debug", logFile.Name()) + audit := zlog.NewAuditLogger("debug", "") + + metrics := monitoring.NewMetricsServer(false, log) + cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{ + RootDir: dir, + Name: "cache", + UseRelPaths: true, + }, log) + + imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) + repoName := "gc-all-repos-short" + + gc := gc.NewGarbageCollect(imgStore, mocks.MetaDBMock{}, gc.Options{ + Delay: 1 * time.Second, + ImageRetention: DeleteReferrers, + }, audit, log) + + blobUploadID, err := imgStore.NewBlobUpload(repoName) + So(err, ShouldBeNil) + + err = gc.CleanRepo(ctx, repoName) + So(err, ShouldBeNil) + + // Blob upload is recent it should still be there + isPresent, _, _, err := imgStore.StatBlobUpload(repoName, blobUploadID) + So(err, ShouldBeNil) + So(isPresent, ShouldBeTrue) + + So(os.Chmod(imgStore.BlobUploadPath(repoName, blobUploadID), 0o000), ShouldBeNil) + + err = gc.CleanRepo(ctx, repoName) + So(err, ShouldBeNil) + + // Blob upload is recent it should still be there + isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID) + So(err, ShouldBeNil) + So(isPresent, ShouldBeTrue) + + time.Sleep(1002 * time.Millisecond) + + // GC should fail because of bad permissions + err = gc.CleanRepo(ctx, repoName) + So(err, ShouldNotBeNil) + + // Blob uploads should not be GCed as there was an error + isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID) + So(err, ShouldBeNil) + So(isPresent, ShouldBeTrue) + + So(os.Chmod(imgStore.BlobUploadPath(repoName, blobUploadID), 0o777), ShouldBeNil) + + // GC should no longer error + err = gc.CleanRepo(ctx, repoName) + So(err, ShouldBeNil) + + // Blob uploads should have correctly been GCed + isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID) + So(err, ShouldNotBeNil) + So(isPresent, ShouldBeFalse) + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, + "failed to run GC for "+path.Join(imgStore.RootDir(), repoName)) + }) }) } diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 8d5b38bf..9d5cd488 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -35,6 +35,8 @@ type ImageStore interface { //nolint:interfacebloat PutImageManifest(repo, reference, mediaType string, body []byte) (godigest.Digest, godigest.Digest, error) DeleteImageManifest(repo, reference string, detectCollision bool) error BlobUploadPath(repo, uuid string) string + StatBlobUpload(repo, uuid string) (bool, int64, time.Time, error) + ListBlobUploads(repo string) ([]string, error) NewBlobUpload(repo string) (string, error) GetBlobUpload(repo, uuid string) (int64, error) PutBlobChunkStreamed(repo, uuid string, body io.Reader) (int64, error) diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index ff18afab..d94b540d 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -25,6 +25,8 @@ type MockedImageStore struct { godigest.Digest, error) DeleteImageManifestFn func(repo string, reference string, detectCollision bool) error BlobUploadPathFn func(repo string, uuid string) string + StatBlobUploadFn func(repo string, uuid string) (bool, int64, time.Time, error) + ListBlobUploadsFn func(repo string) ([]string, error) NewBlobUploadFn func(repo string) (string, error) GetBlobUploadFn func(repo string, uuid string) (int64, error) BlobUploadInfoFn func(repo string, uuid string) (int64, error) @@ -181,6 +183,22 @@ func (is MockedImageStore) DeleteImageManifest(name string, reference string, de return nil } +func (is MockedImageStore) ListBlobUploads(repo string) ([]string, error) { + if is.ListBlobUploadsFn != nil { + return is.ListBlobUploadsFn(repo) + } + + return []string{}, nil +} + +func (is MockedImageStore) StatBlobUpload(repo string, uuid string) (bool, int64, time.Time, error) { + if is.StatBlobUploadFn != nil { + return is.StatBlobUploadFn(repo, uuid) + } + + return true, 0, time.Time{}, nil +} + func (is MockedImageStore) NewBlobUpload(repo string) (string, error) { if is.NewBlobUploadFn != nil { return is.NewBlobUploadFn(repo)