diff --git a/.github/workflows/ci-cd.yml b/.github/workflows/ci-cd.yml index 0fcd5396..4e2277e8 100644 --- a/.github/workflows/ci-cd.yml +++ b/.github/workflows/ci-cd.yml @@ -36,14 +36,13 @@ jobs: arch: [amd64, arm64] services: s3mock: - image: ghcr.io/project-zot/localstack/localstack:0.13.2 + image: ghcr.io/project-zot/localstack/localstack:1.2.0 env: SERVICES: s3 ports: - 4563-4599:4563-4599 - 9090:8080 steps: - - name: Install go uses: actions/setup-go@v3 with: diff --git a/examples/cluster/haproxy.cfg b/examples/cluster/haproxy.cfg index ac1d1048..279bb290 100644 --- a/examples/cluster/haproxy.cfg +++ b/examples/cluster/haproxy.cfg @@ -37,20 +37,9 @@ defaults frontend zot bind *:8080 mode http - acl write_methods method POST PATCH DELETE PUT - use_backend zot-stick-writes if write_methods - default_backend zot-reads + default_backend zot-cluster -backend zot-stick-writes - mode http - balance leastconn - stick-table type ip size 1m expire 30m - stick on src - server zot1 127.0.0.1:8081 check - server zot2 127.0.0.1:8082 check - server zot3 127.0.0.1:8083 check - -backend zot-reads +backend zot-cluster mode http balance roundrobin server zot1 127.0.0.1:8081 check diff --git a/pkg/storage/s3/s3.go b/pkg/storage/s3/s3.go index 4678784c..c8e28d1b 100644 --- a/pkg/storage/s3/s3.go +++ b/pkg/storage/s3/s3.go @@ -42,14 +42,10 @@ type ObjectStorage struct { lock *sync.RWMutex blobUploads map[string]storage.BlobUpload log zerolog.Logger - // We must keep track of multi part uploads to s3, because the lib - // which we are using doesn't cancel multiparts uploads - // see: https://github.com/distribution/distribution/blob/main/registry/storage/driver/s3-aws/s3.go#L545 - multiPartUploads sync.Map - metrics monitoring.MetricServer - cache *storage.Cache - dedupe bool - linter storage.Lint + metrics monitoring.MetricServer + cache *storage.Cache + dedupe bool + linter storage.Lint } func (is *ObjectStorage) RootDir() string { @@ -71,15 +67,14 @@ func NewImageStore(rootDir string, cacheDir string, gc bool, gcDelay time.Durati store driver.StorageDriver, ) storage.ImageStore { imgStore := &ObjectStorage{ - rootDir: rootDir, - store: store, - lock: &sync.RWMutex{}, - blobUploads: make(map[string]storage.BlobUpload), - log: log.With().Caller().Logger(), - multiPartUploads: sync.Map{}, - metrics: metrics, - dedupe: dedupe, - linter: linter, + rootDir: rootDir, + store: store, + lock: &sync.RWMutex{}, + blobUploads: make(map[string]storage.BlobUpload), + log: log.With().Caller().Logger(), + metrics: metrics, + dedupe: dedupe, + linter: linter, } cachePath := path.Join(cacheDir, CacheDBName+storage.DBExtensionName) @@ -546,11 +541,10 @@ func (is *ObjectStorage) NewBlobUpload(repo string) (string, error) { blobUploadPath := is.BlobUploadPath(repo, uid) - // here we should create an empty multi part upload, but that's not possible - // so we just create a regular empty file which will be overwritten by FinishBlobUpload - err = is.store.PutContent(context.Background(), blobUploadPath, []byte{}) + // create multipart upload (append false) + _, err = is.store.Writer(context.Background(), blobUploadPath, false) if err != nil { - return "", zerr.ErrRepoNotFound + return "", err } return uid, nil @@ -558,36 +552,18 @@ func (is *ObjectStorage) NewBlobUpload(repo string) (string, error) { // GetBlobUpload returns the current size of a blob upload. func (is *ObjectStorage) GetBlobUpload(repo, uuid string) (int64, error) { - var fileSize int64 - blobUploadPath := is.BlobUploadPath(repo, uuid) - // if it's not a multipart upload check for the regular empty file - // created by NewBlobUpload, it should have 0 size every time - _, hasStarted := is.multiPartUploads.Load(blobUploadPath) - if !hasStarted { - binfo, err := is.store.Stat(context.Background(), blobUploadPath) - if err != nil { - var perr driver.PathNotFoundError - if errors.As(err, &perr) { - return -1, zerr.ErrUploadNotFound - } - - return -1, err + writer, err := is.store.Writer(context.Background(), blobUploadPath, true) + if err != nil { + if errors.As(err, &driver.PathNotFoundError{}) { + return -1, zerr.ErrUploadNotFound } - fileSize = binfo.Size() - } else { - // otherwise get the size of multi parts upload - fi, err := getMultipartFileWriter(is, blobUploadPath) - if err != nil { - return -1, err - } - - fileSize = fi.Size() + return -1, err } - return fileSize, nil + return writer.Size(), nil } // PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns @@ -599,14 +575,13 @@ func (is *ObjectStorage) PutBlobChunkStreamed(repo, uuid string, body io.Reader) blobUploadPath := is.BlobUploadPath(repo, uuid) - _, err := is.store.Stat(context.Background(), blobUploadPath) + file, err := is.store.Writer(context.Background(), blobUploadPath, true) if err != nil { - return -1, zerr.ErrUploadNotFound - } + if errors.As(err, &driver.PathNotFoundError{}) { + return -1, zerr.ErrUploadNotFound + } - file, err := getMultipartFileWriter(is, blobUploadPath) - if err != nil { - is.log.Error().Err(err).Msg("failed to create multipart upload") + is.log.Error().Err(err).Msg("failed to continue multipart upload") return -1, err } @@ -643,14 +618,13 @@ func (is *ObjectStorage) PutBlobChunk(repo, uuid string, from, to int64, blobUploadPath := is.BlobUploadPath(repo, uuid) - _, err := is.store.Stat(context.Background(), blobUploadPath) + file, err := is.store.Writer(context.Background(), blobUploadPath, true) if err != nil { - return -1, zerr.ErrUploadNotFound - } + if errors.As(err, &driver.PathNotFoundError{}) { + return -1, zerr.ErrUploadNotFound + } - file, err := getMultipartFileWriter(is, blobUploadPath) - if err != nil { - is.log.Error().Err(err).Msg("failed to create multipart upload") + is.log.Error().Err(err).Msg("failed to continue multipart upload") return -1, err } @@ -658,16 +632,6 @@ func (is *ObjectStorage) PutBlobChunk(repo, uuid string, from, to int64, defer file.Close() if from != file.Size() { - // cancel multipart upload - is.multiPartUploads.Delete(blobUploadPath) - - err := file.Cancel() - if err != nil { - is.log.Error().Err(err).Msg("failed to cancel multipart upload") - - return -1, err - } - is.log.Error().Int64("expected", from).Int64("actual", file.Size()). Msg("invalid range start for blob upload") @@ -695,35 +659,18 @@ func (is *ObjectStorage) PutBlobChunk(repo, uuid string, from, to int64, // BlobUploadInfo returns the current blob size in bytes. func (is *ObjectStorage) BlobUploadInfo(repo, uuid string) (int64, error) { - var fileSize int64 - blobUploadPath := is.BlobUploadPath(repo, uuid) - // if it's not a multipart upload check for the regular empty file - // created by NewBlobUpload, it should have 0 size every time - _, hasStarted := is.multiPartUploads.Load(blobUploadPath) - if !hasStarted { - uploadInfo, err := is.store.Stat(context.Background(), blobUploadPath) - if err != nil { - is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob") - - return -1, err + writer, err := is.store.Writer(context.Background(), blobUploadPath, true) + if err != nil { + if errors.As(err, &driver.PathNotFoundError{}) { + return -1, zerr.ErrUploadNotFound } - fileSize = uploadInfo.Size() - } else { - // otherwise get the size of multi parts upload - binfo, err := getMultipartFileWriter(is, blobUploadPath) - if err != nil { - is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob") - - return -1, err - } - - fileSize = binfo.Size() + return -1, err } - return fileSize, nil + return writer.Size(), nil } // FinishBlobUpload finalizes the blob upload and moves blob the repository. @@ -803,8 +750,6 @@ func (is *ObjectStorage) FinishBlobUpload(repo, uuid string, body io.Reader, dig } } - is.multiPartUploads.Delete(src) - return nil } @@ -978,7 +923,19 @@ func (is *ObjectStorage) RunGCPeriodically(interval time.Duration, sch *schedule // DeleteBlobUpload deletes an existing blob upload that is currently in progress. func (is *ObjectStorage) DeleteBlobUpload(repo, uuid string) error { blobUploadPath := is.BlobUploadPath(repo, uuid) - if err := is.store.Delete(context.Background(), blobUploadPath); err != nil { + + writer, err := is.store.Writer(context.Background(), blobUploadPath, true) + if err != nil { + if errors.As(err, &driver.PathNotFoundError{}) { + return zerr.ErrUploadNotFound + } + + return err + } + + defer writer.Close() + + if err := writer.Cancel(); err != nil { is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload") return err @@ -1398,29 +1355,3 @@ func writeFile(store driver.StorageDriver, filepath string, buf []byte) (int, er return n, nil } - -// get a multipart upload FileWriter based on wheather or not one has already been started. -func getMultipartFileWriter(imgStore *ObjectStorage, filepath string) (driver.FileWriter, error) { - var file driver.FileWriter - - var err error - - _, hasStarted := imgStore.multiPartUploads.Load(filepath) - if !hasStarted { - // start multipart upload - file, err = imgStore.store.Writer(context.Background(), filepath, false) - if err != nil { - return file, err - } - - imgStore.multiPartUploads.Store(filepath, true) - } else { - // continue multipart upload - file, err = imgStore.store.Writer(context.Background(), filepath, true) - if err != nil { - return file, err - } - } - - return file, nil -} diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index e0b215f8..defcf9b9 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -637,8 +637,8 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { Convey("Test NewBlobUpload", func(c C) { imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ - PutContentFn: func(ctx context.Context, path string, content []byte) error { - return errS3 + WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { + return nil, errS3 }, }) _, err := imgStore.NewBlobUpload(testImage) @@ -647,14 +647,24 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { Convey("Test GetBlobUpload", func(c C) { imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ - StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { - return &FileInfoMock{}, errS3 + WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { + return nil, errS3 }, }) _, err := imgStore.GetBlobUpload(testImage, "uuid") So(err, ShouldNotBeNil) }) + Convey("Test BlobUploadInfo", func(c C) { + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ + WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { + return nil, errS3 + }, + }) + _, err := imgStore.BlobUploadInfo(testImage, "uuid") + So(err, ShouldNotBeNil) + }) + Convey("Test PutBlobChunkStreamed", func(c C) { imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { @@ -718,6 +728,16 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { So(err, ShouldNotBeNil) }) + Convey("Test PutBlobChunk4", func(c C) { + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ + WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { + return &FileWriterMock{}, driver.PathNotFoundError{} + }, + }) + _, err := imgStore.PutBlobChunk(testImage, "uuid", 0, 100, io.NopCloser(strings.NewReader(""))) + So(err, ShouldNotBeNil) + }) + Convey("Test FinishBlobUpload", func(c C) { imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {