mirror of
https://github.com/project-zot/zot.git
synced 2025-02-17 23:45:36 -05:00
sync: pull only missing images, not everything, closes #335
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
parent
bb53552048
commit
35eeedb22a
4 changed files with 628 additions and 32 deletions
|
@ -354,6 +354,16 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto
|
||||||
|
|
||||||
imageStore := storeController.GetImageStore(repo)
|
imageStore := storeController.GetImageStore(repo)
|
||||||
|
|
||||||
|
canBeSkipped, err := canSkipImage(repo, tag, upstreamImageRef, imageStore, upstreamCtx, log)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped",
|
||||||
|
upstreamImageRef.DockerReference())
|
||||||
|
}
|
||||||
|
|
||||||
|
if canBeSkipped {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
localCachePath, err := getLocalCachePath(imageStore, repo)
|
localCachePath, err := getLocalCachePath(imageStore, repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Str("dir", localCachePath).Msg("couldn't create temporary dir")
|
log.Error().Err(err).Str("dir", localCachePath).Msg("couldn't create temporary dir")
|
||||||
|
@ -371,15 +381,15 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info().Msgf("copying image %s:%s to %s", upstreamImageRef.DockerReference(), tag, localCachePath)
|
log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath)
|
||||||
|
|
||||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||||
_, err = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options)
|
_, err = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}, retryOptions); err != nil {
|
}, retryOptions); err != nil {
|
||||||
log.Error().Err(err).Msgf("error while copying image %s:%s to %s",
|
log.Error().Err(err).Msgf("error while copying image %s to %s",
|
||||||
upstreamImageRef.DockerReference(), tag, localCachePath)
|
upstreamImageRef.DockerReference(), localCachePath)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -397,7 +407,7 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}, retryOptions); err != nil {
|
}, retryOptions); err != nil {
|
||||||
log.Error().Err(err).Msgf("couldn't copy image signature %s:%s", upstreamImageRef.DockerReference(), tag)
|
log.Error().Err(err).Msgf("couldn't copy image signature %s", upstreamImageRef.DockerReference())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ import (
|
||||||
"zotregistry.io/zot/pkg/extensions/monitoring"
|
"zotregistry.io/zot/pkg/extensions/monitoring"
|
||||||
"zotregistry.io/zot/pkg/log"
|
"zotregistry.io/zot/pkg/log"
|
||||||
"zotregistry.io/zot/pkg/storage"
|
"zotregistry.io/zot/pkg/storage"
|
||||||
. "zotregistry.io/zot/pkg/test"
|
"zotregistry.io/zot/pkg/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -88,8 +88,8 @@ func TestSyncInternal(t *testing.T) {
|
||||||
|
|
||||||
var tlsVerify bool
|
var tlsVerify bool
|
||||||
updateDuration := time.Microsecond
|
updateDuration := time.Microsecond
|
||||||
port := GetFreePort()
|
port := test.GetFreePort()
|
||||||
baseURL := GetBaseURL(port)
|
baseURL := test.GetBaseURL(port)
|
||||||
syncRegistryConfig := RegistryConfig{
|
syncRegistryConfig := RegistryConfig{
|
||||||
Content: []Content{
|
Content: []Content{
|
||||||
{
|
{
|
||||||
|
@ -119,14 +119,14 @@ func TestSyncInternal(t *testing.T) {
|
||||||
Prefix: testImage,
|
Prefix: testImage,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
URLs: []string{BaseURL},
|
URLs: []string{test.BaseURL},
|
||||||
PollInterval: updateDuration,
|
PollInterval: updateDuration,
|
||||||
TLSVerify: &tlsVerify,
|
TLSVerify: &tlsVerify,
|
||||||
CertDir: "/tmp/missing_certs/a/b/c/d/z",
|
CertDir: "/tmp/missing_certs/a/b/c/d/z",
|
||||||
}
|
}
|
||||||
|
|
||||||
port := GetFreePort()
|
port := test.GetFreePort()
|
||||||
baseURL := GetBaseURL(port)
|
baseURL := test.GetBaseURL(port)
|
||||||
|
|
||||||
httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
|
httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
|
@ -149,9 +149,9 @@ func TestSyncInternal(t *testing.T) {
|
||||||
|
|
||||||
var tlsVerify bool
|
var tlsVerify bool
|
||||||
updateDuration := time.Microsecond
|
updateDuration := time.Microsecond
|
||||||
port := GetFreePort()
|
port := test.GetFreePort()
|
||||||
baseURL := GetBaseURL(port)
|
baseURL := test.GetBaseURL(port)
|
||||||
baseSecureURL := GetSecureBaseURL(port)
|
baseSecureURL := test.GetSecureBaseURL(port)
|
||||||
|
|
||||||
syncRegistryConfig := RegistryConfig{
|
syncRegistryConfig := RegistryConfig{
|
||||||
Content: []Content{
|
Content: []Content{
|
||||||
|
@ -187,7 +187,7 @@ func TestSyncInternal(t *testing.T) {
|
||||||
_, err = getUpstreamCatalog(httpClient, "http://invalid:5000", log.NewLogger("debug", ""))
|
_, err = getUpstreamCatalog(httpClient, "http://invalid:5000", log.NewLogger("debug", ""))
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
|
|
||||||
syncRegistryConfig.URLs = []string{BaseURL}
|
syncRegistryConfig.URLs = []string{test.BaseURL}
|
||||||
httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", ""))
|
httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", ""))
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
So(httpClient, ShouldBeNil)
|
So(httpClient, ShouldBeNil)
|
||||||
|
@ -229,6 +229,51 @@ func TestSyncInternal(t *testing.T) {
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Convey("Test canSkipImage()", t, func() {
|
||||||
|
storageDir, err := ioutil.TempDir("", "oci-dest-repo-test")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = test.CopyFiles("../../../test/data", storageDir)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer os.RemoveAll(storageDir)
|
||||||
|
|
||||||
|
log := log.Logger{Logger: zerolog.New(os.Stdout)}
|
||||||
|
metrics := monitoring.NewMetricsServer(false, log)
|
||||||
|
|
||||||
|
imageStore := storage.NewImageStore(storageDir, false, false, false, log, metrics)
|
||||||
|
|
||||||
|
repoRefStr := fmt.Sprintf("%s/%s", host, testImage)
|
||||||
|
repoRef, err := parseRepositoryReference(repoRefStr)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(repoRef, ShouldNotBeNil)
|
||||||
|
|
||||||
|
taggedRef, err := reference.WithTag(repoRef, testImageTag)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(taggedRef, ShouldNotBeNil)
|
||||||
|
|
||||||
|
upstreamRef, err := docker.NewReference(taggedRef)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(taggedRef, ShouldNotBeNil)
|
||||||
|
|
||||||
|
canBeSkipped, err := canSkipImage(testImage, testImageTag, upstreamRef, imageStore, &types.SystemContext{}, log)
|
||||||
|
So(err, ShouldNotBeNil)
|
||||||
|
So(canBeSkipped, ShouldBeFalse)
|
||||||
|
|
||||||
|
err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o000)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
canBeSkipped, err = canSkipImage(testImage, testImageTag, upstreamRef, imageStore, &types.SystemContext{}, log)
|
||||||
|
So(err, ShouldNotBeNil)
|
||||||
|
So(canBeSkipped, ShouldBeFalse)
|
||||||
|
})
|
||||||
|
|
||||||
Convey("Test filterRepos()", t, func() {
|
Convey("Test filterRepos()", t, func() {
|
||||||
repos := []string{"repo", "repo1", "repo2", "repo/repo2", "repo/repo2/repo3/repo4"}
|
repos := []string{"repo", "repo1", "repo2", "repo/repo2", "repo/repo2/repo3/repo4"}
|
||||||
contents := []Content{
|
contents := []Content{
|
||||||
|
@ -284,7 +329,7 @@ func TestSyncInternal(t *testing.T) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = CopyFiles("../../../test/data", testRootDir)
|
err = test.CopyFiles("../../../test/data", testRootDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -555,8 +555,8 @@ func TestPeriodically(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPermsDenied(t *testing.T) {
|
func TestOnDemandPermsDenied(t *testing.T) {
|
||||||
Convey("Verify sync feature without perm on sync cache", t, func() {
|
Convey("Verify sync on demand feature without perm on sync cache", t, func() {
|
||||||
updateDuration, _ := time.ParseDuration("30m")
|
updateDuration, _ := time.ParseDuration("30m")
|
||||||
|
|
||||||
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false)
|
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false)
|
||||||
|
@ -593,19 +593,58 @@ func TestPermsDenied(t *testing.T) {
|
||||||
Registries: []sync.RegistryConfig{syncRegistryConfig},
|
Registries: []sync.RegistryConfig{syncRegistryConfig},
|
||||||
}
|
}
|
||||||
|
|
||||||
dctlr, destBaseURL, destDir, destClient := startDownstreamServer(false, syncConfig)
|
destPort := test.GetFreePort()
|
||||||
|
destConfig := config.New()
|
||||||
|
destBaseURL := test.GetBaseURL(destPort)
|
||||||
|
|
||||||
|
destConfig.HTTP.Port = destPort
|
||||||
|
|
||||||
|
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
defer os.RemoveAll(destDir)
|
defer os.RemoveAll(destDir)
|
||||||
|
|
||||||
|
destConfig.Storage.RootDirectory = destDir
|
||||||
|
|
||||||
|
destConfig.Extensions = &extconf.ExtensionConfig{}
|
||||||
|
destConfig.Extensions.Search = nil
|
||||||
|
destConfig.Extensions.Sync = syncConfig
|
||||||
|
|
||||||
|
dctlr := api.NewController(destConfig)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
dctlr.Shutdown()
|
dctlr.Shutdown()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err := os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o000)
|
go func() {
|
||||||
if err != nil {
|
// this blocks
|
||||||
panic(err)
|
if err := dctlr.Run(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o000)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
// wait till ready
|
||||||
|
for {
|
||||||
|
_, err := resty.R().Get(destBaseURL)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(resp.StatusCode(), ShouldEqual, 404)
|
So(resp.StatusCode(), ShouldEqual, 404)
|
||||||
|
|
||||||
|
@ -616,6 +655,101 @@ func TestPermsDenied(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeriodicallyPermsDenied(t *testing.T) {
|
||||||
|
Convey("Verify periodically sync feature without perm on sync cache", t, func() {
|
||||||
|
updateDuration, _ := time.ParseDuration("30m")
|
||||||
|
|
||||||
|
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false)
|
||||||
|
defer os.RemoveAll(srcDir)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
sctlr.Shutdown()
|
||||||
|
}()
|
||||||
|
|
||||||
|
regex := ".*"
|
||||||
|
semver := true
|
||||||
|
var tlsVerify bool
|
||||||
|
|
||||||
|
syncRegistryConfig := sync.RegistryConfig{
|
||||||
|
Content: []sync.Content{
|
||||||
|
{
|
||||||
|
Prefix: testImage,
|
||||||
|
Tags: &sync.Tags{
|
||||||
|
Regex: ®ex,
|
||||||
|
Semver: &semver,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
URLs: []string{srcBaseURL},
|
||||||
|
PollInterval: updateDuration,
|
||||||
|
TLSVerify: &tlsVerify,
|
||||||
|
CertDir: "",
|
||||||
|
OnDemand: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultVal := true
|
||||||
|
syncConfig := &sync.Config{
|
||||||
|
Enable: &defaultVal,
|
||||||
|
Registries: []sync.RegistryConfig{syncRegistryConfig},
|
||||||
|
}
|
||||||
|
|
||||||
|
destPort := test.GetFreePort()
|
||||||
|
destConfig := config.New()
|
||||||
|
destBaseURL := test.GetBaseURL(destPort)
|
||||||
|
|
||||||
|
destConfig.HTTP.Port = destPort
|
||||||
|
|
||||||
|
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer os.RemoveAll(destDir)
|
||||||
|
|
||||||
|
destConfig.Storage.RootDirectory = destDir
|
||||||
|
|
||||||
|
destConfig.Extensions = &extconf.ExtensionConfig{}
|
||||||
|
destConfig.Extensions.Search = nil
|
||||||
|
destConfig.Extensions.Sync = syncConfig
|
||||||
|
|
||||||
|
dctlr := api.NewController(destConfig)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// this blocks
|
||||||
|
if err := dctlr.Run(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o000)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait till ready
|
||||||
|
for {
|
||||||
|
_, err := resty.R().Get(destBaseURL)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
dctlr.Shutdown()
|
||||||
|
err := os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o755)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestBadTLS(t *testing.T) {
|
func TestBadTLS(t *testing.T) {
|
||||||
Convey("Verify sync TLS feature", t, func() {
|
Convey("Verify sync TLS feature", t, func() {
|
||||||
updateDuration, _ := time.ParseDuration("30m")
|
updateDuration, _ := time.ParseDuration("30m")
|
||||||
|
@ -2295,6 +2429,109 @@ func TestPeriodicallySignatures(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeriodicallySignaturesErr(t *testing.T) {
|
||||||
|
Convey("Verify sync signatures gives error", t, func() {
|
||||||
|
updateDuration, _ := time.ParseDuration("30m")
|
||||||
|
|
||||||
|
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false)
|
||||||
|
defer os.RemoveAll(srcDir)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
sctlr.Shutdown()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// create repo, push and sign it
|
||||||
|
repoName := testSignedImage
|
||||||
|
var digest godigest.Digest
|
||||||
|
So(func() { digest = pushRepo(srcBaseURL, repoName) }, ShouldNotPanic)
|
||||||
|
|
||||||
|
splittedURL := strings.SplitAfter(srcBaseURL, ":")
|
||||||
|
srcPort := splittedURL[len(splittedURL)-1]
|
||||||
|
|
||||||
|
cwd, err := os.Getwd()
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
defer func() { _ = os.Chdir(cwd) }()
|
||||||
|
tdir, err := ioutil.TempDir("", "sigs")
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
defer os.RemoveAll(tdir)
|
||||||
|
_ = os.Chdir(tdir)
|
||||||
|
generateKeyPairs(tdir)
|
||||||
|
|
||||||
|
So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic)
|
||||||
|
|
||||||
|
regex := ".*"
|
||||||
|
var semver bool
|
||||||
|
var tlsVerify bool
|
||||||
|
|
||||||
|
syncRegistryConfig := sync.RegistryConfig{
|
||||||
|
Content: []sync.Content{
|
||||||
|
{
|
||||||
|
Prefix: repoName,
|
||||||
|
Tags: &sync.Tags{
|
||||||
|
Regex: ®ex,
|
||||||
|
Semver: &semver,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
URLs: []string{srcBaseURL},
|
||||||
|
PollInterval: updateDuration,
|
||||||
|
TLSVerify: &tlsVerify,
|
||||||
|
CertDir: "",
|
||||||
|
OnDemand: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultVal := true
|
||||||
|
syncConfig := &sync.Config{
|
||||||
|
Enable: &defaultVal,
|
||||||
|
Registries: []sync.RegistryConfig{syncRegistryConfig},
|
||||||
|
}
|
||||||
|
|
||||||
|
// test negative cases (trigger errors)
|
||||||
|
// test notary signatures errors
|
||||||
|
|
||||||
|
// based on manifest digest get referrers
|
||||||
|
getReferrersURL := srcBaseURL + path.Join("/oras/artifacts/v1/", repoName, "manifests", digest.String(), "referrers")
|
||||||
|
|
||||||
|
resp, err := resty.R().
|
||||||
|
SetHeader("Content-Type", "application/json").
|
||||||
|
SetQueryParam("artifactType", "application/vnd.cncf.notary.v2.signature").
|
||||||
|
Get(getReferrersURL)
|
||||||
|
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp, ShouldNotBeEmpty)
|
||||||
|
|
||||||
|
var referrers ReferenceList
|
||||||
|
|
||||||
|
err = json.Unmarshal(resp.Body(), &referrers)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
// read manifest
|
||||||
|
var nm artifactspec.Manifest
|
||||||
|
for _, ref := range referrers.References {
|
||||||
|
refPath := path.Join(srcDir, repoName, "blobs", string(ref.Digest.Algorithm()), ref.Digest.Hex())
|
||||||
|
body, err := ioutil.ReadFile(refPath)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
err = json.Unmarshal(body, &nm)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
// triggers perm denied on sig blobs
|
||||||
|
for _, blob := range nm.Blobs {
|
||||||
|
blobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex())
|
||||||
|
err := os.Chmod(blobPath, 0o000)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dctlr, _, destDir, _ := startDownstreamServer(false, syncConfig)
|
||||||
|
defer func() {
|
||||||
|
dctlr.Shutdown()
|
||||||
|
defer os.RemoveAll(destDir)
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestOnDemandRetryGoroutine(t *testing.T) {
|
func TestOnDemandRetryGoroutine(t *testing.T) {
|
||||||
Convey("Verify ondemand sync retries in background on error", t, func() {
|
Convey("Verify ondemand sync retries in background on error", t, func() {
|
||||||
srcPort := test.GetFreePort()
|
srcPort := test.GetFreePort()
|
||||||
|
@ -2917,6 +3154,277 @@ func TestOnlySignaturesOnDemand(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncOnlyDiff(t *testing.T) {
|
||||||
|
Convey("Verify sync only difference between local and upstream", t, func() {
|
||||||
|
updateDuration, _ := time.ParseDuration("30m")
|
||||||
|
|
||||||
|
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false)
|
||||||
|
defer os.RemoveAll(srcDir)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
sctlr.Shutdown()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var tlsVerify bool
|
||||||
|
|
||||||
|
syncRegistryConfig := sync.RegistryConfig{
|
||||||
|
Content: []sync.Content{
|
||||||
|
{
|
||||||
|
Prefix: "**",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
URLs: []string{srcBaseURL},
|
||||||
|
PollInterval: updateDuration,
|
||||||
|
TLSVerify: &tlsVerify,
|
||||||
|
CertDir: "",
|
||||||
|
OnDemand: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultVal := true
|
||||||
|
syncConfig := &sync.Config{
|
||||||
|
Enable: &defaultVal,
|
||||||
|
Registries: []sync.RegistryConfig{syncRegistryConfig},
|
||||||
|
}
|
||||||
|
|
||||||
|
destPort := test.GetFreePort()
|
||||||
|
destConfig := config.New()
|
||||||
|
destBaseURL := test.GetBaseURL(destPort)
|
||||||
|
destConfig.HTTP.Port = destPort
|
||||||
|
|
||||||
|
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// copy images so we have them before syncing, sync should not pull them again
|
||||||
|
err = test.CopyFiles("../../../test/data", destDir)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
destConfig.Storage.RootDirectory = destDir
|
||||||
|
destConfig.Storage.Dedupe = false
|
||||||
|
destConfig.Storage.GC = false
|
||||||
|
|
||||||
|
destConfig.Extensions = &extconf.ExtensionConfig{}
|
||||||
|
destConfig.Extensions.Search = nil
|
||||||
|
destConfig.Extensions.Sync = syncConfig
|
||||||
|
|
||||||
|
dctlr := api.NewController(destConfig)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// this blocks
|
||||||
|
if err := dctlr.Run(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// wait till ready
|
||||||
|
for {
|
||||||
|
_, err := resty.R().Get(destBaseURL)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
dctlr.Shutdown()
|
||||||
|
os.RemoveAll(destDir)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// watch .sync subdir, shouldn't be populated
|
||||||
|
done := make(chan bool)
|
||||||
|
var isPopulated bool
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
_, err := os.ReadDir(path.Join(destDir, testImage, ".sync"))
|
||||||
|
if err == nil {
|
||||||
|
isPopulated = true
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp.StatusCode(), ShouldEqual, 200)
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
done <- true
|
||||||
|
So(isPopulated, ShouldBeFalse)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyncWithDiffDigest(t *testing.T) {
|
||||||
|
Convey("Verify sync correctly detects changes in upstream images", t, func() {
|
||||||
|
updateDuration, _ := time.ParseDuration("30m")
|
||||||
|
|
||||||
|
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false)
|
||||||
|
defer os.RemoveAll(srcDir)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
sctlr.Shutdown()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var tlsVerify bool
|
||||||
|
|
||||||
|
syncRegistryConfig := sync.RegistryConfig{
|
||||||
|
Content: []sync.Content{
|
||||||
|
{
|
||||||
|
Prefix: "**",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
URLs: []string{srcBaseURL},
|
||||||
|
PollInterval: updateDuration,
|
||||||
|
TLSVerify: &tlsVerify,
|
||||||
|
CertDir: "",
|
||||||
|
OnDemand: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultVal := true
|
||||||
|
syncConfig := &sync.Config{
|
||||||
|
Enable: &defaultVal,
|
||||||
|
Registries: []sync.RegistryConfig{syncRegistryConfig},
|
||||||
|
}
|
||||||
|
|
||||||
|
destPort := test.GetFreePort()
|
||||||
|
destConfig := config.New()
|
||||||
|
destBaseURL := test.GetBaseURL(destPort)
|
||||||
|
destConfig.HTTP.Port = destPort
|
||||||
|
|
||||||
|
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// copy images so we have them before syncing, sync should not pull them again
|
||||||
|
err = test.CopyFiles("../../../test/data", destDir)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
destConfig.Storage.RootDirectory = destDir
|
||||||
|
destConfig.Storage.Dedupe = false
|
||||||
|
destConfig.Storage.GC = false
|
||||||
|
|
||||||
|
destConfig.Extensions = &extconf.ExtensionConfig{}
|
||||||
|
destConfig.Extensions.Search = nil
|
||||||
|
destConfig.Extensions.Sync = syncConfig
|
||||||
|
|
||||||
|
dctlr := api.NewController(destConfig)
|
||||||
|
|
||||||
|
// before starting downstream server, let's modify an image manifest so that sync should pull it
|
||||||
|
// change digest of the manifest so that sync should happen
|
||||||
|
size := 5 * 1024 * 1024
|
||||||
|
blob := make([]byte, size)
|
||||||
|
digest := godigest.FromBytes(blob)
|
||||||
|
|
||||||
|
resp, err := resty.R().Get(srcBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp.StatusCode(), ShouldEqual, 200)
|
||||||
|
|
||||||
|
manifestBlob := resp.Body()
|
||||||
|
|
||||||
|
var manifest ispec.Manifest
|
||||||
|
|
||||||
|
err = json.Unmarshal(manifestBlob, &manifest)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
resp, err = resty.R().Post(srcBaseURL + "/v2/" + testImage + "/blobs/uploads/")
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp, ShouldNotBeNil)
|
||||||
|
So(resp.StatusCode(), ShouldEqual, http.StatusAccepted)
|
||||||
|
|
||||||
|
loc := resp.Header().Get("Location")
|
||||||
|
|
||||||
|
resp, err = resty.R().
|
||||||
|
SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))).
|
||||||
|
SetHeader("Content-Type", "application/octet-stream").
|
||||||
|
SetQueryParam("digest", digest.String()).
|
||||||
|
SetBody(blob).
|
||||||
|
Put(srcBaseURL + loc)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp, ShouldNotBeNil)
|
||||||
|
|
||||||
|
newLayer := ispec.Descriptor{
|
||||||
|
MediaType: ispec.MediaTypeImageLayer,
|
||||||
|
Digest: digest,
|
||||||
|
Size: int64(size),
|
||||||
|
}
|
||||||
|
|
||||||
|
manifest.Layers = append(manifest.Layers, newLayer)
|
||||||
|
|
||||||
|
manifestBody, err := json.Marshal(manifest)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err = resty.R().SetHeader("Content-type", "application/vnd.oci.image.manifest.v1+json").
|
||||||
|
SetBody(manifestBody).
|
||||||
|
Put(srcBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp, ShouldNotBeNil)
|
||||||
|
So(resp.StatusCode(), ShouldEqual, 201)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// this blocks
|
||||||
|
if err := dctlr.Run(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// watch .sync subdir, shouldn't be populated
|
||||||
|
done := make(chan bool)
|
||||||
|
var isPopulated bool
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
_, err := os.ReadDir(path.Join(destDir, testImage, ".sync"))
|
||||||
|
if err == nil {
|
||||||
|
isPopulated = true
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
dctlr.Shutdown()
|
||||||
|
os.RemoveAll(destDir)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// wait till ready
|
||||||
|
for {
|
||||||
|
_, err := resty.R().Get(destBaseURL)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(resp.StatusCode(), ShouldEqual, 200)
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
done <- true
|
||||||
|
So(isPopulated, ShouldBeTrue)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func generateKeyPairs(tdir string) {
|
func generateKeyPairs(tdir string) {
|
||||||
// generate a keypair
|
// generate a keypair
|
||||||
os.Setenv("COSIGN_PASSWORD", "")
|
os.Setenv("COSIGN_PASSWORD", "")
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
package sync
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -21,7 +23,7 @@ import (
|
||||||
ispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
|
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
|
||||||
"gopkg.in/resty.v1"
|
"gopkg.in/resty.v1"
|
||||||
"zotregistry.io/zot/errors"
|
zerr "zotregistry.io/zot/errors"
|
||||||
"zotregistry.io/zot/pkg/common"
|
"zotregistry.io/zot/pkg/common"
|
||||||
"zotregistry.io/zot/pkg/extensions/monitoring"
|
"zotregistry.io/zot/pkg/extensions/monitoring"
|
||||||
"zotregistry.io/zot/pkg/log"
|
"zotregistry.io/zot/pkg/log"
|
||||||
|
@ -58,7 +60,7 @@ func parseRepositoryReference(input string) (reference.Named, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !reference.IsNameOnly(ref) {
|
if !reference.IsNameOnly(ref) {
|
||||||
return nil, errors.ErrInvalidRepositoryName
|
return nil, zerr.ErrInvalidRepositoryName
|
||||||
}
|
}
|
||||||
|
|
||||||
return ref, nil
|
return ref, nil
|
||||||
|
@ -119,7 +121,7 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede
|
||||||
client := resty.New()
|
client := resty.New()
|
||||||
|
|
||||||
if !common.Contains(regCfg.URLs, upstreamURL) {
|
if !common.Contains(regCfg.URLs, upstreamURL) {
|
||||||
return nil, errors.ErrSyncInvalidUpstreamURL
|
return nil, zerr.ErrSyncInvalidUpstreamURL
|
||||||
}
|
}
|
||||||
|
|
||||||
registryURL, err := url.Parse(upstreamURL)
|
registryURL, err := url.Parse(upstreamURL)
|
||||||
|
@ -227,7 +229,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
|
||||||
if resp.IsError() {
|
if resp.IsError() {
|
||||||
log.Info().Msgf("couldn't find cosign blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode())
|
log.Info().Msgf("couldn't find cosign blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode())
|
||||||
|
|
||||||
return errors.ErrBadBlobDigest
|
return zerr.ErrBadBlobDigest
|
||||||
}
|
}
|
||||||
|
|
||||||
defer resp.RawBody().Close()
|
defer resp.RawBody().Close()
|
||||||
|
@ -256,7 +258,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont
|
||||||
if resp.IsError() {
|
if resp.IsError() {
|
||||||
log.Info().Msgf("couldn't find cosign config blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode())
|
log.Info().Msgf("couldn't find cosign config blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode())
|
||||||
|
|
||||||
return errors.ErrBadBlobDigest
|
return zerr.ErrBadBlobDigest
|
||||||
}
|
}
|
||||||
|
|
||||||
defer resp.RawBody().Close()
|
defer resp.RawBody().Close()
|
||||||
|
@ -360,7 +362,7 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont
|
||||||
log.Info().Msgf("couldn't find notary blob from %s, status code: %d",
|
log.Info().Msgf("couldn't find notary blob from %s, status code: %d",
|
||||||
getBlobURL.String(), resp.StatusCode())
|
getBlobURL.String(), resp.StatusCode())
|
||||||
|
|
||||||
return errors.ErrBadBlobDigest
|
return zerr.ErrBadBlobDigest
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), blob.Digest.String())
|
_, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), blob.Digest.String())
|
||||||
|
@ -407,17 +409,17 @@ func syncSignatures(client *resty.Client, storeController storage.StoreControlle
|
||||||
|
|
||||||
digests, ok := resp.Header()["Docker-Content-Digest"]
|
digests, ok := resp.Header()["Docker-Content-Digest"]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Error().Err(errors.ErrBadBlobDigest).Str("url", getManifestURL.String()).
|
log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()).
|
||||||
Msgf("couldn't get digest for manifest: %s:%s", repo, tag)
|
Msgf("couldn't get digest for manifest: %s:%s", repo, tag)
|
||||||
|
|
||||||
return errors.ErrBadBlobDigest
|
return zerr.ErrBadBlobDigest
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(digests) != 1 {
|
if len(digests) != 1 {
|
||||||
log.Error().Err(errors.ErrBadBlobDigest).Str("url", getManifestURL.String()).
|
log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()).
|
||||||
Msgf("multiple digests found for: %s:%s", repo, tag)
|
Msgf("multiple digests found for: %s:%s", repo, tag)
|
||||||
|
|
||||||
return errors.ErrBadBlobDigest
|
return zerr.ErrBadBlobDigest
|
||||||
}
|
}
|
||||||
|
|
||||||
err = syncNotarySignature(client, storeController, *regURL, repo, digests[0], log)
|
err = syncNotarySignature(client, storeController, *regURL, repo, digests[0], log)
|
||||||
|
@ -573,3 +575,34 @@ func getLocalImageRef(localCachePath, repo, tag string) (types.ImageReference, e
|
||||||
|
|
||||||
return localImageRef, nil
|
return localImageRef, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// canSkipImage returns whether or not the image can be skipped from syncing.
|
||||||
|
func canSkipImage(repo, tag string, upstreamRef types.ImageReference,
|
||||||
|
imageStore storage.ImageStore, upstreamCtx *types.SystemContext, log log.Logger) (bool, error) {
|
||||||
|
// filter already pulled images
|
||||||
|
_, localImageDigest, _, err := imageStore.GetImageManifest(repo, tag)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrManifestNotFound) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error().Err(err).Msgf("couldn't get local image %s:%s manifest", repo, tag)
|
||||||
|
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
upstreamImageDigest, err := docker.GetDigest(context.Background(), upstreamCtx, upstreamRef)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamRef.DockerReference())
|
||||||
|
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if localImageDigest == string(upstreamImageDigest) {
|
||||||
|
log.Info().Msgf("skipping syncing %s:%s, image already synced", repo, tag)
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue