0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-16 21:56:37 -05:00

fix(sync): sync generator now backs off on errors (#2272)

handle unsupported features like oci artifacts.

closes: #2238

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
peusebiu 2024-03-04 19:44:11 +02:00 committed by GitHub
parent 740eae8f26
commit 6f00e843a0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 141 additions and 21 deletions

View file

@ -57,8 +57,10 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
interval := registryConfig.PollInterval
gen := sync.NewTaskGenerator(service, interval, log)
sch.SubmitGenerator(gen, interval, scheduler.MediumPriority)
}
if isOnDemand {

View file

@ -59,6 +59,7 @@ func (gen *DBUpdateTaskGenerator) Next() (scheduler.Task, error) {
newTask = newDBUpdadeTask(gen.interval, gen.scanner, gen, gen.log)
gen.status = running
}
gen.lock.Unlock()
return newTask, nil

View file

@ -0,0 +1,53 @@
package features
import (
"sync"
"time"
)
const defaultExpireMinutes = 10
type featureKey struct {
kind string
repo string
}
type featureVal struct {
enabled bool
expire time.Time
}
type Map struct {
store map[featureKey]*featureVal
expireAfter time.Duration
mu *sync.Mutex
}
func New() *Map {
return &Map{
store: make(map[featureKey]*featureVal),
expireAfter: defaultExpireMinutes * time.Minute,
mu: new(sync.Mutex),
}
}
// returns if registry supports this feature and if ok.
func (f *Map) Get(kind, repo string) (bool, bool) {
f.mu.Lock()
defer f.mu.Unlock()
if feature, ok := f.store[featureKey{kind, repo}]; ok {
if time.Now().Before(feature.expire) {
return feature.enabled, true
}
}
// feature expired or not found
return false, false
}
func (f *Map) Set(kind, repo string, enabled bool) {
f.mu.Lock()
f.store[featureKey{kind: kind, repo: repo}] = &featureVal{enabled: enabled, expire: time.Now().Add(f.expireAfter)}
f.mu.Unlock()
}

View file

@ -6,6 +6,7 @@ package references
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
@ -14,7 +15,10 @@ import (
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"github.com/sigstore/cosign/v2/pkg/oci/static"
zerr "zotregistry.dev/zot/errors"
"zotregistry.dev/zot/pkg/common"
"zotregistry.dev/zot/pkg/extensions/sync/constants"
"zotregistry.dev/zot/pkg/extensions/sync/features"
client "zotregistry.dev/zot/pkg/extensions/sync/httpclient"
"zotregistry.dev/zot/pkg/log"
mTypes "zotregistry.dev/zot/pkg/meta/types"
@ -33,13 +37,14 @@ type Reference interface {
type References struct {
referenceList []Reference
features *features.Map
log log.Logger
}
func NewReferences(httpClient *client.Client, storeController storage.StoreController,
metaDB mTypes.MetaDB, log log.Logger,
) References {
refs := References{log: log}
refs := References{features: features.New(), log: log}
refs.referenceList = append(refs.referenceList, NewCosignReference(httpClient, storeController, metaDB, log))
refs.referenceList = append(refs.referenceList, NewTagReferences(httpClient, storeController, metaDB, log))
@ -78,12 +83,30 @@ func (refs References) syncAll(ctx context.Context, localRepo, upstreamRepo,
// for each reference type(cosign/oci/oras reference)
for _, ref := range refs.referenceList {
supported, ok := refs.features.Get(ref.Name(), upstreamRepo)
if !supported && ok {
continue
}
syncedRefsDigests, err = ref.SyncReferences(ctx, localRepo, upstreamRepo, subjectDigestStr)
if err != nil {
// for all referrers we can stop querying same repo (for ten minutes) if the errors are different than 404
if !errors.Is(err, zerr.ErrSyncReferrerNotFound) {
refs.features.Set(ref.Name(), upstreamRepo, false)
}
// in the case of oci referrers, it will return 404 only if the repo is not found or refferers API is not supported
// no need to continue to make requests to the same repo
if ref.Name() == constants.OCI && errors.Is(err, zerr.ErrSyncReferrerNotFound) {
refs.features.Set(ref.Name(), upstreamRepo, false)
}
refs.log.Debug().Err(err).
Str("reference type", ref.Name()).
Str("image", fmt.Sprintf("%s:%s", upstreamRepo, subjectDigestStr)).
Msg("couldn't sync image referrer")
} else {
refs.features.Set(ref.Name(), upstreamRepo, true)
}
// for each synced references

View file

@ -364,8 +364,6 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
}, service.retryOptions); err != nil {
service.log.Error().Str("errorType", common.TypeOf(err)).Str("repository", repo).
Err(err).Msg("failed to sync tags for repository")
return err
}
}
}

View file

@ -6,6 +6,8 @@ package sync
import (
"context"
"fmt"
"sync"
"time"
"github.com/containers/common/pkg/retry"
"github.com/containers/image/v5/types"
@ -80,18 +82,26 @@ type Destination interface {
}
type TaskGenerator struct {
Service Service
lastRepo string
done bool
log log.Logger
Service Service
lastRepo string
done bool
waitTime time.Duration
lastTaskTime time.Time
maxWaitTime time.Duration
lock *sync.Mutex
log log.Logger
}
func NewTaskGenerator(service Service, log log.Logger) *TaskGenerator {
func NewTaskGenerator(service Service, maxWaitTime time.Duration, log log.Logger) *TaskGenerator {
return &TaskGenerator{
Service: service,
done: false,
lastRepo: "",
log: log,
Service: service,
done: false,
waitTime: 0,
lastTaskTime: time.Now(),
lock: &sync.Mutex{},
lastRepo: "",
maxWaitTime: maxWaitTime,
log: log,
}
}
@ -100,27 +110,35 @@ func (gen *TaskGenerator) Name() string {
}
func (gen *TaskGenerator) Next() (scheduler.Task, error) {
gen.lock.Lock()
defer gen.lock.Unlock()
if time.Since(gen.lastTaskTime) <= gen.waitTime {
return nil, nil
}
if err := gen.Service.SetNextAvailableURL(); err != nil {
gen.increaseWaitTime()
return nil, err
}
repo, err := gen.Service.GetNextRepo(gen.lastRepo)
if err != nil {
gen.increaseWaitTime()
return nil, err
}
gen.resetWaitTime()
if repo == "" {
gen.log.Info().Str("component", "sync").Msg("finished syncing all repos")
gen.log.Info().Str("component", "sync").Msg("finished syncing all repositories")
gen.done = true
return nil, nil
}
// a task with this repo is already running
if gen.lastRepo == repo {
return nil, nil
}
gen.lastRepo = repo
return newSyncRepoTask(gen.lastRepo, gen.Service), nil
@ -135,9 +153,34 @@ func (gen *TaskGenerator) IsReady() bool {
}
func (gen *TaskGenerator) Reset() {
gen.lock.Lock()
defer gen.lock.Unlock()
gen.lastRepo = ""
gen.Service.ResetCatalog()
gen.done = false
gen.waitTime = 0
}
func (gen *TaskGenerator) increaseWaitTime() {
if gen.waitTime == 0 {
gen.waitTime = time.Second
}
gen.waitTime *= 2
// max wait time should not exceed generator interval.
if gen.waitTime > gen.maxWaitTime {
gen.waitTime = gen.maxWaitTime
}
gen.lastTaskTime = time.Now()
}
// resets wait time.
func (gen *TaskGenerator) resetWaitTime() {
gen.lastTaskTime = time.Now()
gen.waitTime = 0
}
type syncRepoTask struct {
@ -154,7 +197,7 @@ func (srt *syncRepoTask) DoWork(ctx context.Context) error {
}
func (srt *syncRepoTask) String() string {
return fmt.Sprintf("{Name: \"%s\", repo: \"%s\"}",
return fmt.Sprintf("{Name: \"%s\", repository: \"%s\"}",
srt.Name(), srt.repo)
}