From 90c8390c29ce4b70312f3211821af3367df737a5 Mon Sep 17 00:00:00 2001 From: Ramkumar Chinchani Date: Tue, 30 Aug 2022 22:12:10 +0000 Subject: [PATCH] routes: support resumable pull Some embedded clients use the "Range" header for blob pulls in order to resume downloads. Signed-off-by: Ramkumar Chinchani --- errors/errors.go | 2 + pkg/api/controller_test.go | 161 +++++++++++++++++++++++++ pkg/api/routes.go | 100 +++++++++++++++- pkg/storage/cache.go | 34 +++--- pkg/storage/local.go | 78 ++++++++++++ pkg/storage/local_test.go | 46 +++++++ pkg/storage/s3/s3.go | 118 +++++++++++++++++- pkg/storage/s3/s3_test.go | 185 +++++++++++++++++++++++++++++ pkg/storage/storage.go | 1 + pkg/test/mocks/image_store_mock.go | 25 ++-- 10 files changed, 723 insertions(+), 27 deletions(-) diff --git a/errors/errors.go b/errors/errors.go index 456017a8..eb6e2b14 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -56,4 +56,6 @@ var ( ErrImageLintAnnotations = errors.New("routes: lint checks failed") ErrParsingAuthHeader = errors.New("auth: failed parsing authorization header") ErrBadType = errors.New("core: invalid type") + ErrParsingHTTPHeader = errors.New("routes: invalid HTTP header") + ErrBadRange = errors.New("storage: bad range") ) diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index eba208c6..27b99ffc 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -5463,6 +5463,167 @@ func TestManifestImageIndex(t *testing.T) { }) } +func TestPullRange(t *testing.T) { + Convey("Make a new controller", t, func() { + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) + conf := config.New() + conf.HTTP.Port = port + + ctlr := api.NewController(conf) + dir := t.TempDir() + ctlr.Config.Storage.RootDirectory = dir + + go startServer(ctlr) + defer stopServer(ctlr) + test.WaitTillServerReady(baseURL) + + // create a blob/layer + resp, err := resty.R().Post(baseURL + "/v2/index/blobs/uploads/") + So(err, ShouldBeNil) + So(resp.StatusCode(), 
ShouldEqual, http.StatusAccepted) + loc := test.Location(baseURL, resp) + So(loc, ShouldNotBeEmpty) + + // since we are not specifying any prefix i.e provided in config while starting server, + // so it should store index1 to global root dir + _, err = os.Stat(path.Join(dir, "index")) + So(err, ShouldBeNil) + + resp, err = resty.R().Get(loc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNoContent) + content := []byte("0123456789") + digest := godigest.FromBytes(content) + So(digest, ShouldNotBeNil) + // monolithic blob upload: success + resp, err = resty.R().SetQueryParam("digest", digest.String()). + SetHeader("Content-Type", "application/octet-stream").SetBody(content).Put(loc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusCreated) + blobLoc := resp.Header().Get("Location") + So(blobLoc, ShouldNotBeEmpty) + So(resp.Header().Get("Content-Length"), ShouldEqual, "0") + So(resp.Header().Get(constants.DistContentDigestKey), ShouldNotBeEmpty) + blobLoc = baseURL + blobLoc + + Convey("Range is supported using 'bytes'", func() { + resp, err = resty.R().Head(blobLoc) + So(err, ShouldBeNil) + So(resp.Header().Get("Accept-Ranges"), ShouldEqual, "bytes") + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Get a range of bytes", func() { + resp, err = resty.R().SetHeader("Range", "bytes=0-").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent) + So(resp.Header().Get("Content-Length"), ShouldEqual, fmt.Sprintf("%d", len(content))) + So(resp.Body(), ShouldResemble, content) + + resp, err = resty.R().SetHeader("Range", "bytes=0-100").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent) + So(resp.Header().Get("Content-Length"), ShouldEqual, fmt.Sprintf("%d", len(content))) + So(resp.Body(), ShouldResemble, content) + + resp, err = resty.R().SetHeader("Range", "bytes=0-10").Get(blobLoc) + So(err, ShouldBeNil) + 
So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent) + So(resp.Header().Get("Content-Length"), ShouldEqual, fmt.Sprintf("%d", len(content))) + So(resp.Body(), ShouldResemble, content) + + resp, err = resty.R().SetHeader("Range", "bytes=0-0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent) + So(resp.Header().Get("Content-Length"), ShouldEqual, "1") + So(resp.Body(), ShouldResemble, content[0:1]) + + resp, err = resty.R().SetHeader("Range", "bytes=0-1").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent) + So(resp.Header().Get("Content-Length"), ShouldEqual, "2") + So(resp.Body(), ShouldResemble, content[0:2]) + + resp, err = resty.R().SetHeader("Range", "bytes=2-3").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent) + So(resp.Header().Get("Content-Length"), ShouldEqual, "2") + So(resp.Body(), ShouldResemble, content[2:4]) + }) + + Convey("Negative cases", func() { + resp, err = resty.R().SetHeader("Range", "=0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "=a").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "=").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "byte=").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 
http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "byte=-0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "byte=0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "octet=-0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=-0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=1-0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=-1-0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=-1--0").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=1--2").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=0-a").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=a-10").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + + resp, err = resty.R().SetHeader("Range", "bytes=a-b").Get(blobLoc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable) + }) + }) +} + func TestInjectInterruptedImageManifest(t *testing.T) 
{ Convey("Make a new controller", t, func() { port := test.GetFreePort() baseURL := test.GetBaseURL(port) conf := config.New() conf.HTTP.Port = port diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 8fcf66d4..92766433 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -18,6 +18,7 @@ import ( "net/http" "net/url" "path" + "regexp" "sort" "strconv" "strings" @@ -599,10 +600,57 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re } response.Header().Set("Content-Length", fmt.Sprintf("%d", blen)) + response.Header().Set("Accept-Ranges", "bytes") response.Header().Set(constants.DistContentDigestKey, digest) response.WriteHeader(http.StatusOK) } +/* parseRangeHeader validates the "Range" HTTP header and returns the range. */ +func parseRangeHeader(contentRange string) (int64, int64, error) { + /* bytes=<from>- and bytes=<from>-<to> formats are supported */ + pattern := `bytes=(?P<rangeFrom>\d+)-(?P<rangeTo>\d*$)` + + regex, err := regexp.Compile(pattern) + if err != nil { + return -1, -1, zerr.ErrParsingHTTPHeader + } + + match := regex.FindStringSubmatch(contentRange) + + paramsMap := make(map[string]string) + + for i, name := range regex.SubexpNames() { + if i > 0 && i <= len(match) { + paramsMap[name] = match[i] + } + } + + var from int64 + to := int64(-1) + + rangeFrom := paramsMap["rangeFrom"] + if rangeFrom == "" { + return -1, -1, zerr.ErrParsingHTTPHeader + } + + if from, err = strconv.ParseInt(rangeFrom, 10, 64); err != nil { + return -1, -1, zerr.ErrParsingHTTPHeader + } + + rangeTo := paramsMap["rangeTo"] + if rangeTo != "" { + if to, err = strconv.ParseInt(rangeTo, 10, 64); err != nil { + return -1, -1, zerr.ErrParsingHTTPHeader + } + + if to < from { + return -1, -1, zerr.ErrParsingHTTPHeader + } + } + + return from, to, nil +} + // GetBlob godoc // @Summary Get image blob/layer // @Description Get an image's blob/layer given a digest @@ -634,7 +682,43 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ mediaType := request.Header.Get("Accept") - repo, blen, err := imgStore.GetBlob(name, 
digest, mediaType) + var err error + + /* content range is supported for resumable pulls */ + partial := false + + var from, to int64 + + contentRange := request.Header.Get("Range") + + _, ok = request.Header["Range"] + if ok && contentRange == "" { + response.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + + return + } + + if contentRange != "" { + from, to, err = parseRangeHeader(contentRange) + if err != nil { + response.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + + return + } + + partial = true + } + + var repo io.ReadCloser + + var blen, bsize int64 + + if partial { + repo, blen, bsize, err = imgStore.GetBlobPartial(name, digest, mediaType, from, to) + } else { + repo, blen, err = imgStore.GetBlob(name, digest, mediaType) + } + if err != nil { if errors.Is(err, zerr.ErrBadBlobDigest) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain WriteJSON(response, @@ -658,9 +742,19 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ defer repo.Close() response.Header().Set("Content-Length", fmt.Sprintf("%d", blen)) - response.Header().Set(constants.DistContentDigestKey, digest) + + status := http.StatusOK + + if partial { + status = http.StatusPartialContent + + response.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", from, from+blen-1, bsize)) + } else { + response.Header().Set(constants.DistContentDigestKey, digest) + } + + // return the blob data - WriteDataFromReader(response, http.StatusOK, blen, mediaType, repo, rh.c.Log) + WriteDataFromReader(response, status, blen, mediaType, repo, rh.c.Log) } // DeleteBlob godoc diff --git a/pkg/storage/cache.go b/pkg/storage/cache.go index 39c3fc3f..1416f65a 100644 --- a/pkg/storage/cache.go +++ b/pkg/storage/cache.go @@ -15,10 +15,10 @@ const ( // global bucket. BlobsCache = "blobs" // bucket where we store all blobs from storage(deduped blobs + original blob). 
- DedupedBucket = "deduped" + DuplicatesBucket = "duplicates" /* bucket where we store only the original/source blob (used by s3 to know which is the blob with content) it should contain only one blob, this is the only place from which we'll get blobs. */ - OriginBucket = "origin" + OriginalBucket = "original" DBExtensionName = ".db" dbCacheLockCheckTimeout = 10 * time.Second ) @@ -103,34 +103,34 @@ func (c *Cache) PutBlob(digest, path string) error { } // create nested deduped bucket where we store all the deduped blobs + original blob - deduped, err := bucket.CreateBucketIfNotExists([]byte(DedupedBucket)) + deduped, err := bucket.CreateBucketIfNotExists([]byte(DuplicatesBucket)) if err != nil { // this is a serious failure - c.log.Error().Err(err).Str("bucket", DedupedBucket).Msg("unable to create a bucket") + c.log.Error().Err(err).Str("bucket", DuplicatesBucket).Msg("unable to create a bucket") return err } if err := deduped.Put([]byte(path), nil); err != nil { - c.log.Error().Err(err).Str("bucket", DedupedBucket).Str("value", path).Msg("unable to put record") + c.log.Error().Err(err).Str("bucket", DuplicatesBucket).Str("value", path).Msg("unable to put record") return err } // create origin bucket and insert only the original blob - origin := bucket.Bucket([]byte(OriginBucket)) + origin := bucket.Bucket([]byte(OriginalBucket)) if origin == nil { // if the bucket doesn't exist yet then 'path' is the original blob - origin, err := bucket.CreateBucket([]byte(OriginBucket)) + origin, err := bucket.CreateBucket([]byte(OriginalBucket)) if err != nil { // this is a serious failure - c.log.Error().Err(err).Str("bucket", OriginBucket).Msg("unable to create a bucket") + c.log.Error().Err(err).Str("bucket", OriginalBucket).Msg("unable to create a bucket") return err } if err := origin.Put([]byte(path), nil); err != nil { - c.log.Error().Err(err).Str("bucket", OriginBucket).Str("value", path).Msg("unable to put record") + c.log.Error().Err(err).Str("bucket", 
OriginalBucket).Str("value", path).Msg("unable to put record") return err } @@ -159,7 +159,7 @@ func (c *Cache) GetBlob(digest string) (string, error) { bucket := root.Bucket([]byte(digest)) if bucket != nil { - origin := bucket.Bucket([]byte(OriginBucket)) + origin := bucket.Bucket([]byte(OriginalBucket)) blobPath.WriteString(string(c.getOne(origin))) return nil @@ -189,7 +189,7 @@ func (c *Cache) HasBlob(digest, blob string) bool { return errors.ErrCacheMiss } - origin := bucket.Bucket([]byte(OriginBucket)) + origin := bucket.Bucket([]byte(OriginalBucket)) if origin == nil { return errors.ErrCacheMiss } @@ -242,23 +242,25 @@ func (c *Cache) DeleteBlob(digest, path string) error { return errors.ErrCacheMiss } - deduped := bucket.Bucket([]byte(DedupedBucket)) + deduped := bucket.Bucket([]byte(DuplicatesBucket)) if deduped == nil { return errors.ErrCacheMiss } if err := deduped.Delete([]byte(path)); err != nil { - c.log.Error().Err(err).Str("digest", digest).Str("bucket", DedupedBucket).Str("path", path).Msg("unable to delete") + c.log.Error().Err(err).Str("digest", digest).Str("bucket", DuplicatesBucket). + Str("path", path).Msg("unable to delete") return err } - origin := bucket.Bucket([]byte(OriginBucket)) + origin := bucket.Bucket([]byte(OriginalBucket)) if origin != nil { originBlob := c.getOne(origin) if originBlob != nil { if err := origin.Delete([]byte(path)); err != nil { - c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginBucket).Str("path", path).Msg("unable to delete") + c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginalBucket). 
+ Str("path", path).Msg("unable to delete") return err } @@ -267,7 +269,7 @@ func (c *Cache) DeleteBlob(digest, path string) error { dedupedBlob := c.getOne(deduped) if dedupedBlob != nil { if err := origin.Put(dedupedBlob, nil); err != nil { - c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginBucket).Str("path", path).Msg("unable to put") + c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginalBucket).Str("path", path).Msg("unable to put") return err } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index a6892d13..4ef21bac 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -1510,6 +1510,84 @@ func (is *ImageStoreLocal) copyBlob(repo, blobPath, dstRecord string) (int64, er return -1, zerr.ErrBlobNotFound } +// blobStream is using to serve blob range requests. +type blobStream struct { + reader io.Reader + closer io.Closer +} + +func newBlobStream(blobPath string, from, to int64) (io.ReadCloser, error) { + blobFile, err := os.Open(blobPath) + if err != nil { + return nil, err + } + + if from > 0 { + _, err = blobFile.Seek(from, io.SeekStart) + if err != nil { + return nil, err + } + } + + if from < 0 || to < from { + return nil, zerr.ErrBadRange + } + + blobstrm := blobStream{reader: blobFile, closer: blobFile} + + blobstrm.reader = io.LimitReader(blobFile, to-from+1) + + return &blobstrm, nil +} + +func (bs *blobStream) Read(buf []byte) (int, error) { + return bs.reader.Read(buf) +} + +func (bs *blobStream) Close() error { + return bs.closer.Close() +} + +// GetBlobPartial returns a partial stream to read the blob. +// blob selector instead of directly downloading the blob. 
+func (is *ImageStoreLocal) GetBlobPartial(repo, digest, mediaType string, from, to int64, +) (io.ReadCloser, int64, int64, error) { + var lockLatency time.Time + + parsedDigest, err := godigest.Parse(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") + + return nil, -1, -1, zerr.ErrBadBlobDigest + } + + blobPath := is.BlobPath(repo, parsedDigest) + + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + + binfo, err := os.Stat(blobPath) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") + + return nil, -1, -1, zerr.ErrBlobNotFound + } + + if to < 0 || to >= binfo.Size() { + to = binfo.Size() - 1 + } + + blobReadCloser, err := newBlobStream(blobPath, from, to) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") + + return nil, -1, -1, err + } + + // The caller function is responsible for calling Close() + return blobReadCloser, to - from + 1, binfo.Size(), nil +} + // GetBlob returns a stream to read the blob. // blob selector instead of directly downloading the blob. 
func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) { diff --git a/pkg/storage/local_test.go b/pkg/storage/local_test.go index d510a17b..006dcfdd 100644 --- a/pkg/storage/local_test.go +++ b/pkg/storage/local_test.go @@ -2077,6 +2077,52 @@ func TestPutBlobChunkStreamed(t *testing.T) { }) } +func TestPullRange(t *testing.T) { + Convey("Repo layout", t, func(c C) { + dir := t.TempDir() + + log := log.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + + Convey("Negative cases", func() { + imgStore := storage.NewImageStore(dir, true, storage.DefaultGCDelay, + true, true, log, metrics, nil) + repoName := "pull-range" + + upload, err := imgStore.NewBlobUpload(repoName) + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) + + content := []byte("test-data1") + buf := bytes.NewBuffer(content) + buflen := buf.Len() + bdigest := godigest.FromBytes(content) + + blob, err := imgStore.PutBlobChunk(repoName, upload, 0, int64(buflen), buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + + err = imgStore.FinishBlobUpload(repoName, upload, buf, bdigest.String()) + So(err, ShouldBeNil) + + _, _, _, err = imgStore.GetBlobPartial(repoName, "", "application/octet-stream", 0, 1) + So(err, ShouldNotBeNil) + + _, _, _, err = imgStore.GetBlobPartial(repoName, bdigest.String(), "application/octet-stream", 1, 0) + So(err, ShouldNotBeNil) + + _, _, _, err = imgStore.GetBlobPartial(repoName, bdigest.String(), "application/octet-stream", 1, 0) + So(err, ShouldNotBeNil) + + blobPath := path.Join(imgStore.RootDir(), repoName, "blobs", bdigest.Algorithm().String(), bdigest.Encoded()) + err = os.Chmod(blobPath, 0o000) + So(err, ShouldBeNil) + _, _, _, err = imgStore.GetBlobPartial(repoName, bdigest.String(), "application/octet-stream", -1, 1) + So(err, ShouldNotBeNil) + }) + }) +} + func NewRandomImgManifest(data []byte, cdigest, ldigest godigest.Digest, cblob, lblob []byte) (*ispec.Manifest, error) { 
annotationsMap := make(map[string]string) diff --git a/pkg/storage/s3/s3.go b/pkg/storage/s3/s3.go index 2974f4b3..4241baba 100644 --- a/pkg/storage/s3/s3.go +++ b/pkg/storage/s3/s3.go @@ -1413,6 +1413,122 @@ func (is *ObjectStorage) copyBlob(repo string, blobPath string, dstRecord string return -1, zerr.ErrBlobNotFound } +// blobStream is using to serve blob range requests. +type blobStream struct { + reader io.Reader + closer io.Closer +} + +func NewBlobStream(readCloser io.ReadCloser, from, to int64) (io.ReadCloser, error) { + return &blobStream{reader: io.LimitReader(readCloser, to-from+1), closer: readCloser}, nil +} + +func (bs *blobStream) Read(buf []byte) (int, error) { + return bs.reader.Read(buf) +} + +func (bs *blobStream) Close() error { + return bs.closer.Close() +} + +// GetBlobPartial returns a partial stream to read the blob. +// blob selector instead of directly downloading the blob. +func (is *ObjectStorage) GetBlobPartial(repo, digest, mediaType string, from, to int64, +) (io.ReadCloser, int64, int64, error) { + var lockLatency time.Time + + dgst, err := godigest.Parse(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") + + return nil, -1, -1, zerr.ErrBadBlobDigest + } + + blobPath := is.BlobPath(repo, dgst) + + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + + binfo, err := is.store.Stat(context.Background(), blobPath) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") + + return nil, -1, -1, zerr.ErrBlobNotFound + } + + end := to + + if to < 0 || to >= binfo.Size() { + end = binfo.Size() - 1 + } + + blobHandle, err := is.store.Reader(context.Background(), blobPath, from) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") + + return nil, -1, -1, err + } + + blobReadCloser, err := NewBlobStream(blobHandle, from, end) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open 
blob stream") + + return nil, -1, -1, err + } + + // is a 'deduped' blob? + if binfo.Size() == 0 { + defer blobReadCloser.Close() + + // Check blobs in cache + dstRecord, err := is.checkCacheBlob(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found") + + return nil, -1, -1, zerr.ErrBlobNotFound + } + + binfo, err := is.store.Stat(context.Background(), dstRecord) + if err != nil { + is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob") + + // the actual blob on disk may have been removed by GC, so sync the cache + if err := is.cache.DeleteBlob(digest, dstRecord); err != nil { + is.log.Error().Err(err).Str("dstDigest", digest).Str("dst", dstRecord).Msg("dedupe: unable to delete blob record") + + return nil, -1, -1, err + } + + return nil, -1, -1, zerr.ErrBlobNotFound + } + + end := to + + if to < 0 || to >= binfo.Size() { + end = binfo.Size() - 1 + } + + blobHandle, err := is.store.Reader(context.Background(), dstRecord, from) + if err != nil { + is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob") + + return nil, -1, -1, err + } + + blobReadCloser, err := NewBlobStream(blobHandle, from, end) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob stream") + + return nil, -1, -1, err + } + + return blobReadCloser, end - from + 1, binfo.Size(), nil + } + + // The caller function is responsible for calling Close() + return blobReadCloser, end - from + 1, binfo.Size(), nil +} + // GetBlob returns a stream to read the blob. // blob selector instead of directly downloading the blob. func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) { @@ -1444,7 +1560,7 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.ReadCloser, return nil, -1, err } - // is a 'deduped' blob + // is a 'deduped' blob? 
if binfo.Size() == 0 { // Check blobs in cache dstRecord, err := is.checkCacheBlob(digest) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 8b8894db..ec18072e 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -1114,6 +1114,182 @@ func TestS3Dedupe(t *testing.T) { }) } +func TestS3PullRange(t *testing.T) { + skipIt(t) + + Convey("Test against s3 image store", t, func() { + uuid, err := guuid.NewV4() + if err != nil { + panic(err) + } + + testDir := path.Join("/oci-repo-test", uuid.String()) + + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) + defer cleanupStorage(storeDriver, testDir) + + // create a blob/layer + upload, err := imgStore.NewBlobUpload("index") + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) + + content := []byte("0123456789") + buf := bytes.NewBuffer(content) + buflen := buf.Len() + digest := godigest.FromBytes(content) + So(digest, ShouldNotBeNil) + blob, err := imgStore.PutBlobChunkStreamed("index", upload, buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + + err = imgStore.FinishBlobUpload("index", upload, buf, digest.String()) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + + Convey("Without Dedupe", func() { + reader, _, _, err := imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, -1) + So(err, ShouldBeNil) + rdbuf, err := io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "application/octet-stream", 0, -1) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, 100) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("index", 
digest.String(), "*/*", 0, 10) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, 0) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content[0:1]) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, 1) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content[0:2]) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 2, 3) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content[2:4]) + reader.Close() + }) + + Convey("With Dedupe", func() { + // create a blob/layer with same content + upload, err := imgStore.NewBlobUpload("dupindex") + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) + + dupcontent := []byte("0123456789") + buf := bytes.NewBuffer(dupcontent) + buflen := buf.Len() + digest := godigest.FromBytes(dupcontent) + So(digest, ShouldNotBeNil) + blob, err := imgStore.PutBlobChunkStreamed("dupindex", upload, buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + + err = imgStore.FinishBlobUpload("dupindex", upload, buf, digest.String()) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + + reader, _, _, err := imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, -1) + So(err, ShouldBeNil) + rdbuf, err := io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "application/octet-stream", 0, -1) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("dupindex", 
digest.String(), "*/*", 0, 100) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, 10) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, 0) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content[0:1]) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, 1) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content[0:2]) + reader.Close() + + reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 2, 3) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content[2:4]) + reader.Close() + + // delete original blob + err = imgStore.DeleteBlob("index", digest.String()) + So(err, ShouldBeNil) + + reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 2, 3) + So(err, ShouldBeNil) + rdbuf, err = io.ReadAll(reader) + So(err, ShouldBeNil) + So(rdbuf, ShouldResemble, content[2:4]) + reader.Close() + }) + + Convey("Negative cases", func() { + _, _, _, err := imgStore.GetBlobPartial("index", "deadBEEF", "*/*", 0, -1) + So(err, ShouldNotBeNil) + + content := []byte("invalid content") + digest := godigest.FromBytes(content) + + _, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, -1) + So(err, ShouldNotBeNil) + }) + }) +} + func TestS3ManifestImageIndex(t *testing.T) { skipIt(t) @@ -1782,6 +1958,9 @@ func TestS3DedupeErr(t *testing.T) { _, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") So(err, 
ShouldNotBeNil) + + _, _, _, err = imgStore.GetBlobPartial("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip", 0, 1) + So(err, ShouldNotBeNil) }) Convey("Test GetBlob() - error on store.Reader()", t, func(c C) { @@ -1826,6 +2005,9 @@ func TestS3DedupeErr(t *testing.T) { _, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") So(err, ShouldNotBeNil) + + _, _, _, err = imgStore.GetBlobPartial("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip", 0, 1) + So(err, ShouldNotBeNil) }) Convey("Test GetBlob() - error on checkCacheBlob()", t, func(c C) { @@ -1846,6 +2028,9 @@ func TestS3DedupeErr(t *testing.T) { _, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") So(err, ShouldNotBeNil) + + _, _, _, err = imgStore.GetBlobPartial("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip", 0, 1) + So(err, ShouldNotBeNil) }) Convey("Test DeleteBlob() - error on store.Move()", t, func(c C) { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 54af2c1d..f0d4d3ed 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -40,6 +40,7 @@ type ImageStore interface { BlobPath(repo string, digest digest.Digest) string CheckBlob(repo, digest string) (bool, int64, error) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) + GetBlobPartial(repo, digest, mediaType string, from, to int64) (io.ReadCloser, int64, int64, error) DeleteBlob(repo, digest string) error GetIndexContent(repo string) ([]byte, error) GetBlobContent(repo, digest string) ([]byte, error) diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index c22e5add..e43cf5b0 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -30,13 +30,15 @@ type MockedImageStore struct { DeleteBlobUploadFn func(repo string, uuid string) error BlobPathFn func(repo string, digest 
digest.Digest) string CheckBlobFn func(repo string, digest string) (bool, int64, error) - GetBlobFn func(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) - DeleteBlobFn func(repo string, digest string) error - GetIndexContentFn func(repo string) ([]byte, error) - GetBlobContentFn func(repo, digest string) ([]byte, error) - GetReferrersFn func(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error) - URLForPathFn func(path string) (string, error) - RunGCRepoFn func(repo string) + GetBlobPartialFn func(repo string, digest string, mediaType string, from, to int64, + ) (io.ReadCloser, int64, int64, error) + GetBlobFn func(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) + DeleteBlobFn func(repo string, digest string) error + GetIndexContentFn func(repo string) ([]byte, error) + GetBlobContentFn func(repo, digest string) ([]byte, error) + GetReferrersFn func(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error) + URLForPathFn func(path string) (string, error) + RunGCRepoFn func(repo string) } func (is MockedImageStore) Lock(t *time.Time) { @@ -230,6 +232,15 @@ func (is MockedImageStore) CheckBlob(repo string, digest string) (bool, int64, e return true, 0, nil } +func (is MockedImageStore) GetBlobPartial(repo string, digest string, mediaType string, from, to int64, +) (io.ReadCloser, int64, int64, error) { + if is.GetBlobPartialFn != nil { + return is.GetBlobPartialFn(repo, digest, mediaType, from, to) + } + + return io.NopCloser(&io.LimitedReader{}), 0, 0, nil +} + func (is MockedImageStore) GetBlob(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) { if is.GetBlobFn != nil { return is.GetBlobFn(repo, digest, mediaType)