From f89925fb274172b2664192d4243479c345c5b41b Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Mon, 10 Jan 2022 18:06:12 +0200 Subject: [PATCH] sync: periodically retry if on-demand fails inline, closes #281 sync: don't return error on sync signatures, just skip them, closes #375 sync: sync signatures on demand sync on demand: in case of parallel requests pull image just once, closes #344 Signed-off-by: Petu Eusebiu --- examples/README.md | 12 +- examples/config-sync.json | 8 +- pkg/api/routes.go | 44 +- pkg/cli/root.go | 7 + pkg/cli/root_test.go | 25 +- pkg/common/common_test.go | 17 + pkg/extensions/extensions.go | 4 +- pkg/extensions/minimal.go | 2 +- pkg/extensions/sync/on_demand.go | 313 ++++++++---- pkg/extensions/sync/sync.go | 84 ++-- pkg/extensions/sync/sync_internal_test.go | 29 +- pkg/extensions/sync/sync_test.go | 561 ++++++++++++++++++++-- pkg/extensions/sync/utils.go | 91 +++- 13 files changed, 948 insertions(+), 249 deletions(-) create mode 100644 pkg/common/common_test.go diff --git a/examples/README.md b/examples/README.md index 2eb464ff..03ca0cee 100644 --- a/examples/README.md +++ b/examples/README.md @@ -386,8 +386,10 @@ Configure each registry sync: "urls": ["https://registry1:5000"], "onDemand": false, # pull any image which the local registry doesn't have "pollInterval": "6h", # polling interval, if not set then periodically polling will not run - "tlsVerify": true, # whether or not to verify tls + "tlsVerify": true, # whether or not to verify tls (default is true) "certDir": "/home/user/certs", # use certificates at certDir path, if not specified then use the default certs dir + "maxRetries": 5, # mandatory option! maxRetries in case of temporary errors + "retryDelay": "10m", # mandatory option! delay between retries, retry options are applied for both on demand and periodically sync. "content":[ # which content to periodically pull, also it's used for filtering ondemand images, if not set then periodically polling will not run { "prefix":"/repo1/repo", # pull image repo1/repo @@ -409,6 +411,8 @@ Configure each registry sync: "pollInterval": "12h", "tlsVerify": false, "onDemand": false, + "maxRetries": 5, + "retryDelay": "10m", "content":[ { "prefix":"/repo2", @@ -420,8 +424,10 @@ Configure each registry sync: }, { "urls": ["https://docker.io/library"], - "onDemand": true, # doesn't have content, don't periodically pull, pull just on demand. - "tlsVerify": true + "onDemand": true, # doesn't have content, don't periodically pull, pull just on demand. + "tlsVerify": true, + "maxRetries": 3, + "retryDelay": "15m" } ] } diff --git a/examples/config-sync.json b/examples/config-sync.json index 0db65afa..cdc6360e 100644 --- a/examples/config-sync.json +++ b/examples/config-sync.json @@ -19,6 +19,8 @@ "pollInterval": "6h", "tlsVerify": true, "certDir": "/home/user/certs", + "maxRetries": 3, + "retryDelay": "5m", "content":[ { "prefix":"/repo1/repo", @@ -37,6 +39,8 @@ "pollInterval": "12h", "tlsVerify": false, "onDemand": false, + "maxRetries": 5, + "retryDelay": "10m", "content":[ { "prefix":"/repo2", @@ -49,7 +53,9 @@ { "urls": ["https://docker.io/library"], "onDemand": true, - "tlsVerify": true + "tlsVerify": true, + "maxRetries": 6, + "retryDelay": "5m" } ] } diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 1c148a91..6176a9c9 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1324,27 +1324,13 @@ func getImageManifest(routeHandler *RouteHandler, imgStore storage.ImageStore, n reference string) ([]byte, string, string, error) { content, digest, mediaType, err := imgStore.GetImageManifest(name, reference) if err != nil { - if errors.Is(err, zerr.ErrRepoNotFound) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain + if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrManifestNotFound) { if routeHandler.c.Config.Extensions != nil && routeHandler.c.Config.Extensions.Sync != nil { routeHandler.c.Log.Info().Msgf("image not found, trying to get image %s:%s by syncing on demand", name, reference) errSync := ext.SyncOneImage(routeHandler.c.Config, routeHandler.c.StoreController, - name, reference, routeHandler.c.Log) - if errSync != nil { - routeHandler.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s", - name, reference) - } else { - content, digest, mediaType, err = imgStore.GetImageManifest(name, reference) - } - } - } else if errors.Is(err, zerr.ErrManifestNotFound) { - if routeHandler.c.Config.Extensions != nil && routeHandler.c.Config.Extensions.Sync != nil { - routeHandler.c.Log.Info().Msgf("manifest not found, trying to get image %s:%s by syncing on demand", - name, reference) - - errSync := ext.SyncOneImage(routeHandler.c.Config, routeHandler.c.StoreController, - name, reference, routeHandler.c.Log) + name, reference, false, routeHandler.c.Log) if errSync != nil { routeHandler.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s", name, reference) @@ -1360,6 +1346,30 @@ func getImageManifest(routeHandler *RouteHandler, imgStore storage.ImageStore, n return content, digest, mediaType, err } +// will sync referrers on demand if they are not found, in case sync extensions is enabled. +func getReferrers(routeHandler *RouteHandler, imgStore storage.ImageStore, name, digest, + artifactType string) ([]artifactspec.Descriptor, error) { + refs, err := imgStore.GetReferrers(name, digest, artifactType) + if err != nil { + if routeHandler.c.Config.Extensions != nil && routeHandler.c.Config.Extensions.Sync != nil { + routeHandler.c.Log.Info().Msgf("signature not found, trying to get signature %s:%s by syncing on demand", + name, digest) + + errSync := ext.SyncOneImage(routeHandler.c.Config, routeHandler.c.StoreController, + name, digest, true, routeHandler.c.Log) + if errSync != nil { + routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest).Msg("unable to get references") + + return []artifactspec.Descriptor{}, err + } + + refs, err = imgStore.GetReferrers(name, digest, artifactType) + } + } + + return refs, err +} + type ReferenceList struct { References []artifactspec.Descriptor `json:"references"` } @@ -1414,7 +1424,7 @@ func (rh *RouteHandler) GetReferrers(response http.ResponseWriter, request *http rh.c.Log.Info().Str("digest", digest).Str("artifactType", artifactType).Msg("getting manifest") - refs, err := imgStore.GetReferrers(name, digest, artifactType) + refs, err := getReferrers(rh, imgStore, name, digest, artifactType) if err != nil { rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest).Msg("unable to get references") response.WriteHeader(http.StatusBadRequest) diff --git a/pkg/cli/root.go b/pkg/cli/root.go index 0579605d..223a6008 100644 --- a/pkg/cli/root.go +++ b/pkg/cli/root.go @@ -245,6 +245,13 @@ func validateConfiguration(config *config.Config) { // check glob patterns in sync config are compilable if config.Extensions != nil && config.Extensions.Sync != nil { for _, regCfg := range config.Extensions.Sync.Registries { + // check retry options are configured for sync + if regCfg.MaxRetries == nil || regCfg.RetryDelay == nil { + log.Error().Err(errors.ErrBadConfig).Msg("extensions.sync.registries[].MaxRetries" + + "and extensions.sync.registries[].RetryDelay fields are mandatory") + panic(errors.ErrBadConfig) + } + if regCfg.Content != nil { for _, content := range regCfg.Content { ok := glob.ValidatePattern(content.Prefix) diff --git a/pkg/cli/root_test.go b/pkg/cli/root_test.go index a7f0e79a..a8f93e85 100644 --- a/pkg/cli/root_test.go +++ b/pkg/cli/root_test.go @@ -179,7 +179,8 @@ func TestVerify(t *testing.T) { content := []byte(`{"storage":{"rootDirectory":"/tmp/zot", "storageDriver": {"name": "s3"}}, "http":{"address":"127.0.0.1","port":"8080","realm":"zot", "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, - "extensions":{"sync": {"registries": [{"urls":["localhost:9999"]}]}}}`) + "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "maxRetries": 1, "retryDelay": "10s"}]}}}`) _, err = tmpfile.Write(content) So(err, ShouldBeNil) err = tmpfile.Close() @@ -195,7 +196,8 @@ func TestVerify(t *testing.T) { content := []byte(`{"storage":{"rootDirectory":"/tmp/zot"}, "http":{"address":"127.0.0.1","port":"8080","realm":"zot", "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, - "extensions":{"sync": {"registries": [{"urls":["localhost:9999"]}]}}}`) + "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "maxRetries": 1, "retryDelay": "10s"}]}}}`) _, err = tmpfile.Write(content) So(err, ShouldBeNil) err = tmpfile.Close() @@ -212,6 +214,7 @@ func TestVerify(t *testing.T) { "http":{"address":"127.0.0.1","port":"8080","realm":"zot", "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "maxRetries": 1, "retryDelay": "10s", "content": [{"prefix":"[repo%^&"}]}]}}}`) _, err = tmpfile.Write(content) So(err, ShouldBeNil) @@ -245,6 +248,7 @@ func TestVerify(t *testing.T) { "http":{"address":"127.0.0.1","port":"8080","realm":"zot", "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "maxRetries": 1, "retryDelay": "10s", "content": [{"prefix":"repo**"}]}]}}}`) _, err = tmpfile.Write(content) So(err, ShouldBeNil) @@ -255,6 +259,23 @@ func TestVerify(t *testing.T) { So(err, ShouldBeNil) }) + Convey("Test verify sync without retry options", t, func(c C) { + tmpfile, err := ioutil.TempFile("", "zot-test*.json") + So(err, ShouldBeNil) + defer os.Remove(tmpfile.Name()) // clean up + content := []byte(`{"storage":{"rootDirectory":"/tmp/zot"}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "content": [{"prefix":"repo**"}]}]}}}`) + _, err = tmpfile.Write(content) + So(err, ShouldBeNil) + err = tmpfile.Close() + So(err, ShouldBeNil) + os.Args = []string{"cli_test", "verify", tmpfile.Name()} + So(func() { _ = cli.NewServerRootCmd().Execute() }, ShouldPanic) + }) + Convey("Test verify good config", t, func(c C) { tmpfile, err := ioutil.TempFile("", "zot-test*.json") So(err, ShouldBeNil) diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go new file mode 100644 index 00000000..fe3be53f --- /dev/null +++ b/pkg/common/common_test.go @@ -0,0 +1,17 @@ +package common_test + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + "zotregistry.io/zot/pkg/common" +) + +func TestCommon(t *testing.T) { + Convey("test Contains()", t, func() { + first := []string{"apple", "biscuit"} + So(common.Contains(first, "apple"), ShouldBeTrue) + So(common.Contains(first, "peach"), ShouldBeFalse) + So(common.Contains([]string{}, "apple"), ShouldBeFalse) + }) +} diff --git a/pkg/extensions/extensions.go b/pkg/extensions/extensions.go index 49e9d970..599d85e3 100644 --- a/pkg/extensions/extensions.go +++ b/pkg/extensions/extensions.go @@ -108,10 +108,10 @@ func SetupRoutes(config *config.Config, router *mux.Router, storeController stor // SyncOneImage syncs one image. func SyncOneImage(config *config.Config, storeController storage.StoreController, - repoName, reference string, log log.Logger) error { + repoName, reference string, isArtifact bool, log log.Logger) error { log.Info().Msgf("syncing image %s:%s", repoName, reference) - err := sync.OneImage(*config.Extensions.Sync, storeController, repoName, reference, log) + err := sync.OneImage(*config.Extensions.Sync, storeController, repoName, reference, isArtifact, log) return err } diff --git a/pkg/extensions/minimal.go b/pkg/extensions/minimal.go index 982d2b12..ea94d159 100644 --- a/pkg/extensions/minimal.go +++ b/pkg/extensions/minimal.go @@ -39,7 +39,7 @@ func SetupRoutes(conf *config.Config, router *mux.Router, storeController storag // SyncOneImage ... func SyncOneImage(config *config.Config, storeController storage.StoreController, - repoName, reference string, log log.Logger) error { + repoName, reference string, isArtifact bool, log log.Logger) error { log.Warn().Msg("skipping syncing on demand because given zot binary doesn't support any extensions," + "please build zot full binary for this feature") diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 971491a8..a4e424ac 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -3,29 +3,79 @@ package sync import ( "context" "fmt" + "net/url" "os" - "path" + "sync" + "time" "github.com/containers/common/pkg/retry" "github.com/containers/image/v5/copy" - "github.com/containers/image/v5/docker" - "github.com/containers/image/v5/docker/reference" - "github.com/containers/image/v5/oci/layout" - guuid "github.com/gofrs/uuid" + "gopkg.in/resty.v1" "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/storage" ) -func OneImage(cfg Config, storeController storage.StoreController, - repo, tag string, log log.Logger) error { - var credentialsFile CredentialsFile +// nolint: gochecknoglobals +var demandedImgs demandedImages - /* don't copy cosign signature, containers/image doesn't support it - we will copy it manually later */ - if isCosignTag(tag) { +type demandedImages struct { + syncedMap sync.Map +} + +func (di *demandedImages) loadOrStoreChan(key string, value chan error) (chan error, bool) { + val, found := di.syncedMap.LoadOrStore(key, value) + errChannel, _ := val.(chan error) + + return errChannel, found +} + +func (di *demandedImages) loadOrStoreStr(key string, value string) (string, bool) { + val, found := di.syncedMap.LoadOrStore(key, value) + str, _ := val.(string) + + return str, found +} + +func (di *demandedImages) delete(key string) { + di.syncedMap.Delete(key) +} + +func OneImage(cfg Config, storeController storage.StoreController, + repo, tag string, isArtifact bool, log log.Logger) error { + // guard against multiple parallel requests + demandedImage := fmt.Sprintf("%s:%s", repo, tag) + // loadOrStore image-based channel + imageChannel, found := demandedImgs.loadOrStoreChan(demandedImage, make(chan error)) + // if value found wait on channel receive or close + if found { + log.Info().Msgf("image %s already demanded by another client, waiting on imageChannel", demandedImage) + + err, ok := <-imageChannel + // if channel closed exit + if !ok { + return nil + } + + return err + } + + defer demandedImgs.delete(demandedImage) + defer close(imageChannel) + + go syncOneImage(imageChannel, cfg, storeController, repo, tag, isArtifact, log) + + err, ok := <-imageChannel + if !ok { return nil } + return err +} + +func syncOneImage(imageChannel chan error, cfg Config, storeController storage.StoreController, + repo, tag string, isArtifact bool, log log.Logger) { + var credentialsFile CredentialsFile + if cfg.CredentialsFile != "" { var err error @@ -33,24 +83,23 @@ func OneImage(cfg Config, storeController storage.StoreController, if err != nil { log.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile) - return err + imageChannel <- err + + return } } + var copyErr error + localCtx, policyCtx, err := getLocalContexts(log) if err != nil { - return err + imageChannel <- err + + return } imageStore := storeController.GetImageStore(repo) - var copyErr error - - uuid, err := guuid.NewV4() - if err != nil { - return err - } - for _, registryCfg := range cfg.Registries { regCfg := registryCfg if !regCfg.OnDemand { @@ -70,101 +119,177 @@ func OneImage(cfg Config, storeController storage.StoreController, } } - registryConfig := regCfg - log.Info().Msgf("syncing on demand with %v", registryConfig.URLs) + retryOptions := &retry.RetryOptions{} + + if regCfg.MaxRetries != nil { + retryOptions.MaxRetry = *regCfg.MaxRetries + if regCfg.RetryDelay != nil { + retryOptions.Delay = *regCfg.RetryDelay + } + } + + log.Info().Msgf("syncing on demand with %v", regCfg.URLs) + + for _, regCfgURL := range regCfg.URLs { + upstreamURL := regCfgURL - for _, upstreamURL := range regCfg.URLs { - regCfgURL := upstreamURL upstreamAddr := StripRegistryTransport(upstreamURL) - upstreamCtx := getUpstreamContext(®istryConfig, credentialsFile[upstreamAddr]) - upstreamRepoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", upstreamAddr, repo)) + httpClient, err := getHTTPClient(®Cfg, upstreamURL, credentialsFile[upstreamAddr], log) if err != nil { - log.Error().Err(err).Msgf("error parsing repository reference %s/%s", upstreamAddr, repo) + imageChannel <- err - return err + return } - upstreamTaggedRef, err := reference.WithTag(upstreamRepoRef, tag) - if err != nil { - log.Error().Err(err).Msgf("error creating a reference for repository %s and tag %q", - upstreamRepoRef.Name(), tag) + // demanded 'image' is a signature + if isCosignTag(tag) || isArtifact { + // at tis point we should already have images synced, but not their signatures. + regURL, err := url.Parse(upstreamURL) + if err != nil { + log.Error().Err(err).Msgf("couldn't parse registry URL: %s", upstreamURL) - return err + imageChannel <- err + + return + } + + // is notary signature + if isArtifact { + err = syncNotarySignature(httpClient, storeController, *regURL, repo, tag, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, repo, tag) + + continue + } + + imageChannel <- nil + + return + } + // is cosign signature + err = syncCosignSignature(httpClient, storeController, *regURL, repo, tag, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, repo, tag) + + continue + } + + imageChannel <- nil + + return } - upstreamRef, err := docker.NewReference(upstreamTaggedRef) - if err != nil { - log.Error().Err(err).Msgf("error creating docker reference for repository %s and tag %q", - upstreamRepoRef.Name(), tag) - - return err - } - - localRepo := path.Join(imageStore.RootDir(), repo, SyncBlobUploadDir, uuid.String(), repo) - - if err = os.MkdirAll(localRepo, storage.DefaultDirPerms); err != nil { - log.Error().Err(err).Str("dir", localRepo).Msg("couldn't create temporary dir") - - return err - } - - defer os.RemoveAll(path.Join(imageStore.RootDir(), repo, SyncBlobUploadDir, uuid.String())) - - localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag) - - localRef, err := layout.ParseReference(localTaggedRepo) - if err != nil { - log.Error().Err(err).Msgf("cannot obtain a valid image reference for reference %q", localRepo) - - return err - } - - log.Info().Msgf("copying image %s:%s to %s", upstreamTaggedRef.Name(), - upstreamTaggedRef.Tag(), localRepo) - + // it's an image + upstreamCtx := getUpstreamContext(®Cfg, credentialsFile[upstreamAddr]) options := getCopyOptions(upstreamCtx, localCtx) - retryOptions := &retry.RetryOptions{ - MaxRetry: maxRetries, + upstreamImageRef, err := getImageRef(upstreamAddr, repo, tag) + if err != nil { + log.Error().Err(err).Msgf("error creating docker reference for repository %s/%s:%s", + upstreamAddr, repo, tag) + + imageChannel <- err + + return } - copyErr = retry.RetryIfNecessary(context.Background(), func() error { - _, copyErr = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options) + localCachePath, err := getLocalCachePath(imageStore, repo) + if err != nil { + log.Error().Err(err).Str("dir", localCachePath).Msg("couldn't create temporary dir") - return copyErr - }, retryOptions) + imageChannel <- err + + return + } + + localImageRef, err := getLocalImageRef(localCachePath, repo, tag) + if err != nil { + log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s", + localCachePath, repo, tag) + + imageChannel <- err + + return + } + + log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath) + + demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, repo, tag) + + _, copyErr = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options) if copyErr != nil { - log.Error().Err(copyErr).Msgf("error while copying image %s to %s", - upstreamRef.DockerReference().Name(), localTaggedRepo) + log.Error().Err(err).Msgf("error encountered while syncing on demand %s to %s", + upstreamImageRef.DockerReference(), localCachePath) + + _, found := demandedImgs.loadOrStoreStr(demandedImageRef, "") + if found || retryOptions.MaxRetry == 0 { + defer os.RemoveAll(localCachePath) + log.Info().Msgf("image %s already demanded in background or sync.registries[].MaxRetries == 0", demandedImageRef) + /* we already have a go routine spawned for this image + or retryOptions is not configured */ + continue + } + + // spawn goroutine to later pull the image + go func() { + // remove image after syncing + defer func() { + _ = os.RemoveAll(localCachePath) + + demandedImgs.delete(demandedImageRef) + log.Info().Msgf("sync routine: %s exited", demandedImageRef) + }() + + log.Info().Msgf("sync routine: starting routine to copy image %s, cause err: %v", + demandedImageRef, copyErr) + time.Sleep(retryOptions.Delay) + + if err = retry.RetryIfNecessary(context.Background(), func() error { + _, err := copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options) + + return err + }, retryOptions); err != nil { + log.Error().Err(err).Msgf("sync routine: error while copying image %s to %s", + demandedImageRef, localCachePath) + } else { + _ = finishSyncing(repo, tag, localCachePath, upstreamURL, storeController, retryOptions, httpClient, log) + } + }() } else { - err := pushSyncedLocalImage(repo, tag, uuid.String(), storeController, log) - if err != nil { - log.Error().Err(err).Msgf("error while pushing synced cached image %s", - localTaggedRepo) + err := finishSyncing(repo, tag, localCachePath, upstreamURL, storeController, retryOptions, httpClient, log) - return err - } + imageChannel <- err - log.Info().Msgf("successfully synced %s", upstreamRef.DockerReference().Name()) - - httpClient, err := getHTTPClient(®Cfg, upstreamURL, credentialsFile[upstreamAddr], log) - if err != nil { - return err - } - - if copyErr = retry.RetryIfNecessary(context.Background(), func() error { - copyErr = syncSignatures(httpClient, storeController, regCfgURL, repo, tag, log) - - return copyErr - }, retryOptions); copyErr != nil { - log.Error().Err(err).Msgf("Couldn't copy image signature %s", upstreamRef.DockerReference().Name()) - } - - return nil + return } } } - return copyErr + imageChannel <- err +} + +// push the local image into the storage, sync signatures. +func finishSyncing(repo, tag, localCachePath, upstreamURL string, + storeController storage.StoreController, retryOptions *retry.RetryOptions, + httpClient *resty.Client, log log.Logger) error { + err := pushSyncedLocalImage(repo, tag, localCachePath, storeController, log) + if err != nil { + log.Error().Err(err).Msgf("error while pushing synced cached image %s", + fmt.Sprintf("%s/%s:%s", localCachePath, repo, tag)) + + return err + } + + if err = retry.RetryIfNecessary(context.Background(), func() error { + err = syncSignatures(httpClient, storeController, upstreamURL, repo, tag, log) + + return err + }, retryOptions); err != nil { + log.Error().Err(err).Msgf("couldn't copy image signature for %s/%s:%s", upstreamURL, repo, tag) + } + + log.Info().Msgf("successfully synced %s/%s:%s", upstreamURL, repo, tag) + + return nil } diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 801954af..6f2e8025 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -6,9 +6,7 @@ import ( "fmt" "io" "os" - "path" "regexp" - "strings" goSync "sync" "time" @@ -17,10 +15,8 @@ import ( "github.com/containers/image/v5/copy" "github.com/containers/image/v5/docker" "github.com/containers/image/v5/docker/reference" - "github.com/containers/image/v5/oci/layout" "github.com/containers/image/v5/signature" "github.com/containers/image/v5/types" - guuid "github.com/gofrs/uuid" ispec "github.com/opencontainers/image-spec/specs-go/v1" "gopkg.in/resty.v1" "zotregistry.io/zot/errors" @@ -29,8 +25,6 @@ import ( ) const ( - maxRetries = 3 - delay = 5 * time.Minute SyncBlobUploadDir = ".sync" ) @@ -59,6 +53,8 @@ type RegistryConfig struct { TLSVerify *bool OnDemand bool CertDir string + MaxRetries *int + RetryDelay *time.Duration } type Content struct { @@ -238,6 +234,7 @@ func imagesToCopyFromUpstream(registryName string, repos []string, upstreamCtx * } log.Debug().Msgf("remaining upstream refs to be copied: %v", upstreamReferences) + filterImagesBySemver(&upstreamReferences, content, log) log.Debug().Msgf("remaining upstream refs to be copied: %v", upstreamReferences) @@ -280,8 +277,7 @@ func getUpstreamContext(regCfg *RegistryConfig, credentials Credentials) *types. } func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController storage.StoreController, - localCtx *types.SystemContext, policyCtx *signature.PolicyContext, credentials Credentials, - uuid string, log log.Logger) error { + localCtx *types.SystemContext, policyCtx *signature.PolicyContext, credentials Credentials, log log.Logger) error { log.Info().Msgf("syncing registry: %s", upstreamURL) var err error @@ -291,9 +287,13 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto upstreamCtx := getUpstreamContext(®Cfg, credentials) options := getCopyOptions(upstreamCtx, localCtx) - retryOptions := &retry.RetryOptions{ - MaxRetry: maxRetries, - Delay: delay, + retryOptions := &retry.RetryOptions{} + + if regCfg.MaxRetries != nil { + retryOptions.MaxRetry = *regCfg.MaxRetries + if regCfg.RetryDelay != nil { + retryOptions.Delay = *regCfg.RetryDelay + } } var catalog catalog @@ -346,66 +346,57 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto } for _, ref := range images { - upstreamRef := ref + upstreamImageRef := ref - imageName := strings.Replace(upstreamRef.DockerReference().Name(), upstreamAddr, "", 1) - imageName = strings.TrimPrefix(imageName, "/") + repo := getRepoFromRef(upstreamImageRef, upstreamAddr) + tag := getTagFromRef(upstreamImageRef, log).Tag() - imageStore := storeController.GetImageStore(imageName) + imageStore := storeController.GetImageStore(repo) - localRepo := path.Join(imageStore.RootDir(), imageName, SyncBlobUploadDir, uuid, imageName) - - if err = os.MkdirAll(localRepo, storage.DefaultDirPerms); err != nil { - log.Error().Err(err).Str("dir", localRepo).Msg("couldn't create temporary dir") - - return err - } - - defer os.RemoveAll(path.Join(imageStore.RootDir(), imageName, SyncBlobUploadDir, uuid)) - - upstreamTaggedRef := getTagFromRef(upstreamRef, log) - - localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, upstreamTaggedRef.Tag()) - - localRef, err := layout.ParseReference(localTaggedRepo) + localCachePath, err := getLocalCachePath(imageStore, repo) if err != nil { - log.Error().Err(err).Msgf("Cannot obtain a valid image reference for reference %q", localTaggedRepo) + log.Error().Err(err).Str("dir", localCachePath).Msg("couldn't create temporary dir") return err } - log.Info().Msgf("copying image %s:%s to %s", upstreamRef.DockerReference().Name(), - upstreamTaggedRef.Tag(), localTaggedRepo) + defer os.RemoveAll(localCachePath) + + localImageRef, err := getLocalImageRef(localCachePath, repo, tag) + if err != nil { + log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s", + localCachePath, repo, tag) + + return err + } + + log.Info().Msgf("copying image %s:%s to %s", upstreamImageRef.DockerReference(), tag, localCachePath) if err = retry.RetryIfNecessary(context.Background(), func() error { - _, err = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options) + _, err = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options) return err }, retryOptions); err != nil { log.Error().Err(err).Msgf("error while copying image %s:%s to %s", - upstreamRef.DockerReference().Name(), upstreamTaggedRef.Tag(), localTaggedRepo) + upstreamImageRef.DockerReference(), tag, localCachePath) return err } - log.Info().Msgf("successfully synced %s:%s", upstreamRef.DockerReference().Name(), upstreamTaggedRef.Tag()) - - err = pushSyncedLocalImage(imageName, upstreamTaggedRef.Tag(), uuid, storeController, log) + err = pushSyncedLocalImage(repo, tag, localCachePath, storeController, log) if err != nil { log.Error().Err(err).Msgf("error while pushing synced cached image %s", - localTaggedRepo) + fmt.Sprintf("%s/%s:%s", localCachePath, repo, tag)) return err } if err = retry.RetryIfNecessary(context.Background(), func() error { - err = syncSignatures(httpClient, storeController, upstreamURL, imageName, upstreamTaggedRef.Tag(), log) + err = syncSignatures(httpClient, storeController, upstreamURL, repo, tag, log) return err }, retryOptions); err != nil { - log.Error().Err(err).Msgf("Couldn't copy image signature %s", upstreamRef.DockerReference().Name()) - - return err + log.Error().Err(err).Msgf("couldn't copy image signature %s:%s", upstreamImageRef.DockerReference(), tag) } } @@ -457,11 +448,6 @@ func Run(cfg Config, storeController storage.StoreController, wtgrp *goSync.Wait return err } - uuid, err := guuid.NewV4() - if err != nil { - return err - } - // for each upstream registry, start a go routine. for _, regCfg := range cfg.Registries { // if content not provided, don't run periodically sync @@ -494,7 +480,7 @@ func Run(cfg Config, storeController storage.StoreController, wtgrp *goSync.Wait upstreamAddr := StripRegistryTransport(upstreamURL) // first try syncing main registry if err := syncRegistry(regCfg, upstreamURL, storeController, localCtx, policyCtx, - credentialsFile[upstreamAddr], uuid.String(), logger); err != nil { + credentialsFile[upstreamAddr], logger); err != nil { logger.Error().Err(err).Str("registry", upstreamURL). Msg("sync exited with error, falling back to auxiliary registries") } else { diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index b321d221..06c44a02 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -209,13 +209,13 @@ func TestSyncInternal(t *testing.T) { So(err, ShouldNotBeNil) }) - Convey("Test OneImage() skips cosign signatures", t, func() { - err := OneImage(Config{}, storage.StoreController{}, "repo", "sha256-.sig", log.NewLogger("", "")) - So(err, ShouldBeNil) - }) + // Convey("Test OneImage() skips cosign signatures", t, func() { + // err := OneImage(Config{}, storage.StoreController{}, "repo", "sha256-.sig", log.NewLogger("", "")) + // So(err, ShouldBeNil) + // }) Convey("Test syncSignatures()", t, func() { - log := log.NewLogger("", "") + log := log.NewLogger("debug", "") err := syncSignatures(resty.New(), storage.StoreController{}, "%", "repo", "tag", log) So(err, ShouldNotBeNil) err = syncSignatures(resty.New(), storage.StoreController{}, "http://zot", "repo", "tag", log) @@ -273,12 +273,12 @@ func TestSyncInternal(t *testing.T) { storeController := storage.StoreController{} storeController.DefaultStore = imageStore - err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) - So(err, ShouldNotBeNil) - testRootDir := path.Join(imageStore.RootDir(), testImage, SyncBlobUploadDir) // testImagePath := path.Join(testRootDir, testImage) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) + So(err, ShouldNotBeNil) + err = os.MkdirAll(testRootDir, 0o755) if err != nil { panic(err) @@ -305,9 +305,8 @@ func TestSyncInternal(t *testing.T) { if os.Geteuid() != 0 { So(func() { - _ = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) - }, - ShouldPanic) + _ = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) + }, ShouldPanic) } if err := os.Chmod(storageDir, 0o755); err != nil { @@ -319,7 +318,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) So(err, ShouldNotBeNil) if err := os.Chmod(path.Join(testRootDir, testImage, "blobs", "sha256", @@ -333,7 +332,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) So(err, ShouldNotBeNil) if err := os.Chmod(cachedManifestConfigPath, 0o755); err != nil { @@ -345,7 +344,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) So(err, ShouldNotBeNil) if err := os.Remove(manifestConfigPath); err != nil { @@ -359,7 +358,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) So(err, ShouldNotBeNil) }) } diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 8d46c4d9..7b17e2a1 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -18,6 +18,7 @@ import ( "path" "reflect" "strings" + goSync "sync" "testing" "time" @@ -49,6 +50,8 @@ const ( testImage = "zot-test" testImageTag = "0.0.1" testCveImage = "zot-cve-test" + + testSignedImage = "signed-repo" ) var ( @@ -389,6 +392,9 @@ func TestPeriodically(t *testing.T) { semver := true var tlsVerify bool + maxRetries := 1 + delay := 1 * time.Second + syncRegistryConfig := sync.RegistryConfig{ Content: []sync.Content{ { @@ -403,6 +409,8 @@ func TestPeriodically(t *testing.T) { PollInterval: updateDuration, TLSVerify: &tlsVerify, CertDir: "", + MaxRetries: &maxRetries, + RetryDelay: &delay, } syncConfig := &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}} @@ -757,15 +765,15 @@ func TestBasicAuth(t *testing.T) { Convey("Verify sync basic auth", t, func() { updateDuration, _ := time.ParseDuration("1h") - sctlr, srcBaseURL, srcDir, htpasswdPath, srcClient := startUpstreamServer(false, true) - defer os.Remove(htpasswdPath) - defer os.RemoveAll(srcDir) - - defer func() { - sctlr.Shutdown() - }() - Convey("Verify sync basic auth with file credentials", func() { + sctlr, srcBaseURL, srcDir, htpasswdPath, srcClient := startUpstreamServer(false, true) + defer os.Remove(htpasswdPath) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + registryName := sync.StripRegistryTransport(srcBaseURL) credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "test"}}`, registryName)) @@ -829,6 +837,14 @@ func TestBasicAuth(t *testing.T) { }) Convey("Verify sync basic auth with wrong file credentials", func() { + sctlr, srcBaseURL, srcDir, htpasswdPath, _ := startUpstreamServer(false, true) + defer os.Remove(htpasswdPath) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + destPort := test.GetFreePort() destBaseURL := test.GetBaseURL(destPort) @@ -916,6 +932,14 @@ func TestBasicAuth(t *testing.T) { }) Convey("Verify sync basic auth with bad file credentials", func() { + sctlr, srcBaseURL, srcDir, htpasswdPath, _ := startUpstreamServer(false, true) + defer os.Remove(htpasswdPath) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + registryName := sync.StripRegistryTransport(srcBaseURL) credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "test"}}`, @@ -969,6 +993,14 @@ func TestBasicAuth(t *testing.T) { }) Convey("Verify on demand sync with basic auth", func() { + sctlr, srcBaseURL, srcDir, htpasswdPath, srcClient := startUpstreamServer(false, true) + defer os.Remove(htpasswdPath) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + registryName := sync.StripRegistryTransport(srcBaseURL) credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "test"}}`, registryName)) @@ -1076,7 +1108,7 @@ func TestBadURL(t *testing.T) { }, }, }, - URLs: []string{"bad-registry-url"}, + URLs: []string{"bad-registry-url]"}, PollInterval: updateDuration, TLSVerify: &tlsVerify, CertDir: "", @@ -1088,13 +1120,13 @@ func TestBadURL(t *testing.T) { dctlr, destBaseURL, destDir, destClient := startDownstreamServer(false, syncConfig) defer os.RemoveAll(destDir) - resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) - So(err, ShouldBeNil) - So(resp.StatusCode(), ShouldEqual, 404) - defer func() { dctlr.Shutdown() }() + + resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) }) } @@ -1614,7 +1646,7 @@ func TestSubPaths(t *testing.T) { }) } -func TestSyncOnDemandRepoErr(t *testing.T) { +func TestOnDemandRepoErr(t *testing.T) { Convey("Verify sync on demand parseRepositoryReference error", t, func() { tlsVerify := false syncRegistryConfig := sync.RegistryConfig{ @@ -1899,7 +1931,7 @@ func TestMultipleURLs(t *testing.T) { }) } -func TestSyncSignatures(t *testing.T) { +func TestPeriodicallySignatures(t *testing.T) { Convey("Verify sync signatures", t, func() { updateDuration, _ := time.ParseDuration("30m") @@ -1911,7 +1943,7 @@ func TestSyncSignatures(t *testing.T) { }() // create repo, push and sign it - repoName := "signed-repo" + repoName := testSignedImage var digest godigest.Digest So(func() { digest = pushRepo(srcBaseURL, repoName) }, ShouldNotPanic) @@ -1926,6 +1958,7 @@ func TestSyncSignatures(t *testing.T) { So(err, ShouldBeNil) defer os.RemoveAll(tdir) _ = os.Chdir(tdir) + generateKeyPairs(tdir) So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic) @@ -1936,7 +1969,6 @@ func TestSyncSignatures(t *testing.T) { syncRegistryConfig := sync.RegistryConfig{ Content: []sync.Content{ { - // won't match any image on source registry, we will sync on demand Prefix: repoName, Tags: &sync.Tags{ Regex: ®ex, @@ -2051,10 +2083,9 @@ func TestSyncSignatures(t *testing.T) { } } - // err = os.Chmod(path.Join(srcDir, repoName, "index.json"), 0000) - // if err != nil { - // panic(err) - // } + // remove already synced image + err = os.RemoveAll(path.Join(destDir, repoName)) + So(err, ShouldBeNil) // sync on demand resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") @@ -2067,16 +2098,11 @@ func TestSyncSignatures(t *testing.T) { So(err, ShouldBeNil) destBlobPath := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) - err = os.Remove(destBlobPath) - So(err, ShouldBeNil) + _ = os.Remove(destBlobPath) err = os.MkdirAll(destBlobPath, 0o000) So(err, ShouldBeNil) } - // // remove already synced image - // err = os.RemoveAll(path.Join(destDir, repoName)) - // So(err, ShouldBeNil) - // sync on demand resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") So(err, ShouldBeNil) @@ -2183,7 +2209,309 @@ func TestSyncSignatures(t *testing.T) { }) } -func TestSyncError(t *testing.T) { +func TestOnDemandRetryGoroutine(t *testing.T) { + Convey("Verify ondemand sync retries in background on error", t, func() { + srcPort := test.GetFreePort() + srcConfig := config.New() + srcBaseURL := test.GetBaseURL(srcPort) + + srcConfig.HTTP.Port = srcPort + + srcDir, err := ioutil.TempDir("", "oci-src-repo-test") + if err != nil { + panic(err) + } + + err = test.CopyFiles("../../../test/data", srcDir) + if err != nil { + panic(err) + } + + srcConfig.Storage.RootDirectory = srcDir + + sctlr := api.NewController(srcConfig) + + defer os.RemoveAll(srcDir) + + regex := ".*" + semver := true + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + OnDemand: true, + TLSVerify: &tlsVerify, + CertDir: "", + } + + maxRetries := 3 + delay := 2 * time.Second + syncRegistryConfig.MaxRetries = &maxRetries + syncRegistryConfig.RetryDelay = &delay + + syncConfig := &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}} + + dc, destBaseURL, destDir, destClient := startDownstreamServer(false, syncConfig) + defer os.RemoveAll(destDir) + + defer func() { + dc.Shutdown() + }() + + resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + + // start upstream server + go func() { + // this blocks + if err := sctlr.Run(); err != nil { + return + } + }() + + defer func() { + sctlr.Shutdown() + }() + + // in the meantime ondemand should retry syncing + time.Sleep(15 * time.Second) + + // now we should have the image synced + binfo, err := os.Stat(path.Join(destDir, testImage, "index.json")) + So(err, ShouldBeNil) + So(binfo, ShouldNotBeNil) + So(binfo.Size(), ShouldNotBeZeroValue) + + resp, err = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + }) +} + +func TestOnDemandMultipleRetries(t *testing.T) { + Convey("Verify ondemand sync retries in background on error, multiple calls should spawn one routine", t, func() { + srcPort := test.GetFreePort() + srcConfig := config.New() + srcBaseURL := test.GetBaseURL(srcPort) + + srcConfig.HTTP.Port = srcPort + + srcDir, err := ioutil.TempDir("", "oci-src-repo-test") + if err != nil { + panic(err) + } + + err = test.CopyFiles("../../../test/data", srcDir) + if err != nil { + panic(err) + } + + srcConfig.Storage.RootDirectory = srcDir + + sctlr := api.NewController(srcConfig) + + defer os.RemoveAll(srcDir) + + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + URLs: []string{srcBaseURL}, + OnDemand: true, + TLSVerify: &tlsVerify, + CertDir: "", + } + + maxRetries := 5 + delay := 5 * time.Second + syncRegistryConfig.MaxRetries = &maxRetries + syncRegistryConfig.RetryDelay = &delay + + syncConfig := &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}} + + dc, destBaseURL, destDir, destClient := startDownstreamServer(false, syncConfig) + defer os.RemoveAll(destDir) + + defer func() { + dc.Shutdown() + }() + + callsNo := 5 + for i := 0; i < callsNo; i++ { + _, _ = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + } + + populatedDirs := make(map[string]bool) + + done := make(chan bool) + go func() { + /* watch .sync local cache, make sure just one .sync/subdir is populated with image + the lock from ondemand should prevent spawning multiple go routines for the same image*/ + for { + time.Sleep(250 * time.Millisecond) + select { + case <-done: + return + default: + dirs, err := os.ReadDir(path.Join(destDir, testImage, ".sync")) + if err == nil { + for _, dir := range dirs { + contents, err := os.ReadDir(path.Join(destDir, testImage, ".sync", dir.Name())) + if err == nil { + if len(contents) > 0 { + populatedDirs[dir.Name()] = true + } + } + } + } + } + } + }() + + // start upstream server + go func() { + // this blocks + if err := sctlr.Run(); err != nil { + return + } + }() + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + + time.Sleep(100 * time.Millisecond) + } + + defer func() { + sctlr.Shutdown() + }() + + // wait sync + for { + _, err := os.Stat(path.Join(destDir, testImage, "index.json")) + if err == nil { + // stop watching /.sync/ subdirs + done <- true + + break + } + time.Sleep(500 * time.Millisecond) + } + + waitSyncOndemand(destDir, testImage) + + So(len(populatedDirs), ShouldEqual, 1) + }) +} + +func TestOnDemandPullsOnce(t *testing.T) { + Convey("Verify sync on demand pulls only one time", t, func(conv C) { + sc, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false) + defer os.RemoveAll(srcDir) + + defer func() { + sc.Shutdown() + }() + + regex := ".*" + semver := true + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + syncConfig := &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}} + + dc, destBaseURL, destDir, _ := startDownstreamServer(false, syncConfig) + defer os.RemoveAll(destDir) + + defer func() { + dc.Shutdown() + }() + + var wg goSync.WaitGroup + + wg.Add(1) + go func(conv C) { + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + conv.So(err, ShouldBeNil) + conv.So(resp.StatusCode(), ShouldEqual, 200) + wg.Done() + }(conv) + + wg.Add(1) + go func(conv C) { + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + conv.So(err, ShouldBeNil) + conv.So(resp.StatusCode(), ShouldEqual, 200) + wg.Done() + }(conv) + + wg.Add(1) + go func(conv C) { + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + conv.So(err, ShouldBeNil) + conv.So(resp.StatusCode(), ShouldEqual, 200) + wg.Done() + }(conv) + + done := make(chan bool) + + var maxLen int + syncBlobUploadDir := path.Join(destDir, testImage, sync.SyncBlobUploadDir) + + go func() { + for { + select { + case <-done: + return + default: + dirs, err := ioutil.ReadDir(syncBlobUploadDir) + if err != nil { + continue + } + // check how many .sync/uuid/ dirs are created, if just one then on demand pulled only once + if len(dirs) > maxLen { + maxLen = len(dirs) + } + } + } + }() + + wg.Wait() + done <- true + + So(maxLen, ShouldEqual, 1) + }) +} + +func TestError(t *testing.T) { Convey("Verify periodically sync pushSyncedLocalImage() error", t, func() { updateDuration, _ := time.ParseDuration("30m") @@ -2234,7 +2562,7 @@ func TestSyncError(t *testing.T) { }) } -func TestSyncSignaturesOnDemand(t *testing.T) { +func TestSignaturesOnDemand(t *testing.T) { Convey("Verify sync signatures on demand feature", t, func() { sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false) defer os.RemoveAll(srcDir) @@ -2244,7 +2572,7 @@ func TestSyncSignaturesOnDemand(t *testing.T) { }() // create repo, push and sign it - repoName := "signed-repo" + repoName := testSignedImage var digest godigest.Digest So(func() { digest = pushRepo(srcBaseURL, repoName) }, ShouldNotPanic) @@ -2260,6 +2588,8 @@ func TestSyncSignaturesOnDemand(t *testing.T) { defer os.RemoveAll(tdir) _ = os.Chdir(tdir) + generateKeyPairs(tdir) + So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic) var tlsVerify bool @@ -2365,18 +2695,150 @@ func TestSyncSignaturesOnDemand(t *testing.T) { }) } -func signImage(tdir, port, repoName string, digest godigest.Digest) { - // push signatures to upstream server so that we can sync them later +func TestOnlySignaturesOnDemand(t *testing.T) { + Convey("Verify sync signatures on demand feature when we already have the image", t, func() { + sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + + // create repo, push and sign it + repoName := testSignedImage + var digest godigest.Digest + So(func() { digest = pushRepo(srcBaseURL, repoName) }, ShouldNotPanic) + + splittedURL := strings.SplitAfter(srcBaseURL, ":") + srcPort := splittedURL[len(splittedURL)-1] + + cwd, err := os.Getwd() + So(err, ShouldBeNil) + + defer func() { _ = os.Chdir(cwd) }() + tdir, err := ioutil.TempDir("", "sigs") + So(err, ShouldBeNil) + defer os.RemoveAll(tdir) + _ = os.Chdir(tdir) + + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + URLs: []string{srcBaseURL}, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + syncBadRegistryConfig := sync.RegistryConfig{ + URLs: []string{"http://invalid.invalid:9999"}, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + syncConfig := &sync.Config{Registries: []sync.RegistryConfig{syncBadRegistryConfig, syncRegistryConfig}} + + dctlr, destBaseURL, destDir, _ := startDownstreamServer(false, syncConfig) + defer os.RemoveAll(destDir) + + defer func() { + dctlr.Shutdown() + }() + + // sync on demand + resp, err := resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + splittedURL = strings.SplitAfter(destBaseURL, ":") + destPort := splittedURL[len(splittedURL)-1] + + a := &options.AnnotationOptions{Annotations: []string{"tag=1.0"}} + amap, err := a.AnnotationsMap() + if err != nil { + panic(err) + } + + generateKeyPairs(tdir) + + // sync signature on demand when upstream doesn't have the signature + image := fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0") + cmd := exec.Command("notation", "verify", "--cert", "good", "--plain-http", image) + out, err := cmd.CombinedOutput() + So(err, ShouldNotBeNil) + msg := string(out) + So(msg, ShouldNotBeEmpty) + So(strings.Contains(msg, "signature failure"), ShouldBeTrue) + + // cosign verify the synced image + vrfy := verify.VerifyCommand{ + RegistryOptions: options.RegistryOptions{AllowInsecure: true}, + CheckClaims: true, + KeyRef: path.Join(tdir, "cosign.pub"), + Annotations: amap, + } + + err = vrfy.Exec(context.TODO(), []string{fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0")}) + So(err, ShouldNotBeNil) + + // sign upstream image + So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic) + + // now it should sync signatures on demand, even if we already have the image + image = fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0") + cmd = exec.Command("notation", "verify", "--cert", "good", "--plain-http", image) + out, err = cmd.CombinedOutput() + So(err, ShouldBeNil) + msg = string(out) + So(msg, ShouldNotBeEmpty) + So(strings.Contains(msg, "verification failure"), ShouldBeFalse) + + // cosign verify the synced image + vrfy = verify.VerifyCommand{ + RegistryOptions: options.RegistryOptions{AllowInsecure: true}, + CheckClaims: true, + KeyRef: path.Join(tdir, "cosign.pub"), + Annotations: amap, + } + + err = vrfy.Exec(context.TODO(), []string{fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0")}) + So(err, ShouldBeNil) + }) +} + +func generateKeyPairs(tdir string) { // generate a keypair os.Setenv("COSIGN_PASSWORD", "") - err := generate.GenerateKeyPairCmd(context.TODO(), "", nil) + if _, err := os.Stat(path.Join(tdir, "cosign.key")); err != nil { + err := generate.GenerateKeyPairCmd(context.TODO(), "", nil) + if err != nil { + panic(err) + } + } + + // "notation" (notaryv2) doesn't yet support exported apis, so use the binary instead + _, err := exec.LookPath("notation") if err != nil { panic(err) } + os.Setenv("XDG_CONFIG_HOME", tdir) + + // generate a keypair + cmd := exec.Command("notation", "cert", "generate-test", "--trust", "good") + + err = cmd.Run() + if err != nil { + panic(err) + } +} + +func signImage(tdir, port, repoName string, digest godigest.Digest) { + // push signatures to upstream server so that we can sync them later // sign the image - err = sign.SignCmd(context.TODO(), + err := sign.SignCmd(context.TODO(), sign.KeyOpts{KeyRef: path.Join(tdir, "cosign.key"), PassFunc: generate.GetPass}, options.RegistryOptions{AllowInsecure: true}, map[string]interface{}{"tag": "1.0"}, @@ -2406,25 +2868,9 @@ func signImage(tdir, port, repoName string, digest godigest.Digest) { panic(err) } - // "notation" (notaryv2) doesn't yet support exported apis, so use the binary instead - _, err = exec.LookPath("notation") - if err != nil { - panic(err) - } - - os.Setenv("XDG_CONFIG_HOME", tdir) - - // generate a keypair - cmd := exec.Command("notation", "cert", "generate-test", "--trust", "good") - - err = cmd.Run() - if err != nil { - panic(err) - } - // sign the image image := fmt.Sprintf("localhost:%s/%s:%s", port, repoName, "1.0") - cmd = exec.Command("notation", "sign", "--key", "good", "--plain-http", image) + cmd := exec.Command("notation", "sign", "--key", "good", "--plain-http", image) err = cmd.Run() if err != nil { @@ -2526,3 +2972,16 @@ func pushRepo(url, repoName string) godigest.Digest { return digest } + +func waitSyncOndemand(rootDir, repoName string) { + // wait for .sync subdirs to be removed + for { + dirs, err := os.ReadDir(path.Join(rootDir, repoName, sync.SyncBlobUploadDir)) + if err == nil && len(dirs) == 0 { + // stop watching /.sync/ subdirs + return + } + + time.Sleep(500 * time.Millisecond) + } +} diff --git a/pkg/extensions/sync/utils.go b/pkg/extensions/sync/utils.go index 06e54150..8221d0d8 100644 --- a/pkg/extensions/sync/utils.go +++ b/pkg/extensions/sync/utils.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "fmt" "io/ioutil" "net/url" "os" @@ -11,8 +12,11 @@ import ( "strings" glob "github.com/bmatcuk/doublestar/v4" + "github.com/containers/image/v5/docker" "github.com/containers/image/v5/docker/reference" + "github.com/containers/image/v5/oci/layout" "github.com/containers/image/v5/types" + guuid "github.com/gofrs/uuid" "github.com/notaryproject/notation-go-lib" ispec "github.com/opencontainers/image-spec/specs-go/v1" artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" @@ -38,6 +42,14 @@ func getTagFromRef(ref types.ImageReference, log log.Logger) reference.Tagged { return tagged } +// getRepoFromRef returns repo name from a registry ImageReference. +func getRepoFromRef(ref types.ImageReference, registryDomain string) string { + imageName := strings.Replace(ref.DockerReference().Name(), registryDomain, "", 1) + imageName = strings.TrimPrefix(imageName, "/") + + return imageName +} + // parseRepositoryReference parses input into a reference.Named, and verifies that it names a repository, not an image. func parseRepositoryReference(input string) (reference.Named, error) { ref, err := reference.ParseNormalizedNamed(input) @@ -164,14 +176,18 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont getCosignManifestURL := regURL - cosignEncodedDigest := strings.Replace(digest, ":", "-", 1) + ".sig" - getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", repo, "manifests", cosignEncodedDigest) + if !isCosignTag(digest) { + digest = strings.Replace(digest, ":", "-", 1) + ".sig" + } + + getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", repo, "manifests", digest) + getCosignManifestURL.RawQuery = getCosignManifestURL.Query().Encode() mResp, err := client.R().Get(getCosignManifestURL.String()) if err != nil { log.Error().Err(err).Str("url", getCosignManifestURL.String()). - Msgf("couldn't get cosign manifest: %s", cosignEncodedDigest) + Msgf("couldn't get cosign manifest: %s", digest) return err } @@ -188,7 +204,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont err = json.Unmarshal(mResp.Body(), &m) if err != nil { log.Error().Err(err).Str("url", getCosignManifestURL.String()). - Msgf("couldn't unmarshal cosign manifest %s", cosignEncodedDigest) + Msgf("couldn't unmarshal cosign manifest %s", digest) return err } @@ -254,7 +270,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont } // push manifest - _, err = imageStore.PutImageManifest(repo, cosignEncodedDigest, ispec.MediaTypeImageManifest, mResp.Body()) + _, err = imageStore.PutImageManifest(repo, digest, ispec.MediaTypeImageManifest, mResp.Body()) if err != nil { log.Error().Err(err).Msg("couldn't upload cosing manifest") @@ -269,7 +285,6 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont log.Info().Msg("syncing notary signatures") getReferrersURL := regURL - getRefManifestURL := regURL // based on manifest digest get referrers getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/", repo, "manifests", digest, "referrers") @@ -305,6 +320,7 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont for _, ref := range referrers.References { // get referrer manifest + getRefManifestURL := regURL getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", repo, "manifests", ref.Digest.String()) getRefManifestURL.RawQuery = getRefManifestURL.Query().Encode() @@ -368,7 +384,7 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont func syncSignatures(client *resty.Client, storeController storage.StoreController, registryURL, repo, tag string, log log.Logger) error { - log.Info().Msg("syncing signatures") + log.Info().Msgf("syncing signatures from %s/%s:%s", registryURL, repo, tag) // get manifest and find out its digest regURL, err := url.Parse(registryURL) if err != nil { @@ -414,21 +430,19 @@ func syncSignatures(client *resty.Client, storeController storage.StoreControlle return err } - log.Info().Msg("successfully synced signatures") + log.Info().Msgf("successfully synced %s/%s:%s signatures", registryURL, repo, tag) return nil } -// copy from temporary oci repository to local registry. -func pushSyncedLocalImage(repo, tag, uuid string, +func pushSyncedLocalImage(repo, tag, localCachePath string, storeController storage.StoreController, log log.Logger) error { - log.Info().Msgf("pushing synced local image %s:%s to local registry", repo, tag) + log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, repo, tag) imageStore := storeController.GetImageStore(repo) metrics := monitoring.NewMetricsServer(false, log) - cacheImageStore := storage.NewImageStore(path.Join(imageStore.RootDir(), repo, SyncBlobUploadDir, uuid), - false, false, log, metrics) + cacheImageStore := storage.NewImageStore(localCachePath, false, false, log, metrics) manifestContent, _, _, err := cacheImageStore.GetImageManifest(repo, tag) if err != nil { @@ -486,7 +500,7 @@ func pushSyncedLocalImage(repo, tag, uuid string, log.Info().Msgf("removing temporary cached synced repo %s", path.Join(cacheImageStore.RootDir(), repo)) - if err := os.RemoveAll(path.Join(cacheImageStore.RootDir(), repo)); err != nil { + if err := os.RemoveAll(cacheImageStore.RootDir()); err != nil { log.Error().Err(err).Msg("couldn't remove locally cached sync repo") return err @@ -510,3 +524,52 @@ func isCosignTag(tag string) bool { func StripRegistryTransport(url string) string { return strings.Replace(strings.Replace(url, "http://", "", 1), "https://", "", 1) } + +// get a .sync subdir used for temporary store one synced image. +func getLocalCachePath(imageStore storage.ImageStore, repo string) (string, error) { + uuid, err := guuid.NewV4() + if err != nil { + return "", err + } + + localCachePath := path.Join(imageStore.RootDir(), repo, SyncBlobUploadDir, uuid.String()) + + if err = os.MkdirAll(path.Join(localCachePath, repo), storage.DefaultDirPerms); err != nil { + return "", err + } + + return localCachePath, nil +} + +// get an ImageReference given the registry, repo and tag. +func getImageRef(registryDomain, repo, tag string) (types.ImageReference, error) { + repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryDomain, repo)) + if err != nil { + return nil, err + } + + taggedRepoRef, err := reference.WithTag(repoRef, tag) + if err != nil { + return nil, err + } + + imageRef, err := docker.NewReference(taggedRepoRef) + if err != nil { + return nil, err + } + + return imageRef, err +} + +// get a local ImageReference used to temporary store one synced image. +func getLocalImageRef(localCachePath, repo, tag string) (types.ImageReference, error) { + localRepo := path.Join(localCachePath, repo) + localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag) + + localImageRef, err := layout.ParseReference(localTaggedRepo) + if err != nil { + return nil, err + } + + return localImageRef, nil +}