From 0d148e1d6b0e526312c1f5a9983fb05447e243af Mon Sep 17 00:00:00 2001 From: laurentiuNiculae Date: Thu, 10 Mar 2022 17:39:11 +0200 Subject: [PATCH] new config option for sync-destination Signed-off-by: laurentiuNiculae --- errors/errors.go | 1 + examples/README.md | 12 +- examples/config-sync.json | 5 + pkg/extensions/sync/on_demand.go | 50 +++--- pkg/extensions/sync/sync.go | 44 ++++-- pkg/extensions/sync/sync_internal_test.go | 165 +++++++++++++++++++- pkg/extensions/sync/utils.go | 176 ++++++++++++++++------ 7 files changed, 365 insertions(+), 88 deletions(-) diff --git a/errors/errors.go b/errors/errors.go index a0d73812..049d4bcd 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -49,4 +49,5 @@ var ( ErrInvalidMetric = errors.New("metrics: invalid metric func") ErrInjected = errors.New("test: injected failure") ErrSyncInvalidUpstreamURL = errors.New("sync: upstream url not found in sync config") + ErrRegistryNoContent = errors.New("sync: could not find a Content that matches localRepo") ) diff --git a/examples/README.md b/examples/README.md index 3fa1b018..a32a40b1 100644 --- a/examples/README.md +++ b/examples/README.md @@ -403,7 +403,17 @@ Configure each registry sync: }, { "prefix":"/repo3/**" # pull all images under repo3/ (matches recursively all repos under repo3/) - } + }, + { + "prefix":"/repo1/repo", # pull /repo1/repo + "destination":"/localrepo", # put /repo1/repo under /localrepo + "stripPrefix":true # strip the path specified in "prefix", if true resulting /localpath, if false resulting /localrepo/repo1/repo" + } + { + "prefix":"/repo1/**", # pull all images under repo1/ (matches recursively all repos under repo1/) + "destination":"/localrepo", # put all images found under /localrepo. + "stripPrefix":true # strip the path specified in "prefix" until meta-characters like "**". If we match /repo1/repo the local repo will be /localrepo/repo. + } ] }, { diff --git a/examples/config-sync.json b/examples/config-sync.json index 69f81bf5..16658863 100644 --- a/examples/config-sync.json +++ b/examples/config-sync.json @@ -30,6 +30,11 @@ "semver":true } }, + { + "prefix":"/repo1/repo", + "destination": "/repo", + "stripPrefix": true + }, { "prefix":"/repo2/repo" } diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index d37b5c23..ec63fc03 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -73,7 +73,7 @@ func OneImage(cfg Config, storeController storage.StoreController, } func syncOneImage(imageChannel chan error, cfg Config, storeController storage.StoreController, - repo, tag string, isArtifact bool, log log.Logger) { + localRepo, tag string, isArtifact bool, log log.Logger) { var credentialsFile CredentialsFile if cfg.CredentialsFile != "" { @@ -98,7 +98,7 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S return } - imageStore := storeController.GetImageStore(repo) + imageStore := storeController.GetImageStore(localRepo) for _, registryCfg := range cfg.Registries { regCfg := registryCfg @@ -108,15 +108,19 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S continue } + remoteRepo := localRepo + // if content config is not specified, then don't filter, just sync demanded image if len(regCfg.Content) != 0 { - repos := filterRepos([]string{repo}, regCfg.Content, log) - if len(repos) == 0 { + contentID, err := findRepoMatchingContentID(localRepo, regCfg.Content) + if err != nil { log.Info().Msgf("skipping syncing on demand %s from %v registry because it's filtered out by content config", - repo, regCfg.URLs) + localRepo, regCfg.URLs) continue } + + remoteRepo = getRepoSource(localRepo, regCfg.Content[contentID]) } retryOptions := &retry.RetryOptions{} @@ -156,9 +160,9 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S // is notary signature if isArtifact { - err = syncNotarySignature(httpClient, storeController, *regURL, repo, tag, log) + err = syncNotarySignature(httpClient, storeController, *regURL, remoteRepo, localRepo, tag, log) if err != nil { - log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, repo, tag) + log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, localRepo, tag) continue } @@ -168,9 +172,9 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S return } // is cosign signature - err = syncCosignSignature(httpClient, storeController, *regURL, repo, tag, log) + err = syncCosignSignature(httpClient, storeController, *regURL, remoteRepo, localRepo, tag, log) if err != nil { - log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, repo, tag) + log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, localRepo, tag) continue } @@ -184,20 +188,20 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S upstreamCtx := getUpstreamContext(®Cfg, credentialsFile[upstreamAddr]) options := getCopyOptions(upstreamCtx, localCtx) - upstreamImageRef, err := getImageRef(upstreamAddr, repo, tag) + upstreamImageRef, err := getImageRef(upstreamAddr, remoteRepo, tag) if err != nil { log.Error().Err(err).Msgf("error creating docker reference for repository %s/%s:%s", - upstreamAddr, repo, tag) + upstreamAddr, remoteRepo, tag) imageChannel <- err return } - localImageRef, localCachePath, err := getLocalImageRef(imageStore, repo, tag) + localImageRef, localCachePath, err := getLocalImageRef(imageStore, localRepo, tag) if err != nil { log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s", - localCachePath, repo, tag) + localCachePath, localRepo, tag) imageChannel <- err @@ -206,7 +210,7 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath) - demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, repo, tag) + demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, remoteRepo, tag) _, copyErr = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options) if copyErr != nil { @@ -244,11 +248,13 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S log.Error().Err(err).Msgf("sync routine: error while copying image %s to %s", demandedImageRef, localCachePath) } else { - _ = finishSyncing(repo, tag, localCachePath, upstreamURL, storeController, retryOptions, httpClient, log) + _ = finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL, storeController, + retryOptions, httpClient, log) } }() } else { - err := finishSyncing(repo, tag, localCachePath, upstreamURL, storeController, retryOptions, httpClient, log) + err := finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL, storeController, + retryOptions, httpClient, log) imageChannel <- err @@ -261,26 +267,26 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S } // push the local image into the storage, sync signatures. -func finishSyncing(repo, tag, localCachePath, upstreamURL string, +func finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL string, storeController storage.StoreController, retryOptions *retry.RetryOptions, httpClient *resty.Client, log log.Logger) error { - err := pushSyncedLocalImage(repo, tag, localCachePath, storeController, log) + err := pushSyncedLocalImage(localRepo, tag, localCachePath, storeController, log) if err != nil { log.Error().Err(err).Msgf("error while pushing synced cached image %s", - fmt.Sprintf("%s/%s:%s", localCachePath, repo, tag)) + fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag)) return err } if err = retry.RetryIfNecessary(context.Background(), func() error { - err = syncSignatures(httpClient, storeController, upstreamURL, repo, tag, log) + err = syncSignatures(httpClient, storeController, upstreamURL, remoteRepo, localRepo, tag, log) return err }, retryOptions); err != nil { - log.Error().Err(err).Msgf("couldn't copy image signature for %s/%s:%s", upstreamURL, repo, tag) + log.Error().Err(err).Msgf("couldn't copy image signature for %s/%s:%s", upstreamURL, remoteRepo, tag) } - log.Info().Msgf("successfully synced %s/%s:%s", upstreamURL, repo, tag) + log.Info().Msgf("successfully synced %s/%s:%s", upstreamURL, remoteRepo, tag) return nil } diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 87bf6126..930cae1a 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -60,8 +60,10 @@ type RegistryConfig struct { } type Content struct { - Prefix string - Tags *Tags + Prefix string + Tags *Tags + Destination string `mapstructure:",omitempty"` + StripPrefix bool } type Tags struct { @@ -323,17 +325,28 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string log.Info().Msgf("got repos: %v", repos) - var images []types.ImageReference + var images []struct { + ref types.ImageReference + content Content + } upstreamAddr := StripRegistryTransport(upstreamURL) for contentID, repos := range repos { r := repos - id := contentID + contentID := contentID if err = retry.RetryIfNecessary(ctx, func() error { - refs, err := imagesToCopyFromUpstream(ctx, upstreamAddr, r, upstreamCtx, regCfg.Content[id], log) - images = append(images, refs...) + refs, err := imagesToCopyFromUpstream(ctx, upstreamAddr, r, upstreamCtx, regCfg.Content[contentID], log) + for _, ref := range refs { + images = append(images, struct { + ref types.ImageReference + content Content + }{ + ref: ref, + content: regCfg.Content[contentID], + }) + } return err }, retryOptions); err != nil { @@ -350,14 +363,15 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string } for _, ref := range images { - upstreamImageRef := ref + upstreamImageRef := ref.ref - repo := getRepoFromRef(upstreamImageRef, upstreamAddr) + remoteRepo := getRepoFromRef(upstreamImageRef, upstreamAddr) + localRepo := getRepoDestination(remoteRepo, ref.content) tag := getTagFromRef(upstreamImageRef, log).Tag() - imageStore := storeController.GetImageStore(repo) + imageStore := storeController.GetImageStore(localRepo) - canBeSkipped, err := canSkipImage(ctx, repo, tag, upstreamImageRef, imageStore, upstreamCtx, log) + canBeSkipped, err := canSkipImage(ctx, localRepo, tag, upstreamImageRef, imageStore, upstreamCtx, log) if err != nil { log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped", upstreamImageRef.DockerReference()) @@ -367,10 +381,10 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string continue } - localImageRef, localCachePath, err := getLocalImageRef(imageStore, repo, tag) + localImageRef, localCachePath, err := getLocalImageRef(imageStore, localRepo, tag) if err != nil { log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s", - localCachePath, repo, tag) + localCachePath, localRepo, tag) return err } @@ -390,16 +404,16 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string return err } - err = pushSyncedLocalImage(repo, tag, localCachePath, storeController, log) + err = pushSyncedLocalImage(localRepo, tag, localCachePath, storeController, log) if err != nil { log.Error().Err(err).Msgf("error while pushing synced cached image %s", - fmt.Sprintf("%s/%s:%s", localCachePath, repo, tag)) + fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag)) return err } if err = retry.RetryIfNecessary(ctx, func() error { - err = syncSignatures(httpClient, storeController, upstreamURL, repo, tag, log) + err = syncSignatures(httpClient, storeController, upstreamURL, remoteRepo, localRepo, tag, log) return err }, retryOptions); err != nil { diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 0aa3a084..e9c79375 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -270,16 +270,16 @@ func TestSyncInternal(t *testing.T) { Convey("Test syncSignatures()", t, func() { log := log.NewLogger("debug", "") - err := syncSignatures(resty.New(), storage.StoreController{}, "%", "repo", "tag", log) + err := syncSignatures(resty.New(), storage.StoreController{}, "%", "repo", "repo", "tag", log) So(err, ShouldNotBeNil) - err = syncSignatures(resty.New(), storage.StoreController{}, "http://zot", "repo", "tag", log) + err = syncSignatures(resty.New(), storage.StoreController{}, "http://zot", "repo", "repo", "tag", log) So(err, ShouldNotBeNil) - err = syncSignatures(resty.New(), storage.StoreController{}, "https://google.com", "repo", "tag", log) + err = syncSignatures(resty.New(), storage.StoreController{}, "https://google.com", "repo", "repo", "tag", log) So(err, ShouldNotBeNil) url, _ := url.Parse("invalid") - err = syncCosignSignature(resty.New(), storage.StoreController{}, *url, "repo", "tag", log) + err = syncCosignSignature(resty.New(), storage.StoreController{}, *url, "repo", "repo", "tag", log) So(err, ShouldNotBeNil) - err = syncNotarySignature(resty.New(), storage.StoreController{}, *url, "repo", "tag", log) + err = syncNotarySignature(resty.New(), storage.StoreController{}, *url, "repo", "repo", "tag", log) So(err, ShouldNotBeNil) }) @@ -453,3 +453,158 @@ func TestSyncInternal(t *testing.T) { So(err, ShouldNotBeNil) }) } + +func TestURLHelperFunctions(t *testing.T) { + testCases := []struct { + repo string + content Content + expected string + }{ + { + repo: "alpine/zot-fold/alpine", + content: Content{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: false}, + expected: "zot-fold/alpine", + }, + { + repo: "zot-fold/alpine", + content: Content{Prefix: "zot-fold/alpine", Destination: "/", StripPrefix: false}, + expected: "zot-fold/alpine", + }, + { + repo: "alpine", + content: Content{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: true}, + expected: "zot-fold/alpine", + }, + { + repo: "/", + content: Content{Prefix: "zot-fold/alpine", Destination: "/", StripPrefix: true}, + expected: "zot-fold/alpine", + }, + { + repo: "/", + content: Content{Prefix: "/", Destination: "/", StripPrefix: true}, + expected: "/", + }, + { + repo: "alpine", + content: Content{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: true}, + expected: "zot-fold/alpine", + }, + { + repo: "alpine", + content: Content{Prefix: "zot-fold/*", Destination: "/", StripPrefix: true}, + expected: "zot-fold/alpine", + }, + { + repo: "alpine", + content: Content{Prefix: "zot-fold/**", Destination: "/", StripPrefix: true}, + expected: "zot-fold/alpine", + }, + { + repo: "zot-fold/alpine", + content: Content{Prefix: "zot-fold/**", Destination: "/", StripPrefix: false}, + expected: "zot-fold/alpine", + }, + } + + Convey("Test getRepoDestination()", t, func() { + for _, test := range testCases { + actualResult := getRepoDestination(test.expected, test.content) + So(actualResult, ShouldEqual, test.repo) + } + }) + + // this is the inverse function of getRepoDestination() + Convey("Test getRepoSource()", t, func() { + for _, test := range testCases { + actualResult := getRepoSource(test.repo, test.content) + So(actualResult, ShouldEqual, test.expected) + } + }) +} + +func TestFindRepoMatchingContentID(t *testing.T) { + testCases := []struct { + repo string + content []Content + expected struct { + contentID int + err error + } + }{ + { + repo: "alpine/zot-fold/alpine", + content: []Content{ + {Prefix: "zot-fold/alpine/", Destination: "/alpine", StripPrefix: true}, + {Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: false}, + }, + expected: struct { + contentID int + err error + }{contentID: 1, err: nil}, + }, + { + repo: "alpine/zot-fold/alpine", + content: []Content{ + {Prefix: "zot-fold/*", Destination: "/alpine", StripPrefix: false}, + {Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: true}, + }, + expected: struct { + contentID int + err error + }{contentID: 0, err: nil}, + }, + { + repo: "myFold/zot-fold/internal/alpine", + content: []Content{ + {Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: true}, + {Prefix: "zot-fold/**", Destination: "/myFold", StripPrefix: false}, + }, + expected: struct { + contentID int + err error + }{contentID: 1, err: nil}, + }, + { + repo: "alpine", + content: []Content{ + {Prefix: "zot-fold/*", Destination: "/alpine", StripPrefix: true}, + {Prefix: "zot-fold/alpine", Destination: "/", StripPrefix: true}, + }, + expected: struct { + contentID int + err error + }{contentID: -1, err: errors.ErrRegistryNoContent}, + }, + { + repo: "alpine", + content: []Content{ + {Prefix: "zot-fold/*", Destination: "/alpine", StripPrefix: true}, + {Prefix: "zot-fold/*", Destination: "/", StripPrefix: true}, + }, + expected: struct { + contentID int + err error + }{contentID: 1, err: nil}, + }, + { + repo: "alpine/alpine", + content: []Content{ + {Prefix: "zot-fold/*", Destination: "/alpine", StripPrefix: true}, + {Prefix: "zot-fold/*", Destination: "/", StripPrefix: true}, + }, + expected: struct { + contentID int + err error + }{contentID: 0, err: nil}, + }, + } + + Convey("Test findRepoMatchingContentID()", t, func() { + for _, test := range testCases { + actualResult, err := findRepoMatchingContentID(test.repo, test.content) + So(actualResult, ShouldEqual, test.expected.contentID) + So(err, ShouldResemble, test.expected.err) + } + }) +} diff --git a/pkg/extensions/sync/utils.go b/pkg/extensions/sync/utils.go index d050ad35..8aa96719 100644 --- a/pkg/extensions/sync/utils.go +++ b/pkg/extensions/sync/utils.go @@ -100,6 +100,95 @@ func filterRepos(repos []string, contentList []Content, log log.Logger) map[int] return filtered } +// findRepoContentID return the contentID that maches the localRepo path for a given RegistryConfig in the config file. +func findRepoMatchingContentID(localRepo string, contentList []Content) (int, error) { + contentID := -1 + localRepo = strings.Trim(localRepo, "/") + + for cID, content := range contentList { + // make sure prefix ends in "/" to extract the meta characters + prefix := strings.Trim(content.Prefix, "/") + "/" + destination := strings.Trim(content.Destination, "/") + + var patternSlice []string + + if content.StripPrefix { + _, metaCharacters := glob.SplitPattern(prefix) + patternSlice = append(patternSlice, destination, metaCharacters) + } else { + patternSlice = append(patternSlice, destination, prefix) + } + + pattern := strings.Trim(strings.Join(patternSlice, "/"), "/") + + matched, err := glob.Match(pattern, localRepo) + if err != nil { + continue + } + + if matched { + contentID = cID + + break + } + } + + if contentID == -1 { + return -1, zerr.ErrRegistryNoContent + } + + return contentID, nil +} + +func getRepoSource(localRepo string, content Content) string { + localRepo = strings.Trim(localRepo, "/") + destination := strings.Trim(content.Destination, "/") + prefix := strings.Trim(content.Prefix, "/*") + + var localRepoSlice []string + + localRepo = strings.TrimPrefix(localRepo, destination) + localRepo = strings.Trim(localRepo, "/") + + if content.StripPrefix { + localRepoSlice = append([]string{prefix}, localRepo) + } else { + localRepoSlice = []string{localRepo} + } + + repoSource := strings.Join(localRepoSlice, "/") + if repoSource == "/" { + return repoSource + } + + return strings.Trim(repoSource, "/") +} + +// getRepoDestination returns the local storage path of the synced repo based on the specified destination. +func getRepoDestination(remoteRepo string, content Content) string { + remoteRepo = strings.Trim(remoteRepo, "/") + destination := strings.Trim(content.Destination, "/") + prefix := strings.Trim(content.Prefix, "/*") + + var repoDestSlice []string + + if content.StripPrefix { + remoteRepo = strings.TrimPrefix(remoteRepo, prefix) + remoteRepo = strings.Trim(remoteRepo, "/") + repoDestSlice = append(repoDestSlice, destination, remoteRepo) + } else { + repoDestSlice = append(repoDestSlice, destination, remoteRepo) + } + + repoDestination := strings.Join(repoDestSlice, "/") + + if repoDestination == "/" { + return "/" + } + + return strings.Trim(repoDestination, "/") +} + // Get sync.FileCredentials from file. func getFileCredentials(filepath string) (CredentialsFile, error) { credsFile, err := ioutil.ReadFile(filepath) @@ -174,7 +263,7 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede } func syncCosignSignature(client *resty.Client, storeController storage.StoreController, - regURL url.URL, repo, digest string, log log.Logger) error { + regURL url.URL, remoteRepo, localRepo, digest string, log log.Logger) error { log.Info().Msg("syncing cosign signatures") getCosignManifestURL := regURL @@ -183,7 +272,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont digest = strings.Replace(digest, ":", "-", 1) + ".sig" } - getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", repo, "manifests", digest) + getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", remoteRepo, "manifests", digest) getCosignManifestURL.RawQuery = getCosignManifestURL.Query().Encode() @@ -212,12 +301,12 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont return err } - imageStore := storeController.GetImageStore(repo) + imageStore := storeController.GetImageStore(localRepo) for _, blob := range m.Layers { // get blob getBlobURL := regURL - getBlobURL.Path = path.Join(getBlobURL.Path, "v2", repo, "blobs", blob.Digest.String()) + getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String()) getBlobURL.RawQuery = getBlobURL.Query().Encode() resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) @@ -236,7 +325,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont defer resp.RawBody().Close() // push blob - _, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), blob.Digest.String()) + _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String()) if err != nil { log.Error().Err(err).Msg("couldn't upload cosign blob") @@ -246,7 +335,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont // get config blob getBlobURL := regURL - getBlobURL.Path = path.Join(getBlobURL.Path, "v2", repo, "blobs", m.Config.Digest.String()) + getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", m.Config.Digest.String()) getBlobURL.RawQuery = getBlobURL.Query().Encode() resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) @@ -265,7 +354,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont defer resp.RawBody().Close() // push config blob - _, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), m.Config.Digest.String()) + _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), m.Config.Digest.String()) if err != nil { log.Error().Err(err).Msg("couldn't upload cosign blob") @@ -273,7 +362,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont } // push manifest - _, err = imageStore.PutImageManifest(repo, digest, ispec.MediaTypeImageManifest, mResp.Body()) + _, err = imageStore.PutImageManifest(localRepo, digest, ispec.MediaTypeImageManifest, mResp.Body()) if err != nil { log.Error().Err(err).Msg("couldn't upload cosing manifest") @@ -284,13 +373,14 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont } func syncNotarySignature(client *resty.Client, storeController storage.StoreController, - regURL url.URL, repo, digest string, log log.Logger) error { + regURL url.URL, remoteRepo, localRepo, digest string, log log.Logger) error { log.Info().Msg("syncing notary signatures") getReferrersURL := regURL // based on manifest digest get referrers - getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/", repo, "manifests", digest, "referrers") + getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/", + remoteRepo, "manifests", digest, "referrers") getReferrersURL.RawQuery = getReferrersURL.Query().Encode() resp, err := client.R(). @@ -319,12 +409,12 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont return err } - imageStore := storeController.GetImageStore(repo) + imageStore := storeController.GetImageStore(localRepo) for _, ref := range referrers.References { // get referrer manifest getRefManifestURL := regURL - getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", repo, "manifests", ref.Digest.String()) + getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", remoteRepo, "manifests", ref.Digest.String()) getRefManifestURL.RawQuery = getRefManifestURL.Query().Encode() resp, err := client.R(). @@ -347,7 +437,7 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont for _, blob := range m.Blobs { getBlobURL := regURL - getBlobURL.Path = path.Join(getBlobURL.Path, "v2", repo, "blobs", blob.Digest.String()) + getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String()) getBlobURL.RawQuery = getBlobURL.Query().Encode() resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) @@ -366,7 +456,7 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont return zerr.ErrBadBlobDigest } - _, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), blob.Digest.String()) + _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String()) if err != nil { log.Error().Err(err).Msg("couldn't upload notary sig blob") @@ -374,7 +464,8 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont } } - _, err = imageStore.PutImageManifest(repo, ref.Digest.String(), artifactspec.MediaTypeArtifactManifest, resp.Body()) + _, err = imageStore.PutImageManifest(localRepo, ref.Digest.String(), artifactspec.MediaTypeArtifactManifest, + resp.Body()) if err != nil { log.Error().Err(err).Msg("couldn't upload notary sig manifest") @@ -386,8 +477,8 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont } func syncSignatures(client *resty.Client, storeController storage.StoreController, - registryURL, repo, tag string, log log.Logger) error { - log.Info().Msgf("syncing signatures from %s/%s:%s", registryURL, repo, tag) + registryURL, remoteRepo, localRepo, tag string, log log.Logger) error { + log.Info().Msgf("syncing signatures from %s/%s:%s", registryURL, remoteRepo, tag) // get manifest and find out its digest regURL, err := url.Parse(registryURL) if err != nil { @@ -398,7 +489,7 @@ func syncSignatures(client *resty.Client, storeController storage.StoreControlle getManifestURL := *regURL - getManifestURL.Path = path.Join(getManifestURL.Path, "v2", repo, "manifests", tag) + getManifestURL.Path = path.Join(getManifestURL.Path, "v2", remoteRepo, "manifests", tag) resp, err := client.R().SetHeader("Content-Type", "application/json").Head(getManifestURL.String()) if err != nil { @@ -408,48 +499,42 @@ func syncSignatures(client *resty.Client, storeController storage.StoreControlle return err } - digests, ok := resp.Header()["Docker-Content-Digest"] - if !ok { + digest := resp.Header().Get("Docker-Content-Digest") + if digest == "" { log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()). - Msgf("couldn't get digest for manifest: %s:%s", repo, tag) + Msgf("couldn't get digest for manifest: %s:%s", remoteRepo, tag) return zerr.ErrBadBlobDigest } - if len(digests) != 1 { - log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()). - Msgf("multiple digests found for: %s:%s", repo, tag) - - return zerr.ErrBadBlobDigest - } - - err = syncNotarySignature(client, storeController, *regURL, repo, digests[0], log) + err = syncNotarySignature(client, storeController, *regURL, remoteRepo, localRepo, digest, log) if err != nil { return err } - err = syncCosignSignature(client, storeController, *regURL, repo, digests[0], log) + err = syncCosignSignature(client, storeController, *regURL, remoteRepo, localRepo, digest, log) if err != nil { return err } - log.Info().Msgf("successfully synced %s/%s:%s signatures", registryURL, repo, tag) + log.Info().Msgf("successfully synced %s/%s:%s signatures", registryURL, remoteRepo, tag) return nil } -func pushSyncedLocalImage(repo, tag, localCachePath string, +func pushSyncedLocalImage(localRepo, tag, localCachePath string, storeController storage.StoreController, log log.Logger) error { - log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, repo, tag) + log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, tag) - imageStore := storeController.GetImageStore(repo) + imageStore := storeController.GetImageStore(localRepo) metrics := monitoring.NewMetricsServer(false, log) cacheImageStore := storage.NewImageStore(localCachePath, false, storage.DefaultGCDelay, false, false, log, metrics) - manifestContent, _, _, err := cacheImageStore.GetImageManifest(repo, tag) + manifestContent, _, _, err := cacheImageStore.GetImageManifest(localRepo, tag) if err != nil { - log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), repo)).Msg("couldn't find index.json") + log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)). + Msg("couldn't find index.json") return err } @@ -457,21 +542,22 @@ func pushSyncedLocalImage(repo, tag, localCachePath string, var manifest ispec.Manifest if err := json.Unmarshal(manifestContent, &manifest); err != nil { - log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), repo)).Msg("invalid JSON") + log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)). + Msg("invalid JSON") return err } for _, blob := range manifest.Layers { - blobReader, _, err := cacheImageStore.GetBlob(repo, blob.Digest.String(), blob.MediaType) + blobReader, _, err := cacheImageStore.GetBlob(localRepo, blob.Digest.String(), blob.MediaType) if err != nil { log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), - repo)).Str("blob digest", blob.Digest.String()).Msg("couldn't read blob") + localRepo)).Str("blob digest", blob.Digest.String()).Msg("couldn't read blob") return err } - _, _, err = imageStore.FullBlobUpload(repo, blobReader, blob.Digest.String()) + _, _, err = imageStore.FullBlobUpload(localRepo, blobReader, blob.Digest.String()) if err != nil { log.Error().Err(err).Str("blob digest", blob.Digest.String()).Msg("couldn't upload blob") @@ -479,29 +565,29 @@ func pushSyncedLocalImage(repo, tag, localCachePath string, } } - blobReader, _, err := cacheImageStore.GetBlob(repo, manifest.Config.Digest.String(), manifest.Config.MediaType) + blobReader, _, err := cacheImageStore.GetBlob(localRepo, manifest.Config.Digest.String(), manifest.Config.MediaType) if err != nil { log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), - repo)).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't read config blob") + localRepo)).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't read config blob") return err } - _, _, err = imageStore.FullBlobUpload(repo, blobReader, manifest.Config.Digest.String()) + _, _, err = imageStore.FullBlobUpload(localRepo, blobReader, manifest.Config.Digest.String()) if err != nil { log.Error().Err(err).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't upload config blob") return err } - _, err = imageStore.PutImageManifest(repo, tag, ispec.MediaTypeImageManifest, manifestContent) + _, err = imageStore.PutImageManifest(localRepo, tag, ispec.MediaTypeImageManifest, manifestContent) if err != nil { log.Error().Err(err).Msg("couldn't upload manifest") return err } - log.Info().Msgf("removing temporary cached synced repo %s", path.Join(cacheImageStore.RootDir(), repo)) + log.Info().Msgf("removing temporary cached synced repo %s", path.Join(cacheImageStore.RootDir(), localRepo)) if err := os.RemoveAll(cacheImageStore.RootDir()); err != nil { log.Error().Err(err).Msg("couldn't remove locally cached sync repo")