diff --git a/examples/README.md b/examples/README.md index 738ca485..e2f6c4a3 100644 --- a/examples/README.md +++ b/examples/README.md @@ -24,6 +24,7 @@ Examples of working configurations for various use cases are available [here](.. * [Identity-based Authorization](#identity-based-authorization) * [Logging](#logging) * [Metrics](#metrics) +* [Sync](#sync) ## Network @@ -336,3 +337,62 @@ For more details see https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/c +## Sync + +Enable and configure sync with: + +``` + "sync": { +``` + +Configure credentials for upstream registries: + +``` + "credentialsFile": "./examples/sync-auth-filepath.json", +``` + +Configure each registry sync: + +``` + "registries": [{ + "url": "https://registry1:5000", + "onDemand": false, # pull any image which the local registry doesn't have + "pollInterval": "6h", # polling interval + "tlsVerify": true, # wheather or not to verify tls + "certDir": "/home/user/certs", # use certificates at certDir path, if not specified then use the default certs dir + "content":[ # which content to periodically pull + { + "prefix":"/repo1/repo", # pull all images under /repo1/repo + "tags":{ # filter by tags + "regex":"4.*", # filter tags by regex + "semver":true # filter tags by semver compliance + } + }, + { + "prefix":"/repo2/repo" # pull all images under /repo2/repo + } + ] + }, + { + "url": "https://registry2:5000", + "pollInterval": "12h", + "tlsVerify": false, + "onDemand": false, + "content":[ + { + "prefix":"/repo2", + "tags":{ + "semver":true + } + } + ] + }, + { + "url": "https://docker.io/library", + "onDemand": true, # doesn't have content, don't periodically pull, pull just on demand. + "tlsVerify": true + } + ] + } +``` + diff --git a/examples/config-sync.json b/examples/config-sync.json index 3a3a3a3c..ce60a1fb 100644 --- a/examples/config-sync.json +++ b/examples/config-sync.json @@ -5,7 +5,7 @@ }, "http":{ "address":"127.0.0.1", - "port":"5000" + "port":"8080" }, "log":{ "level":"debug" diff --git a/examples/sync-auth-filepath.json b/examples/sync-auth-filepath.json index e146d41a..c0b4790a 100644 --- a/examples/sync-auth-filepath.json +++ b/examples/sync-auth-filepath.json @@ -1,5 +1,5 @@ { - "localhost:8080": { + "127.0.0.1:8080": { "username": "user", "password": "pass" }, diff --git a/pkg/api/controller.go b/pkg/api/controller.go index d4576b0e..4716a29c 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -137,6 +137,10 @@ func (c *Controller) Run() error { // Enable extensions if extension config is provided if c.Config != nil && c.Config.Extensions != nil { ext.EnableExtensions(c.Config, c.Log, c.Config.Storage.RootDirectory) + + if c.Config.Extensions.Sync != nil { + ext.EnableSyncExtension(c.Config, c.Log, c.StoreController) + } } } else { // we can't proceed without global storage diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 5c50a7e2..34c45037 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1263,30 +1263,24 @@ func getImageManifest(rh *RouteHandler, is storage.ImageStore, name, case errors.ErrRepoNotFound: if rh.c.Config.Extensions != nil && rh.c.Config.Extensions.Sync != nil { rh.c.Log.Info().Msgf("image not found, trying to get image %s:%s by syncing on demand", name, reference) - ok, errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, name, reference) - switch ok { - case true: + errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, rh.c.StoreController, name, reference) + if errSync != nil { + rh.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s", name, reference) + } else { content, digest, mediaType, err = is.GetImageManifest(name, reference) - case false && errSync == nil: - rh.c.Log.Info().Msgf("couldn't find image %s:%s in sync registries", name, reference) - case false && errSync != nil: - rh.c.Log.Err(err).Msgf("error encounter while syncing image %s:%s", name, reference) } } case errors.ErrManifestNotFound: if rh.c.Config.Extensions != nil && rh.c.Config.Extensions.Sync != nil { rh.c.Log.Info().Msgf("manifest not found, trying to get image %s:%s by syncing on demand", name, reference) - ok, errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, name, reference) - switch ok { - case true: + errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, rh.c.StoreController, name, reference) + if errSync != nil { + rh.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s", name, reference) + } else { content, digest, mediaType, err = is.GetImageManifest(name, reference) - case false && errSync == nil: - rh.c.Log.Info().Msgf("couldn't find image %s:%s in sync registries", name, reference) - case false && errSync != nil: - rh.c.Log.Err(err).Msgf("error encounter while syncing image %s:%s", name, reference) } } default: diff --git a/pkg/cli/root.go b/pkg/cli/root.go index ae8c8e28..238fecf7 100644 --- a/pkg/cli/root.go +++ b/pkg/cli/root.go @@ -174,12 +174,18 @@ func LoadConfiguration(config *config.Config, configPath string) { } } - // enforce s3 driver in case of using storage driver if len(config.Storage.StorageDriver) != 0 { + // enforce s3 driver in case of using storage driver if config.Storage.StorageDriver["name"] != storage.S3StorageDriverName { log.Error().Err(errors.ErrBadConfig).Msgf("unsupported storage driver: %s", config.Storage.StorageDriver["name"]) panic(errors.ErrBadConfig) } + + // enforce filesystem storage in case sync feature is enabled + if config.Extensions != nil && config.Extensions.Sync != nil { + log.Error().Err(errors.ErrBadConfig).Msg("sync supports only filesystem storage") + panic(errors.ErrBadConfig) + } } // enforce s3 driver on subpaths in case of using storage driver diff --git a/pkg/cli/root_test.go b/pkg/cli/root_test.go index a7fac07f..9f995e48 100644 --- a/pkg/cli/root_test.go +++ b/pkg/cli/root_test.go @@ -118,6 +118,22 @@ func TestVerify(t *testing.T) { So(func() { _ = cli.NewRootCmd().Execute() }, ShouldNotPanic) }) + Convey("Test verify w/ sync and w/o filesystem storage", t, func(c C) { + tmpfile, err := ioutil.TempFile("", "zot-test*.json") + So(err, ShouldBeNil) + defer os.Remove(tmpfile.Name()) // clean up + content := []byte(`{"storage":{"rootDirectory":"/tmp/zot", "storageDriver": {"name": "s3"}}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [{"url":"localhost:9999"}]}}}`) + _, err = tmpfile.Write(content) + So(err, ShouldBeNil) + err = tmpfile.Close() + So(err, ShouldBeNil) + os.Args = []string{"cli_test", "verify", tmpfile.Name()} + So(func() { _ = cli.NewRootCmd().Execute() }, ShouldPanic) + }) + Convey("Test verify good config", t, func(c C) { tmpfile, err := ioutil.TempFile("", "zot-test*.json") So(err, ShouldBeNil) diff --git a/pkg/extensions/extensions.go b/pkg/extensions/extensions.go index ad484f6e..69c64842 100644 --- a/pkg/extensions/extensions.go +++ b/pkg/extensions/extensions.go @@ -54,36 +54,6 @@ func EnableExtensions(config *config.Config, log log.Logger, rootDir string) { log.Info().Msg("CVE config not provided, skipping CVE update") } - if config.Extensions.Sync != nil { - defaultPollInterval, _ := time.ParseDuration("1h") - for id, registryCfg := range config.Extensions.Sync.Registries { - if registryCfg.PollInterval < defaultPollInterval { - config.Extensions.Sync.Registries[id].PollInterval = defaultPollInterval - - log.Warn().Msg("Sync registries interval set to too-short interval <= 1h, changing update duration to 1 hour and continuing.") // nolint: lll - } - } - - var serverCert string - - var serverKey string - - var CACert string - - if config.HTTP.TLS != nil { - serverCert = config.HTTP.TLS.Cert - serverKey = config.HTTP.TLS.Key - CACert = config.HTTP.TLS.CACert - } - - if err := sync.Run(*config.Extensions.Sync, log, config.HTTP.Address, - config.HTTP.Port, serverCert, serverKey, CACert); err != nil { - log.Error().Err(err).Msg("Error encountered while setting up syncing") - } - } else { - log.Info().Msg("Sync registries config not provided, skipping sync") - } - if config.Extensions.Metrics != nil && config.Extensions.Metrics.Enable && config.Extensions.Metrics.Prometheus != nil { @@ -97,9 +67,31 @@ func EnableExtensions(config *config.Config, log log.Logger, rootDir string) { } } +// EnableSyncExtension enables sync extension. +func EnableSyncExtension(config *config.Config, log log.Logger, storeController storage.StoreController) { + if config.Extensions.Sync != nil { + defaultPollInterval, _ := time.ParseDuration("1h") + for id, registryCfg := range config.Extensions.Sync.Registries { + if registryCfg.PollInterval < defaultPollInterval { + config.Extensions.Sync.Registries[id].PollInterval = defaultPollInterval + + log.Warn().Msg("Sync registries interval set to too-short interval < 1h, changing update duration to 1 hour and continuing.") // nolint: lll + } + } + + if err := sync.Run(*config.Extensions.Sync, storeController, log); err != nil { + log.Error().Err(err).Msg("Error encountered while setting up syncing") + } + } else { + log.Info().Msg("Sync registries config not provided, skipping sync") + } +} + // SetupRoutes ... func SetupRoutes(config *config.Config, router *mux.Router, storeController storage.StoreController, - log log.Logger) { + l log.Logger) { + // fork a new zerolog child to avoid data race + log := log.Logger{Logger: l.With().Caller().Timestamp().Logger()} log.Info().Msg("setting up extensions routes") if config.Extensions.Search != nil && config.Extensions.Search.Enable { @@ -115,27 +107,11 @@ func SetupRoutes(config *config.Config, router *mux.Router, storeController stor Handler(gqlHandler.NewDefaultServer(search.NewExecutableSchema(resConfig))) } - var serverCert string - - var serverKey string - - var CACert string - - if config.HTTP.TLS != nil { - serverCert = config.HTTP.TLS.Cert - serverKey = config.HTTP.TLS.Key - CACert = config.HTTP.TLS.CACert - } - if config.Extensions.Sync != nil { postSyncer := sync.PostHandler{ - Address: config.HTTP.Address, - Port: config.HTTP.Port, - ServerCert: serverCert, - ServerKey: serverKey, - CACert: CACert, - Cfg: *config.Extensions.Sync, - Log: log, + Cfg: *config.Extensions.Sync, + Log: log, + StoreController: storeController, } router.HandleFunc("/sync", postSyncer.Handler).Methods("POST") @@ -148,23 +124,11 @@ func SetupRoutes(config *config.Config, router *mux.Router, storeController stor } // SyncOneImage syncs one image. -func SyncOneImage(config *config.Config, log log.Logger, repoName, reference string) (bool, error) { +func SyncOneImage(config *config.Config, log log.Logger, + storeController storage.StoreController, repoName, reference string) error { log.Info().Msgf("syncing image %s:%s", repoName, reference) - var serverCert string + err := sync.OneImage(*config.Extensions.Sync, log, storeController, repoName, reference) - var serverKey string - - var CACert string - - if config.HTTP.TLS != nil { - serverCert = config.HTTP.TLS.Cert - serverKey = config.HTTP.TLS.Key - CACert = config.HTTP.TLS.CACert - } - - ok, err := sync.OneImage(*config.Extensions.Sync, log, config.HTTP.Address, config.HTTP.Port, - serverCert, serverKey, CACert, repoName, reference) - - return ok, err + return err } diff --git a/pkg/extensions/minimal.go b/pkg/extensions/minimal.go index d212206a..89c778ae 100644 --- a/pkg/extensions/minimal.go +++ b/pkg/extensions/minimal.go @@ -22,15 +22,22 @@ func EnableExtensions(config *config.Config, log log.Logger, rootDir string) { "any extensions, please build zot full binary for this feature") } +// EnableSyncExtension ... +func EnableSyncExtension(config *config.Config, log log.Logger, storeController storage.StoreController) { + log.Warn().Msg("skipping enabling sync extension because given zot binary doesn't support any extensions," + + "please build zot full binary for this feature") +} + // SetupRoutes ... func SetupRoutes(conf *config.Config, router *mux.Router, storeController storage.StoreController, log log.Logger) { log.Warn().Msg("skipping setting up extensions routes because given zot binary doesn't support " + "any extensions, please build zot full binary for this feature") } -// SyncOneImage... -func SyncOneImage(config *config.Config, log log.Logger, repoName, reference string) (bool, error) { - log.Warn().Msg("skipping syncing on demand because given zot binary doesn't support " + - "any extensions, please build zot full binary for this feature") - return false, nil +// SyncOneImage ... +func SyncOneImage(config *config.Config, log log.Logger, storeController storage.StoreController, + repoName, reference string) error { + log.Warn().Msg("skipping syncing on demand because given zot binary doesn't support any extensions," + + "please build zot full binary for this feature") + return nil } diff --git a/pkg/extensions/sync/http_handler.go b/pkg/extensions/sync/http_handler.go index 39b88215..787b209a 100644 --- a/pkg/extensions/sync/http_handler.go +++ b/pkg/extensions/sync/http_handler.go @@ -6,20 +6,32 @@ import ( "strings" "github.com/anuvu/zot/pkg/log" + "github.com/anuvu/zot/pkg/storage" + guuid "github.com/gofrs/uuid" ) type PostHandler struct { - Address string - Port string - ServerCert string - ServerKey string - CACert string - Cfg Config - Log log.Logger + StoreController storage.StoreController + Cfg Config + Log log.Logger } func (h *PostHandler) Handler(w http.ResponseWriter, r *http.Request) { - upstreamCtx, policyCtx, err := getLocalContexts(h.ServerCert, h.ServerKey, h.CACert, h.Log) + var credentialsFile CredentialsFile + + var err error + + if h.Cfg.CredentialsFile != "" { + credentialsFile, err = getFileCredentials(h.Cfg.CredentialsFile) + if err != nil { + h.Log.Error().Err(err).Msgf("sync http handler: couldn't get registry credentials from %s", h.Cfg.CredentialsFile) + WriteData(w, http.StatusInternalServerError, err.Error()) + + return + } + } + + localCtx, policyCtx, err := getLocalContexts(h.Log) if err != nil { WriteData(w, http.StatusInternalServerError, err.Error()) @@ -28,25 +40,22 @@ func (h *PostHandler) Handler(w http.ResponseWriter, r *http.Request) { defer policyCtx.Destroy() //nolint: errcheck - var credentialsFile CredentialsFile + uuid, err := guuid.NewV4() + if err != nil { + WriteData(w, http.StatusInternalServerError, err.Error()) - if h.Cfg.CredentialsFile != "" { - credentialsFile, err = getFileCredentials(h.Cfg.CredentialsFile) - if err != nil { - h.Log.Error().Err(err).Msgf("couldn't get registry credentials from %s", h.Cfg.CredentialsFile) - WriteData(w, http.StatusInternalServerError, err.Error()) - } + return } - localRegistryName := strings.Replace(fmt.Sprintf("%s:%s", h.Address, h.Port), "0.0.0.0", "127.0.0.1", 1) - for _, regCfg := range h.Cfg.Registries { upstreamRegistryName := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1) - if err := syncRegistry(regCfg, h.Log, localRegistryName, upstreamCtx, policyCtx, - credentialsFile[upstreamRegistryName]); err != nil { - h.Log.Err(err).Msg("error while syncing") + if err := syncRegistry(regCfg, h.StoreController, h.Log, localCtx, policyCtx, + credentialsFile[upstreamRegistryName], uuid.String()); err != nil { + h.Log.Err(err).Msg("sync http handler: error while syncing in") WriteData(w, http.StatusInternalServerError, err.Error()) + + return } } @@ -56,5 +65,5 @@ func (h *PostHandler) Handler(w http.ResponseWriter, r *http.Request) { func WriteData(w http.ResponseWriter, status int, msg string) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) - _, _ = w.Write([]byte(msg)) + _, _ = w.Write([]byte(fmt.Sprintf("error: %s", msg))) } diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index b06595b2..edda06d5 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -3,35 +3,47 @@ package sync import ( "context" "fmt" + "os" + "path" "strings" "github.com/anuvu/zot/pkg/log" + "github.com/anuvu/zot/pkg/storage" "github.com/containers/common/pkg/retry" "github.com/containers/image/v5/copy" "github.com/containers/image/v5/docker" "github.com/containers/image/v5/docker/reference" + "github.com/containers/image/v5/oci/layout" + guuid "github.com/gofrs/uuid" ) func OneImage(cfg Config, log log.Logger, - address, port, serverCert, serverKey, caCert, repoName, tag string) (bool, error) { - localCtx, policyCtx, err := getLocalContexts(serverCert, serverKey, caCert, log) - if err != nil { - return false, err - } - - localRegistryName := strings.Replace(fmt.Sprintf("%s:%s", address, port), "0.0.0.0", "127.0.0.1", 1) - + storeController storage.StoreController, repo, tag string) error { var credentialsFile CredentialsFile if cfg.CredentialsFile != "" { + var err error + credentialsFile, err = getFileCredentials(cfg.CredentialsFile) if err != nil { log.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile) - return false, err + return err } } - var synced bool + localCtx, policyCtx, err := getLocalContexts(log) + if err != nil { + return err + } + + imageStore := storeController.GetImageStore(repo) + + var copyErr error + + uuid, err := guuid.NewV4() + if err != nil { + return err + } for _, regCfg := range cfg.Registries { if !regCfg.OnDemand { @@ -46,25 +58,45 @@ func OneImage(cfg Config, log log.Logger, upstreamCtx := getUpstreamContext(®istryConfig, credentialsFile[upstreamRegistryName]) - upstreamRepoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", upstreamRegistryName, repoName)) + upstreamRepoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", upstreamRegistryName, repo)) + if err != nil { + log.Error().Err(err).Msgf("error parsing repository reference %s/%s", upstreamRegistryName, repo) + return err + } upstreamTaggedRef, err := reference.WithTag(upstreamRepoRef, tag) if err != nil { - log.Err(err).Msgf("error creating a reference for repository %s and tag %q", upstreamRepoRef.Name(), tag) - return synced, err + log.Error().Err(err).Msgf("error creating a reference for repository %s and tag %q", + upstreamRepoRef.Name(), tag) + return err } upstreamRef, err := docker.NewReference(upstreamTaggedRef) - ref := strings.Replace(upstreamRef.DockerReference().String(), upstreamRegistryName, "", 1) - - localRef, err := docker.Transport.ParseReference( - fmt.Sprintf("//%s%s", localRegistryName, ref), - ) if err != nil { - return synced, err + log.Error().Err(err).Msgf("error creating docker reference for repository %s and tag %q", + upstreamRepoRef.Name(), tag) + return err } - log.Info().Msgf("copying image %s to %s", upstreamRef.DockerReference().Name(), localRef.DockerReference().Name()) + imageName := strings.Replace(upstreamTaggedRef.Name(), upstreamRegistryName, "", 1) + + localRepo := path.Join(imageStore.RootDir(), imageName, SyncBlobUploadDir, uuid.String(), imageName) + + if err = os.MkdirAll(localRepo, 0755); err != nil { + log.Error().Err(err).Str("dir", localRepo).Msg("couldn't create temporary dir") + return err + } + + localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag) + + localRef, err := layout.ParseReference(localTaggedRepo) + if err != nil { + log.Error().Err(err).Msgf("cannot obtain a valid image reference for reference %q", localRepo) + return err + } + + log.Info().Msgf("copying image %s:%s to %s", upstreamTaggedRef.Name(), + upstreamTaggedRef.Tag(), localRepo) options := getCopyOptions(upstreamCtx, localCtx) @@ -73,18 +105,24 @@ func OneImage(cfg Config, log log.Logger, } if err = retry.RetryIfNecessary(context.Background(), func() error { - _, err = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options) + _, copyErr = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options) return err - }, retryOptions); err != nil { - log.Error().Err(err).Msgf("error while copying image %s to %s", - upstreamRef.DockerReference().Name(), localRef.DockerReference().Name()) + }, retryOptions); copyErr != nil { + log.Error().Err(copyErr).Msgf("error while copying image %s to %s", + upstreamRef.DockerReference().Name(), localTaggedRepo) } else { log.Info().Msgf("successfully synced %s", upstreamRef.DockerReference().Name()) - synced = true - return synced, nil + err := pushSyncedLocalImage(repo, tag, uuid.String(), storeController, log) + if err != nil { + log.Error().Err(err).Msgf("error while pushing synced cached image %s", + localTaggedRepo) + return err + } + + return nil } } - return synced, nil + return copyErr } diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 1c68cdf4..178b46eb 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -6,8 +6,10 @@ import ( "crypto/x509" "encoding/json" "fmt" + "io" "io/ioutil" "os" + "path" "regexp" "strings" "time" @@ -15,19 +17,23 @@ import ( "github.com/Masterminds/semver" "github.com/anuvu/zot/errors" "github.com/anuvu/zot/pkg/log" + "github.com/anuvu/zot/pkg/storage" "github.com/containers/common/pkg/retry" "github.com/containers/image/v5/copy" "github.com/containers/image/v5/docker" "github.com/containers/image/v5/docker/reference" + "github.com/containers/image/v5/oci/layout" "github.com/containers/image/v5/signature" "github.com/containers/image/v5/types" + guuid "github.com/gofrs/uuid" ispec "github.com/opencontainers/image-spec/specs-go/v1" "gopkg.in/resty.v1" ) const ( - maxRetries = 3 - delay = 5 * time.Minute + maxRetries = 3 + delay = 5 * time.Minute + SyncBlobUploadDir = ".sync" ) // /v2/_catalog struct. @@ -76,12 +82,13 @@ func getUpstreamCatalog(regCfg *RegistryConfig, credentials Credentials, log log if regCfg.CertDir != "" { log.Debug().Msgf("sync: using certs directory: %s", regCfg.CertDir) - clientCert := fmt.Sprintf("%s/client.cert", regCfg.CertDir) - clientKey := fmt.Sprintf("%s/client.key", regCfg.CertDir) - caCertPath := fmt.Sprintf("%s/ca.crt", regCfg.CertDir) + clientCert := path.Join(regCfg.CertDir, "client.cert") + clientKey := path.Join(regCfg.CertDir, "client.key") + caCertPath := path.Join(regCfg.CertDir, "ca.crt") caCert, err := ioutil.ReadFile(caCertPath) if err != nil { + log.Error().Err(err).Msg("couldn't read CA certificate") return c, err } @@ -92,6 +99,7 @@ func getUpstreamCatalog(regCfg *RegistryConfig, credentials Credentials, log log cert, err := tls.LoadX509KeyPair(clientCert, clientKey) if err != nil { + log.Error().Err(err).Msg("couldn't read certificates key pairs") return c, err } @@ -140,7 +148,7 @@ func getImageTags(ctx context.Context, sysCtx *types.SystemContext, repoRef refe return tags, nil } -// filterImagesByTagRegex filters images by tag regex give in the config. +// filterImagesByTagRegex filters images by tag regex given in the config. func filterImagesByTagRegex(upstreamReferences *[]types.ImageReference, content Content, log log.Logger) error { refs := *upstreamReferences @@ -208,18 +216,20 @@ func filterImagesBySemver(upstreamReferences *[]types.ImageReference, content Co } // imagesToCopyFromRepos lists all images given a registry name and its repos. -func imagesToCopyFromUpstream(registryName string, repos []string, sourceCtx *types.SystemContext, +func imagesToCopyFromUpstream(registryName string, repos []string, upstreamCtx *types.SystemContext, content Content, log log.Logger) ([]types.ImageReference, error) { var upstreamReferences []types.ImageReference for _, repoName := range repos { repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryName, repoName)) if err != nil { + log.Error().Err(err).Msgf("couldn't parse repository reference: %s", repoRef) return nil, err } - tags, err := getImageTags(context.Background(), sourceCtx, repoRef) + tags, err := getImageTags(context.Background(), upstreamCtx, repoRef) if err != nil { + log.Error().Err(err).Msgf("couldn't fetch tags for %s", repoRef) return nil, err } @@ -260,6 +270,7 @@ func getCopyOptions(upstreamCtx, localCtx *types.SystemContext) copy.Options { options := copy.Options{ DestinationCtx: localCtx, SourceCtx: upstreamCtx, + ReportWriter: io.Discard, // force only oci manifest MIME type ForceManifestMIMEType: ispec.MediaTypeImageManifest, } @@ -291,8 +302,9 @@ func getUpstreamContext(regCfg *RegistryConfig, credentials Credentials) *types. return upstreamCtx } -func syncRegistry(regCfg RegistryConfig, log log.Logger, localRegistryName string, localCtx *types.SystemContext, - policyCtx *signature.PolicyContext, credentials Credentials) error { +func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController, + log log.Logger, localCtx *types.SystemContext, + policyCtx *signature.PolicyContext, credentials Credentials, uuid string) error { if len(regCfg.Content) == 0 { log.Info().Msgf("no content found for %s, will not run periodically sync", regCfg.URL) return nil @@ -354,23 +366,45 @@ func syncRegistry(regCfg RegistryConfig, log log.Logger, localRegistryName strin for _, ref := range images { upstreamRef := ref - suffix := strings.Replace(ref.DockerReference().String(), upstreamRegistryName, "", 1) + imageName := strings.Replace(upstreamRef.DockerReference().Name(), upstreamRegistryName, "", 1) - localRef, err := docker.Transport.ParseReference( - fmt.Sprintf("//%s%s", localRegistryName, suffix), - ) - if err != nil { + imageStore := storeController.GetImageStore(imageName) + + localRepo := path.Join(imageStore.RootDir(), imageName, SyncBlobUploadDir, uuid, imageName) + + if err = os.MkdirAll(localRepo, 0755); err != nil { + log.Error().Err(err).Str("dir", localRepo).Msg("couldn't create temporary dir") return err } - log.Info().Msgf("copying image %s to %s", upstreamRef.DockerReference().Name(), localRef.DockerReference().Name()) + upstreamTaggedRef := getTagFromRef(upstreamRef, log) + + localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, upstreamTaggedRef.Tag()) + + localRef, err := layout.ParseReference(localTaggedRepo) + if err != nil { + log.Error().Err(err).Msgf("Cannot obtain a valid image reference for reference %q", localTaggedRepo) + return err + } + + log.Info().Msgf("copying image %s:%s to %s", upstreamRef.DockerReference().Name(), + upstreamTaggedRef.Tag(), localTaggedRepo) if err = retry.RetryIfNecessary(context.Background(), func() error { _, err = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options) return err }, retryOptions); err != nil { - log.Error().Err(err).Msgf("error while copying image %s to %s", - upstreamRef.DockerReference().Name(), localRef.DockerReference().Name()) + log.Error().Err(err).Msgf("error while copying image %s:%s to %s", + upstreamRef.DockerReference().Name(), upstreamTaggedRef.Tag(), localTaggedRepo) + return err + } + + log.Info().Msgf("successfully synced %s:%s", upstreamRef.DockerReference().Name(), upstreamTaggedRef.Tag()) + + err = pushSyncedLocalImage(imageName, upstreamTaggedRef.Tag(), uuid, storeController, log) + if err != nil { + log.Error().Err(err).Msgf("error while pushing synced cached image %s", + localTaggedRepo) return err } } @@ -380,8 +414,7 @@ func syncRegistry(regCfg RegistryConfig, log log.Logger, localRegistryName strin return nil } -func getLocalContexts(serverCert, serverKey, - caCert string, log log.Logger) (*types.SystemContext, *signature.PolicyContext, error) { +func getLocalContexts(log log.Logger) (*types.SystemContext, *signature.PolicyContext, error) { log.Debug().Msg("getting local context") var policy *signature.Policy @@ -390,78 +423,64 @@ func getLocalContexts(serverCert, serverKey, localCtx := &types.SystemContext{} - if serverCert != "" && serverKey != "" { - certsDir, err := copyLocalCerts(serverCert, serverKey, caCert, log) - if err != nil { - return &types.SystemContext{}, &signature.PolicyContext{}, err - } - - localCtx.DockerDaemonCertPath = certsDir - localCtx.DockerCertPath = certsDir - - policy, err = signature.DefaultPolicy(localCtx) - if err != nil { - return &types.SystemContext{}, &signature.PolicyContext{}, err - } - } else { - localCtx.DockerDaemonInsecureSkipTLSVerify = true - localCtx.DockerInsecureSkipTLSVerify = types.NewOptionalBool(true) - policy = &signature.Policy{Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()}} - } + // accept any image with or without signature + policy = &signature.Policy{Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()}} policyContext, err := signature.NewPolicyContext(policy) if err != nil { + log.Error().Err(err).Msg("couldn't create policy context") return &types.SystemContext{}, &signature.PolicyContext{}, err } return localCtx, policyContext, nil } -func Run(cfg Config, log log.Logger, address, port, serverCert, serverKey, caCert string) error { - localCtx, policyCtx, err := getLocalContexts(serverCert, serverKey, caCert, log) - if err != nil { - return err - } - - localRegistry := strings.Replace(fmt.Sprintf("%s:%s", address, port), "0.0.0.0", "127.0.0.1", 1) - +func Run(cfg Config, storeController storage.StoreController, logger log.Logger) error { var credentialsFile CredentialsFile + var err error + if cfg.CredentialsFile != "" { credentialsFile, err = getFileCredentials(cfg.CredentialsFile) if err != nil { - log.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile) + logger.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile) return err } } + localCtx, policyCtx, err := getLocalContexts(logger) + if err != nil { + return err + } + + uuid, err := guuid.NewV4() + if err != nil { + return err + } + + // for each upstream registry, start a go routine. for _, regCfg := range cfg.Registries { // schedule each registry sync ticker := time.NewTicker(regCfg.PollInterval) + // fork a new zerolog child to avoid data race + l := log.Logger{Logger: logger.With().Caller().Timestamp().Logger()} + upstreamRegistry := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1) - go func(regCfg RegistryConfig) { - defer os.RemoveAll(certsDir) - // run sync first, then run on interval - if err := syncRegistry(regCfg, log, localRegistry, localCtx, policyCtx, - credentialsFile[upstreamRegistry]); err != nil { - log.Err(err).Msg("sync exited with error, stopping it...") - ticker.Stop() - } - + go func(regCfg RegistryConfig, l log.Logger) { // run on intervals - for range ticker.C { - if err := syncRegistry(regCfg, log, localRegistry, localCtx, policyCtx, - credentialsFile[upstreamRegistry]); err != nil { - log.Err(err).Msg("sync exited with error, stopping it...") + for ; true; <-ticker.C { + if err := syncRegistry(regCfg, storeController, l, localCtx, policyCtx, + credentialsFile[upstreamRegistry], uuid.String()); err != nil { + l.Error().Err(err).Msg("sync exited with error, stopping it...") ticker.Stop() } } - }(regCfg) + }(regCfg, l) } - log.Info().Msg("finished setting up sync") + logger.Info().Msg("finished setting up sync") return nil } diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 0839f6a7..925fa38f 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -2,16 +2,25 @@ package sync import ( "context" + "encoding/json" "fmt" + "io" "io/ioutil" + "os" + "path" "testing" "time" "github.com/anuvu/zot/errors" + "github.com/anuvu/zot/pkg/extensions/monitoring" "github.com/anuvu/zot/pkg/log" + "github.com/anuvu/zot/pkg/storage" "github.com/containers/image/v5/docker" "github.com/containers/image/v5/docker/reference" "github.com/containers/image/v5/types" + godigest "github.com/opencontainers/go-digest" + ispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/rs/zerolog" . "github.com/smartystreets/goconvey/convey" ) @@ -27,8 +36,53 @@ const ( host = "127.0.0.1:45117" ) +func copyFiles(sourceDir string, destDir string) error { + sourceMeta, err := os.Stat(sourceDir) + if err != nil { + return err + } + + if err := os.MkdirAll(destDir, sourceMeta.Mode()); err != nil { + return err + } + + files, err := ioutil.ReadDir(sourceDir) + if err != nil { + return err + } + + for _, file := range files { + sourceFilePath := path.Join(sourceDir, file.Name()) + destFilePath := path.Join(destDir, file.Name()) + + if file.IsDir() { + if err = copyFiles(sourceFilePath, destFilePath); err != nil { + return err + } + } else { + sourceFile, err := os.Open(sourceFilePath) + if err != nil { + return err + } + defer sourceFile.Close() + + destFile, err := os.Create(destFilePath) + if err != nil { + return err + } + defer destFile.Close() + + if _, err = io.Copy(destFile, sourceFile); err != nil { + return err + } + } + } + + return nil +} + func TestSyncInternal(t *testing.T) { - Convey("test parseRepositoryReference func", t, func() { + Convey("Verify parseRepositoryReference func", t, func() { repositoryReference := fmt.Sprintf("%s/%s", host, testImage) ref, err := parseRepositoryReference(repositoryReference) So(err, ShouldBeNil) @@ -64,25 +118,20 @@ func TestSyncInternal(t *testing.T) { srcCtx := &types.SystemContext{} _, err = getImageTags(context.Background(), srcCtx, ref) - So(err, ShouldNotBeNil) - _, _, err = getLocalContexts("inexistent.cert", "inexistent.key", "inexistent.crt", log.NewLogger("", "")) - So(err, ShouldNotBeNil) - - _, _, err = getLocalContexts(ServerCert, "inexistent.key", "inexistent.crt", log.NewLogger("", "")) - So(err, ShouldNotBeNil) - - _, _, err = getLocalContexts(ServerCert, ServerKey, "inexistent.crt", log.NewLogger("", "")) - So(err, ShouldNotBeNil) - - taggedRef, err := reference.WithTag(ref, testImageTag) + taggedRef, err := reference.WithTag(ref, "tag") So(err, ShouldBeNil) + _, err = getImageTags(context.Background(), &types.SystemContext{}, taggedRef) + So(err, ShouldNotBeNil) + dockerRef, err := docker.NewReference(taggedRef) So(err, ShouldBeNil) - So(getTagFromRef(dockerRef, log.NewLogger("", "")), ShouldNotBeNil) + //tag := getTagFromRef(dockerRef, log.NewLogger("", "")) + + So(getTagFromRef(dockerRef, log.NewLogger("debug", "")), ShouldNotBeNil) var tlsVerify bool updateDuration := time.Microsecond @@ -100,10 +149,176 @@ func TestSyncInternal(t *testing.T) { cfg := Config{Registries: []RegistryConfig{syncRegistryConfig}, CredentialsFile: "/invalid/path/to/file"} - So(Run(cfg, log.NewLogger("", ""), - "127.0.0.1", "5000", ServerCert, ServerKey, CACert), ShouldNotBeNil) + So(Run(cfg, storage.StoreController{}, log.NewLogger("debug", "")), ShouldNotBeNil) _, err = getFileCredentials("/invalid/path/to/file") So(err, ShouldNotBeNil) }) + + Convey("Test getUpstreamCatalog() with missing certs", t, func() { + var tlsVerify bool + updateDuration := time.Microsecond + syncRegistryConfig := RegistryConfig{ + Content: []Content{ + { + Prefix: testImage, + }, + }, + URL: BaseURL, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "/tmp/missing_certs/a/b/c/d/z", + } + + _, err := getUpstreamCatalog(&syncRegistryConfig, Credentials{}, log.NewLogger("debug", "")) + So(err, ShouldNotBeNil) + }) + + Convey("Test getUpstreamCatalog() with bad certs", t, func() { + badCertsDir, err := ioutil.TempDir("", "bad_certs*") + if err != nil { + panic(err) + } + + if err := os.WriteFile(path.Join(badCertsDir, "ca.crt"), []byte("certificate"), 0755); err != nil { + panic(err) + } + + defer os.RemoveAll(badCertsDir) + + var tlsVerify bool + updateDuration := time.Microsecond + syncRegistryConfig := RegistryConfig{ + Content: []Content{ + { + Prefix: testImage, + }, + }, + URL: BaseURL, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: badCertsDir, + } + + _, err = getUpstreamCatalog(&syncRegistryConfig, Credentials{}, log.NewLogger("debug", "")) + So(err, ShouldNotBeNil) + }) + + Convey("Test imagesToCopyFromUpstream()", t, func() { + repos := []string{"repo1"} + upstreamCtx := &types.SystemContext{} + + _, err := imagesToCopyFromUpstream("localhost:4566", repos, upstreamCtx, Content{}, log.NewLogger("debug", "")) + So(err, ShouldNotBeNil) + + _, err = imagesToCopyFromUpstream("docker://localhost:4566", repos, upstreamCtx, + Content{}, log.NewLogger("debug", "")) + So(err, ShouldNotBeNil) + }) + + Convey("Verify pushSyncedLocalImage func", t, func() { + storageDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(storageDir) + + log := log.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + + imageStore := storage.NewImageStore(storageDir, false, false, log, metrics) + + storeController := storage.StoreController{} + storeController.DefaultStore = imageStore + + err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + So(err, ShouldNotBeNil) + + testRootDir := path.Join(imageStore.RootDir(), testImage, SyncBlobUploadDir) + //testImagePath := path.Join(testRootDir, testImage) + + err = os.MkdirAll(testRootDir, 0755) + if err != nil { + panic(err) + } + + err = copyFiles("../../../test/data", testRootDir) + if err != nil { + panic(err) + } + + testImageStore := storage.NewImageStore(testRootDir, false, false, log, metrics) + manifestContent, _, _, err := testImageStore.GetImageManifest(testImage, testImageTag) + So(err, ShouldBeNil) + + var manifest ispec.Manifest + + if err := json.Unmarshal(manifestContent, &manifest); err != nil { + panic(err) + } + + if err := os.Chmod(storageDir, 0000); err != nil { + panic(err) + } + + if os.Geteuid() != 0 { + So(func() { + _ = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + }, + ShouldPanic) + } + + if err := os.Chmod(storageDir, 0755); err != nil { + panic(err) + } + + if err := os.Chmod(path.Join(testRootDir, testImage, "blobs", "sha256", + manifest.Layers[0].Digest.Hex()), 0000); err != nil { + panic(err) + } + + err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + So(err, ShouldNotBeNil) + + if err := os.Chmod(path.Join(testRootDir, testImage, "blobs", "sha256", + manifest.Layers[0].Digest.Hex()), 0755); err != nil { + panic(err) + } + + cachedManifestConfigPath := path.Join(imageStore.RootDir(), testImage, SyncBlobUploadDir, + testImage, "blobs", "sha256", manifest.Config.Digest.Hex()) + if err := os.Chmod(cachedManifestConfigPath, 0000); err != nil { + panic(err) + } + + err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + So(err, ShouldNotBeNil) + + if err := os.Chmod(cachedManifestConfigPath, 0755); err != nil { + panic(err) + } + + manifestConfigPath := path.Join(imageStore.RootDir(), testImage, "blobs", "sha256", manifest.Config.Digest.Hex()) + if err := os.MkdirAll(manifestConfigPath, 0000); err != nil { + panic(err) + } + + err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + So(err, ShouldNotBeNil) + + if err := os.Remove(manifestConfigPath); err != nil { + panic(err) + } + + mDigest := godigest.FromBytes(manifestContent) + + manifestPath := path.Join(imageStore.RootDir(), testImage, "blobs", mDigest.Algorithm().String(), mDigest.Encoded()) + if err := os.MkdirAll(manifestPath, 0000); err != nil { + panic(err) + } + + err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log) + So(err, ShouldNotBeNil) + }) } diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 8abee1e0..983df1b1 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -186,6 +186,7 @@ func TestSyncOnDemand(t *testing.T) { defer func() { ctx := context.Background() _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -236,7 +237,8 @@ func TestSyncOnDemand(t *testing.T) { destConfig.Extensions = &extconf.ExtensionConfig{} destConfig.Extensions.Search = nil - destConfig.Extensions.Sync = &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}} + destConfig.Extensions.Sync = &sync.Config{ + Registries: []sync.RegistryConfig{syncRegistryConfig}} dc := api.NewController(destConfig) @@ -250,6 +252,7 @@ func TestSyncOnDemand(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -281,19 +284,64 @@ func TestSyncOnDemand(t *testing.T) { So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 404) + err = os.Chmod(path.Join(destDir, testImage), 0000) + if err != nil { + panic(err) + } + resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) So(err, ShouldBeNil) - So(resp.StatusCode(), ShouldEqual, 200) + So(resp.StatusCode(), ShouldEqual, 500) + + err = os.Chmod(path.Join(destDir, testImage), 0755) + if err != nil { + panic(err) + } resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "1.1.1") So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 404) - resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/tags/list") + err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0000) if err != nil { panic(err) } + resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "1.1.1") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + + resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + + err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0755) + if err != nil { + panic(err) + } + + err = os.MkdirAll(path.Join(destDir, testImage, "blobs"), 0000) + if err != nil { + panic(err) + } + + resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + + err = os.Chmod(path.Join(destDir, testImage, "blobs"), 0755) + if err != nil { + panic(err) + } + + resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/tags/list") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + err = json.Unmarshal(resp.Body(), &destTagsList) if err != nil { panic(err) @@ -341,6 +389,7 @@ func TestSync(t *testing.T) { defer func() { ctx := context.Background() _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -403,6 +452,7 @@ func TestSync(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -516,6 +566,7 @@ func TestSync(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -570,6 +621,274 @@ func TestSync(t *testing.T) { }) } +func TestSyncPermsDenied(t *testing.T) { + Convey("Verify sync feature without perm on sync cache", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + srcPort := getFreePort() + srcBaseURL := getBaseURL(srcPort, false) + + srcConfig := config.New() + srcConfig.HTTP.Port = srcPort + + srcDir, err := ioutil.TempDir("", "oci-src-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(srcDir) + + err = copyFiles("../../../test/data", srcDir) + if err != nil { + panic(err) + } + + srcConfig.Storage.RootDirectory = srcDir + + sc := api.NewController(srcConfig) + + go func() { + // this blocks + if err := sc.Run(); err != nil { + return + } + }() + + defer func() { + ctx := context.Background() + _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) + }() + + // wait till ready + for { + _, err := resty.R().Get(srcBaseURL) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + destPort := getFreePort() + destBaseURL := getBaseURL(destPort, false) + + destConfig := config.New() + destConfig.HTTP.Port = destPort + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(destDir) + + destConfig.Storage.RootDirectory = destDir + + regex := ".*" + semver := true + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URL: srcBaseURL, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + } + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}} + + dc := api.NewController(destConfig) + + go func() { + // this blocks + if err := dc.Run(); err != nil { + return + } + }() + + defer func() { + ctx := context.Background() + _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) + }() + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0000) + if err != nil { + panic(err) + } + + Convey("Test sync on POST request on /sync", func() { + resp, _ := resty.R().Post(destBaseURL + "/sync") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 500) + }) + }) +} + +func TestSyncBadTLS(t *testing.T) { + Convey("Verify sync TLS feature", t, func() { + caCert, err := ioutil.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + client := resty.New() + + client.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool}) + defer func() { client.SetTLSClientConfig(nil) }() + + updateDuration, _ := time.ParseDuration("1h") + + srcPort := getFreePort() + srcBaseURL := getBaseURL(srcPort, true) + + srcConfig := config.New() + srcConfig.HTTP.Port = srcPort + + srcConfig.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + CACert: CACert, + } + + srcDir, err := ioutil.TempDir("", "oci-src-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(srcDir) + + srcConfig.Storage.RootDirectory = srcDir + + sc := api.NewController(srcConfig) + + go func() { + // this blocks + if err := sc.Run(); err != nil { + return + } + }() + + defer func() { + ctx := context.Background() + _ = sc.Server.Shutdown(ctx) + }() + + cert, err := tls.LoadX509KeyPair("../../../test/data/client.cert", "../../../test/data/client.key") + if err != nil { + panic(err) + } + + client.SetCertificates(cert) + // wait till ready + for { + _, err := client.R().Get(srcBaseURL) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + err = copyFiles("../../../test/data", destDir) + if err != nil { + panic(err) + } + + defer os.RemoveAll(destDir) + + destPort := getFreePort() + destBaseURL := getBaseURL(destPort, true) + destConfig := config.New() + destConfig.HTTP.Port = destPort + + destConfig.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + CACert: CACert, + } + + destConfig.Storage.RootDirectory = destDir + + regex := ".*" + var semver bool + tlsVerify := true + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URL: srcBaseURL, + OnDemand: true, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + } + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}} + + dc := api.NewController(destConfig) + + go func() { + // this blocks + if err := dc.Run(); err != nil { + return + } + }() + + defer func() { + ctx := context.Background() + _ = dc.Server.Shutdown(ctx) + }() + + // give it time to set up sync + time.Sleep(2 * time.Second) + + resp, _ := client.R().Post(destBaseURL + "/sync") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 500) + + resp, _ = client.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "invalid") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + + resp, _ = client.R().Get(destBaseURL + "/v2/" + "invalid" + "/manifests/" + testImageTag) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) +} + func TestSyncTLS(t *testing.T) { Convey("Verify sync TLS feature", t, func() { caCert, err := ioutil.ReadFile(CACert) @@ -625,6 +944,7 @@ func TestSyncTLS(t *testing.T) { defer func() { ctx := context.Background() _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() cert, err := tls.LoadX509KeyPair("../../../test/data/client.cert", "../../../test/data/client.key") @@ -652,6 +972,7 @@ func TestSyncTLS(t *testing.T) { } destPort := getFreePort() + destBaseURL := getBaseURL(destPort, true) destConfig := config.New() destConfig.HTTP.Port = destPort @@ -670,30 +991,32 @@ func TestSyncTLS(t *testing.T) { destConfig.Storage.RootDirectory = destDir - // copy client certs, use them in sync config - clientCertDir, err := ioutil.TempDir("", "certs") + // copy upstream client certs, use them in sync config + destClientCertDir, err := ioutil.TempDir("", "destCerts") if err != nil { panic(err) } - destFilePath := path.Join(clientCertDir, "ca.crt") + destFilePath := path.Join(destClientCertDir, "ca.crt") err = copyFile(CACert, destFilePath) if err != nil { panic(err) } - destFilePath = path.Join(clientCertDir, "client.cert") + destFilePath = path.Join(destClientCertDir, "client.cert") err = copyFile(ClientCert, destFilePath) if err != nil { panic(err) } - destFilePath = path.Join(clientCertDir, "client.key") + destFilePath = path.Join(destClientCertDir, "client.key") err = copyFile(ClientKey, destFilePath) if err != nil { panic(err) } + defer os.RemoveAll(destClientCertDir) + regex := ".*" var semver bool tlsVerify := true @@ -711,7 +1034,7 @@ func TestSyncTLS(t *testing.T) { URL: srcBaseURL, PollInterval: updateDuration, TLSVerify: &tlsVerify, - CertDir: clientCertDir, + CertDir: destClientCertDir, } destConfig.Extensions = &extconf.ExtensionConfig{} @@ -730,6 +1053,7 @@ func TestSyncTLS(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -752,9 +1076,16 @@ func TestSyncTLS(t *testing.T) { if !found { panic(errSync) } + + Convey("Test sync on POST request on /sync", func() { + resp, _ := client.R().SetBasicAuth("test", "test").Post(destBaseURL + "/sync") + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + }) }) } +// nolint: gocyclo func TestSyncBasicAuth(t *testing.T) { Convey("Verify sync basic auth", t, func() { updateDuration, _ := time.ParseDuration("1h") @@ -800,12 +1131,13 @@ func TestSyncBasicAuth(t *testing.T) { defer func() { ctx := context.Background() _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready for { _, err := resty.R().Get(srcBaseURL) - t.Logf("err %v", err) + if err == nil { break } @@ -863,6 +1195,7 @@ func TestSyncBasicAuth(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -909,6 +1242,98 @@ func TestSyncBasicAuth(t *testing.T) { } }) + Convey("Verify sync basic auth with wrong file credentials", func() { + destPort := getFreePort() + destBaseURL := getBaseURL(destPort, false) + + destConfig := config.New() + destConfig.HTTP.Port = destPort + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + destConfig.Storage.SubPaths = map[string]config.StorageConfig{ + "a": { + RootDirectory: destDir, + GC: true, + Dedupe: true, + }, + } + + defer os.RemoveAll(destDir) + + destConfig.Storage.RootDirectory = destDir + + regex := ".*" + var semver bool + + registryName := strings.Replace(strings.Replace(srcBaseURL, "http://", "", 1), "https://", "", 1) + + credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "invalid"}}`, + registryName)) + + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URL: srcBaseURL, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = &sync.Config{CredentialsFile: credentialsFile, + Registries: []sync.RegistryConfig{syncRegistryConfig}} + + dc := api.NewController(destConfig) + + go func() { + // this blocks + if err := dc.Run(); err != nil { + return + } + }() + + defer func() { + ctx := context.Background() + _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) + }() + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + + Convey("Test sync on POST request on /sync", func() { + resp, _ := resty.R().Post(destBaseURL + "/sync") + So(resp, ShouldNotBeNil) + So(string(resp.Body()), ShouldContainSubstring, "sync: couldn't fetch upstream registry's catalog") + So(resp.StatusCode(), ShouldEqual, 500) + }) + }) + Convey("Verify sync basic auth with bad file credentials", func() { destPort := getFreePort() destBaseURL := getBaseURL(destPort, false) @@ -930,9 +1355,17 @@ func TestSyncBasicAuth(t *testing.T) { registryName := strings.Replace(strings.Replace(srcBaseURL, "http://", "", 1), "https://", "", 1) - credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "invalid"}}`, + credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "test"}}`, registryName)) + err = os.Chmod(credentialsFile, 0000) + So(err, ShouldBeNil) + + defer func() { + So(os.Chmod(credentialsFile, 0755), ShouldBeNil) + So(os.RemoveAll(credentialsFile), ShouldBeNil) + }() + var tlsVerify bool syncRegistryConfig := sync.RegistryConfig{ @@ -979,10 +1412,14 @@ func TestSyncBasicAuth(t *testing.T) { time.Sleep(100 * time.Millisecond) } + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + Convey("Test sync on POST request on /sync", func() { resp, _ := resty.R().Post(destBaseURL + "/sync") So(resp, ShouldNotBeNil) - So(string(resp.Body()), ShouldContainSubstring, "sync: couldn't fetch upstream registry's catalog") + So(string(resp.Body()), ShouldContainSubstring, "permission denied") So(resp.StatusCode(), ShouldEqual, 500) }) }) @@ -1041,6 +1478,7 @@ func TestSyncBasicAuth(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1164,6 +1602,7 @@ func TestSyncBadUrl(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1220,6 +1659,7 @@ func TestSyncNoImagesByRegex(t *testing.T) { defer func() { ctx := context.Background() _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1280,6 +1720,7 @@ func TestSyncNoImagesByRegex(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1349,6 +1790,7 @@ func TestSyncInvalidRegex(t *testing.T) { defer func() { ctx := context.Background() _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1409,6 +1851,7 @@ func TestSyncInvalidRegex(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1465,6 +1908,7 @@ func TestSyncNotSemver(t *testing.T) { defer func() { ctx := context.Background() _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1540,6 +1984,7 @@ func TestSyncNotSemver(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1555,7 +2000,6 @@ func TestSyncNotSemver(t *testing.T) { resp, _ := resty.R().Post(destBaseURL + "/sync") So(resp, ShouldNotBeNil) So(resp.StatusCode(), ShouldEqual, 200) - So(err, ShouldBeNil) var destTagsList TagsList @@ -1626,6 +2070,7 @@ func TestSyncInvalidCerts(t *testing.T) { defer func() { ctx := context.Background() _ = sc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() cert, err := tls.LoadX509KeyPair("../../../test/data/client.cert", "../../../test/data/client.key") @@ -1648,8 +2093,6 @@ func TestSyncInvalidCerts(t *testing.T) { destConfig := config.New() destConfig.HTTP.Port = destPort - os.RemoveAll("/tmp/zot-certs-dir") - destDir, err := ioutil.TempDir("", "oci-dest-repo-test") if err != nil { panic(err) @@ -1694,6 +2137,8 @@ func TestSyncInvalidCerts(t *testing.T) { panic(err) } + defer os.RemoveAll(clientCertDir) + var tlsVerify bool syncRegistryConfig := sync.RegistryConfig{ @@ -1724,6 +2169,7 @@ func TestSyncInvalidCerts(t *testing.T) { defer func() { ctx := context.Background() _ = dc.Server.Shutdown(ctx) + time.Sleep(500 * time.Millisecond) }() // wait till ready @@ -1757,3 +2203,196 @@ func makeCredentialsFile(fileContent string) string { return f.Name() } + +func TestSyncInvalidUrl(t *testing.T) { + Convey("Verify sync invalid url", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + destPort := getFreePort() + destBaseURL := getBaseURL(destPort, false) + + destConfig := config.New() + destConfig.HTTP.Port = destPort + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(destDir) + + destConfig.Storage.RootDirectory = destDir + + regex := ".*" + var semver bool + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + // won't match any image on source registry, we will sync on demand + Prefix: "dummy", + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URL: "http://invalid.invalid/invalid/", + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}} + + dc := api.NewController(destConfig) + + go func() { + // this blocks + if err := dc.Run(); err != nil { + return + } + }() + + defer func() { + ctx := context.Background() + _ = dc.Server.Shutdown(ctx) + time.Sleep(1 * time.Second) + }() + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) +} + +func TestSyncInvalidTags(t *testing.T) { + Convey("Verify sync invalid url", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + srcPort := getFreePort() + srcBaseURL := getBaseURL(srcPort, false) + + srcConfig := config.New() + srcConfig.HTTP.Port = srcPort + + srcDir, err := ioutil.TempDir("", "oci-src-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(srcDir) + + err = copyFiles("../../../test/data", srcDir) + if err != nil { + panic(err) + } + + srcConfig.Storage.RootDirectory = srcDir + + sc := api.NewController(srcConfig) + + go func() { + // this blocks + if err := sc.Run(); err != nil { + return + } + }() + + defer func() { + ctx := context.Background() + _ = sc.Server.Shutdown(ctx) + }() + + // wait till ready + for { + _, err := resty.R().Get(srcBaseURL) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + destPort := getFreePort() + destBaseURL := getBaseURL(destPort, false) + + destConfig := config.New() + destConfig.HTTP.Port = destPort + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(destDir) + + destConfig.Storage.RootDirectory = destDir + + regex := ".*" + var semver bool + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + // won't match any image on source registry, we will sync on demand + Prefix: "dummy", + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URL: srcBaseURL, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = &sync.Config{ + Registries: []sync.RegistryConfig{syncRegistryConfig}} + + dc := api.NewController(destConfig) + + go func() { + // this blocks + if err := dc.Run(); err != nil { + return + } + }() + + defer func() { + ctx := context.Background() + _ = dc.Server.Shutdown(ctx) + }() + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "invalid:tag") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) +} diff --git a/pkg/extensions/sync/utils.go b/pkg/extensions/sync/utils.go index 64f22efa..51f769aa 100644 --- a/pkg/extensions/sync/utils.go +++ b/pkg/extensions/sync/utils.go @@ -2,78 +2,20 @@ package sync import ( "encoding/json" - "fmt" - "io" "io/ioutil" "os" "path" "strings" "github.com/anuvu/zot/errors" + "github.com/anuvu/zot/pkg/extensions/monitoring" "github.com/anuvu/zot/pkg/log" + "github.com/anuvu/zot/pkg/storage" "github.com/containers/image/v5/docker/reference" "github.com/containers/image/v5/types" + ispec "github.com/opencontainers/image-spec/specs-go/v1" ) -var certsDir = fmt.Sprintf("%s/zot-certs-dir/", os.TempDir()) //nolint: gochecknoglobals - -func copyFile(sourceFilePath, destFilePath string) error { - destFile, err := os.Create(destFilePath) - if err != nil { - return err - } - defer destFile.Close() - - // should never get error because server certs are already handled by zot, by the time - // it gets here - sourceFile, _ := os.Open(sourceFilePath) - defer sourceFile.Close() - - if _, err := io.Copy(destFile, sourceFile); err != nil { - return err - } - - return nil -} - -func copyLocalCerts(serverCert, serverKey, caCert string, log log.Logger) (string, error) { - log.Debug().Msgf("Creating certs directory: %s", certsDir) - - err := os.Mkdir(certsDir, 0755) - if err != nil && !os.IsExist(err) { - return "", err - } - - if serverCert != "" { - log.Debug().Msgf("Copying server cert: %s", serverCert) - - err := copyFile(serverCert, path.Join(certsDir, "server.cert")) - if err != nil { - return "", err - } - } - - if serverKey != "" { - log.Debug().Msgf("Copying server key: %s", serverKey) - - err := copyFile(serverKey, path.Join(certsDir, "server.key")) - if err != nil { - return "", err - } - } - - if caCert != "" { - log.Debug().Msgf("Copying CA cert: %s", caCert) - - err := copyFile(caCert, path.Join(certsDir, "ca.crt")) - if err != nil { - return "", err - } - } - - return certsDir, nil -} - // getTagFromRef returns a tagged reference from an image reference. func getTagFromRef(ref types.ImageReference, log log.Logger) reference.Tagged { tagged, isTagged := ref.DockerReference().(reference.Tagged) @@ -164,3 +106,70 @@ func getFileCredentials(filepath string) (CredentialsFile, error) { return creds, nil } + +func pushSyncedLocalImage(repo, tag, uuid string, + storeController storage.StoreController, log log.Logger) error { + log.Info().Msgf("pushing synced local image %s:%s to local registry", repo, tag) + + imageStore := storeController.GetImageStore(repo) + + metrics := monitoring.NewMetricsServer(false, log) + cacheImageStore := storage.NewImageStore(path.Join(imageStore.RootDir(), repo, SyncBlobUploadDir, uuid), + false, false, log, metrics) + + manifestContent, _, _, err := cacheImageStore.GetImageManifest(repo, tag) + if err != nil { + log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), repo)).Msg("couldn't find index.json") + return err + } + + var manifest ispec.Manifest + + if err := json.Unmarshal(manifestContent, &manifest); err != nil { + log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), repo)).Msg("invalid JSON") + return err + } + + for _, blob := range manifest.Layers { + blobReader, _, err := cacheImageStore.GetBlob(repo, blob.Digest.String(), blob.MediaType) + if err != nil { + log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), + repo)).Str("blob digest", blob.Digest.String()).Msg("couldn't read blob") + return err + } + + _, _, err = imageStore.FullBlobUpload(repo, blobReader, blob.Digest.String()) + if err != nil { + log.Error().Err(err).Str("blob digest", blob.Digest.String()).Msg("couldn't upload blob") + return err + } + } + + blobReader, _, err := cacheImageStore.GetBlob(repo, manifest.Config.Digest.String(), manifest.Config.MediaType) + if err != nil { + log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), + repo)).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't read config blob") + return err + } + + _, _, err = imageStore.FullBlobUpload(repo, blobReader, manifest.Config.Digest.String()) + if err != nil { + log.Error().Err(err).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't upload config blob") + return err + } + + _, err = imageStore.PutImageManifest(repo, tag, ispec.MediaTypeImageManifest, manifestContent) + if err != nil { + log.Error().Err(err).Msg("couldn't upload manifest") + return err + } + + log.Info().Msgf("removing temporary cached synced repo %s", path.Join(cacheImageStore.RootDir(), repo)) + + if err := os.RemoveAll(path.Join(cacheImageStore.RootDir(), repo)); err != nil { + log.Error().Err(err).Msg("couldn't remove locally cached sync repo") + return err + } + + return nil +}