From f686ab6bf61493a289b5ed174921e69d537a6a61 Mon Sep 17 00:00:00 2001 From: Andreea Lupu <58118008+Andreea-Lupu@users.noreply.github.com> Date: Fri, 23 Sep 2022 08:27:56 +0300 Subject: [PATCH] initial design for task scheduler (#700) Signed-off-by: Andreea-Lupu --- pkg/api/controller.go | 144 ++-------- pkg/api/controller_test.go | 82 +----- pkg/cli/extensions_test.go | 2 - pkg/extensions/extension_scrub.go | 73 ++++- pkg/extensions/extension_scrub_disabled.go | 6 +- pkg/extensions/scrub/scrub.go | 23 +- pkg/extensions/scrub/scrub_test.go | 23 +- pkg/scheduler/README.md | 40 +++ pkg/scheduler/scheduler.go | 318 +++++++++++++++++++++ pkg/scheduler/scheduler_test.go | 177 ++++++++++++ pkg/storage/local.go | 122 +++++++- pkg/storage/local_test.go | 45 ++- pkg/storage/s3/s3.go | 12 +- pkg/storage/storage.go | 5 +- pkg/test/mocks/image_store_mock.go | 37 ++- test/blackbox/scrub.bats | 4 +- 16 files changed, 859 insertions(+), 254 deletions(-) create mode 100644 pkg/scheduler/README.md create mode 100644 pkg/scheduler/scheduler.go create mode 100644 pkg/scheduler/scheduler_test.go diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 92332ac7..51ac71a0 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -21,10 +21,10 @@ import ( "zotregistry.io/zot/errors" "zotregistry.io/zot/pkg/api/config" ext "zotregistry.io/zot/pkg/extensions" - extconf "zotregistry.io/zot/pkg/extensions/config" "zotregistry.io/zot/pkg/extensions/lint" "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/scheduler" "zotregistry.io/zot/pkg/storage" "zotregistry.io/zot/pkg/storage/s3" ) @@ -444,6 +444,14 @@ func (c *Controller) Shutdown() { } func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { + taskScheduler := scheduler.NewScheduler(c.Log) + taskScheduler.RunScheduler(reloadCtx) + + // Enable running garbage-collect periodically for DefaultStore + if c.Config.Storage.GC && c.Config.Storage.GCInterval != 0 { + c.StoreController.DefaultStore.RunGCPeriodically(c.Config.Storage.GCInterval, taskScheduler) + } + // Enable extensions if extension config is provided for DefaultStore if c.Config != nil && c.Config.Extensions != nil { ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory) @@ -451,7 +459,12 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { } if c.Config.Storage.SubPaths != nil { - for _, storageConfig := range c.Config.Storage.SubPaths { + for route, storageConfig := range c.Config.Storage.SubPaths { + // Enable running garbage-collect periodically for subImageStore + if storageConfig.GC && storageConfig.GCInterval != 0 { + c.StoreController.SubStore[route].RunGCPeriodically(storageConfig.GCInterval, taskScheduler) + } + // Enable extensions if extension config is provided for subImageStore if c.Config != nil && c.Config.Extensions != nil { ext.EnableMetricsExtension(c.Config, c.Log, storageConfig.RootDirectory) @@ -468,131 +481,6 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { } if c.Config.Extensions != nil { - ext.EnableScrubExtension(c.Config, c.Log, false, nil, "") - } - - go StartPeriodicTasks(c.StoreController.DefaultStore, c.StoreController.SubStore, c.Config.Storage.SubPaths, - c.Config.Storage.GC, c.Config.Storage.GCInterval, c.Config.Extensions, c.Log) -} - -func StartPeriodicTasks(defaultStore storage.ImageStore, subStore map[string]storage.ImageStore, - subPaths map[string]config.StorageConfig, gcEnabled bool, gcInterval time.Duration, - extensions *extconf.ExtensionConfig, log log.Logger, -) { - // start periodic gc and/or scrub for DefaultStore - StartPeriodicTasksForImageStore(defaultStore, gcEnabled, gcInterval, extensions, log) - - for route, storageConfig := range subPaths { - // Enable running garbage-collect or/and scrub periodically for subImageStore - StartPeriodicTasksForImageStore(subStore[route], storageConfig.GC, storageConfig.GCInterval, extensions, log) + ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler) } } - -func StartPeriodicTasksForImageStore(imageStore storage.ImageStore, configGC bool, configGCInterval time.Duration, - extensions *extconf.ExtensionConfig, log log.Logger, -) { - scrubInterval := time.Duration(0) - gcInterval := time.Duration(0) - - gc := false - scrub := false - - if configGC && configGCInterval != 0 { - gcInterval = configGCInterval - gc = true - } - - if extensions != nil && extensions.Scrub != nil && extensions.Scrub.Interval != 0 { - scrubInterval = extensions.Scrub.Interval - scrub = true - } - - interval := minPeriodicInterval(scrub, gc, scrubInterval, gcInterval) - if interval == time.Duration(0) { - return - } - - log.Info().Msg(fmt.Sprintf("Periodic interval for %s set to %s", imageStore.RootDir(), interval)) - - var lastGC, lastScrub time.Time - - for { - log.Info().Msg(fmt.Sprintf("Starting periodic background tasks for %s", imageStore.RootDir())) - - // Enable running garbage-collect or/and scrub periodically for imageStore - RunBackgroundTasks(imageStore, gc, scrub, log) - - log.Info().Msg(fmt.Sprintf("Finishing periodic background tasks for %s", imageStore.RootDir())) - - if gc { - lastGC = time.Now() - } - - if scrub { - lastScrub = time.Now() - } - - time.Sleep(interval) - - if !lastGC.IsZero() && time.Since(lastGC) >= gcInterval { - gc = true - } - - if !lastScrub.IsZero() && time.Since(lastScrub) >= scrubInterval { - scrub = true - } - } -} - -func RunBackgroundTasks(imgStore storage.ImageStore, gc, scrub bool, log log.Logger) { - repos, err := imgStore.GetRepositories() - if err != nil { - log.Error().Err(err).Msg(fmt.Sprintf("error while running background task for %s", imgStore.RootDir())) - - return - } - - for _, repo := range repos { - if gc { - start := time.Now() - - // run gc for this repo - imgStore.RunGCRepo(repo) - - elapsed := time.Since(start) - log.Info().Msg(fmt.Sprintf("gc for %s executed in %s", repo, elapsed)) - time.Sleep(1 * time.Minute) - } - - if scrub { - start := time.Now() - - // run scrub for this repo - ext.EnableScrubExtension(nil, log, true, imgStore, repo) - - elapsed := time.Since(start) - log.Info().Msg(fmt.Sprintf("scrub for %s executed in %s", repo, elapsed)) - time.Sleep(1 * time.Minute) - } - } -} - -func minPeriodicInterval(scrub, gc bool, scrubInterval, gcInterval time.Duration) time.Duration { - if scrub && gc { - if scrubInterval <= gcInterval { - return scrubInterval - } - - return gcInterval - } - - if scrub { - return scrubInterval - } - - if gc { - return gcInterval - } - - return time.Duration(0) -} diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 27b99ffc..1e147fe9 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -6007,14 +6007,10 @@ func TestPeriodicGC(t *testing.T) { So(err, ShouldBeNil) So(string(data), ShouldContainSubstring, "\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":3600000000000") - So(string(data), ShouldContainSubstring, - fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll - So(string(data), ShouldNotContainSubstring, - fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir())) So(string(data), ShouldContainSubstring, fmt.Sprintf("executing GC of orphaned blobs for %s", path.Join(ctlr.StoreController.DefaultStore.RootDir(), repoName))) //nolint:lll So(string(data), ShouldContainSubstring, - fmt.Sprintf("GC completed for %s", path.Join(ctlr.StoreController.DefaultStore.RootDir(), repoName))) //nolint:lll + fmt.Sprintf("GC successfully completed for %s", path.Join(ctlr.StoreController.DefaultStore.RootDir(), repoName))) //nolint:lll }) Convey("Periodic GC enabled for substore", t, func() { @@ -6051,82 +6047,6 @@ func TestPeriodicGC(t *testing.T) { // periodic GC is enabled for sub store So(string(data), ShouldContainSubstring, fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"GC\":true,\"Dedupe\":false,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) //nolint:lll // gofumpt conflicts with lll - So(string(data), ShouldContainSubstring, - fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.SubStore["/a"].RootDir())) //nolint:lll - }) -} - -func TestPeriodicTasks(t *testing.T) { - Convey("Both periodic gc and periodic scrub enabled for default store with scrubInterval < gcInterval", t, func() { - port := test.GetFreePort() - baseURL := test.GetBaseURL(port) - conf := config.New() - conf.HTTP.Port = port - - logFile, err := os.CreateTemp("", "zot-log*.txt") - So(err, ShouldBeNil) - conf.Log.Output = logFile.Name() - defer os.Remove(logFile.Name()) // clean up - - ctlr := api.NewController(conf) - dir := t.TempDir() - ctlr.Config.Storage.RootDirectory = dir - ctlr.Config.Storage.GC = true - ctlr.Config.Storage.GCInterval = 12 * time.Hour - ctlr.Config.Storage.GCDelay = 1 * time.Second - ctlr.Config.Extensions = &extconf.ExtensionConfig{Scrub: &extconf.ScrubConfig{Interval: 8 * time.Hour}} - - go startServer(ctlr) - defer stopServer(ctlr) - test.WaitTillServerReady(baseURL) - - data, err := os.ReadFile(logFile.Name()) - So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, - fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll - So(string(data), ShouldNotContainSubstring, - fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir())) - So(string(data), ShouldContainSubstring, - fmt.Sprintf("Finishing periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll - So(string(data), ShouldContainSubstring, - fmt.Sprintf("Periodic interval for %s set to %s", - ctlr.StoreController.DefaultStore.RootDir(), ctlr.Config.Extensions.Scrub.Interval)) - }) - - Convey("Both periodic gc and periodic scrub enabled for default store with gcInterval < scrubInterval", t, func() { - port := test.GetFreePort() - baseURL := test.GetBaseURL(port) - conf := config.New() - conf.HTTP.Port = port - - logFile, err := os.CreateTemp("", "zot-log*.txt") - So(err, ShouldBeNil) - conf.Log.Output = logFile.Name() - defer os.Remove(logFile.Name()) // clean up - - ctlr := api.NewController(conf) - dir := t.TempDir() - ctlr.Config.Storage.RootDirectory = dir - ctlr.Config.Storage.GC = true - ctlr.Config.Storage.GCInterval = 8 * time.Hour - ctlr.Config.Storage.GCDelay = 1 * time.Second - ctlr.Config.Extensions = &extconf.ExtensionConfig{Scrub: &extconf.ScrubConfig{Interval: 12 * time.Hour}} - - go startServer(ctlr) - defer stopServer(ctlr) - test.WaitTillServerReady(baseURL) - - data, err := os.ReadFile(logFile.Name()) - So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, - fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll - So(string(data), ShouldNotContainSubstring, - fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir())) - So(string(data), ShouldContainSubstring, - fmt.Sprintf("Finishing periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll - So(string(data), ShouldContainSubstring, - fmt.Sprintf("Periodic interval for %s set to %s", - ctlr.StoreController.DefaultStore.RootDir(), ctlr.Config.Storage.GCInterval)) }) } diff --git a/pkg/cli/extensions_test.go b/pkg/cli/extensions_test.go index c398ae8b..75e51b9b 100644 --- a/pkg/cli/extensions_test.go +++ b/pkg/cli/extensions_test.go @@ -444,8 +444,6 @@ func TestServeScrubExtension(t *testing.T) { "\"Extensions\":{\"Search\":null,\"Sync\":null,\"Metrics\":null,\"Scrub\":{\"Interval\":3600000000000},\"Lint\":null") //nolint:lll // gofumpt conflicts with lll So(dataStr, ShouldContainSubstring, "Scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.") - So(dataStr, ShouldContainSubstring, "Starting periodic background tasks for") - So(dataStr, ShouldContainSubstring, "Finishing periodic background tasks for") }) Convey("scrub not enabled - scrub interval param not set", t, func(c C) { diff --git a/pkg/extensions/extension_scrub.go b/pkg/extensions/extension_scrub.go index ef76e1ef..df9c7c5b 100644 --- a/pkg/extensions/extension_scrub.go +++ b/pkg/extensions/extension_scrub.go @@ -4,30 +4,81 @@ package extensions import ( + "errors" + "io" "time" "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/extensions/scrub" "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/scheduler" "zotregistry.io/zot/pkg/storage" ) // EnableScrubExtension enables scrub extension. -func EnableScrubExtension(config *config.Config, log log.Logger, run bool, imgStore storage.ImageStore, repo string) { - if !run { - if config.Extensions.Scrub != nil && - config.Extensions.Scrub.Interval != 0 { - minScrubInterval, _ := time.ParseDuration("2h") +func EnableScrubExtension(config *config.Config, log log.Logger, storeController storage.StoreController, + sch *scheduler.Scheduler, +) { + if config.Extensions.Scrub != nil && + config.Extensions.Scrub.Interval != 0 { + minScrubInterval, _ := time.ParseDuration("2h") - if config.Extensions.Scrub.Interval < minScrubInterval { - config.Extensions.Scrub.Interval = minScrubInterval + if config.Extensions.Scrub.Interval < minScrubInterval { + config.Extensions.Scrub.Interval = minScrubInterval - log.Warn().Msg("Scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.") //nolint:lll // gofumpt conflicts with lll + log.Warn().Msg("Scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.") //nolint:lll // gofumpt conflicts with lll + } + + generator := &taskGenerator{ + imgStore: storeController.DefaultStore, + log: log, + } + sch.SubmitGenerator(generator, config.Extensions.Scrub.Interval, scheduler.LowPriority) + + if config.Storage.SubPaths != nil { + for route := range config.Storage.SubPaths { + generator := &taskGenerator{ + imgStore: storeController.SubStore[route], + log: log, + } + sch.SubmitGenerator(generator, config.Extensions.Scrub.Interval, scheduler.LowPriority) } - } else { - log.Info().Msg("Scrub config not provided, skipping scrub") } } else { - scrub.RunScrubRepo(imgStore, repo, log) + log.Info().Msg("Scrub config not provided, skipping scrub") } } + +type taskGenerator struct { + imgStore storage.ImageStore + log log.Logger + lastRepo string + done bool +} + +func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) { + repo, err := gen.imgStore.GetNextRepository(gen.lastRepo) + + if err != nil && !errors.Is(err, io.EOF) { + return nil, err + } + + if repo == "" { + gen.done = true + + return nil, nil + } + + gen.lastRepo = repo + + return scrub.NewTask(gen.imgStore, repo, gen.log), nil +} + +func (gen *taskGenerator) IsDone() bool { + return gen.done +} + +func (gen *taskGenerator) Reset() { + gen.lastRepo = "" + gen.done = false +} diff --git a/pkg/extensions/extension_scrub_disabled.go b/pkg/extensions/extension_scrub_disabled.go index 1874359f..c50dd990 100644 --- a/pkg/extensions/extension_scrub_disabled.go +++ b/pkg/extensions/extension_scrub_disabled.go @@ -6,13 +6,13 @@ package extensions import ( "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/scheduler" "zotregistry.io/zot/pkg/storage" ) // EnableScrubExtension ... -func EnableScrubExtension(config *config.Config, - log log.Logger, run bool, - imgStore storage.ImageStore, repo string, +func EnableScrubExtension(config *config.Config, log log.Logger, storeController storage.StoreController, + sch *scheduler.Scheduler, ) { log.Warn().Msg("skipping enabling scrub extension because given zot binary doesn't include this feature," + "please build a binary that does so") diff --git a/pkg/extensions/scrub/scrub.go b/pkg/extensions/scrub/scrub.go index 582673c0..9d7aa4a5 100644 --- a/pkg/extensions/scrub/scrub.go +++ b/pkg/extensions/scrub/scrub.go @@ -12,7 +12,7 @@ import ( ) // Scrub Extension for repo... -func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) { +func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) error { execMsg := fmt.Sprintf("executing scrub to check manifest/blob integrity for %s", path.Join(imgStore.RootDir(), repo)) log.Info().Msg(execMsg) @@ -20,6 +20,9 @@ func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) { if err != nil { errMessage := fmt.Sprintf("error while running scrub for %s", path.Join(imgStore.RootDir(), repo)) log.Error().Err(err).Msg(errMessage) + log.Info().Msg(fmt.Sprintf("scrub unsuccessfully completed for %s", path.Join(imgStore.RootDir(), repo))) + + return err } for _, result := range results { @@ -39,5 +42,21 @@ func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) { } } - log.Info().Msg(fmt.Sprintf("scrub completed for %s", path.Join(imgStore.RootDir(), repo))) + log.Info().Msg(fmt.Sprintf("scrub successfully completed for %s", path.Join(imgStore.RootDir(), repo))) + + return nil +} + +type Task struct { + imgStore storage.ImageStore + repo string + log log.Logger +} + +func NewTask(imgStore storage.ImageStore, repo string, log log.Logger) *Task { + return &Task{imgStore, repo, log} +} + +func (scrubT *Task) DoWork() error { + return RunScrubRepo(scrubT.imgStore, scrubT.repo, scrubT.log) } diff --git a/pkg/extensions/scrub/scrub_test.go b/pkg/extensions/scrub/scrub_test.go index 17f47251..a695e6a7 100644 --- a/pkg/extensions/scrub/scrub_test.go +++ b/pkg/extensions/scrub/scrub_test.go @@ -42,8 +42,11 @@ func TestScrubExtension(t *testing.T) { conf.HTTP.Port = port dir := t.TempDir() + subdir := t.TempDir() conf.Storage.RootDirectory = dir + substore := config.StorageConfig{RootDirectory: subdir} + conf.Storage.SubPaths = map[string]config.StorageConfig{"/a": substore} conf.Log.Output = logFile.Name() scrubConfig := &extconf.ScrubConfig{ Interval: 2, @@ -75,7 +78,7 @@ func TestScrubExtension(t *testing.T) { time.Sleep(100 * time.Millisecond) } - time.Sleep(1 * time.Second) + time.Sleep(6 * time.Second) defer func(controller *api.Controller) { ctx := context.Background() @@ -140,7 +143,7 @@ func TestScrubExtension(t *testing.T) { time.Sleep(100 * time.Millisecond) } - time.Sleep(500 * time.Millisecond) + time.Sleep(6 * time.Second) defer func(controller *api.Controller) { ctx := context.Background() @@ -152,7 +155,7 @@ func TestScrubExtension(t *testing.T) { So(string(data), ShouldContainSubstring, "scrub: blobs/manifest affected") }) - Convey("RunBackgroundTasks error - not enough permissions to access root directory", t, func(c C) { + Convey("Generator error - not enough permissions to access root directory", t, func(c C) { port := test.GetFreePort() url := test.GetBaseURL(port) @@ -200,7 +203,7 @@ func TestScrubExtension(t *testing.T) { time.Sleep(100 * time.Millisecond) } - time.Sleep(500 * time.Millisecond) + time.Sleep(6 * time.Second) defer func(controller *api.Controller) { ctx := context.Background() @@ -209,8 +212,7 @@ func TestScrubExtension(t *testing.T) { data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, - fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir())) + So(string(data), ShouldContainSubstring, "error while executing generator") So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil) }) @@ -238,7 +240,8 @@ func TestRunScrubRepo(t *testing.T) { panic(err) } - scrub.RunScrubRepo(imgStore, repoName, log) + err = scrub.RunScrubRepo(imgStore, repoName, log) + So(err, ShouldBeNil) data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -274,7 +277,8 @@ func TestRunScrubRepo(t *testing.T) { panic(err) } - scrub.RunScrubRepo(imgStore, repoName, log) + err = scrub.RunScrubRepo(imgStore, repoName, log) + So(err, ShouldBeNil) data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -304,7 +308,8 @@ func TestRunScrubRepo(t *testing.T) { So(os.Chmod(path.Join(dir, repoName), 0o000), ShouldBeNil) - scrub.RunScrubRepo(imgStore, repoName, log) + err = scrub.RunScrubRepo(imgStore, repoName, log) + So(err, ShouldNotBeNil) data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) diff --git a/pkg/scheduler/README.md b/pkg/scheduler/README.md new file mode 100644 index 00000000..9779f148 --- /dev/null +++ b/pkg/scheduler/README.md @@ -0,0 +1,40 @@ +# How to submit a Generator to the scheduler + +## What is a generator and how should it be implemented? +In order to create a new generator (which will generate new tasks one by one) and add it to the scheduler there are 3 methods which should be implemented: +1. ***GenerateTask() (Task, error)*** + ``` + This method should implement the logic for generating a new task. + Basically, when this method is called by the scheduler it should return the next task until there are no more tasks to be generated. + Also, the Task returned by this method should implement DoWork() method which should contain the logic that should be executed when this task is run by the scheduler. + ``` +2. ***IsDone() bool*** + ``` + This method should return true after the generator finished all the work and has no more tasks to generate. + ``` +3. ***Reset()*** + ``` + When this method is called the generator should reset to its initial state. + After the generator is reset, it will generate new tasks as if it hadn't been used before. + This is useful for periodic generators, because the scheduler will call this method when the generator is done and has to wait a specific interval of time for this generator to become ready to run again. + ``` + +## Submit a generator +The scheduler accepts both periodic and non-periodic generators. + +To submit a generator to the scheduler, ***SubmitGenerator*** should be called with the implemented generator, interval of time (which should be time.Duration(0) in case of non-periodic generator, or the interval for the periodic generator) and the priority of the tasks which will be generated by this generator as parameters. + +Notes: + + - A generator should submit only tasks having the same priority + - The priority of a task can be: LowPriorirty, MediumPriority or HighPriority + +# How to submit a Task to the scheduler + +In order to create a new task and add it to the scheduler ***DoWork() error*** is the method that should be implemented. This should contain the logic that should be executed when this task is run by the scheduler. + +To submit a task to the scheduler ***SubmitTask*** should be called with the implemented task and the priority of the task as parameters. + +Note: + + - A task can not be periodic. In order to add a periodic task, it can be created a generator which will generate periodically the same task. \ No newline at end of file diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 00000000..ff55f42a --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,318 @@ +package scheduler + +import ( + "container/heap" + "context" + "sync" + "time" + + "zotregistry.io/zot/pkg/log" +) + +type Task interface { + DoWork() error +} + +type generatorsPriorityQueue []*generator + +func (pq generatorsPriorityQueue) Len() int { + return len(pq) +} + +func (pq generatorsPriorityQueue) Less(i, j int) bool { + return pq[i].priority > pq[j].priority +} + +func (pq generatorsPriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *generatorsPriorityQueue) Push(x any) { + n := len(*pq) + + item, ok := x.(*generator) + if !ok { + return + } + + item.index = n + *pq = append(*pq, item) +} + +func (pq *generatorsPriorityQueue) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil + item.index = -1 + *pq = old[0 : n-1] + + return item +} + +const rateLimiterScheduler = 400 + +type Scheduler struct { + tasksQLow chan Task + tasksQMedium chan Task + tasksQHigh chan Task + generators generatorsPriorityQueue + waitingGenerators []*generator + generatorsLock *sync.Mutex + log log.Logger + stopCh chan struct{} +} + +func NewScheduler(logC log.Logger) *Scheduler { + chLow := make(chan Task, rateLimiterScheduler) + chMedium := make(chan Task, rateLimiterScheduler) + chHigh := make(chan Task, rateLimiterScheduler) + generatorPQ := make(generatorsPriorityQueue, 0) + sublogger := logC.With().Str("component", "scheduler").Logger() + + heap.Init(&generatorPQ) + + return &Scheduler{ + tasksQLow: chLow, + tasksQMedium: chMedium, + tasksQHigh: chHigh, + generators: generatorPQ, + generatorsLock: new(sync.Mutex), + log: log.Logger{Logger: sublogger}, + stopCh: make(chan struct{}), + } +} + +const rateLimit = 5 * time.Second + +func (scheduler *Scheduler) RunScheduler(ctx context.Context) { + throttle := time.NewTicker(rateLimit).C + + go func() { + for { + select { + case <-ctx.Done(): + close(scheduler.stopCh) + + return + default: + task := scheduler.getTask() + if task != nil { + if err := task.DoWork(); err != nil { + scheduler.log.Error().Err(err).Msg("error while executing task") + } + } + } + <-throttle + } + }() +} + +func (scheduler *Scheduler) pushReadyGenerators() { + // iterate through waiting generators list and resubmit those which become ready to run + for i, gen := range scheduler.waitingGenerators { + if gen.getState() == ready { + gen.done = false + heap.Push(&scheduler.generators, gen) + scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...) + } + } +} + +func (scheduler *Scheduler) generateTasks() { + scheduler.generatorsLock.Lock() + defer scheduler.generatorsLock.Unlock() + + // resubmit ready generators(which were in a waiting state) to generators priority queue + scheduler.pushReadyGenerators() + + // get the highest priority generator from queue + if scheduler.generators.Len() == 0 { + return + } + + var gen *generator + + // check if the generator with highest prioriy is ready to run + if scheduler.generators[0].getState() == ready { + gen = scheduler.generators[0] + } else { + gen, _ = heap.Pop(&scheduler.generators).(*generator) + if gen.getState() == waiting { + scheduler.waitingGenerators = append(scheduler.waitingGenerators, gen) + } + + return + } + + // run generator to generate a new task which will be added to a channel by priority + gen.generate(scheduler) +} + +func (scheduler *Scheduler) getTask() Task { + // first, generate a task with highest possible priority + scheduler.generateTasks() + + // then, return a task with highest possible priority + select { + case t := <-scheduler.tasksQHigh: + return t + default: + } + + select { + case t := <-scheduler.tasksQMedium: + return t + default: + } + + select { + case t := <-scheduler.tasksQLow: + return t + default: + } + + return nil +} + +func (scheduler *Scheduler) getTasksChannelByPriority(priority Priority) chan Task { + switch priority { + case LowPriority: + return scheduler.tasksQLow + case MediumPriority: + return scheduler.tasksQMedium + case HighPriority: + return scheduler.tasksQHigh + } + + return nil +} + +func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) { + // get by priority the channel where the task should be added to + tasksQ := scheduler.getTasksChannelByPriority(priority) + if tasksQ == nil { + return + } + + // check if the scheduler it's still running in order to add the task to the channel + select { + case <-scheduler.stopCh: + return + default: + } + + select { + case <-scheduler.stopCh: + return + case tasksQ <- task: + scheduler.log.Info().Msg("Adding a new task to the scheduler") + } +} + +type Priority int + +const ( + LowPriority Priority = iota + MediumPriority + HighPriority +) + +type state int + +const ( + ready state = iota + waiting + done +) + +type Generator interface { + GenerateTask() (Task, error) + IsDone() bool + Reset() +} + +type generator struct { + interval time.Duration + lastRun time.Time + done bool + priority Priority + taskGenerator Generator + remainingTask Task + index int +} + +func (gen *generator) generate(sch *Scheduler) { + // get by priority the channel where the new generated task should be added to + taskQ := sch.getTasksChannelByPriority(gen.priority) + + task := gen.remainingTask + + // in case there is no task already generated, generate a new task + if gen.remainingTask == nil { + nextTask, err := gen.taskGenerator.GenerateTask() + if err != nil { + sch.log.Error().Err(err).Msg("error while executing generator") + + return + } + + task = nextTask + + // check if the generator is done + if gen.taskGenerator.IsDone() { + gen.done = true + gen.lastRun = time.Now() + gen.taskGenerator.Reset() + + return + } + } + + // check if it's possible to add a new task to the channel + // if not, keep the generated task and retry to add it next time + select { + case taskQ <- task: + gen.remainingTask = nil + + return + default: + gen.remainingTask = task + } +} + +// getState() returns the state of a generator. +// if the generator is not periodic then it can be done or ready to generate a new task. +// if the generator is periodic then it can be waiting (finished its work and wait for its interval to pass) +// or ready to generate a new task. +func (gen *generator) getState() state { + if gen.interval == time.Duration(0) { + if gen.done && gen.remainingTask == nil { + return done + } + } else { + if gen.done && time.Since(gen.lastRun) < gen.interval && gen.remainingTask == nil { + return waiting + } + } + + return ready +} + +func (scheduler *Scheduler) SubmitGenerator(taskGenerator Generator, interval time.Duration, priority Priority) { + newGenerator := &generator{ + interval: interval, + done: false, + priority: priority, + taskGenerator: taskGenerator, + remainingTask: nil, + } + + scheduler.generatorsLock.Lock() + defer scheduler.generatorsLock.Unlock() + + // add generator to the generators priority queue + heap.Push(&scheduler.generators, newGenerator) +} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go new file mode 100644 index 00000000..290d3934 --- /dev/null +++ b/pkg/scheduler/scheduler_test.go @@ -0,0 +1,177 @@ +package scheduler_test + +import ( + "context" + "errors" + "fmt" + "os" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" + "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/scheduler" +) + +type task struct { + log log.Logger + msg string + err bool +} + +var errInternal = errors.New("task: internal error") + +func (t *task) DoWork() error { + if t.err { + return errInternal + } + + t.log.Info().Msg(t.msg) + + return nil +} + +type generator struct { + log log.Logger + priority string + done bool + index int + step int +} + +func (g *generator) GenerateTask() (scheduler.Task, error) { + if g.step > 1 { + g.done = true + } + g.step++ + g.index++ + + return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil +} + +func (g *generator) IsDone() bool { + return g.done +} + +func (g *generator) Reset() { + g.done = false + g.step = 0 +} + +func TestScheduler(t *testing.T) { + Convey("Test order of generators in queue", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + sch := scheduler.NewScheduler(logger) + + genL := &generator{log: logger, priority: "low priority"} + sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority) + + genH := &generator{log: logger, priority: "high priority"} + sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority) + + ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler(ctx) + + time.Sleep(500 * time.Millisecond) + cancel() + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "executing high priority task; index: 1") + So(string(data), ShouldNotContainSubstring, "executing low priority task; index: 1") + So(string(data), ShouldNotContainSubstring, "error while executing task") + }) + + Convey("Test task returning an error", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + sch := scheduler.NewScheduler(logger) + + t := &task{log: logger, msg: "", err: true} + sch.SubmitTask(t, scheduler.MediumPriority) + + ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler(ctx) + + time.Sleep(500 * time.Millisecond) + cancel() + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "Adding a new task to the scheduler") + So(string(data), ShouldContainSubstring, "error while executing task") + }) + + Convey("Test resubmit generator", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + sch := scheduler.NewScheduler(logger) + + genL := &generator{log: logger, priority: "low priority"} + sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) + + ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler(ctx) + + time.Sleep(6 * time.Second) + cancel() + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "executing low priority task; index: 1") + So(string(data), ShouldContainSubstring, "executing low priority task; index: 2") + }) + + Convey("Try to add a task with wrong priority", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + sch := scheduler.NewScheduler(logger) + + t := &task{log: logger, msg: "", err: false} + sch.SubmitTask(t, -1) + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldNotContainSubstring, "Adding a new task to the scheduler") + }) + + Convey("Test adding a new task when context is done", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + sch := scheduler.NewScheduler(logger) + + ctx, cancel := context.WithCancel(context.Background()) + + sch.RunScheduler(ctx) + cancel() + time.Sleep(500 * time.Millisecond) + + t := &task{log: logger, msg: "", err: false} + sch.SubmitTask(t, scheduler.LowPriority) + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldNotContainSubstring, "Adding a new task to the scheduler") + }) +} diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 4ef21bac..77e713ad 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -31,6 +31,7 @@ import ( zerr "zotregistry.io/zot/errors" "zotregistry.io/zot/pkg/extensions/monitoring" zlog "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/scheduler" "zotregistry.io/zot/pkg/test" ) @@ -360,6 +361,65 @@ func (is *ImageStoreLocal) GetRepositories() ([]string, error) { return stores, err } +// GetNextRepository returns next repository under this store. +func (is *ImageStoreLocal) GetNextRepository(repo string) (string, error) { + var lockLatency time.Time + + dir := is.rootDir + + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + + _, err := os.ReadDir(dir) + if err != nil { + is.log.Error().Err(err).Msg("failure walking storage root-dir") + + return "", err + } + + found := false + store := "" + err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() { + return nil + } + + rel, err := filepath.Rel(is.rootDir, path) + if err != nil { + return nil // nolint:nilerr // ignore paths not relative to root dir + } + + ok, err := is.ValidateRepo(rel) + if !ok || err != nil { + return nil // nolint:nilerr // ignore invalid repos + } + + if repo == "" && ok && err == nil { + store = rel + + return io.EOF + } + + if found { + store = rel + + return io.EOF + } + + if rel == repo { + found = true + } + + return nil + }) + + return store, err +} + // GetImageTags returns a list of image tags available in the specified repository. func (is *ImageStoreLocal) GetImageTags(repo string) ([]string, error) { var lockLatency time.Time @@ -1973,13 +2033,71 @@ func (is *ImageStoreLocal) gcRepo(repo string) error { return nil } -func (is *ImageStoreLocal) RunGCRepo(repo string) { +func (is *ImageStoreLocal) RunGCRepo(repo string) error { is.log.Info().Msg(fmt.Sprintf("executing GC of orphaned blobs for %s", path.Join(is.RootDir(), repo))) if err := is.gcRepo(repo); err != nil { errMessage := fmt.Sprintf("error while running GC for %s", path.Join(is.RootDir(), repo)) is.log.Error().Err(err).Msg(errMessage) + is.log.Info().Msg(fmt.Sprintf("GC unsuccessfully completed for %s", path.Join(is.RootDir(), repo))) + + return err } - is.log.Info().Msg(fmt.Sprintf("GC completed for %s", path.Join(is.RootDir(), repo))) + is.log.Info().Msg(fmt.Sprintf("GC successfully completed for %s", path.Join(is.RootDir(), repo))) + + return nil +} + +func (is *ImageStoreLocal) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) { + generator := &taskGenerator{ + imgStore: is, + } + sch.SubmitGenerator(generator, interval, scheduler.MediumPriority) +} + +type taskGenerator struct { + imgStore *ImageStoreLocal + lastRepo string + done bool +} + +func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) { + repo, err := gen.imgStore.GetNextRepository(gen.lastRepo) + + if err != nil && !errors.Is(err, io.EOF) { + return nil, err + } + + if repo == "" { + gen.done = true + + return nil, nil + } + + gen.lastRepo = repo + + return newGCTask(gen.imgStore, repo), nil +} + +func (gen *taskGenerator) IsDone() bool { + return gen.done +} + +func (gen *taskGenerator) Reset() { + gen.lastRepo = "" + gen.done = false +} + +type gcTask struct { + imgStore *ImageStoreLocal + repo string +} + +func newGCTask(imgStore *ImageStoreLocal, repo string) *gcTask { + return &gcTask{imgStore, repo} +} + +func (gcT *gcTask) DoWork() error { + return gcT.imgStore.RunGCRepo(gcT.repo) } diff --git a/pkg/storage/local_test.go b/pkg/storage/local_test.go index 006dcfdd..4d4ed1c6 100644 --- a/pkg/storage/local_test.go +++ b/pkg/storage/local_test.go @@ -918,7 +918,9 @@ func FuzzRunGCRepo(f *testing.F) { imgStore := storage.NewImageStore(dir, true, storage.DefaultGCDelay, true, true, *log, metrics, nil) - imgStore.RunGCRepo(data) + if err := imgStore.RunGCRepo(data); err != nil { + t.Error(err) + } }) } @@ -1943,7 +1945,8 @@ func TestGarbageCollectForImageStore(t *testing.T) { panic(err) } - imgStore.RunGCRepo(repoName) + err = imgStore.RunGCRepo(repoName) + So(err, ShouldNotBeNil) time.Sleep(500 * time.Millisecond) @@ -1970,7 +1973,8 @@ func TestGarbageCollectForImageStore(t *testing.T) { So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o000), ShouldBeNil) - imgStore.RunGCRepo(repoName) + err = imgStore.RunGCRepo(repoName) + So(err, ShouldNotBeNil) time.Sleep(500 * time.Millisecond) @@ -2055,6 +2059,41 @@ func TestGetRepositoriesError(t *testing.T) { }) } +func TestGetNextRepository(t *testing.T) { + dir := t.TempDir() + log := log.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + imgStore := storage.NewImageStore(dir, true, storage.DefaultGCDelay, + true, true, log, metrics, nil, + ) + firstRepoName := "repo1" + secondRepoName := "repo2" + + err := test.CopyFiles("../../test/data/zot-test", path.Join(dir, firstRepoName)) + if err != nil { + panic(err) + } + + err = test.CopyFiles("../../test/data/zot-test", path.Join(dir, secondRepoName)) + if err != nil { + panic(err) + } + + Convey("Return first repository", t, func() { + firstRepo, err := imgStore.GetNextRepository("") + So(firstRepo, ShouldEqual, firstRepoName) + So(err, ShouldNotBeNil) + So(err, ShouldEqual, io.EOF) + }) + + Convey("Return second repository", t, func() { + secondRepo, err := imgStore.GetNextRepository(firstRepoName) + So(secondRepo, ShouldEqual, secondRepoName) + So(err, ShouldNotBeNil) + So(err, ShouldEqual, io.EOF) + }) +} + func TestPutBlobChunkStreamed(t *testing.T) { Convey("Get error on opening file", t, func() { dir := t.TempDir() diff --git a/pkg/storage/s3/s3.go b/pkg/storage/s3/s3.go index 4241baba..1f2a68d1 100644 --- a/pkg/storage/s3/s3.go +++ b/pkg/storage/s3/s3.go @@ -28,6 +28,7 @@ import ( zerr "zotregistry.io/zot/errors" "zotregistry.io/zot/pkg/extensions/monitoring" zlog "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/scheduler" "zotregistry.io/zot/pkg/storage" "zotregistry.io/zot/pkg/test" ) @@ -292,6 +293,11 @@ func (is *ObjectStorage) GetRepositories() ([]string, error) { return stores, err } +// GetNextRepository returns next repository under this store. +func (is *ObjectStorage) GetNextRepository(repo string) (string, error) { + return "", nil +} + // GetImageTags returns a list of image tags available in the specified repository. func (is *ObjectStorage) GetImageTags(repo string) ([]string, error) { dir := path.Join(is.rootDir, repo) @@ -1303,7 +1309,11 @@ retry: return nil } -func (is *ObjectStorage) RunGCRepo(repo string) { +func (is *ObjectStorage) RunGCRepo(repo string) error { + return nil +} + +func (is *ObjectStorage) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) { } // DeleteBlobUpload deletes an existing blob upload that is currently in progress. diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index f0d4d3ed..51b01fbb 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -6,6 +6,7 @@ import ( "github.com/opencontainers/go-digest" artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" + "zotregistry.io/zot/pkg/scheduler" ) const ( @@ -23,6 +24,7 @@ type ImageStore interface { InitRepo(name string) error ValidateRepo(name string) (bool, error) GetRepositories() ([]string, error) + GetNextRepository(repo string) (string, error) GetImageTags(repo string) ([]string, error) GetImageManifest(repo, reference string) ([]byte, string, string, error) PutImageManifest(repo, reference, mediaType string, body []byte) (string, error) @@ -45,5 +47,6 @@ type ImageStore interface { GetIndexContent(repo string) ([]byte, error) GetBlobContent(repo, digest string) ([]byte, error) GetReferrers(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error) - RunGCRepo(repo string) + RunGCRepo(repo string) error + RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) } diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index e43cf5b0..3872e69c 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -6,6 +6,7 @@ import ( "github.com/opencontainers/go-digest" artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" + "zotregistry.io/zot/pkg/scheduler" ) type MockedImageStore struct { @@ -14,6 +15,7 @@ type MockedImageStore struct { InitRepoFn func(name string) error ValidateRepoFn func(name string) (bool, error) GetRepositoriesFn func() ([]string, error) + GetNextRepositoryFn func(repo string) (string, error) GetImageTagsFn func(repo string) ([]string, error) GetImageManifestFn func(repo string, reference string) ([]byte, string, string, error) PutImageManifestFn func(repo string, reference string, mediaType string, body []byte) (string, error) @@ -32,13 +34,14 @@ type MockedImageStore struct { CheckBlobFn func(repo string, digest string) (bool, int64, error) GetBlobPartialFn func(repo string, digest string, mediaType string, from, to int64, ) (io.ReadCloser, int64, int64, error) - GetBlobFn func(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) - DeleteBlobFn func(repo string, digest string) error - GetIndexContentFn func(repo string) ([]byte, error) - GetBlobContentFn func(repo, digest string) ([]byte, error) - GetReferrersFn func(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error) - URLForPathFn func(path string) (string, error) - RunGCRepoFn func(repo string) + GetBlobFn func(repo string, digest string, mediaType string) (io.ReadCloser, int64, error) + DeleteBlobFn func(repo string, digest string) error + GetIndexContentFn func(repo string) ([]byte, error) + GetBlobContentFn func(repo, digest string) ([]byte, error) + GetReferrersFn func(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error) + URLForPathFn func(path string) (string, error) + RunGCRepoFn func(repo string) error + RunGCPeriodicallyFn func(interval time.Duration, sch *scheduler.Scheduler) } func (is MockedImageStore) Lock(t *time.Time) { @@ -93,6 +96,14 @@ func (is MockedImageStore) GetRepositories() ([]string, error) { return []string{}, nil } +func (is MockedImageStore) GetNextRepository(repo string) (string, error) { + if is.GetNextRepositoryFn != nil { + return is.GetNextRepositoryFn(repo) + } + + return "", nil +} + func (is MockedImageStore) GetImageManifest(repo string, reference string) ([]byte, string, string, error) { if is.GetImageManifestFn != nil { return is.GetImageManifestFn(repo, reference) @@ -293,8 +304,16 @@ func (is MockedImageStore) URLForPath(path string) (string, error) { return "", nil } -func (is MockedImageStore) RunGCRepo(repo string) { +func (is MockedImageStore) RunGCRepo(repo string) error { if is.RunGCRepoFn != nil { - is.RunGCRepoFn(repo) + return is.RunGCRepoFn(repo) + } + + return nil +} + +func (is MockedImageStore) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) { + if is.RunGCPeriodicallyFn != nil { + is.RunGCPeriodicallyFn(interval, sch) } } diff --git a/test/blackbox/scrub.bats b/test/blackbox/scrub.bats index bef9b38b..f82947d0 100644 --- a/test/blackbox/scrub.bats +++ b/test/blackbox/scrub.bats @@ -59,7 +59,7 @@ function teardown() { wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog" # wait for scrub to be done and logs to get populated - run sleep 5s + run sleep 10s run not_affected [ "$status" -eq 0 ] [ $(echo "${lines[0]}" ) = 'true' ] @@ -74,7 +74,7 @@ function teardown() { wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog" # wait for scrub to be done and logs to get populated - run sleep 5s + run sleep 10s run affected [ "$status" -eq 0 ] [ $(echo "${lines[0]}" ) = 'true' ]