From f6273e2250f791d6a75aaaa98bd084ea9c4c6c23 Mon Sep 17 00:00:00 2001 From: Royce Remer Date: Sun, 3 Nov 2024 20:49:08 -0800 Subject: [PATCH] Make LFS http_client parallel within a batch. (#32369) Signed-off-by: Royce Remer Co-authored-by: wxiaoguang --- custom/conf/app.example.ini | 8 +- go.mod | 2 +- modules/lfs/http_client.go | 143 ++++++++++++++++------------- modules/lfs/http_client_test.go | 153 ++++++++++++++------------------ modules/repository/repo.go | 9 +- modules/setting/lfs.go | 8 +- modules/setting/lfs_test.go | 13 +++ 7 files changed, 183 insertions(+), 153 deletions(-) diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index 45b094d99c..2e5ab5bbab 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -2668,9 +2668,15 @@ LEVEL = Info ;; override the minio base path if storage type is minio ;MINIO_BASE_PATH = lfs/ +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; settings for Gitea's LFS client (eg: mirroring an upstream lfs endpoint) +;; ;[lfs_client] -;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number +;; Limit the number of pointers in each batch request to this number ;BATCH_SIZE = 20 +;; Limit the number of concurrent upload/download operations within a batch +;BATCH_OPERATION_CONCURRENCY = 3 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/go.mod b/go.mod index c5beb3f760..6481a335f0 100644 --- a/go.mod +++ b/go.mod @@ -108,6 +108,7 @@ require ( golang.org/x/image v0.23.0 golang.org/x/net v0.31.0 golang.org/x/oauth2 v0.23.0 + golang.org/x/sync v0.10.0 golang.org/x/sys v0.28.0 golang.org/x/text v0.21.0 golang.org/x/tools v0.26.0 @@ -284,7 +285,6 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/sync v0.10.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect diff --git a/modules/lfs/http_client.go b/modules/lfs/http_client.go index aa9e744d72..411c4248c4 100644 --- a/modules/lfs/http_client.go +++ b/modules/lfs/http_client.go @@ -17,6 +17,8 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/setting" + + "golang.org/x/sync/errgroup" ) // HTTPClient is used to communicate with the LFS server @@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl return c.performOperation(ctx, objects, nil, callback) } +// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error { if len(objects) == 0 { return nil @@ -133,71 +136,87 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc return fmt.Errorf("TransferAdapter not found: %s", result.Transfer) } + errGroup, groupCtx := errgroup.WithContext(ctx) + errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency) for _, object := range result.Objects { - if object.Error != nil { - log.Trace("Error on object %v: %v", object.Pointer, object.Error) - if uc != nil { - if _, err := uc(object.Pointer, object.Error); err != nil { - return err - } - } else { - if err := dc(object.Pointer, nil, object.Error); err != nil { - return err - } - } - continue - } - - if uc != nil { - if len(object.Actions) == 0 { - log.Trace("%v already present on server", object.Pointer) - continue - } - - link, ok := object.Actions["upload"] - if !ok { - log.Debug("%+v", object) - return errors.New("missing action 'upload'") - } - - content, err := uc(object.Pointer, nil) - if err != nil { - return err - } - - err = transferAdapter.Upload(ctx, link, object.Pointer, content) - if err != nil { - return err - } - - link, ok = object.Actions["verify"] - if ok { - if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil { - return err - } - } - } else { - link, ok := object.Actions["download"] - if !ok { - // no actions block in response, try legacy response schema - link, ok = object.Links["download"] - } - if !ok { - log.Debug("%+v", object) - return errors.New("missing action 'download'") - } - - content, err := transferAdapter.Download(ctx, link) - if err != nil { - return err - } - - if err := dc(object.Pointer, content, nil); err != nil { - return err - } - } + errGroup.Go(func() error { + return performSingleOperation(groupCtx, object, dc, uc, transferAdapter) + }) } + // only the first error is returned, preserving legacy behavior before concurrency + return errGroup.Wait() +} + +// performSingleOperation performs an LFS upload or download operation on a single object +func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error { + // the response from a lfs batch api request for this specific object id contained an error + if object.Error != nil { + log.Trace("Error on object %v: %v", object.Pointer, object.Error) + + // this was an 'upload' request inside the batch request + if uc != nil { + if _, err := uc(object.Pointer, object.Error); err != nil { + return err + } + } else { + // this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request + if err := dc(object.Pointer, nil, object.Error); err != nil { + return err + } + } + // if the callback returns no err, then the error could be ignored, and the operations should continue + return nil + } + + // the response from an lfs batch api request contained necessary upload/download fields to act upon + if uc != nil { + if len(object.Actions) == 0 { + log.Trace("%v already present on server", object.Pointer) + return nil + } + + link, ok := object.Actions["upload"] + if !ok { + return errors.New("missing action 'upload'") + } + + content, err := uc(object.Pointer, nil) + if err != nil { + return err + } + + err = transferAdapter.Upload(ctx, link, object.Pointer, content) + if err != nil { + return err + } + + link, ok = object.Actions["verify"] + if ok { + if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil { + return err + } + } + } else { + link, ok := object.Actions["download"] + if !ok { + // no actions block in response, try legacy response schema + link, ok = object.Links["download"] + } + if !ok { + log.Debug("%+v", object) + return errors.New("missing action 'download'") + } + + content, err := transferAdapter.Download(ctx, link) + if err != nil { + return err + } + + if err := dc(object.Pointer, content, nil); err != nil { + return err + } + } return nil } diff --git a/modules/lfs/http_client_test.go b/modules/lfs/http_client_test.go index c9415e7d61..aecb4692cd 100644 --- a/modules/lfs/http_client_test.go +++ b/modules/lfs/http_client_test.go @@ -12,6 +12,8 @@ import ( "testing" "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -184,93 +186,84 @@ func TestHTTPClientDownload(t *testing.T) { cases := []struct { endpoint string - expectederror string + expectedError string }{ - // case 0 { endpoint: "https://status-not-ok.io", - expectederror: io.ErrUnexpectedEOF.Error(), + expectedError: io.ErrUnexpectedEOF.Error(), }, - // case 1 { endpoint: "https://invalid-json-response.io", - expectederror: "invalid json", + expectedError: "invalid json", }, - // case 2 { endpoint: "https://valid-batch-request-download.io", - expectederror: "", + expectedError: "", }, - // case 3 { endpoint: "https://response-no-objects.io", - expectederror: "", + expectedError: "", }, - // case 4 { endpoint: "https://unknown-transfer-adapter.io", - expectederror: "TransferAdapter not found: ", + expectedError: "TransferAdapter not found: ", }, - // case 5 { endpoint: "https://error-in-response-objects.io", - expectederror: "Object not found", + expectedError: "Object not found", }, - // case 6 { endpoint: "https://empty-actions-map.io", - expectederror: "missing action 'download'", + expectedError: "missing action 'download'", }, - // case 7 { endpoint: "https://download-actions-map.io", - expectederror: "", + expectedError: "", }, - // case 8 { endpoint: "https://upload-actions-map.io", - expectederror: "missing action 'download'", + expectedError: "missing action 'download'", }, - // case 9 { endpoint: "https://verify-actions-map.io", - expectederror: "missing action 'download'", + expectedError: "missing action 'download'", }, - // case 10 { endpoint: "https://unknown-actions-map.io", - expectederror: "missing action 'download'", + expectedError: "missing action 'download'", }, - // case 11 { endpoint: "https://legacy-batch-request-download.io", - expectederror: "", + expectedError: "", }, } - for n, c := range cases { - client := &HTTPClient{ - client: hc, - endpoint: c.endpoint, - transfers: map[string]TransferAdapter{ - "dummy": dummy, - }, - } - - err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error { - if objectError != nil { - return objectError + defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)() + for _, c := range cases { + t.Run(c.endpoint, func(t *testing.T) { + client := &HTTPClient{ + client: hc, + endpoint: c.endpoint, + transfers: map[string]TransferAdapter{ + "dummy": dummy, + }, + } + + err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error { + if objectError != nil { + return objectError + } + b, err := io.ReadAll(content) + require.NoError(t, err) + assert.Equal(t, []byte("dummy"), b) + return nil + }) + if c.expectedError != "" { + assert.ErrorContains(t, err, c.expectedError) + } else { + require.NoError(t, err) } - b, err := io.ReadAll(content) - require.NoError(t, err) - assert.Equal(t, []byte("dummy"), b) - return nil }) - if len(c.expectederror) > 0 { - assert.Contains(t, err.Error(), c.expectederror, "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror) - } else { - require.NoError(t, err, "case %d", n) - } } } @@ -297,81 +290,73 @@ func TestHTTPClientUpload(t *testing.T) { cases := []struct { endpoint string - expectederror string + expectedError string }{ - // case 0 { endpoint: "https://status-not-ok.io", - expectederror: io.ErrUnexpectedEOF.Error(), + expectedError: io.ErrUnexpectedEOF.Error(), }, - // case 1 { endpoint: "https://invalid-json-response.io", - expectederror: "invalid json", + expectedError: "invalid json", }, - // case 2 { endpoint: "https://valid-batch-request-upload.io", - expectederror: "", + expectedError: "", }, - // case 3 { endpoint: "https://response-no-objects.io", - expectederror: "", + expectedError: "", }, - // case 4 { endpoint: "https://unknown-transfer-adapter.io", - expectederror: "TransferAdapter not found: ", + expectedError: "TransferAdapter not found: ", }, - // case 5 { endpoint: "https://error-in-response-objects.io", - expectederror: "Object not found", + expectedError: "Object not found", }, - // case 6 { endpoint: "https://empty-actions-map.io", - expectederror: "", + expectedError: "", }, - // case 7 { endpoint: "https://download-actions-map.io", - expectederror: "missing action 'upload'", + expectedError: "missing action 'upload'", }, - // case 8 { endpoint: "https://upload-actions-map.io", - expectederror: "", + expectedError: "", }, - // case 9 { endpoint: "https://verify-actions-map.io", - expectederror: "missing action 'upload'", + expectedError: "missing action 'upload'", }, - // case 10 { endpoint: "https://unknown-actions-map.io", - expectederror: "missing action 'upload'", + expectedError: "missing action 'upload'", }, } - for n, c := range cases { - client := &HTTPClient{ - client: hc, - endpoint: c.endpoint, - transfers: map[string]TransferAdapter{ - "dummy": dummy, - }, - } + defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)() + for _, c := range cases { + t.Run(c.endpoint, func(t *testing.T) { + client := &HTTPClient{ + client: hc, + endpoint: c.endpoint, + transfers: map[string]TransferAdapter{ + "dummy": dummy, + }, + } - err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) { - return io.NopCloser(new(bytes.Buffer)), objectError + err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) { + return io.NopCloser(new(bytes.Buffer)), objectError + }) + if c.expectedError != "" { + assert.ErrorContains(t, err, c.expectedError) + } else { + require.NoError(t, err) + } }) - if len(c.expectederror) > 0 { - assert.Contains(t, err.Error(), c.expectederror, "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror) - } else { - require.NoError(t, err, "case %d", n) - } } } diff --git a/modules/repository/repo.go b/modules/repository/repo.go index 9ce330b218..98e7fcbc0a 100644 --- a/modules/repository/repo.go +++ b/modules/repository/repo.go @@ -182,11 +182,12 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re downloadObjects := func(pointers []lfs.Pointer) error { err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error { + if errors.Is(objectError, lfs.ErrObjectNotExist) { + log.Warn("Ignoring missing upstream LFS object %-v: %v", p, objectError) + return nil + } + if objectError != nil { - if errors.Is(objectError, lfs.ErrObjectNotExist) { - log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError) - return nil - } return objectError } diff --git a/modules/setting/lfs.go b/modules/setting/lfs.go index ebb234a3ef..c3a37513ea 100644 --- a/modules/setting/lfs.go +++ b/modules/setting/lfs.go @@ -27,7 +27,8 @@ var LFS = struct { // LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS var LFSClient = struct { - BatchSize int `ini:"BATCH_SIZE"` + BatchSize int `ini:"BATCH_SIZE"` + BatchOperationConcurrency int `ini:"BATCH_OPERATION_CONCURRENCY"` }{} func loadLFSFrom(rootCfg ConfigProvider) error { @@ -65,6 +66,11 @@ func loadLFSFrom(rootCfg ConfigProvider) error { LFSClient.BatchSize = 20 } + if LFSClient.BatchOperationConcurrency < 1 { + // match the default git-lfs's `lfs.concurrenttransfers` + LFSClient.BatchOperationConcurrency = 3 + } + LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour) if !LFS.StartServer || !InstallLock { diff --git a/modules/setting/lfs_test.go b/modules/setting/lfs_test.go index 324965781d..e546690b33 100644 --- a/modules/setting/lfs_test.go +++ b/modules/setting/lfs_test.go @@ -115,4 +115,17 @@ BATCH_SIZE = 0 assert.NoError(t, loadLFSFrom(cfg)) assert.EqualValues(t, 100, LFS.MaxBatchSize) assert.EqualValues(t, 20, LFSClient.BatchSize) + assert.EqualValues(t, 3, LFSClient.BatchOperationConcurrency) + + iniStr = ` +[lfs_client] +BATCH_SIZE = 50 +BATCH_OPERATION_CONCURRENCY = 10 +` + cfg, err = NewConfigProviderFromData(iniStr) + assert.NoError(t, err) + + assert.NoError(t, loadLFSFrom(cfg)) + assert.EqualValues(t, 50, LFSClient.BatchSize) + assert.EqualValues(t, 10, LFSClient.BatchOperationConcurrency) }