From 4aa0106b0a0684ffd7f71d759fc23b0c8550fe33 Mon Sep 17 00:00:00 2001 From: peusebiu Date: Wed, 15 Feb 2023 21:36:50 +0200 Subject: [PATCH] feat(scheduler): use an worker pool for scheduler (#1146) Signed-off-by: Petu Eusebiu --- pkg/extensions/sync/sync.go | 1 + pkg/extensions/sync/sync_test.go | 2 +- pkg/scheduler/scheduler.go | 45 ++++++++++++++++++++++++++------ pkg/scheduler/scheduler_test.go | 15 +++++++---- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 4af4bbb9..f04cd890 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -384,6 +384,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, return err } + // push from cache to repo err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log) if err != nil { diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 2151cde0..3e1a5412 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -1047,7 +1047,7 @@ func TestMandatoryAnnotations(t *testing.T) { defer dcm.StopServer() // give it time to set up sync - time.Sleep(5 * time.Second) + time.Sleep(10 * time.Second) resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/0.0.1") So(err, ShouldBeNil) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index dda3ffa6..20c37d41 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -85,26 +85,55 @@ func NewScheduler(logC log.Logger) *Scheduler { } } -const rateLimit = 5 * time.Second +const ( + rateLimit = 5 * time.Second + numWorkers = 3 +) + +func (scheduler *Scheduler) poolWorker(numWorkers int, tasks chan Task) { + for i := 0; i < numWorkers; i++ { + go func(workerID int) { + for task := range tasks { + scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task") + + if err := task.DoWork(); err != nil { + scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task") + } + + scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: finished task") + } + }(i + 1) + } +} func (scheduler *Scheduler) RunScheduler(ctx context.Context) { throttle := time.NewTicker(rateLimit).C + tasksWorker := make(chan Task, numWorkers) + + // start worker pool + go scheduler.poolWorker(numWorkers, tasksWorker) go func() { for { select { case <-ctx.Done(): + close(tasksWorker) close(scheduler.stopCh) return default: - task := scheduler.getTask() - if task != nil { - if err := task.DoWork(); err != nil { - scheduler.log.Error().Err(err).Msg("error while executing task") + i := 0 + for i < numWorkers { + task := scheduler.getTask() + if task != nil { + // push tasks into worker pool + scheduler.log.Debug().Msg("scheduler: pushing task into worker pool") + tasksWorker <- task } + i++ } } + <-throttle } }() @@ -122,7 +151,7 @@ func (scheduler *Scheduler) pushReadyGenerators() { scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...) modified = true - scheduler.log.Debug().Msg("waiting generator is ready, pushing to ready generators") + scheduler.log.Debug().Msg("scheduler: waiting generator is ready, pushing to ready generators") break } @@ -221,7 +250,7 @@ func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) { case <-scheduler.stopCh: return case tasksQ <- task: - scheduler.log.Info().Msg("Adding a new task to the scheduler") + scheduler.log.Info().Msg("scheduler: adding a new task") } } @@ -267,7 +296,7 @@ func (gen *generator) generate(sch *Scheduler) { if gen.remainingTask == nil { nextTask, err := gen.taskGenerator.GenerateTask() if err != nil { - sch.log.Error().Err(err).Msg("error while executing generator") + sch.log.Error().Err(err).Msg("scheduler: error while executing generator") return } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 0ad925f5..e36cc3b1 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -119,19 +119,24 @@ func TestScheduler(t *testing.T) { genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority) + genM := &generator{log: logger, priority: "medium priority"} + sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority) + genH := &generator{log: logger, priority: "high priority"} sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority) ctx, cancel := context.WithCancel(context.Background()) sch.RunScheduler(ctx) - time.Sleep(500 * time.Millisecond) + time.Sleep(4 * time.Second) cancel() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "executing high priority task; index: 1") - So(string(data), ShouldNotContainSubstring, "executing low priority task; index: 1") + So(string(data), ShouldContainSubstring, "executing high priority task; index: 2") + So(string(data), ShouldNotContainSubstring, "executing medium priority task; index: 1") So(string(data), ShouldNotContainSubstring, "error while executing task") }) @@ -155,7 +160,7 @@ func TestScheduler(t *testing.T) { data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "Adding a new task to the scheduler") + So(string(data), ShouldContainSubstring, "scheduler: adding a new task") So(string(data), ShouldContainSubstring, "error while executing task") }) @@ -197,7 +202,7 @@ func TestScheduler(t *testing.T) { data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) - So(string(data), ShouldNotContainSubstring, "Adding a new task to the scheduler") + So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task") }) Convey("Test adding a new task when context is done", t, func() { @@ -220,6 +225,6 @@ func TestScheduler(t *testing.T) { data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) - So(string(data), ShouldNotContainSubstring, "Adding a new task to the scheduler") + So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task") }) }