0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-16 21:56:37 -05:00

sync: Add a new flag to enforce syncing only signed images, closes #455

sync: When checking if a image is already synced also check for changes in upstream signatures.

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
Petu Eusebiu 2022-03-07 10:45:10 +02:00 committed by Ramkumar Chinchani
parent dd6cedcf78
commit f53dc9eb8d
11 changed files with 1417 additions and 676 deletions

View file

@ -50,4 +50,6 @@ var (
ErrInjected = errors.New("test: injected failure")
ErrSyncInvalidUpstreamURL = errors.New("sync: upstream url not found in sync config")
ErrRegistryNoContent = errors.New("sync: could not find a Content that matches localRepo")
ErrSyncSignatureNotFound = errors.New("sync: couldn't find any upstream notary/cosign signatures")
ErrSyncSignature = errors.New("sync: couldn't get upstream notary/cosign signatures")
)

View file

@ -390,6 +390,7 @@ Configure each registry sync:
"certDir": "/home/user/certs", # use certificates at certDir path, if not specified then use the default certs dir
"maxRetries": 5, # maxRetries in case of temporary errors (default: no retries)
"retryDelay": "10m", # delay between retries, retry options are applied for both on demand and periodically sync and retryDelay is mandatory when using maxRetries.
"onlySigned": true, # sync only signed images (either notary or cosign)
"content":[ # which content to periodically pull, also it's used for filtering ondemand images, if not set then periodically polling will not run
{
"prefix":"/repo1/repo", # pull image repo1/repo

View file

@ -22,6 +22,7 @@
"certDir": "/home/user/certs",
"maxRetries": 3,
"retryDelay": "5m",
"onlySigned": true,
"content":[
{
"prefix":"/repo1/repo",
@ -64,4 +65,4 @@
]
}
}
}
}

17
go.sum
View file

@ -76,7 +76,10 @@ cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjp
cloud.google.com/go/pubsub v1.17.1/go.mod h1:4qDxMr1WsM9+aQAz36ltDwCIM+R0QdlseyFjBuNvnss=
cloud.google.com/go/secretmanager v1.0.0/go.mod h1:+Qkm5qxIJ5mk74xxIXA+87fseaY1JLYBcFPQoc/GQxg=
cloud.google.com/go/security v1.1.1/go.mod h1:QZd0wTwNJNKnl0H4/wAFD10TSX8kI4nk8V6ie6fyc9w=
cloud.google.com/go/security v1.1.1/go.mod h1:QZd0wTwNJNKnl0H4/wAFD10TSX8kI4nk8V6ie6fyc9w=
cloud.google.com/go/spanner v1.17.0/go.mod h1:+17t2ixFwRG4lWRwE+5kipDR9Ef07Jkmc8z0IbMDKUs=
cloud.google.com/go/spanner v1.17.0/go.mod h1:+17t2ixFwRG4lWRwE+5kipDR9Ef07Jkmc8z0IbMDKUs=
cloud.google.com/go/spanner v1.18.0/go.mod h1:LvAjUXPeJRGNuGpikMULjhLj/t9cRvdc+fxRoLiugXA=
cloud.google.com/go/spanner v1.18.0/go.mod h1:LvAjUXPeJRGNuGpikMULjhLj/t9cRvdc+fxRoLiugXA=
cloud.google.com/go/spanner v1.25.0/go.mod h1:kQUft3x355hzzaeFbObjsvkzZDgpDkesp3v75WBnI8w=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
@ -142,6 +145,8 @@ github.com/Azure/azure-sdk-for-go v61.5.0+incompatible h1:OSHSFeNm7D1InGsQrFjyN9
github.com/Azure/azure-sdk-for-go v61.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-service-bus-go v0.9.1/go.mod h1:yzBx6/BUGfjfeqbRZny9AQIbIe3AcV9WZbAdpkoXOa0=
github.com/Azure/azure-service-bus-go v0.11.5/go.mod h1:MI6ge2CuQWBVq+ly456MY7XqNLJip5LO1iSFodbNLbU=
github.com/Azure/azure-service-bus-go v0.11.5/go.mod h1:MI6ge2CuQWBVq+ly456MY7XqNLJip5LO1iSFodbNLbU=
github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0=
github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0=
github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck=
github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
@ -1370,6 +1375,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/go-replayers/grpcreplay v0.1.0/go.mod h1:8Ig2Idjpr6gifRd6pNVggX6TC1Zw6Jx74AKp7QNH2QE=
github.com/google/go-replayers/grpcreplay v1.1.0/go.mod h1:qzAvJ8/wi57zq7gWqaE6AwLM6miiXUQwP1S+I9icmhk=
github.com/google/go-replayers/grpcreplay v1.1.0/go.mod h1:qzAvJ8/wi57zq7gWqaE6AwLM6miiXUQwP1S+I9icmhk=
github.com/google/go-replayers/httpreplay v0.1.0/go.mod h1:YKZViNhiGgqdBlUbI2MwGpq4pXxNmhJLPHQ7cv2b5no=
github.com/google/go-replayers/httpreplay v0.1.0/go.mod h1:YKZViNhiGgqdBlUbI2MwGpq4pXxNmhJLPHQ7cv2b5no=
github.com/google/go-replayers/httpreplay v1.0.0/go.mod h1:LJhKoTwS5Wy5Ld/peq8dFFG5OfJyHEz7ft+DsTUv25M=
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
@ -2069,6 +2076,8 @@ github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJ
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/openzipkin/zipkin-go v0.3.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ=
github.com/openzipkin/zipkin-go v0.3.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ=
github.com/oras-project/artifacts-spec v0.0.0-20210914235636-eecc5d95bcee/go.mod h1:Xch2aLzSwtkhbFFN6LUzTfLtukYvMMdXJ4oZ8O7BOdc=
github.com/oras-project/artifacts-spec v0.0.0-20210914235636-eecc5d95bcee/go.mod h1:Xch2aLzSwtkhbFFN6LUzTfLtukYvMMdXJ4oZ8O7BOdc=
github.com/oras-project/artifacts-spec v1.0.0-draft.1.1 h1:2YMUDyDH0glYA4gNG/zEg9HNVzgGX8kr/NBLR9AQkLQ=
github.com/oras-project/artifacts-spec v1.0.0-draft.1.1/go.mod h1:Xch2aLzSwtkhbFFN6LUzTfLtukYvMMdXJ4oZ8O7BOdc=
@ -2257,8 +2266,11 @@ github.com/sassoftware/relic v0.0.0-20210427151427-dfb082b79b74/go.mod h1:YlB8wF
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U=
github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U=
github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/secure-systems-lab/go-securesystemslib v0.2.0/go.mod h1:eIjBmIP8LD2MLBL/DkQWayLiz006Q4p+hCu79rvWleY=
@ -2283,6 +2295,7 @@ github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk=
github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOmsrJOB+vfqUK+7DmDyjhSLIIBnXo9lvZJj3MWQ=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sigstore/cosign v1.6.0 h1:GBG+asPgsf2iawldL9uO8JDGbFO5yXuvaBTjdejsMzo=
github.com/sigstore/cosign v1.6.0/go.mod h1:Ocd28z0Pwtd6+A8s/Vb4SbhwuWOqVdeYAW4yCGF4Ndg=
github.com/sigstore/fulcio v0.1.2-0.20220114150912-86a2036f9bc7 h1:XE7A9lJ+wYhmUFBWYTaw3Ph943zHB4iBYd5R0SX0ZOA=
@ -2706,6 +2719,8 @@ go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
gocloud.dev v0.19.0/go.mod h1:SmKwiR8YwIMMJvQBKLsC3fHNyMwXLw3PMDO+VVteJMI=
gocloud.dev v0.19.0/go.mod h1:SmKwiR8YwIMMJvQBKLsC3fHNyMwXLw3PMDO+VVteJMI=
gocloud.dev v0.24.1-0.20211119014450-028788aaaa4c/go.mod h1:EIJSlY7nvfeoWaV2GauF6es27gZfqtTVon47QFueoyE=
golang.org/x/build v0.0.0-20190314133821-5284462c4bec/go.mod h1:atTaCNAy0f16Ah5aV1gMSwgiKVHwu/JncqDpuRr7lS4=
@ -3625,6 +3640,7 @@ k8s.io/component-base v0.20.1/go.mod h1:guxkoJnNoh8LNrbtiQOlyp2Y2XFCZQmrcg2n/DeY
k8s.io/component-base v0.20.4/go.mod h1:t4p9EdiagbVCJKrQ1RsA5/V4rFQNDfRlevJajlGwgjI=
k8s.io/component-base v0.20.6/go.mod h1:6f1MPBAeI+mvuts3sIdtpjljHWBQ2cIy38oBIWMYnrM=
k8s.io/component-base v0.22.5/go.mod h1:VK3I+TjuF9eaa+Ln67dKxhGar5ynVbwnGrUiNF4MqCI=
k8s.io/component-base v0.22.5/go.mod h1:VK3I+TjuF9eaa+Ln67dKxhGar5ynVbwnGrUiNF4MqCI=
k8s.io/cri-api v0.17.3/go.mod h1:X1sbHmuXhwaHs9xxYffLqJogVsnI+f6cPRcgPel7ywM=
k8s.io/cri-api v0.20.1/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI=
k8s.io/cri-api v0.20.4/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI=
@ -3670,6 +3686,7 @@ k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20220127004650-9b3446523e65 h1:ONWS0Wgdg5wRiQIAui7L/023aC9+IxrIrydY7l8llsE=
k8s.io/utils v0.0.0-20220127004650-9b3446523e65/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/hack v0.0.0-20220118141833-9b2ed8471e30/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack v0.0.0-20220118141833-9b2ed8471e30/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20220224013837-e1785985d364/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/pkg v0.0.0-20220202132633-df430fa0dd96 h1:JU0DFa06CaUtwSkAY0b3j47ohEJLIYlpPPgNgbPHlAo=
knative.dev/pkg v0.0.0-20220202132633-df430fa0dd96/go.mod h1:etVT7Tm8pSDf4RKhGk4r7j/hj3dNBpvT7bO6a6wpahs=

View file

@ -10,11 +10,26 @@ import (
"github.com/containers/common/pkg/retry"
"github.com/containers/image/v5/copy"
"github.com/containers/image/v5/docker"
"github.com/containers/image/v5/signature"
"github.com/containers/image/v5/types"
"gopkg.in/resty.v1"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage"
)
type syncContextUtils struct {
imageStore storage.ImageStore
policyCtx *signature.PolicyContext
localCtx *types.SystemContext
upstreamCtx *types.SystemContext
client *resty.Client
url *url.URL
upstreamAddr string
retryOptions *retry.RetryOptions
copyOptions copy.Options
}
// nolint: gochecknoglobals
var demandedImgs demandedImages
@ -89,8 +104,6 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
}
}
var copyErr error
localCtx, policyCtx, err := getLocalContexts(log)
if err != nil {
imageChannel <- err
@ -139,42 +152,50 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
upstreamAddr := StripRegistryTransport(upstreamURL)
httpClient, err := getHTTPClient(&regCfg, upstreamURL, credentialsFile[upstreamAddr], log)
httpClient, registryURL, err := getHTTPClient(&regCfg, upstreamURL, credentialsFile[upstreamAddr], log)
if err != nil {
imageChannel <- err
return
}
// it's an image
upstreamCtx := getUpstreamContext(&regCfg, credentialsFile[upstreamAddr])
options := getCopyOptions(upstreamCtx, localCtx)
// demanded 'image' is a signature
if isCosignTag(tag) || isArtifact {
if isCosignTag(tag) {
// at tis point we should already have images synced, but not their signatures.
regURL, err := url.Parse(upstreamURL)
if err != nil {
log.Error().Err(err).Msgf("couldn't parse registry URL: %s", upstreamURL)
imageChannel <- err
return
}
// is notary signature
if isArtifact {
err = syncNotarySignature(httpClient, storeController, *regURL, remoteRepo, localRepo, tag, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, localRepo, tag)
continue
}
imageChannel <- nil
return
}
// is cosign signature
err = syncCosignSignature(httpClient, storeController, *regURL, remoteRepo, localRepo, tag, log)
cosignManifest, err := getCosignManifest(httpClient, *registryURL, remoteRepo, tag, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, localRepo, tag)
log.Error().Err(err).Msgf("couldn't get upstream image %s:%s:%s cosign manifest", upstreamURL, remoteRepo, tag)
continue
}
err = syncCosignSignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, tag, cosignManifest, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't copy upstream image cosign signature %s/%s:%s", upstreamURL, remoteRepo, tag)
continue
}
imageChannel <- nil
return
} else if isArtifact {
// is notary signature
refs, err := getNotaryRefs(httpClient, *registryURL, remoteRepo, tag, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, remoteRepo, tag)
continue
}
err = syncNotarySignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, tag, refs, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, remoteRepo, tag)
continue
}
@ -184,43 +205,35 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
return
}
// it's an image
upstreamCtx := getUpstreamContext(&regCfg, credentialsFile[upstreamAddr])
options := getCopyOptions(upstreamCtx, localCtx)
upstreamImageRef, err := getImageRef(upstreamAddr, remoteRepo, tag)
if err != nil {
log.Error().Err(err).Msgf("error creating docker reference for repository %s/%s:%s",
upstreamAddr, remoteRepo, tag)
imageChannel <- err
return
syncContextUtils := syncContextUtils{
imageStore: imageStore,
policyCtx: policyCtx,
localCtx: localCtx,
upstreamCtx: upstreamCtx,
client: httpClient,
url: registryURL,
upstreamAddr: upstreamAddr,
retryOptions: retryOptions,
copyOptions: options,
}
localImageRef, localCachePath, err := getLocalImageRef(imageStore, localRepo, tag)
if err != nil {
log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
localCachePath, localRepo, tag)
imageChannel <- err
return
skipped, copyErr := syncRun(regCfg, localRepo, remoteRepo, tag, syncContextUtils, log)
if skipped {
continue
}
log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath)
// key used to check if we already have a go routine syncing this image
demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, remoteRepo, tag)
_, copyErr = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options)
if copyErr != nil {
log.Error().Err(err).Msgf("error encountered while syncing on demand %s to %s",
upstreamImageRef.DockerReference(), localCachePath)
// don't retry in background if maxretry is 0
if retryOptions.MaxRetry == 0 {
continue
}
_, found := demandedImgs.loadOrStoreStr(demandedImageRef, "")
if found || retryOptions.MaxRetry == 0 {
defer os.RemoveAll(localCachePath)
log.Info().Msgf("image %s already demanded in background or sync.registries[].MaxRetries == 0", demandedImageRef)
if found {
log.Info().Msgf("image %s already demanded in background", demandedImageRef)
/* we already have a go routine spawned for this image
or retryOptions is not configured */
continue
@ -230,8 +243,6 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
go func() {
// remove image after syncing
defer func() {
_ = os.RemoveAll(localCachePath)
demandedImgs.delete(demandedImageRef)
log.Info().Msgf("sync routine: %s exited", demandedImageRef)
}()
@ -241,52 +252,105 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
time.Sleep(retryOptions.Delay)
if err = retry.RetryIfNecessary(context.Background(), func() error {
_, err := copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options)
_, err := syncRun(regCfg, localRepo, remoteRepo, tag, syncContextUtils, log)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("sync routine: error while copying image %s to %s",
demandedImageRef, localCachePath)
} else {
_ = finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL, storeController,
retryOptions, httpClient, log)
log.Error().Err(err).Msgf("sync routine: error while copying image %s", demandedImageRef)
}
}()
} else {
err := finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL, storeController,
retryOptions, httpClient, log)
imageChannel <- err
return
}
}
}
imageChannel <- err
imageChannel <- nil
}
// push the local image into the storage, sync signatures.
func finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL string,
storeController storage.StoreController, retryOptions *retry.RetryOptions,
httpClient *resty.Client, log log.Logger) error {
err := pushSyncedLocalImage(localRepo, tag, localCachePath, storeController, log)
func syncRun(regCfg RegistryConfig, localRepo, remoteRepo, tag string, utils syncContextUtils,
log log.Logger) (bool, error) {
upstreamImageRef, err := getImageRef(utils.upstreamAddr, remoteRepo, tag)
if err != nil {
log.Error().Err(err).Msgf("error creating docker reference for repository %s/%s:%s",
utils.upstreamAddr, remoteRepo, tag)
return false, err
}
upstreamImageDigest, err := docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef)
if err != nil {
log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference())
return false, err
}
// get upstream signatures
cosignManifest, err := getCosignManifest(utils.client, *utils.url, remoteRepo,
upstreamImageDigest.String(), log)
if err != nil {
log.Error().Err(err).Msgf("couldn't get upstream image %s cosign manifest", upstreamImageRef.DockerReference())
}
refs, err := getNotaryRefs(utils.client, *utils.url, remoteRepo, upstreamImageDigest.String(), log)
if err != nil {
log.Error().Err(err).Msgf("couldn't get upstream image %s notary references", upstreamImageRef.DockerReference())
}
// check if upstream image is signed
if cosignManifest == nil && len(refs.References) == 0 {
// upstream image not signed
if regCfg.OnlySigned != nil && *regCfg.OnlySigned {
// skip unsigned images
log.Info().Msgf("skipping image without signature %s", upstreamImageRef.DockerReference())
return true, nil
}
}
localImageRef, localCachePath, err := getLocalImageRef(utils.imageStore, localRepo, tag)
if err != nil {
log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
localCachePath, localRepo, tag)
return false, err
}
defer os.RemoveAll(localCachePath)
log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath)
_, err = copy.Image(context.Background(), utils.policyCtx, localImageRef, upstreamImageRef, &utils.copyOptions)
if err != nil {
log.Error().Err(err).Msgf("error encountered while syncing on demand %s to %s",
upstreamImageRef.DockerReference(), localCachePath)
return false, err
}
err = pushSyncedLocalImage(localRepo, tag, localCachePath, utils.imageStore, log)
if err != nil {
log.Error().Err(err).Msgf("error while pushing synced cached image %s",
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
return err
return false, err
}
if err = retry.RetryIfNecessary(context.Background(), func() error {
err = syncSignatures(httpClient, storeController, upstreamURL, remoteRepo, localRepo, tag, log)
err = syncCosignSignature(utils.client, utils.imageStore, *utils.url, localRepo, remoteRepo,
upstreamImageDigest.String(), cosignManifest, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't copy image cosign signature %s/%s:%s", utils.upstreamAddr, remoteRepo, tag)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("couldn't copy image signature for %s/%s:%s", upstreamURL, remoteRepo, tag)
return false, err
}
log.Info().Msgf("successfully synced %s/%s:%s", upstreamURL, remoteRepo, tag)
err = syncNotarySignature(utils.client, utils.imageStore, *utils.url, localRepo, remoteRepo,
upstreamImageDigest.String(), refs, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, remoteRepo, tag)
return nil
return false, err
}
log.Info().Msgf("successfully synced %s/%s:%s", utils.upstreamAddr, remoteRepo, tag)
return false, nil
}

View file

@ -0,0 +1,352 @@
package sync
import (
"encoding/json"
"errors"
"net/http"
"net/url"
"path"
"strings"
notreg "github.com/notaryproject/notation/pkg/registry"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"github.com/sigstore/cosign/pkg/cosign"
"gopkg.in/resty.v1"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage"
)
func getCosignManifest(client *resty.Client, regURL url.URL, repo, digest string,
log log.Logger) (*ispec.Manifest, error) {
var m ispec.Manifest
cosignTag := getCosignTagFromImageDigest(digest)
getCosignManifestURL := regURL
getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", repo, "manifests", cosignTag)
getCosignManifestURL.RawQuery = getCosignManifestURL.Query().Encode()
resp, err := client.R().Get(getCosignManifestURL.String())
if err != nil {
log.Error().Err(err).Str("url", getCosignManifestURL.String()).
Msgf("couldn't get cosign manifest: %s", cosignTag)
return nil, err
}
if resp.StatusCode() == http.StatusNotFound {
log.Info().Msgf("couldn't find any cosign signature from %s, status code: %d skipping",
getCosignManifestURL.String(), resp.StatusCode())
return nil, zerr.ErrSyncSignatureNotFound
} else if resp.IsError() {
log.Error().Err(zerr.ErrSyncSignature).Msgf("couldn't get cosign signature from %s, status code: %d skipping",
getCosignManifestURL.String(), resp.StatusCode())
return nil, zerr.ErrSyncSignature
}
err = json.Unmarshal(resp.Body(), &m)
if err != nil {
log.Error().Err(err).Str("url", getCosignManifestURL.String()).
Msgf("couldn't unmarshal cosign manifest %s", cosignTag)
return nil, err
}
return &m, nil
}
func getNotaryRefs(client *resty.Client, regURL url.URL, repo, digest string, log log.Logger) (ReferenceList, error) {
var referrers ReferenceList
getReferrersURL := regURL
// based on manifest digest get referrers
getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/", repo, "manifests", digest, "referrers")
getReferrersURL.RawQuery = getReferrersURL.Query().Encode()
resp, err := client.R().
SetHeader("Content-Type", "application/json").
SetQueryParam("artifactType", notreg.ArtifactTypeNotation).
Get(getReferrersURL.String())
if err != nil {
log.Error().Err(err).Str("url", getReferrersURL.String()).Msg("couldn't get referrers")
return referrers, err
}
if resp.StatusCode() == http.StatusNotFound || resp.StatusCode() == http.StatusBadRequest {
log.Info().Msgf("couldn't find any notary signature from %s, status code: %d, skipping",
getReferrersURL.String(), resp.StatusCode())
return ReferenceList{}, zerr.ErrSyncSignatureNotFound
} else if resp.IsError() {
log.Error().Err(zerr.ErrSyncSignature).Msgf("couldn't get notary signature from %s, status code: %d skipping",
getReferrersURL.String(), resp.StatusCode())
return ReferenceList{}, zerr.ErrSyncSignature
}
err = json.Unmarshal(resp.Body(), &referrers)
if err != nil {
log.Error().Err(err).Str("url", getReferrersURL.String()).
Msgf("couldn't unmarshal notary signature")
return referrers, err
}
return referrers, nil
}
func syncCosignSignature(client *resty.Client, imageStore storage.ImageStore,
regURL url.URL, localRepo, remoteRepo, digest string, cosignManifest *ispec.Manifest, log log.Logger,
) error {
cosignTag := getCosignTagFromImageDigest(digest)
// if no manifest found
if cosignManifest == nil {
return nil
}
log.Info().Msg("syncing cosign signatures")
for _, blob := range cosignManifest.Layers {
// get blob
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get cosign blob: %s", blob.Digest.String())
return err
}
if resp.IsError() {
log.Info().Msgf("couldn't find cosign blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode())
return zerr.ErrSyncSignature
}
defer resp.RawBody().Close()
// push blob
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosign blob")
return err
}
}
// get config blob
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", cosignManifest.Config.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get cosign config blob: %s", getBlobURL.String())
return err
}
if resp.IsError() {
log.Info().Msgf("couldn't find cosign config blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode())
return zerr.ErrSyncSignature
}
defer resp.RawBody().Close()
// push config blob
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), cosignManifest.Config.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosign config blob")
return err
}
cosignManifestBuf, err := json.Marshal(cosignManifest)
if err != nil {
log.Error().Err(err).Msg("couldn't marshal cosign manifest")
}
// push manifest
_, err = imageStore.PutImageManifest(localRepo, cosignTag, ispec.MediaTypeImageManifest, cosignManifestBuf)
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosign manifest")
return err
}
log.Info().Msgf("successfully synced cosign signature for repo %s digest %s", localRepo, digest)
return nil
}
func syncNotarySignature(client *resty.Client, imageStore storage.ImageStore,
regURL url.URL, localRepo, remoteRepo, digest string, referrers ReferenceList, log log.Logger) error {
if len(referrers.References) == 0 {
return nil
}
log.Info().Msg("syncing notary signatures")
for _, ref := range referrers.References {
// get referrer manifest
getRefManifestURL := regURL
getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", remoteRepo, "manifests", ref.Digest.String())
getRefManifestURL.RawQuery = getRefManifestURL.Query().Encode()
resp, err := client.R().
Get(getRefManifestURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get notary manifest: %s", getRefManifestURL.String())
return err
}
// read manifest
var m artifactspec.Manifest
err = json.Unmarshal(resp.Body(), &m)
if err != nil {
log.Error().Err(err).Msgf("couldn't unmarshal notary manifest: %s", getRefManifestURL.String())
return err
}
for _, blob := range m.Blobs {
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get notary blob: %s", getBlobURL.String())
return err
}
defer resp.RawBody().Close()
if resp.IsError() {
log.Info().Msgf("couldn't find notary blob from %s, status code: %d",
getBlobURL.String(), resp.StatusCode())
return zerr.ErrSyncSignature
}
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload notary sig blob")
return err
}
}
_, err = imageStore.PutImageManifest(localRepo, ref.Digest.String(),
artifactspec.MediaTypeArtifactManifest, resp.Body())
if err != nil {
log.Error().Err(err).Msg("couldn't upload notary sig manifest")
return err
}
}
log.Info().Msgf("successfully synced notary signature for repo %s digest %s", localRepo, digest)
return nil
}
func canSkipNotarySignature(repo, tag, digest string, refs ReferenceList, imageStore storage.ImageStore,
log log.Logger) (bool, error) {
// check notary signature already synced
if len(refs.References) > 0 {
localRefs, err := imageStore.GetReferrers(repo, digest, notreg.ArtifactTypeNotation)
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) {
return false, nil
}
log.Error().Err(err).Msgf("couldn't get local notary signature %s:%s manifest", repo, tag)
return false, err
}
if !artifactDescriptorsEqual(localRefs, refs.References) {
log.Info().Msgf("upstream notary signatures %s:%s changed, syncing again", repo, tag)
return false, nil
}
}
log.Info().Msgf("skipping notary signature %s:%s, already synced", repo, tag)
return true, nil
}
func canSkipCosignSignature(repo, tag, digest string, cosignManifest *ispec.Manifest, imageStore storage.ImageStore,
log log.Logger) (bool, error) {
// check cosign signature already synced
if cosignManifest != nil {
var localCosignManifest ispec.Manifest
/* we need to use tag (cosign format: sha256-$IMAGE_TAG.sig) instead of digest to get local cosign manifest
because of an issue where cosign digests differs between upstream and downstream */
cosignManifestTag := getCosignTagFromImageDigest(digest)
localCosignManifestBuf, _, _, err := imageStore.GetImageManifest(repo, cosignManifestTag)
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) {
return false, nil
}
log.Error().Err(err).Msgf("couldn't get local cosign %s:%s manifest", repo, tag)
return false, err
}
err = json.Unmarshal(localCosignManifestBuf, &localCosignManifest)
if err != nil {
log.Error().Err(err).Msgf("couldn't unmarshal local cosign signature %s:%s manifest", repo, tag)
return false, err
}
if !manifestsEqual(localCosignManifest, *cosignManifest) {
log.Info().Msgf("upstream cosign signatures %s:%s changed, syncing again", repo, tag)
return false, nil
}
}
log.Info().Msgf("skipping cosign signature %s:%s, already synced", repo, tag)
return true, nil
}
// sync feature will try to pull cosign signature because for sync cosign signature is just an image
// this function will check if tag is a cosign tag.
func isCosignTag(tag string) bool {
if strings.HasPrefix(tag, "sha256-") && strings.HasSuffix(tag, cosign.SignatureTagSuffix) {
return true
}
return false
}
func getCosignTagFromImageDigest(digest string) string {
if !isCosignTag(digest) {
return strings.Replace(digest, ":", "-", 1) + cosign.SignatureTagSuffix
}
return digest
}

View file

@ -3,6 +3,7 @@ package sync
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
@ -19,7 +20,7 @@ import (
"github.com/containers/image/v5/types"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"gopkg.in/resty.v1"
"zotregistry.io/zot/errors"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/test"
@ -57,6 +58,7 @@ type RegistryConfig struct {
CertDir string
MaxRetries *int
RetryDelay *time.Duration
OnlySigned *bool
}
type Content struct {
@ -88,7 +90,7 @@ func getUpstreamCatalog(client *resty.Client, upstreamURL string, log log.Logger
log.Error().Msgf("couldn't query %s, status code: %d, body: %s", registryCatalogURL,
resp.StatusCode(), resp.Body())
return c, errors.ErrSyncMissingCatalog
return c, zerr.ErrSyncMissingCatalog
}
err = json.Unmarshal(resp.Body(), &c)
@ -283,7 +285,8 @@ func getUpstreamContext(regCfg *RegistryConfig, credentials Credentials) *types.
func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string,
storeController storage.StoreController, localCtx *types.SystemContext,
policyCtx *signature.PolicyContext, credentials Credentials, log log.Logger) error {
policyCtx *signature.PolicyContext, credentials Credentials,
retryOptions *retry.RetryOptions, log log.Logger) error {
log.Info().Msgf("syncing registry: %s", upstreamURL)
var err error
@ -293,22 +296,13 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string
upstreamCtx := getUpstreamContext(&regCfg, credentials)
options := getCopyOptions(upstreamCtx, localCtx)
retryOptions := &retry.RetryOptions{}
if regCfg.MaxRetries != nil {
retryOptions.MaxRetry = *regCfg.MaxRetries
if regCfg.RetryDelay != nil {
retryOptions.Delay = *regCfg.RetryDelay
}
}
var catalog catalog
httpClient, err := getHTTPClient(&regCfg, upstreamURL, credentials, log)
httpClient, registryURL, err := getHTTPClient(&regCfg, upstreamURL, credentials, log)
if err != nil {
return err
}
var catalog catalog
if err = retry.RetryIfNecessary(ctx, func() error {
catalog, err = getUpstreamCatalog(httpClient, upstreamURL, log)
@ -356,28 +350,105 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string
}
}
if len(images) == 0 {
log.Info().Msg("no images to copy, no need to sync")
for _, image := range images {
select {
case <-ctx.Done():
return ctx.Err()
default:
break
}
return nil
}
for _, ref := range images {
upstreamImageRef := ref.ref
upstreamImageRef := image.ref
remoteRepo := getRepoFromRef(upstreamImageRef, upstreamAddr)
localRepo := getRepoDestination(remoteRepo, ref.content)
localRepo := getRepoDestination(remoteRepo, image.content)
upstreamImageDigest, err := docker.GetDigest(ctx, upstreamCtx, upstreamImageRef)
if err != nil {
log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference())
return err
}
tag := getTagFromRef(upstreamImageRef, log).Tag()
imageStore := storeController.GetImageStore(localRepo)
canBeSkipped, err := canSkipImage(ctx, localRepo, tag, upstreamImageRef, imageStore, upstreamCtx, log)
// get upstream signatures
cosignManifest, err := getCosignManifest(httpClient, *registryURL, remoteRepo,
upstreamImageDigest.String(), log)
if err != nil && !errors.Is(err, zerr.ErrSyncSignatureNotFound) {
log.Error().Err(err).Msgf("couldn't get upstream image %s cosign manifest", upstreamImageRef.DockerReference())
return err
}
refs, err := getNotaryRefs(httpClient, *registryURL, remoteRepo, upstreamImageDigest.String(), log)
if err != nil && !errors.Is(err, zerr.ErrSyncSignatureNotFound) {
log.Error().Err(err).Msgf("couldn't get upstream image %s notary references", upstreamImageRef.DockerReference())
return err
}
// check if upstream image is signed
if cosignManifest == nil && len(refs.References) == 0 {
// upstream image not signed
if regCfg.OnlySigned != nil && *regCfg.OnlySigned {
// skip unsigned images
log.Info().Msgf("skipping image without signature %s", upstreamImageRef.DockerReference())
continue
}
}
skipImage, err := canSkipImage(localRepo, tag, upstreamImageDigest.String(), imageStore, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped",
upstreamImageRef.DockerReference())
return err
}
if canBeSkipped {
// sync only differences
if skipImage {
log.Info().Msgf("already synced image %s, checking its signatures", upstreamImageRef.DockerReference())
skipNotarySig, err := canSkipNotarySignature(localRepo, tag, upstreamImageDigest.String(),
refs, imageStore, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't check if the upstream image %s notary signature can be skipped",
upstreamImageRef.DockerReference())
}
if !skipNotarySig {
if err = retry.RetryIfNecessary(ctx, func() error {
err = syncNotarySignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo,
upstreamImageDigest.String(), refs, log)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("couldn't copy notary signature for %s", upstreamImageRef.DockerReference())
}
}
skipCosignSig, err := canSkipCosignSignature(localRepo, tag, upstreamImageDigest.String(),
cosignManifest, imageStore, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't check if the upstream image %s cosign signature can be skipped",
upstreamImageRef.DockerReference())
}
if !skipCosignSig {
if err = retry.RetryIfNecessary(ctx, func() error {
err = syncCosignSignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo,
upstreamImageDigest.String(), cosignManifest, log)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("couldn't copy cosign signature for %s", upstreamImageRef.DockerReference())
}
}
continue
}
@ -404,7 +475,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string
return err
}
err = pushSyncedLocalImage(localRepo, tag, localCachePath, storeController, log)
err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log)
if err != nil {
log.Error().Err(err).Msgf("error while pushing synced cached image %s",
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
@ -413,11 +484,21 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string
}
if err = retry.RetryIfNecessary(ctx, func() error {
err = syncSignatures(httpClient, storeController, upstreamURL, remoteRepo, localRepo, tag, log)
err = syncNotarySignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, upstreamImageDigest.String(),
refs, log)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("couldn't copy image signature %s", upstreamImageRef.DockerReference())
log.Error().Err(err).Msgf("couldn't copy notary signature for %s", upstreamImageRef.DockerReference())
}
if err = retry.RetryIfNecessary(ctx, func() error {
err = syncCosignSignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, upstreamImageDigest.String(),
cosignManifest, log)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("couldn't copy cosign signature for %s", upstreamImageRef.DockerReference())
}
}
@ -489,7 +570,16 @@ func Run(ctx context.Context, cfg Config, storeController storage.StoreControlle
ticker := time.NewTicker(regCfg.PollInterval)
// fork a new zerolog child to avoid data race
tlogger := log.Logger{Logger: logger.With().Caller().Timestamp().Logger()}
tlogger := log.Logger{Logger: logger.Logger}
retryOptions := &retry.RetryOptions{}
if regCfg.MaxRetries != nil {
retryOptions.MaxRetry = *regCfg.MaxRetries
if regCfg.RetryDelay != nil {
retryOptions.Delay = *regCfg.RetryDelay
}
}
// schedule each registry sync
go func(ctx context.Context, regCfg RegistryConfig, logger log.Logger) {
@ -501,7 +591,7 @@ func Run(ctx context.Context, cfg Config, storeController storage.StoreControlle
upstreamAddr := StripRegistryTransport(upstreamURL)
// first try syncing main registry
if err := syncRegistry(ctx, regCfg, upstreamURL, storeController, localCtx, policyCtx,
credentialsFile[upstreamAddr], logger); err != nil {
credentialsFile[upstreamAddr], retryOptions, logger); err != nil {
logger.Error().Err(err).Str("registry", upstreamURL).
Msg("sync exited with error, falling back to auxiliary registries if any")
} else {

View file

@ -17,6 +17,7 @@ import (
"github.com/containers/image/v5/types"
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"github.com/rs/zerolog"
. "github.com/smartystreets/goconvey/convey"
"gopkg.in/resty.v1"
@ -190,9 +191,10 @@ func TestSyncInternal(t *testing.T) {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
httpClient, registryURL, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
So(httpClient, ShouldBeNil)
So(registryURL, ShouldBeNil)
// _, err = getUpstreamCatalog(httpClient, baseURL, log.NewLogger("debug", ""))
// So(err, ShouldNotBeNil)
})
@ -222,21 +224,24 @@ func TestSyncInternal(t *testing.T) {
CertDir: badCertsDir,
}
httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
httpClient, _, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
So(httpClient, ShouldBeNil)
syncRegistryConfig.CertDir = "/path/to/invalid/cert"
httpClient, err = getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
httpClient, _, err = getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
So(httpClient, ShouldBeNil)
syncRegistryConfig.CertDir = ""
syncRegistryConfig.URLs = []string{baseSecureURL}
httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", ""))
httpClient, registryURL, err := getHTTPClient(&syncRegistryConfig, baseSecureURL,
Credentials{}, log.NewLogger("debug", ""))
So(err, ShouldBeNil)
So(httpClient, ShouldNotBeNil)
So(registryURL.String(), ShouldEqual, baseSecureURL)
_, err = getUpstreamCatalog(httpClient, baseURL, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
@ -245,7 +250,12 @@ func TestSyncInternal(t *testing.T) {
So(err, ShouldNotBeNil)
syncRegistryConfig.URLs = []string{test.BaseURL}
httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", ""))
httpClient, _, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
So(httpClient, ShouldBeNil)
syncRegistryConfig.URLs = []string{"%"}
httpClient, _, err = getHTTPClient(&syncRegistryConfig, "%", Credentials{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
So(httpClient, ShouldBeNil)
})
@ -263,23 +273,37 @@ func TestSyncInternal(t *testing.T) {
So(err, ShouldNotBeNil)
})
// Convey("Test OneImage() skips cosign signatures", t, func() {
// err := OneImage(Config{}, storage.StoreController{}, "repo", "sha256-.sig", log.NewLogger("", ""))
// So(err, ShouldBeNil)
// })
Convey("Test syncSignatures()", t, func() {
Convey("Test signatures", t, func() {
log := log.NewLogger("debug", "")
err := syncSignatures(resty.New(), storage.StoreController{}, "%", "repo", "repo", "tag", log)
client := resty.New()
regURL, err := url.Parse("http://zot")
So(err, ShouldBeNil)
So(regURL, ShouldNotBeNil)
ref := artifactspec.Descriptor{
Digest: "fakeDigest",
}
desc := ispec.Descriptor{
Digest: "fakeDigest",
}
manifest := ispec.Manifest{
Layers: []ispec.Descriptor{desc},
}
err = syncCosignSignature(client, &storage.ImageStoreFS{}, *regURL, testImage, testImage,
testImageTag, &ispec.Manifest{}, log)
So(err, ShouldNotBeNil)
err = syncSignatures(resty.New(), storage.StoreController{}, "http://zot", "repo", "repo", "tag", log)
err = syncCosignSignature(client, &storage.ImageStoreFS{}, *regURL, testImage, testImage,
testImageTag, &manifest, log)
So(err, ShouldNotBeNil)
err = syncSignatures(resty.New(), storage.StoreController{}, "https://google.com", "repo", "repo", "tag", log)
So(err, ShouldNotBeNil)
url, _ := url.Parse("invalid")
err = syncCosignSignature(resty.New(), storage.StoreController{}, *url, "repo", "repo", "tag", log)
So(err, ShouldNotBeNil)
err = syncNotarySignature(resty.New(), storage.StoreController{}, *url, "repo", "repo", "tag", log)
err = syncNotarySignature(client, &storage.ImageStoreFS{}, *regURL, testImage, testImage,
"invalidDigest", ReferenceList{[]artifactspec.Descriptor{ref}}, log)
So(err, ShouldNotBeNil)
})
@ -296,32 +320,49 @@ func TestSyncInternal(t *testing.T) {
imageStore := storage.NewImageStore(storageDir, false, storage.DefaultGCDelay, false, false, log, metrics)
repoRefStr := fmt.Sprintf("%s/%s", host, testImage)
repoRef, err := parseRepositoryReference(repoRefStr)
So(err, ShouldBeNil)
So(repoRef, ShouldNotBeNil)
refs := ReferenceList{[]artifactspec.Descriptor{
{
Digest: "fakeDigest",
},
}}
taggedRef, err := reference.WithTag(repoRef, testImageTag)
err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o000)
So(err, ShouldBeNil)
So(taggedRef, ShouldNotBeNil)
upstreamRef, err := docker.NewReference(taggedRef)
So(err, ShouldBeNil)
So(taggedRef, ShouldNotBeNil)
canBeSkipped, err := canSkipImage(context.Background(), testImage, testImageTag, upstreamRef,
imageStore, &types.SystemContext{}, log)
canBeSkipped, err := canSkipImage(testImage, testImageTag, "fakeDigest", imageStore, log)
So(err, ShouldNotBeNil)
So(canBeSkipped, ShouldBeFalse)
err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o755)
So(err, ShouldBeNil)
_, testImageManifestDigest, _, err := imageStore.GetImageManifest(testImage, testImageTag)
So(err, ShouldBeNil)
So(testImageManifestDigest, ShouldNotBeEmpty)
canBeSkipped, err = canSkipNotarySignature(testImage, testImageTag,
testImageManifestDigest, refs, imageStore, log)
So(err, ShouldBeNil)
So(canBeSkipped, ShouldBeFalse)
err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o000)
if err != nil {
panic(err)
So(err, ShouldBeNil)
canBeSkipped, err = canSkipNotarySignature(testImage, testImageTag,
testImageManifestDigest, refs, imageStore, log)
So(err, ShouldNotBeNil)
So(canBeSkipped, ShouldBeFalse)
cosignManifest := ispec.Manifest{
Layers: []ispec.Descriptor{{Digest: "fakeDigest"}},
}
canBeSkipped, err = canSkipImage(context.Background(), testImage, testImageTag, upstreamRef,
imageStore, &types.SystemContext{}, log)
So(err, ShouldNotBeNil)
err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o755)
So(err, ShouldBeNil)
canBeSkipped, err = canSkipCosignSignature(testImage, testImageTag,
testImageManifestDigest, &cosignManifest, imageStore, log)
So(err, ShouldBeNil)
So(canBeSkipped, ShouldBeFalse)
})
@ -367,7 +408,7 @@ func TestSyncInternal(t *testing.T) {
testRootDir := path.Join(imageStore.RootDir(), testImage, SyncBlobUploadDir)
// testImagePath := path.Join(testRootDir, testImage)
err := pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log)
err := pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
So(err, ShouldNotBeNil)
err = os.MkdirAll(testRootDir, 0o755)
@ -396,7 +437,7 @@ func TestSyncInternal(t *testing.T) {
if os.Geteuid() != 0 {
So(func() {
_ = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log)
_ = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
}, ShouldPanic)
}
@ -409,7 +450,7 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log)
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
So(err, ShouldNotBeNil)
if err := os.Chmod(path.Join(testRootDir, testImage, "blobs", "sha256",
@ -423,7 +464,7 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log)
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
So(err, ShouldNotBeNil)
if err := os.Chmod(cachedManifestConfigPath, 0o755); err != nil {
@ -435,7 +476,7 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log)
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
So(err, ShouldNotBeNil)
if err := os.Remove(manifestConfigPath); err != nil {
@ -449,7 +490,7 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log)
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
So(err, ShouldNotBeNil)
})
}

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,6 @@
package sync
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@ -19,9 +18,9 @@ import (
"github.com/containers/image/v5/oci/layout"
"github.com/containers/image/v5/types"
guuid "github.com/gofrs/uuid"
"github.com/notaryproject/notation-go-lib"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"github.com/sigstore/cosign/pkg/oci/static"
"gopkg.in/resty.v1"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/common"
@ -32,7 +31,7 @@ import (
)
type ReferenceList struct {
References []notation.Descriptor `json:"references"`
References []artifactspec.Descriptor `json:"references"`
}
// getTagFromRef returns a tagged reference from an image reference.
@ -207,18 +206,18 @@ func getFileCredentials(filepath string) (CredentialsFile, error) {
}
func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Credentials,
log log.Logger) (*resty.Client, error) {
log log.Logger) (*resty.Client, *url.URL, error) {
client := resty.New()
if !common.Contains(regCfg.URLs, upstreamURL) {
return nil, zerr.ErrSyncInvalidUpstreamURL
return nil, nil, zerr.ErrSyncInvalidUpstreamURL
}
registryURL, err := url.Parse(upstreamURL)
if err != nil {
log.Error().Err(err).Str("url", upstreamURL).Msg("couldn't parse url")
return nil, err
return nil, nil, err
}
if regCfg.CertDir != "" {
@ -231,7 +230,7 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede
if err != nil {
log.Error().Err(err).Msg("couldn't read CA certificate")
return nil, err
return nil, nil, err
}
caCertPool := x509.NewCertPool()
@ -243,7 +242,7 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede
if err != nil {
log.Error().Err(err).Msg("couldn't read certificates key pairs")
return nil, err
return nil, nil, err
}
client.SetCertificates(cert)
@ -259,275 +258,13 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede
client.SetBasicAuth(credentials.Username, credentials.Password)
}
return client, nil
}
func syncCosignSignature(client *resty.Client, storeController storage.StoreController,
regURL url.URL, remoteRepo, localRepo, digest string, log log.Logger) error {
log.Info().Msg("syncing cosign signatures")
getCosignManifestURL := regURL
if !isCosignTag(digest) {
digest = strings.Replace(digest, ":", "-", 1) + ".sig"
}
getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", remoteRepo, "manifests", digest)
getCosignManifestURL.RawQuery = getCosignManifestURL.Query().Encode()
mResp, err := client.R().Get(getCosignManifestURL.String())
if err != nil {
log.Error().Err(err).Str("url", getCosignManifestURL.String()).
Msgf("couldn't get cosign manifest: %s", digest)
return err
}
if mResp.IsError() {
log.Info().Msgf("couldn't find any cosign signature from %s, status code: %d skipping",
getCosignManifestURL.String(), mResp.StatusCode())
return nil
}
var m ispec.Manifest
err = json.Unmarshal(mResp.Body(), &m)
if err != nil {
log.Error().Err(err).Str("url", getCosignManifestURL.String()).
Msgf("couldn't unmarshal cosign manifest %s", digest)
return err
}
imageStore := storeController.GetImageStore(localRepo)
for _, blob := range m.Layers {
// get blob
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get cosign blob: %s", blob.Digest.String())
return err
}
if resp.IsError() {
log.Info().Msgf("couldn't find cosign blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode())
return zerr.ErrBadBlobDigest
}
defer resp.RawBody().Close()
// push blob
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosign blob")
return err
}
}
// get config blob
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", m.Config.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get cosign config blob: %s", getBlobURL.String())
return err
}
if resp.IsError() {
log.Info().Msgf("couldn't find cosign config blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode())
return zerr.ErrBadBlobDigest
}
defer resp.RawBody().Close()
// push config blob
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), m.Config.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosign blob")
return err
}
// push manifest
_, err = imageStore.PutImageManifest(localRepo, digest, ispec.MediaTypeImageManifest, mResp.Body())
if err != nil {
log.Error().Err(err).Msg("couldn't upload cosing manifest")
return err
}
return nil
}
func syncNotarySignature(client *resty.Client, storeController storage.StoreController,
regURL url.URL, remoteRepo, localRepo, digest string, log log.Logger) error {
log.Info().Msg("syncing notary signatures")
getReferrersURL := regURL
// based on manifest digest get referrers
getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/",
remoteRepo, "manifests", digest, "referrers")
getReferrersURL.RawQuery = getReferrersURL.Query().Encode()
resp, err := client.R().
SetHeader("Content-Type", "application/json").
SetQueryParam("artifactType", "application/vnd.cncf.notary.v2.signature").
Get(getReferrersURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get referrers from %s", getReferrersURL.String())
return err
}
if resp.IsError() {
log.Info().Msgf("couldn't find any notary signature from %s, status code: %d, skipping",
getReferrersURL.String(), resp.StatusCode())
return nil
}
var referrers ReferenceList
err = json.Unmarshal(resp.Body(), &referrers)
if err != nil {
log.Error().Err(err).Msgf("couldn't unmarshal notary signature from %s", getReferrersURL.String())
return err
}
imageStore := storeController.GetImageStore(localRepo)
for _, ref := range referrers.References {
// get referrer manifest
getRefManifestURL := regURL
getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", remoteRepo, "manifests", ref.Digest.String())
getRefManifestURL.RawQuery = getRefManifestURL.Query().Encode()
resp, err := client.R().
Get(getRefManifestURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get notary manifest: %s", getRefManifestURL.String())
return err
}
// read manifest
var m artifactspec.Manifest
err = json.Unmarshal(resp.Body(), &m)
if err != nil {
log.Error().Err(err).Msgf("couldn't unmarshal notary manifest: %s", getRefManifestURL.String())
return err
}
for _, blob := range m.Blobs {
getBlobURL := regURL
getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String())
getBlobURL.RawQuery = getBlobURL.Query().Encode()
resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String())
if err != nil {
log.Error().Err(err).Msgf("couldn't get notary blob: %s", getBlobURL.String())
return err
}
defer resp.RawBody().Close()
if resp.IsError() {
log.Info().Msgf("couldn't find notary blob from %s, status code: %d",
getBlobURL.String(), resp.StatusCode())
return zerr.ErrBadBlobDigest
}
_, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String())
if err != nil {
log.Error().Err(err).Msg("couldn't upload notary sig blob")
return err
}
}
_, err = imageStore.PutImageManifest(localRepo, ref.Digest.String(), artifactspec.MediaTypeArtifactManifest,
resp.Body())
if err != nil {
log.Error().Err(err).Msg("couldn't upload notary sig manifest")
return err
}
}
return nil
}
func syncSignatures(client *resty.Client, storeController storage.StoreController,
registryURL, remoteRepo, localRepo, tag string, log log.Logger) error {
log.Info().Msgf("syncing signatures from %s/%s:%s", registryURL, remoteRepo, tag)
// get manifest and find out its digest
regURL, err := url.Parse(registryURL)
if err != nil {
log.Error().Err(err).Msgf("couldn't parse registry URL: %s", registryURL)
return err
}
getManifestURL := *regURL
getManifestURL.Path = path.Join(getManifestURL.Path, "v2", remoteRepo, "manifests", tag)
resp, err := client.R().SetHeader("Content-Type", "application/json").Head(getManifestURL.String())
if err != nil {
log.Error().Err(err).Str("url", getManifestURL.String()).
Msgf("couldn't query %s", registryURL)
return err
}
digest := resp.Header().Get("Docker-Content-Digest")
if digest == "" {
log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()).
Msgf("couldn't get digest for manifest: %s:%s", remoteRepo, tag)
return zerr.ErrBadBlobDigest
}
err = syncNotarySignature(client, storeController, *regURL, remoteRepo, localRepo, digest, log)
if err != nil {
return err
}
err = syncCosignSignature(client, storeController, *regURL, remoteRepo, localRepo, digest, log)
if err != nil {
return err
}
log.Info().Msgf("successfully synced %s/%s:%s signatures", registryURL, remoteRepo, tag)
return nil
return client, registryURL, nil
}
func pushSyncedLocalImage(localRepo, tag, localCachePath string,
storeController storage.StoreController, log log.Logger) error {
imageStore storage.ImageStore, log log.Logger) error {
log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, tag)
imageStore := storeController.GetImageStore(localRepo)
metrics := monitoring.NewMetricsServer(false, log)
cacheImageStore := storage.NewImageStore(localCachePath, false, storage.DefaultGCDelay, false, false, log, metrics)
@ -598,16 +335,6 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string,
return nil
}
// sync feature will try to pull cosign signature because for sync cosign signature is just an image
// this function will check if tag is a cosign tag.
func isCosignTag(tag string) bool {
if strings.HasPrefix(tag, "sha256-") && strings.HasSuffix(tag, ".sig") {
return true
}
return false
}
// sync needs transport to be stripped to not be wrongly interpreted as an image reference
// at a non-fully qualified registry (hostname as image and port as tag).
func StripRegistryTransport(url string) string {
@ -659,11 +386,10 @@ func getLocalImageRef(imageStore storage.ImageStore, repo, tag string) (types.Im
return localImageRef, localCachePath, nil
}
// canSkipImage returns whether or not the image can be skipped from syncing.
func canSkipImage(ctx context.Context, repo, tag string, upstreamRef types.ImageReference,
imageStore storage.ImageStore, upstreamCtx *types.SystemContext, log log.Logger) (bool, error) {
// filter already pulled images
_, localImageDigest, _, err := imageStore.GetImageManifest(repo, tag)
// canSkipImage returns whether or not we already synced this image.
func canSkipImage(repo, tag, digest string, imageStore storage.ImageStore, log log.Logger) (bool, error) {
// check image already synced
_, localImageManifestDigest, _, err := imageStore.GetImageManifest(repo, tag)
if err != nil {
if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrManifestNotFound) {
return false, nil
@ -674,18 +400,54 @@ func canSkipImage(ctx context.Context, repo, tag string, upstreamRef types.Image
return false, err
}
upstreamImageDigest, err := docker.GetDigest(ctx, upstreamCtx, upstreamRef)
if err != nil {
log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamRef.DockerReference())
if localImageManifestDigest != digest {
log.Info().Msgf("upstream image %s:%s digest changed, syncing again", repo, tag)
return false, err
return false, nil
}
if localImageDigest == string(upstreamImageDigest) {
log.Info().Msgf("skipping syncing %s:%s, image already synced", repo, tag)
return true, nil
}
return false, nil
return true, nil
}
func manifestsEqual(manifest1, manifest2 ispec.Manifest) bool {
if manifest1.Config.Digest == manifest2.Config.Digest &&
manifest1.Config.MediaType == manifest2.Config.MediaType &&
manifest1.Config.Size == manifest2.Config.Size &&
len(manifest1.Layers) == len(manifest2.Layers) {
if descriptorEqual(manifest1.Layers, manifest2.Layers) {
return true
}
}
return false
}
func artifactDescriptorsEqual(desc1, desc2 []artifactspec.Descriptor) bool {
if len(desc1) == len(desc2) {
for id, desc := range desc1 {
if desc.Digest == desc2[id].Digest &&
desc.Size == desc2[id].Size &&
desc.MediaType == desc2[id].MediaType &&
desc.ArtifactType == desc2[id].ArtifactType {
return true
}
}
}
return false
}
func descriptorEqual(desc1, desc2 []ispec.Descriptor) bool {
if len(desc1) == len(desc2) {
for id, desc := range desc1 {
if desc.Digest == desc2[id].Digest &&
desc.Size == desc2[id].Size &&
desc.MediaType == desc2[id].MediaType &&
desc.Annotations[static.SignatureAnnotationKey] == desc2[id].Annotations[static.SignatureAnnotationKey] {
return true
}
}
}
return false
}

View file

@ -495,6 +495,7 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy
}
// manifest contents have changed for the same tag,
// so update index.json descriptor
is.log.Info().
Int64("old size", desc.Size).
Int64("new size", int64(len(body))).