0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-16 21:56:37 -05:00

feat(scheduler): use an worker pool for scheduler (#1146)

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
peusebiu 2023-02-15 21:36:50 +02:00 committed by GitHub
parent f00a9e6e48
commit 4aa0106b0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 14 deletions

View file

@ -384,6 +384,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
return err
}
// push from cache to repo
err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log)
if err != nil {

View file

@ -1047,7 +1047,7 @@ func TestMandatoryAnnotations(t *testing.T) {
defer dcm.StopServer()
// give it time to set up sync
time.Sleep(5 * time.Second)
time.Sleep(10 * time.Second)
resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/0.0.1")
So(err, ShouldBeNil)

View file

@ -85,26 +85,55 @@ func NewScheduler(logC log.Logger) *Scheduler {
}
}
const rateLimit = 5 * time.Second
const (
rateLimit = 5 * time.Second
numWorkers = 3
)
func (scheduler *Scheduler) poolWorker(numWorkers int, tasks chan Task) {
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
for task := range tasks {
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task")
if err := task.DoWork(); err != nil {
scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task")
}
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: finished task")
}
}(i + 1)
}
}
func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
throttle := time.NewTicker(rateLimit).C
tasksWorker := make(chan Task, numWorkers)
// start worker pool
go scheduler.poolWorker(numWorkers, tasksWorker)
go func() {
for {
select {
case <-ctx.Done():
close(tasksWorker)
close(scheduler.stopCh)
return
default:
task := scheduler.getTask()
if task != nil {
if err := task.DoWork(); err != nil {
scheduler.log.Error().Err(err).Msg("error while executing task")
i := 0
for i < numWorkers {
task := scheduler.getTask()
if task != nil {
// push tasks into worker pool
scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
tasksWorker <- task
}
i++
}
}
<-throttle
}
}()
@ -122,7 +151,7 @@ func (scheduler *Scheduler) pushReadyGenerators() {
scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...)
modified = true
scheduler.log.Debug().Msg("waiting generator is ready, pushing to ready generators")
scheduler.log.Debug().Msg("scheduler: waiting generator is ready, pushing to ready generators")
break
}
@ -221,7 +250,7 @@ func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) {
case <-scheduler.stopCh:
return
case tasksQ <- task:
scheduler.log.Info().Msg("Adding a new task to the scheduler")
scheduler.log.Info().Msg("scheduler: adding a new task")
}
}
@ -267,7 +296,7 @@ func (gen *generator) generate(sch *Scheduler) {
if gen.remainingTask == nil {
nextTask, err := gen.taskGenerator.GenerateTask()
if err != nil {
sch.log.Error().Err(err).Msg("error while executing generator")
sch.log.Error().Err(err).Msg("scheduler: error while executing generator")
return
}

View file

@ -119,19 +119,24 @@ func TestScheduler(t *testing.T) {
genL := &generator{log: logger, priority: "low priority"}
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
genM := &generator{log: logger, priority: "medium priority"}
sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)
genH := &generator{log: logger, priority: "high priority"}
sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)
time.Sleep(500 * time.Millisecond)
time.Sleep(4 * time.Second)
cancel()
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "executing high priority task; index: 1")
So(string(data), ShouldNotContainSubstring, "executing low priority task; index: 1")
So(string(data), ShouldContainSubstring, "executing high priority task; index: 2")
So(string(data), ShouldNotContainSubstring, "executing medium priority task; index: 1")
So(string(data), ShouldNotContainSubstring, "error while executing task")
})
@ -155,7 +160,7 @@ func TestScheduler(t *testing.T) {
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "Adding a new task to the scheduler")
So(string(data), ShouldContainSubstring, "scheduler: adding a new task")
So(string(data), ShouldContainSubstring, "error while executing task")
})
@ -197,7 +202,7 @@ func TestScheduler(t *testing.T) {
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldNotContainSubstring, "Adding a new task to the scheduler")
So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task")
})
Convey("Test adding a new task when context is done", t, func() {
@ -220,6 +225,6 @@ func TestScheduler(t *testing.T) {
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldNotContainSubstring, "Adding a new task to the scheduler")
So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task")
})
}