From 20f40514465e08391bc973e26a724a5697c96b7f Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Mon, 27 Sep 2021 17:32:53 +0300 Subject: [PATCH] Clean blob uploads when clients interrupts uploading, closes #225 --- pkg/api/controller_test.go | 240 +++++++++++++++++++++++++++++++++++++ pkg/api/routes.go | 16 +++ 2 files changed, 256 insertions(+) diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index e40f164a..44a24c9c 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -391,6 +391,246 @@ func TestBasicAuth(t *testing.T) { }) } +func TestInterruptedBlobUpload(t *testing.T) { + Convey("Successfully cleaning interrupted blob uploads", t, func() { + port := getFreePort() + baseURL := getBaseURL(port, false) + config := api.NewConfig() + config.HTTP.Port = port + + c := api.NewController(config) + dir, err := ioutil.TempDir("", "oci-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(dir) + c.Config.Storage.RootDirectory = dir + go func() { + // this blocks + if err := c.Run(); err != nil { + return + } + }() + + client := resty.New() + + // wait till ready + for { + _, err := client.R().Get(baseURL) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + defer func() { + ctx := context.Background() + _ = c.Server.Shutdown(ctx) + }() + + blob := make([]byte, 50*1024*1024) + + digest := godigest.FromBytes(blob).String() + + // nolint: dupl + Convey("Test interrupt PATCH blob upload", func() { + resp, err := client.R().Post(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 202) + + loc := resp.Header().Get("Location") + splittedLoc := strings.Split(loc, "/") + sessionID := splittedLoc[len(splittedLoc)-1] + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + // patch blob + go func(ctx context.Context) { + for i := 0; i < 3; i++ { + _, _ = client.R(). + SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", digest). + SetBody(blob). + SetContext(ctx). + Patch(baseURL + loc) + + time.Sleep(500 * time.Millisecond) + } + }(ctx) + + // if the blob upload has started then interrupt by running cancel() + for { + n, err := c.StoreController.DefaultStore.GetBlobUpload(AuthorizedNamespace, sessionID) + if n > 0 && err == nil { + cancel() + break + } + time.Sleep(100 * time.Millisecond) + } + + // wait for zot to remove blobUpload + time.Sleep(1 * time.Second) + + resp, err = client.R().Get(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/" + sessionID) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) + + Convey("Test negative interrupt PATCH blob upload", func() { + resp, err := client.R().Post(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 202) + + loc := resp.Header().Get("Location") + splittedLoc := strings.Split(loc, "/") + sessionID := splittedLoc[len(splittedLoc)-1] + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + // patch blob + go func(ctx context.Context) { + for i := 0; i < 3; i++ { + _, _ = client.R(). + SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", digest). + SetBody(blob). + SetContext(ctx). + Patch(baseURL + loc) + + time.Sleep(500 * time.Millisecond) + } + }(ctx) + + // if the blob upload has started then interrupt by running cancel() + for { + n, err := c.StoreController.DefaultStore.GetBlobUpload(AuthorizedNamespace, sessionID) + if n > 0 && err == nil { + // cleaning blob uploads, so that zot fails to clean up, +code coverage + err = c.StoreController.DefaultStore.DeleteBlobUpload(AuthorizedNamespace, sessionID) + So(err, ShouldBeNil) + cancel() + break + } + time.Sleep(100 * time.Millisecond) + } + + // wait for zot to remove blobUpload + time.Sleep(1 * time.Second) + + resp, err = client.R().Get(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/" + sessionID) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) + + // nolint: dupl + Convey("Test interrupt PUT blob upload", func() { + resp, err := client.R().Post(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 202) + + loc := resp.Header().Get("Location") + splittedLoc := strings.Split(loc, "/") + sessionID := splittedLoc[len(splittedLoc)-1] + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + // put blob + go func(ctx context.Context) { + for i := 0; i < 3; i++ { + _, _ = client.R(). + SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", digest). + SetBody(blob). + SetContext(ctx). + Put(baseURL + loc) + + time.Sleep(500 * time.Millisecond) + } + }(ctx) + + // if the blob upload has started then interrupt by running cancel() + for { + n, err := c.StoreController.DefaultStore.GetBlobUpload(AuthorizedNamespace, sessionID) + if n > 0 && err == nil { + cancel() + break + } + time.Sleep(100 * time.Millisecond) + } + + // wait for zot to try to remove blobUpload + time.Sleep(1 * time.Second) + + resp, err = client.R().Get(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/" + sessionID) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) + + Convey("Test negative interrupt PUT blob upload", func() { + resp, err := client.R().Post(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 202) + + loc := resp.Header().Get("Location") + splittedLoc := strings.Split(loc, "/") + sessionID := splittedLoc[len(splittedLoc)-1] + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + // push blob + go func(ctx context.Context) { + for i := 0; i < 3; i++ { + _, _ = client.R(). + SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", digest). + SetBody(blob). + SetContext(ctx). + Put(baseURL + loc) + + time.Sleep(500 * time.Millisecond) + } + }(ctx) + + // if the blob upload has started then interrupt by running cancel() + for { + n, err := c.StoreController.DefaultStore.GetBlobUpload(AuthorizedNamespace, sessionID) + if n > 0 && err == nil { + // cleaning blob uploads, so that zot fails to clean up, +code coverage + err = c.StoreController.DefaultStore.DeleteBlobUpload(AuthorizedNamespace, sessionID) + So(err, ShouldBeNil) + cancel() + break + } + time.Sleep(100 * time.Millisecond) + } + + // wait for zot to try to remove blobUpload + time.Sleep(1 * time.Second) + + resp, err = client.R().Get(baseURL + "/v2/" + AuthorizedNamespace + "/blobs/uploads/" + sessionID) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) + }) +} + func TestMultipleInstance(t *testing.T) { Convey("Negative test zot multiple instance", t, func() { port := getFreePort() diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 4432fbd5..e596d732 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -904,6 +904,14 @@ func (rh *RouteHandler) PatchBlobUpload(w http.ResponseWriter, r *http.Request) if err != nil { switch err { + case io.ErrUnexpectedEOF: + rh.c.Log.Warn().Msg("received unexpected EOF, removing .uploads/ files") + + if err = is.DeleteBlobUpload(name, sessionID); err != nil { + rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name) + } + + w.WriteHeader(http.StatusInternalServerError) case errors.ErrBadUploadRange: WriteJSON(w, http.StatusRequestedRangeNotSatisfiable, NewErrorList(NewError(BLOB_UPLOAD_INVALID, map[string]string{"session_id": sessionID}))) @@ -1009,6 +1017,14 @@ func (rh *RouteHandler) UpdateBlobUpload(w http.ResponseWriter, r *http.Request) _, err = is.PutBlobChunk(name, sessionID, from, to, r.Body) if err != nil { switch err { + case io.ErrUnexpectedEOF: + rh.c.Log.Warn().Msg("received unexpected EOF, removing .uploads/ files") + + if err = is.DeleteBlobUpload(name, sessionID); err != nil { + rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name) + } + + w.WriteHeader(http.StatusInternalServerError) case errors.ErrBadUploadRange: WriteJSON(w, http.StatusBadRequest, NewErrorList(NewError(BLOB_UPLOAD_INVALID, map[string]string{"session_id": sessionID})))