routes: support resumable pull
Some embedded clients use the "Range" header for blob pulls in order to resume downloads.

Signed-off-by: Ramkumar Chinchani <rchincha@cisco.com>
parent 7912b6a3fb
commit 90c8390c29

10 changed files with 723 additions and 27 deletions
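For context, resuming an interrupted pull amounts to re-requesting only the missing tail of a blob. A minimal client-side sketch (assumes a reachable zot instance and an existing blob location; names are illustrative, not part of this commit):

    // Resume a blob download from byte offset 5 to the end of the blob.
    resp, err := resty.R().
        SetHeader("Range", "bytes=5-"). // open-ended range, as parsed by the handler below
        Get(blobLoc)
    // On success the server answers "206 Partial Content" with a
    // "Content-Range: bytes 5-<last>/<total>" header.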
@@ -56,4 +56,6 @@ var (
 	ErrImageLintAnnotations = errors.New("routes: lint checks failed")
 	ErrParsingAuthHeader = errors.New("auth: failed parsing authorization header")
 	ErrBadType = errors.New("core: invalid type")
+	ErrParsingHTTPHeader = errors.New("routes: invalid HTTP header")
+	ErrBadRange = errors.New("storage: bad range")
 )
@@ -5463,6 +5463,167 @@ func TestManifestImageIndex(t *testing.T) {
 	})
 }
 
+func TestPullRange(t *testing.T) {
+	Convey("Make a new controller", t, func() {
+		port := test.GetFreePort()
+		baseURL := test.GetBaseURL(port)
+		conf := config.New()
+		conf.HTTP.Port = port
+
+		ctlr := api.NewController(conf)
+		dir := t.TempDir()
+		ctlr.Config.Storage.RootDirectory = dir
+
+		go startServer(ctlr)
+		defer stopServer(ctlr)
+		test.WaitTillServerReady(baseURL)
+
+		// create a blob/layer
+		resp, err := resty.R().Post(baseURL + "/v2/index/blobs/uploads/")
+		So(err, ShouldBeNil)
+		So(resp.StatusCode(), ShouldEqual, http.StatusAccepted)
+		loc := test.Location(baseURL, resp)
+		So(loc, ShouldNotBeEmpty)
+
+		// since no storage prefix was provided in the config when starting the server,
+		// the "index" repo should be stored under the global root dir
+		_, err = os.Stat(path.Join(dir, "index"))
+		So(err, ShouldBeNil)
+
+		resp, err = resty.R().Get(loc)
+		So(err, ShouldBeNil)
+		So(resp.StatusCode(), ShouldEqual, http.StatusNoContent)
+		content := []byte("0123456789")
+		digest := godigest.FromBytes(content)
+		So(digest, ShouldNotBeNil)
+		// monolithic blob upload: success
+		resp, err = resty.R().SetQueryParam("digest", digest.String()).
+			SetHeader("Content-Type", "application/octet-stream").SetBody(content).Put(loc)
+		So(err, ShouldBeNil)
+		So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
+		blobLoc := resp.Header().Get("Location")
+		So(blobLoc, ShouldNotBeEmpty)
+		So(resp.Header().Get("Content-Length"), ShouldEqual, "0")
+		So(resp.Header().Get(constants.DistContentDigestKey), ShouldNotBeEmpty)
+		blobLoc = baseURL + blobLoc
+
+		Convey("Range is supported using 'bytes'", func() {
+			resp, err = resty.R().Head(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.Header().Get("Accept-Ranges"), ShouldEqual, "bytes")
+			So(resp.StatusCode(), ShouldEqual, http.StatusOK)
+		})
+
+		Convey("Get a range of bytes", func() {
+			resp, err = resty.R().SetHeader("Range", "bytes=0-").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent)
+			So(resp.Header().Get("Content-Length"), ShouldEqual, fmt.Sprintf("%d", len(content)))
+			So(resp.Body(), ShouldResemble, content)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=0-100").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent)
+			So(resp.Header().Get("Content-Length"), ShouldEqual, fmt.Sprintf("%d", len(content)))
+			So(resp.Body(), ShouldResemble, content)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=0-10").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent)
+			So(resp.Header().Get("Content-Length"), ShouldEqual, fmt.Sprintf("%d", len(content)))
+			So(resp.Body(), ShouldResemble, content)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=0-0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent)
+			So(resp.Header().Get("Content-Length"), ShouldEqual, "1")
+			So(resp.Body(), ShouldResemble, content[0:1])
+
+			resp, err = resty.R().SetHeader("Range", "bytes=0-1").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent)
+			So(resp.Header().Get("Content-Length"), ShouldEqual, "2")
+			So(resp.Body(), ShouldResemble, content[0:2])
+
+			resp, err = resty.R().SetHeader("Range", "bytes=2-3").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent)
+			So(resp.Header().Get("Content-Length"), ShouldEqual, "2")
+			So(resp.Body(), ShouldResemble, content[2:4])
+		})
+
+		Convey("Negative cases", func() {
+			resp, err = resty.R().SetHeader("Range", "=0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "=a").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "=").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "byte=").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "byte=-0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "byte=0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "octet=-0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=-0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=1-0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=-1-0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=-1--0").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=1--2").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=0-a").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=a-10").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+
+			resp, err = resty.R().SetHeader("Range", "bytes=a-b").Get(blobLoc)
+			So(err, ShouldBeNil)
+			So(resp.StatusCode(), ShouldEqual, http.StatusRequestedRangeNotSatisfiable)
+		})
+	})
+}
+
 func TestInjectInterruptedImageManifest(t *testing.T) {
 	Convey("Make a new controller", t, func() {
 		port := test.GetFreePort()
@@ -18,6 +18,7 @@ import (
 	"net/http"
 	"net/url"
 	"path"
+	"regexp"
 	"sort"
 	"strconv"
 	"strings"
@@ -599,10 +600,57 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Request) {
 	}
 
 	response.Header().Set("Content-Length", fmt.Sprintf("%d", blen))
+	response.Header().Set("Accept-Ranges", "bytes")
 	response.Header().Set(constants.DistContentDigestKey, digest)
 	response.WriteHeader(http.StatusOK)
 }
 
+/* parseRangeHeader validates the "Range" HTTP header and returns the range. */
+func parseRangeHeader(contentRange string) (int64, int64, error) {
+	/* bytes=<start>- and bytes=<start>-<end> formats are supported */
+	pattern := `bytes=(?P<rangeFrom>\d+)-(?P<rangeTo>\d*$)`
+
+	regex, err := regexp.Compile(pattern)
+	if err != nil {
+		return -1, -1, zerr.ErrParsingHTTPHeader
+	}
+
+	match := regex.FindStringSubmatch(contentRange)
+
+	paramsMap := make(map[string]string)
+
+	for i, name := range regex.SubexpNames() {
+		if i > 0 && i <= len(match) {
+			paramsMap[name] = match[i]
+		}
+	}
+
+	var from int64
+	to := int64(-1)
+
+	rangeFrom := paramsMap["rangeFrom"]
+	if rangeFrom == "" {
+		return -1, -1, zerr.ErrParsingHTTPHeader
+	}
+
+	if from, err = strconv.ParseInt(rangeFrom, 10, 64); err != nil {
+		return -1, -1, zerr.ErrParsingHTTPHeader
+	}
+
+	rangeTo := paramsMap["rangeTo"]
+	if rangeTo != "" {
+		if to, err = strconv.ParseInt(rangeTo, 10, 64); err != nil {
+			return -1, -1, zerr.ErrParsingHTTPHeader
+		}
+
+		if to < from {
+			return -1, -1, zerr.ErrParsingHTTPHeader
+		}
+	}
+
+	return from, to, nil
+}
+
 // GetBlob godoc
 // @Summary Get image blob/layer
 // @Description Get an image's blob/layer given a digest
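For clarity, parseRangeHeader above treats the supported and rejected header shapes as follows (illustrative sketch, not part of the diff; an open-ended range returns to = -1, which the storage layer later clamps to the blob size):

    from, to, err := parseRangeHeader("bytes=0-")  // from=0, to=-1, err=nil (open-ended)
    from, to, err = parseRangeHeader("bytes=2-3")  // from=2, to=3, err=nil
    _, _, err = parseRangeHeader("bytes=1-0")      // err=ErrParsingHTTPHeader (end < start)
    _, _, err = parseRangeHeader("octet=5-")       // err=ErrParsingHTTPHeader (unit must be "bytes")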
@@ -634,7 +682,43 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Request) {
 
 	mediaType := request.Header.Get("Accept")
 
-	repo, blen, err := imgStore.GetBlob(name, digest, mediaType)
+	var err error
+
+	/* content range is supported for resumable pulls */
+	partial := false
+
+	var from, to int64
+
+	contentRange := request.Header.Get("Range")
+
+	_, ok = request.Header["Range"]
+	if ok && contentRange == "" {
+		response.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+
+		return
+	}
+
+	if contentRange != "" {
+		from, to, err = parseRangeHeader(contentRange)
+		if err != nil {
+			response.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+
+			return
+		}
+
+		partial = true
+	}
+
+	var repo io.ReadCloser
+
+	var blen, bsize int64
+
+	if partial {
+		repo, blen, bsize, err = imgStore.GetBlobPartial(name, digest, mediaType, from, to)
+	} else {
+		repo, blen, err = imgStore.GetBlob(name, digest, mediaType)
+	}
+
 	if err != nil {
 		if errors.Is(err, zerr.ErrBadBlobDigest) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
 			WriteJSON(response,
@@ -658,9 +742,19 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Request) {
 	defer repo.Close()
 
 	response.Header().Set("Content-Length", fmt.Sprintf("%d", blen))
-	response.Header().Set(constants.DistContentDigestKey, digest)
+
+	status := http.StatusOK
+
+	if partial {
+		status = http.StatusPartialContent
+
+		response.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", from, from+blen-1, bsize))
+	} else {
+		response.Header().Set(constants.DistContentDigestKey, digest)
+	}
+
 	// return the blob data
-	WriteDataFromReader(response, http.StatusOK, blen, mediaType, repo, rh.c.Log)
+	WriteDataFromReader(response, status, blen, mediaType, repo, rh.c.Log)
 }
 
 // DeleteBlob godoc
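End to end, the 10-byte test blob from TestPullRange above produces exchanges like this (sketch; blobLoc is the blob URL returned at upload time):

    // Request bytes 2..3 of "0123456789":
    resp, _ := resty.R().SetHeader("Range", "bytes=2-3").Get(blobLoc)
    // -> 206 Partial Content
    // -> Content-Length: 2
    // -> Content-Range: bytes 2-3/10   (i.e. from, from+blen-1, bsize)
    // -> body: "23"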
@@ -15,10 +15,10 @@ const (
 	// global bucket.
 	BlobsCache = "blobs"
 	// bucket where we store all blobs from storage (deduped blobs + original blob).
-	DedupedBucket = "deduped"
+	DuplicatesBucket = "duplicates"
 	/* bucket where we store only the original/source blob (used by s3 to know which blob holds the content);
 	it should contain only one blob, this is the only place from which we'll get blobs. */
-	OriginBucket = "origin"
+	OriginalBucket = "original"
 	DBExtensionName = ".db"
 	dbCacheLockCheckTimeout = 10 * time.Second
 )
@@ -103,34 +103,34 @@ func (c *Cache) PutBlob(digest, path string) error {
 	}
 
 	// create nested deduped bucket where we store all the deduped blobs + original blob
-	deduped, err := bucket.CreateBucketIfNotExists([]byte(DedupedBucket))
+	deduped, err := bucket.CreateBucketIfNotExists([]byte(DuplicatesBucket))
 	if err != nil {
 		// this is a serious failure
-		c.log.Error().Err(err).Str("bucket", DedupedBucket).Msg("unable to create a bucket")
+		c.log.Error().Err(err).Str("bucket", DuplicatesBucket).Msg("unable to create a bucket")
 
 		return err
 	}
 
 	if err := deduped.Put([]byte(path), nil); err != nil {
-		c.log.Error().Err(err).Str("bucket", DedupedBucket).Str("value", path).Msg("unable to put record")
+		c.log.Error().Err(err).Str("bucket", DuplicatesBucket).Str("value", path).Msg("unable to put record")
 
 		return err
 	}
 
 	// create origin bucket and insert only the original blob
-	origin := bucket.Bucket([]byte(OriginBucket))
+	origin := bucket.Bucket([]byte(OriginalBucket))
 	if origin == nil {
 		// if the bucket doesn't exist yet then 'path' is the original blob
-		origin, err := bucket.CreateBucket([]byte(OriginBucket))
+		origin, err := bucket.CreateBucket([]byte(OriginalBucket))
 		if err != nil {
 			// this is a serious failure
-			c.log.Error().Err(err).Str("bucket", OriginBucket).Msg("unable to create a bucket")
+			c.log.Error().Err(err).Str("bucket", OriginalBucket).Msg("unable to create a bucket")
 
 			return err
 		}
 
 		if err := origin.Put([]byte(path), nil); err != nil {
-			c.log.Error().Err(err).Str("bucket", OriginBucket).Str("value", path).Msg("unable to put record")
+			c.log.Error().Err(err).Str("bucket", OriginalBucket).Str("value", path).Msg("unable to put record")
 
 			return err
 		}
@@ -159,7 +159,7 @@ func (c *Cache) GetBlob(digest string) (string, error) {
 
 	bucket := root.Bucket([]byte(digest))
 	if bucket != nil {
-		origin := bucket.Bucket([]byte(OriginBucket))
+		origin := bucket.Bucket([]byte(OriginalBucket))
 		blobPath.WriteString(string(c.getOne(origin)))
 
 		return nil
@@ -189,7 +189,7 @@ func (c *Cache) HasBlob(digest, blob string) bool {
 		return errors.ErrCacheMiss
 	}
 
-	origin := bucket.Bucket([]byte(OriginBucket))
+	origin := bucket.Bucket([]byte(OriginalBucket))
 	if origin == nil {
 		return errors.ErrCacheMiss
 	}
@@ -242,23 +242,25 @@ func (c *Cache) DeleteBlob(digest, path string) error {
 		return errors.ErrCacheMiss
 	}
 
-	deduped := bucket.Bucket([]byte(DedupedBucket))
+	deduped := bucket.Bucket([]byte(DuplicatesBucket))
 	if deduped == nil {
 		return errors.ErrCacheMiss
 	}
 
 	if err := deduped.Delete([]byte(path)); err != nil {
-		c.log.Error().Err(err).Str("digest", digest).Str("bucket", DedupedBucket).Str("path", path).Msg("unable to delete")
+		c.log.Error().Err(err).Str("digest", digest).Str("bucket", DuplicatesBucket).
+			Str("path", path).Msg("unable to delete")
 
 		return err
 	}
 
-	origin := bucket.Bucket([]byte(OriginBucket))
+	origin := bucket.Bucket([]byte(OriginalBucket))
 	if origin != nil {
 		originBlob := c.getOne(origin)
 		if originBlob != nil {
 			if err := origin.Delete([]byte(path)); err != nil {
-				c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginBucket).Str("path", path).Msg("unable to delete")
+				c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginalBucket).
+					Str("path", path).Msg("unable to delete")
 
 				return err
 			}
@@ -267,7 +269,7 @@ func (c *Cache) DeleteBlob(digest, path string) error {
 		dedupedBlob := c.getOne(deduped)
 		if dedupedBlob != nil {
 			if err := origin.Put(dedupedBlob, nil); err != nil {
-				c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginBucket).Str("path", path).Msg("unable to put")
+				c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginalBucket).Str("path", path).Msg("unable to put")
 
 				return err
 			}
@@ -1510,6 +1510,84 @@ func (is *ImageStoreLocal) copyBlob(repo, blobPath, dstRecord string) (int64, error) {
 	return -1, zerr.ErrBlobNotFound
 }
 
+// blobStream is used to serve blob range requests.
+type blobStream struct {
+	reader io.Reader
+	closer io.Closer
+}
+
+func newBlobStream(blobPath string, from, to int64) (io.ReadCloser, error) {
+	blobFile, err := os.Open(blobPath)
+	if err != nil {
+		return nil, err
+	}
+
+	if from > 0 {
+		_, err = blobFile.Seek(from, io.SeekStart)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if from < 0 || to < from {
+		return nil, zerr.ErrBadRange
+	}
+
+	blobstrm := blobStream{reader: blobFile, closer: blobFile}
+
+	blobstrm.reader = io.LimitReader(blobFile, to-from+1)
+
+	return &blobstrm, nil
+}
+
+func (bs *blobStream) Read(buf []byte) (int, error) {
+	return bs.reader.Read(buf)
+}
+
+func (bs *blobStream) Close() error {
+	return bs.closer.Close()
+}
+
+// GetBlobPartial returns a partial stream to read a byte range of the blob;
+// the caller is responsible for closing the returned stream.
+func (is *ImageStoreLocal) GetBlobPartial(repo, digest, mediaType string, from, to int64,
+) (io.ReadCloser, int64, int64, error) {
+	var lockLatency time.Time
+
+	parsedDigest, err := godigest.Parse(digest)
+	if err != nil {
+		is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
+
+		return nil, -1, -1, zerr.ErrBadBlobDigest
+	}
+
+	blobPath := is.BlobPath(repo, parsedDigest)
+
+	is.RLock(&lockLatency)
+	defer is.RUnlock(&lockLatency)
+
+	binfo, err := os.Stat(blobPath)
+	if err != nil {
+		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
+
+		return nil, -1, -1, zerr.ErrBlobNotFound
+	}
+
+	if to < 0 || to >= binfo.Size() {
+		to = binfo.Size() - 1
+	}
+
+	blobReadCloser, err := newBlobStream(blobPath, from, to)
+	if err != nil {
+		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
+
+		return nil, -1, -1, err
+	}
+
+	// The caller function is responsible for calling Close()
+	return blobReadCloser, to - from + 1, binfo.Size(), nil
+}
+
 // GetBlob returns a stream to read the blob.
 // blob selector instead of directly downloading the blob.
 func (is *ImageStoreLocal) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) {
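The returned length is to-from+1 because HTTP byte ranges are inclusive at both ends. A standalone sketch of the same seek-and-limit pattern used by newBlobStream (hypothetical file path, not part of the diff):

    // Read bytes 2..3 of a file, mirroring newBlobStream above.
    f, err := os.Open("/tmp/blob") // hypothetical path
    if err != nil {
        panic(err)
    }
    defer f.Close()

    from, to := int64(2), int64(3)
    _, _ = f.Seek(from, io.SeekStart) // skip to the first requested byte
    data, _ := io.ReadAll(io.LimitReader(f, to-from+1)) // exactly to-from+1 = 2 bytes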
@@ -2077,6 +2077,52 @@ func TestPutBlobChunkStreamed(t *testing.T) {
 	})
 }
 
+func TestPullRange(t *testing.T) {
+	Convey("Repo layout", t, func(c C) {
+		dir := t.TempDir()
+
+		log := log.Logger{Logger: zerolog.New(os.Stdout)}
+		metrics := monitoring.NewMetricsServer(false, log)
+
+		Convey("Negative cases", func() {
+			imgStore := storage.NewImageStore(dir, true, storage.DefaultGCDelay,
+				true, true, log, metrics, nil)
+			repoName := "pull-range"
+
+			upload, err := imgStore.NewBlobUpload(repoName)
+			So(err, ShouldBeNil)
+			So(upload, ShouldNotBeEmpty)
+
+			content := []byte("test-data1")
+			buf := bytes.NewBuffer(content)
+			buflen := buf.Len()
+			bdigest := godigest.FromBytes(content)
+
+			blob, err := imgStore.PutBlobChunk(repoName, upload, 0, int64(buflen), buf)
+			So(err, ShouldBeNil)
+			So(blob, ShouldEqual, buflen)
+
+			err = imgStore.FinishBlobUpload(repoName, upload, buf, bdigest.String())
+			So(err, ShouldBeNil)
+
+			_, _, _, err = imgStore.GetBlobPartial(repoName, "", "application/octet-stream", 0, 1)
+			So(err, ShouldNotBeNil)
+
+			_, _, _, err = imgStore.GetBlobPartial(repoName, bdigest.String(), "application/octet-stream", 1, 0)
+			So(err, ShouldNotBeNil)
+
+			_, _, _, err = imgStore.GetBlobPartial(repoName, bdigest.String(), "application/octet-stream", 1, 0)
+			So(err, ShouldNotBeNil)
+
+			blobPath := path.Join(imgStore.RootDir(), repoName, "blobs", bdigest.Algorithm().String(), bdigest.Encoded())
+			err = os.Chmod(blobPath, 0o000)
+			So(err, ShouldBeNil)
+			_, _, _, err = imgStore.GetBlobPartial(repoName, bdigest.String(), "application/octet-stream", -1, 1)
+			So(err, ShouldNotBeNil)
+		})
+	})
+}
+
 func NewRandomImgManifest(data []byte, cdigest, ldigest godigest.Digest, cblob, lblob []byte) (*ispec.Manifest, error) {
 	annotationsMap := make(map[string]string)
 
@@ -1413,6 +1413,122 @@ func (is *ObjectStorage) copyBlob(repo string, blobPath string, dstRecord string) (int64, error) {
 	return -1, zerr.ErrBlobNotFound
 }
 
+// blobStream is used to serve blob range requests.
+type blobStream struct {
+	reader io.Reader
+	closer io.Closer
+}
+
+func NewBlobStream(readCloser io.ReadCloser, from, to int64) (io.ReadCloser, error) {
+	return &blobStream{reader: io.LimitReader(readCloser, to-from+1), closer: readCloser}, nil
+}
+
+func (bs *blobStream) Read(buf []byte) (int, error) {
+	return bs.reader.Read(buf)
+}
+
+func (bs *blobStream) Close() error {
+	return bs.closer.Close()
+}
+
+// GetBlobPartial returns a partial stream to read a byte range of the blob;
+// the caller is responsible for closing the returned stream.
+func (is *ObjectStorage) GetBlobPartial(repo, digest, mediaType string, from, to int64,
+) (io.ReadCloser, int64, int64, error) {
+	var lockLatency time.Time
+
+	dgst, err := godigest.Parse(digest)
+	if err != nil {
+		is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
+
+		return nil, -1, -1, zerr.ErrBadBlobDigest
+	}
+
+	blobPath := is.BlobPath(repo, dgst)
+
+	is.RLock(&lockLatency)
+	defer is.RUnlock(&lockLatency)
+
+	binfo, err := is.store.Stat(context.Background(), blobPath)
+	if err != nil {
+		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
+
+		return nil, -1, -1, zerr.ErrBlobNotFound
+	}
+
+	end := to
+
+	if to < 0 || to >= binfo.Size() {
+		end = binfo.Size() - 1
+	}
+
+	blobHandle, err := is.store.Reader(context.Background(), blobPath, from)
+	if err != nil {
+		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
+
+		return nil, -1, -1, err
+	}
+
+	blobReadCloser, err := NewBlobStream(blobHandle, from, end)
+	if err != nil {
+		is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob stream")
+
+		return nil, -1, -1, err
+	}
+
+	// is a 'deduped' blob?
+	if binfo.Size() == 0 {
+		defer blobReadCloser.Close()
+
+		// Check blobs in cache
+		dstRecord, err := is.checkCacheBlob(digest)
+		if err != nil {
+			is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found")
+
+			return nil, -1, -1, zerr.ErrBlobNotFound
+		}
+
+		binfo, err := is.store.Stat(context.Background(), dstRecord)
+		if err != nil {
+			is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob")
+
+			// the actual blob on disk may have been removed by GC, so sync the cache
+			if err := is.cache.DeleteBlob(digest, dstRecord); err != nil {
+				is.log.Error().Err(err).Str("dstDigest", digest).Str("dst", dstRecord).Msg("dedupe: unable to delete blob record")
+
+				return nil, -1, -1, err
+			}
+
+			return nil, -1, -1, zerr.ErrBlobNotFound
+		}
+
+		end := to
+
+		if to < 0 || to >= binfo.Size() {
+			end = binfo.Size() - 1
+		}
+
+		blobHandle, err := is.store.Reader(context.Background(), dstRecord, from)
+		if err != nil {
+			is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob")
+
+			return nil, -1, -1, err
+		}
+
+		blobReadCloser, err := NewBlobStream(blobHandle, from, end)
+		if err != nil {
+			is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob stream")
+
+			return nil, -1, -1, err
+		}
+
+		return blobReadCloser, end - from + 1, binfo.Size(), nil
+	}
+
+	// The caller function is responsible for calling Close()
+	return blobReadCloser, end - from + 1, binfo.Size(), nil
+}
+
 // GetBlob returns a stream to read the blob.
 // blob selector instead of directly downloading the blob.
 func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) {
@@ -1444,7 +1560,7 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error) {
 		return nil, -1, err
 	}
 
-	// is a 'deduped' blob
+	// is a 'deduped' blob?
 	if binfo.Size() == 0 {
 		// Check blobs in cache
 		dstRecord, err := is.checkCacheBlob(digest)
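Because dedupe on S3 stores a duplicate blob as a zero-length placeholder, a zero-size stat result redirects the ranged read to the original blob recorded in the cache (the binfo.Size() == 0 branch above). From the caller's perspective the partial API behaves the same either way; a sketch matching the TestS3PullRange cases below:

    // Read bytes 2..3 of the 10-byte "0123456789" test blob (illustrative).
    reader, length, total, err := imgStore.GetBlobPartial("index", digest.String(), "*/*", 2, 3)
    // err == nil, length == 2, total == 10
    data, _ := io.ReadAll(reader) // "23"
    reader.Close()                // the caller is responsible for Close()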
@@ -1114,6 +1114,182 @@ func TestS3Dedupe(t *testing.T) {
 	})
 }
 
+func TestS3PullRange(t *testing.T) {
+	skipIt(t)
+
+	Convey("Test against s3 image store", t, func() {
+		uuid, err := guuid.NewV4()
+		if err != nil {
+			panic(err)
+		}
+
+		testDir := path.Join("/oci-repo-test", uuid.String())
+
+		storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true)
+		defer cleanupStorage(storeDriver, testDir)
+
+		// create a blob/layer
+		upload, err := imgStore.NewBlobUpload("index")
+		So(err, ShouldBeNil)
+		So(upload, ShouldNotBeEmpty)
+
+		content := []byte("0123456789")
+		buf := bytes.NewBuffer(content)
+		buflen := buf.Len()
+		digest := godigest.FromBytes(content)
+		So(digest, ShouldNotBeNil)
+		blob, err := imgStore.PutBlobChunkStreamed("index", upload, buf)
+		So(err, ShouldBeNil)
+		So(blob, ShouldEqual, buflen)
+
+		err = imgStore.FinishBlobUpload("index", upload, buf, digest.String())
+		So(err, ShouldBeNil)
+		So(blob, ShouldEqual, buflen)
+
+		Convey("Without Dedupe", func() {
+			reader, _, _, err := imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, -1)
+			So(err, ShouldBeNil)
+			rdbuf, err := io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content)
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "application/octet-stream", 0, -1)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content)
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, 100)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content)
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, 10)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content)
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, 0)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content[0:1])
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, 1)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content[0:2])
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 2, 3)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content[2:4])
+			reader.Close()
+		})
+
+		Convey("With Dedupe", func() {
+			// create a blob/layer with the same content
+			upload, err := imgStore.NewBlobUpload("dupindex")
+			So(err, ShouldBeNil)
+			So(upload, ShouldNotBeEmpty)
+
+			dupcontent := []byte("0123456789")
+			buf := bytes.NewBuffer(dupcontent)
+			buflen := buf.Len()
+			digest := godigest.FromBytes(dupcontent)
+			So(digest, ShouldNotBeNil)
+			blob, err := imgStore.PutBlobChunkStreamed("dupindex", upload, buf)
+			So(err, ShouldBeNil)
+			So(blob, ShouldEqual, buflen)
+
+			err = imgStore.FinishBlobUpload("dupindex", upload, buf, digest.String())
+			So(err, ShouldBeNil)
+			So(blob, ShouldEqual, buflen)
+
+			reader, _, _, err := imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, -1)
+			So(err, ShouldBeNil)
+			rdbuf, err := io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content)
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "application/octet-stream", 0, -1)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content)
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, 100)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content)
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, 10)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content)
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, 0)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content[0:1])
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 0, 1)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content[0:2])
+			reader.Close()
+
+			reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 2, 3)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content[2:4])
+			reader.Close()
+
+			// delete original blob
+			err = imgStore.DeleteBlob("index", digest.String())
+			So(err, ShouldBeNil)
+
+			reader, _, _, err = imgStore.GetBlobPartial("dupindex", digest.String(), "*/*", 2, 3)
+			So(err, ShouldBeNil)
+			rdbuf, err = io.ReadAll(reader)
+			So(err, ShouldBeNil)
+			So(rdbuf, ShouldResemble, content[2:4])
+			reader.Close()
+		})
+
+		Convey("Negative cases", func() {
+			_, _, _, err := imgStore.GetBlobPartial("index", "deadBEEF", "*/*", 0, -1)
+			So(err, ShouldNotBeNil)
+
+			content := []byte("invalid content")
+			digest := godigest.FromBytes(content)
+
+			_, _, _, err = imgStore.GetBlobPartial("index", digest.String(), "*/*", 0, -1)
+			So(err, ShouldNotBeNil)
+		})
+	})
+}
+
 func TestS3ManifestImageIndex(t *testing.T) {
 	skipIt(t)
 
@@ -1782,6 +1958,9 @@ func TestS3DedupeErr(t *testing.T) {
 
 		_, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
 		So(err, ShouldNotBeNil)
+
+		_, _, _, err = imgStore.GetBlobPartial("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip", 0, 1)
+		So(err, ShouldNotBeNil)
 	})
 
 	Convey("Test GetBlob() - error on store.Reader()", t, func(c C) {
@@ -1826,6 +2005,9 @@ func TestS3DedupeErr(t *testing.T) {
 
 		_, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
 		So(err, ShouldNotBeNil)
+
+		_, _, _, err = imgStore.GetBlobPartial("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip", 0, 1)
+		So(err, ShouldNotBeNil)
 	})
 
 	Convey("Test GetBlob() - error on checkCacheBlob()", t, func(c C) {
@@ -1846,6 +2028,9 @@ func TestS3DedupeErr(t *testing.T) {
 
 		_, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
 		So(err, ShouldNotBeNil)
+
+		_, _, _, err = imgStore.GetBlobPartial("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip", 0, 1)
+		So(err, ShouldNotBeNil)
 	})
 
 	Convey("Test DeleteBlob() - error on store.Move()", t, func(c C) {
@@ -40,6 +40,7 @@ type ImageStore interface {
 	BlobPath(repo string, digest digest.Digest) string
 	CheckBlob(repo, digest string) (bool, int64, error)
 	GetBlob(repo, digest, mediaType string) (io.ReadCloser, int64, error)
+	GetBlobPartial(repo, digest, mediaType string, from, to int64) (io.ReadCloser, int64, int64, error)
 	DeleteBlob(repo, digest string) error
 	GetIndexContent(repo string) ([]byte, error)
 	GetBlobContent(repo, digest string) ([]byte, error)
@@ -30,6 +30,8 @@ type MockedImageStore struct {
 	DeleteBlobUploadFn func(repo string, uuid string) error
 	BlobPathFn func(repo string, digest digest.Digest) string
 	CheckBlobFn func(repo string, digest string) (bool, int64, error)
+	GetBlobPartialFn func(repo string, digest string, mediaType string, from, to int64,
+	) (io.ReadCloser, int64, int64, error)
 	GetBlobFn func(repo string, digest string, mediaType string) (io.ReadCloser, int64, error)
 	DeleteBlobFn func(repo string, digest string) error
 	GetIndexContentFn func(repo string) ([]byte, error)

@@ -230,6 +232,15 @@ func (is MockedImageStore) CheckBlob(repo string, digest string) (bool, int64, error) {
 	return true, 0, nil
 }
 
+func (is MockedImageStore) GetBlobPartial(repo string, digest string, mediaType string, from, to int64,
+) (io.ReadCloser, int64, int64, error) {
+	if is.GetBlobPartialFn != nil {
+		return is.GetBlobPartialFn(repo, digest, mediaType, from, to)
+	}
+
+	return io.NopCloser(&io.LimitedReader{}), 0, 0, nil
+}
+
 func (is MockedImageStore) GetBlob(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) {
 	if is.GetBlobFn != nil {
 		return is.GetBlobFn(repo, digest, mediaType)
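With the mock in place, a handler test can stub ranged reads without touching real storage; a hypothetical sketch (mock package path assumed, not part of the diff):

    // Stub GetBlobPartial to serve a slice of an in-memory blob.
    imgStore := mocks.MockedImageStore{
        GetBlobPartialFn: func(repo, digest, mediaType string, from, to int64,
        ) (io.ReadCloser, int64, int64, error) {
            content := []byte("0123456789")
            part := content[from : to+1] // no bounds checking; sketch only
            return io.NopCloser(bytes.NewReader(part)), int64(len(part)), int64(len(content)), nil
        },
    }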