mirror of
https://github.com/project-zot/zot.git
synced 2025-03-18 02:22:53 -05:00
sync: for a prefix, allow multiple registries as a list instead of only one, closes #343
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
parent
45fe129c63
commit
a0e65379c8
11 changed files with 384 additions and 216 deletions
|
@ -48,4 +48,5 @@ var (
|
|||
ErrMethodNotSupported = errors.New("storage: method not supported")
|
||||
ErrInvalidMetric = errors.New("metrics: invalid metric func")
|
||||
ErrInjected = errors.New("test: injected failure")
|
||||
ErrSyncInvalidUpstreamURL = errors.New("sync: upstream url not found in sync config")
|
||||
)
|
||||
|
|
|
@ -383,7 +383,7 @@ Configure each registry sync:
|
|||
|
||||
```
|
||||
"registries": [{
|
||||
"url": "https://registry1:5000",
|
||||
"urls": ["https://registry1:5000"],
|
||||
"onDemand": false, # pull any image which the local registry doesn't have
|
||||
"pollInterval": "6h", # polling interval, if not set then periodically polling will not run
|
||||
"tlsVerify": true, # whether or not to verify tls
|
||||
|
@ -405,7 +405,7 @@ Configure each registry sync:
|
|||
]
|
||||
},
|
||||
{
|
||||
"url": "https://registry2:5000",
|
||||
"urls": ["https://registry2:5000", "https://registry3:5000"], // specify multiple URLs in case first encounters an error
|
||||
"pollInterval": "12h",
|
||||
"tlsVerify": false,
|
||||
"onDemand": false,
|
||||
|
@ -419,7 +419,7 @@ Configure each registry sync:
|
|||
]
|
||||
},
|
||||
{
|
||||
"url": "https://docker.io/library",
|
||||
"urls": ["https://docker.io/library"],
|
||||
"onDemand": true, # doesn't have content, don't periodically pull, pull just on demand.
|
||||
"tlsVerify": true
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
"sync": {
|
||||
"credentialsFile": "./examples/sync-auth-filepath.json",
|
||||
"registries": [{
|
||||
"url": "https://registry1:5000",
|
||||
"urls": ["https://registry1:5000"],
|
||||
"onDemand": false,
|
||||
"pollInterval": "6h",
|
||||
"tlsVerify": true,
|
||||
|
@ -33,7 +33,7 @@
|
|||
]
|
||||
},
|
||||
{
|
||||
"url": "https://registry2:5000",
|
||||
"urls": ["https://registry2:5000", "https://registry3:5000"],
|
||||
"pollInterval": "12h",
|
||||
"tlsVerify": false,
|
||||
"onDemand": false,
|
||||
|
@ -47,7 +47,7 @@
|
|||
]
|
||||
},
|
||||
{
|
||||
"url": "https://docker.io/library",
|
||||
"urls": ["https://docker.io/library"],
|
||||
"onDemand": true,
|
||||
"tlsVerify": true
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
glob "github.com/bmatcuk/doublestar/v4"
|
||||
"github.com/gorilla/mux"
|
||||
"zotregistry.io/zot/pkg/api/config"
|
||||
"zotregistry.io/zot/pkg/common"
|
||||
"zotregistry.io/zot/pkg/log"
|
||||
)
|
||||
|
||||
|
@ -52,12 +53,12 @@ func (ac *AccessController) getReadGlobPatterns(username string) map[string]bool
|
|||
|
||||
for pattern, policyGroup := range ac.Config.Repositories {
|
||||
// check default policy
|
||||
if contains(policyGroup.DefaultPolicy, READ) {
|
||||
if common.Contains(policyGroup.DefaultPolicy, READ) {
|
||||
globPatterns[pattern] = true
|
||||
}
|
||||
// check user based policy
|
||||
for _, p := range policyGroup.Policies {
|
||||
if contains(p.Users, username) && contains(p.Actions, READ) {
|
||||
if common.Contains(p.Users, username) && common.Contains(p.Actions, READ) {
|
||||
globPatterns[pattern] = true
|
||||
}
|
||||
}
|
||||
|
@ -94,7 +95,7 @@ func (ac *AccessController) can(username, action, repository string) bool {
|
|||
|
||||
// check admins based policy
|
||||
if !can {
|
||||
if ac.isAdmin(username) && contains(ac.Config.AdminPolicy.Actions, action) {
|
||||
if ac.isAdmin(username) && common.Contains(ac.Config.AdminPolicy.Actions, action) {
|
||||
can = true
|
||||
}
|
||||
}
|
||||
|
@ -104,7 +105,7 @@ func (ac *AccessController) can(username, action, repository string) bool {
|
|||
|
||||
// isAdmin .
|
||||
func (ac *AccessController) isAdmin(username string) bool {
|
||||
return contains(ac.Config.AdminPolicy.Users, username)
|
||||
return common.Contains(ac.Config.AdminPolicy.Users, username)
|
||||
}
|
||||
|
||||
// getContext builds ac context(allowed to read repos and if user is admin) and returns it.
|
||||
|
@ -128,7 +129,7 @@ func isPermitted(username, action string, policyGroup config.PolicyGroup) bool {
|
|||
var result bool
|
||||
// check repo/system based policies
|
||||
for _, p := range policyGroup.Policies {
|
||||
if contains(p.Users, username) && contains(p.Actions, action) {
|
||||
if common.Contains(p.Users, username) && common.Contains(p.Actions, action) {
|
||||
result = true
|
||||
|
||||
break
|
||||
|
@ -137,7 +138,7 @@ func isPermitted(username, action string, policyGroup config.PolicyGroup) bool {
|
|||
|
||||
// check defaultPolicy
|
||||
if !result {
|
||||
if contains(policyGroup.DefaultPolicy, action) {
|
||||
if common.Contains(policyGroup.DefaultPolicy, action) {
|
||||
result = true
|
||||
}
|
||||
}
|
||||
|
@ -145,16 +146,6 @@ func isPermitted(username, action string, policyGroup config.PolicyGroup) bool {
|
|||
return result
|
||||
}
|
||||
|
||||
func contains(slice []string, item string) bool {
|
||||
for _, v := range slice {
|
||||
if item == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// returns either a user has or not rights on 'repository'.
|
||||
func matchesRepo(globPatterns map[string]bool, repository string) bool {
|
||||
var longestMatchedPattern string
|
||||
|
@ -212,7 +203,7 @@ func AuthzHandler(ctlr *Controller) mux.MiddlewareFunc {
|
|||
is := ctlr.StoreController.GetImageStore(resource)
|
||||
tags, err := is.GetImageTags(resource)
|
||||
// if repo exists and request's tag doesn't exist yet then action is UPDATE
|
||||
if err == nil && contains(tags, reference) && reference != "latest" {
|
||||
if err == nil && common.Contains(tags, reference) && reference != "latest" {
|
||||
action = UPDATE
|
||||
}
|
||||
}
|
||||
|
|
|
@ -179,7 +179,7 @@ func TestVerify(t *testing.T) {
|
|||
content := []byte(`{"storage":{"rootDirectory":"/tmp/zot", "storageDriver": {"name": "s3"}},
|
||||
"http":{"address":"127.0.0.1","port":"8080","realm":"zot",
|
||||
"auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}},
|
||||
"extensions":{"sync": {"registries": [{"url":"localhost:9999"}]}}}`)
|
||||
"extensions":{"sync": {"registries": [{"urls":["localhost:9999"]}]}}}`)
|
||||
_, err = tmpfile.Write(content)
|
||||
So(err, ShouldBeNil)
|
||||
err = tmpfile.Close()
|
||||
|
@ -195,7 +195,7 @@ func TestVerify(t *testing.T) {
|
|||
content := []byte(`{"storage":{"rootDirectory":"/tmp/zot"},
|
||||
"http":{"address":"127.0.0.1","port":"8080","realm":"zot",
|
||||
"auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}},
|
||||
"extensions":{"sync": {"registries": [{"url":"localhost:9999"}]}}}`)
|
||||
"extensions":{"sync": {"registries": [{"urls":["localhost:9999"]}]}}}`)
|
||||
_, err = tmpfile.Write(content)
|
||||
So(err, ShouldBeNil)
|
||||
err = tmpfile.Close()
|
||||
|
@ -211,7 +211,7 @@ func TestVerify(t *testing.T) {
|
|||
content := []byte(`{"storage":{"rootDirectory":"/tmp/zot"},
|
||||
"http":{"address":"127.0.0.1","port":"8080","realm":"zot",
|
||||
"auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}},
|
||||
"extensions":{"sync": {"registries": [{"url":"localhost:9999",
|
||||
"extensions":{"sync": {"registries": [{"urls":["localhost:9999"],
|
||||
"content": [{"prefix":"[repo%^&"}]}]}}}`)
|
||||
_, err = tmpfile.Write(content)
|
||||
So(err, ShouldBeNil)
|
||||
|
@ -244,7 +244,7 @@ func TestVerify(t *testing.T) {
|
|||
content := []byte(`{"storage":{"rootDirectory":"/tmp/zot"},
|
||||
"http":{"address":"127.0.0.1","port":"8080","realm":"zot",
|
||||
"auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}},
|
||||
"extensions":{"sync": {"registries": [{"url":"localhost:9999",
|
||||
"extensions":{"sync": {"registries": [{"urls":["localhost:9999"],
|
||||
"content": [{"prefix":"repo**"}]}]}}}`)
|
||||
_, err = tmpfile.Write(content)
|
||||
So(err, ShouldBeNil)
|
||||
|
|
11
pkg/common/common.go
Normal file
11
pkg/common/common.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package common
|
||||
|
||||
func Contains(slice []string, item string) bool {
|
||||
for _, v := range slice {
|
||||
if item == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
|
@ -5,7 +5,6 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/containers/common/pkg/retry"
|
||||
"github.com/containers/image/v5/copy"
|
||||
|
@ -55,7 +54,7 @@ func OneImage(cfg Config, storeController storage.StoreController,
|
|||
for _, registryCfg := range cfg.Registries {
|
||||
regCfg := registryCfg
|
||||
if !regCfg.OnDemand {
|
||||
log.Info().Msgf("skipping syncing on demand from %s, onDemand flag is false", regCfg.URL)
|
||||
log.Info().Msgf("skipping syncing on demand from %v, onDemand flag is false", regCfg.URLs)
|
||||
|
||||
continue
|
||||
}
|
||||
|
@ -64,107 +63,106 @@ func OneImage(cfg Config, storeController storage.StoreController,
|
|||
if len(regCfg.Content) != 0 {
|
||||
repos := filterRepos([]string{repo}, regCfg.Content, log)
|
||||
if len(repos) == 0 {
|
||||
log.Info().Msgf("skipping syncing on demand %s from %s registry because it's filtered out by content config",
|
||||
repo, regCfg.URL)
|
||||
log.Info().Msgf("skipping syncing on demand %s from %v registry because it's filtered out by content config",
|
||||
repo, regCfg.URLs)
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
registryConfig := regCfg
|
||||
log.Info().Msgf("syncing on demand with %s", registryConfig.URL)
|
||||
log.Info().Msgf("syncing on demand with %v", registryConfig.URLs)
|
||||
|
||||
upstreamRegistryName := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
|
||||
for _, upstreamURL := range regCfg.URLs {
|
||||
regCfgURL := upstreamURL
|
||||
upstreamAddr := StripRegistryTransport(upstreamURL)
|
||||
upstreamCtx := getUpstreamContext(®istryConfig, credentialsFile[upstreamAddr])
|
||||
|
||||
upstreamCtx := getUpstreamContext(®istryConfig, credentialsFile[upstreamRegistryName])
|
||||
|
||||
upstreamRepoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", upstreamRegistryName, repo))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("error parsing repository reference %s/%s", upstreamRegistryName, repo)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
upstreamTaggedRef, err := reference.WithTag(upstreamRepoRef, tag)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("error creating a reference for repository %s and tag %q",
|
||||
upstreamRepoRef.Name(), tag)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
upstreamRef, err := docker.NewReference(upstreamTaggedRef)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("error creating docker reference for repository %s and tag %q",
|
||||
upstreamRepoRef.Name(), tag)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
imageName := strings.Replace(upstreamTaggedRef.Name(), upstreamRegistryName, "", 1)
|
||||
|
||||
localRepo := path.Join(imageStore.RootDir(), imageName, SyncBlobUploadDir, uuid.String(), imageName)
|
||||
|
||||
if err = os.MkdirAll(localRepo, storage.DefaultDirPerms); err != nil {
|
||||
log.Error().Err(err).Str("dir", localRepo).Msg("couldn't create temporary dir")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
defer os.RemoveAll(path.Join(imageStore.RootDir(), imageName, SyncBlobUploadDir, uuid.String()))
|
||||
|
||||
localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag)
|
||||
|
||||
localRef, err := layout.ParseReference(localTaggedRepo)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("cannot obtain a valid image reference for reference %q", localRepo)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Msgf("copying image %s:%s to %s", upstreamTaggedRef.Name(),
|
||||
upstreamTaggedRef.Tag(), localRepo)
|
||||
|
||||
options := getCopyOptions(upstreamCtx, localCtx)
|
||||
|
||||
retryOptions := &retry.RetryOptions{
|
||||
MaxRetry: maxRetries,
|
||||
}
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
_, copyErr = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options)
|
||||
|
||||
return copyErr
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Err(err).Msgf("error while copying image %s to %s",
|
||||
upstreamRef.DockerReference().Name(), localTaggedRepo)
|
||||
} else {
|
||||
log.Info().Msgf("successfully synced %s", upstreamRef.DockerReference().Name())
|
||||
|
||||
err := pushSyncedLocalImage(repo, tag, uuid.String(), storeController, log)
|
||||
upstreamRepoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", upstreamAddr, repo))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("error while pushing synced cached image %s",
|
||||
localTaggedRepo)
|
||||
log.Error().Err(err).Msgf("error parsing repository reference %s/%s", upstreamAddr, repo)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
httpClient, err := getHTTPClient(®Cfg, credentialsFile[upstreamRegistryName], log)
|
||||
upstreamTaggedRef, err := reference.WithTag(upstreamRepoRef, tag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
err = syncSignatures(httpClient, storeController, regCfg.URL, imageName, upstreamTaggedRef.Tag(), log)
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Err(err).Msgf("Couldn't copy image signature %s", upstreamRef.DockerReference().Name())
|
||||
log.Error().Err(err).Msgf("error creating a reference for repository %s and tag %q",
|
||||
upstreamRepoRef.Name(), tag)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
upstreamRef, err := docker.NewReference(upstreamTaggedRef)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("error creating docker reference for repository %s and tag %q",
|
||||
upstreamRepoRef.Name(), tag)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
localRepo := path.Join(imageStore.RootDir(), repo, SyncBlobUploadDir, uuid.String(), repo)
|
||||
|
||||
if err = os.MkdirAll(localRepo, storage.DefaultDirPerms); err != nil {
|
||||
log.Error().Err(err).Str("dir", localRepo).Msg("couldn't create temporary dir")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
defer os.RemoveAll(path.Join(imageStore.RootDir(), repo, SyncBlobUploadDir, uuid.String()))
|
||||
|
||||
localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag)
|
||||
|
||||
localRef, err := layout.ParseReference(localTaggedRepo)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("cannot obtain a valid image reference for reference %q", localRepo)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Msgf("copying image %s:%s to %s", upstreamTaggedRef.Name(),
|
||||
upstreamTaggedRef.Tag(), localRepo)
|
||||
|
||||
options := getCopyOptions(upstreamCtx, localCtx)
|
||||
|
||||
retryOptions := &retry.RetryOptions{
|
||||
MaxRetry: maxRetries,
|
||||
}
|
||||
|
||||
copyErr = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
_, copyErr = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options)
|
||||
|
||||
return copyErr
|
||||
}, retryOptions)
|
||||
if copyErr != nil {
|
||||
log.Error().Err(copyErr).Msgf("error while copying image %s to %s",
|
||||
upstreamRef.DockerReference().Name(), localTaggedRepo)
|
||||
} else {
|
||||
err := pushSyncedLocalImage(repo, tag, uuid.String(), storeController, log)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("error while pushing synced cached image %s",
|
||||
localTaggedRepo)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Msgf("successfully synced %s", upstreamRef.DockerReference().Name())
|
||||
|
||||
httpClient, err := getHTTPClient(®Cfg, upstreamURL, credentialsFile[upstreamAddr], log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if copyErr = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
copyErr = syncSignatures(httpClient, storeController, regCfgURL, repo, tag, log)
|
||||
|
||||
return copyErr
|
||||
}, retryOptions); copyErr != nil {
|
||||
log.Error().Err(err).Msgf("Couldn't copy image signature %s", upstreamRef.DockerReference().Name())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ type Config struct {
|
|||
}
|
||||
|
||||
type RegistryConfig struct {
|
||||
URL string
|
||||
URLs []string
|
||||
PollInterval time.Duration
|
||||
Content []Content
|
||||
TLSVerify *bool
|
||||
|
@ -72,10 +72,10 @@ type Tags struct {
|
|||
}
|
||||
|
||||
// getUpstreamCatalog gets all repos from a registry.
|
||||
func getUpstreamCatalog(client *resty.Client, regCfg *RegistryConfig, log log.Logger) (catalog, error) {
|
||||
func getUpstreamCatalog(client *resty.Client, upstreamURL string, log log.Logger) (catalog, error) {
|
||||
var c catalog
|
||||
|
||||
registryCatalogURL := fmt.Sprintf("%s%s", regCfg.URL, "/v2/_catalog")
|
||||
registryCatalogURL := fmt.Sprintf("%s%s", upstreamURL, "/v2/_catalog")
|
||||
|
||||
resp, err := client.R().SetHeader("Content-Type", "application/json").Get(registryCatalogURL)
|
||||
if err != nil {
|
||||
|
@ -279,10 +279,10 @@ func getUpstreamContext(regCfg *RegistryConfig, credentials Credentials) *types.
|
|||
return upstreamCtx
|
||||
}
|
||||
|
||||
func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController,
|
||||
log log.Logger, localCtx *types.SystemContext,
|
||||
policyCtx *signature.PolicyContext, credentials Credentials, uuid string) error {
|
||||
log.Info().Msgf("syncing registry: %s", regCfg.URL)
|
||||
func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController storage.StoreController,
|
||||
localCtx *types.SystemContext, policyCtx *signature.PolicyContext, credentials Credentials,
|
||||
uuid string, log log.Logger) error {
|
||||
log.Info().Msgf("syncing registry: %s", upstreamURL)
|
||||
|
||||
var err error
|
||||
|
||||
|
@ -298,13 +298,13 @@ func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController
|
|||
|
||||
var catalog catalog
|
||||
|
||||
httpClient, err := getHTTPClient(®Cfg, credentials, log)
|
||||
httpClient, err := getHTTPClient(®Cfg, upstreamURL, credentials, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
catalog, err = getUpstreamCatalog(httpClient, ®Cfg, log)
|
||||
catalog, err = getUpstreamCatalog(httpClient, upstreamURL, log)
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
|
@ -313,8 +313,6 @@ func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController
|
|||
return err
|
||||
}
|
||||
|
||||
upstreamRegistryName := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
|
||||
|
||||
log.Info().Msgf("filtering %d repos based on sync prefixes", len(catalog.Repositories))
|
||||
|
||||
repos := filterRepos(catalog.Repositories, regCfg.Content, log)
|
||||
|
@ -323,12 +321,14 @@ func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController
|
|||
|
||||
var images []types.ImageReference
|
||||
|
||||
upstreamAddr := StripRegistryTransport(upstreamURL)
|
||||
|
||||
for contentID, repos := range repos {
|
||||
r := repos
|
||||
id := contentID
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
refs, err := imagesToCopyFromUpstream(upstreamRegistryName, r, upstreamCtx, regCfg.Content[id], log)
|
||||
refs, err := imagesToCopyFromUpstream(upstreamAddr, r, upstreamCtx, regCfg.Content[id], log)
|
||||
images = append(images, refs...)
|
||||
|
||||
return err
|
||||
|
@ -348,7 +348,7 @@ func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController
|
|||
for _, ref := range images {
|
||||
upstreamRef := ref
|
||||
|
||||
imageName := strings.Replace(upstreamRef.DockerReference().Name(), upstreamRegistryName, "", 1)
|
||||
imageName := strings.Replace(upstreamRef.DockerReference().Name(), upstreamAddr, "", 1)
|
||||
imageName = strings.TrimPrefix(imageName, "/")
|
||||
|
||||
imageStore := storeController.GetImageStore(imageName)
|
||||
|
@ -399,7 +399,7 @@ func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController
|
|||
}
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
err = syncSignatures(httpClient, storeController, regCfg.URL, imageName, upstreamTaggedRef.Tag(), log)
|
||||
err = syncSignatures(httpClient, storeController, upstreamURL, imageName, upstreamTaggedRef.Tag(), log)
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
|
@ -409,7 +409,7 @@ func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController
|
|||
}
|
||||
}
|
||||
|
||||
log.Info().Msgf("finished syncing %s", regCfg.URL)
|
||||
log.Info().Msgf("finished syncing %s", upstreamAddr)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -466,14 +466,14 @@ func Run(cfg Config, storeController storage.StoreController, wtgrp *goSync.Wait
|
|||
for _, regCfg := range cfg.Registries {
|
||||
// if content not provided, don't run periodically sync
|
||||
if len(regCfg.Content) == 0 {
|
||||
logger.Info().Msgf("sync config content not configured for %s, will not run periodically sync", regCfg.URL)
|
||||
logger.Info().Msgf("sync config content not configured for %v, will not run periodically sync", regCfg.URLs)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// if pollInterval is not provided, don't run periodically sync
|
||||
if regCfg.PollInterval == 0 {
|
||||
logger.Warn().Msgf("sync config PollInterval not configured for %s, will not run periodically sync", regCfg.URL)
|
||||
logger.Warn().Msgf("sync config PollInterval not configured for %v, will not run periodically sync", regCfg.URLs)
|
||||
|
||||
continue
|
||||
}
|
||||
|
@ -483,8 +483,6 @@ func Run(cfg Config, storeController storage.StoreController, wtgrp *goSync.Wait
|
|||
// fork a new zerolog child to avoid data race
|
||||
tlogger := log.Logger{Logger: logger.With().Caller().Timestamp().Logger()}
|
||||
|
||||
upstreamRegistry := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
|
||||
|
||||
// schedule each registry sync
|
||||
go func(regCfg RegistryConfig, logger log.Logger) {
|
||||
// run on intervals
|
||||
|
@ -492,10 +490,17 @@ func Run(cfg Config, storeController storage.StoreController, wtgrp *goSync.Wait
|
|||
// increment reference since will be busy, so shutdown has to wait
|
||||
wtgrp.Add(1)
|
||||
|
||||
if err := syncRegistry(regCfg, storeController, logger, localCtx, policyCtx,
|
||||
credentialsFile[upstreamRegistry], uuid.String()); err != nil {
|
||||
logger.Error().Err(err).Msg("sync exited with error, stopping it...")
|
||||
ticker.Stop()
|
||||
for _, upstreamURL := range regCfg.URLs {
|
||||
upstreamAddr := StripRegistryTransport(upstreamURL)
|
||||
// first try syncing main registry
|
||||
if err := syncRegistry(regCfg, upstreamURL, storeController, localCtx, policyCtx,
|
||||
credentialsFile[upstreamAddr], uuid.String(), logger); err != nil {
|
||||
logger.Error().Err(err).Str("registry", upstreamURL).
|
||||
Msg("sync exited with error, falling back to auxiliary registries")
|
||||
} else {
|
||||
// if success fall back to main registry
|
||||
break
|
||||
}
|
||||
}
|
||||
// mark as done after a single sync run
|
||||
wtgrp.Done()
|
||||
|
|
|
@ -96,7 +96,7 @@ func TestSyncInternal(t *testing.T) {
|
|||
Prefix: testImage,
|
||||
},
|
||||
},
|
||||
URL: baseURL,
|
||||
URLs: []string{baseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -119,14 +119,20 @@ func TestSyncInternal(t *testing.T) {
|
|||
Prefix: testImage,
|
||||
},
|
||||
},
|
||||
URL: BaseURL,
|
||||
URLs: []string{BaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "/tmp/missing_certs/a/b/c/d/z",
|
||||
}
|
||||
|
||||
_, err := getUpstreamCatalog(resty.New(), &syncRegistryConfig, log.NewLogger("debug", ""))
|
||||
port := GetFreePort()
|
||||
baseURL := GetBaseURL(port)
|
||||
|
||||
httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
So(httpClient, ShouldBeNil)
|
||||
// _, err = getUpstreamCatalog(httpClient, baseURL, log.NewLogger("debug", ""))
|
||||
// So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("Test getHttpClient() with bad certs", t, func() {
|
||||
|
@ -153,28 +159,42 @@ func TestSyncInternal(t *testing.T) {
|
|||
Prefix: testImage,
|
||||
},
|
||||
},
|
||||
URL: baseURL,
|
||||
URLs: []string{baseURL, "invalidUrl]"},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: badCertsDir,
|
||||
}
|
||||
|
||||
_, err = getHTTPClient(&syncRegistryConfig, Credentials{}, log.NewLogger("debug", ""))
|
||||
httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
syncRegistryConfig.CertDir = "/path/to/invalid/cert"
|
||||
So(httpClient, ShouldBeNil)
|
||||
|
||||
_, err = getHTTPClient(&syncRegistryConfig, Credentials{}, log.NewLogger("debug", ""))
|
||||
syncRegistryConfig.CertDir = "/path/to/invalid/cert"
|
||||
httpClient, err = getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
So(httpClient, ShouldBeNil)
|
||||
|
||||
syncRegistryConfig.CertDir = ""
|
||||
syncRegistryConfig.URL = baseSecureURL
|
||||
syncRegistryConfig.URLs = []string{baseSecureURL}
|
||||
|
||||
_, err = getHTTPClient(&syncRegistryConfig, Credentials{}, log.NewLogger("debug", ""))
|
||||
httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", ""))
|
||||
So(err, ShouldBeNil)
|
||||
So(httpClient, ShouldNotBeNil)
|
||||
|
||||
syncRegistryConfig.URL = BaseURL
|
||||
_, err = getHTTPClient(&syncRegistryConfig, Credentials{}, log.NewLogger("debug", ""))
|
||||
_, err = getUpstreamCatalog(httpClient, baseURL, log.NewLogger("debug", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, err = getUpstreamCatalog(httpClient, "http://invalid:5000", log.NewLogger("debug", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
syncRegistryConfig.URLs = []string{BaseURL}
|
||||
httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
So(httpClient, ShouldBeNil)
|
||||
|
||||
httpClient, err = getHTTPClient(&syncRegistryConfig, "invalidUrl]", Credentials{}, log.NewLogger("debug", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
So(httpClient, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("Test imagesToCopyFromUpstream()", t, func() {
|
||||
|
|
|
@ -241,7 +241,7 @@ func startDownstreamServer(secure bool, syncConfig *sync.Config) (*api.Controlle
|
|||
return dctlr, destBaseURL, destDir, client
|
||||
}
|
||||
|
||||
func TestSyncOnDemand(t *testing.T) {
|
||||
func TestOnDemand(t *testing.T) {
|
||||
Convey("Verify sync on demand feature", t, func() {
|
||||
sctlr, srcBaseURL, srcDir, _, srcClient := startUpstreamServer(false, false)
|
||||
defer os.RemoveAll(srcDir)
|
||||
|
@ -265,7 +265,7 @@ func TestSyncOnDemand(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: true,
|
||||
|
@ -369,7 +369,7 @@ func TestSyncOnDemand(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSync(t *testing.T) {
|
||||
func TestPeriodically(t *testing.T) {
|
||||
Convey("Verify sync feature", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
|
@ -394,7 +394,7 @@ func TestSync(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -466,7 +466,7 @@ func TestSync(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -533,7 +533,7 @@ func TestSync(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncPermsDenied(t *testing.T) {
|
||||
func TestPermsDenied(t *testing.T) {
|
||||
Convey("Verify sync feature without perm on sync cache", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
|
@ -558,7 +558,7 @@ func TestSyncPermsDenied(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -590,7 +590,7 @@ func TestSyncPermsDenied(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncBadTLS(t *testing.T) {
|
||||
func TestBadTLS(t *testing.T) {
|
||||
Convey("Verify sync TLS feature", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
|
@ -615,7 +615,7 @@ func TestSyncBadTLS(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
OnDemand: true,
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
|
@ -647,7 +647,7 @@ func TestSyncBadTLS(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncTLS(t *testing.T) {
|
||||
func TestTLS(t *testing.T) {
|
||||
Convey("Verify sync TLS feature", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("1h")
|
||||
|
||||
|
@ -710,7 +710,7 @@ func TestSyncTLS(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: destClientCertDir,
|
||||
|
@ -748,7 +748,7 @@ func TestSyncTLS(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncBasicAuth(t *testing.T) {
|
||||
func TestBasicAuth(t *testing.T) {
|
||||
Convey("Verify sync basic auth", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("1h")
|
||||
|
||||
|
@ -761,7 +761,7 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
}()
|
||||
|
||||
Convey("Verify sync basic auth with file credentials", func() {
|
||||
registryName := strings.Replace(strings.Replace(srcBaseURL, "http://", "", 1), "https://", "", 1)
|
||||
registryName := sync.StripRegistryTransport(srcBaseURL)
|
||||
credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "test"}}`, registryName))
|
||||
|
||||
var tlsVerify bool
|
||||
|
@ -772,7 +772,7 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
Prefix: testImage,
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -850,7 +850,7 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
regex := ".*"
|
||||
var semver bool
|
||||
|
||||
registryName := strings.Replace(strings.Replace(srcBaseURL, "http://", "", 1), "https://", "", 1)
|
||||
registryName := sync.StripRegistryTransport(srcBaseURL)
|
||||
|
||||
credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "invalid"}}`,
|
||||
registryName))
|
||||
|
@ -867,7 +867,7 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -911,7 +911,7 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Verify sync basic auth with bad file credentials", func() {
|
||||
registryName := strings.Replace(strings.Replace(srcBaseURL, "http://", "", 1), "https://", "", 1)
|
||||
registryName := sync.StripRegistryTransport(srcBaseURL)
|
||||
|
||||
credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "test"}}`,
|
||||
registryName))
|
||||
|
@ -938,7 +938,7 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -964,21 +964,21 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
})
|
||||
|
||||
Convey("Verify on demand sync with basic auth", func() {
|
||||
registryName := strings.Replace(strings.Replace(srcBaseURL, "http://", "", 1), "https://", "", 1)
|
||||
registryName := sync.StripRegistryTransport(srcBaseURL)
|
||||
credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "test"}}`, registryName))
|
||||
|
||||
syncRegistryConfig := sync.RegistryConfig{
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
OnDemand: true,
|
||||
}
|
||||
|
||||
unreacheableSyncRegistryConfig1 := sync.RegistryConfig{
|
||||
URL: "localhost:999999",
|
||||
URLs: []string{"localhost:9999"},
|
||||
OnDemand: true,
|
||||
}
|
||||
|
||||
unreacheableSyncRegistryConfig2 := sync.RegistryConfig{
|
||||
URL: "localhost:999999",
|
||||
URLs: []string{"localhost:9999"},
|
||||
OnDemand: false,
|
||||
}
|
||||
|
||||
|
@ -1034,6 +1034,10 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
resp, err = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "inexistent")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 404)
|
||||
|
||||
resp, err = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/tags/list")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -1049,7 +1053,7 @@ func TestSyncBasicAuth(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncBadURL(t *testing.T) {
|
||||
func TestBadURL(t *testing.T) {
|
||||
Convey("Verify sync with bad url", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("1h")
|
||||
|
||||
|
@ -1067,7 +1071,7 @@ func TestSyncBadURL(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: "bad-registry-url",
|
||||
URLs: []string{"bad-registry-url"},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -1089,7 +1093,7 @@ func TestSyncBadURL(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncNoImagesByRegex(t *testing.T) {
|
||||
func TestNoImagesByRegex(t *testing.T) {
|
||||
Convey("Verify sync with no images on source based on regex", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("1h")
|
||||
|
||||
|
@ -1112,7 +1116,7 @@ func TestSyncNoImagesByRegex(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
PollInterval: updateDuration,
|
||||
CertDir: "",
|
||||
|
@ -1146,7 +1150,7 @@ func TestSyncNoImagesByRegex(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncInvalidRegex(t *testing.T) {
|
||||
func TestInvalidRegex(t *testing.T) {
|
||||
Convey("Verify sync with invalid regex", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("1h")
|
||||
|
||||
|
@ -1169,7 +1173,7 @@ func TestSyncInvalidRegex(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
PollInterval: updateDuration,
|
||||
CertDir: "",
|
||||
|
@ -1187,7 +1191,7 @@ func TestSyncInvalidRegex(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncNotSemver(t *testing.T) {
|
||||
func TestNotSemver(t *testing.T) {
|
||||
Convey("Verify sync feature semver compliant", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
|
@ -1225,7 +1229,7 @@ func TestSyncNotSemver(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -1263,7 +1267,7 @@ func TestSyncNotSemver(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncInvalidCerts(t *testing.T) {
|
||||
func TestInvalidCerts(t *testing.T) {
|
||||
Convey("Verify sync with bad certs", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("1h")
|
||||
|
||||
|
@ -1319,7 +1323,7 @@ func TestSyncInvalidCerts(t *testing.T) {
|
|||
Prefix: "",
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: clientCertDir,
|
||||
|
@ -1355,7 +1359,7 @@ func makeCredentialsFile(fileContent string) string {
|
|||
return tmpfile.Name()
|
||||
}
|
||||
|
||||
func TestSyncInvalidUrl(t *testing.T) {
|
||||
func TestInvalidUrl(t *testing.T) {
|
||||
Convey("Verify sync invalid url", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
regex := ".*"
|
||||
|
@ -1373,7 +1377,7 @@ func TestSyncInvalidUrl(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: "http://invalid.invalid/invalid/",
|
||||
URLs: []string{"http://invalid.invalid/invalid/"},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -1395,7 +1399,7 @@ func TestSyncInvalidUrl(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncInvalidTags(t *testing.T) {
|
||||
func TestInvalidTags(t *testing.T) {
|
||||
Convey("Verify sync invalid tags", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
|
@ -1421,7 +1425,7 @@ func TestSyncInvalidTags(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -1445,7 +1449,7 @@ func TestSyncInvalidTags(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncSubPaths(t *testing.T) {
|
||||
func TestSubPaths(t *testing.T) {
|
||||
Convey("Verify sync with storage subPaths", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
|
@ -1507,7 +1511,7 @@ func TestSyncSubPaths(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -1615,7 +1619,7 @@ func TestSyncOnDemandRepoErr(t *testing.T) {
|
|||
Prefix: testImage,
|
||||
},
|
||||
},
|
||||
URL: "docker://invalid",
|
||||
URLs: []string{"docker://invalid"},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: true,
|
||||
|
@ -1636,7 +1640,7 @@ func TestSyncOnDemandRepoErr(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncOnDemandContentFiltering(t *testing.T) {
|
||||
func TestOnDemandContentFiltering(t *testing.T) {
|
||||
Convey("Verify sync on demand feature", t, func() {
|
||||
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false)
|
||||
defer os.RemoveAll(srcDir)
|
||||
|
@ -1661,7 +1665,7 @@ func TestSyncOnDemandContentFiltering(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: true,
|
||||
|
@ -1697,7 +1701,7 @@ func TestSyncOnDemandContentFiltering(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: true,
|
||||
|
@ -1719,7 +1723,7 @@ func TestSyncOnDemandContentFiltering(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSyncConfigRules(t *testing.T) {
|
||||
func TestConfigRules(t *testing.T) {
|
||||
Convey("Verify sync config rules", t, func() {
|
||||
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false)
|
||||
defer os.RemoveAll(srcDir)
|
||||
|
@ -1743,7 +1747,7 @@ func TestSyncConfigRules(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: false,
|
||||
|
@ -1770,7 +1774,7 @@ func TestSyncConfigRules(t *testing.T) {
|
|||
|
||||
syncRegistryConfig := sync.RegistryConfig{
|
||||
PollInterval: updateDuration,
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: false,
|
||||
|
@ -1794,7 +1798,7 @@ func TestSyncConfigRules(t *testing.T) {
|
|||
var tlsVerify bool
|
||||
|
||||
syncRegistryConfig := sync.RegistryConfig{
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: false,
|
||||
|
@ -1816,6 +1820,80 @@ func TestSyncConfigRules(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestMultipleURLs(t *testing.T) {
|
||||
Convey("Verify sync feature", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
sctlr, srcBaseURL, srcDir, _, srcClient := startUpstreamServer(false, false)
|
||||
defer os.RemoveAll(srcDir)
|
||||
|
||||
defer func() {
|
||||
sctlr.Shutdown()
|
||||
}()
|
||||
|
||||
regex := ".*"
|
||||
semver := true
|
||||
var tlsVerify bool
|
||||
|
||||
syncRegistryConfig := sync.RegistryConfig{
|
||||
Content: []sync.Content{
|
||||
{
|
||||
Prefix: testImage,
|
||||
Tags: &sync.Tags{
|
||||
Regex: ®ex,
|
||||
Semver: &semver,
|
||||
},
|
||||
},
|
||||
},
|
||||
URLs: []string{"badURL", "http://invalid.invalid/invalid/", srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
}
|
||||
|
||||
syncConfig := &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}}
|
||||
|
||||
dc, destBaseURL, destDir, destClient := startDownstreamServer(false, syncConfig)
|
||||
defer os.RemoveAll(destDir)
|
||||
|
||||
defer func() {
|
||||
dc.Shutdown()
|
||||
}()
|
||||
|
||||
var srcTagsList TagsList
|
||||
var destTagsList TagsList
|
||||
|
||||
resp, _ := srcClient.R().Get(srcBaseURL + "/v2/" + testImage + "/tags/list")
|
||||
So(resp, ShouldNotBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
err := json.Unmarshal(resp.Body(), &srcTagsList)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for {
|
||||
resp, err = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/tags/list")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(resp.Body(), &destTagsList)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(destTagsList.Tags) > 0 {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
|
||||
So(destTagsList, ShouldResemble, srcTagsList)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSyncSignatures(t *testing.T) {
|
||||
Convey("Verify sync signatures", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
@ -1861,7 +1939,7 @@ func TestSyncSignatures(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -1909,11 +1987,14 @@ func TestSyncSignatures(t *testing.T) {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// notation verify the image
|
||||
image := fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0")
|
||||
cmd := exec.Command("notation", "verify", "--cert", "good", "--plain-http", image)
|
||||
out, err := cmd.CombinedOutput()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
msg := string(out)
|
||||
So(msg, ShouldNotBeEmpty)
|
||||
So(strings.Contains(msg, "verification failure"), ShouldBeFalse)
|
||||
|
@ -1970,9 +2051,10 @@ func TestSyncSignatures(t *testing.T) {
|
|||
// panic(err)
|
||||
// }
|
||||
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||
// sync on demand
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 404)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
for _, blob := range nm.Blobs {
|
||||
srcBlobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex())
|
||||
|
@ -1986,9 +2068,14 @@ func TestSyncSignatures(t *testing.T) {
|
|||
So(err, ShouldBeNil)
|
||||
}
|
||||
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||
// // remove already synced image
|
||||
// err = os.RemoveAll(path.Join(destDir, repoName))
|
||||
// So(err, ShouldBeNil)
|
||||
|
||||
// sync on demand
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 404)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
// clean
|
||||
for _, blob := range nm.Blobs {
|
||||
|
@ -2018,9 +2105,14 @@ func TestSyncSignatures(t *testing.T) {
|
|||
So(err, ShouldBeNil)
|
||||
}
|
||||
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||
// remove already synced image
|
||||
err = os.RemoveAll(path.Join(destDir, repoName))
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 404)
|
||||
|
||||
// sync on demand
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
for _, blob := range cm.Layers {
|
||||
srcBlobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex())
|
||||
|
@ -2028,15 +2120,19 @@ func TestSyncSignatures(t *testing.T) {
|
|||
So(err, ShouldBeNil)
|
||||
|
||||
destBlobPath := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex())
|
||||
err = os.Remove(destBlobPath)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = os.MkdirAll(destBlobPath, 0o000)
|
||||
So(err, ShouldBeNil)
|
||||
}
|
||||
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||
// remove already synced image
|
||||
err = os.RemoveAll(path.Join(destDir, repoName))
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 404)
|
||||
|
||||
// sync on demand
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
for _, blob := range cm.Layers {
|
||||
destBlobPath := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex())
|
||||
|
@ -2052,23 +2148,33 @@ func TestSyncSignatures(t *testing.T) {
|
|||
err = os.Chmod(srcConfigBlobPath, 0o000)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||
// remove already synced image
|
||||
err = os.RemoveAll(path.Join(destDir, repoName))
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 404)
|
||||
|
||||
// sync on demand
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
err = os.Chmod(srcConfigBlobPath, 0o755)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
destConfigBlobPath := path.Join(destDir, repoName, "blobs", string(cm.Config.Digest.Algorithm()),
|
||||
cm.Config.Digest.Hex())
|
||||
err = os.Remove(destConfigBlobPath)
|
||||
So(err, ShouldBeNil)
|
||||
// err = os.Remove(destConfigBlobPath)
|
||||
// So(err, ShouldBeNil)
|
||||
err = os.MkdirAll(destConfigBlobPath, 0o000)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
|
||||
// remove already synced image
|
||||
err = os.RemoveAll(path.Join(destDir, repoName))
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 404)
|
||||
|
||||
// sync on demand
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2097,7 +2203,7 @@ func TestSyncError(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
|
@ -2154,7 +2260,7 @@ func TestSyncSignaturesOnDemand(t *testing.T) {
|
|||
var tlsVerify bool
|
||||
|
||||
syncRegistryConfig := sync.RegistryConfig{
|
||||
URL: srcBaseURL,
|
||||
URLs: []string{srcBaseURL},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: true,
|
||||
|
@ -2214,7 +2320,8 @@ func TestSyncSignaturesOnDemand(t *testing.T) {
|
|||
err = json.Unmarshal(mResp.Body(), &cm)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// trigger error on config blob
|
||||
// trigger errors on cosign blobs
|
||||
// trigger error on cosign config blob
|
||||
srcConfigBlobPath := path.Join(srcDir, repoName, "blobs", string(cm.Config.Digest.Algorithm()),
|
||||
cm.Config.Digest.Hex())
|
||||
err = os.Chmod(srcConfigBlobPath, 0o000)
|
||||
|
@ -2227,7 +2334,29 @@ func TestSyncSignaturesOnDemand(t *testing.T) {
|
|||
// sync on demand
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 404)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
// trigger error on cosign layer blob
|
||||
srcSignatureBlobPath := path.Join(srcDir, repoName, "blobs", string(cm.Layers[0].Digest.Algorithm()),
|
||||
cm.Layers[0].Digest.Hex())
|
||||
|
||||
err = os.Chmod(srcConfigBlobPath, 0o755)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = os.Chmod(srcSignatureBlobPath, 0o000)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// remove already synced image
|
||||
err = os.RemoveAll(path.Join(destDir, repoName))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// sync on demand
|
||||
resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, 200)
|
||||
|
||||
err = os.Chmod(srcSignatureBlobPath, 0o755)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
|
||||
"gopkg.in/resty.v1"
|
||||
"zotregistry.io/zot/errors"
|
||||
"zotregistry.io/zot/pkg/common"
|
||||
"zotregistry.io/zot/pkg/extensions/monitoring"
|
||||
"zotregistry.io/zot/pkg/log"
|
||||
"zotregistry.io/zot/pkg/storage"
|
||||
|
@ -101,12 +102,17 @@ func getFileCredentials(filepath string) (CredentialsFile, error) {
|
|||
return creds, nil
|
||||
}
|
||||
|
||||
func getHTTPClient(regCfg *RegistryConfig, credentials Credentials, log log.Logger) (*resty.Client, error) {
|
||||
func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Credentials,
|
||||
log log.Logger) (*resty.Client, error) {
|
||||
client := resty.New()
|
||||
|
||||
registryURL, err := url.Parse(regCfg.URL)
|
||||
if !common.Contains(regCfg.URLs, upstreamURL) {
|
||||
return nil, errors.ErrSyncInvalidUpstreamURL
|
||||
}
|
||||
|
||||
registryURL, err := url.Parse(upstreamURL)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("url", regCfg.URL).Msg("couldn't parse url")
|
||||
log.Error().Err(err).Str("url", upstreamURL).Msg("couldn't parse url")
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
@ -413,6 +419,7 @@ func syncSignatures(client *resty.Client, storeController storage.StoreControlle
|
|||
return nil
|
||||
}
|
||||
|
||||
// copy from temporary oci repository to local registry.
|
||||
func pushSyncedLocalImage(repo, tag, uuid string,
|
||||
storeController storage.StoreController, log log.Logger) error {
|
||||
log.Info().Msgf("pushing synced local image %s:%s to local registry", repo, tag)
|
||||
|
@ -497,3 +504,9 @@ func isCosignTag(tag string) bool {
|
|||
|
||||
return false
|
||||
}
|
||||
|
||||
// sync needs transport to be stripped to not be wrongly interpreted as an image reference
|
||||
// at a non-fully qualified registry (hostname as image and port as tag).
|
||||
func StripRegistryTransport(url string) string {
|
||||
return strings.Replace(strings.Replace(url, "http://", "", 1), "https://", "", 1)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue