0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-06 22:40:28 -05:00
zot/pkg/scheduler/scheduler.go
peusebiu 59dc4c3229
feat(scheduler): pass the shutdown/reload ctx to running tasks (#1671)
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
2023-09-05 09:48:56 -07:00

384 lines
8.3 KiB
Go

package scheduler
import (
"container/heap"
"context"
"runtime"
"sync"
"time"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/log"
)
type Task interface {
DoWork(ctx context.Context) error
}
type generatorsPriorityQueue []*generator
func (pq generatorsPriorityQueue) Len() int {
return len(pq)
}
func (pq generatorsPriorityQueue) Less(i, j int) bool {
return pq[i].priority > pq[j].priority
}
func (pq generatorsPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *generatorsPriorityQueue) Push(x any) {
n := len(*pq)
item, ok := x.(*generator)
if !ok {
return
}
item.index = n
*pq = append(*pq, item)
}
func (pq *generatorsPriorityQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil
item.index = -1
*pq = old[0 : n-1]
return item
}
const (
rateLimiterScheduler = 400
rateLimit = 5 * time.Second
numWorkersMultiplier = 4
)
type Scheduler struct {
tasksQLow chan Task
tasksQMedium chan Task
tasksQHigh chan Task
generators generatorsPriorityQueue
waitingGenerators []*generator
generatorsLock *sync.Mutex
log log.Logger
stopCh chan struct{}
RateLimit time.Duration
NumWorkers int
}
func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
chLow := make(chan Task, rateLimiterScheduler)
chMedium := make(chan Task, rateLimiterScheduler)
chHigh := make(chan Task, rateLimiterScheduler)
generatorPQ := make(generatorsPriorityQueue, 0)
numWorkers := getNumWorkers(cfg)
sublogger := logC.With().Str("component", "scheduler").Logger()
heap.Init(&generatorPQ)
return &Scheduler{
tasksQLow: chLow,
tasksQMedium: chMedium,
tasksQHigh: chHigh,
generators: generatorPQ,
generatorsLock: new(sync.Mutex),
log: log.Logger{Logger: sublogger},
stopCh: make(chan struct{}),
// default value
RateLimit: rateLimit,
NumWorkers: numWorkers,
}
}
func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, tasks chan Task) {
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
for task := range tasks {
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task")
if err := task.DoWork(ctx); err != nil {
scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task")
}
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: finished task")
}
}(i + 1)
}
}
func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
throttle := time.NewTicker(rateLimit).C
numWorkers := scheduler.NumWorkers
tasksWorker := make(chan Task, numWorkers)
// start worker pool
go scheduler.poolWorker(ctx, numWorkers, tasksWorker)
go func() {
for {
select {
case <-ctx.Done():
close(tasksWorker)
close(scheduler.stopCh)
scheduler.log.Debug().Msg("scheduler: received stop signal, exiting...")
return
default:
i := 0
for i < numWorkers {
task := scheduler.getTask()
if task != nil {
// push tasks into worker pool
scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
tasksWorker <- task
}
i++
}
}
<-throttle
}
}()
}
func (scheduler *Scheduler) pushReadyGenerators() {
// iterate through waiting generators list and resubmit those which become ready to run
for {
modified := false
for i, gen := range scheduler.waitingGenerators {
if gen.getState() == ready {
gen.done = false
heap.Push(&scheduler.generators, gen)
scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...)
modified = true
scheduler.log.Debug().Msg("scheduler: waiting generator is ready, pushing to ready generators")
break
}
}
if !modified {
break
}
}
}
func (scheduler *Scheduler) generateTasks() {
scheduler.generatorsLock.Lock()
defer scheduler.generatorsLock.Unlock()
// resubmit ready generators(which were in a waiting state) to generators priority queue
scheduler.pushReadyGenerators()
// get the highest priority generator from queue
if scheduler.generators.Len() == 0 {
return
}
var gen *generator
// check if the generator with highest prioriy is ready to run
if scheduler.generators[0].getState() == ready {
gen = scheduler.generators[0]
} else {
gen, _ = heap.Pop(&scheduler.generators).(*generator)
if gen.getState() == waiting {
scheduler.waitingGenerators = append(scheduler.waitingGenerators, gen)
}
return
}
// run generator to generate a new task which will be added to a channel by priority
gen.generate(scheduler)
}
func (scheduler *Scheduler) getTask() Task {
// first, generate a task with highest possible priority
scheduler.generateTasks()
// then, return a task with highest possible priority
select {
case t := <-scheduler.tasksQHigh:
return t
default:
}
select {
case t := <-scheduler.tasksQMedium:
return t
default:
}
select {
case t := <-scheduler.tasksQLow:
return t
default:
}
return nil
}
func (scheduler *Scheduler) getTasksChannelByPriority(priority Priority) chan Task {
switch priority {
case LowPriority:
return scheduler.tasksQLow
case MediumPriority:
return scheduler.tasksQMedium
case HighPriority:
return scheduler.tasksQHigh
}
return nil
}
func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) {
// get by priority the channel where the task should be added to
tasksQ := scheduler.getTasksChannelByPriority(priority)
if tasksQ == nil {
return
}
// check if the scheduler it's still running in order to add the task to the channel
select {
case <-scheduler.stopCh:
return
default:
}
select {
case <-scheduler.stopCh:
return
case tasksQ <- task:
scheduler.log.Info().Msg("scheduler: adding a new task")
}
}
type Priority int
const (
LowPriority Priority = iota
MediumPriority
HighPriority
)
type state int
const (
ready state = iota
waiting
done
)
type TaskGenerator interface {
Next() (Task, error)
IsDone() bool
IsReady() bool
Reset()
}
type generator struct {
interval time.Duration
lastRun time.Time
done bool
priority Priority
taskGenerator TaskGenerator
remainingTask Task
index int
}
func (gen *generator) generate(sch *Scheduler) {
// get by priority the channel where the new generated task should be added to
taskQ := sch.getTasksChannelByPriority(gen.priority)
task := gen.remainingTask
// in case there is no task already generated, generate a new task
if gen.remainingTask == nil {
nextTask, err := gen.taskGenerator.Next()
if err != nil {
sch.log.Error().Err(err).Msg("scheduler: error while executing generator")
return
}
task = nextTask
// check if the generator is done
if gen.taskGenerator.IsDone() {
gen.done = true
gen.lastRun = time.Now()
gen.taskGenerator.Reset()
return
}
}
// check if it's possible to add a new task to the channel
// if not, keep the generated task and retry to add it next time
select {
case taskQ <- task:
gen.remainingTask = nil
return
default:
gen.remainingTask = task
}
}
// getState() returns the state of a generator.
// if the generator is not periodic then it can be done or ready to generate a new task.
// if the generator is periodic then it can be waiting (finished its work and wait for its interval to pass)
// or ready to generate a new task.
func (gen *generator) getState() state {
if gen.interval == time.Duration(0) {
if gen.done && gen.remainingTask == nil {
return done
}
} else {
if gen.done && time.Since(gen.lastRun) < gen.interval && gen.remainingTask == nil {
return waiting
}
}
if !gen.taskGenerator.IsReady() {
return waiting
}
return ready
}
func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) {
newGenerator := &generator{
interval: interval,
done: false,
priority: priority,
taskGenerator: taskGenerator,
remainingTask: nil,
}
scheduler.generatorsLock.Lock()
defer scheduler.generatorsLock.Unlock()
// add generator to the generators priority queue
heap.Push(&scheduler.generators, newGenerator)
}
func getNumWorkers(cfg *config.Config) int {
if cfg.Scheduler != nil && cfg.Scheduler.NumWorkers != 0 {
return cfg.Scheduler.NumWorkers
}
return runtime.NumCPU() * numWorkersMultiplier
}