mirror of
https://github.com/project-zot/zot.git
synced 2025-03-25 02:32:57 -05:00
s3: fix dedupe failing to manage blobs correctly (#772)
In order to know which blob is 'real' (has content) we need to know which blob was the first one inserted in the cache, because that is always the real one. Because we cannot modify the key order in boltdb, we do this by marking the first blob inserted with a value: GetBlob() returns the blob which is marked; PutBlob() marks the blob if it is the first one; on DeleteBlob(), if the deleted blob was marked, the next blob is marked instead. Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
parent
6471add89d
commit
5479e2c785
3 changed files with 106 additions and 40 deletions
|
@ -15,6 +15,8 @@ const (
|
|||
BlobsCache = "blobs"
|
||||
DBExtensionName = ".db"
|
||||
dbCacheLockCheckTimeout = 10 * time.Second
|
||||
// always mark the first key inserted in the BlobsCache with a value.
|
||||
firstKeyValue = "first"
|
||||
)
|
||||
|
||||
type Cache struct {
|
||||
|
@ -24,11 +26,6 @@ type Cache struct {
|
|||
useRelPaths bool // weather or not to use relative paths, should be true for filesystem and false for s3
|
||||
}
|
||||
|
||||
// Blob is a blob record.
|
||||
type Blob struct {
|
||||
Path string
|
||||
}
|
||||
|
||||
func NewCache(rootDir string, name string, useRelPaths bool, log zlog.Logger) *Cache {
|
||||
dbPath := path.Join(rootDir, name+DBExtensionName)
|
||||
dbOpts := &bbolt.Options{
|
||||
|
@ -88,15 +85,25 @@ func (c *Cache) PutBlob(digest, path string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
bucket, err := root.CreateBucketIfNotExists([]byte(digest))
|
||||
if err != nil {
|
||||
// this is a serious failure
|
||||
c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket")
|
||||
var value string
|
||||
|
||||
return err
|
||||
bucket := root.Bucket([]byte(digest))
|
||||
if bucket == nil {
|
||||
/* mark first key in bucket
|
||||
in the context of s3 we need to know which blob is real
|
||||
and we know that the first one is always the real, so mark them.
|
||||
*/
|
||||
value = firstKeyValue
|
||||
bucket, err = root.CreateBucket([]byte(digest))
|
||||
if err != nil {
|
||||
// this is a serious failure
|
||||
c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket")
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := bucket.Put([]byte(path), nil); err != nil {
|
||||
if err := bucket.Put([]byte(path), []byte(value)); err != nil {
|
||||
c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record")
|
||||
|
||||
return err
|
||||
|
@ -125,10 +132,20 @@ func (c *Cache) GetBlob(digest string) (string, error) {
|
|||
|
||||
b := root.Bucket([]byte(digest))
|
||||
if b != nil {
|
||||
// get first key
|
||||
c := b.Cursor()
|
||||
k, _ := c.First()
|
||||
blobPath.WriteString(string(k))
|
||||
if err := b.ForEach(func(k, v []byte) error {
|
||||
// always return the key with 'first' value
|
||||
if string(v) == firstKeyValue {
|
||||
blobPath.WriteString(string(k))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
c.log.Error().Err(err).Msg("unable to access digest bucket")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -194,6 +211,8 @@ func (c *Cache) DeleteBlob(digest, path string) error {
|
|||
return errors.ErrCacheMiss
|
||||
}
|
||||
|
||||
value := bucket.Get([]byte(path))
|
||||
|
||||
if err := bucket.Delete([]byte(path)); err != nil {
|
||||
c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete")
|
||||
|
||||
|
@ -202,12 +221,19 @@ func (c *Cache) DeleteBlob(digest, path string) error {
|
|||
|
||||
cur := bucket.Cursor()
|
||||
|
||||
k, _ := cur.First()
|
||||
if k == nil {
|
||||
key, _ := cur.First()
|
||||
if key == nil {
|
||||
c.log.Debug().Str("digest", digest).Str("path", path).Msg("deleting empty bucket")
|
||||
if err := root.DeleteBucket([]byte(digest)); err != nil {
|
||||
c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete")
|
||||
|
||||
return err
|
||||
}
|
||||
// if deleted key has value 'first' then move this value to the next key
|
||||
} else if string(value) == firstKeyValue {
|
||||
if err := bucket.Put(key, value); err != nil {
|
||||
c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record")
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1273,10 +1273,6 @@ retry:
|
|||
return err
|
||||
}
|
||||
|
||||
if dstRecord == dst {
|
||||
is.log.Warn().Msg("FOUND equal dsts")
|
||||
}
|
||||
|
||||
// prevent overwrite original blob
|
||||
if fileInfo == nil && dstRecord != dst {
|
||||
// put empty file so that we are compliant with oci layout, this will act as a deduped blob
|
||||
|
@ -1286,8 +1282,12 @@ retry:
|
|||
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
is.log.Warn().Msg("prevent overwrite")
|
||||
|
||||
if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record")
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// remove temp blobupload
|
||||
|
@ -1445,33 +1445,37 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.ReadCloser,
|
|||
}
|
||||
|
||||
// is a 'deduped' blob
|
||||
if binfo.Size() == 0 && is.cache != nil {
|
||||
if binfo.Size() == 0 {
|
||||
// Check blobs in cache
|
||||
dstRecord, err := is.checkCacheBlob(digest)
|
||||
if err == nil {
|
||||
binfo, err := is.store.Stat(context.Background(), dstRecord)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob")
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found")
|
||||
|
||||
// the actual blob on disk may have been removed by GC, so sync the cache
|
||||
if err := is.cache.DeleteBlob(digest, dstRecord); err != nil {
|
||||
is.log.Error().Err(err).Str("dstDigest", digest).Str("dst", dstRecord).Msg("dedupe: unable to delete blob record")
|
||||
return nil, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
return nil, -1, err
|
||||
}
|
||||
binfo, err := is.store.Stat(context.Background(), dstRecord)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob")
|
||||
|
||||
return nil, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
blobReadCloser, err := is.store.Reader(context.Background(), dstRecord, 0)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob")
|
||||
// the actual blob on disk may have been removed by GC, so sync the cache
|
||||
if err := is.cache.DeleteBlob(digest, dstRecord); err != nil {
|
||||
is.log.Error().Err(err).Str("dstDigest", digest).Str("dst", dstRecord).Msg("dedupe: unable to delete blob record")
|
||||
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
return blobReadCloser, binfo.Size(), nil
|
||||
return nil, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
blobReadCloser, err := is.store.Reader(context.Background(), dstRecord, 0)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob")
|
||||
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
return blobReadCloser, binfo.Size(), nil
|
||||
}
|
||||
|
||||
// The caller function is responsible for calling Close()
|
||||
|
|
|
@ -1649,6 +1649,22 @@ func TestS3DedupeErr(t *testing.T) {
|
|||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test DedupeBlob - error on cache.PutBlob()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return nil, nil
|
||||
},
|
||||
})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
|
||||
err := imgStore.DedupeBlob("", digest, "dst")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DedupeBlob("", digest, "")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test DedupeBlob - error on store.Delete()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
|
@ -1812,6 +1828,26 @@ func TestS3DedupeErr(t *testing.T) {
|
|||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test GetBlob() - error on checkCacheBlob()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
|
||||
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return &FileInfoMock{
|
||||
SizeFn: func() int64 {
|
||||
return 0
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
|
||||
_, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test DeleteBlob() - error on store.Move()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
hash := "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc"
|
||||
|
|
Loading…
Add table
Reference in a new issue