mirror of https://github.com/project-zot/zot.git

fix: excessive memory usage (#2164)

Instead of reading entire files into memory before calculating their digests,
stream them by using their Reader method.

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
peusebiu committed 2024-01-16 19:04:36 +02:00 (via GitHub)
parent d7f2429c01
commit d1bf713573
6 changed files with 114 additions and 143 deletions
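The core change is easiest to see in isolation: instead of buffering a whole blob in a bytes.Buffer and then hashing the buffered copy, the upload path fans the request body out to the storage writer and the sha256 digester in a single streaming pass. A minimal standalone sketch of that pattern follows (it uses a local file via os.Create in place of zot's storeDriver.Writer, and streamBlob is a hypothetical name):

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"strings"
)

// streamBlob writes body to dst while hashing it in the same pass, so the
// blob is never held in memory as a whole.
func streamBlob(dst string, body io.Reader) (string, int64, error) {
	f, err := os.Create(dst)
	if err != nil {
		return "", -1, err
	}
	defer f.Close()

	digester := sha256.New()

	// io.MultiWriter fans each chunk out to both the file and the digester.
	nbytes, err := io.Copy(io.MultiWriter(f, digester), body)
	if err != nil {
		return "", -1, err
	}

	return "sha256:" + hex.EncodeToString(digester.Sum(nil)), nbytes, nil
}

func main() {
	dgst, n, err := streamBlob(os.TempDir()+"/blob", strings.NewReader("hello world"))
	fmt.Println(dgst, n, err)
}

With this shape, peak memory is on the order of io.Copy's small internal buffer rather than the full blob size, which is the excessive-memory fix the commit title refers to.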

@@ -1,7 +1,6 @@
 package imagestore
 
 import (
-	"bytes"
 	"context"
 	"crypto/sha256"
 	"encoding/json"
@@ -951,25 +950,25 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi
 	uuid := u.String()
 
 	src := is.BlobUploadPath(repo, uuid)
 
 	digester := sha256.New()
 
-	buf := new(bytes.Buffer)
-
-	_, err = buf.ReadFrom(body)
+	blobFile, err := is.storeDriver.Writer(src, false)
 	if err != nil {
-		is.log.Error().Err(err).Msg("failed to read blob")
+		is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
 
 		return "", -1, zerr.ErrUploadNotFound
 	}
 
-	nbytes, err := is.storeDriver.WriteFile(src, buf.Bytes())
-	if err != nil {
-		is.log.Error().Err(err).Msg("failed to write blob")
-
-		return "", -1, err
-	}
-
-	_, err = digester.Write(buf.Bytes())
-	if err != nil {
-		is.log.Error().Err(err).Str("component", "digester").Msg("failed to write")
+	defer blobFile.Close()
+
+	mw := io.MultiWriter(blobFile, digester)
+
+	nbytes, err := io.Copy(mw, body)
+	if err != nil {
+		return "", -1, err
+	}
+
+	if err := blobFile.Commit(); err != nil {
+		is.log.Error().Err(err).Str("blob", src).Msg("failed to commit blob")
 
 		return "", -1, err
 	}
@@ -1008,7 +1007,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi
 		}
 	}
 
-	return uuid, int64(nbytes), nil
+	return uuid, nbytes, nil
 }
 
 func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo string, dst string) error {
@@ -1210,36 +1209,9 @@ func (is *ImageStore) StatBlob(repo string, digest godigest.Digest) (bool, int64
 		return false, -1, time.Time{}, err
 	}
 
-	blobPath := is.BlobPath(repo, digest)
-
-	binfo, err := is.storeDriver.Stat(blobPath)
-	if err == nil && binfo.Size() > 0 {
-		is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
-
-		return true, binfo.Size(), binfo.ModTime(), nil
-	}
-
-	// then it's a 'deduped' blob
-
-	// Check blobs in cache
-	dstRecord, err := is.checkCacheBlob(digest)
-	if err != nil {
-		is.log.Warn().Err(err).Str("digest", digest.String()).Msg("not found in cache")
-
-		return false, -1, time.Time{}, zerr.ErrBlobNotFound
-	}
-
-	binfo, err = is.storeDriver.Stat(dstRecord)
+	binfo, err := is.originalBlobInfo(repo, digest)
 	if err != nil {
-		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
-
-		return false, -1, time.Time{}, zerr.ErrBlobNotFound
+		return false, -1, time.Time{}, err
 	}
 
 	return true, binfo.Size(), binfo.ModTime(), nil
@@ -1318,34 +1290,12 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT
 		return nil, -1, -1, err
 	}
 
-	blobPath := is.BlobPath(repo, digest)
-
 	is.RLock(&lockLatency)
 	defer is.RUnlock(&lockLatency)
 
-	binfo, err := is.storeDriver.Stat(blobPath)
+	binfo, err := is.originalBlobInfo(repo, digest)
 	if err != nil {
-		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
-
-		return nil, -1, -1, zerr.ErrBlobNotFound
-	}
-
-	// is a deduped blob
-	if binfo.Size() == 0 {
-		// Check blobs in cache
-		blobPath, err = is.checkCacheBlob(digest)
-		if err != nil {
-			is.log.Debug().Err(err).Str("digest", digest.String()).Msg("not found in cache")
-
-			return nil, -1, -1, zerr.ErrBlobNotFound
-		}
-
-		binfo, err = is.storeDriver.Stat(blobPath)
-		if err != nil {
-			is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
-
-			return nil, -1, -1, zerr.ErrBlobNotFound
-		}
+		return nil, -1, -1, err
 	}
 
 	end := to
@@ -1354,16 +1304,16 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT
 		end = binfo.Size() - 1
 	}
 
-	blobHandle, err := is.storeDriver.Reader(blobPath, from)
+	blobHandle, err := is.storeDriver.Reader(binfo.Path(), from)
 	if err != nil {
-		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
+		is.log.Error().Err(err).Str("blob", binfo.Path()).Msg("failed to open blob")
 
 		return nil, -1, -1, err
 	}
 
 	blobReadCloser, err := newBlobStream(blobHandle, from, end)
 	if err != nil {
-		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob stream")
+		is.log.Error().Err(err).Str("blob", binfo.Path()).Msg("failed to open blob stream")
 
 		return nil, -1, -1, err
 	}
@@ -1372,6 +1322,42 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT
 	return blobReadCloser, end - from + 1, binfo.Size(), nil
 }
 
+/*
+	In the case of s3(which doesn't support links) we link them in our cache by
+	keeping a reference to the original blob and its duplicates
+
+	On the storage, original blobs are those with contents, and duplicates one are just empty files.
+	This function helps handling this situation, by using this one you can make sure you always get the original blob.
+*/
+func (is *ImageStore) originalBlobInfo(repo string, digest godigest.Digest) (driver.FileInfo, error) {
+	blobPath := is.BlobPath(repo, digest)
+
+	binfo, err := is.storeDriver.Stat(blobPath)
+	if err != nil {
+		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
+
+		return nil, zerr.ErrBlobNotFound
+	}
+
+	if binfo.Size() == 0 {
+		dstRecord, err := is.checkCacheBlob(digest)
+		if err != nil {
+			is.log.Debug().Err(err).Str("digest", digest.String()).Msg("not found in cache")
+
+			return nil, zerr.ErrBlobNotFound
+		}
+
+		binfo, err = is.storeDriver.Stat(dstRecord)
+		if err != nil {
+			is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob")
+
+			return nil, zerr.ErrBlobNotFound
+		}
+	}
+
+	return binfo, nil
+}
+
 // GetBlob returns a stream to read the blob.
 // blob selector instead of directly downloading the blob.
 func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType string) (io.ReadCloser, int64, error) {
@@ -1381,50 +1367,19 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str
 		return nil, -1, err
 	}
 
-	blobPath := is.BlobPath(repo, digest)
-
 	is.RLock(&lockLatency)
 	defer is.RUnlock(&lockLatency)
 
-	binfo, err := is.storeDriver.Stat(blobPath)
+	binfo, err := is.originalBlobInfo(repo, digest)
 	if err != nil {
-		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
-
-		return nil, -1, zerr.ErrBlobNotFound
+		return nil, -1, err
 	}
 
-	blobReadCloser, err := is.storeDriver.Reader(blobPath, 0)
+	blobReadCloser, err := is.storeDriver.Reader(binfo.Path(), 0)
 	if err != nil {
-		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
+		is.log.Error().Err(err).Str("blob", binfo.Path()).Msg("failed to open blob")
 
 		return nil, -1, err
 	}
 
-	// is a 'deduped' blob?
-	if binfo.Size() == 0 {
-		// Check blobs in cache
-		dstRecord, err := is.checkCacheBlob(digest)
-		if err != nil {
-			is.log.Debug().Err(err).Str("digest", digest.String()).Msg("not found in cache")
-
-			return nil, -1, zerr.ErrBlobNotFound
-		}
-
-		binfo, err := is.storeDriver.Stat(dstRecord)
-		if err != nil {
-			is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob")
-
-			return nil, -1, zerr.ErrBlobNotFound
-		}
-
-		blobReadCloser, err := is.storeDriver.Reader(dstRecord, 0)
-		if err != nil {
-			is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob")
-
-			return nil, -1, err
-		}
-
-		return blobReadCloser, binfo.Size(), nil
-	}
-
 	// The caller function is responsible for calling Close()
@@ -1432,48 +1387,57 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str
 }
 
 // GetBlobContent returns blob contents, the caller function MUST lock from outside.
 // Should be used for small files(manifests/config blobs).
 func (is *ImageStore) GetBlobContent(repo string, digest godigest.Digest) ([]byte, error) {
 	if err := digest.Validate(); err != nil {
 		return []byte{}, err
 	}
 
-	blobPath := is.BlobPath(repo, digest)
-
-	binfo, err := is.storeDriver.Stat(blobPath)
+	binfo, err := is.originalBlobInfo(repo, digest)
 	if err != nil {
-		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
-
-		return []byte{}, zerr.ErrBlobNotFound
+		return nil, err
 	}
 
-	blobBuf, err := is.storeDriver.ReadFile(blobPath)
+	blobBuf, err := is.storeDriver.ReadFile(binfo.Path())
 	if err != nil {
-		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
+		is.log.Error().Err(err).Str("blob", binfo.Path()).Msg("failed to open blob")
 
 		return nil, err
 	}
 
-	// is a 'deduped' blob?
-	if binfo.Size() == 0 {
-		// Check blobs in cache
-		dstRecord, err := is.checkCacheBlob(digest)
-		if err != nil {
-			is.log.Debug().Err(err).Str("digest", digest.String()).Msg("not found in cache")
-
-			return nil, zerr.ErrBlobNotFound
-		}
-
-		blobBuf, err := is.storeDriver.ReadFile(dstRecord)
-		if err != nil {
-			is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob")
-
-			return nil, err
-		}
-
-		return blobBuf, nil
-	}
-
 	return blobBuf, nil
 }
 
+// VerifyBlobDigestValue verifies that the blob which is addressed by given digest has a equivalent computed digest.
+func (is *ImageStore) VerifyBlobDigestValue(repo string, digest godigest.Digest) error {
+	if err := digest.Validate(); err != nil {
+		return err
+	}
+
+	binfo, err := is.originalBlobInfo(repo, digest)
+	if err != nil {
+		return err
+	}
+
+	blobReadCloser, err := is.storeDriver.Reader(binfo.Path(), 0)
+	if err != nil {
+		return err
+	}
+
+	defer blobReadCloser.Close()
+
+	// compute its real digest
+	computedDigest, err := godigest.FromReader(blobReadCloser)
+	if err != nil {
+		return err
+	}
+
+	// if the computed digest is different than the blob name(its initial digest) then the blob has been corrupted.
+	if computedDigest != digest {
+		return zerr.ErrBadBlobDigest
+	}
+
+	return nil
+}
+
 func (is *ImageStore) GetReferrers(repo string, gdigest godigest.Digest, artifactTypes []string,

@@ -3798,6 +3798,9 @@ func TestS3DedupeErr(t *testing.T) {
 				SizeFn: func() int64 {
 					return 0
 				},
+				PathFn: func() string {
+					return "repo1/dst1"
+				},
 			}, nil
 		},
 		ReaderFn: func(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {

@@ -279,21 +279,12 @@ func CheckLayers(
 	}
 
 	for _, layer := range man.Layers {
-		layerContent, err := imgStore.GetBlobContent(imageName, layer.Digest)
-		if err != nil {
+		if err := imgStore.VerifyBlobDigestValue(imageName, layer.Digest); err != nil {
 			imageRes = getResult(imageName, tagName, layer.Digest, err)
 
 			break
 		}
 
-		computedDigest := godigest.FromBytes(layerContent)
-		if computedDigest != layer.Digest {
-			imageRes = getResult(imageName, tagName, layer.Digest, errors.ErrBadBlobDigest)
-
-			break
-		}
-
 		imageRes = getResult(imageName, tagName, "", nil)
 	}
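The scrub routine above now delegates to the new VerifyBlobDigestValue, which re-computes the digest by streaming the blob through godigest.FromReader instead of loading it whole and calling godigest.FromBytes. A minimal sketch of that verification step, assuming a plain file on disk in place of zot's store driver (verifyBlob is a hypothetical name):

package main

import (
	"fmt"
	"os"

	godigest "github.com/opencontainers/go-digest"
)

// verifyBlob re-computes a blob's digest by streaming the file, so
// verification no longer needs the whole layer in memory at once.
func verifyBlob(path string, expected godigest.Digest) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	computed, err := godigest.FromReader(f)
	if err != nil {
		return err
	}

	// A mismatch means the stored blob no longer matches its content address.
	if computed != expected {
		return fmt.Errorf("digest mismatch: got %s, want %s", computed, expected)
	}

	return nil
}

func main() {
	// sha256 of "hello world", matching the blob written in the earlier sketch.
	err := verifyBlob(os.TempDir()+"/blob", "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9")
	fmt.Println(err)
}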

@@ -215,6 +215,9 @@ func TestStorageAPIs(t *testing.T) {
 				So(err, ShouldBeNil)
 				So(n, ShouldEqual, len(body))
 				So(upload, ShouldNotBeEmpty)
+
+				err = imgStore.VerifyBlobDigestValue("test", digest)
+				So(err, ShouldBeNil)
 			})
 
 			Convey("New blob upload", func() {

@@ -64,6 +64,7 @@ type ImageStore interface { //nolint:interfacebloat
 	GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error)
 	GetAllBlobs(repo string) ([]string, error)
 	PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler)
+	VerifyBlobDigestValue(repo string, digest godigest.Digest) error
 }
 
 type Driver interface { //nolint:interfacebloat

@@ -59,6 +59,7 @@ type MockedImageStore struct {
 	PutIndexContentFn        func(repo string, index ispec.Index) error
 	PopulateStorageMetricsFn func(interval time.Duration, sch *scheduler.Scheduler)
 	StatIndexFn              func(repo string) (bool, int64, time.Time, error)
+	VerifyBlobDigestValueFn  func(repo string, digest godigest.Digest) error
 }
 
 func (is MockedImageStore) StatIndex(repo string) (bool, int64, time.Time, error) {
@@ -425,3 +426,11 @@ func (is MockedImageStore) PopulateStorageMetrics(interval time.Duration, sch *s
 		is.PopulateStorageMetricsFn(interval, sch)
 	}
 }
+
+func (is MockedImageStore) VerifyBlobDigestValue(repo string, digest godigest.Digest) error {
+	if is.VerifyBlobDigestValueFn != nil {
+		return is.VerifyBlobDigestValueFn(repo, digest)
+	}
+
+	return nil
+}