0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-13 22:50:38 -05:00

fix(s3): remove tracking multipart uploads (#883)

Remove sticky sessions from clustering

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
peusebiu 2022-10-20 19:36:58 +03:00 committed by GitHub
parent 7f9052972d
commit 92d97d48d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 138 deletions

View file

@ -36,14 +36,13 @@ jobs:
arch: [amd64, arm64] arch: [amd64, arm64]
services: services:
s3mock: s3mock:
image: ghcr.io/project-zot/localstack/localstack:0.13.2 image: ghcr.io/project-zot/localstack/localstack:1.2.0
env: env:
SERVICES: s3 SERVICES: s3
ports: ports:
- 4563-4599:4563-4599 - 4563-4599:4563-4599
- 9090:8080 - 9090:8080
steps: steps:
- name: Install go - name: Install go
uses: actions/setup-go@v3 uses: actions/setup-go@v3
with: with:

View file

@ -37,20 +37,9 @@ defaults
frontend zot frontend zot
bind *:8080 bind *:8080
mode http mode http
acl write_methods method POST PATCH DELETE PUT default_backend zot-cluster
use_backend zot-stick-writes if write_methods
default_backend zot-reads
backend zot-stick-writes backend zot-cluster
mode http
balance leastconn
stick-table type ip size 1m expire 30m
stick on src
server zot1 127.0.0.1:8081 check
server zot2 127.0.0.1:8082 check
server zot3 127.0.0.1:8083 check
backend zot-reads
mode http mode http
balance roundrobin balance roundrobin
server zot1 127.0.0.1:8081 check server zot1 127.0.0.1:8081 check

View file

@ -42,14 +42,10 @@ type ObjectStorage struct {
lock *sync.RWMutex lock *sync.RWMutex
blobUploads map[string]storage.BlobUpload blobUploads map[string]storage.BlobUpload
log zerolog.Logger log zerolog.Logger
// We must keep track of multi part uploads to s3, because the lib metrics monitoring.MetricServer
// which we are using doesn't cancel multiparts uploads cache *storage.Cache
// see: https://github.com/distribution/distribution/blob/main/registry/storage/driver/s3-aws/s3.go#L545 dedupe bool
multiPartUploads sync.Map linter storage.Lint
metrics monitoring.MetricServer
cache *storage.Cache
dedupe bool
linter storage.Lint
} }
func (is *ObjectStorage) RootDir() string { func (is *ObjectStorage) RootDir() string {
@ -71,15 +67,14 @@ func NewImageStore(rootDir string, cacheDir string, gc bool, gcDelay time.Durati
store driver.StorageDriver, store driver.StorageDriver,
) storage.ImageStore { ) storage.ImageStore {
imgStore := &ObjectStorage{ imgStore := &ObjectStorage{
rootDir: rootDir, rootDir: rootDir,
store: store, store: store,
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
blobUploads: make(map[string]storage.BlobUpload), blobUploads: make(map[string]storage.BlobUpload),
log: log.With().Caller().Logger(), log: log.With().Caller().Logger(),
multiPartUploads: sync.Map{}, metrics: metrics,
metrics: metrics, dedupe: dedupe,
dedupe: dedupe, linter: linter,
linter: linter,
} }
cachePath := path.Join(cacheDir, CacheDBName+storage.DBExtensionName) cachePath := path.Join(cacheDir, CacheDBName+storage.DBExtensionName)
@ -546,11 +541,10 @@ func (is *ObjectStorage) NewBlobUpload(repo string) (string, error) {
blobUploadPath := is.BlobUploadPath(repo, uid) blobUploadPath := is.BlobUploadPath(repo, uid)
// here we should create an empty multi part upload, but that's not possible // create multipart upload (append false)
// so we just create a regular empty file which will be overwritten by FinishBlobUpload _, err = is.store.Writer(context.Background(), blobUploadPath, false)
err = is.store.PutContent(context.Background(), blobUploadPath, []byte{})
if err != nil { if err != nil {
return "", zerr.ErrRepoNotFound return "", err
} }
return uid, nil return uid, nil
@ -558,36 +552,18 @@ func (is *ObjectStorage) NewBlobUpload(repo string) (string, error) {
// GetBlobUpload returns the current size of a blob upload. // GetBlobUpload returns the current size of a blob upload.
func (is *ObjectStorage) GetBlobUpload(repo, uuid string) (int64, error) { func (is *ObjectStorage) GetBlobUpload(repo, uuid string) (int64, error) {
var fileSize int64
blobUploadPath := is.BlobUploadPath(repo, uuid) blobUploadPath := is.BlobUploadPath(repo, uuid)
// if it's not a multipart upload check for the regular empty file writer, err := is.store.Writer(context.Background(), blobUploadPath, true)
// created by NewBlobUpload, it should have 0 size every time if err != nil {
_, hasStarted := is.multiPartUploads.Load(blobUploadPath) if errors.As(err, &driver.PathNotFoundError{}) {
if !hasStarted { return -1, zerr.ErrUploadNotFound
binfo, err := is.store.Stat(context.Background(), blobUploadPath)
if err != nil {
var perr driver.PathNotFoundError
if errors.As(err, &perr) {
return -1, zerr.ErrUploadNotFound
}
return -1, err
} }
fileSize = binfo.Size() return -1, err
} else {
// otherwise get the size of multi parts upload
fi, err := getMultipartFileWriter(is, blobUploadPath)
if err != nil {
return -1, err
}
fileSize = fi.Size()
} }
return fileSize, nil return writer.Size(), nil
} }
// PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns // PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns
@ -599,14 +575,13 @@ func (is *ObjectStorage) PutBlobChunkStreamed(repo, uuid string, body io.Reader)
blobUploadPath := is.BlobUploadPath(repo, uuid) blobUploadPath := is.BlobUploadPath(repo, uuid)
_, err := is.store.Stat(context.Background(), blobUploadPath) file, err := is.store.Writer(context.Background(), blobUploadPath, true)
if err != nil { if err != nil {
return -1, zerr.ErrUploadNotFound if errors.As(err, &driver.PathNotFoundError{}) {
} return -1, zerr.ErrUploadNotFound
}
file, err := getMultipartFileWriter(is, blobUploadPath) is.log.Error().Err(err).Msg("failed to continue multipart upload")
if err != nil {
is.log.Error().Err(err).Msg("failed to create multipart upload")
return -1, err return -1, err
} }
@ -643,14 +618,13 @@ func (is *ObjectStorage) PutBlobChunk(repo, uuid string, from, to int64,
blobUploadPath := is.BlobUploadPath(repo, uuid) blobUploadPath := is.BlobUploadPath(repo, uuid)
_, err := is.store.Stat(context.Background(), blobUploadPath) file, err := is.store.Writer(context.Background(), blobUploadPath, true)
if err != nil { if err != nil {
return -1, zerr.ErrUploadNotFound if errors.As(err, &driver.PathNotFoundError{}) {
} return -1, zerr.ErrUploadNotFound
}
file, err := getMultipartFileWriter(is, blobUploadPath) is.log.Error().Err(err).Msg("failed to continue multipart upload")
if err != nil {
is.log.Error().Err(err).Msg("failed to create multipart upload")
return -1, err return -1, err
} }
@ -658,16 +632,6 @@ func (is *ObjectStorage) PutBlobChunk(repo, uuid string, from, to int64,
defer file.Close() defer file.Close()
if from != file.Size() { if from != file.Size() {
// cancel multipart upload
is.multiPartUploads.Delete(blobUploadPath)
err := file.Cancel()
if err != nil {
is.log.Error().Err(err).Msg("failed to cancel multipart upload")
return -1, err
}
is.log.Error().Int64("expected", from).Int64("actual", file.Size()). is.log.Error().Int64("expected", from).Int64("actual", file.Size()).
Msg("invalid range start for blob upload") Msg("invalid range start for blob upload")
@ -695,35 +659,18 @@ func (is *ObjectStorage) PutBlobChunk(repo, uuid string, from, to int64,
// BlobUploadInfo returns the current blob size in bytes. // BlobUploadInfo returns the current blob size in bytes.
func (is *ObjectStorage) BlobUploadInfo(repo, uuid string) (int64, error) { func (is *ObjectStorage) BlobUploadInfo(repo, uuid string) (int64, error) {
var fileSize int64
blobUploadPath := is.BlobUploadPath(repo, uuid) blobUploadPath := is.BlobUploadPath(repo, uuid)
// if it's not a multipart upload check for the regular empty file writer, err := is.store.Writer(context.Background(), blobUploadPath, true)
// created by NewBlobUpload, it should have 0 size every time if err != nil {
_, hasStarted := is.multiPartUploads.Load(blobUploadPath) if errors.As(err, &driver.PathNotFoundError{}) {
if !hasStarted { return -1, zerr.ErrUploadNotFound
uploadInfo, err := is.store.Stat(context.Background(), blobUploadPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob")
return -1, err
} }
fileSize = uploadInfo.Size() return -1, err
} else {
// otherwise get the size of multi parts upload
binfo, err := getMultipartFileWriter(is, blobUploadPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob")
return -1, err
}
fileSize = binfo.Size()
} }
return fileSize, nil return writer.Size(), nil
} }
// FinishBlobUpload finalizes the blob upload and moves blob the repository. // FinishBlobUpload finalizes the blob upload and moves blob the repository.
@ -803,8 +750,6 @@ func (is *ObjectStorage) FinishBlobUpload(repo, uuid string, body io.Reader, dig
} }
} }
is.multiPartUploads.Delete(src)
return nil return nil
} }
@ -978,7 +923,19 @@ func (is *ObjectStorage) RunGCPeriodically(interval time.Duration, sch *schedule
// DeleteBlobUpload deletes an existing blob upload that is currently in progress. // DeleteBlobUpload deletes an existing blob upload that is currently in progress.
func (is *ObjectStorage) DeleteBlobUpload(repo, uuid string) error { func (is *ObjectStorage) DeleteBlobUpload(repo, uuid string) error {
blobUploadPath := is.BlobUploadPath(repo, uuid) blobUploadPath := is.BlobUploadPath(repo, uuid)
if err := is.store.Delete(context.Background(), blobUploadPath); err != nil {
writer, err := is.store.Writer(context.Background(), blobUploadPath, true)
if err != nil {
if errors.As(err, &driver.PathNotFoundError{}) {
return zerr.ErrUploadNotFound
}
return err
}
defer writer.Close()
if err := writer.Cancel(); err != nil {
is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload") is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload")
return err return err
@ -1398,29 +1355,3 @@ func writeFile(store driver.StorageDriver, filepath string, buf []byte) (int, er
return n, nil return n, nil
} }
// get a multipart upload FileWriter based on wheather or not one has already been started.
func getMultipartFileWriter(imgStore *ObjectStorage, filepath string) (driver.FileWriter, error) {
var file driver.FileWriter
var err error
_, hasStarted := imgStore.multiPartUploads.Load(filepath)
if !hasStarted {
// start multipart upload
file, err = imgStore.store.Writer(context.Background(), filepath, false)
if err != nil {
return file, err
}
imgStore.multiPartUploads.Store(filepath, true)
} else {
// continue multipart upload
file, err = imgStore.store.Writer(context.Background(), filepath, true)
if err != nil {
return file, err
}
}
return file, nil
}

View file

@ -637,8 +637,8 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
Convey("Test NewBlobUpload", func(c C) { Convey("Test NewBlobUpload", func(c C) {
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
PutContentFn: func(ctx context.Context, path string, content []byte) error { WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
return errS3 return nil, errS3
}, },
}) })
_, err := imgStore.NewBlobUpload(testImage) _, err := imgStore.NewBlobUpload(testImage)
@ -647,14 +647,24 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
Convey("Test GetBlobUpload", func(c C) { Convey("Test GetBlobUpload", func(c C) {
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
return &FileInfoMock{}, errS3 return nil, errS3
}, },
}) })
_, err := imgStore.GetBlobUpload(testImage, "uuid") _, err := imgStore.GetBlobUpload(testImage, "uuid")
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
}) })
Convey("Test BlobUploadInfo", func(c C) {
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
return nil, errS3
},
})
_, err := imgStore.BlobUploadInfo(testImage, "uuid")
So(err, ShouldNotBeNil)
})
Convey("Test PutBlobChunkStreamed", func(c C) { Convey("Test PutBlobChunkStreamed", func(c C) {
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
@ -718,6 +728,16 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
}) })
Convey("Test PutBlobChunk4", func(c C) {
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
return &FileWriterMock{}, driver.PathNotFoundError{}
},
})
_, err := imgStore.PutBlobChunk(testImage, "uuid", 0, 100, io.NopCloser(strings.NewReader("")))
So(err, ShouldNotBeNil)
})
Convey("Test FinishBlobUpload", func(c C) { Convey("Test FinishBlobUpload", func(c C) {
imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{
WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {