0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-30 22:34:13 -05:00

Clean blob uploads when clients interrupts uploading, closes #225

This commit is contained in:
Petu Eusebiu 2021-09-27 17:32:53 +03:00 committed by Ramkumar Chinchani
parent d69ee3f562
commit 20f4051446
2 changed files with 256 additions and 0 deletions

View file

@ -391,6 +391,246 @@ func TestBasicAuth(t *testing.T) {
})
}
func TestInterruptedBlobUpload(t *testing.T) {
Convey("Successfully cleaning interrupted blob uploads", t, func() {
port := getFreePort()
baseURL := getBaseURL(port, false)
config := api.NewConfig()
config.HTTP.Port = port
c := api.NewController(config)
dir, err := ioutil.TempDir("", "oci-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(dir)
c.Config.Storage.RootDirectory = dir
go func() {
// this blocks
if err := c.Run(); err != nil {
return
}
}()
client := resty.New()
// wait till ready
for {
_, err := client.R().Get(baseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
defer func() {
ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}()
blob := make([]byte, 50*1024*1024)
digest := godigest.FromBytes(blob).String()
// nolint: dupl
Convey("Test interrupt PATCH blob upload", func() {
resp, err := client.R().Post(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 202)
loc := resp.Header().Get("Location")
splittedLoc := strings.Split(loc, "/")
sessionID := splittedLoc[len(splittedLoc)-1]
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// patch blob
go func(ctx context.Context) {
for i := 0; i < 3; i++ {
_, _ = client.R().
SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", digest).
SetBody(blob).
SetContext(ctx).
Patch(baseURL + loc)
time.Sleep(500 * time.Millisecond)
}
}(ctx)
// if the blob upload has started then interrupt by running cancel()
for {
n, err := c.StoreController.DefaultStore.GetBlobUpload(AuthorizedNamespace, sessionID)
if n > 0 && err == nil {
cancel()
break
}
time.Sleep(100 * time.Millisecond)
}
// wait for zot to remove blobUpload
time.Sleep(1 * time.Second)
resp, err = client.R().Get(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/" + sessionID)
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
})
Convey("Test negative interrupt PATCH blob upload", func() {
resp, err := client.R().Post(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 202)
loc := resp.Header().Get("Location")
splittedLoc := strings.Split(loc, "/")
sessionID := splittedLoc[len(splittedLoc)-1]
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// patch blob
go func(ctx context.Context) {
for i := 0; i < 3; i++ {
_, _ = client.R().
SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", digest).
SetBody(blob).
SetContext(ctx).
Patch(baseURL + loc)
time.Sleep(500 * time.Millisecond)
}
}(ctx)
// if the blob upload has started then interrupt by running cancel()
for {
n, err := c.StoreController.DefaultStore.GetBlobUpload(AuthorizedNamespace, sessionID)
if n > 0 && err == nil {
// cleaning blob uploads, so that zot fails to clean up, +code coverage
err = c.StoreController.DefaultStore.DeleteBlobUpload(AuthorizedNamespace, sessionID)
So(err, ShouldBeNil)
cancel()
break
}
time.Sleep(100 * time.Millisecond)
}
// wait for zot to remove blobUpload
time.Sleep(1 * time.Second)
resp, err = client.R().Get(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/" + sessionID)
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
})
// nolint: dupl
Convey("Test interrupt PUT blob upload", func() {
resp, err := client.R().Post(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 202)
loc := resp.Header().Get("Location")
splittedLoc := strings.Split(loc, "/")
sessionID := splittedLoc[len(splittedLoc)-1]
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// put blob
go func(ctx context.Context) {
for i := 0; i < 3; i++ {
_, _ = client.R().
SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", digest).
SetBody(blob).
SetContext(ctx).
Put(baseURL + loc)
time.Sleep(500 * time.Millisecond)
}
}(ctx)
// if the blob upload has started then interrupt by running cancel()
for {
n, err := c.StoreController.DefaultStore.GetBlobUpload(AuthorizedNamespace, sessionID)
if n > 0 && err == nil {
cancel()
break
}
time.Sleep(100 * time.Millisecond)
}
// wait for zot to try to remove blobUpload
time.Sleep(1 * time.Second)
resp, err = client.R().Get(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/" + sessionID)
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
})
Convey("Test negative interrupt PUT blob upload", func() {
resp, err := client.R().Post(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 202)
loc := resp.Header().Get("Location")
splittedLoc := strings.Split(loc, "/")
sessionID := splittedLoc[len(splittedLoc)-1]
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// push blob
go func(ctx context.Context) {
for i := 0; i < 3; i++ {
_, _ = client.R().
SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", digest).
SetBody(blob).
SetContext(ctx).
Put(baseURL + loc)
time.Sleep(500 * time.Millisecond)
}
}(ctx)
// if the blob upload has started then interrupt by running cancel()
for {
n, err := c.StoreController.DefaultStore.GetBlobUpload(AuthorizedNamespace, sessionID)
if n > 0 && err == nil {
// cleaning blob uploads, so that zot fails to clean up, +code coverage
err = c.StoreController.DefaultStore.DeleteBlobUpload(AuthorizedNamespace, sessionID)
So(err, ShouldBeNil)
cancel()
break
}
time.Sleep(100 * time.Millisecond)
}
// wait for zot to try to remove blobUpload
time.Sleep(1 * time.Second)
resp, err = client.R().Get(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/" + sessionID)
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
})
})
}
func TestMultipleInstance(t *testing.T) {
Convey("Negative test zot multiple instance", t, func() {
port := getFreePort()

View file

@ -904,6 +904,14 @@ func (rh *RouteHandler) PatchBlobUpload(w http.ResponseWriter, r *http.Request)
if err != nil {
switch err {
case io.ErrUnexpectedEOF:
rh.c.Log.Warn().Msg("received unexpected EOF, removing .uploads/ files")
if err = is.DeleteBlobUpload(name, sessionID); err != nil {
rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name)
}
w.WriteHeader(http.StatusInternalServerError)
case errors.ErrBadUploadRange:
WriteJSON(w, http.StatusRequestedRangeNotSatisfiable,
NewErrorList(NewError(BLOB_UPLOAD_INVALID, map[string]string{"session_id": sessionID})))
@ -1009,6 +1017,14 @@ func (rh *RouteHandler) UpdateBlobUpload(w http.ResponseWriter, r *http.Request)
_, err = is.PutBlobChunk(name, sessionID, from, to, r.Body)
if err != nil {
switch err {
case io.ErrUnexpectedEOF:
rh.c.Log.Warn().Msg("received unexpected EOF, removing .uploads/ files")
if err = is.DeleteBlobUpload(name, sessionID); err != nil {
rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name)
}
w.WriteHeader(http.StatusInternalServerError)
case errors.ErrBadUploadRange:
WriteJSON(w, http.StatusBadRequest,
NewErrorList(NewError(BLOB_UPLOAD_INVALID, map[string]string{"session_id": sessionID})))