mirror of
https://github.com/project-zot/zot.git
synced 2024-12-30 22:34:13 -05:00
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
parent
c6ffbce6cf
commit
2d877aaea1
4 changed files with 130 additions and 46 deletions
|
@ -51,10 +51,10 @@ func (di *demandedImages) delete(key string) {
|
|||
}
|
||||
|
||||
func OneImage(cfg Config, storeController storage.StoreController,
|
||||
repo, tag string, isArtifact bool, log log.Logger,
|
||||
repo, reference string, isArtifact bool, log log.Logger,
|
||||
) error {
|
||||
// guard against multiple parallel requests
|
||||
demandedImage := fmt.Sprintf("%s:%s", repo, tag)
|
||||
demandedImage := fmt.Sprintf("%s:%s", repo, reference)
|
||||
// loadOrStore image-based channel
|
||||
imageChannel, found := demandedImgs.loadOrStoreChan(demandedImage, make(chan error))
|
||||
// if value found wait on channel receive or close
|
||||
|
@ -73,7 +73,7 @@ func OneImage(cfg Config, storeController storage.StoreController,
|
|||
defer demandedImgs.delete(demandedImage)
|
||||
defer close(imageChannel)
|
||||
|
||||
go syncOneImage(imageChannel, cfg, storeController, repo, tag, isArtifact, log)
|
||||
go syncOneImage(imageChannel, cfg, storeController, repo, reference, isArtifact, log)
|
||||
|
||||
err, ok := <-imageChannel
|
||||
if !ok {
|
||||
|
@ -84,7 +84,7 @@ func OneImage(cfg Config, storeController storage.StoreController,
|
|||
}
|
||||
|
||||
func syncOneImage(imageChannel chan error, cfg Config, storeController storage.StoreController,
|
||||
localRepo, tag string, isArtifact bool, log log.Logger,
|
||||
localRepo, reference string, isArtifact bool, log log.Logger,
|
||||
) {
|
||||
var credentialsFile CredentialsFile
|
||||
|
||||
|
@ -161,21 +161,21 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
|
|||
options := getCopyOptions(upstreamCtx, localCtx)
|
||||
|
||||
// demanded 'image' is a signature
|
||||
if isCosignTag(tag) {
|
||||
if isCosignTag(reference) {
|
||||
// at tis point we should already have images synced, but not their signatures.
|
||||
// is cosign signature
|
||||
cosignManifest, err := sig.getCosignManifest(upstreamRepo, tag)
|
||||
cosignManifest, err := sig.getCosignManifest(upstreamRepo, reference)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't get upstream image %s:%s:%s cosign manifest", upstreamURL, upstreamRepo, tag)
|
||||
Err(err).Msgf("couldn't get upstream image %s:%s:%s cosign manifest", upstreamURL, upstreamRepo, reference)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
err = sig.syncCosignSignature(localRepo, upstreamRepo, tag, cosignManifest)
|
||||
err = sig.syncCosignSignature(localRepo, upstreamRepo, reference, cosignManifest)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't copy upstream image cosign signature %s/%s:%s", upstreamURL, upstreamRepo, tag)
|
||||
Err(err).Msgf("couldn't copy upstream image cosign signature %s/%s:%s", upstreamURL, upstreamRepo, reference)
|
||||
|
||||
continue
|
||||
}
|
||||
|
@ -185,18 +185,18 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
|
|||
return
|
||||
} else if isArtifact {
|
||||
// is notary signature
|
||||
refs, err := sig.getNotaryRefs(upstreamRepo, tag)
|
||||
refs, err := sig.getNotaryRefs(upstreamRepo, reference)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, upstreamRepo, tag)
|
||||
Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, upstreamRepo, reference)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
err = sig.syncNotarySignature(localRepo, upstreamRepo, tag, refs)
|
||||
err = sig.syncNotarySignature(localRepo, upstreamRepo, reference, refs)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, upstreamRepo, tag)
|
||||
Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, upstreamRepo, reference)
|
||||
|
||||
continue
|
||||
}
|
||||
|
@ -214,13 +214,13 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
|
|||
copyOptions: options,
|
||||
}
|
||||
|
||||
skipped, copyErr := syncRun(regCfg, localRepo, upstreamRepo, tag, syncContextUtils, sig, log)
|
||||
skipped, copyErr := syncRun(regCfg, localRepo, upstreamRepo, reference, syncContextUtils, sig, log)
|
||||
if skipped {
|
||||
continue
|
||||
}
|
||||
|
||||
// key used to check if we already have a go routine syncing this image
|
||||
demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, upstreamRepo, tag)
|
||||
demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, upstreamRepo, reference)
|
||||
|
||||
if copyErr != nil {
|
||||
// don't retry in background if maxretry is 0
|
||||
|
@ -249,7 +249,7 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
|
|||
time.Sleep(retryOptions.Delay)
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
_, err := syncRun(regCfg, localRepo, upstreamRepo, tag, syncContextUtils, sig, log)
|
||||
_, err := syncRun(regCfg, localRepo, upstreamRepo, reference, syncContextUtils, sig, log)
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
|
@ -257,6 +257,10 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
|
|||
Err(err).Msgf("sync routine: error while copying image %s", demandedImageRef)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
imageChannel <- nil
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -265,25 +269,29 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
|
|||
}
|
||||
|
||||
func syncRun(regCfg RegistryConfig,
|
||||
localRepo, upstreamRepo, tag string, utils syncContextUtils, sig *signaturesCopier,
|
||||
localRepo, upstreamRepo, reference string, utils syncContextUtils, sig *signaturesCopier,
|
||||
log log.Logger,
|
||||
) (bool, error) {
|
||||
upstreamImageRef, err := getImageRef(utils.upstreamAddr, upstreamRepo, tag)
|
||||
upstreamImageDigest, refIsDigest := parseDigest(reference)
|
||||
|
||||
upstreamImageRef, err := getImageRef(utils.upstreamAddr, upstreamRepo, reference)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("error creating docker reference for repository %s/%s:%s",
|
||||
utils.upstreamAddr, upstreamRepo, tag)
|
||||
utils.upstreamAddr, upstreamRepo, reference)
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
||||
upstreamImageDigest, err := docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef)
|
||||
if !refIsDigest {
|
||||
upstreamImageDigest, err = docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference())
|
||||
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// get upstream signatures
|
||||
cosignManifest, err := sig.getCosignManifest(upstreamRepo, upstreamImageDigest.String())
|
||||
|
@ -316,11 +324,11 @@ func syncRun(regCfg RegistryConfig,
|
|||
log.Error().Err(err).Msgf("couldn't get localCachePath for %s", localRepo)
|
||||
}
|
||||
|
||||
localImageRef, err := getLocalImageRef(localCachePath, localRepo, tag)
|
||||
localImageRef, err := getLocalImageRef(localCachePath, localRepo, reference)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
|
||||
localCachePath, localRepo, tag)
|
||||
localCachePath, localRepo, reference)
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
@ -338,11 +346,11 @@ func syncRun(regCfg RegistryConfig,
|
|||
return false, err
|
||||
}
|
||||
|
||||
err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log)
|
||||
err = pushSyncedLocalImage(localRepo, reference, localCachePath, imageStore, log)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("error while pushing synced cached image %s",
|
||||
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
|
||||
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, reference))
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
@ -350,7 +358,7 @@ func syncRun(regCfg RegistryConfig,
|
|||
err = sig.syncCosignSignature(localRepo, upstreamRepo, upstreamImageDigest.String(), cosignManifest)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't copy image cosign signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag)
|
||||
Err(err).Msgf("couldn't copy image cosign signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference)
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
@ -358,12 +366,12 @@ func syncRun(regCfg RegistryConfig,
|
|||
err = sig.syncNotarySignature(localRepo, upstreamRepo, upstreamImageDigest.String(), refs)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag)
|
||||
Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference)
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
||||
log.Info().Msgf("successfully synced %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag)
|
||||
log.Info().Msgf("successfully synced %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference)
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -182,7 +182,6 @@ func getCopyOptions(upstreamCtx, localCtx *types.SystemContext) copy.Options {
|
|||
SourceCtx: upstreamCtx,
|
||||
ReportWriter: io.Discard,
|
||||
ForceManifestMIMEType: ispec.MediaTypeImageManifest, // force only oci manifest MIME type
|
||||
PreserveDigests: true,
|
||||
ImageListSelection: copy.CopyAllImages,
|
||||
}
|
||||
|
||||
|
|
|
@ -2865,6 +2865,56 @@ func TestOnDemandRetryGoroutine(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestOnDemandWithDigest(t *testing.T) {
|
||||
Convey("Verify ondemand sync retries in background on error", t, func() {
|
||||
_, srcBaseURL, _, _, _ := startUpstreamServer(t, false, false)
|
||||
|
||||
regex := ".*"
|
||||
semver := true
|
||||
var tlsVerify bool
|
||||
|
||||
syncRegistryConfig := sync.RegistryConfig{
|
||||
Content: []sync.Content{
|
||||
{
|
||||
Prefix: testImage,
|
||||
Tags: &sync.Tags{
|
||||
Regex: ®ex,
|
||||
Semver: &semver,
|
||||
},
|
||||
},
|
||||
},
|
||||
URLs: []string{srcBaseURL},
|
||||
OnDemand: true,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
}
|
||||
|
||||
defaultVal := true
|
||||
syncConfig := &sync.Config{
|
||||
Enable: &defaultVal,
|
||||
Registries: []sync.RegistryConfig{syncRegistryConfig},
|
||||
}
|
||||
|
||||
dctlr, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig)
|
||||
defer os.RemoveAll(destDir)
|
||||
|
||||
defer func() {
|
||||
dctlr.Shutdown()
|
||||
}()
|
||||
|
||||
// get manifest digest from source
|
||||
resp, err := destClient.R().Get(srcBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
digest := godigest.FromBytes(resp.Body())
|
||||
|
||||
resp, err = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
})
|
||||
}
|
||||
|
||||
func TestOnDemandRetryGoroutineErr(t *testing.T) {
|
||||
Convey("Verify ondemand sync retries in background on error", t, func() {
|
||||
regex := ".*"
|
||||
|
|
|
@ -328,28 +328,28 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede
|
|||
return client, registryURL, nil
|
||||
}
|
||||
|
||||
func pushSyncedLocalImage(localRepo, tag, localCachePath string,
|
||||
func pushSyncedLocalImage(localRepo, reference, localCachePath string,
|
||||
imageStore storage.ImageStore, log log.Logger,
|
||||
) error {
|
||||
log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, tag)
|
||||
log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, reference)
|
||||
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
|
||||
cacheImageStore := local.NewImageStore(localCachePath, false,
|
||||
storage.DefaultGCDelay, false, false, log, metrics, nil)
|
||||
|
||||
manifestContent, _, mediaType, err := cacheImageStore.GetImageManifest(localRepo, tag)
|
||||
manifestContent, _, mediaType, err := cacheImageStore.GetImageManifest(localRepo, reference)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)).
|
||||
Msg("couldn't find index.json")
|
||||
Msgf("couldn't find %s manifest", reference)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// is image manifest
|
||||
if mediaType == ispec.MediaTypeImageManifest {
|
||||
if err := copyManifest(localRepo, manifestContent, tag, cacheImageStore, imageStore, log); err != nil {
|
||||
if err := copyManifest(localRepo, manifestContent, reference, cacheImageStore, imageStore, log); err != nil {
|
||||
if errors.Is(err, zerr.ErrImageLintAnnotations) {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msg("couldn't upload manifest because of missing annotations")
|
||||
|
@ -397,7 +397,7 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string,
|
|||
}
|
||||
}
|
||||
|
||||
_, err = imageStore.PutImageManifest(localRepo, tag, mediaType, manifestContent)
|
||||
_, err = imageStore.PutImageManifest(localRepo, reference, mediaType, manifestContent)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msg("couldn't upload manifest")
|
||||
|
@ -486,18 +486,28 @@ func StripRegistryTransport(url string) string {
|
|||
}
|
||||
|
||||
// get an ImageReference given the registry, repo and tag.
|
||||
func getImageRef(registryDomain, repo, tag string) (types.ImageReference, error) {
|
||||
func getImageRef(registryDomain, repo, ref string) (types.ImageReference, error) {
|
||||
repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryDomain, repo))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
taggedRepoRef, err := reference.WithTag(repoRef, tag)
|
||||
var namedRepoRef reference.Named
|
||||
|
||||
digest, ok := parseDigest(ref)
|
||||
if ok {
|
||||
namedRepoRef, err = reference.WithDigest(repoRef, digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
namedRepoRef, err = reference.WithTag(repoRef, ref)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
imageRef, err := docker.NewReference(taggedRepoRef)
|
||||
imageRef, err := docker.NewReference(namedRepoRef)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -506,15 +516,20 @@ func getImageRef(registryDomain, repo, tag string) (types.ImageReference, error)
|
|||
}
|
||||
|
||||
// get a local ImageReference used to temporary store one synced image.
|
||||
func getLocalImageRef(localCachePath, repo, tag string) (types.ImageReference, error) {
|
||||
func getLocalImageRef(localCachePath, repo, reference string) (types.ImageReference, error) {
|
||||
if _, err := os.ReadDir(localCachePath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
localRepo := path.Join(localCachePath, repo)
|
||||
localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag)
|
||||
|
||||
localImageRef, err := layout.ParseReference(localTaggedRepo)
|
||||
_, refIsDigest := parseDigest(reference)
|
||||
|
||||
if !refIsDigest {
|
||||
localRepo = fmt.Sprintf("%s:%s", localRepo, reference)
|
||||
}
|
||||
|
||||
localImageRef, err := layout.ParseReference(localRepo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -579,6 +594,18 @@ func canSkipImage(repo, tag string, digest godigest.Digest, imageStore storage.I
|
|||
return true, nil
|
||||
}
|
||||
|
||||
// parse a reference, return its digest and if it's valid.
|
||||
func parseDigest(reference string) (godigest.Digest, bool) {
|
||||
var ok bool
|
||||
|
||||
d, err := godigest.Parse(reference)
|
||||
if err == nil {
|
||||
ok = true
|
||||
}
|
||||
|
||||
return d, ok
|
||||
}
|
||||
|
||||
func manifestsEqual(manifest1, manifest2 ispec.Manifest) bool {
|
||||
if manifest1.Config.Digest == manifest2.Config.Digest &&
|
||||
manifest1.Config.MediaType == manifest2.Config.MediaType &&
|
||||
|
|
Loading…
Reference in a new issue