0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-27 23:01:43 -05:00

fix: changing default numWorkers, making it customizable and refactoring scheduler (#1563)

Signed-off-by: Lisca Ana-Roberta <ana.kagome@yahoo.com>
This commit is contained in:
Lisca Ana-Roberta 2023-07-04 11:03:29 +03:00 committed by GitHub
parent 7881ce32b2
commit d4f200c2e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 101 additions and 29 deletions

View file

@ -252,7 +252,15 @@ Behaviour-based action list
}
```
#### Scheduler Workers
The number of workers for the task scheduler defaults to runtime.NumCPU()*4; it can be configured with:
```
"scheduler": {
"numWorkers": 3
```
## Logging

View file

@ -0,0 +1,16 @@
{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "/tmp/zot"
},
"http": {
"address": "127.0.0.1",
"port": "8080"
},
"scheduler": {
"numWorkers": 3
},
"log": {
"level": "debug"
}
}

View file

@ -74,6 +74,10 @@ type HTTPConfig struct {
Ratelimit *RatelimitConfig `mapstructure:",omitempty"`
}
// SchedulerConfig holds the configuration for the background task scheduler
// (the "scheduler" section of the config file).
type SchedulerConfig struct {
// NumWorkers is the size of the scheduler's worker pool; when left unset
// (zero) the scheduler falls back to its default (runtime.NumCPU()*4).
NumWorkers int
}
type LDAPConfig struct {
Port int
Insecure bool
@ -151,6 +155,7 @@ type Config struct {
HTTP HTTPConfig
Log *LogConfig
Extensions *extconf.ExtensionConfig
Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"`
}
func New() *Config {

View file

@ -312,7 +312,7 @@ func (c *Controller) Shutdown() {
}
func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
taskScheduler := scheduler.NewScheduler(c.Log)
taskScheduler := scheduler.NewScheduler(c.Config, c.Log)
taskScheduler.RunScheduler(reloadCtx)
// Enable running garbage-collect periodically for DefaultStore

View file

@ -57,7 +57,7 @@ type taskGenerator struct {
done bool
}
func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) {
func (gen *taskGenerator) Next() (scheduler.Task, error) {
repo, err := gen.imgStore.GetNextRepository(gen.lastRepo)
if err != nil && !errors.Is(err, io.EOF) {

View file

@ -86,7 +86,7 @@ type TrivyTaskGenerator struct {
lock *sync.Mutex
}
func (gen *TrivyTaskGenerator) GenerateTask() (scheduler.Task, error) {
func (gen *TrivyTaskGenerator) Next() (scheduler.Task, error) {
var newTask scheduler.Task
gen.lock.Lock()

View file

@ -5,6 +5,7 @@ package extensions_test
import (
"context"
"io"
"os"
"testing"
"time"
@ -12,6 +13,7 @@ import (
ispec "github.com/opencontainers/image-spec/specs-go/v1"
. "github.com/smartystreets/goconvey/convey"
"zotregistry.io/zot/pkg/api/config"
. "zotregistry.io/zot/pkg/extensions"
cveinfo "zotregistry.io/zot/pkg/extensions/search/cve"
"zotregistry.io/zot/pkg/log"
@ -30,8 +32,13 @@ func TestTrivyDBGenerator(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
logger := log.NewLogger("debug", logPath)
writers := io.MultiWriter(os.Stdout, logFile)
logger.Logger = logger.Output(writers)
cfg := config.New()
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
sch := scheduler.NewScheduler(cfg, logger)
repoDB := &mocks.RepoDBMock{
GetRepoMetaFn: func(repo string) (repodb.RepoMetadata, error) {
@ -56,13 +63,14 @@ func TestTrivyDBGenerator(t *testing.T) {
sch.SubmitGenerator(generator, 12000*time.Millisecond, scheduler.HighPriority)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)
defer cancel()
// Wait for trivy db to download
found, err := ReadLogFileAndCountStringOccurence(logPath,
"DB update completed, next update scheduled", 90*time.Second, 2)
"DB update completed, next update scheduled", 120*time.Second, 2)
So(err, ShouldBeNil)
So(found, ShouldBeTrue)
})

View file

@ -88,7 +88,7 @@ func NewTaskGenerator(service Service, log log.Logger) *TaskGenerator {
}
}
func (gen *TaskGenerator) GenerateTask() (scheduler.Task, error) {
func (gen *TaskGenerator) Next() (scheduler.Task, error) {
if err := gen.Service.SetNextAvailableURL(); err != nil {
return nil, err
}

View file

@ -3,9 +3,11 @@ package scheduler
import (
"container/heap"
"context"
"runtime"
"sync"
"time"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/log"
)
@ -55,7 +57,7 @@ func (pq *generatorsPriorityQueue) Pop() any {
const (
rateLimiterScheduler = 400
rateLimit = 5 * time.Second
numWorkers = 3
numWorkersMultiplier = 4
)
type Scheduler struct {
@ -68,13 +70,15 @@ type Scheduler struct {
log log.Logger
stopCh chan struct{}
RateLimit time.Duration
NumWorkers int
}
func NewScheduler(logC log.Logger) *Scheduler {
func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
chLow := make(chan Task, rateLimiterScheduler)
chMedium := make(chan Task, rateLimiterScheduler)
chHigh := make(chan Task, rateLimiterScheduler)
generatorPQ := make(generatorsPriorityQueue, 0)
numWorkers := getNumWorkers(cfg)
sublogger := logC.With().Str("component", "scheduler").Logger()
heap.Init(&generatorPQ)
@ -88,7 +92,8 @@ func NewScheduler(logC log.Logger) *Scheduler {
log: log.Logger{Logger: sublogger},
stopCh: make(chan struct{}),
// default value
RateLimit: rateLimit,
RateLimit: rateLimit,
NumWorkers: numWorkers,
}
}
@ -110,6 +115,8 @@ func (scheduler *Scheduler) poolWorker(numWorkers int, tasks chan Task) {
func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
throttle := time.NewTicker(rateLimit).C
numWorkers := scheduler.NumWorkers
tasksWorker := make(chan Task, numWorkers)
// start worker pool
@ -274,8 +281,8 @@ const (
done
)
type Generator interface {
GenerateTask() (Task, error)
// TaskGenerator produces a stream of tasks for the scheduler to run.
type TaskGenerator interface {
// Next returns the next task to execute, or an error if one cannot
// be generated.
Next() (Task, error)
// IsDone reports whether the generator has no more tasks to produce.
IsDone() bool
// Reset returns the generator to its initial state so it can produce
// its task sequence again.
Reset()
}
@ -285,7 +292,7 @@ type generator struct {
lastRun time.Time
done bool
priority Priority
taskGenerator Generator
taskGenerator TaskGenerator
remainingTask Task
index int
}
@ -298,7 +305,7 @@ func (gen *generator) generate(sch *Scheduler) {
// in case there is no task already generated, generate a new task
if gen.remainingTask == nil {
nextTask, err := gen.taskGenerator.GenerateTask()
nextTask, err := gen.taskGenerator.Next()
if err != nil {
sch.log.Error().Err(err).Msg("scheduler: error while executing generator")
@ -347,7 +354,7 @@ func (gen *generator) getState() state {
return ready
}
func (scheduler *Scheduler) SubmitGenerator(taskGenerator Generator, interval time.Duration, priority Priority) {
func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) {
newGenerator := &generator{
interval: interval,
done: false,
@ -362,3 +369,11 @@ func (scheduler *Scheduler) SubmitGenerator(taskGenerator Generator, interval ti
// add generator to the generators priority queue
heap.Push(&scheduler.generators, newGenerator)
}
// getNumWorkers returns the scheduler worker-pool size. A positive value set
// under the "scheduler" config section wins; otherwise (section absent, value
// unset, or a nonsensical non-positive value) the default of
// runtime.NumCPU()*numWorkersMultiplier is used.
func getNumWorkers(cfg *config.Config) int {
	// Guard with > 0 (not != 0) so a negative configured value cannot
	// produce a broken worker pool; it falls back to the default instead.
	if cfg.Scheduler != nil && cfg.Scheduler.NumWorkers > 0 {
		return cfg.Scheduler.NumWorkers
	}

	return runtime.NumCPU() * numWorkersMultiplier
}

View file

@ -5,11 +5,13 @@ import (
"errors"
"fmt"
"os"
"runtime"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
)
@ -40,7 +42,7 @@ type generator struct {
step int
}
func (g *generator) GenerateTask() (scheduler.Task, error) {
func (g *generator) Next() (scheduler.Task, error) {
if g.step > 1 {
g.done = true
}
@ -67,7 +69,7 @@ type shortGenerator struct {
step int
}
func (g *shortGenerator) GenerateTask() (scheduler.Task, error) {
func (g *shortGenerator) Next() (scheduler.Task, error) {
g.done = true
return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil
@ -90,7 +92,7 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
sch := scheduler.NewScheduler(config.New(), logger)
genH := &shortGenerator{log: logger, priority: "high priority"}
// interval has to be higher than throttle value to simulate
@ -114,7 +116,9 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
cfg := config.New()
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
sch := scheduler.NewScheduler(cfg, logger)
genL := &generator{log: logger, priority: "low priority"}
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
@ -126,6 +130,7 @@ func TestScheduler(t *testing.T) {
sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)
time.Sleep(4 * time.Second)
@ -147,7 +152,7 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
sch := scheduler.NewScheduler(config.New(), logger)
t := &task{log: logger, msg: "", err: true}
sch.SubmitTask(t, scheduler.MediumPriority)
@ -171,7 +176,7 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
sch := scheduler.NewScheduler(config.New(), logger)
genL := &generator{log: logger, priority: "low priority"}
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority)
@ -195,7 +200,7 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
sch := scheduler.NewScheduler(config.New(), logger)
t := &task{log: logger, msg: "", err: false}
sch.SubmitTask(t, -1)
@ -212,7 +217,7 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
sch := scheduler.NewScheduler(config.New(), logger)
ctx, cancel := context.WithCancel(context.Background())
@ -228,3 +233,17 @@ func TestScheduler(t *testing.T) {
So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task")
})
}
// TestGetNumWorkers verifies how the scheduler resolves its worker-pool size
// from the configuration.
func TestGetNumWorkers(t *testing.T) {
	Convey("Test setting the number of workers - default value", t, func() {
		// With no "scheduler" section configured, the scheduler must fall
		// back to its default of runtime.NumCPU()*4 workers.
		taskScheduler := scheduler.NewScheduler(config.New(), log.NewLogger("debug", "logFile"))
		So(taskScheduler.NumWorkers, ShouldEqual, runtime.NumCPU()*4)
	})
	Convey("Test setting the number of workers - getting the value from config", t, func() {
		// An explicit scheduler.numWorkers value must take precedence
		// over the default.
		conf := config.New()
		conf.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
		taskScheduler := scheduler.NewScheduler(conf, log.NewLogger("debug", "logFile"))
		So(taskScheduler.NumWorkers, ShouldEqual, 3)
	})
}

View file

@ -694,7 +694,7 @@ type DedupeTaskGenerator struct {
Log zerolog.Logger
}
func (gen *DedupeTaskGenerator) GenerateTask() (scheduler.Task, error) {
func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
var err error
// get all blobs from storage.imageStore and group them by digest

View file

@ -1769,7 +1769,7 @@ type taskGenerator struct {
done bool
}
func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) {
func (gen *taskGenerator) Next() (scheduler.Task, error) {
repo, err := gen.imgStore.GetNextRepository(gen.lastRepo)
if err != nil && !errors.Is(err, io.EOF) {

View file

@ -26,6 +26,7 @@ import (
. "github.com/smartystreets/goconvey/convey"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/common"
"zotregistry.io/zot/pkg/extensions/monitoring"
"zotregistry.io/zot/pkg/log"
@ -48,7 +49,7 @@ const (
var errCache = errors.New("new cache error")
func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) {
taskScheduler := scheduler.NewScheduler(log.Logger{})
taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{})
taskScheduler.RateLimit = 50 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())

View file

@ -201,7 +201,7 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl
}
func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) {
taskScheduler := scheduler.NewScheduler(log.Logger{})
taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{})
taskScheduler.RateLimit = 50 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())
@ -2006,7 +2006,7 @@ func TestRebuildDedupeIndex(t *testing.T) {
Convey("Intrerrupt rebuilding and restart, checking idempotency", func() {
for i := 0; i < 10; i++ {
taskScheduler := scheduler.NewScheduler(log.Logger{})
taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{})
taskScheduler.RateLimit = 1 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -2045,7 +2045,7 @@ func TestRebuildDedupeIndex(t *testing.T) {
// now from dedupe false to true
for i := 0; i < 10; i++ {
taskScheduler := scheduler.NewScheduler(log.Logger{})
taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{})
taskScheduler.RateLimit = 1 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)