mirror of
https://github.com/project-zot/zot.git
synced 2024-12-16 21:56:37 -05:00
Fix file handlers not being closed after calls to ImageStore.GetBlob
This is to fixes hitting the FD limit when reading blobs from the disk in the graphql API Signed-off-by: Andrei Aaron <andaaron@cisco.com>
This commit is contained in:
parent
74630ed3a0
commit
bd9ad998cd
10 changed files with 83 additions and 54 deletions
|
@ -655,6 +655,7 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
|
|||
|
||||
return
|
||||
}
|
||||
defer repo.Close()
|
||||
|
||||
response.Header().Set("Content-Length", fmt.Sprintf("%d", blen))
|
||||
response.Header().Set(constants.DistContentDigestKey, digest)
|
||||
|
|
|
@ -405,8 +405,8 @@ func TestRoutes(t *testing.T) {
|
|||
"digest": "sha256:7a0437f04f83f084b7ed68ad9c4a4947e12fc4e1b006b38129bac89114ec3621",
|
||||
},
|
||||
&mocks.MockedImageStore{
|
||||
GetBlobFn: func(repo, digest, mediaType string) (io.Reader, int64, error) {
|
||||
return bytes.NewBuffer([]byte("")), 0, zerr.ErrRepoNotFound
|
||||
GetBlobFn: func(repo, digest, mediaType string) (io.ReadCloser, int64, error) {
|
||||
return io.NopCloser(bytes.NewBuffer([]byte(""))), 0, zerr.ErrRepoNotFound
|
||||
},
|
||||
})
|
||||
So(statusCode, ShouldEqual, http.StatusNotFound)
|
||||
|
@ -418,8 +418,8 @@ func TestRoutes(t *testing.T) {
|
|||
"digest": "sha256:7a0437f04f83f084b7ed68ad9c4a4947e12fc4e1b006b38129bac89114ec3621",
|
||||
},
|
||||
&mocks.MockedImageStore{
|
||||
GetBlobFn: func(repo, digest, mediaType string) (io.Reader, int64, error) {
|
||||
return bytes.NewBuffer([]byte("")), 0, zerr.ErrBadBlobDigest
|
||||
GetBlobFn: func(repo, digest, mediaType string) (io.ReadCloser, int64, error) {
|
||||
return io.NopCloser(bytes.NewBuffer([]byte(""))), 0, zerr.ErrBadBlobDigest
|
||||
},
|
||||
})
|
||||
So(statusCode, ShouldEqual, http.StatusBadRequest)
|
||||
|
|
|
@ -294,45 +294,19 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string,
|
|||
}
|
||||
|
||||
for _, blob := range manifest.Layers {
|
||||
blobReader, _, err := cacheImageStore.GetBlob(localRepo, blob.Digest.String(), blob.MediaType)
|
||||
err = copyBlob(localRepo, blob.Digest.String(), blob.MediaType,
|
||||
cacheImageStore, imageStore, log)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Str("dir", path.Join(cacheImageStore.RootDir(),
|
||||
localRepo)).Str("blob digest", blob.Digest.String()).Msg("couldn't read blob")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if found, _, _ := imageStore.CheckBlob(localRepo, blob.Digest.String()); !found {
|
||||
_, _, err = imageStore.FullBlobUpload(localRepo, blobReader, blob.Digest.String())
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Str("blob digest", blob.Digest.String()).Msg("couldn't upload blob")
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
blobReader, _, err := cacheImageStore.GetBlob(localRepo, manifest.Config.Digest.String(), manifest.Config.MediaType)
|
||||
err = copyBlob(localRepo, manifest.Config.Digest.String(), manifest.Config.MediaType,
|
||||
cacheImageStore, imageStore, log)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Str("dir", path.Join(cacheImageStore.RootDir(),
|
||||
localRepo)).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't read config blob")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if found, _, _ := imageStore.CheckBlob(localRepo, manifest.Config.Digest.String()); !found {
|
||||
_, _, err = imageStore.FullBlobUpload(localRepo, blobReader, manifest.Config.Digest.String())
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't upload config blob")
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err = imageStore.PutImageManifest(localRepo, tag,
|
||||
ispec.MediaTypeImageManifest, manifestContent)
|
||||
if err != nil {
|
||||
|
@ -352,6 +326,36 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string,
|
|||
return nil
|
||||
}
|
||||
|
||||
// Copy a blob from one image store to another image store.
|
||||
func copyBlob(localRepo, blobDigest, blobMediaType string,
|
||||
souceImageStore, destinationImageStore storage.ImageStore, log log.Logger,
|
||||
) error {
|
||||
if found, _, _ := destinationImageStore.CheckBlob(localRepo, blobDigest); found {
|
||||
// Blob is already at destination, nothing to do
|
||||
return nil
|
||||
}
|
||||
|
||||
blobReadCloser, _, err := souceImageStore.GetBlob(localRepo, blobDigest, blobMediaType)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).Err(err).
|
||||
Str("dir", path.Join(souceImageStore.RootDir(), localRepo)).
|
||||
Str("blob digest", blobDigest).Str("media type", blobMediaType).
|
||||
Msg("couldn't read blob")
|
||||
|
||||
return err
|
||||
}
|
||||
defer blobReadCloser.Close()
|
||||
|
||||
_, _, err = destinationImageStore.FullBlobUpload(localRepo, blobReadCloser, blobDigest)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).Err(err).
|
||||
Str("blob digest", blobDigest).Str("media type", blobMediaType).
|
||||
Msg("couldn't upload blob")
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// sync needs transport to be stripped to not be wrongly interpreted as an image reference
|
||||
// at a non-fully qualified registry (hostname as image and port as tag).
|
||||
func StripRegistryTransport(url string) string {
|
||||
|
|
|
@ -1351,7 +1351,7 @@ func (is *ImageStoreLocal) copyBlob(repo, blobPath, dstRecord string) (int64, er
|
|||
|
||||
// GetBlob returns a stream to read the blob.
|
||||
// blob selector instead of directly downloading the blob.
|
||||
func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.Reader, int64, error) {
|
||||
func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
parsedDigest, err := godigest.Parse(digest)
|
||||
|
@ -1373,14 +1373,15 @@ func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.Reader, i
|
|||
return nil, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
blobReader, err := os.Open(blobPath)
|
||||
blobReadCloser, err := os.Open(blobPath)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
|
||||
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
return blobReader, binfo.Size(), nil
|
||||
// The caller function is responsible for calling Close()
|
||||
return blobReadCloser, binfo.Size(), nil
|
||||
}
|
||||
|
||||
func (is *ImageStoreLocal) GetBlobContent(repo, digest string) ([]byte, error) {
|
||||
|
@ -1388,6 +1389,7 @@ func (is *ImageStoreLocal) GetBlobContent(repo, digest string) ([]byte, error) {
|
|||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
defer blob.Close()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
|
|
|
@ -751,13 +751,16 @@ func FuzzGetBlob(f *testing.F) {
|
|||
t.Error(err)
|
||||
}
|
||||
|
||||
_, _, err = imgStore.GetBlob(repoName, digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
blobReadCloser, _, err := imgStore.GetBlob(repoName, digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
if err != nil {
|
||||
if isKnownErr(err) {
|
||||
return
|
||||
}
|
||||
t.Error(err)
|
||||
}
|
||||
if err = blobReadCloser.Close(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -951,7 +954,9 @@ func TestDedupeLinks(t *testing.T) {
|
|||
_, _, err = imgStore.CheckBlob("dedupe1", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.GetBlob("dedupe1", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
blobrc, _, err := imgStore.GetBlob("dedupe1", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
err = blobrc.Close()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
cblob, cdigest := test.GetRandomImageConfig()
|
||||
|
@ -1009,7 +1014,9 @@ func TestDedupeLinks(t *testing.T) {
|
|||
_, _, err = imgStore.CheckBlob("dedupe2", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
blobrc, _, err = imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
err = blobrc.Close()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
cblob, cdigest = test.GetRandomImageConfig()
|
||||
|
|
|
@ -1253,7 +1253,7 @@ func (is *ObjectStorage) copyBlob(repo string, blobPath string, dstRecord string
|
|||
|
||||
// GetBlob returns a stream to read the blob.
|
||||
// blob selector instead of directly downloading the blob.
|
||||
func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.Reader, int64, error) {
|
||||
func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dgst, err := godigest.Parse(digest)
|
||||
|
@ -1275,7 +1275,7 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.Reader, int
|
|||
return nil, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
blobReader, err := is.store.Reader(context.Background(), blobPath, 0)
|
||||
blobReadCloser, err := is.store.Reader(context.Background(), blobPath, 0)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
|
||||
|
||||
|
@ -1301,18 +1301,19 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.Reader, int
|
|||
return nil, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
blobReader, err := is.store.Reader(context.Background(), dstRecord, 0)
|
||||
blobReadCloser, err := is.store.Reader(context.Background(), dstRecord, 0)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob")
|
||||
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
return blobReader, binfo.Size(), nil
|
||||
return blobReadCloser, binfo.Size(), nil
|
||||
}
|
||||
}
|
||||
|
||||
return blobReader, binfo.Size(), nil
|
||||
// The caller function is responsible for calling Close()
|
||||
return blobReadCloser, binfo.Size(), nil
|
||||
}
|
||||
|
||||
func (is *ObjectStorage) GetBlobContent(repo, digest string) ([]byte, error) {
|
||||
|
@ -1320,6 +1321,7 @@ func (is *ObjectStorage) GetBlobContent(repo, digest string) ([]byte, error) {
|
|||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
defer blob.Close()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
|
|
|
@ -866,9 +866,12 @@ func TestS3Dedupe(t *testing.T) {
|
|||
So(checkBlobSize1, ShouldBeGreaterThan, 0)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, getBlobSize1, err := imgStore.GetBlob("dedupe1", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
blobReadCloser, getBlobSize1, err := imgStore.GetBlob("dedupe1", digest.String(),
|
||||
"application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(getBlobSize1, ShouldBeGreaterThan, 0)
|
||||
So(err, ShouldBeNil)
|
||||
err = blobReadCloser.Close()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
cblob, cdigest := test.GetRandomImageConfig()
|
||||
_, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest.String())
|
||||
|
@ -928,11 +931,14 @@ func TestS3Dedupe(t *testing.T) {
|
|||
So(err, ShouldBeNil)
|
||||
So(checkBlobSize2, ShouldBeGreaterThan, 0)
|
||||
|
||||
_, getBlobSize2, err := imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
blobReadCloser, getBlobSize2, err := imgStore.GetBlob("dedupe2", digest.String(),
|
||||
"application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
So(getBlobSize2, ShouldBeGreaterThan, 0)
|
||||
So(checkBlobSize1, ShouldEqual, checkBlobSize2)
|
||||
So(getBlobSize1, ShouldEqual, getBlobSize2)
|
||||
err = blobReadCloser.Close()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
cblob, cdigest = test.GetRandomImageConfig()
|
||||
_, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest.String())
|
||||
|
@ -1039,9 +1045,12 @@ func TestS3Dedupe(t *testing.T) {
|
|||
So(err, ShouldBeNil)
|
||||
|
||||
// check that we retrieve the real dedupe2/blob (which is deduped earlier - 0 size) when switching to dedupe false
|
||||
_, getBlobSize2, err = imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
blobReadCloser, getBlobSize2, err = imgStore.GetBlob("dedupe2", digest.String(),
|
||||
"application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
So(getBlobSize1, ShouldEqual, getBlobSize2)
|
||||
err = blobReadCloser.Close()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, checkBlobSize2, err := imgStore.CheckBlob("dedupe2", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
|
|
@ -39,7 +39,7 @@ type ImageStore interface {
|
|||
DeleteBlobUpload(repo, uuid string) error
|
||||
BlobPath(repo string, digest digest.Digest) string
|
||||
CheckBlob(repo, digest string) (bool, int64, error)
|
||||
GetBlob(repo, digest, mediaType string) (io.Reader, int64, error)
|
||||
GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error)
|
||||
DeleteBlob(repo, digest string) error
|
||||
GetIndexContent(repo string) ([]byte, error)
|
||||
GetBlobContent(repo, digest string) ([]byte, error)
|
||||
|
|
|
@ -242,7 +242,9 @@ func TestStorageAPIs(t *testing.T) {
|
|||
_, _, err = imgStore.CheckBlob("test", digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.GetBlob("test", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
blob, _, err := imgStore.GetBlob("test", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
err = blob.Close()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
manifest := ispec.Manifest{}
|
||||
|
@ -431,7 +433,9 @@ func TestStorageAPIs(t *testing.T) {
|
|||
_, _, err = imgStore.GetBlob("test", "inexistent", "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, _, err = imgStore.GetBlob("test", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
blob, _, err := imgStore.GetBlob("test", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldBeNil)
|
||||
err = blob.Close()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
blobContent, err := imgStore.GetBlobContent("test", digest.String())
|
||||
|
|
|
@ -30,7 +30,7 @@ type MockedImageStore struct {
|
|||
DeleteBlobUploadFn func(repo string, uuid string) error
|
||||
BlobPathFn func(repo string, digest digest.Digest) string
|
||||
CheckBlobFn func(repo string, digest string) (bool, int64, error)
|
||||
GetBlobFn func(repo string, digest string, mediaType string) (io.Reader, int64, error)
|
||||
GetBlobFn func(repo string, digest string, mediaType string) (io.ReadCloser, int64, error)
|
||||
DeleteBlobFn func(repo string, digest string) error
|
||||
GetIndexContentFn func(repo string) ([]byte, error)
|
||||
GetBlobContentFn func(repo, digest string) ([]byte, error)
|
||||
|
@ -230,12 +230,12 @@ func (is MockedImageStore) CheckBlob(repo string, digest string) (bool, int64, e
|
|||
return true, 0, nil
|
||||
}
|
||||
|
||||
func (is MockedImageStore) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) {
|
||||
func (is MockedImageStore) GetBlob(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) {
|
||||
if is.GetBlobFn != nil {
|
||||
return is.GetBlobFn(repo, digest, mediaType)
|
||||
}
|
||||
|
||||
return &io.LimitedReader{}, 0, nil
|
||||
return io.NopCloser(&io.LimitedReader{}), 0, nil
|
||||
}
|
||||
|
||||
func (is MockedImageStore) DeleteBlobUpload(repo string, digest string) error {
|
||||
|
|
Loading…
Reference in a new issue