Mirror of https://github.com/project-zot/zot.git
Refactor s3 dedupe
Under digest bucket create 2 buckets, one for storing origin blob
and one for storing deduped blobs.

PutBlob()
- puts an origin blob in both buckets
- puts a deduped blob in deduped bucket

GetBlob()
- returns blobs only from origin bucket

DeleteBlob()
- deletes an origin blob from both buckets and moves one deduped blob into origin bucket
- deletes a deduped blob from deduped bucket

[storage] When deleting an origin blob, next time we GetBlob() we get a deduped
blob with no content and we will move the content from the deleted origin blob
to it (inside s3.go).

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
parent f3faae0e09 · commit 7912b6a3fb
2 changed files with 126 additions and 50 deletions
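To make the new layout concrete before the diff: under the global blobs bucket each digest gets its own bucket, which in turn holds nested origin and deduped buckets. A minimal sketch of walking that layout with bbolt (dumpDigest is a hypothetical helper, not part of the commit; it assumes the digest bucket already exists, and error handling is elided):

	package main

	import (
		"fmt"

		"go.etcd.io/bbolt"
	)

	// dumpDigest walks the nested layout this commit introduces:
	//
	//	"blobs" (BlobsCache, global bucket)
	//	└── <digest>
	//	    ├── "origin"  : exactly one key, the blob that actually holds content
	//	    └── "deduped" : every blob path with this digest, origin included
	func dumpDigest(db *bbolt.DB, digest string) error {
		return db.View(func(tx *bbolt.Tx) error {
			bucket := tx.Bucket([]byte("blobs")).Bucket([]byte(digest))

			// GetBlob() reads only from "origin"; "deduped" is bookkeeping for DeleteBlob().
			originPath, _ := bucket.Bucket([]byte("origin")).Cursor().First()
			fmt.Println("blob with content:", string(originPath))

			return bucket.Bucket([]byte("deduped")).ForEach(func(k, _ []byte) error {
				fmt.Println("deduped entry:", string(k))

				return nil
			})
		})
	}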
@@ -12,11 +12,15 @@ import (
 )
 
 const (
-	BlobsCache              = "blobs"
+	// global bucket.
+	BlobsCache = "blobs"
+	// bucket where we store all blobs from storage(deduped blobs + original blob).
+	DedupedBucket = "deduped"
+	/* bucket where we store only the original/source blob (used by s3 to know which is the blob with content)
+	it should contain only one blob, this is the only place from which we'll get blobs. */
+	OriginBucket = "origin"
 	DBExtensionName         = ".db"
 	dbCacheLockCheckTimeout = 10 * time.Second
-	// always mark the first key inserted in the BlobsCache with a value.
-	firstKeyValue = "first"
 )
 
 type Cache struct {
@@ -26,6 +30,11 @@ type Cache struct {
 	useRelPaths bool // whether or not to use relative paths, should be true for filesystem and false for s3
 }
 
+// Blob is a blob record.
+type Blob struct {
+	Path string
+}
+
 func NewCache(rootDir string, name string, useRelPaths bool, log zlog.Logger) *Cache {
 	dbPath := path.Join(rootDir, name+DBExtensionName)
 	dbOpts := &bbolt.Options{
@@ -85,28 +94,46 @@ func (c *Cache) PutBlob(digest, path string) error {
 			return err
 		}
 
-		var value string
-
-		bucket := root.Bucket([]byte(digest))
-		if bucket == nil {
-			/* mark first key in bucket
-			in the context of s3 we need to know which blob is real
-			and we know that the first one is always the real, so mark them.
-			*/
-			value = firstKeyValue
-
-			bucket, err = root.CreateBucket([]byte(digest))
-			if err != nil {
-				// this is a serious failure
-				c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket")
-
-				return err
-			}
-		}
-
-		if err := bucket.Put([]byte(path), []byte(value)); err != nil {
-			c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record")
-
-			return err
-		}
+		bucket, err := root.CreateBucketIfNotExists([]byte(digest))
+		if err != nil {
+			// this is a serious failure
+			c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket")
+
+			return err
+		}
+
+		// create nested deduped bucket where we store all the deduped blobs + original blob
+		deduped, err := bucket.CreateBucketIfNotExists([]byte(DedupedBucket))
+		if err != nil {
+			// this is a serious failure
+			c.log.Error().Err(err).Str("bucket", DedupedBucket).Msg("unable to create a bucket")
+
+			return err
+		}
+
+		if err := deduped.Put([]byte(path), nil); err != nil {
+			c.log.Error().Err(err).Str("bucket", DedupedBucket).Str("value", path).Msg("unable to put record")
+
+			return err
+		}
+
+		// create origin bucket and insert only the original blob
+		origin := bucket.Bucket([]byte(OriginBucket))
+		if origin == nil {
+			// if the bucket doesn't exist yet then 'path' is the original blob
+			origin, err := bucket.CreateBucket([]byte(OriginBucket))
+			if err != nil {
+				// this is a serious failure
+				c.log.Error().Err(err).Str("bucket", OriginBucket).Msg("unable to create a bucket")
+
+				return err
+			}
+
+			if err := origin.Put([]byte(path), nil); err != nil {
+				c.log.Error().Err(err).Str("bucket", OriginBucket).Str("value", path).Msg("unable to put record")
+
+				return err
+			}
+		}
 
 		return nil
@@ -130,22 +157,10 @@ func (c *Cache) GetBlob(digest string) (string, error) {
 			return err
 		}
 
-		b := root.Bucket([]byte(digest))
-		if b != nil {
-			if err := b.ForEach(func(k, v []byte) error {
-				// always return the key with 'first' value
-				if string(v) == firstKeyValue {
-					blobPath.WriteString(string(k))
-
-					return nil
-				}
-
-				return nil
-			}); err != nil {
-				c.log.Error().Err(err).Msg("unable to access digest bucket")
-
-				return err
-			}
-		}
+		bucket := root.Bucket([]byte(digest))
+		if bucket != nil {
+			origin := bucket.Bucket([]byte(OriginBucket))
+			blobPath.WriteString(string(c.getOne(origin)))
+
+			return nil
+		}
@@ -169,12 +184,17 @@ func (c *Cache) HasBlob(digest, blob string) bool {
 			return err
 		}
 
-		b := root.Bucket([]byte(digest))
-		if b == nil {
+		bucket := root.Bucket([]byte(digest))
+		if bucket == nil {
 			return errors.ErrCacheMiss
 		}
 
-		if b.Get([]byte(blob)) == nil {
+		origin := bucket.Bucket([]byte(OriginBucket))
+		if origin == nil {
+			return errors.ErrCacheMiss
+		}
+
+		if origin.Get([]byte(blob)) == nil {
 			return errors.ErrCacheMiss
 		}
@@ -186,6 +206,17 @@ func (c *Cache) HasBlob(digest, blob string) bool {
 	return true
 }
 
+func (c *Cache) getOne(bucket *bbolt.Bucket) []byte {
+	if bucket != nil {
+		cursor := bucket.Cursor()
+		k, _ := cursor.First()
+
+		return k
+	}
+
+	return nil
+}
+
 func (c *Cache) DeleteBlob(digest, path string) error {
 	// use only relative (to rootDir) paths on blobs
 	var err error
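A note on the getOne() helper added above: it relies on bbolt's cursor semantics, where Cursor().First() returns a nil key on a bucket with no keys, so a single call both fetches the lone origin blob and doubles as the emptiness check DeleteBlob performs below. A tiny illustration of that semantic (isEmpty is a hypothetical helper, not in the commit):

	// isEmpty mirrors how getOne doubles as an emptiness check:
	// bbolt's Cursor().First() yields a nil key on a bucket with no keys.
	func isEmpty(bucket *bbolt.Bucket) bool {
		if bucket == nil {
			return true
		}

		key, _ := bucket.Cursor().First()

		return key == nil
	}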
@@ -211,28 +242,45 @@ func (c *Cache) DeleteBlob(digest, path string) error {
 			return errors.ErrCacheMiss
 		}
 
-		value := bucket.Get([]byte(path))
-
-		if err := bucket.Delete([]byte(path)); err != nil {
-			c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete")
-
-			return err
-		}
-
-		cur := bucket.Cursor()
-
-		key, _ := cur.First()
-		if key == nil {
-			c.log.Debug().Str("digest", digest).Str("path", path).Msg("deleting empty bucket")
-			if err := root.DeleteBucket([]byte(digest)); err != nil {
-				c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete")
-
-				return err
-			}
-			// if deleted key has value 'first' then move this value to the next key
-		} else if string(value) == firstKeyValue {
-			if err := bucket.Put(key, value); err != nil {
-				c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record")
+		deduped := bucket.Bucket([]byte(DedupedBucket))
+		if deduped == nil {
+			return errors.ErrCacheMiss
+		}
+
+		if err := deduped.Delete([]byte(path)); err != nil {
+			c.log.Error().Err(err).Str("digest", digest).Str("bucket", DedupedBucket).Str("path", path).Msg("unable to delete")
+
+			return err
+		}
+
+		origin := bucket.Bucket([]byte(OriginBucket))
+		if origin != nil {
+			originBlob := c.getOne(origin)
+			if originBlob != nil {
+				if err := origin.Delete([]byte(path)); err != nil {
+					c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginBucket).Str("path", path).Msg("unable to delete")
+
+					return err
+				}
+
+				// move next candidate to origin bucket, next GetKey will return this one and storage will move the content here
+				dedupedBlob := c.getOne(deduped)
+				if dedupedBlob != nil {
+					if err := origin.Put(dedupedBlob, nil); err != nil {
+						c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginBucket).Str("path", path).Msg("unable to put")
+
+						return err
+					}
+				}
+			}
+		}
+
+		// if no key in origin bucket then digest bucket is empty, remove it
+		k := c.getOne(origin)
+		if k == nil {
+			c.log.Debug().Str("digest", digest).Str("path", path).Msg("deleting empty bucket")
+			if err := root.DeleteBucket([]byte(digest)); err != nil {
+				c.log.Error().Err(err).Str("digest", digest).Str("bucket", digest).Str("path", path).Msg("unable to delete")
 
 				return err
 			}
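Taken together, the intended lifecycle of the three cache operations reads roughly like this; a hypothetical sequence against the Cache API in this diff (the digest and paths are made up, useRelPaths is false as for s3, and log stands for an already-constructed zlog.Logger; the actual content move happens storage-side, in s3.go):

	cache := NewCache(rootDir, "cache", false, log)

	digest := "sha256:1111"                 // hypothetical digest
	_ = cache.PutBlob(digest, "/zot/blob1") // first put: recorded in origin + deduped
	_ = cache.PutBlob(digest, "/zot/blob2") // later puts: deduped only

	p, _ := cache.GetBlob(digest) // "/zot/blob1", always the origin blob

	_ = cache.DeleteBlob(digest, p) // origin deleted, "/zot/blob2" promoted to origin

	p, _ = cache.GetBlob(digest) // "/zot/blob2"; s3.go moves the blob content to it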
@@ -1056,6 +1056,34 @@ func TestDedupeLinks(t *testing.T) {
 		fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
 		So(err, ShouldBeNil)
 		So(os.SameFile(fi1, fi2), ShouldBeTrue)
+
+		Convey("storage and cache inconsistency", func() {
+			// delete blobs
+			err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
+			So(err, ShouldBeNil)
+
+			err := os.Remove(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
+			So(err, ShouldBeNil)
+
+			// now cache is inconsistent with storage (blobs present in cache but not in storage)
+			upload, err = imgStore.NewBlobUpload("dedupe3")
+			So(err, ShouldBeNil)
+			So(upload, ShouldNotBeEmpty)
+
+			content = []byte("test-data3")
+			buf = bytes.NewBuffer(content)
+			buflen = buf.Len()
+			digest = godigest.FromBytes(content)
+			blob, err = imgStore.PutBlobChunkStreamed("dedupe3", upload, buf)
+			So(err, ShouldBeNil)
+			So(blob, ShouldEqual, buflen)
+			blobDigest2 := strings.Split(digest.String(), ":")[1]
+			So(blobDigest2, ShouldNotBeEmpty)
+
+			err = imgStore.FinishBlobUpload("dedupe3", upload, buf, digest.String())
+			So(err, ShouldBeNil)
+			So(blob, ShouldEqual, buflen)
+		})
 	})
 }