Mirror of https://github.com/project-zot/zot.git

fix(gc): gc now removes blob uploads which have not changed within the gc delay interval (#2599)

See #2598

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
Andrei Aaron authored on 2024-08-12 21:58:46 +03:00, committed by GitHub
commit 253aad3195 (parent 2dea22f74a)
GPG key ID: B5690EEEBB952194
6 changed files with 289 additions and 14 deletions

Changed file 1 of 6: garbage collector implementation (GarbageCollect)

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"math/rand"
 	"path"
+	"strconv"
 	"strings"
 	"time"
@@ -149,6 +150,11 @@ func (gc GarbageCollect) cleanRepo(ctx context.Context, repo string) error {
 		return err
 	}
+
+	// gc old blob uploads
+	if err := gc.removeBlobUploads(repo, gc.opts.Delay); err != nil {
+		return err
+	}

 	return nil
 }
@@ -546,6 +552,52 @@ func (gc GarbageCollect) identifyManifestsReferencedInIndex(index ispec.Index, r
 	return nil
 }

+// removeBlobUploads gc all temporary uploads which are past their gc delay.
+func (gc GarbageCollect) removeBlobUploads(repo string, delay time.Duration) error {
+	gc.log.Debug().Str("module", "gc").Str("repository", repo).Msg("cleaning unclaimed blob uploads")
+
+	if dir := path.Join(gc.imgStore.RootDir(), repo); !gc.imgStore.DirExists(dir) {
+		// The repository was already cleaned up by a different codepath
+		return nil
+	}
+
+	blobUploads, err := gc.imgStore.ListBlobUploads(repo)
+	if err != nil {
+		gc.log.Error().Err(err).Str("module", "gc").Str("repository", repo).Msg("failed to get list of blob uploads")
+
+		return err
+	}
+
+	var aggregatedErr error
+
+	for _, uuid := range blobUploads {
+		_, size, modtime, err := gc.imgStore.StatBlobUpload(repo, uuid)
+		if err != nil {
+			gc.log.Error().Err(err).Str("module", "gc").Str("repository", repo).Str("blobUpload", uuid).
+				Msg("failed to stat blob upload")
+			aggregatedErr = errors.Join(aggregatedErr, err)
+
+			continue
+		}
+
+		if modtime.Add(delay).After(time.Now()) {
+			// Do not delete blob uploads which have been updated recently
+			continue
+		}
+
+		err = gc.imgStore.DeleteBlobUpload(repo, uuid)
+		if err != nil {
+			gc.log.Error().Err(err).Str("module", "gc").Str("repository", repo).Str("blobUpload", uuid).
+				Str("size", strconv.FormatInt(size, 10)).Str("modified", modtime.String()).Msg("failed to delete blob upload")
+			aggregatedErr = errors.Join(aggregatedErr, err)
+		}
+	}
+
+	return aggregatedErr
+}
+
 // removeUnreferencedBlobs gc all blobs which are not referenced by any manifest found in repo's index.json.
 func (gc GarbageCollect) removeUnreferencedBlobs(repo string, delay time.Duration, log zlog.Logger,
 ) error {
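
Reviewer note: the staleness rule above is just a modification-time comparison against the configured GC delay. Below is a minimal, self-contained Go sketch of that rule; the names are illustrative and not part of zot's API.

package main

import (
	"fmt"
	"time"
)

// isStaleUpload mirrors the check in removeBlobUploads: an upload becomes
// eligible for GC only once its last modification time is older than the delay.
func isStaleUpload(modtime time.Time, delay time.Duration) bool {
	return !modtime.Add(delay).After(time.Now())
}

func main() {
	fmt.Println(isStaleUpload(time.Now().Add(-2*time.Hour), time.Hour)) // true: untouched for longer than the delay
	fmt.Println(isStaleUpload(time.Now(), time.Hour))                   // false: still inside the delay window
}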

Changed file 2 of 6: garbage collection and retention tests (package gc_test)

@@ -1,6 +1,7 @@
 package gc_test

 import (
+	"bytes"
 	"context"
 	"fmt"
 	"os"
@@ -33,6 +34,8 @@ import (
 const (
 	region = "us-east-2"
+	s3TestName    = "S3APIs"
+	localTestName = "LocalAPIs"
 )

 //nolint:gochecknoglobals
@@ -41,17 +44,17 @@ var testCases = []struct {
 	storageType  string
 }{
 	{
-		testCaseName: "S3APIs",
+		testCaseName: s3TestName,
 		storageType:  storageConstants.S3StorageDriverName,
 	},
 	{
-		testCaseName: "LocalAPIs",
+		testCaseName: localTestName,
 		storageType:  storageConstants.LocalStorageDriverName,
 	},
 }

 func TestGarbageCollectAndRetention(t *testing.T) {
-	log := zlog.NewLogger("info", "/dev/null")
+	log := zlog.NewLogger("debug", "")
 	audit := zlog.NewAuditLogger("debug", "/dev/null")
 	metrics := monitoring.NewMetricsServer(false, log)
@@ -887,6 +890,105 @@ func TestGarbageCollectAndRetention(t *testing.T) {
 				err := gc.CleanRepo(ctx, "gc-test1")
 				So(err, ShouldNotBeNil)
 			})
+
+			Convey("should gc only stale blob uploads", func() {
+				gcDelay := 1 * time.Second
+				repoName := "gc-test1"
+
+				gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{
+					Delay: gcDelay,
+					ImageRetention: config.ImageRetention{
+						Delay: storageConstants.DefaultRetentionDelay,
+						Policies: []config.RetentionPolicy{
+							{
+								Repositories:    []string{"**"},
+								DeleteReferrers: true,
+								DeleteUntagged:  &trueVal,
+								KeepTags: []config.KeepTagsPolicy{
+									{},
+								},
+							},
+						},
+					},
+				}, audit, log)
+
+				blobUploadID, err := imgStore.NewBlobUpload(repoName)
+				So(err, ShouldBeNil)
+
+				content := []byte("test-data3")
+				buf := bytes.NewBuffer(content)
+
+				_, err = imgStore.PutBlobChunkStreamed(repoName, blobUploadID, buf)
+				So(err, ShouldBeNil)
+
+				// Blob upload should be there
+				uploads, err := imgStore.ListBlobUploads(repoName)
+				So(err, ShouldBeNil)
+
+				if testcase.testCaseName == s3TestName {
+					// Remote storage is written to only after the blob upload is finished,
+					// there should be no space used by blob uploads
+					So(uploads, ShouldEqual, []string{})
+				} else {
+					// Local storage is used right away
+					So(uploads, ShouldEqual, []string{blobUploadID})
+				}
+
+				isPresent, _, _, err := imgStore.StatBlobUpload(repoName, blobUploadID)
+
+				if testcase.testCaseName == s3TestName {
+					// Remote storage is written to only after the blob upload is finished,
+					// there should be no space used by blob uploads
+					So(err, ShouldNotBeNil)
+					So(isPresent, ShouldBeFalse)
+				} else {
+					// Local storage is used right away
+					So(err, ShouldBeNil)
+					So(isPresent, ShouldBeTrue)
+				}
+
+				err = gc.CleanRepo(ctx, repoName)
+				So(err, ShouldBeNil)
+
+				// Blob upload is recent, so it should still be there
+				uploads, err = imgStore.ListBlobUploads(repoName)
+				So(err, ShouldBeNil)
+
+				if testcase.testCaseName == s3TestName {
+					// Remote storage is written to only after the blob upload is finished,
+					// there should be no space used by blob uploads
+					So(uploads, ShouldEqual, []string{})
+				} else {
+					// Local storage is used right away
+					So(uploads, ShouldEqual, []string{blobUploadID})
+				}
+
+				isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID)
+
+				if testcase.testCaseName == s3TestName {
+					// Remote storage is written to only after the blob upload is finished,
+					// there should be no space used by blob uploads
+					So(err, ShouldNotBeNil)
+					So(isPresent, ShouldBeFalse)
+				} else {
+					// Local storage is used right away
+					So(err, ShouldBeNil)
+					So(isPresent, ShouldBeTrue)
+				}
+
+				time.Sleep(gcDelay + 1*time.Second)
+
+				err = gc.CleanRepo(ctx, repoName)
+				So(err, ShouldBeNil)
+
+				// Blob uploads should be GCed
+				uploads, err = imgStore.ListBlobUploads(repoName)
+				So(err, ShouldBeNil)
+				So(uploads, ShouldBeEmpty)
+
+				isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID)
+				So(err, ShouldNotBeNil)
+				So(isPresent, ShouldBeFalse)
+			})
 		})
 	})
 }

Changed file 3 of 6: ImageStore implementation (new ListBlobUploads and StatBlobUpload methods)

@@ -691,6 +691,42 @@ func (is *ImageStore) BlobUploadPath(repo, uuid string) string {
 	return blobUploadPath
 }

+/*
+	ListBlobUploads returns all blob uploads present in the repository. The caller function MUST lock from outside.
+*/
+func (is *ImageStore) ListBlobUploads(repo string) ([]string, error) {
+	blobUploadPaths, err := is.storeDriver.List(path.Join(is.RootDir(), repo, storageConstants.BlobUploadDir))
+	if err != nil {
+		if errors.As(err, &driver.PathNotFoundError{}) {
+			// blobs uploads folder does not exist
+			return []string{}, nil
+		}
+
+		is.log.Debug().Str("repository", repo).Msg("failed to list .uploads/ dir")
+	}
+
+	blobUploads := []string{}
+	for _, blobUploadPath := range blobUploadPaths {
+		blobUploads = append(blobUploads, path.Base(blobUploadPath))
+	}
+
+	return blobUploads, err
+}
+
+// StatBlobUpload verifies if a blob upload is present inside a repository. The caller function MUST lock from outside.
+func (is *ImageStore) StatBlobUpload(repo, uuid string) (bool, int64, time.Time, error) {
+	blobUploadPath := is.BlobUploadPath(repo, uuid)
+
+	binfo, err := is.storeDriver.Stat(blobUploadPath)
+	if err != nil {
+		is.log.Error().Err(err).Str("blobUpload", blobUploadPath).Msg("failed to stat blob upload")
+
+		return false, -1, time.Time{}, err
+	}
+
+	return true, binfo.Size(), binfo.ModTime(), nil
+}
+
 // NewBlobUpload returns the unique ID for an upload in progress.
 func (is *ImageStore) NewBlobUpload(repo string) (string, error) {
 	if err := is.InitRepo(repo); err != nil {
@@ -1572,10 +1608,7 @@ func (is *ImageStore) CleanupRepo(repo string, blobs []godigest.Digest, removeRe
 		}
 	}

-	blobUploads, err := is.storeDriver.List(path.Join(is.RootDir(), repo, storageConstants.BlobUploadDir))
-	if err != nil {
-		is.log.Debug().Str("repository", repo).Msg("failed to list .uploads/ dir")
-	}
+	blobUploads, _ := is.ListBlobUploads(repo)

 	// if removeRepo flag is true and we cleanup all blobs and there are no blobs currently being uploaded.
 	if removeRepo && count == len(blobs) && count > 0 && len(blobUploads) == 0 {
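
Reviewer note: a hedged usage sketch of the two storage methods added above. The blobUploadStore interface and printBlobUploads helper below are local to this example and only mirror the new signatures; they are not types or functions in the zot codebase.

package example

import (
	"fmt"
	"time"
)

// blobUploadStore captures just the two methods the sketch needs; any ImageStore
// implementation carrying the signatures added in this commit satisfies it.
type blobUploadStore interface {
	ListBlobUploads(repo string) ([]string, error)
	StatBlobUpload(repo, uuid string) (bool, int64, time.Time, error)
}

// printBlobUploads lists a repository's in-progress uploads and prints the size
// and last-modified time of each one (illustrative helper only).
func printBlobUploads(store blobUploadStore, repo string) error {
	uploads, err := store.ListBlobUploads(repo)
	if err != nil {
		return err
	}

	for _, uuid := range uploads {
		ok, size, modtime, err := store.StatBlobUpload(repo, uuid)
		if err != nil || !ok {
			continue // the upload may have been completed or deleted since List
		}

		fmt.Printf("upload %s: %d bytes, last modified %s\n", uuid, size, modtime)
	}

	return nil
}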

Changed file 4 of 6: image store garbage collection tests (TestGarbageCollectForImageStore)

@@ -1970,7 +1970,7 @@ func TestGarbageCollectForImageStore(t *testing.T) {
 		}, log)
 		imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
-		repoName := "gc-all-repos-short"
+		repoName := "gc-all-repos-short" //nolint:goconst // test data

 		gc := gc.NewGarbageCollect(imgStore, mocks.MetaDBMock{}, gc.Options{
 			Delay: 1 * time.Second,
@@ -2046,11 +2046,7 @@ func TestGarbageCollectForImageStore(t *testing.T) {
 	})

 	Convey("Garbage collect - the manifest which the reference points to can be found", func() {
-		logFile, _ := os.CreateTemp("", "zot-log*.txt")
-		defer os.Remove(logFile.Name()) // clean up
-
-		log := zlog.NewLogger("debug", logFile.Name())
+		log := zlog.NewLogger("debug", "")
 		audit := zlog.NewAuditLogger("debug", "")
 		metrics := monitoring.NewMetricsServer(false, log)
@@ -2121,6 +2117,78 @@ func TestGarbageCollectForImageStore(t *testing.T) {
 		err = gc.CleanRepo(ctx, repoName)
 		So(err, ShouldBeNil)
 	})
+
+	Convey("Garbage collect error - not enough permissions to access blob upload", func() {
+		logFile, _ := os.CreateTemp("", "zot-log*.txt")
+		defer os.Remove(logFile.Name()) // clean up
+
+		log := zlog.NewLogger("debug", logFile.Name())
+		audit := zlog.NewAuditLogger("debug", "")
+		metrics := monitoring.NewMetricsServer(false, log)
+
+		cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
+			RootDir:     dir,
+			Name:        "cache",
+			UseRelPaths: true,
+		}, log)
+		imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
+
+		repoName := "gc-all-repos-short"
+
+		gc := gc.NewGarbageCollect(imgStore, mocks.MetaDBMock{}, gc.Options{
+			Delay:          1 * time.Second,
+			ImageRetention: DeleteReferrers,
+		}, audit, log)
+
+		blobUploadID, err := imgStore.NewBlobUpload(repoName)
+		So(err, ShouldBeNil)
+
+		err = gc.CleanRepo(ctx, repoName)
+		So(err, ShouldBeNil)
+
+		// Blob upload is recent, so it should still be there
+		isPresent, _, _, err := imgStore.StatBlobUpload(repoName, blobUploadID)
+		So(err, ShouldBeNil)
+		So(isPresent, ShouldBeTrue)
+
+		So(os.Chmod(imgStore.BlobUploadPath(repoName, blobUploadID), 0o000), ShouldBeNil)
+
+		err = gc.CleanRepo(ctx, repoName)
+		So(err, ShouldBeNil)
+
+		// Blob upload is recent, so it should still be there
+		isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID)
+		So(err, ShouldBeNil)
+		So(isPresent, ShouldBeTrue)
+
+		time.Sleep(1002 * time.Millisecond)
+
+		// GC should fail because of bad permissions
+		err = gc.CleanRepo(ctx, repoName)
+		So(err, ShouldNotBeNil)
+
+		// Blob uploads should not be GCed as there was an error
+		isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID)
+		So(err, ShouldBeNil)
+		So(isPresent, ShouldBeTrue)
+
+		So(os.Chmod(imgStore.BlobUploadPath(repoName, blobUploadID), 0o777), ShouldBeNil)
+
+		// GC should no longer error
+		err = gc.CleanRepo(ctx, repoName)
+		So(err, ShouldBeNil)
+
+		// Blob uploads should have correctly been GCed
+		isPresent, _, _, err = imgStore.StatBlobUpload(repoName, blobUploadID)
+		So(err, ShouldNotBeNil)
+		So(isPresent, ShouldBeFalse)
+
+		data, err := os.ReadFile(logFile.Name())
+		So(err, ShouldBeNil)
+		So(string(data), ShouldContainSubstring,
+			"failed to run GC for "+path.Join(imgStore.RootDir(), repoName))
+	})
 })
}

Changed file 5 of 6: ImageStore interface

@@ -35,6 +35,8 @@ type ImageStore interface { //nolint:interfacebloat
 	PutImageManifest(repo, reference, mediaType string, body []byte) (godigest.Digest, godigest.Digest, error)
 	DeleteImageManifest(repo, reference string, detectCollision bool) error
 	BlobUploadPath(repo, uuid string) string
+	StatBlobUpload(repo, uuid string) (bool, int64, time.Time, error)
+	ListBlobUploads(repo string) ([]string, error)
 	NewBlobUpload(repo string) (string, error)
 	GetBlobUpload(repo, uuid string) (int64, error)
 	PutBlobChunkStreamed(repo, uuid string, body io.Reader) (int64, error)

Changed file 6 of 6: MockedImageStore test mock

@@ -25,6 +25,8 @@ type MockedImageStore struct {
 		godigest.Digest, error)
 	DeleteImageManifestFn func(repo string, reference string, detectCollision bool) error
 	BlobUploadPathFn func(repo string, uuid string) string
+	StatBlobUploadFn func(repo string, uuid string) (bool, int64, time.Time, error)
+	ListBlobUploadsFn func(repo string) ([]string, error)
 	NewBlobUploadFn func(repo string) (string, error)
 	GetBlobUploadFn func(repo string, uuid string) (int64, error)
 	BlobUploadInfoFn func(repo string, uuid string) (int64, error)
@@ -181,6 +183,22 @@ func (is MockedImageStore) DeleteImageManifest(name string, reference string, de
 	return nil
 }

+func (is MockedImageStore) ListBlobUploads(repo string) ([]string, error) {
+	if is.ListBlobUploadsFn != nil {
+		return is.ListBlobUploadsFn(repo)
+	}
+
+	return []string{}, nil
+}
+
+func (is MockedImageStore) StatBlobUpload(repo string, uuid string) (bool, int64, time.Time, error) {
+	if is.StatBlobUploadFn != nil {
+		return is.StatBlobUploadFn(repo, uuid)
+	}
+
+	return true, 0, time.Time{}, nil
+}
+
 func (is MockedImageStore) NewBlobUpload(repo string) (string, error) {
 	if is.NewBlobUploadFn != nil {
 		return is.NewBlobUploadFn(repo)
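
Reviewer note: a hypothetical test fragment showing how the two new mock hooks can be wired; the test name, values, and assertions are illustrative, not taken from the commit, and the usual imports (testing, time, and the mocks package) are assumed.

func TestBlobUploadMockHooks(t *testing.T) {
	imgStore := mocks.MockedImageStore{
		ListBlobUploadsFn: func(repo string) ([]string, error) {
			return []string{"upload-uuid-1"}, nil // pretend one upload is in progress
		},
		StatBlobUploadFn: func(repo string, uuid string) (bool, int64, time.Time, error) {
			// pretend the upload is old enough to be garbage collected
			return true, 10, time.Now().Add(-1 * time.Hour), nil
		},
	}

	uploads, err := imgStore.ListBlobUploads("some-repo")
	if err != nil || len(uploads) != 1 {
		t.Fatalf("unexpected list result: %v, %v", uploads, err)
	}

	_, _, modtime, err := imgStore.StatBlobUpload("some-repo", uploads[0])
	if err != nil || !modtime.Before(time.Now()) {
		t.Fatalf("unexpected stat result: %v, %v", modtime, err)
	}
}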