0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-16 21:56:37 -05:00

new config option for sync-destination

Signed-off-by: laurentiuNiculae <themelopeus@gmail.com>
This commit is contained in:
laurentiuNiculae 2022-03-10 17:39:11 +02:00 committed by Ramkumar Chinchani
parent 6d04ab3cdc
commit 0d148e1d6b
7 changed files with 365 additions and 88 deletions

View file

@ -49,4 +49,5 @@ var (
ErrInvalidMetric = errors.New("metrics: invalid metric func")
ErrInjected = errors.New("test: injected failure")
ErrSyncInvalidUpstreamURL = errors.New("sync: upstream url not found in sync config")
ErrRegistryNoContent = errors.New("sync: could not find a Content that matches localRepo")
)

View file

@ -403,7 +403,17 @@ Configure each registry sync:
},
{
"prefix":"/repo3/**" # pull all images under repo3/ (matches recursively all repos under repo3/)
}
},
{
"prefix":"/repo1/repo", # pull /repo1/repo
"destination":"/localrepo", # put /repo1/repo under /localrepo
"stripPrefix":true # strip the path specified in "prefix", if true resulting /localpath, if false resulting /localrepo/repo1/repo"
}
{
"prefix":"/repo1/**", # pull all images under repo1/ (matches recursively all repos under repo1/)
"destination":"/localrepo", # put all images found under /localrepo.
"stripPrefix":true # strip the path specified in "prefix" until meta-characters like "**". If we match /repo1/repo the local repo will be /localrepo/repo.
}
]
},
{

View file

@ -30,6 +30,11 @@
"semver":true
}
},
{
"prefix":"/repo1/repo",
"destination": "/repo",
"stripPrefix": true
},
{
"prefix":"/repo2/repo"
}

View file

@ -73,7 +73,7 @@ func OneImage(cfg Config, storeController storage.StoreController,
}
func syncOneImage(imageChannel chan error, cfg Config, storeController storage.StoreController,
repo, tag string, isArtifact bool, log log.Logger) {
localRepo, tag string, isArtifact bool, log log.Logger) {
var credentialsFile CredentialsFile
if cfg.CredentialsFile != "" {
@ -98,7 +98,7 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
return
}
imageStore := storeController.GetImageStore(repo)
imageStore := storeController.GetImageStore(localRepo)
for _, registryCfg := range cfg.Registries {
regCfg := registryCfg
@ -108,15 +108,19 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
continue
}
remoteRepo := localRepo
// if content config is not specified, then don't filter, just sync demanded image
if len(regCfg.Content) != 0 {
repos := filterRepos([]string{repo}, regCfg.Content, log)
if len(repos) == 0 {
contentID, err := findRepoMatchingContentID(localRepo, regCfg.Content)
if err != nil {
log.Info().Msgf("skipping syncing on demand %s from %v registry because it's filtered out by content config",
repo, regCfg.URLs)
localRepo, regCfg.URLs)
continue
}
remoteRepo = getRepoSource(localRepo, regCfg.Content[contentID])
}
retryOptions := &retry.RetryOptions{}
@ -156,9 +160,9 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
// is notary signature
if isArtifact {
err = syncNotarySignature(httpClient, storeController, *regURL, repo, tag, log)
err = syncNotarySignature(httpClient, storeController, *regURL, remoteRepo, localRepo, tag, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, repo, tag)
log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, localRepo, tag)
continue
}
@ -168,9 +172,9 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
return
}
// is cosign signature
err = syncCosignSignature(httpClient, storeController, *regURL, repo, tag, log)
err = syncCosignSignature(httpClient, storeController, *regURL, remoteRepo, localRepo, tag, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, repo, tag)
log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, localRepo, tag)
continue
}
@ -184,20 +188,20 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
upstreamCtx := getUpstreamContext(&regCfg, credentialsFile[upstreamAddr])
options := getCopyOptions(upstreamCtx, localCtx)
upstreamImageRef, err := getImageRef(upstreamAddr, repo, tag)
upstreamImageRef, err := getImageRef(upstreamAddr, remoteRepo, tag)
if err != nil {
log.Error().Err(err).Msgf("error creating docker reference for repository %s/%s:%s",
upstreamAddr, repo, tag)
upstreamAddr, remoteRepo, tag)
imageChannel <- err
return
}
localImageRef, localCachePath, err := getLocalImageRef(imageStore, repo, tag)
localImageRef, localCachePath, err := getLocalImageRef(imageStore, localRepo, tag)
if err != nil {
log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
localCachePath, repo, tag)
localCachePath, localRepo, tag)
imageChannel <- err
@ -206,7 +210,7 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath)
demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, repo, tag)
demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, remoteRepo, tag)
_, copyErr = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options)
if copyErr != nil {
@ -244,11 +248,13 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
log.Error().Err(err).Msgf("sync routine: error while copying image %s to %s",
demandedImageRef, localCachePath)
} else {
_ = finishSyncing(repo, tag, localCachePath, upstreamURL, storeController, retryOptions, httpClient, log)
_ = finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL, storeController,
retryOptions, httpClient, log)
}
}()
} else {
err := finishSyncing(repo, tag, localCachePath, upstreamURL, storeController, retryOptions, httpClient, log)
err := finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL, storeController,
retryOptions, httpClient, log)
imageChannel <- err
@ -261,26 +267,26 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
}
// push the local image into the storage, sync signatures.
func finishSyncing(repo, tag, localCachePath, upstreamURL string,
func finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL string,
storeController storage.StoreController, retryOptions *retry.RetryOptions,
httpClient *resty.Client, log log.Logger) error {
err := pushSyncedLocalImage(repo, tag, localCachePath, storeController, log)
err := pushSyncedLocalImage(localRepo, tag, localCachePath, storeController, log)
if err != nil {
log.Error().Err(err).Msgf("error while pushing synced cached image %s",
fmt.Sprintf("%s/%s:%s", localCachePath, repo, tag))
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
return err
}
if err = retry.RetryIfNecessary(context.Background(), func() error {
err = syncSignatures(httpClient, storeController, upstreamURL, repo, tag, log)
err = syncSignatures(httpClient, storeController, upstreamURL, remoteRepo, localRepo, tag, log)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("couldn't copy image signature for %s/%s:%s", upstreamURL, repo, tag)
log.Error().Err(err).Msgf("couldn't copy image signature for %s/%s:%s", upstreamURL, remoteRepo, tag)
}
log.Info().Msgf("successfully synced %s/%s:%s", upstreamURL, repo, tag)
log.Info().Msgf("successfully synced %s/%s:%s", upstreamURL, remoteRepo, tag)
return nil
}

View file

@ -60,8 +60,10 @@ type RegistryConfig struct {
}
type Content struct {
Prefix string
Tags *Tags
Prefix string
Tags *Tags
Destination string `mapstructure:",omitempty"`
StripPrefix bool
}
type Tags struct {
@ -323,17 +325,28 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string
log.Info().Msgf("got repos: %v", repos)
var images []types.ImageReference
var images []struct {
ref types.ImageReference
content Content
}
upstreamAddr := StripRegistryTransport(upstreamURL)
for contentID, repos := range repos {
r := repos
id := contentID
contentID := contentID
if err = retry.RetryIfNecessary(ctx, func() error {
refs, err := imagesToCopyFromUpstream(ctx, upstreamAddr, r, upstreamCtx, regCfg.Content[id], log)
images = append(images, refs...)
refs, err := imagesToCopyFromUpstream(ctx, upstreamAddr, r, upstreamCtx, regCfg.Content[contentID], log)
for _, ref := range refs {
images = append(images, struct {
ref types.ImageReference
content Content
}{
ref: ref,
content: regCfg.Content[contentID],
})
}
return err
}, retryOptions); err != nil {
@ -350,14 +363,15 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string
}
for _, ref := range images {
upstreamImageRef := ref
upstreamImageRef := ref.ref
repo := getRepoFromRef(upstreamImageRef, upstreamAddr)
remoteRepo := getRepoFromRef(upstreamImageRef, upstreamAddr)
localRepo := getRepoDestination(remoteRepo, ref.content)
tag := getTagFromRef(upstreamImageRef, log).Tag()
imageStore := storeController.GetImageStore(repo)
imageStore := storeController.GetImageStore(localRepo)
canBeSkipped, err := canSkipImage(ctx, repo, tag, upstreamImageRef, imageStore, upstreamCtx, log)
canBeSkipped, err := canSkipImage(ctx, localRepo, tag, upstreamImageRef, imageStore, upstreamCtx, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped",
upstreamImageRef.DockerReference())
@ -367,10 +381,10 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string
continue
}
localImageRef, localCachePath, err := getLocalImageRef(imageStore, repo, tag)
localImageRef, localCachePath, err := getLocalImageRef(imageStore, localRepo, tag)
if err != nil {
log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
localCachePath, repo, tag)
localCachePath, localRepo, tag)
return err
}
@ -390,16 +404,16 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string
return err
}
err = pushSyncedLocalImage(repo, tag, localCachePath, storeController, log)
err = pushSyncedLocalImage(localRepo, tag, localCachePath, storeController, log)
if err != nil {
log.Error().Err(err).Msgf("error while pushing synced cached image %s",
fmt.Sprintf("%s/%s:%s", localCachePath, repo, tag))
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
return err
}
if err = retry.RetryIfNecessary(ctx, func() error {
err = syncSignatures(httpClient, storeController, upstreamURL, repo, tag, log)
err = syncSignatures(httpClient, storeController, upstreamURL, remoteRepo, localRepo, tag, log)
return err
}, retryOptions); err != nil {

View file

@ -270,16 +270,16 @@ func TestSyncInternal(t *testing.T) {
Convey("Test syncSignatures()", t, func() {
log := log.NewLogger("debug", "")
err := syncSignatures(resty.New(), storage.StoreController{}, "%", "repo", "tag", log)
err := syncSignatures(resty.New(), storage.StoreController{}, "%", "repo", "repo", "tag", log)
So(err, ShouldNotBeNil)
err = syncSignatures(resty.New(), storage.StoreController{}, "http://zot", "repo", "tag", log)
err = syncSignatures(resty.New(), storage.StoreController{}, "http://zot", "repo", "repo", "tag", log)
So(err, ShouldNotBeNil)
err = syncSignatures(resty.New(), storage.StoreController{}, "https://google.com", "repo", "tag", log)
err = syncSignatures(resty.New(), storage.StoreController{}, "https://google.com", "repo", "repo", "tag", log)
So(err, ShouldNotBeNil)
url, _ := url.Parse("invalid")
err = syncCosignSignature(resty.New(), storage.StoreController{}, *url, "repo", "tag", log)
err = syncCosignSignature(resty.New(), storage.StoreController{}, *url, "repo", "repo", "tag", log)
So(err, ShouldNotBeNil)
err = syncNotarySignature(resty.New(), storage.StoreController{}, *url, "repo", "tag", log)
err = syncNotarySignature(resty.New(), storage.StoreController{}, *url, "repo", "repo", "tag", log)
So(err, ShouldNotBeNil)
})
@ -453,3 +453,158 @@ func TestSyncInternal(t *testing.T) {
So(err, ShouldNotBeNil)
})
}
// TestURLHelperFunctions checks that getRepoDestination() and getRepoSource()
// behave as inverses of each other over a shared table of sync content rules.
// Table orientation: "expected" holds the upstream (source) repo path and
// "repo" holds the local (destination) path — getRepoDestination(expected,
// content) must yield repo, and getRepoSource(repo, content) must yield
// expected.
func TestURLHelperFunctions(t *testing.T) {
testCases := []struct {
repo string
content Content
expected string
}{
{
repo: "alpine/zot-fold/alpine",
content: Content{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: false},
expected: "zot-fold/alpine",
},
{
repo: "zot-fold/alpine",
content: Content{Prefix: "zot-fold/alpine", Destination: "/", StripPrefix: false},
expected: "zot-fold/alpine",
},
{
repo: "alpine",
content: Content{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: true},
expected: "zot-fold/alpine",
},
{
// destination "/" with the whole prefix stripped collapses to the root repo path
repo: "/",
content: Content{Prefix: "zot-fold/alpine", Destination: "/", StripPrefix: true},
expected: "zot-fold/alpine",
},
{
repo: "/",
content: Content{Prefix: "/", Destination: "/", StripPrefix: true},
expected: "/",
},
{
repo: "alpine",
content: Content{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: true},
expected: "zot-fold/alpine",
},
{
// glob prefixes: only the static part of the prefix is stripped
repo: "alpine",
content: Content{Prefix: "zot-fold/*", Destination: "/", StripPrefix: true},
expected: "zot-fold/alpine",
},
{
repo: "alpine",
content: Content{Prefix: "zot-fold/**", Destination: "/", StripPrefix: true},
expected: "zot-fold/alpine",
},
{
repo: "zot-fold/alpine",
content: Content{Prefix: "zot-fold/**", Destination: "/", StripPrefix: false},
expected: "zot-fold/alpine",
},
}
Convey("Test getRepoDestination()", t, func() {
for _, test := range testCases {
// note the inverted arguments: destination is computed FROM the source ("expected")
actualResult := getRepoDestination(test.expected, test.content)
So(actualResult, ShouldEqual, test.repo)
}
})
// this is the inverse function of getRepoDestination()
Convey("Test getRepoSource()", t, func() {
for _, test := range testCases {
actualResult := getRepoSource(test.repo, test.content)
So(actualResult, ShouldEqual, test.expected)
}
})
}
// TestFindRepoMatchingContentID verifies that findRepoMatchingContentID()
// returns the index of the first content entry whose destination+prefix
// pattern matches the given local repo path, and ErrRegistryNoContent
// (with contentID -1) when no entry matches.
func TestFindRepoMatchingContentID(t *testing.T) {
testCases := []struct {
repo string
content []Content
expected struct {
contentID int
err error
}
}{
{
repo: "alpine/zot-fold/alpine",
content: []Content{
{Prefix: "zot-fold/alpine/", Destination: "/alpine", StripPrefix: true},
{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: false},
},
expected: struct {
contentID int
err error
}{contentID: 1, err: nil},
},
{
repo: "alpine/zot-fold/alpine",
content: []Content{
{Prefix: "zot-fold/*", Destination: "/alpine", StripPrefix: false},
{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: true},
},
expected: struct {
contentID int
err error
}{contentID: 0, err: nil},
},
{
repo: "myFold/zot-fold/internal/alpine",
content: []Content{
{Prefix: "zot-fold/alpine", Destination: "/alpine", StripPrefix: true},
{Prefix: "zot-fold/**", Destination: "/myFold", StripPrefix: false},
},
expected: struct {
contentID int
err error
}{contentID: 1, err: nil},
},
{
// no content entry matches this repo path
repo: "alpine",
content: []Content{
{Prefix: "zot-fold/*", Destination: "/alpine", StripPrefix: true},
{Prefix: "zot-fold/alpine", Destination: "/", StripPrefix: true},
},
expected: struct {
contentID int
err error
}{contentID: -1, err: errors.ErrRegistryNoContent},
},
{
repo: "alpine",
content: []Content{
{Prefix: "zot-fold/*", Destination: "/alpine", StripPrefix: true},
{Prefix: "zot-fold/*", Destination: "/", StripPrefix: true},
},
expected: struct {
contentID int
err error
}{contentID: 1, err: nil},
},
{
repo: "alpine/alpine",
content: []Content{
{Prefix: "zot-fold/*", Destination: "/alpine", StripPrefix: true},
{Prefix: "zot-fold/*", Destination: "/", StripPrefix: true},
},
expected: struct {
contentID int
err error
}{contentID: 0, err: nil},
},
}
Convey("Test findRepoMatchingContentID()", t, func() {
for _, test := range testCases {
actualResult, err := findRepoMatchingContentID(test.repo, test.content)
So(actualResult, ShouldEqual, test.expected.contentID)
So(err, ShouldResemble, test.expected.err)
}
})
}

View file

@ -100,6 +100,95 @@ func filterRepos(repos []string, contentList []Content, log log.Logger) map[int]
return filtered
}
// findRepoMatchingContentID returns the index (contentID) of the first entry
// in contentList whose destination+prefix pattern matches the localRepo path
// for a given RegistryConfig in the config file, or ErrRegistryNoContent if
// no entry matches.
func findRepoMatchingContentID(localRepo string, contentList []Content) (int, error) {
contentID := -1
// compare trimmed paths so leading/trailing slashes don't affect matching
localRepo = strings.Trim(localRepo, "/")
for cID, content := range contentList {
// make sure prefix ends in "/" to extract the meta characters
prefix := strings.Trim(content.Prefix, "/") + "/"
destination := strings.Trim(content.Destination, "/")
var patternSlice []string
if content.StripPrefix {
// with stripPrefix the static part of the prefix is absent locally,
// so match destination + only the glob meta characters (e.g. "**")
_, metaCharacters := glob.SplitPattern(prefix)
patternSlice = append(patternSlice, destination, metaCharacters)
} else {
// without stripPrefix the full prefix is stored under the destination
patternSlice = append(patternSlice, destination, prefix)
}
pattern := strings.Trim(strings.Join(patternSlice, "/"), "/")
matched, err := glob.Match(pattern, localRepo)
if err != nil {
// NOTE(review): entries with unparseable patterns are skipped silently — confirm intended
continue
}
if matched {
contentID = cID
break
}
}
if contentID == -1 {
return -1, zerr.ErrRegistryNoContent
}
return contentID, nil
}
// getRepoSource reverses getRepoDestination: given the local (destination)
// repo path and the Content rule it was synced under, it reconstructs the
// upstream (source) repo path.
func getRepoSource(localRepo string, content Content) string {
	trimmed := strings.Trim(localRepo, "/")
	dest := strings.Trim(content.Destination, "/")
	prefix := strings.Trim(content.Prefix, "/*")

	// Peel the configured destination off the front to recover the remainder.
	remainder := strings.Trim(strings.TrimPrefix(trimmed, dest), "/")

	var parts []string
	if content.StripPrefix {
		// The static prefix was stripped during sync; prepend it back.
		parts = []string{prefix, remainder}
	} else {
		parts = []string{remainder}
	}

	source := strings.Join(parts, "/")
	if source == "/" {
		return source
	}

	return strings.Trim(source, "/")
}
// getRepoDestination returns the local storage path of the synced repo based
// on the specified destination, optionally stripping the configured prefix
// from the upstream repo path first.
func getRepoDestination(remoteRepo string, content Content) string {
	repo := strings.Trim(remoteRepo, "/")
	dest := strings.Trim(content.Destination, "/")
	prefix := strings.Trim(content.Prefix, "/*")

	if content.StripPrefix {
		// Drop the static prefix from the upstream path before placing it
		// under the destination.
		repo = strings.Trim(strings.TrimPrefix(repo, prefix), "/")
	}

	result := strings.Join([]string{dest, repo}, "/")
	if result == "/" {
		return "/"
	}

	return strings.Trim(result, "/")
}
// Get sync.FileCredentials from file.
func getFileCredentials(filepath string) (CredentialsFile, error) {
credsFile, err := ioutil.ReadFile(filepath)
@ -174,7 +263,7 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede
}
func syncCosignSignature(client *resty.Client, storeController storage.StoreController,
regURL url.URL, repo, digest string, log log.Logger) error {
regURL url.URL, remoteRepo, localRepo, digest string, log log.Logger) error {
log.Info().Msg("syncing cosign signatures")
getCosignManifestURL := regURL
@ -183,7 +272,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
digest = strings.Replace(digest, ":", "-", 1) + ".sig"
}
getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", repo, "manifests", digest)
getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", remoteRepo, "manifests", digest)
getCosignManifestURL.RawQuery = getCosignManifestURL.Query().Encode()
@ -212,12 +301,12 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
return err
}
imageStore := storeController.GetImageStore(repo)
imageStore := storeController.GetImageStore(localRepo)
for _, blob := range m.Layers {
// get blob
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", repo, "blobs", blob.Digest.String())
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
@ -236,7 +325,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
defer resp.RawBody().Close()
// push blob
_, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), blob.Digest.String())
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosign blob")
@ -246,7 +335,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
// get config blob
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", repo, "blobs", m.Config.Digest.String())
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", m.Config.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
@ -265,7 +354,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
defer resp.RawBody().Close()
// push config blob
_, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), m.Config.Digest.String())
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), m.Config.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosign blob")
@ -273,7 +362,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
}
// push manifest
_, err = imageStore.PutImageManifest(repo, digest, ispec.MediaTypeImageManifest, mResp.Body())
_, err = imageStore.PutImageManifest(localRepo, digest, ispec.MediaTypeImageManifest, mResp.Body())
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosing manifest")
@ -284,13 +373,14 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
}
func syncNotarySignature(client *resty.Client, storeController storage.StoreController,
regURL url.URL, repo, digest string, log log.Logger) error {
regURL url.URL, remoteRepo, localRepo, digest string, log log.Logger) error {
log.Info().Msg("syncing notary signatures")
getReferrersURL := regURL
// based on manifest digest get referrers
getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/", repo, "manifests", digest, "referrers")
getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/",
remoteRepo, "manifests", digest, "referrers")
getReferrersURL.RawQuery = getReferrersURL.Query().Encode()
resp, err := client.R().
@ -319,12 +409,12 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont
return err
}
imageStore := storeController.GetImageStore(repo)
imageStore := storeController.GetImageStore(localRepo)
for _, ref := range referrers.References {
// get referrer manifest
getRefManifestURL := regURL
getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", repo, "manifests", ref.Digest.String())
getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", remoteRepo, "manifests", ref.Digest.String())
getRefManifestURL.RawQuery = getRefManifestURL.Query().Encode()
resp, err := client.R().
@ -347,7 +437,7 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont
for _, blob := range m.Blobs {
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", repo, "blobs", blob.Digest.String())
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
@ -366,7 +456,7 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont
return zerr.ErrBadBlobDigest
}
_, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), blob.Digest.String())
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload notary sig blob")
@ -374,7 +464,8 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont
}
}
_, err = imageStore.PutImageManifest(repo, ref.Digest.String(), artifactspec.MediaTypeArtifactManifest, resp.Body())
_, err = imageStore.PutImageManifest(localRepo, ref.Digest.String(), artifactspec.MediaTypeArtifactManifest,
resp.Body())
if err != nil {
log.Error().Err(err).Msg("couldn't upload notary sig manifest")
@ -386,8 +477,8 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont
}
func syncSignatures(client *resty.Client, storeController storage.StoreController,
registryURL, repo, tag string, log log.Logger) error {
log.Info().Msgf("syncing signatures from %s/%s:%s", registryURL, repo, tag)
registryURL, remoteRepo, localRepo, tag string, log log.Logger) error {
log.Info().Msgf("syncing signatures from %s/%s:%s", registryURL, remoteRepo, tag)
// get manifest and find out its digest
regURL, err := url.Parse(registryURL)
if err != nil {
@ -398,7 +489,7 @@ func syncSignatures(client *resty.Client, storeController storage.StoreControlle
getManifestURL := *regURL
getManifestURL.Path = path.Join(getManifestURL.Path, "v2", repo, "manifests", tag)
getManifestURL.Path = path.Join(getManifestURL.Path, "v2", remoteRepo, "manifests", tag)
resp, err := client.R().SetHeader("Content-Type", "application/json").Head(getManifestURL.String())
if err != nil {
@ -408,48 +499,42 @@ func syncSignatures(client *resty.Client, storeController storage.StoreControlle
return err
}
digests, ok := resp.Header()["Docker-Content-Digest"]
if !ok {
digest := resp.Header().Get("Docker-Content-Digest")
if digest == "" {
log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()).
Msgf("couldn't get digest for manifest: %s:%s", repo, tag)
Msgf("couldn't get digest for manifest: %s:%s", remoteRepo, tag)
return zerr.ErrBadBlobDigest
}
if len(digests) != 1 {
log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()).
Msgf("multiple digests found for: %s:%s", repo, tag)
return zerr.ErrBadBlobDigest
}
err = syncNotarySignature(client, storeController, *regURL, repo, digests[0], log)
err = syncNotarySignature(client, storeController, *regURL, remoteRepo, localRepo, digest, log)
if err != nil {
return err
}
err = syncCosignSignature(client, storeController, *regURL, repo, digests[0], log)
err = syncCosignSignature(client, storeController, *regURL, remoteRepo, localRepo, digest, log)
if err != nil {
return err
}
log.Info().Msgf("successfully synced %s/%s:%s signatures", registryURL, repo, tag)
log.Info().Msgf("successfully synced %s/%s:%s signatures", registryURL, remoteRepo, tag)
return nil
}
func pushSyncedLocalImage(repo, tag, localCachePath string,
func pushSyncedLocalImage(localRepo, tag, localCachePath string,
storeController storage.StoreController, log log.Logger) error {
log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, repo, tag)
log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, tag)
imageStore := storeController.GetImageStore(repo)
imageStore := storeController.GetImageStore(localRepo)
metrics := monitoring.NewMetricsServer(false, log)
cacheImageStore := storage.NewImageStore(localCachePath, false, storage.DefaultGCDelay, false, false, log, metrics)
manifestContent, _, _, err := cacheImageStore.GetImageManifest(repo, tag)
manifestContent, _, _, err := cacheImageStore.GetImageManifest(localRepo, tag)
if err != nil {
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), repo)).Msg("couldn't find index.json")
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)).
Msg("couldn't find index.json")
return err
}
@ -457,21 +542,22 @@ func pushSyncedLocalImage(repo, tag, localCachePath string,
var manifest ispec.Manifest
if err := json.Unmarshal(manifestContent, &manifest); err != nil {
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), repo)).Msg("invalid JSON")
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)).
Msg("invalid JSON")
return err
}
for _, blob := range manifest.Layers {
blobReader, _, err := cacheImageStore.GetBlob(repo, blob.Digest.String(), blob.MediaType)
blobReader, _, err := cacheImageStore.GetBlob(localRepo, blob.Digest.String(), blob.MediaType)
if err != nil {
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(),
repo)).Str("blob digest", blob.Digest.String()).Msg("couldn't read blob")
localRepo)).Str("blob digest", blob.Digest.String()).Msg("couldn't read blob")
return err
}
_, _, err = imageStore.FullBlobUpload(repo, blobReader, blob.Digest.String())
_, _, err = imageStore.FullBlobUpload(localRepo, blobReader, blob.Digest.String())
if err != nil {
log.Error().Err(err).Str("blob digest", blob.Digest.String()).Msg("couldn't upload blob")
@ -479,29 +565,29 @@ func pushSyncedLocalImage(repo, tag, localCachePath string,
}
}
blobReader, _, err := cacheImageStore.GetBlob(repo, manifest.Config.Digest.String(), manifest.Config.MediaType)
blobReader, _, err := cacheImageStore.GetBlob(localRepo, manifest.Config.Digest.String(), manifest.Config.MediaType)
if err != nil {
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(),
repo)).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't read config blob")
localRepo)).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't read config blob")
return err
}
_, _, err = imageStore.FullBlobUpload(repo, blobReader, manifest.Config.Digest.String())
_, _, err = imageStore.FullBlobUpload(localRepo, blobReader, manifest.Config.Digest.String())
if err != nil {
log.Error().Err(err).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't upload config blob")
return err
}
_, err = imageStore.PutImageManifest(repo, tag, ispec.MediaTypeImageManifest, manifestContent)
_, err = imageStore.PutImageManifest(localRepo, tag, ispec.MediaTypeImageManifest, manifestContent)
if err != nil {
log.Error().Err(err).Msg("couldn't upload manifest")
return err
}
log.Info().Msgf("removing temporary cached synced repo %s", path.Join(cacheImageStore.RootDir(), repo))
log.Info().Msgf("removing temporary cached synced repo %s", path.Join(cacheImageStore.RootDir(), localRepo))
if err := os.RemoveAll(cacheImageStore.RootDir()); err != nil {
log.Error().Err(err).Msg("couldn't remove locally cached sync repo")