From bd9ad998cd2137e3610d9a2d59ee2cf47b1c9f5f Mon Sep 17 00:00:00 2001 From: Andrei Aaron Date: Fri, 19 Aug 2022 10:38:59 +0000 Subject: [PATCH] Fix file handlers not being closed after calls to ImageStore.GetBlob This is to fixes hitting the FD limit when reading blobs from the disk in the graphql API Signed-off-by: Andrei Aaron --- pkg/api/routes.go | 1 + pkg/api/routes_test.go | 8 ++-- pkg/extensions/sync/utils.go | 64 ++++++++++++++++-------------- pkg/storage/local.go | 8 ++-- pkg/storage/local_test.go | 13 ++++-- pkg/storage/s3/s3.go | 12 +++--- pkg/storage/s3/s3_test.go | 15 +++++-- pkg/storage/storage.go | 2 +- pkg/storage/storage_test.go | 8 +++- pkg/test/mocks/image_store_mock.go | 6 +-- 10 files changed, 83 insertions(+), 54 deletions(-) diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 290c0b63..0c1f94d9 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -655,6 +655,7 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ return } + defer repo.Close() response.Header().Set("Content-Length", fmt.Sprintf("%d", blen)) response.Header().Set(constants.DistContentDigestKey, digest) diff --git a/pkg/api/routes_test.go b/pkg/api/routes_test.go index 459f69af..13a5e44f 100644 --- a/pkg/api/routes_test.go +++ b/pkg/api/routes_test.go @@ -405,8 +405,8 @@ func TestRoutes(t *testing.T) { "digest": "sha256:7a0437f04f83f084b7ed68ad9c4a4947e12fc4e1b006b38129bac89114ec3621", }, &mocks.MockedImageStore{ - GetBlobFn: func(repo, digest, mediaType string) (io.Reader, int64, error) { - return bytes.NewBuffer([]byte("")), 0, zerr.ErrRepoNotFound + GetBlobFn: func(repo, digest, mediaType string) (io.ReadCloser, int64, error) { + return io.NopCloser(bytes.NewBuffer([]byte(""))), 0, zerr.ErrRepoNotFound }, }) So(statusCode, ShouldEqual, http.StatusNotFound) @@ -418,8 +418,8 @@ func TestRoutes(t *testing.T) { "digest": "sha256:7a0437f04f83f084b7ed68ad9c4a4947e12fc4e1b006b38129bac89114ec3621", }, &mocks.MockedImageStore{ - GetBlobFn: func(repo, digest, mediaType string) (io.Reader, int64, error) { - return bytes.NewBuffer([]byte("")), 0, zerr.ErrBadBlobDigest + GetBlobFn: func(repo, digest, mediaType string) (io.ReadCloser, int64, error) { + return io.NopCloser(bytes.NewBuffer([]byte(""))), 0, zerr.ErrBadBlobDigest }, }) So(statusCode, ShouldEqual, http.StatusBadRequest) diff --git a/pkg/extensions/sync/utils.go b/pkg/extensions/sync/utils.go index 6d461fd8..b37b4c31 100644 --- a/pkg/extensions/sync/utils.go +++ b/pkg/extensions/sync/utils.go @@ -294,45 +294,19 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string, } for _, blob := range manifest.Layers { - blobReader, _, err := cacheImageStore.GetBlob(localRepo, blob.Digest.String(), blob.MediaType) + err = copyBlob(localRepo, blob.Digest.String(), blob.MediaType, + cacheImageStore, imageStore, log) if err != nil { - log.Error().Str("errorType", TypeOf(err)). - Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), - localRepo)).Str("blob digest", blob.Digest.String()).Msg("couldn't read blob") - return err } - - if found, _, _ := imageStore.CheckBlob(localRepo, blob.Digest.String()); !found { - _, _, err = imageStore.FullBlobUpload(localRepo, blobReader, blob.Digest.String()) - if err != nil { - log.Error().Str("errorType", TypeOf(err)). - Err(err).Str("blob digest", blob.Digest.String()).Msg("couldn't upload blob") - - return err - } - } } - blobReader, _, err := cacheImageStore.GetBlob(localRepo, manifest.Config.Digest.String(), manifest.Config.MediaType) + err = copyBlob(localRepo, manifest.Config.Digest.String(), manifest.Config.MediaType, + cacheImageStore, imageStore, log) if err != nil { - log.Error().Str("errorType", TypeOf(err)). - Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), - localRepo)).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't read config blob") - return err } - if found, _, _ := imageStore.CheckBlob(localRepo, manifest.Config.Digest.String()); !found { - _, _, err = imageStore.FullBlobUpload(localRepo, blobReader, manifest.Config.Digest.String()) - if err != nil { - log.Error().Str("errorType", TypeOf(err)). - Err(err).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't upload config blob") - - return err - } - } - _, err = imageStore.PutImageManifest(localRepo, tag, ispec.MediaTypeImageManifest, manifestContent) if err != nil { @@ -352,6 +326,36 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string, return nil } +// Copy a blob from one image store to another image store. +func copyBlob(localRepo, blobDigest, blobMediaType string, + souceImageStore, destinationImageStore storage.ImageStore, log log.Logger, +) error { + if found, _, _ := destinationImageStore.CheckBlob(localRepo, blobDigest); found { + // Blob is already at destination, nothing to do + return nil + } + + blobReadCloser, _, err := souceImageStore.GetBlob(localRepo, blobDigest, blobMediaType) + if err != nil { + log.Error().Str("errorType", TypeOf(err)).Err(err). + Str("dir", path.Join(souceImageStore.RootDir(), localRepo)). + Str("blob digest", blobDigest).Str("media type", blobMediaType). + Msg("couldn't read blob") + + return err + } + defer blobReadCloser.Close() + + _, _, err = destinationImageStore.FullBlobUpload(localRepo, blobReadCloser, blobDigest) + if err != nil { + log.Error().Str("errorType", TypeOf(err)).Err(err). + Str("blob digest", blobDigest).Str("media type", blobMediaType). + Msg("couldn't upload blob") + } + + return err +} + // sync needs transport to be stripped to not be wrongly interpreted as an image reference // at a non-fully qualified registry (hostname as image and port as tag). func StripRegistryTransport(url string) string { diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 59d3e6dc..0ffddf76 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -1351,7 +1351,7 @@ func (is *ImageStoreLocal) copyBlob(repo, blobPath, dstRecord string) (int64, er // GetBlob returns a stream to read the blob. // blob selector instead of directly downloading the blob. -func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.Reader, int64, error) { +func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) { var lockLatency time.Time parsedDigest, err := godigest.Parse(digest) @@ -1373,14 +1373,15 @@ func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.Reader, i return nil, -1, zerr.ErrBlobNotFound } - blobReader, err := os.Open(blobPath) + blobReadCloser, err := os.Open(blobPath) if err != nil { is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") return nil, -1, err } - return blobReader, binfo.Size(), nil + // The caller function is responsible for calling Close() + return blobReadCloser, binfo.Size(), nil } func (is *ImageStoreLocal) GetBlobContent(repo, digest string) ([]byte, error) { @@ -1388,6 +1389,7 @@ func (is *ImageStoreLocal) GetBlobContent(repo, digest string) ([]byte, error) { if err != nil { return []byte{}, err } + defer blob.Close() buf := new(bytes.Buffer) diff --git a/pkg/storage/local_test.go b/pkg/storage/local_test.go index e6a6552e..d384e355 100644 --- a/pkg/storage/local_test.go +++ b/pkg/storage/local_test.go @@ -751,13 +751,16 @@ func FuzzGetBlob(f *testing.F) { t.Error(err) } - _, _, err = imgStore.GetBlob(repoName, digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + blobReadCloser, _, err := imgStore.GetBlob(repoName, digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") if err != nil { if isKnownErr(err) { return } t.Error(err) } + if err = blobReadCloser.Close(); err != nil { + t.Error(err) + } }) } @@ -951,7 +954,9 @@ func TestDedupeLinks(t *testing.T) { _, _, err = imgStore.CheckBlob("dedupe1", digest.String()) So(err, ShouldBeNil) - _, _, err = imgStore.GetBlob("dedupe1", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + blobrc, _, err := imgStore.GetBlob("dedupe1", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + err = blobrc.Close() So(err, ShouldBeNil) cblob, cdigest := test.GetRandomImageConfig() @@ -1009,7 +1014,9 @@ func TestDedupeLinks(t *testing.T) { _, _, err = imgStore.CheckBlob("dedupe2", digest.String()) So(err, ShouldBeNil) - _, _, err = imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + blobrc, _, err = imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + err = blobrc.Close() So(err, ShouldBeNil) cblob, cdigest = test.GetRandomImageConfig() diff --git a/pkg/storage/s3/s3.go b/pkg/storage/s3/s3.go index 90533319..fa1b0c53 100644 --- a/pkg/storage/s3/s3.go +++ b/pkg/storage/s3/s3.go @@ -1253,7 +1253,7 @@ func (is *ObjectStorage) copyBlob(repo string, blobPath string, dstRecord string // GetBlob returns a stream to read the blob. // blob selector instead of directly downloading the blob. -func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.Reader, int64, error) { +func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) { var lockLatency time.Time dgst, err := godigest.Parse(digest) @@ -1275,7 +1275,7 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.Reader, int return nil, -1, zerr.ErrBlobNotFound } - blobReader, err := is.store.Reader(context.Background(), blobPath, 0) + blobReadCloser, err := is.store.Reader(context.Background(), blobPath, 0) if err != nil { is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") @@ -1301,18 +1301,19 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.Reader, int return nil, -1, zerr.ErrBlobNotFound } - blobReader, err := is.store.Reader(context.Background(), dstRecord, 0) + blobReadCloser, err := is.store.Reader(context.Background(), dstRecord, 0) if err != nil { is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob") return nil, -1, err } - return blobReader, binfo.Size(), nil + return blobReadCloser, binfo.Size(), nil } } - return blobReader, binfo.Size(), nil + // The caller function is responsible for calling Close() + return blobReadCloser, binfo.Size(), nil } func (is *ObjectStorage) GetBlobContent(repo, digest string) ([]byte, error) { @@ -1320,6 +1321,7 @@ func (is *ObjectStorage) GetBlobContent(repo, digest string) ([]byte, error) { if err != nil { return []byte{}, err } + defer blob.Close() buf := new(bytes.Buffer) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index c9adf650..1065c367 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -866,9 +866,12 @@ func TestS3Dedupe(t *testing.T) { So(checkBlobSize1, ShouldBeGreaterThan, 0) So(err, ShouldBeNil) - _, getBlobSize1, err := imgStore.GetBlob("dedupe1", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + blobReadCloser, getBlobSize1, err := imgStore.GetBlob("dedupe1", digest.String(), + "application/vnd.oci.image.layer.v1.tar+gzip") So(getBlobSize1, ShouldBeGreaterThan, 0) So(err, ShouldBeNil) + err = blobReadCloser.Close() + So(err, ShouldBeNil) cblob, cdigest := test.GetRandomImageConfig() _, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest.String()) @@ -928,11 +931,14 @@ func TestS3Dedupe(t *testing.T) { So(err, ShouldBeNil) So(checkBlobSize2, ShouldBeGreaterThan, 0) - _, getBlobSize2, err := imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + blobReadCloser, getBlobSize2, err := imgStore.GetBlob("dedupe2", digest.String(), + "application/vnd.oci.image.layer.v1.tar+gzip") So(err, ShouldBeNil) So(getBlobSize2, ShouldBeGreaterThan, 0) So(checkBlobSize1, ShouldEqual, checkBlobSize2) So(getBlobSize1, ShouldEqual, getBlobSize2) + err = blobReadCloser.Close() + So(err, ShouldBeNil) cblob, cdigest = test.GetRandomImageConfig() _, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest.String()) @@ -1039,9 +1045,12 @@ func TestS3Dedupe(t *testing.T) { So(err, ShouldBeNil) // check that we retrieve the real dedupe2/blob (which is deduped earlier - 0 size) when switching to dedupe false - _, getBlobSize2, err = imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + blobReadCloser, getBlobSize2, err = imgStore.GetBlob("dedupe2", digest.String(), + "application/vnd.oci.image.layer.v1.tar+gzip") So(err, ShouldBeNil) So(getBlobSize1, ShouldEqual, getBlobSize2) + err = blobReadCloser.Close() + So(err, ShouldBeNil) _, checkBlobSize2, err := imgStore.CheckBlob("dedupe2", digest.String()) So(err, ShouldBeNil) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 7723bd07..54af2c1d 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -39,7 +39,7 @@ type ImageStore interface { DeleteBlobUpload(repo, uuid string) error BlobPath(repo string, digest digest.Digest) string CheckBlob(repo, digest string) (bool, int64, error) - GetBlob(repo, digest, mediaType string) (io.Reader, int64, error) + GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) DeleteBlob(repo, digest string) error GetIndexContent(repo string) ([]byte, error) GetBlobContent(repo, digest string) ([]byte, error) diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 61bdc5b5..b3f7a6a6 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -242,7 +242,9 @@ func TestStorageAPIs(t *testing.T) { _, _, err = imgStore.CheckBlob("test", digest.String()) So(err, ShouldBeNil) - _, _, err = imgStore.GetBlob("test", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + blob, _, err := imgStore.GetBlob("test", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + err = blob.Close() So(err, ShouldBeNil) manifest := ispec.Manifest{} @@ -431,7 +433,9 @@ func TestStorageAPIs(t *testing.T) { _, _, err = imgStore.GetBlob("test", "inexistent", "application/vnd.oci.image.layer.v1.tar+gzip") So(err, ShouldNotBeNil) - _, _, err = imgStore.GetBlob("test", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + blob, _, err := imgStore.GetBlob("test", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + err = blob.Close() So(err, ShouldBeNil) blobContent, err := imgStore.GetBlobContent("test", digest.String()) diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index a8ec0c6c..c22e5add 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -30,7 +30,7 @@ type MockedImageStore struct { DeleteBlobUploadFn func(repo string, uuid string) error BlobPathFn func(repo string, digest digest.Digest) string CheckBlobFn func(repo string, digest string) (bool, int64, error) - GetBlobFn func(repo string, digest string, mediaType string) (io.Reader, int64, error) + GetBlobFn func(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) DeleteBlobFn func(repo string, digest string) error GetIndexContentFn func(repo string) ([]byte, error) GetBlobContentFn func(repo, digest string) ([]byte, error) @@ -230,12 +230,12 @@ func (is MockedImageStore) CheckBlob(repo string, digest string) (bool, int64, e return true, 0, nil } -func (is MockedImageStore) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) { +func (is MockedImageStore) GetBlob(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) { if is.GetBlobFn != nil { return is.GetBlobFn(repo, digest, mediaType) } - return &io.LimitedReader{}, 0, nil + return io.NopCloser(&io.LimitedReader{}), 0, nil } func (is MockedImageStore) DeleteBlobUpload(repo string, digest string) error {