diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go index b3e9e553..2bac4dca 100644 --- a/pkg/storage/s3/storage.go +++ b/pkg/storage/s3/storage.go @@ -45,8 +45,8 @@ type ObjectStorage struct { // We must keep track of multi part uploads to s3, because the lib // which we are using doesn't cancel multiparts uploads // see: https://github.com/distribution/distribution/blob/main/registry/storage/driver/s3-aws/s3.go#L545 - isMultiPartUpload map[string]bool - metrics monitoring.MetricServer + multiPartUploads sync.Map + metrics monitoring.MetricServer } func (is *ObjectStorage) RootDir() string { @@ -67,13 +67,13 @@ func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commi log zlog.Logger, metrics monitoring.MetricServer, store driver.StorageDriver) storage.ImageStore { imgStore := &ObjectStorage{ - rootDir: rootDir, - store: store, - lock: &sync.RWMutex{}, - blobUploads: make(map[string]storage.BlobUpload), - log: log.With().Caller().Logger(), - isMultiPartUpload: make(map[string]bool), - metrics: metrics, + rootDir: rootDir, + store: store, + lock: &sync.RWMutex{}, + blobUploads: make(map[string]storage.BlobUpload), + log: log.With().Caller().Logger(), + multiPartUploads: sync.Map{}, + metrics: metrics, } return imgStore @@ -281,16 +281,11 @@ func (is *ObjectStorage) GetRepositories() ([]string, error) { // GetImageTags returns a list of image tags available in the specified repository. func (is *ObjectStorage) GetImageTags(repo string) ([]string, error) { - var lockLatency time.Time - dir := path.Join(is.rootDir, repo) if fi, err := is.store.Stat(context.Background(), dir); err != nil || !fi.IsDir() { return nil, zerr.ErrRepoNotFound } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) - buf, err := is.GetIndexContent(repo) if err != nil { return nil, err @@ -324,9 +319,6 @@ func (is *ObjectStorage) GetImageManifest(repo string, reference string) ([]byte return nil, "", "", zerr.ErrRepoNotFound } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) - buf, err := is.GetIndexContent(repo) if err != nil { return nil, "", "", err @@ -368,11 +360,14 @@ func (is *ObjectStorage) GetImageManifest(repo string, reference string) ([]byte return nil, "", "", zerr.ErrManifestNotFound } - p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded()) + manifestPath := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded()) - buf, err = is.store.GetContent(context.Background(), p) + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + + buf, err = is.store.GetContent(context.Background(), manifestPath) if err != nil { - is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest") + is.log.Error().Err(err).Str("blob", manifestPath).Msg("failed to read manifest") return nil, "", "", err } @@ -451,11 +446,6 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy refIsDigest = true } - var lockLatency time.Time - - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) - dir := path.Join(is.rootDir, repo) buf, err := is.GetIndexContent(repo) @@ -463,6 +453,11 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy return "", err } + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) + var index ispec.Index if err := json.Unmarshal(buf, &index); err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") @@ -574,9 +569,6 @@ func (is *ObjectStorage) DeleteImageManifest(repo string, reference string) erro isTag = true } - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) - buf, err := is.GetIndexContent(repo) if err != nil { return err @@ -623,6 +615,9 @@ func (is *ObjectStorage) DeleteImageManifest(repo string, reference string) erro return zerr.ErrManifestNotFound } + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) + // now update "index.json" dir = path.Join(is.rootDir, repo) file := path.Join(dir, "index.json") @@ -707,8 +702,8 @@ func (is *ObjectStorage) GetBlobUpload(repo string, uuid string) (int64, error) // if it's not a multipart upload check for the regular empty file // created by NewBlobUpload, it should have 0 size every time - isMultiPartStarted, ok := is.isMultiPartUpload[blobUploadPath] - if !isMultiPartStarted || !ok { + _, hasStarted := is.multiPartUploads.Load(blobUploadPath) + if !hasStarted { binfo, err := is.store.Stat(context.Background(), blobUploadPath) if err != nil { var perr driver.PathNotFoundError @@ -800,7 +795,9 @@ func (is *ObjectStorage) PutBlobChunk(repo string, uuid string, from int64, to i defer file.Close() if from != file.Size() { - // cancel multipart upload created earlier + // cancel multipart upload + is.multiPartUploads.Delete(blobUploadPath) + err := file.Cancel() if err != nil { is.log.Error().Err(err).Msg("failed to cancel multipart upload") @@ -830,8 +827,6 @@ func (is *ObjectStorage) PutBlobChunk(repo string, uuid string, from int64, to i return -1, err } - is.isMultiPartUpload[blobUploadPath] = true - return int64(nbytes), err } @@ -843,8 +838,8 @@ func (is *ObjectStorage) BlobUploadInfo(repo string, uuid string) (int64, error) // if it's not a multipart upload check for the regular empty file // created by NewBlobUpload, it should have 0 size every time - isMultiPartStarted, ok := is.isMultiPartUpload[blobUploadPath] - if !isMultiPartStarted || !ok { + _, hasStarted := is.multiPartUploads.Load(blobUploadPath) + if !hasStarted { uploadInfo, err := is.store.Stat(context.Background(), blobUploadPath) if err != nil { is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob") @@ -887,6 +882,8 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read return zerr.ErrBadBlobDigest } + defer fileWriter.Close() + if err := fileWriter.Commit(); err != nil { is.log.Error().Err(err).Msg("failed to commit file") @@ -922,6 +919,11 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read dst := is.BlobPath(repo, dstDigest) + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) + if err := is.store.Move(context.Background(), src, dst); err != nil { is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). Str("dst", dst).Msg("unable to finish blob") @@ -929,8 +931,7 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read return err } - // remove multipart upload, not needed anymore - delete(is.isMultiPartUpload, src) + is.multiPartUploads.Delete(src) return nil } @@ -1115,8 +1116,13 @@ func (is *ObjectStorage) GetReferrers(repo, digest string, mediaType string) ([] } func (is *ObjectStorage) GetIndexContent(repo string) ([]byte, error) { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + buf, err := is.store.GetContent(context.Background(), path.Join(dir, "index.json")) if err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") @@ -1181,21 +1187,23 @@ func writeFile(store driver.StorageDriver, filepath string, buf []byte) (int, er return n, nil } -// Because we can not create an empty multipart upload, we store multi part uploads -// so that we know when to create a fileWriter with append=true or with append=false -// Trying and handling errors results in weird s3 api errors. +// get a multipart upload FileWriter based on wheather or not one has already been started. func getMultipartFileWriter(imgStore *ObjectStorage, filepath string) (driver.FileWriter, error) { var file driver.FileWriter var err error - isMultiPartStarted, ok := imgStore.isMultiPartUpload[filepath] - if !isMultiPartStarted || !ok { + _, hasStarted := imgStore.multiPartUploads.Load(filepath) + if !hasStarted { + // start multipart upload file, err = imgStore.store.Writer(context.Background(), filepath, false) if err != nil { return file, err } + + imgStore.multiPartUploads.Store(filepath, true) } else { + // continue multipart upload file, err = imgStore.store.Writer(context.Background(), filepath, true) if err != nil { return file, err diff --git a/pkg/storage/storage_fs.go b/pkg/storage/storage_fs.go index 55b22a81..d69d0d6f 100644 --- a/pkg/storage/storage_fs.go +++ b/pkg/storage/storage_fs.go @@ -1387,8 +1387,13 @@ func (is *ImageStoreFS) GetBlobContent(repo string, digest string) ([]byte, erro } func (is *ImageStoreFS) GetIndexContent(repo string) ([]byte, error) { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { if os.IsNotExist(err) {