0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-06 22:40:28 -05:00
zot/pkg/extensions/extension_sync.go
Vishwas Rajashekar 767f81d4f5
feat(sync): support for periodic repo sync in scale-out cluster (#2424)
This commit includes support for periodic repo sync in a scale-out
cluster.
Before this commit, all cluster members would sync all the repos as
the config is shared.

With this change, in periodic sync, the cluster member checks whether
it manages the repo. If it does not manage the repo, it will skip the
sync.

This commit also includes a unit test to test on-demand sync too, but
there are no logic changes for it as it is implicitly handled by the
proxying logic.

Signed-off-by: Vishwas Rajashekar <vrajashe@cisco.com>
2024-05-31 09:25:34 -07:00

182 lines
4.3 KiB
Go

//go:build sync
// +build sync
package extensions
import (
"net"
"net/url"
"strings"
zerr "zotregistry.dev/zot/errors"
"zotregistry.dev/zot/pkg/api/config"
syncconf "zotregistry.dev/zot/pkg/extensions/config/sync"
"zotregistry.dev/zot/pkg/extensions/sync"
"zotregistry.dev/zot/pkg/log"
mTypes "zotregistry.dev/zot/pkg/meta/types"
"zotregistry.dev/zot/pkg/scheduler"
"zotregistry.dev/zot/pkg/storage"
)
// EnableSyncExtension sets up the sync extension according to config.
//
// For each configured registry it prunes URLs pointing back at this instance
// (only when more than one URL is configured), creates a sync service, submits
// a periodic task generator to sch when the registry has content entries and a
// non-zero poll interval, and adds the service to the returned on-demand
// handler when the registry has OnDemand set. Registries that are neither
// periodic nor on-demand are skipped.
//
// It returns (nil, nil) when sync is not configured or not enabled, and a
// non-nil error when URL pruning fails, when a registry is left without URLs,
// or when a sync service cannot be created.
func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
	storeController storage.StoreController, sch *scheduler.Scheduler, log log.Logger,
) (*sync.BaseOnDemand, error) {
	syncCfg := config.Extensions.Sync
	if syncCfg == nil || !*syncCfg.Enable {
		log.Info().Msg("sync config not provided or disabled, so not enabling sync")

		return nil, nil //nolint: nilnil
	}

	onDemand := sync.NewOnDemand(log)

	for idx := range syncCfg.Registries {
		// per-iteration copy: its address is handed to removeSelfURLs below
		registryConfig := syncCfg.Registries[idx]

		if len(registryConfig.URLs) > 1 {
			err := removeSelfURLs(config, &registryConfig, log)
			if err != nil {
				return nil, err
			}
		}

		if len(registryConfig.URLs) == 0 {
			log.Error().Err(zerr.ErrSyncNoURLsLeft).Msg("failed to start sync extension")

			return nil, zerr.ErrSyncNoURLsLeft
		}

		isOnDemand := registryConfig.OnDemand
		isPeriodical := registryConfig.PollInterval != 0 && len(registryConfig.Content) != 0

		// nothing to do for a registry that is neither polled nor synced on demand
		if !isPeriodical && !isOnDemand {
			continue
		}

		service, err := sync.New(registryConfig, syncCfg.CredentialsFile, config.Cluster,
			syncCfg.DownloadDir, storeController, metaDB, log)
		if err != nil {
			log.Error().Err(err).Msg("failed to initialize sync extension")

			return nil, err
		}

		if isPeriodical {
			// periodic sync is driven by the task scheduler
			pollInterval := registryConfig.PollInterval
			sch.SubmitGenerator(sync.NewTaskGenerator(service, pollInterval, log), pollInterval,
				scheduler.MediumPriority)
		}

		if isOnDemand {
			// onDemand services used in routes.go
			onDemand.Add(service)
		}
	}

	return onDemand, nil
}
// getLocalIPs collects the string form of every *net.IPNet address reported by
// the host's network interfaces.
//
// If the interfaces cannot be listed it returns an empty slice and the error.
// If listing the addresses of one interface fails it returns the addresses
// gathered up to that point together with the error.
func getLocalIPs() ([]string, error) {
	interfaces, err := net.Interfaces()
	if err != nil {
		return []string{}, err
	}

	var collected []string

	for idx := range interfaces {
		ifaceAddrs, err := interfaces[idx].Addrs()
		if err != nil {
			return collected, err
		}

		for _, ifaceAddr := range ifaceAddrs {
			ipNet, isIPNet := ifaceAddr.(*net.IPNet)
			if !isIPNet {
				// only IP network addresses are of interest
				continue
			}

			collected = append(collected, ipNet.IP.String())
		}
	}

	return collected, nil
}
// getIPFromHostName resolves host with net.LookupIP and returns the resulting
// addresses in string form, in the order the lookup produced them.
//
// On lookup failure it returns an empty, non-nil slice and the lookup error.
func getIPFromHostName(host string) ([]string, error) {
	resolved, err := net.LookupIP(host)
	if err != nil {
		return []string{}, err
	}

	ips := make([]string, len(resolved))
	for idx, addr := range resolved {
		ips[idx] = addr.String()
	}

	return ips, nil
}
// removeSelfURLs rewrites registryConfig.URLs so that it no longer contains
// URLs that cannot be parsed or that point back at this instance.
//
// A URL is considered to point at this instance when either
//   - its host[:port] part equals the configured HTTP address and port
//     (compared case-insensitively), or
//   - its hostname resolves to an IP assigned to a local interface and its
//     port equals the configured HTTP port.
//
// URLs whose hostname cannot be resolved are kept, since resolution may succeed
// on a later retry. The relative order of the remaining URLs is preserved.
//
// The surviving URLs are stored in a newly allocated slice, so the backing
// array of the slice passed in is left untouched; callers that hand in a
// shallow copy of a shared config entry do not get that entry scrambled.
//
// The only error returned is a failure to enumerate the local interface IPs,
// in which case registryConfig is not modified.
func removeSelfURLs(config *config.Config, registryConfig *syncconf.RegistryConfig, log log.Logger) error {
	// address this instance is configured to listen on
	port := config.HTTP.Port
	selfAddress := net.JoinHostPort(config.HTTP.Address, port)

	// get all local IPs from interfaces
	localIPs, err := getLocalIPs()
	if err != nil {
		return err
	}

	kept := make([]string, 0, len(registryConfig.URLs))

	for _, registryURL := range registryConfig.URLs {
		parsedURL, err := url.Parse(registryURL)
		if err != nil {
			log.Error().Err(err).Str("url", registryURL).Msg("failed to parse sync registry url, removing it")

			continue
		}

		// check self address: compare the authority exactly rather than searching
		// the raw URL, otherwise ":80" would also match ":8080" and an empty
		// listen address would match any URL using the same port
		if strings.EqualFold(parsedURL.Host, selfAddress) {
			log.Info().Str("url", registryURL).Msg("removing local registry url")

			continue
		}

		// check dns
		ips, err := getIPFromHostName(parsedURL.Hostname())
		if err != nil {
			// will not remove, maybe it will get resolved later after multiple retries
			log.Warn().Err(err).Str("url", registryURL).Msg("failed to lookup sync registry url's hostname")

			kept = append(kept, registryURL)

			continue
		}

		if resolvesToLocalEndpoint(ips, localIPs, parsedURL.Port(), port) {
			log.Info().Str("url", registryURL).Msg("removing local registry url")

			continue
		}

		kept = append(kept, registryURL)
	}

	registryConfig.URLs = kept

	return nil
}

// resolvesToLocalEndpoint reports whether any ip:urlPort pair built from ips
// equals any localIP:selfPort pair built from localIPs.
func resolvesToLocalEndpoint(ips, localIPs []string, urlPort, selfPort string) bool {
	for _, localIP := range localIPs {
		localEndpoint := net.JoinHostPort(localIP, selfPort)

		for _, ip := range ips {
			if net.JoinHostPort(ip, urlPort) == localEndpoint {
				return true
			}
		}
	}

	return false
}