From d1bf71357381d1d96beec6c6c3cd354fdef16c04 Mon Sep 17 00:00:00 2001 From: peusebiu Date: Tue, 16 Jan 2024 19:04:36 +0200 Subject: [PATCH] fix: excessive memory usage (#2164) instead of reading entire files before calculating their digests stream them by using their Reader method. Signed-off-by: Petu Eusebiu --- pkg/storage/imagestore/imagestore.go | 230 +++++++++++---------------- pkg/storage/s3/s3_test.go | 3 + pkg/storage/scrub.go | 11 +- pkg/storage/storage_test.go | 3 + pkg/storage/types/types.go | 1 + pkg/test/mocks/image_store_mock.go | 9 ++ 6 files changed, 114 insertions(+), 143 deletions(-) diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index 90f92739..33be3141 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -1,7 +1,6 @@ package imagestore import ( - "bytes" "context" "crypto/sha256" "encoding/json" @@ -951,25 +950,25 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi uuid := u.String() src := is.BlobUploadPath(repo, uuid) digester := sha256.New() - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(body) + blobFile, err := is.storeDriver.Writer(src, false) if err != nil { - is.log.Error().Err(err).Msg("failed to read blob") + is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") + return "", -1, zerr.ErrUploadNotFound + } + + defer blobFile.Close() + + mw := io.MultiWriter(blobFile, digester) + + nbytes, err := io.Copy(mw, body) + if err != nil { return "", -1, err } - nbytes, err := is.storeDriver.WriteFile(src, buf.Bytes()) - if err != nil { - is.log.Error().Err(err).Msg("failed to write blob") - - return "", -1, err - } - - _, err = digester.Write(buf.Bytes()) - if err != nil { - is.log.Error().Err(err).Str("component", "digester").Msg("failed to write") + if err := blobFile.Commit(); err != nil { + is.log.Error().Err(err).Str("blob", src).Msg("failed to commit blob") return "", -1, err } @@ -1008,7 +1007,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi } } - return uuid, int64(nbytes), nil + return uuid, nbytes, nil } func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo string, dst string) error { @@ -1210,36 +1209,9 @@ func (is *ImageStore) StatBlob(repo string, digest godigest.Digest) (bool, int64 return false, -1, time.Time{}, err } - blobPath := is.BlobPath(repo, digest) - - binfo, err := is.storeDriver.Stat(blobPath) - if err == nil && binfo.Size() > 0 { - is.log.Debug().Str("blob path", blobPath).Msg("blob path found") - - return true, binfo.Size(), binfo.ModTime(), nil - } - + binfo, err := is.originalBlobInfo(repo, digest) if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - - return false, -1, time.Time{}, zerr.ErrBlobNotFound - } - - // then it's a 'deduped' blob - - // Check blobs in cache - dstRecord, err := is.checkCacheBlob(digest) - if err != nil { - is.log.Warn().Err(err).Str("digest", digest.String()).Msg("not found in cache") - - return false, -1, time.Time{}, zerr.ErrBlobNotFound - } - - binfo, err = is.storeDriver.Stat(dstRecord) - if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - - return false, -1, time.Time{}, zerr.ErrBlobNotFound + return false, -1, time.Time{}, err } return true, binfo.Size(), binfo.ModTime(), nil @@ -1318,34 +1290,12 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT return nil, -1, -1, err } - blobPath := is.BlobPath(repo, digest) - is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) - binfo, err := is.storeDriver.Stat(blobPath) + binfo, err := is.originalBlobInfo(repo, digest) if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - - return nil, -1, -1, zerr.ErrBlobNotFound - } - - // is a deduped blob - if binfo.Size() == 0 { - // Check blobs in cache - blobPath, err = is.checkCacheBlob(digest) - if err != nil { - is.log.Debug().Err(err).Str("digest", digest.String()).Msg("not found in cache") - - return nil, -1, -1, zerr.ErrBlobNotFound - } - - binfo, err = is.storeDriver.Stat(blobPath) - if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - - return nil, -1, -1, zerr.ErrBlobNotFound - } + return nil, -1, -1, err } end := to @@ -1354,16 +1304,16 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT end = binfo.Size() - 1 } - blobHandle, err := is.storeDriver.Reader(blobPath, from) + blobHandle, err := is.storeDriver.Reader(binfo.Path(), from) if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") + is.log.Error().Err(err).Str("blob", binfo.Path()).Msg("failed to open blob") return nil, -1, -1, err } blobReadCloser, err := newBlobStream(blobHandle, from, end) if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob stream") + is.log.Error().Err(err).Str("blob", binfo.Path()).Msg("failed to open blob stream") return nil, -1, -1, err } @@ -1372,6 +1322,42 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT return blobReadCloser, end - from + 1, binfo.Size(), nil } +/* + In the case of s3(which doesn't support links) we link them in our cache by + keeping a reference to the original blob and its duplicates + +On the storage, original blobs are those with contents, and duplicates one are just empty files. +This function helps handling this situation, by using this one you can make sure you always get the original blob. +*/ +func (is *ImageStore) originalBlobInfo(repo string, digest godigest.Digest) (driver.FileInfo, error) { + blobPath := is.BlobPath(repo, digest) + + binfo, err := is.storeDriver.Stat(blobPath) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") + + return nil, zerr.ErrBlobNotFound + } + + if binfo.Size() == 0 { + dstRecord, err := is.checkCacheBlob(digest) + if err != nil { + is.log.Debug().Err(err).Str("digest", digest.String()).Msg("not found in cache") + + return nil, zerr.ErrBlobNotFound + } + + binfo, err = is.storeDriver.Stat(dstRecord) + if err != nil { + is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob") + + return nil, zerr.ErrBlobNotFound + } + } + + return binfo, nil +} + // GetBlob returns a stream to read the blob. // blob selector instead of directly downloading the blob. func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType string) (io.ReadCloser, int64, error) { @@ -1381,50 +1367,19 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str return nil, -1, err } - blobPath := is.BlobPath(repo, digest) - is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) - binfo, err := is.storeDriver.Stat(blobPath) + binfo, err := is.originalBlobInfo(repo, digest) if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - - return nil, -1, zerr.ErrBlobNotFound - } - - blobReadCloser, err := is.storeDriver.Reader(blobPath, 0) - if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") - return nil, -1, err } - // is a 'deduped' blob? - if binfo.Size() == 0 { - // Check blobs in cache - dstRecord, err := is.checkCacheBlob(digest) - if err != nil { - is.log.Debug().Err(err).Str("digest", digest.String()).Msg("not found in cache") + blobReadCloser, err := is.storeDriver.Reader(binfo.Path(), 0) + if err != nil { + is.log.Error().Err(err).Str("blob", binfo.Path()).Msg("failed to open blob") - return nil, -1, zerr.ErrBlobNotFound - } - - binfo, err := is.storeDriver.Stat(dstRecord) - if err != nil { - is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob") - - return nil, -1, zerr.ErrBlobNotFound - } - - blobReadCloser, err := is.storeDriver.Reader(dstRecord, 0) - if err != nil { - is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob") - - return nil, -1, err - } - - return blobReadCloser, binfo.Size(), nil + return nil, -1, err } // The caller function is responsible for calling Close() @@ -1432,48 +1387,57 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str } // GetBlobContent returns blob contents, the caller function MUST lock from outside. +// Should be used for small files(manifests/config blobs). func (is *ImageStore) GetBlobContent(repo string, digest godigest.Digest) ([]byte, error) { if err := digest.Validate(); err != nil { return []byte{}, err } - blobPath := is.BlobPath(repo, digest) - - binfo, err := is.storeDriver.Stat(blobPath) + binfo, err := is.originalBlobInfo(repo, digest) if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - - return []byte{}, zerr.ErrBlobNotFound + return nil, err } - blobBuf, err := is.storeDriver.ReadFile(blobPath) + blobBuf, err := is.storeDriver.ReadFile(binfo.Path()) if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") + is.log.Error().Err(err).Str("blob", binfo.Path()).Msg("failed to open blob") return nil, err } - // is a 'deduped' blob? - if binfo.Size() == 0 { - // Check blobs in cache - dstRecord, err := is.checkCacheBlob(digest) - if err != nil { - is.log.Debug().Err(err).Str("digest", digest.String()).Msg("not found in cache") + return blobBuf, nil +} - return nil, zerr.ErrBlobNotFound - } - - blobBuf, err := is.storeDriver.ReadFile(dstRecord) - if err != nil { - is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob") - - return nil, err - } - - return blobBuf, nil +// VerifyBlobDigestValue verifies that the blob which is addressed by given digest has a equivalent computed digest. +func (is *ImageStore) VerifyBlobDigestValue(repo string, digest godigest.Digest) error { + if err := digest.Validate(); err != nil { + return err } - return blobBuf, nil + binfo, err := is.originalBlobInfo(repo, digest) + if err != nil { + return err + } + + blobReadCloser, err := is.storeDriver.Reader(binfo.Path(), 0) + if err != nil { + return err + } + + defer blobReadCloser.Close() + + // compute its real digest + computedDigest, err := godigest.FromReader(blobReadCloser) + if err != nil { + return err + } + + // if the computed digest is different than the blob name(its initial digest) then the blob has been corrupted. + if computedDigest != digest { + return zerr.ErrBadBlobDigest + } + + return nil } func (is *ImageStore) GetReferrers(repo string, gdigest godigest.Digest, artifactTypes []string, diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 67b8b686..043fb465 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -3798,6 +3798,9 @@ func TestS3DedupeErr(t *testing.T) { SizeFn: func() int64 { return 0 }, + PathFn: func() string { + return "repo1/dst1" + }, }, nil }, ReaderFn: func(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { diff --git a/pkg/storage/scrub.go b/pkg/storage/scrub.go index 19677cd3..1b9d1b76 100644 --- a/pkg/storage/scrub.go +++ b/pkg/storage/scrub.go @@ -279,21 +279,12 @@ func CheckLayers( } for _, layer := range man.Layers { - layerContent, err := imgStore.GetBlobContent(imageName, layer.Digest) - if err != nil { + if err := imgStore.VerifyBlobDigestValue(imageName, layer.Digest); err != nil { imageRes = getResult(imageName, tagName, layer.Digest, err) break } - computedDigest := godigest.FromBytes(layerContent) - - if computedDigest != layer.Digest { - imageRes = getResult(imageName, tagName, layer.Digest, errors.ErrBadBlobDigest) - - break - } - imageRes = getResult(imageName, tagName, "", nil) } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 7ab29010..6b4947de 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -215,6 +215,9 @@ func TestStorageAPIs(t *testing.T) { So(err, ShouldBeNil) So(n, ShouldEqual, len(body)) So(upload, ShouldNotBeEmpty) + + err = imgStore.VerifyBlobDigestValue("test", digest) + So(err, ShouldBeNil) }) Convey("New blob upload", func() { diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 692dd7a6..a2ea7ac4 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -64,6 +64,7 @@ type ImageStore interface { //nolint:interfacebloat GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error) GetAllBlobs(repo string) ([]string, error) PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler) + VerifyBlobDigestValue(repo string, digest godigest.Digest) error } type Driver interface { //nolint:interfacebloat diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index 0c65316a..25d2aee3 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -59,6 +59,7 @@ type MockedImageStore struct { PutIndexContentFn func(repo string, index ispec.Index) error PopulateStorageMetricsFn func(interval time.Duration, sch *scheduler.Scheduler) StatIndexFn func(repo string) (bool, int64, time.Time, error) + VerifyBlobDigestValueFn func(repo string, digest godigest.Digest) error } func (is MockedImageStore) StatIndex(repo string) (bool, int64, time.Time, error) { @@ -425,3 +426,11 @@ func (is MockedImageStore) PopulateStorageMetrics(interval time.Duration, sch *s is.PopulateStorageMetricsFn(interval, sch) } } + +func (is MockedImageStore) VerifyBlobDigestValue(repo string, digest godigest.Digest) error { + if is.VerifyBlobDigestValueFn != nil { + return is.VerifyBlobDigestValueFn(repo, digest) + } + + return nil +}