mirror of
https://github.com/project-zot/zot.git
synced 2025-01-13 22:50:38 -05:00
s3: added logic for deduping blobs
Because s3 doesn't support hard links we store duplicated blobs as empty files. When the original blob is deleted its content is moved to the the next duplicated blob and so on. Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
parent
ad08c08986
commit
5e22acbbc4
11 changed files with 1027 additions and 117 deletions
|
@ -308,13 +308,15 @@ zot also supports different storage drivers for each subpath.
|
|||
|
||||
### Specifying S3 credentials
|
||||
|
||||
There are multiple ways to specify S3 credentials:
|
||||
|
||||
- Config file:
|
||||
|
||||
```
|
||||
"storage": {
|
||||
"rootDirectory": "/tmp/zot", # local path used to store dedupe cache database
|
||||
"dedupe": true,
|
||||
"storageDriver": {
|
||||
"name": "s3",
|
||||
"rootdirectory": "/zot", # this is a prefix that is applied to all S3 keys to allow you to segment data in your bucket if necessary.
|
||||
"region": "us-east-2",
|
||||
"bucket": "zot-storage",
|
||||
"secure": true,
|
||||
|
@ -324,6 +326,8 @@ There are multiple ways to specify S3 credentials:
|
|||
}
|
||||
```
|
||||
|
||||
There are multiple ways to specify S3 credentials besides config file:
|
||||
|
||||
- Environment variables:
|
||||
|
||||
SDK looks for credentials in the following environment variables:
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
{
|
||||
"distSpecVersion": "1.0.1-dev",
|
||||
"storage": {
|
||||
"rootDirectory": "/zot",
|
||||
"rootDirectory": "/tmp/zot",
|
||||
"dedupe": true,
|
||||
"storageDriver": {
|
||||
"name": "s3",
|
||||
"rootdirectory": "/zot",
|
||||
"region": "us-east-2",
|
||||
"bucket": "zot-storage",
|
||||
"secure": true,
|
||||
|
@ -11,9 +13,11 @@
|
|||
},
|
||||
"subPaths": {
|
||||
"/a": {
|
||||
"rootDirectory": "/zot-a",
|
||||
"rootDirectory": "/tmp/zot1",
|
||||
"dedupe": false,
|
||||
"storageDriver": {
|
||||
"name": "s3",
|
||||
"rootdirectory": "/zot-a",
|
||||
"region": "us-east-2",
|
||||
"bucket": "zot-storage",
|
||||
"secure": true,
|
||||
|
@ -21,9 +25,11 @@
|
|||
}
|
||||
},
|
||||
"/b": {
|
||||
"rootDirectory": "/zot-b",
|
||||
"rootDirectory": "/tmp/zot2",
|
||||
"dedupe": true,
|
||||
"storageDriver": {
|
||||
"name": "s3",
|
||||
"rootdirectory": "/zot-b",
|
||||
"region": "us-east-2",
|
||||
"bucket": "zot-storage",
|
||||
"secure": true,
|
||||
|
@ -31,9 +37,11 @@
|
|||
}
|
||||
},
|
||||
"/c": {
|
||||
"rootDirectory": "/zot-c",
|
||||
"rootDirectory": "/tmp/zot3",
|
||||
"dedupe": true,
|
||||
"storageDriver": {
|
||||
"name": "s3",
|
||||
"rootdirectory": "/zot-c",
|
||||
"region": "us-east-2",
|
||||
"bucket": "zot-storage",
|
||||
"secure": false,
|
||||
|
|
|
@ -218,7 +218,8 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error {
|
|||
c.StoreController = storage.StoreController{}
|
||||
|
||||
if c.Config.Storage.RootDirectory != "" {
|
||||
if c.Config.Storage.Dedupe {
|
||||
// no need to validate hard links work on s3
|
||||
if c.Config.Storage.Dedupe && c.Config.Storage.StorageDriver == nil {
|
||||
err := storage.ValidateHardLink(c.Config.Storage.RootDirectory)
|
||||
if err != nil {
|
||||
c.Log.Warn().Msg("input storage root directory filesystem does not supports hardlinking," +
|
||||
|
@ -229,7 +230,7 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error {
|
|||
}
|
||||
|
||||
var defaultStore storage.ImageStore
|
||||
if len(c.Config.Storage.StorageDriver) == 0 {
|
||||
if c.Config.Storage.StorageDriver == nil {
|
||||
defaultStore = storage.NewImageStore(c.Config.Storage.RootDirectory,
|
||||
c.Config.Storage.GC, c.Config.Storage.GCDelay, c.Config.Storage.Dedupe, c.Config.Storage.Commit, c.Log, c.Metrics)
|
||||
} else {
|
||||
|
@ -246,7 +247,14 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
defaultStore = s3.NewImageStore(c.Config.Storage.RootDirectory,
|
||||
/* in the case of s3 c.Config.Storage.RootDirectory is used for caching blobs locally and
|
||||
c.Config.Storage.StorageDriver["rootdirectory"] is the actual rootDir in s3 */
|
||||
rootDir := "/"
|
||||
if c.Config.Storage.StorageDriver["rootdirectory"] != nil {
|
||||
rootDir = fmt.Sprintf("%v", c.Config.Storage.StorageDriver["rootdirectory"])
|
||||
}
|
||||
|
||||
defaultStore = s3.NewImageStore(rootDir, c.Config.Storage.RootDirectory,
|
||||
c.Config.Storage.GC, c.Config.Storage.GCDelay, c.Config.Storage.Dedupe,
|
||||
c.Config.Storage.Commit, c.Log, c.Metrics, store)
|
||||
}
|
||||
|
@ -267,7 +275,8 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error {
|
|||
|
||||
// creating image store per subpaths
|
||||
for route, storageConfig := range subPaths {
|
||||
if storageConfig.Dedupe {
|
||||
// no need to validate hard links work on s3
|
||||
if storageConfig.Dedupe && storageConfig.StorageDriver == nil {
|
||||
err := storage.ValidateHardLink(storageConfig.RootDirectory)
|
||||
if err != nil {
|
||||
c.Log.Warn().Msg("input storage root directory filesystem does not supports hardlinking, " +
|
||||
|
@ -277,7 +286,7 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
if len(storageConfig.StorageDriver) == 0 {
|
||||
if storageConfig.StorageDriver == nil {
|
||||
subImageStore[route] = storage.NewImageStore(storageConfig.RootDirectory,
|
||||
storageConfig.GC, storageConfig.GCDelay, storageConfig.Dedupe, storageConfig.Commit, c.Log, c.Metrics)
|
||||
} else {
|
||||
|
@ -294,7 +303,14 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
subImageStore[route] = s3.NewImageStore(storageConfig.RootDirectory,
|
||||
/* in the case of s3 c.Config.Storage.RootDirectory is used for caching blobs locally and
|
||||
c.Config.Storage.StorageDriver["rootdirectory"] is the actual rootDir in s3 */
|
||||
rootDir := "/"
|
||||
if c.Config.Storage.StorageDriver["rootdirectory"] != nil {
|
||||
rootDir = fmt.Sprintf("%v", c.Config.Storage.StorageDriver["rootdirectory"])
|
||||
}
|
||||
|
||||
subImageStore[route] = s3.NewImageStore(rootDir, storageConfig.RootDirectory,
|
||||
storageConfig.GC, storageConfig.GCDelay, storageConfig.Dedupe, storageConfig.Commit, c.Log, c.Metrics, store)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -151,7 +151,7 @@ func TestObjectStorageController(t *testing.T) {
|
|||
conf := config.New()
|
||||
conf.HTTP.Port = port
|
||||
storageDriverParams := map[string]interface{}{
|
||||
"rootDir": "zot",
|
||||
"rootdirectory": "zot",
|
||||
"name": storage.S3StorageDriverName,
|
||||
}
|
||||
conf.Storage.StorageDriver = storageDriverParams
|
||||
|
@ -174,7 +174,7 @@ func TestObjectStorageController(t *testing.T) {
|
|||
endpoint := os.Getenv("S3MOCK_ENDPOINT")
|
||||
|
||||
storageDriverParams := map[string]interface{}{
|
||||
"rootDir": "zot",
|
||||
"rootdirectory": "zot",
|
||||
"name": storage.S3StorageDriverName,
|
||||
"region": "us-east-2",
|
||||
"bucket": bucket,
|
||||
|
@ -206,7 +206,7 @@ func TestObjectStorageControllerSubPaths(t *testing.T) {
|
|||
endpoint := os.Getenv("S3MOCK_ENDPOINT")
|
||||
|
||||
storageDriverParams := map[string]interface{}{
|
||||
"rootDir": "zot",
|
||||
"rootdirectory": "zot",
|
||||
"name": storage.S3StorageDriverName,
|
||||
"region": "us-east-2",
|
||||
"bucket": bucket,
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
|
||||
const (
|
||||
BlobsCache = "blobs"
|
||||
DBExtensionName = ".db"
|
||||
dbCacheLockCheckTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
|
@ -20,6 +21,7 @@ type Cache struct {
|
|||
rootDir string
|
||||
db *bbolt.DB
|
||||
log zlog.Logger
|
||||
useRelPaths bool // weather or not to use relative paths, should be true for filesystem and false for s3
|
||||
}
|
||||
|
||||
// Blob is a blob record.
|
||||
|
@ -27,8 +29,8 @@ type Blob struct {
|
|||
Path string
|
||||
}
|
||||
|
||||
func NewCache(rootDir, name string, log zlog.Logger) *Cache {
|
||||
dbPath := path.Join(rootDir, name+".db")
|
||||
func NewCache(rootDir string, name string, useRelPaths bool, log zlog.Logger) *Cache {
|
||||
dbPath := path.Join(rootDir, name+DBExtensionName)
|
||||
dbOpts := &bbolt.Options{
|
||||
Timeout: dbCacheLockCheckTimeout,
|
||||
FreelistType: bbolt.FreelistArrayType,
|
||||
|
@ -57,7 +59,7 @@ func NewCache(rootDir, name string, log zlog.Logger) *Cache {
|
|||
return nil
|
||||
}
|
||||
|
||||
return &Cache{rootDir: rootDir, db: cacheDB, log: log}
|
||||
return &Cache{rootDir: rootDir, db: cacheDB, useRelPaths: useRelPaths, log: log}
|
||||
}
|
||||
|
||||
func (c *Cache) PutBlob(digest, path string) error {
|
||||
|
@ -68,10 +70,13 @@ func (c *Cache) PutBlob(digest, path string) error {
|
|||
}
|
||||
|
||||
// use only relative (to rootDir) paths on blobs
|
||||
relp, err := filepath.Rel(c.rootDir, path)
|
||||
var err error
|
||||
if c.useRelPaths {
|
||||
path, err = filepath.Rel(c.rootDir, path)
|
||||
if err != nil {
|
||||
c.log.Error().Err(err).Str("path", path).Msg("unable to get relative path")
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||
root := tx.Bucket([]byte(BlobsCache))
|
||||
|
@ -91,8 +96,8 @@ func (c *Cache) PutBlob(digest, path string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := bucket.Put([]byte(relp), nil); err != nil {
|
||||
c.log.Error().Err(err).Str("bucket", digest).Str("value", relp).Msg("unable to put record")
|
||||
if err := bucket.Put([]byte(path), nil); err != nil {
|
||||
c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record")
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -166,10 +171,13 @@ func (c *Cache) HasBlob(digest, blob string) bool {
|
|||
|
||||
func (c *Cache) DeleteBlob(digest, path string) error {
|
||||
// use only relative (to rootDir) paths on blobs
|
||||
relp, err := filepath.Rel(c.rootDir, path)
|
||||
var err error
|
||||
if c.useRelPaths {
|
||||
path, err = filepath.Rel(c.rootDir, path)
|
||||
if err != nil {
|
||||
c.log.Error().Err(err).Str("path", path).Msg("unable to get relative path")
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||
root := tx.Bucket([]byte(BlobsCache))
|
||||
|
@ -186,8 +194,8 @@ func (c *Cache) DeleteBlob(digest, path string) error {
|
|||
return errors.ErrCacheMiss
|
||||
}
|
||||
|
||||
if err := bucket.Delete([]byte(relp)); err != nil {
|
||||
c.log.Error().Err(err).Str("digest", digest).Str("path", relp).Msg("unable to delete")
|
||||
if err := bucket.Delete([]byte(path)); err != nil {
|
||||
c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete")
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -196,9 +204,9 @@ func (c *Cache) DeleteBlob(digest, path string) error {
|
|||
|
||||
k, _ := cur.First()
|
||||
if k == nil {
|
||||
c.log.Debug().Str("digest", digest).Str("path", relp).Msg("deleting empty bucket")
|
||||
c.log.Debug().Str("digest", digest).Str("path", path).Msg("deleting empty bucket")
|
||||
if err := root.DeleteBucket([]byte(digest)); err != nil {
|
||||
c.log.Error().Err(err).Str("digest", digest).Str("path", relp).Msg("unable to delete")
|
||||
c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete")
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -17,9 +17,9 @@ func TestCache(t *testing.T) {
|
|||
log := log.NewLogger("debug", "")
|
||||
So(log, ShouldNotBeNil)
|
||||
|
||||
So(storage.NewCache("/deadBEEF", "cache_test", log), ShouldBeNil)
|
||||
So(storage.NewCache("/deadBEEF", "cache_test", true, log), ShouldBeNil)
|
||||
|
||||
cache := storage.NewCache(dir, "cache_test", log)
|
||||
cache := storage.NewCache(dir, "cache_test", true, log)
|
||||
So(cache, ShouldNotBeNil)
|
||||
|
||||
val, err := cache.GetBlob("key")
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
_ "crypto/sha256"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
_ "github.com/docker/distribution/registry/storage/driver/s3-aws"
|
||||
guuid "github.com/gofrs/uuid"
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
ispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/rs/zerolog"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"gopkg.in/resty.v1"
|
||||
|
@ -27,6 +29,7 @@ import (
|
|||
"zotregistry.io/zot/pkg/log"
|
||||
"zotregistry.io/zot/pkg/storage"
|
||||
"zotregistry.io/zot/pkg/storage/s3"
|
||||
"zotregistry.io/zot/pkg/test"
|
||||
)
|
||||
|
||||
// nolint: gochecknoglobals
|
||||
|
@ -50,15 +53,19 @@ func skipIt(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func createMockStorage(rootDir string, store driver.StorageDriver) storage.ImageStore {
|
||||
func createMockStorage(rootDir string, cacheDir string, dedupe bool, store driver.StorageDriver) storage.ImageStore {
|
||||
log := log.Logger{Logger: zerolog.New(os.Stdout)}
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
il := s3.NewImageStore(rootDir, false, storage.DefaultGCDelay, false, false, log, metrics, store)
|
||||
il := s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay, dedupe, false, log, metrics, store)
|
||||
|
||||
return il
|
||||
}
|
||||
|
||||
func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStore, error) {
|
||||
func createObjectsStore(rootDir string, cacheDir string, dedupe bool) (
|
||||
driver.StorageDriver,
|
||||
storage.ImageStore,
|
||||
error,
|
||||
) {
|
||||
bucket := "zot-storage-test"
|
||||
endpoint := os.Getenv("S3MOCK_ENDPOINT")
|
||||
storageDriverParams := map[string]interface{}{
|
||||
|
@ -67,6 +74,8 @@ func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStor
|
|||
"region": "us-east-2",
|
||||
"bucket": bucket,
|
||||
"regionendpoint": endpoint,
|
||||
"accesskey": "minioadmin",
|
||||
"secretkey": "minioadmin",
|
||||
"secure": false,
|
||||
"skipverify": false,
|
||||
}
|
||||
|
@ -86,13 +95,14 @@ func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStor
|
|||
|
||||
log := log.Logger{Logger: zerolog.New(os.Stdout)}
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
il := s3.NewImageStore(rootDir, false, storage.DefaultGCDelay, false, false, log, metrics, store)
|
||||
il := s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay, dedupe, false, log, metrics, store)
|
||||
|
||||
return store, il, err
|
||||
}
|
||||
|
||||
type FileInfoMock struct {
|
||||
isDirFn func() bool
|
||||
sizeFn func() int64
|
||||
}
|
||||
|
||||
func (f *FileInfoMock) Path() string {
|
||||
|
@ -100,6 +110,10 @@ func (f *FileInfoMock) Path() string {
|
|||
}
|
||||
|
||||
func (f *FileInfoMock) Size() int64 {
|
||||
if f != nil && f.sizeFn != nil {
|
||||
return f.sizeFn()
|
||||
}
|
||||
|
||||
return int64(fileInfoSize)
|
||||
}
|
||||
|
||||
|
@ -265,7 +279,7 @@ func TestStorageDriverStatFunction(t *testing.T) {
|
|||
|
||||
testDir := path.Join("/oci-repo-test", uuid.String())
|
||||
|
||||
storeDriver, imgStore, _ := createObjectsStore(testDir)
|
||||
storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
|
||||
/* There is an issue with storageDriver.Stat() that returns a storageDriver.FileInfo()
|
||||
|
@ -345,11 +359,12 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
|
||||
testDir := path.Join("/oci-repo-test", uuid.String())
|
||||
|
||||
storeDriver, imgStore, _ := createObjectsStore(testDir)
|
||||
tdir := t.TempDir()
|
||||
|
||||
storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
|
||||
Convey("Invalid validate repo", t, func(c C) {
|
||||
So(imgStore, ShouldNotBeNil)
|
||||
So(imgStore.InitRepo(testImage), ShouldBeNil)
|
||||
objects, err := storeDriver.List(context.Background(), path.Join(imgStore.RootDir(), testImage))
|
||||
So(err, ShouldBeNil)
|
||||
|
@ -365,9 +380,6 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Invalid get image tags", t, func(c C) {
|
||||
storeDriver, imgStore, err := createObjectsStore(testDir)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
So(err, ShouldBeNil)
|
||||
So(imgStore.InitRepo(testImage), ShouldBeNil)
|
||||
|
||||
So(storeDriver.Move(context.Background(), path.Join(testDir, testImage, "index.json"),
|
||||
|
@ -386,10 +398,6 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Invalid get image manifest", t, func(c C) {
|
||||
storeDriver, imgStore, err := createObjectsStore(testDir)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
So(err, ShouldBeNil)
|
||||
So(imgStore, ShouldNotBeNil)
|
||||
So(imgStore.InitRepo(testImage), ShouldBeNil)
|
||||
So(storeDriver.Delete(context.Background(), path.Join(testDir, testImage, "index.json")), ShouldBeNil)
|
||||
_, _, _, err = imgStore.GetImageManifest(testImage, "")
|
||||
|
@ -402,9 +410,6 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Invalid validate repo", t, func(c C) {
|
||||
storeDriver, imgStore, err := createObjectsStore(testDir)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
So(err, ShouldBeNil)
|
||||
So(imgStore, ShouldNotBeNil)
|
||||
|
||||
So(imgStore.InitRepo(testImage), ShouldBeNil)
|
||||
|
@ -421,9 +426,6 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Invalid finish blob upload", t, func(c C) {
|
||||
storeDriver, imgStore, err := createObjectsStore(testDir)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
So(err, ShouldBeNil)
|
||||
So(imgStore, ShouldNotBeNil)
|
||||
|
||||
So(imgStore.InitRepo(testImage), ShouldBeNil)
|
||||
|
@ -455,7 +457,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test storage driver errors", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
listFn: func(ctx context.Context, path string) ([]string, error) {
|
||||
return []string{testImage}, errS3
|
||||
},
|
||||
|
@ -527,17 +529,32 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test ValidateRepo", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
tdir := t.TempDir()
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
listFn: func(ctx context.Context, path string) ([]string, error) {
|
||||
return []string{testImage, testImage}, errS3
|
||||
},
|
||||
})
|
||||
|
||||
_, err := imgStore.ValidateRepo(testImage)
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
listFn: func(ctx context.Context, path string) ([]string, error) {
|
||||
return []string{testImage, testImage}, nil
|
||||
},
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return nil, errS3
|
||||
},
|
||||
})
|
||||
|
||||
_, err = imgStore.ValidateRepo(testImage)
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test ValidateRepo2", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
listFn: func(ctx context.Context, path string) ([]string, error) {
|
||||
return []string{"test/test/oci-layout", "test/test/index.json"}, nil
|
||||
},
|
||||
|
@ -550,7 +567,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test ValidateRepo3", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
listFn: func(ctx context.Context, path string) ([]string, error) {
|
||||
return []string{"test/test/oci-layout", "test/test/index.json"}, nil
|
||||
},
|
||||
|
@ -567,7 +584,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
|
||||
Convey("Test ValidateRepo4", t, func(c C) {
|
||||
ociLayout := []byte(`{"imageLayoutVersion": "9.9.9"}`)
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
listFn: func(ctx context.Context, path string) ([]string, error) {
|
||||
return []string{"test/test/oci-layout", "test/test/index.json"}, nil
|
||||
},
|
||||
|
@ -583,7 +600,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test GetRepositories", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
walkFn: func(ctx context.Context, path string, f driver.WalkFn) error {
|
||||
return f(new(FileInfoMock))
|
||||
},
|
||||
|
@ -594,7 +611,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test DeleteImageManifest", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
getContentFn: func(ctx context.Context, path string) ([]byte, error) {
|
||||
return []byte{}, errS3
|
||||
},
|
||||
|
@ -604,13 +621,13 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test DeleteImageManifest2", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{})
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{})
|
||||
err := imgStore.DeleteImageManifest(testImage, "1.0")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test NewBlobUpload", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
putContentFn: func(ctx context.Context, path string, content []byte) error {
|
||||
return errS3
|
||||
},
|
||||
|
@ -620,7 +637,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test GetBlobUpload", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return &FileInfoMock{}, errS3
|
||||
},
|
||||
|
@ -630,7 +647,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test PutBlobChunkStreamed", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{}, errS3
|
||||
},
|
||||
|
@ -640,7 +657,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test PutBlobChunkStreamed2", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{writeFn: func(b []byte) (int, error) {
|
||||
return 0, errS3
|
||||
|
@ -652,7 +669,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test PutBlobChunk", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{}, errS3
|
||||
},
|
||||
|
@ -662,7 +679,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test PutBlobChunk2", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{
|
||||
writeFn: func(b []byte) (int, error) {
|
||||
|
@ -679,7 +696,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test PutBlobChunk3", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{
|
||||
writeFn: func(b []byte) (int, error) {
|
||||
|
@ -693,7 +710,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test FinishBlobUpload", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{
|
||||
commitFn: func() error {
|
||||
|
@ -708,7 +725,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test FinishBlobUpload2", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{
|
||||
closeFn: func() error {
|
||||
|
@ -723,7 +740,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test FinishBlobUpload3", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
readerFn: func(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
return nil, errS3
|
||||
},
|
||||
|
@ -734,7 +751,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test FinishBlobUpload4", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
moveFn: func(ctx context.Context, sourcePath, destPath string) error {
|
||||
return errS3
|
||||
},
|
||||
|
@ -745,7 +762,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test FullBlobUpload", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{}, errS3
|
||||
},
|
||||
|
@ -756,14 +773,14 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test FullBlobUpload2", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{})
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{})
|
||||
d := godigest.FromBytes([]byte(" "))
|
||||
_, _, err := imgStore.FullBlobUpload(testImage, ioutil.NopCloser(strings.NewReader("")), d.String())
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test FullBlobUpload3", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
moveFn: func(ctx context.Context, sourcePath, destPath string) error {
|
||||
return errS3
|
||||
},
|
||||
|
@ -774,7 +791,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test GetBlob", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
readerFn: func(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
return ioutil.NopCloser(strings.NewReader("")), errS3
|
||||
},
|
||||
|
@ -785,7 +802,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test DeleteBlob", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
deleteFn: func(ctx context.Context, path string) error {
|
||||
return errS3
|
||||
},
|
||||
|
@ -796,7 +813,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Test GetReferrers", t, func(c C) {
|
||||
imgStore = createMockStorage(testDir, &StorageDriverMock{
|
||||
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
|
||||
deleteFn: func(ctx context.Context, path string) error {
|
||||
return errS3
|
||||
},
|
||||
|
@ -807,3 +824,622 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
|||
So(err, ShouldEqual, zerr.ErrMethodNotSupported)
|
||||
})
|
||||
}
|
||||
|
||||
func TestS3Dedupe(t *testing.T) {
|
||||
skipIt(t)
|
||||
Convey("Dedupe", t, func(c C) {
|
||||
uuid, err := guuid.NewV4()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
testDir := path.Join("/oci-repo-test", uuid.String())
|
||||
|
||||
tdir := t.TempDir()
|
||||
|
||||
storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
|
||||
// manifest1
|
||||
upload, err := imgStore.NewBlobUpload("dedupe1")
|
||||
So(err, ShouldBeNil)
|
||||
So(upload, ShouldNotBeEmpty)
|
||||
|
||||
content := []byte("test-data3")
|
||||
buf := bytes.NewBuffer(content)
|
||||
buflen := buf.Len()
|
||||
digest := godigest.FromBytes(content)
|
||||
blob, err := imgStore.PutBlobChunkStreamed("dedupe1", upload, buf)
|
||||
So(err, ShouldBeNil)
|
||||
So(blob, ShouldEqual, buflen)
|
||||
blobDigest1 := strings.Split(digest.String(), ":")[1]
|
||||
So(blobDigest1, ShouldNotBeEmpty)
|
||||
|
||||
err = imgStore.FinishBlobUpload("dedupe1", upload, buf, digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(blob, ShouldEqual, buflen)
|
||||
|
||||
_, checkBlobSize1, err := imgStore.CheckBlob("dedupe1", digest.String())
|
||||
So(checkBlobSize1, ShouldBeGreaterThan, 0)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, getBlobSize1, err := imgStore.GetBlob("dedupe1", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(getBlobSize1, ShouldBeGreaterThan, 0)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
cblob, cdigest := test.GetRandomImageConfig()
|
||||
_, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(clen, ShouldEqual, len(cblob))
|
||||
hasBlob, _, err := imgStore.CheckBlob("dedupe1", cdigest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(hasBlob, ShouldEqual, true)
|
||||
|
||||
manifest := ispec.Manifest{
|
||||
Config: ispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.config.v1+json",
|
||||
Digest: cdigest,
|
||||
Size: int64(len(cblob)),
|
||||
},
|
||||
Layers: []ispec.Descriptor{
|
||||
{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar",
|
||||
Digest: digest,
|
||||
Size: int64(buflen),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
manifest.SchemaVersion = 2
|
||||
manifestBuf, err := json.Marshal(manifest)
|
||||
So(err, ShouldBeNil)
|
||||
digest = godigest.FromBytes(manifestBuf)
|
||||
_, err = imgStore.PutImageManifest("dedupe1", digest.String(), ispec.MediaTypeImageManifest, manifestBuf)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, _, err = imgStore.GetImageManifest("dedupe1", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// manifest2
|
||||
upload, err = imgStore.NewBlobUpload("dedupe2")
|
||||
So(err, ShouldBeNil)
|
||||
So(upload, ShouldNotBeEmpty)
|
||||
|
||||
content = []byte("test-data3")
|
||||
buf = bytes.NewBuffer(content)
|
||||
buflen = buf.Len()
|
||||
digest = godigest.FromBytes(content)
|
||||
|
||||
blob, err = imgStore.PutBlobChunkStreamed("dedupe2", upload, buf)
|
||||
So(err, ShouldBeNil)
|
||||
So(blob, ShouldEqual, buflen)
|
||||
blobDigest2 := strings.Split(digest.String(), ":")[1]
|
||||
So(blobDigest2, ShouldNotBeEmpty)
|
||||
|
||||
err = imgStore.FinishBlobUpload("dedupe2", upload, buf, digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(blob, ShouldEqual, buflen)
|
||||
|
||||
_, checkBlobSize2, err := imgStore.CheckBlob("dedupe2", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(checkBlobSize2, ShouldBeGreaterThan, 0)
|
||||
|
||||
_, getBlobSize2, err := imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
So(getBlobSize2, ShouldBeGreaterThan, 0)
|
||||
So(checkBlobSize1, ShouldEqual, checkBlobSize2)
|
||||
So(getBlobSize1, ShouldEqual, getBlobSize2)
|
||||
|
||||
cblob, cdigest = test.GetRandomImageConfig()
|
||||
_, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(clen, ShouldEqual, len(cblob))
|
||||
hasBlob, _, err = imgStore.CheckBlob("dedupe2", cdigest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(hasBlob, ShouldEqual, true)
|
||||
|
||||
manifest = ispec.Manifest{
|
||||
Config: ispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.config.v1+json",
|
||||
Digest: cdigest,
|
||||
Size: int64(len(cblob)),
|
||||
},
|
||||
Layers: []ispec.Descriptor{
|
||||
{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar",
|
||||
Digest: digest,
|
||||
Size: int64(buflen),
|
||||
},
|
||||
},
|
||||
}
|
||||
manifest.SchemaVersion = 2
|
||||
manifestBuf, err = json.Marshal(manifest)
|
||||
So(err, ShouldBeNil)
|
||||
digest = godigest.FromBytes(manifestBuf)
|
||||
_, err = imgStore.PutImageManifest("dedupe2", "1.0", ispec.MediaTypeImageManifest, manifestBuf)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, _, err = imgStore.GetImageManifest("dedupe2", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// original blob should have the real content of blob
|
||||
So(fi1.Size(), ShouldNotEqual, fi2.Size())
|
||||
So(fi1.Size(), ShouldBeGreaterThan, 0)
|
||||
// deduped blob should be of size 0
|
||||
So(fi2.Size(), ShouldEqual, 0)
|
||||
|
||||
Convey("Check that delete blobs moves the real content to the next contenders", func() {
|
||||
// if we delete blob1, the content should be moved to blob2
|
||||
err = imgStore.DeleteBlob("dedupe1", "sha256:"+blobDigest1)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1))
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
fi2, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(fi2.Size(), ShouldBeGreaterThan, 0)
|
||||
// the second blob should now be equal to the deleted blob.
|
||||
So(fi2.Size(), ShouldEqual, fi1.Size())
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe2", "sha256:"+blobDigest2)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2))
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Check backward compatibility - switch dedupe to false", func() {
|
||||
/* copy cache to the new storage with dedupe false (doing this because we
|
||||
already have a cache object holding the lock on cache db file) */
|
||||
input, err := ioutil.ReadFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
tdir = t.TempDir()
|
||||
|
||||
err = ioutil.WriteFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName), input, 0o600)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, false)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
|
||||
// manifest3 without dedupe
|
||||
upload, err = imgStore.NewBlobUpload("dedupe3")
|
||||
So(err, ShouldBeNil)
|
||||
So(upload, ShouldNotBeEmpty)
|
||||
|
||||
content = []byte("test-data3")
|
||||
buf = bytes.NewBuffer(content)
|
||||
buflen = buf.Len()
|
||||
digest = godigest.FromBytes(content)
|
||||
|
||||
blob, err = imgStore.PutBlobChunkStreamed("dedupe3", upload, buf)
|
||||
So(err, ShouldBeNil)
|
||||
So(blob, ShouldEqual, buflen)
|
||||
blobDigest2 := strings.Split(digest.String(), ":")[1]
|
||||
So(blobDigest2, ShouldNotBeEmpty)
|
||||
|
||||
err = imgStore.FinishBlobUpload("dedupe3", upload, buf, digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(blob, ShouldEqual, buflen)
|
||||
|
||||
_, _, err = imgStore.CheckBlob("dedupe3", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// check that we retrieve the real dedupe2/blob (which is deduped earlier - 0 size) when switching to dedupe false
|
||||
_, getBlobSize2, err = imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
So(getBlobSize1, ShouldEqual, getBlobSize2)
|
||||
|
||||
_, checkBlobSize2, err := imgStore.CheckBlob("dedupe2", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(checkBlobSize2, ShouldBeGreaterThan, 0)
|
||||
So(checkBlobSize2, ShouldEqual, getBlobSize2)
|
||||
|
||||
_, getBlobSize3, err := imgStore.GetBlob("dedupe3", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
So(getBlobSize1, ShouldEqual, getBlobSize3)
|
||||
|
||||
_, checkBlobSize3, err := imgStore.CheckBlob("dedupe3", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(checkBlobSize3, ShouldBeGreaterThan, 0)
|
||||
So(checkBlobSize3, ShouldEqual, getBlobSize3)
|
||||
|
||||
cblob, cdigest = test.GetRandomImageConfig()
|
||||
_, clen, err = imgStore.FullBlobUpload("dedupe3", bytes.NewReader(cblob), cdigest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(clen, ShouldEqual, len(cblob))
|
||||
hasBlob, _, err = imgStore.CheckBlob("dedupe3", cdigest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(hasBlob, ShouldEqual, true)
|
||||
|
||||
manifest = ispec.Manifest{
|
||||
Config: ispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.config.v1+json",
|
||||
Digest: cdigest,
|
||||
Size: int64(len(cblob)),
|
||||
},
|
||||
Layers: []ispec.Descriptor{
|
||||
{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar",
|
||||
Digest: digest,
|
||||
Size: int64(buflen),
|
||||
},
|
||||
},
|
||||
}
|
||||
manifest.SchemaVersion = 2
|
||||
manifestBuf, err = json.Marshal(manifest)
|
||||
So(err, ShouldBeNil)
|
||||
digest = godigest.FromBytes(manifestBuf)
|
||||
_, err = imgStore.PutImageManifest("dedupe3", "1.0", ispec.MediaTypeImageManifest, manifestBuf)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, _, err = imgStore.GetImageManifest("dedupe3", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest1))
|
||||
So(err, ShouldBeNil)
|
||||
So(fi2.Size(), ShouldEqual, 0)
|
||||
|
||||
fi3, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe3", "blobs", "sha256", blobDigest2))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// the new blob with dedupe false should be equal with the origin blob from dedupe1
|
||||
So(fi1.Size(), ShouldEqual, fi3.Size())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestS3DedupeErr(t *testing.T) {
|
||||
skipIt(t)
|
||||
|
||||
uuid, err := guuid.NewV4()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
testDir := path.Join("/oci-repo-test", uuid.String())
|
||||
|
||||
tdir := t.TempDir()
|
||||
|
||||
storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true)
|
||||
defer cleanupStorage(storeDriver, testDir)
|
||||
|
||||
Convey("Test DedupeBlob", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{})
|
||||
|
||||
err = os.Remove(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName))
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
|
||||
|
||||
// trigger unable to insert blob record
|
||||
err := imgStore.DedupeBlob("", digest, "")
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
moveFn: func(ctx context.Context, sourcePath string, destPath string) error {
|
||||
return errS3
|
||||
},
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return driver.FileInfoInternal{}, errS3
|
||||
},
|
||||
})
|
||||
|
||||
// trigger unable to rename blob
|
||||
err = imgStore.DedupeBlob("", digest, "dst")
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
// trigger retry
|
||||
err = imgStore.DedupeBlob("", digest, "dst")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test DedupeBlob - error on second store.Stat()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
if path == "dst2" {
|
||||
return driver.FileInfoInternal{}, errS3
|
||||
}
|
||||
|
||||
return driver.FileInfoInternal{}, nil
|
||||
},
|
||||
})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
|
||||
err := imgStore.DedupeBlob("", digest, "dst")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DedupeBlob("", digest, "dst2")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test DedupeBlob - error on store.PutContent()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
putContentFn: func(ctx context.Context, path string, content []byte) error {
|
||||
return errS3
|
||||
},
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return nil, nil
|
||||
},
|
||||
})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
|
||||
err := imgStore.DedupeBlob("", digest, "dst")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DedupeBlob("", digest, "dst2")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test DedupeBlob - error on store.Delete()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
deleteFn: func(ctx context.Context, path string) error {
|
||||
return errS3
|
||||
},
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return nil, nil
|
||||
},
|
||||
})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
|
||||
err := imgStore.DedupeBlob("", digest, "dst")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DedupeBlob("", digest, "dst")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test copyBlob() - error on initRepo()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
putContentFn: func(ctx context.Context, path string, content []byte) error {
|
||||
return errS3
|
||||
},
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return driver.FileInfoInternal{}, errS3
|
||||
},
|
||||
writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{}, errS3
|
||||
},
|
||||
})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
|
||||
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")
|
||||
|
||||
err := imgStore.DedupeBlob("repo", digest, "dst")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.CheckBlob("repo", digest.String())
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test copyBlob() - error on store.PutContent()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
putContentFn: func(ctx context.Context, path string, content []byte) error {
|
||||
return errS3
|
||||
},
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return driver.FileInfoInternal{}, errS3
|
||||
},
|
||||
})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
|
||||
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")
|
||||
|
||||
err := imgStore.DedupeBlob("repo", digest, "dst")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.CheckBlob("repo", digest.String())
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test copyBlob() - error on store.Stat()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return driver.FileInfoInternal{}, errS3
|
||||
},
|
||||
})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
|
||||
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")
|
||||
|
||||
err := imgStore.DedupeBlob("repo", digest, "dst")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.CheckBlob("repo", digest.String())
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test GetBlob() - error on second store.Stat()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
|
||||
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")
|
||||
|
||||
err := imgStore.DedupeBlob("/src/dst", digest, "/repo1/dst1")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DedupeBlob("/src/dst", digest, "/repo2/dst2")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// copy cache db to the new imagestore
|
||||
input, err := ioutil.ReadFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
tdir = t.TempDir()
|
||||
|
||||
err = ioutil.WriteFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName), input, 0o600)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
if strings.Contains(path, "repo1/dst1") {
|
||||
return driver.FileInfoInternal{}, driver.PathNotFoundError{}
|
||||
}
|
||||
|
||||
return driver.FileInfoInternal{}, nil
|
||||
},
|
||||
})
|
||||
|
||||
_, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test GetBlob() - error on store.Reader()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{})
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256,
|
||||
"7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc")
|
||||
|
||||
err := imgStore.DedupeBlob("/src/dst", digest, "/repo1/dst1")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DedupeBlob("/src/dst", digest, "/repo2/dst2")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// copy cache db to the new imagestore
|
||||
input, err := ioutil.ReadFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
tdir = t.TempDir()
|
||||
|
||||
err = ioutil.WriteFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName), input, 0o600)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return &FileInfoMock{
|
||||
sizeFn: func() int64 {
|
||||
return 0
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
readerFn: func(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
if strings.Contains(path, "repo1/dst1") {
|
||||
return ioutil.NopCloser(strings.NewReader("")), errS3
|
||||
}
|
||||
|
||||
return ioutil.NopCloser(strings.NewReader("")), nil
|
||||
},
|
||||
})
|
||||
|
||||
_, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test DeleteBlob() - error on store.Move()", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
hash := "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc"
|
||||
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256, hash)
|
||||
|
||||
blobPath := path.Join(testDir, "repo/blobs/sha256", hash)
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
moveFn: func(ctx context.Context, sourcePath, destPath string) error {
|
||||
if destPath == blobPath {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errS3
|
||||
},
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
if path != blobPath {
|
||||
return nil, errS3
|
||||
}
|
||||
|
||||
return &FileInfoMock{}, nil
|
||||
},
|
||||
})
|
||||
|
||||
err := imgStore.DedupeBlob("repo", digest, blobPath)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.CheckBlob("repo2", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DeleteBlob("repo", digest.String())
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test FullBlobUpload", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
moveFn: func(ctx context.Context, sourcePath, destPath string) error {
|
||||
return errS3
|
||||
},
|
||||
})
|
||||
d := godigest.FromBytes([]byte(""))
|
||||
_, _, err := imgStore.FullBlobUpload(testImage, ioutil.NopCloser(strings.NewReader("")), d.String())
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test FinishBlobUpload", t, func(c C) {
|
||||
tdir := t.TempDir()
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
moveFn: func(ctx context.Context, sourcePath, destPath string) error {
|
||||
return errS3
|
||||
},
|
||||
})
|
||||
d := godigest.FromBytes([]byte(""))
|
||||
err := imgStore.FinishBlobUpload(testImage, "uuid", ioutil.NopCloser(strings.NewReader("")), d.String())
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
}
|
||||
|
||||
func TestInjectDedupe(t *testing.T) {
|
||||
tdir := t.TempDir()
|
||||
|
||||
uuid, err := guuid.NewV4()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
testDir := path.Join("/oci-repo-test", uuid.String())
|
||||
|
||||
Convey("Inject errors in DedupeBlob function", t, func() {
|
||||
imgStore := createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
statFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
|
||||
return &FileInfoMock{}, errS3
|
||||
},
|
||||
})
|
||||
err := imgStore.DedupeBlob("blob", "digest", "newblob")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
injected := test.InjectFailure(0)
|
||||
err = imgStore.DedupeBlob("blob", "digest", "newblob")
|
||||
if injected {
|
||||
So(err, ShouldNotBeNil)
|
||||
} else {
|
||||
So(err, ShouldBeNil)
|
||||
}
|
||||
|
||||
injected = test.InjectFailure(1)
|
||||
err = imgStore.DedupeBlob("blob", "digest", "newblob")
|
||||
if injected {
|
||||
So(err, ShouldNotBeNil)
|
||||
} else {
|
||||
So(err, ShouldBeNil)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,9 +8,9 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -27,11 +27,13 @@ import (
|
|||
"zotregistry.io/zot/pkg/extensions/monitoring"
|
||||
zlog "zotregistry.io/zot/pkg/log"
|
||||
"zotregistry.io/zot/pkg/storage"
|
||||
"zotregistry.io/zot/pkg/test"
|
||||
)
|
||||
|
||||
const (
|
||||
RLOCK = "RLock"
|
||||
RWLOCK = "RWLock"
|
||||
CacheDBName = "s3_cache"
|
||||
)
|
||||
|
||||
// ObjectStorage provides the image storage operations.
|
||||
|
@ -46,6 +48,8 @@ type ObjectStorage struct {
|
|||
// see: https://github.com/distribution/distribution/blob/main/registry/storage/driver/s3-aws/s3.go#L545
|
||||
multiPartUploads sync.Map
|
||||
metrics monitoring.MetricServer
|
||||
cache *storage.Cache
|
||||
dedupe bool
|
||||
}
|
||||
|
||||
func (is *ObjectStorage) RootDir() string {
|
||||
|
@ -62,7 +66,7 @@ func (is *ObjectStorage) DirExists(d string) bool {
|
|||
|
||||
// NewObjectStorage returns a new image store backed by cloud storages.
|
||||
// see https://github.com/docker/docker.github.io/tree/master/registry/storage-drivers
|
||||
func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commit bool,
|
||||
func NewImageStore(rootDir string, cacheDir string, gc bool, gcDelay time.Duration, dedupe, commit bool,
|
||||
log zlog.Logger, metrics monitoring.MetricServer,
|
||||
store driver.StorageDriver,
|
||||
) storage.ImageStore {
|
||||
|
@ -74,6 +78,19 @@ func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commi
|
|||
log: log.With().Caller().Logger(),
|
||||
multiPartUploads: sync.Map{},
|
||||
metrics: metrics,
|
||||
dedupe: dedupe,
|
||||
}
|
||||
|
||||
cachePath := path.Join(cacheDir, CacheDBName+storage.DBExtensionName)
|
||||
|
||||
if dedupe {
|
||||
imgStore.cache = storage.NewCache(cacheDir, CacheDBName, false, log)
|
||||
} else {
|
||||
// if dedupe was used in previous runs use it to serve blobs correctly
|
||||
if _, err := os.Stat(cachePath); err == nil {
|
||||
log.Info().Str("cache path", cachePath).Msg("found cache database")
|
||||
imgStore.cache = storage.NewCache(cacheDir, CacheDBName, false, log)
|
||||
}
|
||||
}
|
||||
|
||||
return imgStore
|
||||
|
@ -197,15 +214,11 @@ func (is *ObjectStorage) ValidateRepo(name string) (bool, error) {
|
|||
}
|
||||
|
||||
for _, file := range files {
|
||||
f, err := is.store.Stat(context.Background(), file)
|
||||
_, err := is.store.Stat(context.Background(), file)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if strings.HasSuffix(file, "blobs") && !f.IsDir() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
filename, err := filepath.Rel(dir, file)
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
@ -923,12 +936,21 @@ func (is *ObjectStorage) FinishBlobUpload(repo, uuid string, body io.Reader, dig
|
|||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
if is.dedupe && is.cache != nil {
|
||||
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
|
||||
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
|
||||
Str("dst", dst).Msg("unable to dedupe blob")
|
||||
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := is.store.Move(context.Background(), src, dst); err != nil {
|
||||
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
|
||||
Str("dst", dst).Msg("unable to finish blob")
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
is.multiPartUploads.Delete(src)
|
||||
|
||||
|
@ -994,17 +1016,104 @@ func (is *ObjectStorage) FullBlobUpload(repo string, body io.Reader, digest stri
|
|||
|
||||
dst := is.BlobPath(repo, dstDigest)
|
||||
|
||||
if is.dedupe && is.cache != nil {
|
||||
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
|
||||
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
|
||||
Str("dst", dst).Msg("unable to dedupe blob")
|
||||
|
||||
return "", -1, err
|
||||
}
|
||||
} else {
|
||||
if err := is.store.Move(context.Background(), src, dst); err != nil {
|
||||
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
|
||||
Str("dst", dst).Msg("unable to finish blob")
|
||||
|
||||
return "", -1, err
|
||||
}
|
||||
}
|
||||
|
||||
return uuid, int64(nbytes), nil
|
||||
}
|
||||
|
||||
func (is *ObjectStorage) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error {
|
||||
retry:
|
||||
is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: enter")
|
||||
|
||||
dstRecord, err := is.cache.GetBlob(dstDigest.String())
|
||||
if err := test.Error(err); err != nil && !errors.Is(err, zerr.ErrCacheMiss) {
|
||||
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to lookup blob record")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if dstRecord == "" {
|
||||
// cache record doesn't exist, so first disk and cache entry for this digest
|
||||
if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// move the blob from uploads to final dest
|
||||
if err := is.store.Move(context.Background(), src, dst); err != nil {
|
||||
is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
is.log.Debug().Str("src", src).Str("dst", dst).Msg("dedupe: rename")
|
||||
} else {
|
||||
// cache record exists, but due to GC and upgrades from older versions,
|
||||
// disk content and cache records may go out of sync
|
||||
_, err := is.store.Stat(context.Background(), dstRecord)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
|
||||
// the actual blob on disk may have been removed by GC, so sync the cache
|
||||
err := is.cache.DeleteBlob(dstDigest.String(), dstRecord)
|
||||
if err = test.Error(err); err != nil {
|
||||
// nolint:lll
|
||||
is.log.Error().Err(err).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: unable to delete blob record")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
goto retry
|
||||
}
|
||||
|
||||
fileInfo, err := is.store.Stat(context.Background(), dst)
|
||||
if err != nil && !errors.As(err, &driver.PathNotFoundError{}) {
|
||||
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if dstRecord == dst {
|
||||
is.log.Warn().Msg("FOUND equal dsts")
|
||||
}
|
||||
|
||||
// prevent overwrite original blob
|
||||
if fileInfo == nil && dstRecord != dst {
|
||||
// put empty file so that we are compliant with oci layout, this will act as a deduped blob
|
||||
err = is.store.PutContent(context.Background(), dst, []byte{})
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to write empty file")
|
||||
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
is.log.Warn().Msg("prevent overwrite")
|
||||
}
|
||||
|
||||
// remove temp blobupload
|
||||
if err := is.store.Delete(context.Background(), src); err != nil {
|
||||
is.log.Error().Err(err).Str("src", src).Msg("dedupe: unable to remove blob")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
is.log.Debug().Str("src", src).Msg("dedupe: remove")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1041,24 +1150,81 @@ func (is *ObjectStorage) CheckBlob(repo, digest string) (bool, int64, error) {
|
|||
|
||||
blobPath := is.BlobPath(repo, dgst)
|
||||
|
||||
if is.dedupe && is.cache != nil {
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
} else {
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
}
|
||||
|
||||
binfo, err := is.store.Stat(context.Background(), blobPath)
|
||||
if err == nil && binfo.Size() > 0 {
|
||||
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
|
||||
|
||||
return true, binfo.Size(), nil
|
||||
}
|
||||
// otherwise is a 'deduped' blob (empty file)
|
||||
|
||||
// Check blobs in cache
|
||||
dstRecord, err := is.checkCacheBlob(digest)
|
||||
if err != nil {
|
||||
var perr driver.PathNotFoundError
|
||||
if errors.As(err, &perr) {
|
||||
is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found")
|
||||
|
||||
return false, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
|
||||
// If found copy to location
|
||||
blobSize, err := is.copyBlob(repo, blobPath, dstRecord)
|
||||
if err != nil {
|
||||
return false, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
// put deduped blob in cache
|
||||
if err := is.cache.PutBlob(digest, blobPath); err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("dedupe: unable to insert blob record")
|
||||
|
||||
return false, -1, err
|
||||
}
|
||||
|
||||
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
|
||||
return true, blobSize, nil
|
||||
}
|
||||
|
||||
return true, binfo.Size(), nil
|
||||
func (is *ObjectStorage) checkCacheBlob(digest string) (string, error) {
|
||||
if is.cache == nil {
|
||||
return "", zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
dstRecord, err := is.cache.GetBlob(digest)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
is.log.Debug().Str("digest", digest).Str("dstRecord", dstRecord).Msg("cache: found dedupe record")
|
||||
|
||||
return dstRecord, nil
|
||||
}
|
||||
|
||||
func (is *ObjectStorage) copyBlob(repo string, blobPath string, dstRecord string) (int64, error) {
|
||||
if err := is.initRepo(repo); err != nil {
|
||||
is.log.Error().Err(err).Str("repo", repo).Msg("unable to initialize an empty repo")
|
||||
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if err := is.store.PutContent(context.Background(), blobPath, []byte{}); err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", blobPath).Str("link", dstRecord).Msg("dedupe: unable to link")
|
||||
|
||||
return -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
// return original blob with content instead of the deduped one (blobPath)
|
||||
binfo, err := is.store.Stat(context.Background(), dstRecord)
|
||||
if err == nil {
|
||||
return binfo.Size(), nil
|
||||
}
|
||||
|
||||
return -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
// GetBlob returns a stream to read the blob.
|
||||
|
@ -1092,6 +1258,36 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.Reader, int
|
|||
return nil, -1, err
|
||||
}
|
||||
|
||||
// is a 'deduped' blob
|
||||
if binfo.Size() == 0 && is.cache != nil {
|
||||
// Check blobs in cache
|
||||
dstRecord, err := is.checkCacheBlob(digest)
|
||||
if err == nil {
|
||||
binfo, err := is.store.Stat(context.Background(), dstRecord)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob")
|
||||
|
||||
// the actual blob on disk may have been removed by GC, so sync the cache
|
||||
if err := is.cache.DeleteBlob(digest, dstRecord); err != nil {
|
||||
is.log.Error().Err(err).Str("dstDigest", digest).Str("dst", dstRecord).Msg("dedupe: unable to delete blob record")
|
||||
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
return nil, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
blobReader, err := is.store.Reader(context.Background(), dstRecord, 0)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob")
|
||||
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
return blobReader, binfo.Size(), nil
|
||||
}
|
||||
}
|
||||
|
||||
return blobReader, binfo.Size(), nil
|
||||
}
|
||||
|
||||
|
@ -1158,6 +1354,44 @@ func (is *ObjectStorage) DeleteBlob(repo, digest string) error {
|
|||
return zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
if is.cache != nil {
|
||||
dstRecord, err := is.cache.GetBlob(digest)
|
||||
if err != nil && !errors.Is(err, zerr.ErrCacheMiss) {
|
||||
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to lookup blob record")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// remove cache entry and move blob contents to the next candidate if there is any
|
||||
if err := is.cache.DeleteBlob(digest, blobPath); err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest).Str("blobPath", blobPath).Msg("unable to remove blob path from cache")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// if the deleted blob is one with content
|
||||
if dstRecord == blobPath {
|
||||
// get next candidate
|
||||
dstRecord, err := is.cache.GetBlob(digest)
|
||||
if err != nil && !errors.Is(err, zerr.ErrCacheMiss) {
|
||||
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to lookup blob record")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// if we have a new candidate move the blob content to it
|
||||
if dstRecord != "" {
|
||||
if err := is.store.Move(context.Background(), blobPath, dstRecord); err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := is.store.Delete(context.Background(), blobPath); err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path")
|
||||
|
||||
|
|
|
@ -129,7 +129,7 @@ func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commi
|
|||
}
|
||||
|
||||
if dedupe {
|
||||
imgStore.cache = NewCache(rootDir, "cache", log)
|
||||
imgStore.cache = NewCache(rootDir, "cache", true, log)
|
||||
}
|
||||
|
||||
if gc {
|
||||
|
|
|
@ -42,7 +42,7 @@ func skipIt(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStore, error) {
|
||||
func createObjectsStore(rootDir string, cacheDir string) (driver.StorageDriver, storage.ImageStore, error) {
|
||||
bucket := "zot-storage-test"
|
||||
endpoint := os.Getenv("S3MOCK_ENDPOINT")
|
||||
storageDriverParams := map[string]interface{}{
|
||||
|
@ -51,6 +51,8 @@ func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStor
|
|||
"region": "us-east-2",
|
||||
"bucket": bucket,
|
||||
"regionendpoint": endpoint,
|
||||
"accesskey": "minioadmin",
|
||||
"secretkey": "minioadmin",
|
||||
"secure": false,
|
||||
"skipverify": false,
|
||||
}
|
||||
|
@ -71,7 +73,7 @@ func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStor
|
|||
log := log.Logger{Logger: zerolog.New(os.Stdout)}
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
|
||||
il := s3.NewImageStore(rootDir, false, storage.DefaultGCDelay, false, false, log, metrics, store)
|
||||
il := s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay, true, false, log, metrics, store)
|
||||
|
||||
return store, il, err
|
||||
}
|
||||
|
@ -105,9 +107,10 @@ func TestStorageAPIs(t *testing.T) {
|
|||
}
|
||||
|
||||
testDir := path.Join("/oci-repo-test", uuid.String())
|
||||
tdir := t.TempDir()
|
||||
|
||||
var store driver.StorageDriver
|
||||
store, imgStore, _ = createObjectsStore(testDir)
|
||||
store, imgStore, _ = createObjectsStore(testDir, tdir)
|
||||
defer cleanupStorage(store, testDir)
|
||||
} else {
|
||||
dir := t.TempDir()
|
||||
|
@ -676,15 +679,15 @@ func TestStorageHandler(t *testing.T) {
|
|||
var thirdStorageDriver driver.StorageDriver
|
||||
|
||||
firstRootDir = "/util_test1"
|
||||
firstStorageDriver, firstStore, _ = createObjectsStore(firstRootDir)
|
||||
firstStorageDriver, firstStore, _ = createObjectsStore(firstRootDir, t.TempDir())
|
||||
defer cleanupStorage(firstStorageDriver, firstRootDir)
|
||||
|
||||
secondRootDir = "/util_test2"
|
||||
secondStorageDriver, secondStore, _ = createObjectsStore(secondRootDir)
|
||||
secondStorageDriver, secondStore, _ = createObjectsStore(secondRootDir, t.TempDir())
|
||||
defer cleanupStorage(secondStorageDriver, secondRootDir)
|
||||
|
||||
thirdRootDir = "/util_test3"
|
||||
thirdStorageDriver, thirdStore, _ = createObjectsStore(thirdRootDir)
|
||||
thirdStorageDriver, thirdStore, _ = createObjectsStore(thirdRootDir, t.TempDir())
|
||||
defer cleanupStorage(thirdStorageDriver, thirdRootDir)
|
||||
} else {
|
||||
// Create temporary directory
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
{
|
||||
"distSpecVersion": "1.0.0",
|
||||
"distSpecVersion": "1.0.1",
|
||||
"storage": {
|
||||
"rootDirectory": "/zot",
|
||||
"rootDirectory": "/tmp/zot",
|
||||
"gc": false,
|
||||
"dedupe": false,
|
||||
"storageDriver": {
|
||||
"name": "s3",
|
||||
"rootdirectory": "/zot",
|
||||
"region": "us-east-2",
|
||||
"bucket": "zot-storage",
|
||||
"regionendpoint": "http://localhost:9000",
|
||||
|
|
Loading…
Add table
Reference in a new issue