mirror of
https://github.com/project-zot/zot.git
synced 2024-12-16 21:56:37 -05:00
fix: the scheduler is now fair (#2158)
Generators are now ordered by rank in the priority queue. The rank computation formula is: - 100/(1+generated_task_count) for high priority tasks - 10/(1+generated_task_count) for medium priority tasks - 1/(1+generated_task_count) for low priority tasks Note the ranks are used when comparing generators both with the same priority and with different priority. So now we are: - giving an opportunity to all generators with the same priority to take turns generating tasks - giving roughly 1 low priority and 10 medium priority tasks the opportunity to run for every 100 high priority tasks running. After a generator generates a task, the generators are reordered in the priority queue based on rank. Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
This commit is contained in:
parent
e9ab520905
commit
8215766720
2 changed files with 146 additions and 17 deletions
|
@ -3,6 +3,7 @@ package scheduler
|
|||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"math"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -26,7 +27,7 @@ func (pq generatorsPriorityQueue) Len() int {
|
|||
}
|
||||
|
||||
func (pq generatorsPriorityQueue) Less(i, j int) bool {
|
||||
return pq[i].priority > pq[j].priority
|
||||
return pq[i].getRanking() > pq[j].getRanking()
|
||||
}
|
||||
|
||||
func (pq generatorsPriorityQueue) Swap(i, j int) {
|
||||
|
@ -331,7 +332,13 @@ func (scheduler *Scheduler) generateTasks() {
|
|||
|
||||
// check if the generator with highest priority is ready to run
|
||||
if scheduler.generators[0].getState() == Ready {
|
||||
// we are not popping it as we will generate multiple tasks until it is done
|
||||
// we are going to pop after all tasks are generated
|
||||
gen = scheduler.generators[0]
|
||||
|
||||
// trigger a generator reorder, as generating a task may impact the order
|
||||
// equivalent of pop/remove followed by push, but more efficient
|
||||
heap.Fix(&scheduler.generators, 0)
|
||||
} else {
|
||||
gen, _ = heap.Pop(&scheduler.generators).(*generator)
|
||||
if gen.getState() == Waiting {
|
||||
|
@ -439,6 +446,7 @@ type generator struct {
|
|||
taskGenerator TaskGenerator
|
||||
remainingTask Task
|
||||
index int
|
||||
taskCount int64
|
||||
}
|
||||
|
||||
func (gen *generator) generate(sch *Scheduler) {
|
||||
|
@ -460,6 +468,7 @@ func (gen *generator) generate(sch *Scheduler) {
|
|||
if gen.taskGenerator.IsDone() {
|
||||
gen.done = true
|
||||
gen.lastRun = time.Now()
|
||||
gen.taskCount = 0
|
||||
gen.taskGenerator.Reset()
|
||||
|
||||
return
|
||||
|
@ -468,6 +477,9 @@ func (gen *generator) generate(sch *Scheduler) {
|
|||
task = nextTask
|
||||
}
|
||||
|
||||
// keep track of generated task count to use it for generator ordering
|
||||
gen.taskCount++
|
||||
|
||||
// check if it's possible to add a new task to the channel
|
||||
// if not, keep the generated task and retry to add it next time
|
||||
select {
|
||||
|
@ -502,12 +514,19 @@ func (gen *generator) getState() State {
|
|||
return Ready
|
||||
}
|
||||
|
||||
func (gen *generator) getRanking() float64 {
|
||||
// take into account the priority, but also how many tasks of
|
||||
// a specific generator were executed in the current generator run
|
||||
return math.Pow(10, float64(gen.priority)) / (1 + float64(gen.taskCount)) //nolint:gomnd
|
||||
}
|
||||
|
||||
func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) {
|
||||
newGenerator := &generator{
|
||||
interval: interval,
|
||||
done: false,
|
||||
priority: priority,
|
||||
taskGenerator: taskGenerator,
|
||||
taskCount: 0,
|
||||
remainingTask: nil,
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -18,9 +19,10 @@ import (
|
|||
)
|
||||
|
||||
type task struct {
|
||||
log log.Logger
|
||||
msg string
|
||||
err bool
|
||||
log log.Logger
|
||||
msg string
|
||||
err bool
|
||||
delay time.Duration
|
||||
}
|
||||
|
||||
var errInternal = errors.New("task: internal error")
|
||||
|
@ -35,7 +37,7 @@ func (t *task) DoWork(ctx context.Context) error {
|
|||
return ctx.Err()
|
||||
}
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(t.delay)
|
||||
}
|
||||
|
||||
t.log.Info().Msg(t.msg)
|
||||
|
@ -52,15 +54,17 @@ func (t *task) Name() string {
|
|||
}
|
||||
|
||||
type generator struct {
|
||||
log log.Logger
|
||||
priority string
|
||||
done bool
|
||||
index int
|
||||
step int
|
||||
log log.Logger
|
||||
priority string
|
||||
done bool
|
||||
index int
|
||||
step int
|
||||
limit int
|
||||
taskDelay time.Duration
|
||||
}
|
||||
|
||||
func (g *generator) Next() (scheduler.Task, error) {
|
||||
if g.step > 100 {
|
||||
if g.step > g.limit {
|
||||
g.done = true
|
||||
}
|
||||
g.step++
|
||||
|
@ -74,7 +78,12 @@ func (g *generator) Next() (scheduler.Task, error) {
|
|||
return nil, errInternal
|
||||
}
|
||||
|
||||
return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil
|
||||
return &task{
|
||||
log: g.log,
|
||||
msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index),
|
||||
err: false,
|
||||
delay: g.taskDelay,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (g *generator) IsDone() bool {
|
||||
|
@ -154,13 +163,13 @@ func TestScheduler(t *testing.T) {
|
|||
sch := scheduler.NewScheduler(cfg, metrics, logger)
|
||||
sch.RateLimit = 5 * time.Second
|
||||
|
||||
genL := &generator{log: logger, priority: "low priority"}
|
||||
genL := &generator{log: logger, priority: "low priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
||||
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
|
||||
|
||||
genM := &generator{log: logger, priority: "medium priority"}
|
||||
genM := &generator{log: logger, priority: "medium priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
||||
sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)
|
||||
|
||||
genH := &generator{log: logger, priority: "high priority"}
|
||||
genH := &generator{log: logger, priority: "high priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
||||
sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority)
|
||||
|
||||
sch.RunScheduler()
|
||||
|
@ -176,6 +185,107 @@ func TestScheduler(t *testing.T) {
|
|||
So(string(data), ShouldNotContainSubstring, "failed to execute task")
|
||||
})
|
||||
|
||||
Convey("Test reordering of generators in queue", t, func() {
|
||||
logFile, err := os.CreateTemp("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
defer os.Remove(logFile.Name()) // clean up
|
||||
|
||||
logger := log.NewLogger("debug", logFile.Name())
|
||||
cfg := config.New()
|
||||
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
|
||||
metrics := monitoring.NewMetricsServer(true, logger)
|
||||
sch := scheduler.NewScheduler(cfg, metrics, logger)
|
||||
sch.RateLimit = 1 * time.Nanosecond
|
||||
|
||||
// Testing repordering of generators using the same medium priority, as well as reordering with
|
||||
// a low priority generator
|
||||
|
||||
genL := &generator{log: logger, priority: "low priority", limit: 110, taskDelay: time.Nanosecond}
|
||||
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
|
||||
|
||||
genM := &generator{log: logger, priority: "medium 1 priority", limit: 110, taskDelay: time.Nanosecond}
|
||||
sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)
|
||||
|
||||
genH := &generator{log: logger, priority: "medium 2 priority", limit: 110, taskDelay: time.Nanosecond}
|
||||
sch.SubmitGenerator(genH, time.Duration(0), scheduler.MediumPriority)
|
||||
|
||||
sch.RunScheduler()
|
||||
time.Sleep(1 * time.Second)
|
||||
sch.Shutdown()
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// Check all tasks show up in the logs
|
||||
for i := 1; i < 110; i++ {
|
||||
if i%11 == 0 || i%13 == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
So(string(data), ShouldContainSubstring, fmt.Sprintf("executing medium 1 priority task; index: %d", i))
|
||||
So(string(data), ShouldContainSubstring, fmt.Sprintf("executing medium 2 priority task; index: %d", i))
|
||||
So(string(data), ShouldContainSubstring, fmt.Sprintf("executing low priority task; index: %d", i))
|
||||
}
|
||||
|
||||
taskCounter := 0
|
||||
priorityFlippedCounter := 0
|
||||
samePriorityFlippedCounter := 0
|
||||
lastPriority := "medium"
|
||||
lastMediumGenerator := "1"
|
||||
|
||||
for _, line := range strings.Split(strings.TrimSuffix(string(data), "\n"), "\n") {
|
||||
if !strings.Contains(line, "priority task; index: ") {
|
||||
continue
|
||||
}
|
||||
|
||||
taskCounter++
|
||||
|
||||
// low priority tasks start executing later
|
||||
// medium priority generators are prioritized until the rank 100/9 (8 generated tasks)
|
||||
// starting with 100/10, a low priority generator could potentially be prioritized instead
|
||||
// there will be at least 8 * 2 medium priority tasks executed before low priority tasks are pushed
|
||||
if taskCounter < 17 {
|
||||
So(line, ShouldContainSubstring, "executing medium")
|
||||
}
|
||||
|
||||
// medium priority 2*110 medium priority tasks should have been generated,
|
||||
// medium priority generators should be done
|
||||
// add around 10 low priority tasks to the counter
|
||||
// and an additional margin of 5 to make sure the test is stable
|
||||
if taskCounter > 225 {
|
||||
So(line, ShouldContainSubstring, "executing low priority")
|
||||
}
|
||||
|
||||
if strings.Contains(line, "executing medium") {
|
||||
if !strings.Contains(line, fmt.Sprintf("executing medium %s", lastMediumGenerator)) {
|
||||
samePriorityFlippedCounter++
|
||||
if lastMediumGenerator == "1" {
|
||||
lastMediumGenerator = "2"
|
||||
} else {
|
||||
lastMediumGenerator = "1"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !strings.Contains(line, fmt.Sprintf("executing %s", lastPriority)) {
|
||||
priorityFlippedCounter++
|
||||
if lastPriority == "low" {
|
||||
lastPriority = "medium"
|
||||
} else {
|
||||
lastPriority = "low"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fairness: make sure none of the medium priority generators is favored by the algorithm
|
||||
So(samePriorityFlippedCounter, ShouldBeGreaterThanOrEqualTo, 60)
|
||||
t.Logf("Switched between medium priority generators %d times", samePriorityFlippedCounter)
|
||||
// fairness: make sure the algorithm alternates between generator priorities
|
||||
So(priorityFlippedCounter, ShouldBeGreaterThanOrEqualTo, 10)
|
||||
t.Logf("Switched between generator priorities %d times", priorityFlippedCounter)
|
||||
})
|
||||
|
||||
Convey("Test task returning an error", t, func() {
|
||||
logFile, err := os.CreateTemp("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
|
@ -209,7 +319,7 @@ func TestScheduler(t *testing.T) {
|
|||
metrics := monitoring.NewMetricsServer(true, logger)
|
||||
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
||||
|
||||
genL := &generator{log: logger, priority: "low priority"}
|
||||
genL := &generator{log: logger, priority: "low priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
||||
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority)
|
||||
|
||||
sch.RunScheduler()
|
||||
|
@ -272,7 +382,7 @@ func TestScheduler(t *testing.T) {
|
|||
metrics := monitoring.NewMetricsServer(true, logger)
|
||||
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
||||
|
||||
genL := &generator{log: logger, priority: "medium priority"}
|
||||
genL := &generator{log: logger, priority: "medium priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
||||
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority)
|
||||
|
||||
sch.RunScheduler()
|
||||
|
|
Loading…
Reference in a new issue