diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index aa7fe943..9438b5df 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -6003,7 +6003,7 @@ func TestPeriodicGC(t *testing.T) { defer stopServer(ctlr) test.WaitTillServerReady(baseURL) - time.Sleep(500 * time.Millisecond) + time.Sleep(5000 * time.Millisecond) data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index ff55f42a..dda3ffa6 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -112,11 +112,24 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { func (scheduler *Scheduler) pushReadyGenerators() { // iterate through waiting generators list and resubmit those which become ready to run - for i, gen := range scheduler.waitingGenerators { - if gen.getState() == ready { - gen.done = false - heap.Push(&scheduler.generators, gen) - scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...) + for { + modified := false + + for i, gen := range scheduler.waitingGenerators { + if gen.getState() == ready { + gen.done = false + heap.Push(&scheduler.generators, gen) + scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...) + modified = true + + scheduler.log.Debug().Msg("waiting generator is ready, pushing to ready generators") + + break + } + } + + if !modified { + break } } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index ae690512..0ad925f5 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -59,7 +59,54 @@ func (g *generator) Reset() { g.step = 0 } +type shortGenerator struct { + log log.Logger + priority string + done bool + index int + step int +} + +func (g *shortGenerator) GenerateTask() (scheduler.Task, error) { + g.done = true + + return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil +} + +func (g *shortGenerator) IsDone() bool { + return g.done +} + +func (g *shortGenerator) Reset() { + g.done = true + g.step = 0 +} + func TestScheduler(t *testing.T) { + Convey("Test active to waiting periodic generator", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + sch := scheduler.NewScheduler(logger) + + genH := &shortGenerator{log: logger, priority: "high priority"} + // interval has to be higher than throttle value to simulate + sch.SubmitGenerator(genH, 12000*time.Millisecond, scheduler.HighPriority) + + ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler(ctx) + + time.Sleep(16 * time.Second) + cancel() + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "waiting generator is ready, pushing to ready generators") + }) + Convey("Test order of generators in queue", t, func() { logFile, err := os.CreateTemp("", "zot-log*.txt") So(err, ShouldBeNil)