diff --git a/examples/README.md b/examples/README.md index 9b0daeaf..9df51e2c 100644 --- a/examples/README.md +++ b/examples/README.md @@ -252,7 +252,15 @@ Behaviour-based action list } ``` +#### Scheduler Workers +The task scheduler uses `runtime.NumCPU()*4` workers by default; the number of workers is configurable with: + +``` + "scheduler": { + "numWorkers": 3 + } +``` ## Logging diff --git a/examples/config-scheduler.json b/examples/config-scheduler.json new file mode 100644 index 00000000..3d511a6b --- /dev/null +++ b/examples/config-scheduler.json @@ -0,0 +1,16 @@ +{ + "distSpecVersion": "1.1.0-dev", + "storage": { + "rootDirectory": "/tmp/zot" + }, + "http": { + "address": "127.0.0.1", + "port": "8080" + }, + "scheduler": { + "numWorkers": 3 + }, + "log": { + "level": "debug" + } +} diff --git a/pkg/api/config/config.go b/pkg/api/config/config.go index 38b0d679..d1a891cc 100644 --- a/pkg/api/config/config.go +++ b/pkg/api/config/config.go @@ -74,6 +74,10 @@ type HTTPConfig struct { Ratelimit *RatelimitConfig `mapstructure:",omitempty"` } +type SchedulerConfig struct { + NumWorkers int +} + type LDAPConfig struct { Port int Insecure bool @@ -151,6 +155,7 @@ type Config struct { HTTP HTTPConfig Log *LogConfig Extensions *extconf.ExtensionConfig + Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"` } func New() *Config { diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 4340871f..1ce0969b 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -312,7 +312,7 @@ func (c *Controller) Shutdown() { } func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { - taskScheduler := scheduler.NewScheduler(c.Log) + taskScheduler := scheduler.NewScheduler(c.Config, c.Log) taskScheduler.RunScheduler(reloadCtx) // Enable running garbage-collect periodically for DefaultStore diff --git a/pkg/extensions/extension_scrub.go b/pkg/extensions/extension_scrub.go index af49a096..94bb2435 100644 --- 
a/pkg/extensions/extension_scrub.go +++ b/pkg/extensions/extension_scrub.go @@ -57,7 +57,7 @@ type taskGenerator struct { done bool } -func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) { +func (gen *taskGenerator) Next() (scheduler.Task, error) { repo, err := gen.imgStore.GetNextRepository(gen.lastRepo) if err != nil && !errors.Is(err, io.EOF) { diff --git a/pkg/extensions/extension_search.go b/pkg/extensions/extension_search.go index aefa104b..37ac24e4 100644 --- a/pkg/extensions/extension_search.go +++ b/pkg/extensions/extension_search.go @@ -86,7 +86,7 @@ type TrivyTaskGenerator struct { lock *sync.Mutex } -func (gen *TrivyTaskGenerator) GenerateTask() (scheduler.Task, error) { +func (gen *TrivyTaskGenerator) Next() (scheduler.Task, error) { var newTask scheduler.Task gen.lock.Lock() diff --git a/pkg/extensions/extension_search_test.go b/pkg/extensions/extension_search_test.go index b0fa970e..668280e7 100644 --- a/pkg/extensions/extension_search_test.go +++ b/pkg/extensions/extension_search_test.go @@ -5,6 +5,7 @@ package extensions_test import ( "context" + "io" "os" "testing" "time" @@ -12,6 +13,7 @@ import ( ispec "github.com/opencontainers/image-spec/specs-go/v1" . "github.com/smartystreets/goconvey/convey" + "zotregistry.io/zot/pkg/api/config" . 
"zotregistry.io/zot/pkg/extensions" cveinfo "zotregistry.io/zot/pkg/extensions/search/cve" "zotregistry.io/zot/pkg/log" @@ -30,8 +32,13 @@ func TestTrivyDBGenerator(t *testing.T) { defer os.Remove(logFile.Name()) // clean up - logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(logger) + logger := log.NewLogger("debug", logPath) + writers := io.MultiWriter(os.Stdout, logFile) + logger.Logger = logger.Output(writers) + + cfg := config.New() + cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3} + sch := scheduler.NewScheduler(cfg, logger) repoDB := &mocks.RepoDBMock{ GetRepoMetaFn: func(repo string) (repodb.RepoMetadata, error) { @@ -56,13 +63,14 @@ func TestTrivyDBGenerator(t *testing.T) { sch.SubmitGenerator(generator, 12000*time.Millisecond, scheduler.HighPriority) ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler(ctx) defer cancel() // Wait for trivy db to download found, err := ReadLogFileAndCountStringOccurence(logPath, - "DB update completed, next update scheduled", 90*time.Second, 2) + "DB update completed, next update scheduled", 120*time.Second, 2) So(err, ShouldBeNil) So(found, ShouldBeTrue) }) diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index de039abd..5d7c7185 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -88,7 +88,7 @@ func NewTaskGenerator(service Service, log log.Logger) *TaskGenerator { } } -func (gen *TaskGenerator) GenerateTask() (scheduler.Task, error) { +func (gen *TaskGenerator) Next() (scheduler.Task, error) { if err := gen.Service.SetNextAvailableURL(); err != nil { return nil, err } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 2a7580b8..6acfdfc1 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -3,9 +3,11 @@ package scheduler import ( "container/heap" "context" + "runtime" "sync" "time" + "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/log" ) @@ -55,7 +57,7 @@ 
func (pq *generatorsPriorityQueue) Pop() any { const ( rateLimiterScheduler = 400 rateLimit = 5 * time.Second - numWorkers = 3 + numWorkersMultiplier = 4 ) type Scheduler struct { @@ -68,13 +70,15 @@ type Scheduler struct { log log.Logger stopCh chan struct{} RateLimit time.Duration + NumWorkers int } -func NewScheduler(logC log.Logger) *Scheduler { +func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler { chLow := make(chan Task, rateLimiterScheduler) chMedium := make(chan Task, rateLimiterScheduler) chHigh := make(chan Task, rateLimiterScheduler) generatorPQ := make(generatorsPriorityQueue, 0) + numWorkers := getNumWorkers(cfg) sublogger := logC.With().Str("component", "scheduler").Logger() heap.Init(&generatorPQ) @@ -88,7 +92,8 @@ func NewScheduler(logC log.Logger) *Scheduler { log: log.Logger{Logger: sublogger}, stopCh: make(chan struct{}), // default value - RateLimit: rateLimit, + RateLimit: rateLimit, + NumWorkers: numWorkers, } } @@ -110,6 +115,8 @@ func (scheduler *Scheduler) poolWorker(numWorkers int, tasks chan Task) { func (scheduler *Scheduler) RunScheduler(ctx context.Context) { throttle := time.NewTicker(rateLimit).C + + numWorkers := scheduler.NumWorkers tasksWorker := make(chan Task, numWorkers) // start worker pool @@ -274,8 +281,8 @@ const ( done ) -type Generator interface { - GenerateTask() (Task, error) +type TaskGenerator interface { + Next() (Task, error) IsDone() bool Reset() } @@ -285,7 +292,7 @@ type generator struct { lastRun time.Time done bool priority Priority - taskGenerator Generator + taskGenerator TaskGenerator remainingTask Task index int } @@ -298,7 +305,7 @@ func (gen *generator) generate(sch *Scheduler) { // in case there is no task already generated, generate a new task if gen.remainingTask == nil { - nextTask, err := gen.taskGenerator.GenerateTask() + nextTask, err := gen.taskGenerator.Next() if err != nil { sch.log.Error().Err(err).Msg("scheduler: error while executing generator") @@ -347,7 +354,7 @@ func (gen 
*generator) getState() state { return ready } -func (scheduler *Scheduler) SubmitGenerator(taskGenerator Generator, interval time.Duration, priority Priority) { +func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) { newGenerator := &generator{ interval: interval, done: false, @@ -362,3 +369,15 @@ func (scheduler *Scheduler) SubmitGenerator(taskGenerator Generator, interval ti // add generator to the generators priority queue heap.Push(&scheduler.generators, newGenerator) } + +// getNumWorkers returns the worker pool size: the configured scheduler.numWorkers +// when it is positive, otherwise runtime.NumCPU() * numWorkersMultiplier. +func getNumWorkers(cfg *config.Config) int { + // ignore zero/negative values: RunScheduler sizes a channel with this number + // and make(chan Task, n) panics for n < 0 + if cfg != nil && cfg.Scheduler != nil && cfg.Scheduler.NumWorkers > 0 { + return cfg.Scheduler.NumWorkers + } + + return runtime.NumCPU() * numWorkersMultiplier +} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index e36cc3b1..d2112fc4 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -5,11 +5,13 @@ import ( "errors" "fmt" "os" + "runtime" "testing" "time" . 
"github.com/smartystreets/goconvey/convey" + "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/scheduler" ) @@ -40,7 +42,7 @@ type generator struct { step int } -func (g *generator) GenerateTask() (scheduler.Task, error) { +func (g *generator) Next() (scheduler.Task, error) { if g.step > 1 { g.done = true } @@ -67,7 +69,7 @@ type shortGenerator struct { step int } -func (g *shortGenerator) GenerateTask() (scheduler.Task, error) { +func (g *shortGenerator) Next() (scheduler.Task, error) { g.done = true return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil @@ -90,7 +92,7 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(logger) + sch := scheduler.NewScheduler(config.New(), logger) genH := &shortGenerator{log: logger, priority: "high priority"} // interval has to be higher than throttle value to simulate @@ -114,7 +116,9 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(logger) + cfg := config.New() + cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3} + sch := scheduler.NewScheduler(cfg, logger) genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority) @@ -126,6 +130,7 @@ func TestScheduler(t *testing.T) { sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority) ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler(ctx) time.Sleep(4 * time.Second) @@ -147,7 +152,7 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(logger) + sch := scheduler.NewScheduler(config.New(), logger) t := &task{log: logger, msg: "", err: true} sch.SubmitTask(t, 
scheduler.MediumPriority) @@ -171,7 +176,7 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(logger) + sch := scheduler.NewScheduler(config.New(), logger) genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) @@ -195,7 +200,7 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(logger) + sch := scheduler.NewScheduler(config.New(), logger) t := &task{log: logger, msg: "", err: false} sch.SubmitTask(t, -1) @@ -212,7 +217,7 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(logger) + sch := scheduler.NewScheduler(config.New(), logger) ctx, cancel := context.WithCancel(context.Background()) @@ -228,3 +233,17 @@ func TestScheduler(t *testing.T) { So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task") }) } + +func TestGetNumWorkers(t *testing.T) { + Convey("Test setting the number of workers - default value", t, func() { + sch := scheduler.NewScheduler(config.New(), log.NewLogger("debug", "logFile")) + So(sch.NumWorkers, ShouldEqual, runtime.NumCPU()*4) + }) + + Convey("Test setting the number of workers - getting the value from config", t, func() { + cfg := config.New() + cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3} + sch := scheduler.NewScheduler(cfg, log.NewLogger("debug", "logFile")) + So(sch.NumWorkers, ShouldEqual, 3) + }) +} diff --git a/pkg/storage/common/common.go b/pkg/storage/common/common.go index 9316ab1c..cd8a1cab 100644 --- a/pkg/storage/common/common.go +++ b/pkg/storage/common/common.go @@ -694,7 +694,7 @@ type DedupeTaskGenerator struct { Log zerolog.Logger } -func (gen *DedupeTaskGenerator) GenerateTask() 
(scheduler.Task, error) { +func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { var err error // get all blobs from storage.imageStore and group them by digest diff --git a/pkg/storage/local/local.go b/pkg/storage/local/local.go index b1e773c1..d4188ae2 100644 --- a/pkg/storage/local/local.go +++ b/pkg/storage/local/local.go @@ -1769,7 +1769,7 @@ type taskGenerator struct { done bool } -func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) { +func (gen *taskGenerator) Next() (scheduler.Task, error) { repo, err := gen.imgStore.GetNextRepository(gen.lastRepo) if err != nil && !errors.Is(err, io.EOF) { diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index b80ac558..76831a11 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -26,6 +26,7 @@ import ( . "github.com/smartystreets/goconvey/convey" zerr "zotregistry.io/zot/errors" + "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/common" "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" @@ -48,7 +49,7 @@ const ( var errCache = errors.New("new cache error") func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { - taskScheduler := scheduler.NewScheduler(log.Logger{}) + taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{}) taskScheduler.RateLimit = 50 * time.Millisecond ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 5229e01e..5cfa877f 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -201,7 +201,7 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl } func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { - taskScheduler := scheduler.NewScheduler(log.Logger{}) + taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{}) taskScheduler.RateLimit = 50 * time.Millisecond ctx, cancel := 
context.WithCancel(context.Background()) @@ -2006,7 +2006,7 @@ func TestRebuildDedupeIndex(t *testing.T) { Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { for i := 0; i < 10; i++ { - taskScheduler := scheduler.NewScheduler(log.Logger{}) + taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{}) taskScheduler.RateLimit = 1 * time.Millisecond ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) @@ -2045,7 +2045,7 @@ func TestRebuildDedupeIndex(t *testing.T) { // now from dedupe false to true for i := 0; i < 10; i++ { - taskScheduler := scheduler.NewScheduler(log.Logger{}) + taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{}) taskScheduler.RateLimit = 1 * time.Millisecond ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)