mirror of
https://github.com/project-zot/zot.git
synced 2025-01-20 22:52:51 -05:00
6222dae1f0
wait for workers to finish before exiting. This should fix tests reporting that they couldn't remove rootDir because it was still being written by tasks. Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
413 lines
8.9 KiB
Go
413 lines
8.9 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"zotregistry.io/zot/pkg/api/config"
|
|
"zotregistry.io/zot/pkg/log"
|
|
)
|
|
|
|
// Task is a unit of work run by one of the scheduler's worker goroutines.
type Task interface {
	// DoWork executes the task with the context given to RunScheduler. A returned error is
	// logged by the worker; the task is not retried.
	DoWork(context.Context) error
}
|
|
|
|
type generatorsPriorityQueue []*generator
|
|
|
|
func (pq generatorsPriorityQueue) Len() int {
|
|
return len(pq)
|
|
}
|
|
|
|
func (pq generatorsPriorityQueue) Less(i, j int) bool {
|
|
return pq[i].priority > pq[j].priority
|
|
}
|
|
|
|
func (pq generatorsPriorityQueue) Swap(i, j int) {
|
|
pq[i], pq[j] = pq[j], pq[i]
|
|
pq[i].index = i
|
|
pq[j].index = j
|
|
}
|
|
|
|
func (pq *generatorsPriorityQueue) Push(x any) {
|
|
n := len(*pq)
|
|
|
|
item, ok := x.(*generator)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
item.index = n
|
|
*pq = append(*pq, item)
|
|
}
|
|
|
|
func (pq *generatorsPriorityQueue) Pop() any {
|
|
old := *pq
|
|
n := len(old)
|
|
item := old[n-1]
|
|
old[n-1] = nil
|
|
item.index = -1
|
|
*pq = old[0 : n-1]
|
|
|
|
return item
|
|
}
|
|
|
|
// Scheduler tuning knobs.
const (
	// numWorkersMultiplier scales runtime.NumCPU() into the default worker count used when
	// the configuration does not set one.
	numWorkersMultiplier = 4

	// rateLimit is the default pause between two dispatch rounds (Scheduler.RateLimit).
	rateLimit = 5 * time.Second

	// rateLimiterScheduler is the capacity of each per-priority task queue.
	rateLimiterScheduler = 400
)
|
|
|
|
type Scheduler struct {
|
|
tasksQLow chan Task
|
|
tasksQMedium chan Task
|
|
tasksQHigh chan Task
|
|
generators generatorsPriorityQueue
|
|
waitingGenerators []*generator
|
|
generatorsLock *sync.Mutex
|
|
log log.Logger
|
|
RateLimit time.Duration
|
|
NumWorkers int
|
|
workerChan chan Task
|
|
workerWg *sync.WaitGroup
|
|
isShuttingDown atomic.Bool
|
|
}
|
|
|
|
func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
|
|
chLow := make(chan Task, rateLimiterScheduler)
|
|
chMedium := make(chan Task, rateLimiterScheduler)
|
|
chHigh := make(chan Task, rateLimiterScheduler)
|
|
generatorPQ := make(generatorsPriorityQueue, 0)
|
|
numWorkers := getNumWorkers(cfg)
|
|
sublogger := logC.With().Str("component", "scheduler").Logger()
|
|
|
|
heap.Init(&generatorPQ)
|
|
|
|
return &Scheduler{
|
|
tasksQLow: chLow,
|
|
tasksQMedium: chMedium,
|
|
tasksQHigh: chHigh,
|
|
generators: generatorPQ,
|
|
generatorsLock: new(sync.Mutex),
|
|
log: log.Logger{Logger: sublogger},
|
|
// default value
|
|
RateLimit: rateLimit,
|
|
NumWorkers: numWorkers,
|
|
workerChan: make(chan Task, numWorkers),
|
|
workerWg: new(sync.WaitGroup),
|
|
}
|
|
}
|
|
|
|
func (scheduler *Scheduler) poolWorker(ctx context.Context) {
|
|
for i := 0; i < scheduler.NumWorkers; i++ {
|
|
go func(workerID int) {
|
|
defer scheduler.workerWg.Done()
|
|
|
|
for task := range scheduler.workerChan {
|
|
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task")
|
|
|
|
if err := task.DoWork(ctx); err != nil {
|
|
scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task")
|
|
}
|
|
|
|
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: finished task")
|
|
}
|
|
}(i + 1)
|
|
}
|
|
}
|
|
|
|
func (scheduler *Scheduler) Shutdown() {
|
|
if !scheduler.inShutdown() {
|
|
scheduler.shutdown()
|
|
}
|
|
|
|
scheduler.workerWg.Wait()
|
|
}
|
|
|
|
func (scheduler *Scheduler) inShutdown() bool {
|
|
return scheduler.isShuttingDown.Load()
|
|
}
|
|
|
|
func (scheduler *Scheduler) shutdown() {
|
|
close(scheduler.workerChan)
|
|
scheduler.isShuttingDown.Store(true)
|
|
}
|
|
|
|
func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
|
|
throttle := time.NewTicker(rateLimit).C
|
|
|
|
numWorkers := scheduler.NumWorkers
|
|
|
|
// wait all workers to finish their work before exiting from Shutdown()
|
|
scheduler.workerWg.Add(numWorkers)
|
|
|
|
// start worker pool
|
|
go scheduler.poolWorker(ctx)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
if !scheduler.inShutdown() {
|
|
scheduler.shutdown()
|
|
}
|
|
|
|
scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...")
|
|
|
|
return
|
|
default:
|
|
i := 0
|
|
for i < numWorkers {
|
|
task := scheduler.getTask()
|
|
|
|
if task != nil {
|
|
// push tasks into worker pool
|
|
if !scheduler.inShutdown() {
|
|
scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
|
|
scheduler.workerChan <- task
|
|
}
|
|
}
|
|
i++
|
|
}
|
|
}
|
|
|
|
<-throttle
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (scheduler *Scheduler) pushReadyGenerators() {
|
|
// iterate through waiting generators list and resubmit those which become ready to run
|
|
for {
|
|
modified := false
|
|
|
|
for i, gen := range scheduler.waitingGenerators {
|
|
if gen.getState() == ready {
|
|
gen.done = false
|
|
heap.Push(&scheduler.generators, gen)
|
|
scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...)
|
|
modified = true
|
|
|
|
scheduler.log.Debug().Msg("scheduler: waiting generator is ready, pushing to ready generators")
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
if !modified {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (scheduler *Scheduler) generateTasks() {
|
|
scheduler.generatorsLock.Lock()
|
|
defer scheduler.generatorsLock.Unlock()
|
|
|
|
// resubmit ready generators(which were in a waiting state) to generators priority queue
|
|
scheduler.pushReadyGenerators()
|
|
|
|
// get the highest priority generator from queue
|
|
if scheduler.generators.Len() == 0 {
|
|
return
|
|
}
|
|
|
|
var gen *generator
|
|
|
|
// check if the generator with highest prioriy is ready to run
|
|
if scheduler.generators[0].getState() == ready {
|
|
gen = scheduler.generators[0]
|
|
} else {
|
|
gen, _ = heap.Pop(&scheduler.generators).(*generator)
|
|
if gen.getState() == waiting {
|
|
scheduler.waitingGenerators = append(scheduler.waitingGenerators, gen)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// run generator to generate a new task which will be added to a channel by priority
|
|
gen.generate(scheduler)
|
|
}
|
|
|
|
func (scheduler *Scheduler) getTask() Task {
|
|
// first, generate a task with highest possible priority
|
|
scheduler.generateTasks()
|
|
|
|
// then, return a task with highest possible priority
|
|
select {
|
|
case t := <-scheduler.tasksQHigh:
|
|
return t
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case t := <-scheduler.tasksQMedium:
|
|
return t
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case t := <-scheduler.tasksQLow:
|
|
return t
|
|
default:
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (scheduler *Scheduler) getTasksChannelByPriority(priority Priority) chan Task {
|
|
switch priority {
|
|
case LowPriority:
|
|
return scheduler.tasksQLow
|
|
case MediumPriority:
|
|
return scheduler.tasksQMedium
|
|
case HighPriority:
|
|
return scheduler.tasksQHigh
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) {
|
|
// get by priority the channel where the task should be added to
|
|
tasksQ := scheduler.getTasksChannelByPriority(priority)
|
|
if tasksQ == nil {
|
|
return
|
|
}
|
|
|
|
// check if the scheduler it's still running in order to add the task to the channel
|
|
if scheduler.inShutdown() {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case tasksQ <- task:
|
|
scheduler.log.Info().Msg("scheduler: adding a new task")
|
|
default:
|
|
if scheduler.inShutdown() {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Priority ranks tasks and generators; higher values are served first.
type Priority int

// Supported priorities. Other values have no task queue.
const (
	LowPriority    Priority = 0
	MediumPriority Priority = 1
	HighPriority   Priority = 2
)
|
|
|
|
// state is the scheduling state of a generator, as computed by getState.
type state int

const (
	// ready: the generator may produce a task now.
	ready state = 0
	// waiting: a periodic generator between runs, or one whose TaskGenerator is not ready.
	waiting state = 1
	// done: a one-shot generator that has finished and holds no pending task.
	done state = 2
)
|
|
|
|
type TaskGenerator interface {
|
|
Next() (Task, error)
|
|
IsDone() bool
|
|
IsReady() bool
|
|
Reset()
|
|
}
|
|
|
|
type generator struct {
|
|
interval time.Duration
|
|
lastRun time.Time
|
|
done bool
|
|
priority Priority
|
|
taskGenerator TaskGenerator
|
|
remainingTask Task
|
|
index int
|
|
}
|
|
|
|
func (gen *generator) generate(sch *Scheduler) {
|
|
// get by priority the channel where the new generated task should be added to
|
|
taskQ := sch.getTasksChannelByPriority(gen.priority)
|
|
|
|
task := gen.remainingTask
|
|
|
|
// in case there is no task already generated, generate a new task
|
|
if gen.remainingTask == nil {
|
|
nextTask, err := gen.taskGenerator.Next()
|
|
if err != nil {
|
|
sch.log.Error().Err(err).Msg("scheduler: error while executing generator")
|
|
|
|
return
|
|
}
|
|
|
|
task = nextTask
|
|
|
|
// check if the generator is done
|
|
if gen.taskGenerator.IsDone() {
|
|
gen.done = true
|
|
gen.lastRun = time.Now()
|
|
gen.taskGenerator.Reset()
|
|
|
|
return
|
|
}
|
|
}
|
|
|
|
// check if it's possible to add a new task to the channel
|
|
// if not, keep the generated task and retry to add it next time
|
|
select {
|
|
case taskQ <- task:
|
|
gen.remainingTask = nil
|
|
|
|
return
|
|
default:
|
|
gen.remainingTask = task
|
|
}
|
|
}
|
|
|
|
// getState() returns the state of a generator.
|
|
// if the generator is not periodic then it can be done or ready to generate a new task.
|
|
// if the generator is periodic then it can be waiting (finished its work and wait for its interval to pass)
|
|
// or ready to generate a new task.
|
|
func (gen *generator) getState() state {
|
|
if gen.interval == time.Duration(0) {
|
|
if gen.done && gen.remainingTask == nil {
|
|
return done
|
|
}
|
|
} else {
|
|
if gen.done && time.Since(gen.lastRun) < gen.interval && gen.remainingTask == nil {
|
|
return waiting
|
|
}
|
|
}
|
|
|
|
if !gen.taskGenerator.IsReady() {
|
|
return waiting
|
|
}
|
|
|
|
return ready
|
|
}
|
|
|
|
func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) {
|
|
newGenerator := &generator{
|
|
interval: interval,
|
|
done: false,
|
|
priority: priority,
|
|
taskGenerator: taskGenerator,
|
|
remainingTask: nil,
|
|
}
|
|
|
|
scheduler.generatorsLock.Lock()
|
|
defer scheduler.generatorsLock.Unlock()
|
|
|
|
// add generator to the generators priority queue
|
|
heap.Push(&scheduler.generators, newGenerator)
|
|
}
|
|
|
|
func getNumWorkers(cfg *config.Config) int {
|
|
if cfg.Scheduler != nil && cfg.Scheduler.NumWorkers != 0 {
|
|
return cfg.Scheduler.NumWorkers
|
|
}
|
|
|
|
return runtime.NumCPU() * numWorkersMultiplier
|
|
}
|