0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-16 21:56:37 -05:00

Changed sync behaviour, it used to copy images over http interface

now it copies to a local cache and then it copies over storage APIs

- accept all images with or without signatures
- disable sync writing to stdout
- added more logs
- fixed switch statement in routes
- fixed enabling sync multiple times for storage subpaths

closes #266

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
Petu Eusebiu 2021-10-28 12:10:01 +03:00 committed by Ramkumar Chinchani
parent 9c568c0ee2
commit 5c07e19c8d
15 changed files with 1269 additions and 289 deletions

View file

@ -24,6 +24,7 @@ Examples of working configurations for various use cases are available [here](..
* [Identity-based Authorization](#identity-based-authorization)
* [Logging](#logging)
* [Metrics](#metrics)
* [Sync](#sync)
## Network
@ -336,3 +337,62 @@ For more details see https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/c
## Sync
Enable and configure sync with:
```
"sync": {
```
Configure credentials for upstream registries:
```
"credentialsFile": "./examples/sync-auth-filepath.json",
```
Configure each registry sync:
```
"registries": [{
"url": "https://registry1:5000",
"onDemand": false, # pull any image which the local registry doesn't have
"pollInterval": "6h", # polling interval
"tlsVerify": true, # wheather or not to verify tls
"certDir": "/home/user/certs", # use certificates at certDir path, if not specified then use the default certs dir
"content":[ # which content to periodically pull
{
"prefix":"/repo1/repo", # pull all images under /repo1/repo
"tags":{ # filter by tags
"regex":"4.*", # filter tags by regex
"semver":true # filter tags by semver compliance
}
},
{
"prefix":"/repo2/repo" # pull all images under /repo2/repo
}
]
},
{
"url": "https://registry2:5000",
"pollInterval": "12h",
"tlsVerify": false,
"onDemand": false,
"content":[
{
"prefix":"/repo2",
"tags":{
"semver":true
}
}
]
},
{
"url": "https://docker.io/library",
"onDemand": true, # doesn't have content, don't periodically pull, pull just on demand.
"tlsVerify": true
}
]
}
```

View file

@ -5,7 +5,7 @@
},
"http":{
"address":"127.0.0.1",
"port":"5000"
"port":"8080"
},
"log":{
"level":"debug"

View file

@ -1,5 +1,5 @@
{
"localhost:8080": {
"127.0.0.1:8080": {
"username": "user",
"password": "pass"
},

View file

@ -137,6 +137,10 @@ func (c *Controller) Run() error {
// Enable extensions if extension config is provided
if c.Config != nil && c.Config.Extensions != nil {
ext.EnableExtensions(c.Config, c.Log, c.Config.Storage.RootDirectory)
if c.Config.Extensions.Sync != nil {
ext.EnableSyncExtension(c.Config, c.Log, c.StoreController)
}
}
} else {
// we can't proceed without global storage

View file

@ -1263,30 +1263,24 @@ func getImageManifest(rh *RouteHandler, is storage.ImageStore, name,
case errors.ErrRepoNotFound:
if rh.c.Config.Extensions != nil && rh.c.Config.Extensions.Sync != nil {
rh.c.Log.Info().Msgf("image not found, trying to get image %s:%s by syncing on demand", name, reference)
ok, errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, name, reference)
switch ok {
case true:
errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, rh.c.StoreController, name, reference)
if errSync != nil {
rh.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s", name, reference)
} else {
content, digest, mediaType, err = is.GetImageManifest(name, reference)
case false && errSync == nil:
rh.c.Log.Info().Msgf("couldn't find image %s:%s in sync registries", name, reference)
case false && errSync != nil:
rh.c.Log.Err(err).Msgf("error encounter while syncing image %s:%s", name, reference)
}
}
case errors.ErrManifestNotFound:
if rh.c.Config.Extensions != nil && rh.c.Config.Extensions.Sync != nil {
rh.c.Log.Info().Msgf("manifest not found, trying to get image %s:%s by syncing on demand", name, reference)
ok, errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, name, reference)
switch ok {
case true:
errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, rh.c.StoreController, name, reference)
if errSync != nil {
rh.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s", name, reference)
} else {
content, digest, mediaType, err = is.GetImageManifest(name, reference)
case false && errSync == nil:
rh.c.Log.Info().Msgf("couldn't find image %s:%s in sync registries", name, reference)
case false && errSync != nil:
rh.c.Log.Err(err).Msgf("error encounter while syncing image %s:%s", name, reference)
}
}
default:

View file

@ -174,12 +174,18 @@ func LoadConfiguration(config *config.Config, configPath string) {
}
}
// enforce s3 driver in case of using storage driver
if len(config.Storage.StorageDriver) != 0 {
// enforce s3 driver in case of using storage driver
if config.Storage.StorageDriver["name"] != storage.S3StorageDriverName {
log.Error().Err(errors.ErrBadConfig).Msgf("unsupported storage driver: %s", config.Storage.StorageDriver["name"])
panic(errors.ErrBadConfig)
}
// enforce filesystem storage in case sync feature is enabled
if config.Extensions != nil && config.Extensions.Sync != nil {
log.Error().Err(errors.ErrBadConfig).Msg("sync supports only filesystem storage")
panic(errors.ErrBadConfig)
}
}
// enforce s3 driver on subpaths in case of using storage driver

View file

@ -118,6 +118,22 @@ func TestVerify(t *testing.T) {
So(func() { _ = cli.NewRootCmd().Execute() }, ShouldNotPanic)
})
Convey("Test verify w/ sync and w/o filesystem storage", t, func(c C) {
tmpfile, err := ioutil.TempFile("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up
content := []byte(`{"storage":{"rootDirectory":"/tmp/zot", "storageDriver": {"name": "s3"}},
"http":{"address":"127.0.0.1","port":"8080","realm":"zot",
"auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}},
"extensions":{"sync": {"registries": [{"url":"localhost:9999"}]}}}`)
_, err = tmpfile.Write(content)
So(err, ShouldBeNil)
err = tmpfile.Close()
So(err, ShouldBeNil)
os.Args = []string{"cli_test", "verify", tmpfile.Name()}
So(func() { _ = cli.NewRootCmd().Execute() }, ShouldPanic)
})
Convey("Test verify good config", t, func(c C) {
tmpfile, err := ioutil.TempFile("", "zot-test*.json")
So(err, ShouldBeNil)

View file

@ -54,36 +54,6 @@ func EnableExtensions(config *config.Config, log log.Logger, rootDir string) {
log.Info().Msg("CVE config not provided, skipping CVE update")
}
if config.Extensions.Sync != nil {
defaultPollInterval, _ := time.ParseDuration("1h")
for id, registryCfg := range config.Extensions.Sync.Registries {
if registryCfg.PollInterval < defaultPollInterval {
config.Extensions.Sync.Registries[id].PollInterval = defaultPollInterval
log.Warn().Msg("Sync registries interval set to too-short interval <= 1h, changing update duration to 1 hour and continuing.") // nolint: lll
}
}
var serverCert string
var serverKey string
var CACert string
if config.HTTP.TLS != nil {
serverCert = config.HTTP.TLS.Cert
serverKey = config.HTTP.TLS.Key
CACert = config.HTTP.TLS.CACert
}
if err := sync.Run(*config.Extensions.Sync, log, config.HTTP.Address,
config.HTTP.Port, serverCert, serverKey, CACert); err != nil {
log.Error().Err(err).Msg("Error encountered while setting up syncing")
}
} else {
log.Info().Msg("Sync registries config not provided, skipping sync")
}
if config.Extensions.Metrics != nil &&
config.Extensions.Metrics.Enable &&
config.Extensions.Metrics.Prometheus != nil {
@ -97,9 +67,31 @@ func EnableExtensions(config *config.Config, log log.Logger, rootDir string) {
}
}
// EnableSyncExtension enables sync extension.
func EnableSyncExtension(config *config.Config, log log.Logger, storeController storage.StoreController) {
if config.Extensions.Sync != nil {
defaultPollInterval, _ := time.ParseDuration("1h")
for id, registryCfg := range config.Extensions.Sync.Registries {
if registryCfg.PollInterval < defaultPollInterval {
config.Extensions.Sync.Registries[id].PollInterval = defaultPollInterval
log.Warn().Msg("Sync registries interval set to too-short interval < 1h, changing update duration to 1 hour and continuing.") // nolint: lll
}
}
if err := sync.Run(*config.Extensions.Sync, storeController, log); err != nil {
log.Error().Err(err).Msg("Error encountered while setting up syncing")
}
} else {
log.Info().Msg("Sync registries config not provided, skipping sync")
}
}
// SetupRoutes ...
func SetupRoutes(config *config.Config, router *mux.Router, storeController storage.StoreController,
log log.Logger) {
l log.Logger) {
// fork a new zerolog child to avoid data race
log := log.Logger{Logger: l.With().Caller().Timestamp().Logger()}
log.Info().Msg("setting up extensions routes")
if config.Extensions.Search != nil && config.Extensions.Search.Enable {
@ -115,27 +107,11 @@ func SetupRoutes(config *config.Config, router *mux.Router, storeController stor
Handler(gqlHandler.NewDefaultServer(search.NewExecutableSchema(resConfig)))
}
var serverCert string
var serverKey string
var CACert string
if config.HTTP.TLS != nil {
serverCert = config.HTTP.TLS.Cert
serverKey = config.HTTP.TLS.Key
CACert = config.HTTP.TLS.CACert
}
if config.Extensions.Sync != nil {
postSyncer := sync.PostHandler{
Address: config.HTTP.Address,
Port: config.HTTP.Port,
ServerCert: serverCert,
ServerKey: serverKey,
CACert: CACert,
Cfg: *config.Extensions.Sync,
Log: log,
Cfg: *config.Extensions.Sync,
Log: log,
StoreController: storeController,
}
router.HandleFunc("/sync", postSyncer.Handler).Methods("POST")
@ -148,23 +124,11 @@ func SetupRoutes(config *config.Config, router *mux.Router, storeController stor
}
// SyncOneImage syncs one image.
func SyncOneImage(config *config.Config, log log.Logger, repoName, reference string) (bool, error) {
func SyncOneImage(config *config.Config, log log.Logger,
storeController storage.StoreController, repoName, reference string) error {
log.Info().Msgf("syncing image %s:%s", repoName, reference)
var serverCert string
err := sync.OneImage(*config.Extensions.Sync, log, storeController, repoName, reference)
var serverKey string
var CACert string
if config.HTTP.TLS != nil {
serverCert = config.HTTP.TLS.Cert
serverKey = config.HTTP.TLS.Key
CACert = config.HTTP.TLS.CACert
}
ok, err := sync.OneImage(*config.Extensions.Sync, log, config.HTTP.Address, config.HTTP.Port,
serverCert, serverKey, CACert, repoName, reference)
return ok, err
return err
}

View file

@ -22,15 +22,22 @@ func EnableExtensions(config *config.Config, log log.Logger, rootDir string) {
"any extensions, please build zot full binary for this feature")
}
// EnableSyncExtension ...
func EnableSyncExtension(config *config.Config, log log.Logger, storeController storage.StoreController) {
log.Warn().Msg("skipping enabling sync extension because given zot binary doesn't support any extensions," +
"please build zot full binary for this feature")
}
// SetupRoutes ...
func SetupRoutes(conf *config.Config, router *mux.Router, storeController storage.StoreController, log log.Logger) {
log.Warn().Msg("skipping setting up extensions routes because given zot binary doesn't support " +
"any extensions, please build zot full binary for this feature")
}
// SyncOneImage...
func SyncOneImage(config *config.Config, log log.Logger, repoName, reference string) (bool, error) {
log.Warn().Msg("skipping syncing on demand because given zot binary doesn't support " +
"any extensions, please build zot full binary for this feature")
return false, nil
// SyncOneImage ...
func SyncOneImage(config *config.Config, log log.Logger, storeController storage.StoreController,
repoName, reference string) error {
log.Warn().Msg("skipping syncing on demand because given zot binary doesn't support any extensions," +
"please build zot full binary for this feature")
return nil
}

View file

@ -6,20 +6,32 @@ import (
"strings"
"github.com/anuvu/zot/pkg/log"
"github.com/anuvu/zot/pkg/storage"
guuid "github.com/gofrs/uuid"
)
type PostHandler struct {
Address string
Port string
ServerCert string
ServerKey string
CACert string
Cfg Config
Log log.Logger
StoreController storage.StoreController
Cfg Config
Log log.Logger
}
func (h *PostHandler) Handler(w http.ResponseWriter, r *http.Request) {
upstreamCtx, policyCtx, err := getLocalContexts(h.ServerCert, h.ServerKey, h.CACert, h.Log)
var credentialsFile CredentialsFile
var err error
if h.Cfg.CredentialsFile != "" {
credentialsFile, err = getFileCredentials(h.Cfg.CredentialsFile)
if err != nil {
h.Log.Error().Err(err).Msgf("sync http handler: couldn't get registry credentials from %s", h.Cfg.CredentialsFile)
WriteData(w, http.StatusInternalServerError, err.Error())
return
}
}
localCtx, policyCtx, err := getLocalContexts(h.Log)
if err != nil {
WriteData(w, http.StatusInternalServerError, err.Error())
@ -28,25 +40,22 @@ func (h *PostHandler) Handler(w http.ResponseWriter, r *http.Request) {
defer policyCtx.Destroy() //nolint: errcheck
var credentialsFile CredentialsFile
uuid, err := guuid.NewV4()
if err != nil {
WriteData(w, http.StatusInternalServerError, err.Error())
if h.Cfg.CredentialsFile != "" {
credentialsFile, err = getFileCredentials(h.Cfg.CredentialsFile)
if err != nil {
h.Log.Error().Err(err).Msgf("couldn't get registry credentials from %s", h.Cfg.CredentialsFile)
WriteData(w, http.StatusInternalServerError, err.Error())
}
return
}
localRegistryName := strings.Replace(fmt.Sprintf("%s:%s", h.Address, h.Port), "0.0.0.0", "127.0.0.1", 1)
for _, regCfg := range h.Cfg.Registries {
upstreamRegistryName := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
if err := syncRegistry(regCfg, h.Log, localRegistryName, upstreamCtx, policyCtx,
credentialsFile[upstreamRegistryName]); err != nil {
h.Log.Err(err).Msg("error while syncing")
if err := syncRegistry(regCfg, h.StoreController, h.Log, localCtx, policyCtx,
credentialsFile[upstreamRegistryName], uuid.String()); err != nil {
h.Log.Err(err).Msg("sync http handler: error while syncing in")
WriteData(w, http.StatusInternalServerError, err.Error())
return
}
}
@ -56,5 +65,5 @@ func (h *PostHandler) Handler(w http.ResponseWriter, r *http.Request) {
func WriteData(w http.ResponseWriter, status int, msg string) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_, _ = w.Write([]byte(msg))
_, _ = w.Write([]byte(fmt.Sprintf("error: %s", msg)))
}

View file

@ -3,35 +3,47 @@ package sync
import (
"context"
"fmt"
"os"
"path"
"strings"
"github.com/anuvu/zot/pkg/log"
"github.com/anuvu/zot/pkg/storage"
"github.com/containers/common/pkg/retry"
"github.com/containers/image/v5/copy"
"github.com/containers/image/v5/docker"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/oci/layout"
guuid "github.com/gofrs/uuid"
)
func OneImage(cfg Config, log log.Logger,
address, port, serverCert, serverKey, caCert, repoName, tag string) (bool, error) {
localCtx, policyCtx, err := getLocalContexts(serverCert, serverKey, caCert, log)
if err != nil {
return false, err
}
localRegistryName := strings.Replace(fmt.Sprintf("%s:%s", address, port), "0.0.0.0", "127.0.0.1", 1)
storeController storage.StoreController, repo, tag string) error {
var credentialsFile CredentialsFile
if cfg.CredentialsFile != "" {
var err error
credentialsFile, err = getFileCredentials(cfg.CredentialsFile)
if err != nil {
log.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile)
return false, err
return err
}
}
var synced bool
localCtx, policyCtx, err := getLocalContexts(log)
if err != nil {
return err
}
imageStore := storeController.GetImageStore(repo)
var copyErr error
uuid, err := guuid.NewV4()
if err != nil {
return err
}
for _, regCfg := range cfg.Registries {
if !regCfg.OnDemand {
@ -46,25 +58,45 @@ func OneImage(cfg Config, log log.Logger,
upstreamCtx := getUpstreamContext(&registryConfig, credentialsFile[upstreamRegistryName])
upstreamRepoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", upstreamRegistryName, repoName))
upstreamRepoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", upstreamRegistryName, repo))
if err != nil {
log.Error().Err(err).Msgf("error parsing repository reference %s/%s", upstreamRegistryName, repo)
return err
}
upstreamTaggedRef, err := reference.WithTag(upstreamRepoRef, tag)
if err != nil {
log.Err(err).Msgf("error creating a reference for repository %s and tag %q", upstreamRepoRef.Name(), tag)
return synced, err
log.Error().Err(err).Msgf("error creating a reference for repository %s and tag %q",
upstreamRepoRef.Name(), tag)
return err
}
upstreamRef, err := docker.NewReference(upstreamTaggedRef)
ref := strings.Replace(upstreamRef.DockerReference().String(), upstreamRegistryName, "", 1)
localRef, err := docker.Transport.ParseReference(
fmt.Sprintf("//%s%s", localRegistryName, ref),
)
if err != nil {
return synced, err
log.Error().Err(err).Msgf("error creating docker reference for repository %s and tag %q",
upstreamRepoRef.Name(), tag)
return err
}
log.Info().Msgf("copying image %s to %s", upstreamRef.DockerReference().Name(), localRef.DockerReference().Name())
imageName := strings.Replace(upstreamTaggedRef.Name(), upstreamRegistryName, "", 1)
localRepo := path.Join(imageStore.RootDir(), imageName, SyncBlobUploadDir, uuid.String(), imageName)
if err = os.MkdirAll(localRepo, 0755); err != nil {
log.Error().Err(err).Str("dir", localRepo).Msg("couldn't create temporary dir")
return err
}
localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag)
localRef, err := layout.ParseReference(localTaggedRepo)
if err != nil {
log.Error().Err(err).Msgf("cannot obtain a valid image reference for reference %q", localRepo)
return err
}
log.Info().Msgf("copying image %s:%s to %s", upstreamTaggedRef.Name(),
upstreamTaggedRef.Tag(), localRepo)
options := getCopyOptions(upstreamCtx, localCtx)
@ -73,18 +105,24 @@ func OneImage(cfg Config, log log.Logger,
}
if err = retry.RetryIfNecessary(context.Background(), func() error {
_, err = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options)
_, copyErr = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("error while copying image %s to %s",
upstreamRef.DockerReference().Name(), localRef.DockerReference().Name())
}, retryOptions); copyErr != nil {
log.Error().Err(copyErr).Msgf("error while copying image %s to %s",
upstreamRef.DockerReference().Name(), localTaggedRepo)
} else {
log.Info().Msgf("successfully synced %s", upstreamRef.DockerReference().Name())
synced = true
return synced, nil
err := pushSyncedLocalImage(repo, tag, uuid.String(), storeController, log)
if err != nil {
log.Error().Err(err).Msgf("error while pushing synced cached image %s",
localTaggedRepo)
return err
}
return nil
}
}
return synced, nil
return copyErr
}

View file

@ -6,8 +6,10 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"regexp"
"strings"
"time"
@ -15,19 +17,23 @@ import (
"github.com/Masterminds/semver"
"github.com/anuvu/zot/errors"
"github.com/anuvu/zot/pkg/log"
"github.com/anuvu/zot/pkg/storage"
"github.com/containers/common/pkg/retry"
"github.com/containers/image/v5/copy"
"github.com/containers/image/v5/docker"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/oci/layout"
"github.com/containers/image/v5/signature"
"github.com/containers/image/v5/types"
guuid "github.com/gofrs/uuid"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"gopkg.in/resty.v1"
)
const (
maxRetries = 3
delay = 5 * time.Minute
maxRetries = 3
delay = 5 * time.Minute
SyncBlobUploadDir = ".sync"
)
// /v2/_catalog struct.
@ -76,12 +82,13 @@ func getUpstreamCatalog(regCfg *RegistryConfig, credentials Credentials, log log
if regCfg.CertDir != "" {
log.Debug().Msgf("sync: using certs directory: %s", regCfg.CertDir)
clientCert := fmt.Sprintf("%s/client.cert", regCfg.CertDir)
clientKey := fmt.Sprintf("%s/client.key", regCfg.CertDir)
caCertPath := fmt.Sprintf("%s/ca.crt", regCfg.CertDir)
clientCert := path.Join(regCfg.CertDir, "client.cert")
clientKey := path.Join(regCfg.CertDir, "client.key")
caCertPath := path.Join(regCfg.CertDir, "ca.crt")
caCert, err := ioutil.ReadFile(caCertPath)
if err != nil {
log.Error().Err(err).Msg("couldn't read CA certificate")
return c, err
}
@ -92,6 +99,7 @@ func getUpstreamCatalog(regCfg *RegistryConfig, credentials Credentials, log log
cert, err := tls.LoadX509KeyPair(clientCert, clientKey)
if err != nil {
log.Error().Err(err).Msg("couldn't read certificates key pairs")
return c, err
}
@ -140,7 +148,7 @@ func getImageTags(ctx context.Context, sysCtx *types.SystemContext, repoRef refe
return tags, nil
}
// filterImagesByTagRegex filters images by tag regex give in the config.
// filterImagesByTagRegex filters images by tag regex given in the config.
func filterImagesByTagRegex(upstreamReferences *[]types.ImageReference, content Content, log log.Logger) error {
refs := *upstreamReferences
@ -208,18 +216,20 @@ func filterImagesBySemver(upstreamReferences *[]types.ImageReference, content Co
}
// imagesToCopyFromRepos lists all images given a registry name and its repos.
func imagesToCopyFromUpstream(registryName string, repos []string, sourceCtx *types.SystemContext,
func imagesToCopyFromUpstream(registryName string, repos []string, upstreamCtx *types.SystemContext,
content Content, log log.Logger) ([]types.ImageReference, error) {
var upstreamReferences []types.ImageReference
for _, repoName := range repos {
repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryName, repoName))
if err != nil {
log.Error().Err(err).Msgf("couldn't parse repository reference: %s", repoRef)
return nil, err
}
tags, err := getImageTags(context.Background(), sourceCtx, repoRef)
tags, err := getImageTags(context.Background(), upstreamCtx, repoRef)
if err != nil {
log.Error().Err(err).Msgf("couldn't fetch tags for %s", repoRef)
return nil, err
}
@ -260,6 +270,7 @@ func getCopyOptions(upstreamCtx, localCtx *types.SystemContext) copy.Options {
options := copy.Options{
DestinationCtx: localCtx,
SourceCtx: upstreamCtx,
ReportWriter: io.Discard,
// force only oci manifest MIME type
ForceManifestMIMEType: ispec.MediaTypeImageManifest,
}
@ -291,8 +302,9 @@ func getUpstreamContext(regCfg *RegistryConfig, credentials Credentials) *types.
return upstreamCtx
}
func syncRegistry(regCfg RegistryConfig, log log.Logger, localRegistryName string, localCtx *types.SystemContext,
policyCtx *signature.PolicyContext, credentials Credentials) error {
func syncRegistry(regCfg RegistryConfig, storeController storage.StoreController,
log log.Logger, localCtx *types.SystemContext,
policyCtx *signature.PolicyContext, credentials Credentials, uuid string) error {
if len(regCfg.Content) == 0 {
log.Info().Msgf("no content found for %s, will not run periodically sync", regCfg.URL)
return nil
@ -354,23 +366,45 @@ func syncRegistry(regCfg RegistryConfig, log log.Logger, localRegistryName strin
for _, ref := range images {
upstreamRef := ref
suffix := strings.Replace(ref.DockerReference().String(), upstreamRegistryName, "", 1)
imageName := strings.Replace(upstreamRef.DockerReference().Name(), upstreamRegistryName, "", 1)
localRef, err := docker.Transport.ParseReference(
fmt.Sprintf("//%s%s", localRegistryName, suffix),
)
if err != nil {
imageStore := storeController.GetImageStore(imageName)
localRepo := path.Join(imageStore.RootDir(), imageName, SyncBlobUploadDir, uuid, imageName)
if err = os.MkdirAll(localRepo, 0755); err != nil {
log.Error().Err(err).Str("dir", localRepo).Msg("couldn't create temporary dir")
return err
}
log.Info().Msgf("copying image %s to %s", upstreamRef.DockerReference().Name(), localRef.DockerReference().Name())
upstreamTaggedRef := getTagFromRef(upstreamRef, log)
localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, upstreamTaggedRef.Tag())
localRef, err := layout.ParseReference(localTaggedRepo)
if err != nil {
log.Error().Err(err).Msgf("Cannot obtain a valid image reference for reference %q", localTaggedRepo)
return err
}
log.Info().Msgf("copying image %s:%s to %s", upstreamRef.DockerReference().Name(),
upstreamTaggedRef.Tag(), localTaggedRepo)
if err = retry.RetryIfNecessary(context.Background(), func() error {
_, err = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options)
return err
}, retryOptions); err != nil {
log.Error().Err(err).Msgf("error while copying image %s to %s",
upstreamRef.DockerReference().Name(), localRef.DockerReference().Name())
log.Error().Err(err).Msgf("error while copying image %s:%s to %s",
upstreamRef.DockerReference().Name(), upstreamTaggedRef.Tag(), localTaggedRepo)
return err
}
log.Info().Msgf("successfully synced %s:%s", upstreamRef.DockerReference().Name(), upstreamTaggedRef.Tag())
err = pushSyncedLocalImage(imageName, upstreamTaggedRef.Tag(), uuid, storeController, log)
if err != nil {
log.Error().Err(err).Msgf("error while pushing synced cached image %s",
localTaggedRepo)
return err
}
}
@ -380,8 +414,7 @@ func syncRegistry(regCfg RegistryConfig, log log.Logger, localRegistryName strin
return nil
}
func getLocalContexts(serverCert, serverKey,
caCert string, log log.Logger) (*types.SystemContext, *signature.PolicyContext, error) {
func getLocalContexts(log log.Logger) (*types.SystemContext, *signature.PolicyContext, error) {
log.Debug().Msg("getting local context")
var policy *signature.Policy
@ -390,78 +423,64 @@ func getLocalContexts(serverCert, serverKey,
localCtx := &types.SystemContext{}
if serverCert != "" && serverKey != "" {
certsDir, err := copyLocalCerts(serverCert, serverKey, caCert, log)
if err != nil {
return &types.SystemContext{}, &signature.PolicyContext{}, err
}
localCtx.DockerDaemonCertPath = certsDir
localCtx.DockerCertPath = certsDir
policy, err = signature.DefaultPolicy(localCtx)
if err != nil {
return &types.SystemContext{}, &signature.PolicyContext{}, err
}
} else {
localCtx.DockerDaemonInsecureSkipTLSVerify = true
localCtx.DockerInsecureSkipTLSVerify = types.NewOptionalBool(true)
policy = &signature.Policy{Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()}}
}
// accept any image with or without signature
policy = &signature.Policy{Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()}}
policyContext, err := signature.NewPolicyContext(policy)
if err != nil {
log.Error().Err(err).Msg("couldn't create policy context")
return &types.SystemContext{}, &signature.PolicyContext{}, err
}
return localCtx, policyContext, nil
}
func Run(cfg Config, log log.Logger, address, port, serverCert, serverKey, caCert string) error {
localCtx, policyCtx, err := getLocalContexts(serverCert, serverKey, caCert, log)
if err != nil {
return err
}
localRegistry := strings.Replace(fmt.Sprintf("%s:%s", address, port), "0.0.0.0", "127.0.0.1", 1)
func Run(cfg Config, storeController storage.StoreController, logger log.Logger) error {
var credentialsFile CredentialsFile
var err error
if cfg.CredentialsFile != "" {
credentialsFile, err = getFileCredentials(cfg.CredentialsFile)
if err != nil {
log.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile)
logger.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile)
return err
}
}
localCtx, policyCtx, err := getLocalContexts(logger)
if err != nil {
return err
}
uuid, err := guuid.NewV4()
if err != nil {
return err
}
// for each upstream registry, start a go routine.
for _, regCfg := range cfg.Registries {
// schedule each registry sync
ticker := time.NewTicker(regCfg.PollInterval)
// fork a new zerolog child to avoid data race
l := log.Logger{Logger: logger.With().Caller().Timestamp().Logger()}
upstreamRegistry := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
go func(regCfg RegistryConfig) {
defer os.RemoveAll(certsDir)
// run sync first, then run on interval
if err := syncRegistry(regCfg, log, localRegistry, localCtx, policyCtx,
credentialsFile[upstreamRegistry]); err != nil {
log.Err(err).Msg("sync exited with error, stopping it...")
ticker.Stop()
}
go func(regCfg RegistryConfig, l log.Logger) {
// run on intervals
for range ticker.C {
if err := syncRegistry(regCfg, log, localRegistry, localCtx, policyCtx,
credentialsFile[upstreamRegistry]); err != nil {
log.Err(err).Msg("sync exited with error, stopping it...")
for ; true; <-ticker.C {
if err := syncRegistry(regCfg, storeController, l, localCtx, policyCtx,
credentialsFile[upstreamRegistry], uuid.String()); err != nil {
l.Error().Err(err).Msg("sync exited with error, stopping it...")
ticker.Stop()
}
}
}(regCfg)
}(regCfg, l)
}
log.Info().Msg("finished setting up sync")
logger.Info().Msg("finished setting up sync")
return nil
}

View file

@ -2,16 +2,25 @@ package sync
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"testing"
"time"
"github.com/anuvu/zot/errors"
"github.com/anuvu/zot/pkg/extensions/monitoring"
"github.com/anuvu/zot/pkg/log"
"github.com/anuvu/zot/pkg/storage"
"github.com/containers/image/v5/docker"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/types"
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/rs/zerolog"
. "github.com/smartystreets/goconvey/convey"
)
@ -27,8 +36,53 @@ const (
host = "127.0.0.1:45117"
)
func copyFiles(sourceDir string, destDir string) error {
sourceMeta, err := os.Stat(sourceDir)
if err != nil {
return err
}
if err := os.MkdirAll(destDir, sourceMeta.Mode()); err != nil {
return err
}
files, err := ioutil.ReadDir(sourceDir)
if err != nil {
return err
}
for _, file := range files {
sourceFilePath := path.Join(sourceDir, file.Name())
destFilePath := path.Join(destDir, file.Name())
if file.IsDir() {
if err = copyFiles(sourceFilePath, destFilePath); err != nil {
return err
}
} else {
sourceFile, err := os.Open(sourceFilePath)
if err != nil {
return err
}
defer sourceFile.Close()
destFile, err := os.Create(destFilePath)
if err != nil {
return err
}
defer destFile.Close()
if _, err = io.Copy(destFile, sourceFile); err != nil {
return err
}
}
}
return nil
}
func TestSyncInternal(t *testing.T) {
Convey("test parseRepositoryReference func", t, func() {
Convey("Verify parseRepositoryReference func", t, func() {
repositoryReference := fmt.Sprintf("%s/%s", host, testImage)
ref, err := parseRepositoryReference(repositoryReference)
So(err, ShouldBeNil)
@ -64,25 +118,20 @@ func TestSyncInternal(t *testing.T) {
srcCtx := &types.SystemContext{}
_, err = getImageTags(context.Background(), srcCtx, ref)
So(err, ShouldNotBeNil)
_, _, err = getLocalContexts("inexistent.cert", "inexistent.key", "inexistent.crt", log.NewLogger("", ""))
So(err, ShouldNotBeNil)
_, _, err = getLocalContexts(ServerCert, "inexistent.key", "inexistent.crt", log.NewLogger("", ""))
So(err, ShouldNotBeNil)
_, _, err = getLocalContexts(ServerCert, ServerKey, "inexistent.crt", log.NewLogger("", ""))
So(err, ShouldNotBeNil)
taggedRef, err := reference.WithTag(ref, testImageTag)
taggedRef, err := reference.WithTag(ref, "tag")
So(err, ShouldBeNil)
_, err = getImageTags(context.Background(), &types.SystemContext{}, taggedRef)
So(err, ShouldNotBeNil)
dockerRef, err := docker.NewReference(taggedRef)
So(err, ShouldBeNil)
So(getTagFromRef(dockerRef, log.NewLogger("", "")), ShouldNotBeNil)
//tag := getTagFromRef(dockerRef, log.NewLogger("", ""))
So(getTagFromRef(dockerRef, log.NewLogger("debug", "")), ShouldNotBeNil)
var tlsVerify bool
updateDuration := time.Microsecond
@ -100,10 +149,176 @@ func TestSyncInternal(t *testing.T) {
cfg := Config{Registries: []RegistryConfig{syncRegistryConfig}, CredentialsFile: "/invalid/path/to/file"}
So(Run(cfg, log.NewLogger("", ""),
"127.0.0.1", "5000", ServerCert, ServerKey, CACert), ShouldNotBeNil)
So(Run(cfg, storage.StoreController{}, log.NewLogger("debug", "")), ShouldNotBeNil)
_, err = getFileCredentials("/invalid/path/to/file")
So(err, ShouldNotBeNil)
})
Convey("Test getUpstreamCatalog() with missing certs", t, func() {
var tlsVerify bool
updateDuration := time.Microsecond
syncRegistryConfig := RegistryConfig{
Content: []Content{
{
Prefix: testImage,
},
},
URL: BaseURL,
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
CertDir: "/tmp/missing_certs/a/b/c/d/z",
}
_, err := getUpstreamCatalog(&syncRegistryConfig, Credentials{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
})
Convey("Test getUpstreamCatalog() with bad certs", t, func() {
badCertsDir, err := ioutil.TempDir("", "bad_certs*")
if err != nil {
panic(err)
}
if err := os.WriteFile(path.Join(badCertsDir, "ca.crt"), []byte("certificate"), 0755); err != nil {
panic(err)
}
defer os.RemoveAll(badCertsDir)
var tlsVerify bool
updateDuration := time.Microsecond
syncRegistryConfig := RegistryConfig{
Content: []Content{
{
Prefix: testImage,
},
},
URL: BaseURL,
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
CertDir: badCertsDir,
}
_, err = getUpstreamCatalog(&syncRegistryConfig, Credentials{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
})
Convey("Test imagesToCopyFromUpstream()", t, func() {
repos := []string{"repo1"}
upstreamCtx := &types.SystemContext{}
_, err := imagesToCopyFromUpstream("localhost:4566", repos, upstreamCtx, Content{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
_, err = imagesToCopyFromUpstream("docker://localhost:4566", repos, upstreamCtx,
Content{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
})
Convey("Verify pushSyncedLocalImage func", t, func() {
storageDir, err := ioutil.TempDir("", "oci-dest-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(storageDir)
log := log.Logger{Logger: zerolog.New(os.Stdout)}
metrics := monitoring.NewMetricsServer(false, log)
imageStore := storage.NewImageStore(storageDir, false, false, log, metrics)
storeController := storage.StoreController{}
storeController.DefaultStore = imageStore
err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log)
So(err, ShouldNotBeNil)
testRootDir := path.Join(imageStore.RootDir(), testImage, SyncBlobUploadDir)
//testImagePath := path.Join(testRootDir, testImage)
err = os.MkdirAll(testRootDir, 0755)
if err != nil {
panic(err)
}
err = copyFiles("../../../test/data", testRootDir)
if err != nil {
panic(err)
}
testImageStore := storage.NewImageStore(testRootDir, false, false, log, metrics)
manifestContent, _, _, err := testImageStore.GetImageManifest(testImage, testImageTag)
So(err, ShouldBeNil)
var manifest ispec.Manifest
if err := json.Unmarshal(manifestContent, &manifest); err != nil {
panic(err)
}
if err := os.Chmod(storageDir, 0000); err != nil {
panic(err)
}
if os.Geteuid() != 0 {
So(func() {
_ = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log)
},
ShouldPanic)
}
if err := os.Chmod(storageDir, 0755); err != nil {
panic(err)
}
if err := os.Chmod(path.Join(testRootDir, testImage, "blobs", "sha256",
manifest.Layers[0].Digest.Hex()), 0000); err != nil {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log)
So(err, ShouldNotBeNil)
if err := os.Chmod(path.Join(testRootDir, testImage, "blobs", "sha256",
manifest.Layers[0].Digest.Hex()), 0755); err != nil {
panic(err)
}
cachedManifestConfigPath := path.Join(imageStore.RootDir(), testImage, SyncBlobUploadDir,
testImage, "blobs", "sha256", manifest.Config.Digest.Hex())
if err := os.Chmod(cachedManifestConfigPath, 0000); err != nil {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log)
So(err, ShouldNotBeNil)
if err := os.Chmod(cachedManifestConfigPath, 0755); err != nil {
panic(err)
}
manifestConfigPath := path.Join(imageStore.RootDir(), testImage, "blobs", "sha256", manifest.Config.Digest.Hex())
if err := os.MkdirAll(manifestConfigPath, 0000); err != nil {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log)
So(err, ShouldNotBeNil)
if err := os.Remove(manifestConfigPath); err != nil {
panic(err)
}
mDigest := godigest.FromBytes(manifestContent)
manifestPath := path.Join(imageStore.RootDir(), testImage, "blobs", mDigest.Algorithm().String(), mDigest.Encoded())
if err := os.MkdirAll(manifestPath, 0000); err != nil {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, "", storeController, log)
So(err, ShouldNotBeNil)
})
}

View file

@ -186,6 +186,7 @@ func TestSyncOnDemand(t *testing.T) {
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -236,7 +237,8 @@ func TestSyncOnDemand(t *testing.T) {
destConfig.Extensions = &extconf.ExtensionConfig{}
destConfig.Extensions.Search = nil
destConfig.Extensions.Sync = &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}}
destConfig.Extensions.Sync = &sync.Config{
Registries: []sync.RegistryConfig{syncRegistryConfig}}
dc := api.NewController(destConfig)
@ -250,6 +252,7 @@ func TestSyncOnDemand(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -281,19 +284,64 @@ func TestSyncOnDemand(t *testing.T) {
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
err = os.Chmod(path.Join(destDir, testImage), 0000)
if err != nil {
panic(err)
}
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
So(resp.StatusCode(), ShouldEqual, 500)
err = os.Chmod(path.Join(destDir, testImage), 0755)
if err != nil {
panic(err)
}
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "1.1.1")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/tags/list")
err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0000)
if err != nil {
panic(err)
}
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "1.1.1")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0755)
if err != nil {
panic(err)
}
err = os.MkdirAll(path.Join(destDir, testImage, "blobs"), 0000)
if err != nil {
panic(err)
}
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
err = os.Chmod(path.Join(destDir, testImage, "blobs"), 0755)
if err != nil {
panic(err)
}
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/tags/list")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
err = json.Unmarshal(resp.Body(), &destTagsList)
if err != nil {
panic(err)
@ -341,6 +389,7 @@ func TestSync(t *testing.T) {
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -403,6 +452,7 @@ func TestSync(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -516,6 +566,7 @@ func TestSync(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -570,6 +621,274 @@ func TestSync(t *testing.T) {
})
}
func TestSyncPermsDenied(t *testing.T) {
Convey("Verify sync feature without perm on sync cache", t, func() {
updateDuration, _ := time.ParseDuration("30m")
srcPort := getFreePort()
srcBaseURL := getBaseURL(srcPort, false)
srcConfig := config.New()
srcConfig.HTTP.Port = srcPort
srcDir, err := ioutil.TempDir("", "oci-src-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(srcDir)
err = copyFiles("../../../test/data", srcDir)
if err != nil {
panic(err)
}
srcConfig.Storage.RootDirectory = srcDir
sc := api.NewController(srcConfig)
go func() {
// this blocks
if err := sc.Run(); err != nil {
return
}
}()
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
for {
_, err := resty.R().Get(srcBaseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
destPort := getFreePort()
destBaseURL := getBaseURL(destPort, false)
destConfig := config.New()
destConfig.HTTP.Port = destPort
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(destDir)
destConfig.Storage.RootDirectory = destDir
regex := ".*"
semver := true
var tlsVerify bool
syncRegistryConfig := sync.RegistryConfig{
Content: []sync.Content{
{
Prefix: testImage,
Tags: &sync.Tags{
Regex: &regex,
Semver: &semver,
},
},
},
URL: srcBaseURL,
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
CertDir: "",
}
destConfig.Extensions = &extconf.ExtensionConfig{}
destConfig.Extensions.Search = nil
destConfig.Extensions.Sync = &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}}
dc := api.NewController(destConfig)
go func() {
// this blocks
if err := dc.Run(); err != nil {
return
}
}()
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
for {
_, err := resty.R().Get(destBaseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0000)
if err != nil {
panic(err)
}
Convey("Test sync on POST request on /sync", func() {
resp, _ := resty.R().Post(destBaseURL + "/sync")
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 500)
})
})
}
func TestSyncBadTLS(t *testing.T) {
Convey("Verify sync TLS feature", t, func() {
caCert, err := ioutil.ReadFile(CACert)
So(err, ShouldBeNil)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
client := resty.New()
client.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool})
defer func() { client.SetTLSClientConfig(nil) }()
updateDuration, _ := time.ParseDuration("1h")
srcPort := getFreePort()
srcBaseURL := getBaseURL(srcPort, true)
srcConfig := config.New()
srcConfig.HTTP.Port = srcPort
srcConfig.HTTP.TLS = &config.TLSConfig{
Cert: ServerCert,
Key: ServerKey,
CACert: CACert,
}
srcDir, err := ioutil.TempDir("", "oci-src-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(srcDir)
srcConfig.Storage.RootDirectory = srcDir
sc := api.NewController(srcConfig)
go func() {
// this blocks
if err := sc.Run(); err != nil {
return
}
}()
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
}()
cert, err := tls.LoadX509KeyPair("../../../test/data/client.cert", "../../../test/data/client.key")
if err != nil {
panic(err)
}
client.SetCertificates(cert)
// wait till ready
for {
_, err := client.R().Get(srcBaseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
if err != nil {
panic(err)
}
err = copyFiles("../../../test/data", destDir)
if err != nil {
panic(err)
}
defer os.RemoveAll(destDir)
destPort := getFreePort()
destBaseURL := getBaseURL(destPort, true)
destConfig := config.New()
destConfig.HTTP.Port = destPort
destConfig.HTTP.TLS = &config.TLSConfig{
Cert: ServerCert,
Key: ServerKey,
CACert: CACert,
}
destConfig.Storage.RootDirectory = destDir
regex := ".*"
var semver bool
tlsVerify := true
syncRegistryConfig := sync.RegistryConfig{
Content: []sync.Content{
{
Prefix: testImage,
Tags: &sync.Tags{
Regex: &regex,
Semver: &semver,
},
},
},
URL: srcBaseURL,
OnDemand: true,
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
}
destConfig.Extensions = &extconf.ExtensionConfig{}
destConfig.Extensions.Search = nil
destConfig.Extensions.Sync = &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}}
dc := api.NewController(destConfig)
go func() {
// this blocks
if err := dc.Run(); err != nil {
return
}
}()
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
}()
// give it time to set up sync
time.Sleep(2 * time.Second)
resp, _ := client.R().Post(destBaseURL + "/sync")
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 500)
resp, _ = client.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "invalid")
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
resp, _ = client.R().Get(destBaseURL + "/v2/" + "invalid" + "/manifests/" + testImageTag)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
})
}
func TestSyncTLS(t *testing.T) {
Convey("Verify sync TLS feature", t, func() {
caCert, err := ioutil.ReadFile(CACert)
@ -625,6 +944,7 @@ func TestSyncTLS(t *testing.T) {
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
cert, err := tls.LoadX509KeyPair("../../../test/data/client.cert", "../../../test/data/client.key")
@ -652,6 +972,7 @@ func TestSyncTLS(t *testing.T) {
}
destPort := getFreePort()
destBaseURL := getBaseURL(destPort, true)
destConfig := config.New()
destConfig.HTTP.Port = destPort
@ -670,30 +991,32 @@ func TestSyncTLS(t *testing.T) {
destConfig.Storage.RootDirectory = destDir
// copy client certs, use them in sync config
clientCertDir, err := ioutil.TempDir("", "certs")
// copy upstream client certs, use them in sync config
destClientCertDir, err := ioutil.TempDir("", "destCerts")
if err != nil {
panic(err)
}
destFilePath := path.Join(clientCertDir, "ca.crt")
destFilePath := path.Join(destClientCertDir, "ca.crt")
err = copyFile(CACert, destFilePath)
if err != nil {
panic(err)
}
destFilePath = path.Join(clientCertDir, "client.cert")
destFilePath = path.Join(destClientCertDir, "client.cert")
err = copyFile(ClientCert, destFilePath)
if err != nil {
panic(err)
}
destFilePath = path.Join(clientCertDir, "client.key")
destFilePath = path.Join(destClientCertDir, "client.key")
err = copyFile(ClientKey, destFilePath)
if err != nil {
panic(err)
}
defer os.RemoveAll(destClientCertDir)
regex := ".*"
var semver bool
tlsVerify := true
@ -711,7 +1034,7 @@ func TestSyncTLS(t *testing.T) {
URL: srcBaseURL,
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
CertDir: clientCertDir,
CertDir: destClientCertDir,
}
destConfig.Extensions = &extconf.ExtensionConfig{}
@ -730,6 +1053,7 @@ func TestSyncTLS(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -752,9 +1076,16 @@ func TestSyncTLS(t *testing.T) {
if !found {
panic(errSync)
}
Convey("Test sync on POST request on /sync", func() {
resp, _ := client.R().SetBasicAuth("test", "test").Post(destBaseURL + "/sync")
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
})
})
}
// nolint: gocyclo
func TestSyncBasicAuth(t *testing.T) {
Convey("Verify sync basic auth", t, func() {
updateDuration, _ := time.ParseDuration("1h")
@ -800,12 +1131,13 @@ func TestSyncBasicAuth(t *testing.T) {
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
for {
_, err := resty.R().Get(srcBaseURL)
t.Logf("err %v", err)
if err == nil {
break
}
@ -863,6 +1195,7 @@ func TestSyncBasicAuth(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -909,6 +1242,98 @@ func TestSyncBasicAuth(t *testing.T) {
}
})
Convey("Verify sync basic auth with wrong file credentials", func() {
destPort := getFreePort()
destBaseURL := getBaseURL(destPort, false)
destConfig := config.New()
destConfig.HTTP.Port = destPort
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
if err != nil {
panic(err)
}
destConfig.Storage.SubPaths = map[string]config.StorageConfig{
"a": {
RootDirectory: destDir,
GC: true,
Dedupe: true,
},
}
defer os.RemoveAll(destDir)
destConfig.Storage.RootDirectory = destDir
regex := ".*"
var semver bool
registryName := strings.Replace(strings.Replace(srcBaseURL, "http://", "", 1), "https://", "", 1)
credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "invalid"}}`,
registryName))
var tlsVerify bool
syncRegistryConfig := sync.RegistryConfig{
Content: []sync.Content{
{
Prefix: testImage,
Tags: &sync.Tags{
Regex: &regex,
Semver: &semver,
},
},
},
URL: srcBaseURL,
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
CertDir: "",
OnDemand: true,
}
destConfig.Extensions = &extconf.ExtensionConfig{}
destConfig.Extensions.Search = nil
destConfig.Extensions.Sync = &sync.Config{CredentialsFile: credentialsFile,
Registries: []sync.RegistryConfig{syncRegistryConfig}}
dc := api.NewController(destConfig)
go func() {
// this blocks
if err := dc.Run(); err != nil {
return
}
}()
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
for {
_, err := resty.R().Get(destBaseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
Convey("Test sync on POST request on /sync", func() {
resp, _ := resty.R().Post(destBaseURL + "/sync")
So(resp, ShouldNotBeNil)
So(string(resp.Body()), ShouldContainSubstring, "sync: couldn't fetch upstream registry's catalog")
So(resp.StatusCode(), ShouldEqual, 500)
})
})
Convey("Verify sync basic auth with bad file credentials", func() {
destPort := getFreePort()
destBaseURL := getBaseURL(destPort, false)
@ -930,9 +1355,17 @@ func TestSyncBasicAuth(t *testing.T) {
registryName := strings.Replace(strings.Replace(srcBaseURL, "http://", "", 1), "https://", "", 1)
credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "invalid"}}`,
credentialsFile := makeCredentialsFile(fmt.Sprintf(`{"%s":{"username": "test", "password": "test"}}`,
registryName))
err = os.Chmod(credentialsFile, 0000)
So(err, ShouldBeNil)
defer func() {
So(os.Chmod(credentialsFile, 0755), ShouldBeNil)
So(os.RemoveAll(credentialsFile), ShouldBeNil)
}()
var tlsVerify bool
syncRegistryConfig := sync.RegistryConfig{
@ -979,10 +1412,14 @@ func TestSyncBasicAuth(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
Convey("Test sync on POST request on /sync", func() {
resp, _ := resty.R().Post(destBaseURL + "/sync")
So(resp, ShouldNotBeNil)
So(string(resp.Body()), ShouldContainSubstring, "sync: couldn't fetch upstream registry's catalog")
So(string(resp.Body()), ShouldContainSubstring, "permission denied")
So(resp.StatusCode(), ShouldEqual, 500)
})
})
@ -1041,6 +1478,7 @@ func TestSyncBasicAuth(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1164,6 +1602,7 @@ func TestSyncBadUrl(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1220,6 +1659,7 @@ func TestSyncNoImagesByRegex(t *testing.T) {
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1280,6 +1720,7 @@ func TestSyncNoImagesByRegex(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1349,6 +1790,7 @@ func TestSyncInvalidRegex(t *testing.T) {
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1409,6 +1851,7 @@ func TestSyncInvalidRegex(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1465,6 +1908,7 @@ func TestSyncNotSemver(t *testing.T) {
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1540,6 +1984,7 @@ func TestSyncNotSemver(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1555,7 +2000,6 @@ func TestSyncNotSemver(t *testing.T) {
resp, _ := resty.R().Post(destBaseURL + "/sync")
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
So(err, ShouldBeNil)
var destTagsList TagsList
@ -1626,6 +2070,7 @@ func TestSyncInvalidCerts(t *testing.T) {
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
cert, err := tls.LoadX509KeyPair("../../../test/data/client.cert", "../../../test/data/client.key")
@ -1648,8 +2093,6 @@ func TestSyncInvalidCerts(t *testing.T) {
destConfig := config.New()
destConfig.HTTP.Port = destPort
os.RemoveAll("/tmp/zot-certs-dir")
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
if err != nil {
panic(err)
@ -1694,6 +2137,8 @@ func TestSyncInvalidCerts(t *testing.T) {
panic(err)
}
defer os.RemoveAll(clientCertDir)
var tlsVerify bool
syncRegistryConfig := sync.RegistryConfig{
@ -1724,6 +2169,7 @@ func TestSyncInvalidCerts(t *testing.T) {
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
}()
// wait till ready
@ -1757,3 +2203,196 @@ func makeCredentialsFile(fileContent string) string {
return f.Name()
}
func TestSyncInvalidUrl(t *testing.T) {
Convey("Verify sync invalid url", t, func() {
updateDuration, _ := time.ParseDuration("30m")
destPort := getFreePort()
destBaseURL := getBaseURL(destPort, false)
destConfig := config.New()
destConfig.HTTP.Port = destPort
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(destDir)
destConfig.Storage.RootDirectory = destDir
regex := ".*"
var semver bool
var tlsVerify bool
syncRegistryConfig := sync.RegistryConfig{
Content: []sync.Content{
{
// won't match any image on source registry, we will sync on demand
Prefix: "dummy",
Tags: &sync.Tags{
Regex: &regex,
Semver: &semver,
},
},
},
URL: "http://invalid.invalid/invalid/",
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
CertDir: "",
OnDemand: true,
}
destConfig.Extensions = &extconf.ExtensionConfig{}
destConfig.Extensions.Search = nil
destConfig.Extensions.Sync = &sync.Config{Registries: []sync.RegistryConfig{syncRegistryConfig}}
dc := api.NewController(destConfig)
go func() {
// this blocks
if err := dc.Run(); err != nil {
return
}
}()
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(1 * time.Second)
}()
// wait till ready
for {
_, err := resty.R().Get(destBaseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
})
}
func TestSyncInvalidTags(t *testing.T) {
Convey("Verify sync invalid url", t, func() {
updateDuration, _ := time.ParseDuration("30m")
srcPort := getFreePort()
srcBaseURL := getBaseURL(srcPort, false)
srcConfig := config.New()
srcConfig.HTTP.Port = srcPort
srcDir, err := ioutil.TempDir("", "oci-src-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(srcDir)
err = copyFiles("../../../test/data", srcDir)
if err != nil {
panic(err)
}
srcConfig.Storage.RootDirectory = srcDir
sc := api.NewController(srcConfig)
go func() {
// this blocks
if err := sc.Run(); err != nil {
return
}
}()
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
}()
// wait till ready
for {
_, err := resty.R().Get(srcBaseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
destPort := getFreePort()
destBaseURL := getBaseURL(destPort, false)
destConfig := config.New()
destConfig.HTTP.Port = destPort
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(destDir)
destConfig.Storage.RootDirectory = destDir
regex := ".*"
var semver bool
var tlsVerify bool
syncRegistryConfig := sync.RegistryConfig{
Content: []sync.Content{
{
// won't match any image on source registry, we will sync on demand
Prefix: "dummy",
Tags: &sync.Tags{
Regex: &regex,
Semver: &semver,
},
},
},
URL: srcBaseURL,
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
CertDir: "",
OnDemand: true,
}
destConfig.Extensions = &extconf.ExtensionConfig{}
destConfig.Extensions.Search = nil
destConfig.Extensions.Sync = &sync.Config{
Registries: []sync.RegistryConfig{syncRegistryConfig}}
dc := api.NewController(destConfig)
go func() {
// this blocks
if err := dc.Run(); err != nil {
return
}
}()
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
}()
// wait till ready
for {
_, err := resty.R().Get(destBaseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "invalid:tag")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 404)
})
}

View file

@ -2,78 +2,20 @@ package sync
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
"github.com/anuvu/zot/errors"
"github.com/anuvu/zot/pkg/extensions/monitoring"
"github.com/anuvu/zot/pkg/log"
"github.com/anuvu/zot/pkg/storage"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/types"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
)
var certsDir = fmt.Sprintf("%s/zot-certs-dir/", os.TempDir()) //nolint: gochecknoglobals
func copyFile(sourceFilePath, destFilePath string) error {
destFile, err := os.Create(destFilePath)
if err != nil {
return err
}
defer destFile.Close()
// should never get error because server certs are already handled by zot, by the time
// it gets here
sourceFile, _ := os.Open(sourceFilePath)
defer sourceFile.Close()
if _, err := io.Copy(destFile, sourceFile); err != nil {
return err
}
return nil
}
func copyLocalCerts(serverCert, serverKey, caCert string, log log.Logger) (string, error) {
log.Debug().Msgf("Creating certs directory: %s", certsDir)
err := os.Mkdir(certsDir, 0755)
if err != nil && !os.IsExist(err) {
return "", err
}
if serverCert != "" {
log.Debug().Msgf("Copying server cert: %s", serverCert)
err := copyFile(serverCert, path.Join(certsDir, "server.cert"))
if err != nil {
return "", err
}
}
if serverKey != "" {
log.Debug().Msgf("Copying server key: %s", serverKey)
err := copyFile(serverKey, path.Join(certsDir, "server.key"))
if err != nil {
return "", err
}
}
if caCert != "" {
log.Debug().Msgf("Copying CA cert: %s", caCert)
err := copyFile(caCert, path.Join(certsDir, "ca.crt"))
if err != nil {
return "", err
}
}
return certsDir, nil
}
// getTagFromRef returns a tagged reference from an image reference.
func getTagFromRef(ref types.ImageReference, log log.Logger) reference.Tagged {
tagged, isTagged := ref.DockerReference().(reference.Tagged)
@ -164,3 +106,70 @@ func getFileCredentials(filepath string) (CredentialsFile, error) {
return creds, nil
}
func pushSyncedLocalImage(repo, tag, uuid string,
storeController storage.StoreController, log log.Logger) error {
log.Info().Msgf("pushing synced local image %s:%s to local registry", repo, tag)
imageStore := storeController.GetImageStore(repo)
metrics := monitoring.NewMetricsServer(false, log)
cacheImageStore := storage.NewImageStore(path.Join(imageStore.RootDir(), repo, SyncBlobUploadDir, uuid),
false, false, log, metrics)
manifestContent, _, _, err := cacheImageStore.GetImageManifest(repo, tag)
if err != nil {
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), repo)).Msg("couldn't find index.json")
return err
}
var manifest ispec.Manifest
if err := json.Unmarshal(manifestContent, &manifest); err != nil {
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), repo)).Msg("invalid JSON")
return err
}
for _, blob := range manifest.Layers {
blobReader, _, err := cacheImageStore.GetBlob(repo, blob.Digest.String(), blob.MediaType)
if err != nil {
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(),
repo)).Str("blob digest", blob.Digest.String()).Msg("couldn't read blob")
return err
}
_, _, err = imageStore.FullBlobUpload(repo, blobReader, blob.Digest.String())
if err != nil {
log.Error().Err(err).Str("blob digest", blob.Digest.String()).Msg("couldn't upload blob")
return err
}
}
blobReader, _, err := cacheImageStore.GetBlob(repo, manifest.Config.Digest.String(), manifest.Config.MediaType)
if err != nil {
log.Error().Err(err).Str("dir", path.Join(cacheImageStore.RootDir(),
repo)).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't read config blob")
return err
}
_, _, err = imageStore.FullBlobUpload(repo, blobReader, manifest.Config.Digest.String())
if err != nil {
log.Error().Err(err).Str("blob digest", manifest.Config.Digest.String()).Msg("couldn't upload config blob")
return err
}
_, err = imageStore.PutImageManifest(repo, tag, ispec.MediaTypeImageManifest, manifestContent)
if err != nil {
log.Error().Err(err).Msg("couldn't upload manifest")
return err
}
log.Info().Msgf("removing temporary cached synced repo %s", path.Join(cacheImageStore.RootDir(), repo))
if err := os.RemoveAll(path.Join(cacheImageStore.RootDir(), repo)); err != nil {
log.Error().Err(err).Msg("couldn't remove locally cached sync repo")
return err
}
return nil
}