From 024b13efe689ab6d0cb04972efe193ecc86a6879 Mon Sep 17 00:00:00 2001 From: peusebiu Date: Fri, 9 Dec 2022 21:38:00 +0200 Subject: [PATCH] fix(sync): syncing OCI artifacts with distribution package fails (#1013) sync OCI artifacts using REST APIs Signed-off-by: Petu Eusebiu --- pkg/extensions/sync/on_demand.go | 35 ++- pkg/extensions/sync/signatures.go | 107 +++++++- pkg/extensions/sync/sync.go | 26 +- pkg/extensions/sync/sync_internal_test.go | 37 ++- pkg/extensions/sync/sync_test.go | 310 +++++++++++++++++++++- pkg/extensions/sync/utils.go | 60 ++++- test/blackbox/sync.bats | 37 ++- 7 files changed, 559 insertions(+), 53 deletions(-) diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 6ad1f57a..f1630ccf 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -9,9 +9,10 @@ import ( "github.com/containers/common/pkg/retry" "github.com/containers/image/v5/copy" - "github.com/containers/image/v5/docker" "github.com/containers/image/v5/signature" "github.com/containers/image/v5/types" + "github.com/opencontainers/go-digest" + ispec "github.com/opencontainers/image-spec/specs-go/v1" "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/storage" @@ -244,7 +245,7 @@ func syncRun(regCfg RegistryConfig, localRepo, upstreamRepo, reference string, utils syncContextUtils, sig *signaturesCopier, log log.Logger, ) (bool, error) { - upstreamImageDigest, refIsDigest := parseDigest(reference) + upstreamImageDigest, refIsDigest := parseReference(reference) upstreamImageRef, err := getImageRef(utils.upstreamAddr, upstreamRepo, reference) if err != nil { @@ -255,14 +256,24 @@ func syncRun(regCfg RegistryConfig, return false, err } - if !refIsDigest { - upstreamImageDigest, err = docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef) - if err != nil { - log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference()) + manifestBuf, mediaType, err := getImageRefManifest(context.Background(), utils.upstreamCtx, upstreamImageRef, log) + if err != nil { + return false, err + } - return false, err + if !refIsDigest { + upstreamImageDigest = digest.FromBytes(manifestBuf) + } + + if !isSupportedMediaType(mediaType) { + if mediaType == ispec.MediaTypeArtifactManifest { + err = sig.syncOCIArtifact(localRepo, upstreamRepo, reference, manifestBuf) + if err != nil { + return false, err + } } + + return false, nil } // get upstream signatures @@ -272,7 +283,7 @@ func syncRun(regCfg RegistryConfig, Err(err).Msgf("couldn't get upstream image %s cosign manifest", upstreamImageRef.DockerReference()) } - refs, err := sig.getNotarySignatures(upstreamRepo, upstreamImageDigest.String()) + refs, err := sig.getNotaryRefs(upstreamRepo, upstreamImageDigest.String()) if err != nil { log.Error().Str("errorType", TypeOf(err)). Err(err).Msgf("couldn't get upstream image %s notary references", upstreamImageRef.DockerReference()) @@ -346,7 +357,7 @@ func syncRun(regCfg RegistryConfig, return false, err } - err = sig.syncNotarySignature(localRepo, upstreamRepo, upstreamImageDigest.String(), refs) + err = sig.syncNotaryRefs(localRepo, upstreamRepo, upstreamImageDigest.String(), refs) if err != nil { log.Error().Str("errorType", TypeOf(err)). Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference) @@ -382,7 +393,7 @@ func syncSignaturesArtifacts(sig *signaturesCopier, localRepo, upstreamRepo, ref } case artifactType == OrasArtifact: // is notary signature - refs, err := sig.getNotarySignatures(upstreamRepo, reference) + refs, err := sig.getNotaryRefs(upstreamRepo, reference) if err != nil { sig.log.Error().Str("errorType", TypeOf(err)). Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, upstreamRepo, reference) @@ -390,7 +401,7 @@ func syncSignaturesArtifacts(sig *signaturesCopier, localRepo, upstreamRepo, ref return err } - err = sig.syncNotarySignature(localRepo, upstreamRepo, reference, refs) + err = sig.syncNotaryRefs(localRepo, upstreamRepo, reference, refs) if err != nil { sig.log.Error().Str("errorType", TypeOf(err)). Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, upstreamRepo, reference) diff --git a/pkg/extensions/sync/signatures.go b/pkg/extensions/sync/signatures.go index 2f5a002a..474cbf93 100644 --- a/pkg/extensions/sync/signatures.go +++ b/pkg/extensions/sync/signatures.go @@ -86,7 +86,7 @@ func (sig *signaturesCopier) getCosignManifest(repo, digestStr string) (*ispec.M return &cosignManifest, nil } -func (sig *signaturesCopier) getNotarySignatures(repo, digestStr string) (ReferenceList, error) { +func (sig *signaturesCopier) getNotaryRefs(repo, digestStr string) (ReferenceList, error) { var referrers ReferenceList getReferrersURL := sig.upstreamURL @@ -99,7 +99,6 @@ func (sig *signaturesCopier) getNotarySignatures(repo, digestStr string) (Refere resp, err := sig.client.R(). SetHeader("Content-Type", "application/json"). - SetQueryParam("artifactType", notreg.ArtifactTypeNotation). Get(getReferrersURL.String()) if err != nil { sig.log.Error().Str("errorType", TypeOf(err)). @@ -216,6 +215,8 @@ func (sig *signaturesCopier) syncCosignSignature(localRepo, remoteRepo, digestSt if err != nil { sig.log.Error().Str("errorType", TypeOf(err)). Err(err).Msg("couldn't marshal cosign manifest") + + return err } // push manifest @@ -233,13 +234,68 @@ func (sig *signaturesCopier) syncCosignSignature(localRepo, remoteRepo, digestSt return nil } -func (sig *signaturesCopier) syncNotarySignature(localRepo, remoteRepo, digestStr string, referrers ReferenceList, +func (sig *signaturesCopier) syncOCIArtifact(localRepo, remoteRepo, reference string, + ociArtifactBuf []byte, +) error { + var ociArtifact ispec.Artifact + + err := json.Unmarshal(ociArtifactBuf, &ociArtifact) + if err != nil { + sig.log.Error().Err(err).Msgf("couldn't unmarshal OCI artifact from %s:%s", remoteRepo, reference) + + return err + } + + canSkipOCIArtifact, err := sig.canSkipOCIArtifact(localRepo, reference, ociArtifact) + if err != nil { + sig.log.Error().Err(err).Msgf("couldn't check if OCI artifact %s:%s can be skipped", + remoteRepo, reference) + } + + if canSkipOCIArtifact { + return nil + } + + imageStore := sig.storeController.GetImageStore(localRepo) + + sig.log.Info().Msg("syncing OCI artifacts") + + for _, blob := range ociArtifact.Blobs { + if err := syncBlob(sig, imageStore, localRepo, remoteRepo, blob.Digest); err != nil { + return err + } + } + + artifactManifestBuf, err := json.Marshal(ociArtifact) + if err != nil { + sig.log.Error().Str("errorType", TypeOf(err)). + Err(err).Msg("couldn't marshal OCI artifact") + + return err + } + + // push manifest + _, err = imageStore.PutImageManifest(localRepo, reference, + ispec.MediaTypeArtifactManifest, artifactManifestBuf) + if err != nil { + sig.log.Error().Str("errorType", TypeOf(err)). + Err(err).Msg("couldn't upload OCI artifact manifest") + + return err + } + + sig.log.Info().Msgf("successfully synced OCI artifact for repo %s tag %s", localRepo, reference) + + return nil +} + +func (sig *signaturesCopier) syncNotaryRefs(localRepo, remoteRepo, digestStr string, referrers ReferenceList, ) error { if len(referrers.References) == 0 { return nil } - skipNotarySig, err := sig.canSkipNotarySignature(localRepo, digestStr, referrers) + skipNotarySig, err := sig.canSkipNotaryRefs(localRepo, digestStr, referrers) if err != nil { sig.log.Error().Err(err).Msgf("couldn't check if the upstream image %s:%s notary signature can be skipped", remoteRepo, digestStr) @@ -319,7 +375,7 @@ func (sig *signaturesCopier) syncOCIRefs(localRepo, remoteRepo, digestStr string imageStore := sig.storeController.GetImageStore(localRepo) - sig.log.Info().Msg("syncing oci references") + sig.log.Info().Msg("syncing OCI references") for _, ref := range index.Manifests { getRefManifestURL := sig.upstreamURL @@ -392,7 +448,7 @@ func (sig *signaturesCopier) syncOCIRefs(localRepo, remoteRepo, digestStr string return nil } -func (sig *signaturesCopier) canSkipNotarySignature(localRepo, digestStr string, refs ReferenceList, +func (sig *signaturesCopier) canSkipNotaryRefs(localRepo, digestStr string, refs ReferenceList, ) (bool, error) { imageStore := sig.storeController.GetImageStore(localRepo) digest := godigest.Digest(digestStr) @@ -423,6 +479,43 @@ func (sig *signaturesCopier) canSkipNotarySignature(localRepo, digestStr string, return true, nil } +func (sig *signaturesCopier) canSkipOCIArtifact(localRepo, reference string, artifact ispec.Artifact, +) (bool, error) { + imageStore := sig.storeController.GetImageStore(localRepo) + + var localArtifactManifest ispec.Artifact + + localArtifactBuf, _, _, err := imageStore.GetImageManifest(localRepo, reference) + if err != nil { + if errors.Is(err, zerr.ErrManifestNotFound) { + return false, nil + } + + sig.log.Error().Str("errorType", TypeOf(err)). + Err(err).Msgf("couldn't get local OCI artifact %s:%s manifest", localRepo, reference) + + return false, err + } + + err = json.Unmarshal(localArtifactBuf, &localArtifactManifest) + if err != nil { + sig.log.Error().Str("errorType", TypeOf(err)). + Err(err).Msgf("couldn't unmarshal local OCI artifact %s:%s manifest", localRepo, reference) + + return false, err + } + + if !artifactsEqual(localArtifactManifest, artifact) { + sig.log.Info().Msgf("upstream OCI artifact %s:%s changed, syncing again", localRepo, reference) + + return false, nil + } + + sig.log.Info().Msgf("skipping OCI artifact %s:%s, already synced", localRepo, reference) + + return true, nil +} + func (sig *signaturesCopier) canSkipCosignSignature(localRepo, digestStr string, cosignManifest *ispec.Manifest, ) (bool, error) { imageStore := sig.storeController.GetImageStore(localRepo) @@ -480,7 +573,7 @@ func (sig *signaturesCopier) canSkipOCIRefs(localRepo, digestStr string, index i } sig.log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't get local ocireferences for %s:%s manifest", localRepo, digestStr) + Err(err).Msgf("couldn't get local oci references for %s:%s manifest", localRepo, digestStr) return false, err } diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 4aa866a5..2ab9f705 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -16,6 +16,7 @@ import ( "github.com/containers/image/v5/docker/reference" "github.com/containers/image/v5/signature" "github.com/containers/image/v5/types" + "github.com/opencontainers/go-digest" ispec "github.com/opencontainers/image-spec/specs-go/v1" "gopkg.in/resty.v1" @@ -300,13 +301,26 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, defer os.RemoveAll(localCachePath) for _, upstreamImageRef := range repoReference.imageReferences { - upstreamImageDigest, err := docker.GetDigest(ctx, upstreamCtx, upstreamImageRef) + manifestBuf, mediaType, err := getImageRefManifest(ctx, upstreamCtx, upstreamImageRef, log) if err != nil { - log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference()) - return err } + upstreamImageDigest := digest.FromBytes(manifestBuf) + + tag := getTagFromRef(upstreamImageRef, log).Tag() + + if !isSupportedMediaType(mediaType) { + if mediaType == ispec.MediaTypeArtifactManifest { + err = sig.syncOCIArtifact(localRepo, upstreamRepo, tag, manifestBuf) + if err != nil { + return err + } + } + + continue + } + // get upstream signatures cosignManifest, err := sig.getCosignManifest(upstreamRepo, upstreamImageDigest.String()) if err != nil && !errors.Is(err, zerr.ErrSyncReferrerNotFound) { @@ -315,7 +329,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, return err } - refs, err := sig.getNotarySignatures(upstreamRepo, upstreamImageDigest.String()) + refs, err := sig.getNotaryRefs(upstreamRepo, upstreamImageDigest.String()) if err != nil && !errors.Is(err, zerr.ErrSyncReferrerNotFound) { log.Error().Err(err).Msgf("couldn't get upstream image %s notary references", upstreamImageRef.DockerReference()) @@ -333,8 +347,6 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, } } - tag := getTagFromRef(upstreamImageRef, log).Tag() - skipImage, err := canSkipImage(localRepo, tag, upstreamImageDigest, imageStore, log) if err != nil { log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped", @@ -392,7 +404,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, return err } - err = sig.syncNotarySignature(localRepo, upstreamRepo, upstreamImageDigest.String(), refs) + err = sig.syncNotaryRefs(localRepo, upstreamRepo, upstreamImageDigest.String(), refs) if err != nil { return err } diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index d44e7b05..5534ed3b 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -335,8 +335,39 @@ func TestSyncInternal(t *testing.T) { err = sig.syncCosignSignature(testImage, testImage, testImageTag, &manifest) So(err, ShouldNotBeNil) - err = sig.syncNotarySignature(testImage, testImage, "invalidDigest", ReferenceList{[]artifactspec.Descriptor{ref}}) + err = sig.syncOCIArtifact(testImage, testImage, testImageTag, nil) So(err, ShouldNotBeNil) + + ociArtifactBuf, err := json.Marshal(ispec.Artifact{}) + So(err, ShouldBeNil) + + err = sig.syncOCIArtifact(testImage, testImage, testImageTag, ociArtifactBuf) + So(err, ShouldBeNil) + + ociArtifactBuf, err = json.Marshal( + ispec.Artifact{Blobs: []ispec.Descriptor{{Digest: "fakeDigest"}}}) + So(err, ShouldBeNil) + + err = sig.syncOCIArtifact(testImage, testImage, testImageTag, ociArtifactBuf) + So(err, ShouldNotBeNil) + + err = sig.syncNotaryRefs(testImage, testImage, "invalidDigest", ReferenceList{[]artifactspec.Descriptor{ref}}) + So(err, ShouldNotBeNil) + + Convey("Trigger unmarshal error on canSkipOCIArtifact", func() { + sig := newSignaturesCopier(client, *regURL, storage.StoreController{DefaultStore: mocks.MockedImageStore{ + GetImageManifestFn: func(repo, reference string) ([]byte, godigest.Digest, string, error) { + result := []byte{} + digest := godigest.FromBytes(result) + + return result, digest, "", nil + }, + }}, log) + + skip, err := sig.canSkipOCIArtifact(testImage, testImageTag, ispec.Artifact{}) + So(skip, ShouldBeFalse) + So(err, ShouldNotBeNil) + }) }) Convey("Test canSkipImage()", t, func() { @@ -379,14 +410,14 @@ func TestSyncInternal(t *testing.T) { sig := newSignaturesCopier(resty.New(), *regURL, storage.StoreController{DefaultStore: imageStore}, log) - canBeSkipped, err = sig.canSkipNotarySignature(testImage, testImageManifestDigest.String(), refs) + canBeSkipped, err = sig.canSkipNotaryRefs(testImage, testImageManifestDigest.String(), refs) So(err, ShouldBeNil) So(canBeSkipped, ShouldBeFalse) err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o000) So(err, ShouldBeNil) - canBeSkipped, err = sig.canSkipNotarySignature(testImage, testImageManifestDigest.String(), refs) + canBeSkipped, err = sig.canSkipNotaryRefs(testImage, testImageManifestDigest.String(), refs) So(err, ShouldNotBeNil) So(canBeSkipped, ShouldBeFalse) diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index aa35fd2e..77b58c9b 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -4427,23 +4427,310 @@ func TestSyncImageIndex(t *testing.T) { So(resp.Body(), ShouldNotBeEmpty) So(resp.Header().Get("Content-Type"), ShouldNotBeEmpty) - // start downstream server - dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + Convey("sync periodically", func() { + // start downstream server + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + + defer func() { + dctlr.Shutdown() + }() + + // give it time to set up sync + t.Logf("waitsync(%s, %s)", dctlr.Config.Storage.RootDirectory, "index") + waitSync(dctlr.Config.Storage.RootDirectory, "index") + + resp, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageIndex). + Get(destBaseURL + "/v2/index/manifests/latest") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + So(resp.Body(), ShouldNotBeEmpty) + So(resp.Header().Get("Content-Type"), ShouldNotBeEmpty) + + var syncedIndex ispec.Index + err := json.Unmarshal(resp.Body(), &syncedIndex) + So(err, ShouldBeNil) + + So(reflect.DeepEqual(syncedIndex, index), ShouldEqual, true) + }) + + Convey("sync on demand", func() { + // start downstream server + syncConfig.Registries[0].OnDemand = true + syncConfig.Registries[0].PollInterval = 0 + + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + + defer func() { + dctlr.Shutdown() + }() + + resp, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageIndex). + Get(destBaseURL + "/v2/index/manifests/latest") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + So(resp.Body(), ShouldNotBeEmpty) + So(resp.Header().Get("Content-Type"), ShouldNotBeEmpty) + + var syncedIndex ispec.Index + err := json.Unmarshal(resp.Body(), &syncedIndex) + So(err, ShouldBeNil) + + So(reflect.DeepEqual(syncedIndex, index), ShouldEqual, true) + }) + }) +} + +func TestSyncOCIArtifactsWithTag(t *testing.T) { + Convey("Verify syncing tagged OCI artifacts", t, func() { + updateDuration, _ := time.ParseDuration("5s") + + sctlr, srcBaseURL, _, _, _ := startUpstreamServer(t, false, false) defer func() { - dctlr.Shutdown() + sctlr.Shutdown() }() - // give it time to set up sync - t.Logf("waitsync(%s, %s)", dctlr.Config.Storage.RootDirectory, "index") - waitSync(dctlr.Config.Storage.RootDirectory, "index") + regex := ".*" + var semver bool + tlsVerify := false - resp, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageIndex). - Get(destBaseURL + "/v2/index/manifests/latest") + repoName := "artifact" + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: repoName, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + OnDemand: false, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + } + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + // create artifact blob + buf := []byte("this is an artifact") + digest := pushBlob(srcBaseURL, repoName, buf) + + // create artifact config blob + cbuf := []byte("{}") + cdigest := pushBlob(srcBaseURL, repoName, cbuf) + + // push a referrer artifact + manifest := ispec.Manifest{ + MediaType: ispec.MediaTypeImageManifest, + Config: ispec.Descriptor{ + MediaType: "application/vnd.cncf.icecream", + Digest: cdigest, + Size: int64(len(cbuf)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/octet-stream", + Digest: digest, + Size: int64(len(buf)), + }, + }, + } + + artifactManifest := ispec.Artifact{ + MediaType: ispec.MediaTypeArtifactManifest, + ArtifactType: "application/vnd.cncf.icecream", + Blobs: []ispec.Descriptor{ + { + MediaType: "application/octet-stream", + Digest: digest, + Size: int64(len(buf)), + }, + }, + } + + manifest.SchemaVersion = 2 + + content, err := json.Marshal(manifest) So(err, ShouldBeNil) - So(resp.StatusCode(), ShouldEqual, http.StatusOK) - So(resp.Body(), ShouldNotBeEmpty) - So(resp.Header().Get("Content-Type"), ShouldNotBeEmpty) + + // put OCI artifact mediatype oci image + _, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageManifest). + SetBody(content).Put(srcBaseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, "1.0")) + So(err, ShouldBeNil) + + content, err = json.Marshal(artifactManifest) + So(err, ShouldBeNil) + + artifactDigest := godigest.FromBytes(content) + + // put OCI artifact mediatype artifact + _, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeArtifactManifest). + SetBody(content).Put(srcBaseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, "2.0")) + So(err, ShouldBeNil) + + Convey("sync periodically", func() { + // start downstream server + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + + defer func() { + dctlr.Shutdown() + }() + + // give it time to set up sync + t.Logf("waitsync(%s, %s)", dctlr.Config.Storage.RootDirectory, repoName) + waitSync(dctlr.Config.Storage.RootDirectory, repoName) + + resp, err := resty.R().SetHeader("Content-Type", ispec.MediaTypeImageManifest). + Get(destBaseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, "1.0")) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + So(resp.Body(), ShouldNotBeEmpty) + So(resp.Header().Get("Content-Type"), ShouldNotBeEmpty) + + var syncedManifest ispec.Manifest + err = json.Unmarshal(resp.Body(), &syncedManifest) + So(err, ShouldBeNil) + + So(reflect.DeepEqual(syncedManifest, manifest), ShouldEqual, true) + + // sync again for coverage + time.Sleep(5 * time.Second) + }) + + Convey("sync on demand", func() { + // start downstream server + syncConfig.Registries[0].OnDemand = true + syncConfig.Registries[0].PollInterval = 0 + + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + + defer func() { + dctlr.Shutdown() + }() + + resp, err := resty.R().SetHeader("Content-Type", ispec.MediaTypeArtifactManifest). + Get(destBaseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, "2.0")) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + So(resp.Body(), ShouldNotBeEmpty) + So(resp.Header().Get("Content-Type"), ShouldNotBeEmpty) + + var syncedArtifact ispec.Artifact + err = json.Unmarshal(resp.Body(), &syncedArtifact) + So(err, ShouldBeNil) + + So(reflect.DeepEqual(syncedArtifact, artifactManifest), ShouldEqual, true) + }) + + Convey("sync periodically error on mediatype", func() { + manifestPath := path.Join(sctlr.Config.Storage.RootDirectory, repoName, "blobs", "sha256", artifactDigest.Encoded()) + So(os.Chmod(manifestPath, 0o000), ShouldBeNil) + + // start downstream server + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + + defer func() { + dctlr.Shutdown() + }() + + defer func() { + err := os.Chmod(manifestPath, 0o755) + So(err, ShouldBeNil) + }() + + // give it time to set up sync + time.Sleep(3 * time.Second) + + resp, err := resty.R().SetHeader("Content-Type", ispec.MediaTypeArtifactManifest). + Get(destBaseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, artifactDigest.String())) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + }) + + Convey("sync on demand error on mediatype", func() { + // start downstream server + syncConfig.Registries[0].OnDemand = true + syncConfig.Registries[0].PollInterval = 0 + + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + + defer func() { + dctlr.Shutdown() + }() + + manifestPath := path.Join(sctlr.Config.Storage.RootDirectory, repoName, "blobs", "sha256", artifactDigest.Encoded()) + So(os.Chmod(manifestPath, 0o000), ShouldBeNil) + defer func() { + err := os.Chmod(manifestPath, 0o755) + So(err, ShouldBeNil) + }() + + resp, err := resty.R().SetHeader("Content-Type", ispec.MediaTypeArtifactManifest). + Get(destBaseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, artifactDigest.String())) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + }) + + Convey("sync on demand and periodically error on PutImageManifest", func() { + // start downstream server + syncConfig.Registries[0].OnDemand = true + + destDir := t.TempDir() + destConfig := config.New() + + destConfig.HTTP.Port = test.GetFreePort() + destBaseURL := test.GetBaseURL(destConfig.HTTP.Port) + + destConfig.Storage.RootDirectory = destDir + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = syncConfig + + dctlr := api.NewController(destConfig) + + manifestPath := path.Join(destDir, repoName, "blobs", "sha256", artifactDigest.Encoded()) + So(os.MkdirAll(manifestPath, 0o755), ShouldBeNil) + So(os.Chmod(manifestPath, 0o000), ShouldBeNil) + + go func() { + // this blocks + if err := dctlr.Run(context.Background()); err != nil { + return + } + }() + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + + time.Sleep(100 * time.Millisecond) + } + + defer func() { + dctlr.Shutdown() + }() + + defer func() { + err := os.Chmod(manifestPath, 0o755) + So(err, ShouldBeNil) + }() + + resp, err := resty.R().SetHeader("Content-Type", ispec.MediaTypeArtifactManifest). + Get(destBaseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, artifactDigest.String())) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + }) }) } @@ -4620,6 +4907,7 @@ func pushRepo(url, repoName string) godigest.Digest { // push a referrer artifact manifest = ispec.Manifest{ + MediaType: ispec.MediaTypeImageManifest, Config: ispec.Descriptor{ MediaType: "application/vnd.cncf.icecream", Digest: acdigest, diff --git a/pkg/extensions/sync/utils.go b/pkg/extensions/sync/utils.go index bf637795..c84692be 100644 --- a/pkg/extensions/sync/utils.go +++ b/pkg/extensions/sync/utils.go @@ -499,7 +499,7 @@ func getImageRef(registryDomain, repo, ref string) (types.ImageReference, error) var namedRepoRef reference.Named - digest, ok := parseDigest(ref) + digest, ok := parseReference(ref) if ok { namedRepoRef, err = reference.WithDigest(repoRef, digest) if err != nil { @@ -528,7 +528,7 @@ func getLocalImageRef(localCachePath, repo, reference string) (types.ImageRefere localRepo := path.Join(localCachePath, repo) - _, refIsDigest := parseDigest(reference) + _, refIsDigest := parseReference(reference) if !refIsDigest { localRepo = fmt.Sprintf("%s:%s", localRepo, reference) @@ -600,7 +600,7 @@ func canSkipImage(repo, tag string, digest godigest.Digest, imageStore storage.I } // parse a reference, return its digest and if it's valid. -func parseDigest(reference string) (godigest.Digest, bool) { +func parseReference(reference string) (godigest.Digest, bool) { var ok bool d, err := godigest.Parse(reference) @@ -623,6 +623,17 @@ func manifestsEqual(manifest1, manifest2 ispec.Manifest) bool { return false } +func artifactsEqual(manifest1, manifest2 ispec.Artifact) bool { + if manifest1.ArtifactType == manifest2.ArtifactType && + manifest1.MediaType == manifest2.MediaType { + if descriptorsEqual(manifest1.Blobs, manifest2.Blobs) { + return true + } + } + + return false +} + func artifactDescriptorsEqual(desc1, desc2 []artifactspec.Descriptor) bool { if len(desc1) != len(desc2) { return false @@ -646,13 +657,48 @@ func descriptorsEqual(desc1, desc2 []ispec.Descriptor) bool { } for id, desc := range desc1 { - if desc.Digest != desc2[id].Digest || - desc.Size != desc2[id].Size || - desc.MediaType != desc2[id].MediaType || - desc.Annotations[static.SignatureAnnotationKey] != desc2[id].Annotations[static.SignatureAnnotationKey] { + if !descriptorEqual(desc, desc2[id]) { return false } } return true } + +func descriptorEqual(desc1, desc2 ispec.Descriptor) bool { + if desc1.Size == desc2.Size && + desc1.Digest == desc2.Digest && + desc1.MediaType == desc2.MediaType && + desc1.Annotations[static.SignatureAnnotationKey] == desc2.Annotations[static.SignatureAnnotationKey] { + return true + } + + return false +} + +func isSupportedMediaType(mediaType string) bool { + return mediaType == ispec.MediaTypeImageIndex || + mediaType == ispec.MediaTypeImageManifest +} + +func getImageRefManifest(ctx context.Context, upstreamCtx *types.SystemContext, imageRef types.ImageReference, + log log.Logger, +) ([]byte, string, error) { + imageSource, err := imageRef.NewImageSource(ctx, upstreamCtx) + if err != nil { + log.Error().Err(err).Msgf("couldn't get upstream image %s manifest details", imageRef.DockerReference()) + + return []byte{}, "", err + } + + defer imageSource.Close() + + manifestBuf, mediaType, err := imageSource.GetManifest(ctx, nil) + if err != nil { + log.Error().Err(err).Msgf("couldn't get upstream image %s manifest mediaType", imageRef.DockerReference()) + + return []byte{}, "", err + } + + return manifestBuf, mediaType, nil +} diff --git a/test/blackbox/sync.bats b/test/blackbox/sync.bats index 857b6207..7860afd0 100644 --- a/test/blackbox/sync.bats +++ b/test/blackbox/sync.bats @@ -329,33 +329,58 @@ function teardown_file() { } # sync OCI artifacts -@test "push OCI artifact with regclient" { +@test "push OCI artifact (oci image mediatype) with regclient" { run regctl registry set localhost:9000 --tls disabled run regctl registry set localhost:8081 --tls disabled run regctl registry set localhost:8082 --tls disabled run regctl artifact put localhost:9000/artifact:demo <