0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-30 22:34:13 -05:00

initial design for task scheduler (#700)

Signed-off-by: Andreea-Lupu <andreealupu1470@yahoo.com>
This commit is contained in:
Andreea Lupu 2022-09-23 08:27:56 +03:00 committed by GitHub
parent 7517f2a5bb
commit f686ab6bf6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 859 additions and 254 deletions

View file

@ -21,10 +21,10 @@ import (
"zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/api/config"
ext "zotregistry.io/zot/pkg/extensions"
extconf "zotregistry.io/zot/pkg/extensions/config"
"zotregistry.io/zot/pkg/extensions/lint"
"zotregistry.io/zot/pkg/extensions/monitoring"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/s3"
)
@ -444,6 +444,14 @@ func (c *Controller) Shutdown() {
}
func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
taskScheduler := scheduler.NewScheduler(c.Log)
taskScheduler.RunScheduler(reloadCtx)
// Enable running garbage-collect periodically for DefaultStore
if c.Config.Storage.GC && c.Config.Storage.GCInterval != 0 {
c.StoreController.DefaultStore.RunGCPeriodically(c.Config.Storage.GCInterval, taskScheduler)
}
// Enable extensions if extension config is provided for DefaultStore
if c.Config != nil && c.Config.Extensions != nil {
ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
@ -451,7 +459,12 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
}
if c.Config.Storage.SubPaths != nil {
for _, storageConfig := range c.Config.Storage.SubPaths {
for route, storageConfig := range c.Config.Storage.SubPaths {
// Enable running garbage-collect periodically for subImageStore
if storageConfig.GC && storageConfig.GCInterval != 0 {
c.StoreController.SubStore[route].RunGCPeriodically(storageConfig.GCInterval, taskScheduler)
}
// Enable extensions if extension config is provided for subImageStore
if c.Config != nil && c.Config.Extensions != nil {
ext.EnableMetricsExtension(c.Config, c.Log, storageConfig.RootDirectory)
@ -468,131 +481,6 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
}
if c.Config.Extensions != nil {
ext.EnableScrubExtension(c.Config, c.Log, false, nil, "")
}
go StartPeriodicTasks(c.StoreController.DefaultStore, c.StoreController.SubStore, c.Config.Storage.SubPaths,
c.Config.Storage.GC, c.Config.Storage.GCInterval, c.Config.Extensions, c.Log)
}
func StartPeriodicTasks(defaultStore storage.ImageStore, subStore map[string]storage.ImageStore,
subPaths map[string]config.StorageConfig, gcEnabled bool, gcInterval time.Duration,
extensions *extconf.ExtensionConfig, log log.Logger,
) {
// start periodic gc and/or scrub for DefaultStore
StartPeriodicTasksForImageStore(defaultStore, gcEnabled, gcInterval, extensions, log)
for route, storageConfig := range subPaths {
// Enable running garbage-collect or/and scrub periodically for subImageStore
StartPeriodicTasksForImageStore(subStore[route], storageConfig.GC, storageConfig.GCInterval, extensions, log)
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
}
}
func StartPeriodicTasksForImageStore(imageStore storage.ImageStore, configGC bool, configGCInterval time.Duration,
extensions *extconf.ExtensionConfig, log log.Logger,
) {
scrubInterval := time.Duration(0)
gcInterval := time.Duration(0)
gc := false
scrub := false
if configGC && configGCInterval != 0 {
gcInterval = configGCInterval
gc = true
}
if extensions != nil && extensions.Scrub != nil && extensions.Scrub.Interval != 0 {
scrubInterval = extensions.Scrub.Interval
scrub = true
}
interval := minPeriodicInterval(scrub, gc, scrubInterval, gcInterval)
if interval == time.Duration(0) {
return
}
log.Info().Msg(fmt.Sprintf("Periodic interval for %s set to %s", imageStore.RootDir(), interval))
var lastGC, lastScrub time.Time
for {
log.Info().Msg(fmt.Sprintf("Starting periodic background tasks for %s", imageStore.RootDir()))
// Enable running garbage-collect or/and scrub periodically for imageStore
RunBackgroundTasks(imageStore, gc, scrub, log)
log.Info().Msg(fmt.Sprintf("Finishing periodic background tasks for %s", imageStore.RootDir()))
if gc {
lastGC = time.Now()
}
if scrub {
lastScrub = time.Now()
}
time.Sleep(interval)
if !lastGC.IsZero() && time.Since(lastGC) >= gcInterval {
gc = true
}
if !lastScrub.IsZero() && time.Since(lastScrub) >= scrubInterval {
scrub = true
}
}
}
func RunBackgroundTasks(imgStore storage.ImageStore, gc, scrub bool, log log.Logger) {
repos, err := imgStore.GetRepositories()
if err != nil {
log.Error().Err(err).Msg(fmt.Sprintf("error while running background task for %s", imgStore.RootDir()))
return
}
for _, repo := range repos {
if gc {
start := time.Now()
// run gc for this repo
imgStore.RunGCRepo(repo)
elapsed := time.Since(start)
log.Info().Msg(fmt.Sprintf("gc for %s executed in %s", repo, elapsed))
time.Sleep(1 * time.Minute)
}
if scrub {
start := time.Now()
// run scrub for this repo
ext.EnableScrubExtension(nil, log, true, imgStore, repo)
elapsed := time.Since(start)
log.Info().Msg(fmt.Sprintf("scrub for %s executed in %s", repo, elapsed))
time.Sleep(1 * time.Minute)
}
}
}
func minPeriodicInterval(scrub, gc bool, scrubInterval, gcInterval time.Duration) time.Duration {
if scrub && gc {
if scrubInterval <= gcInterval {
return scrubInterval
}
return gcInterval
}
if scrub {
return scrubInterval
}
if gc {
return gcInterval
}
return time.Duration(0)
}

View file

@ -6007,14 +6007,10 @@ func TestPeriodicGC(t *testing.T) {
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring,
"\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":3600000000000")
So(string(data), ShouldContainSubstring,
fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
So(string(data), ShouldNotContainSubstring,
fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir()))
So(string(data), ShouldContainSubstring,
fmt.Sprintf("executing GC of orphaned blobs for %s", path.Join(ctlr.StoreController.DefaultStore.RootDir(), repoName))) //nolint:lll
So(string(data), ShouldContainSubstring,
fmt.Sprintf("GC completed for %s", path.Join(ctlr.StoreController.DefaultStore.RootDir(), repoName))) //nolint:lll
fmt.Sprintf("GC successfully completed for %s", path.Join(ctlr.StoreController.DefaultStore.RootDir(), repoName))) //nolint:lll
})
Convey("Periodic GC enabled for substore", t, func() {
@ -6051,82 +6047,6 @@ func TestPeriodicGC(t *testing.T) {
// periodic GC is enabled for sub store
So(string(data), ShouldContainSubstring,
fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"GC\":true,\"Dedupe\":false,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) //nolint:lll // gofumpt conflicts with lll
So(string(data), ShouldContainSubstring,
fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.SubStore["/a"].RootDir())) //nolint:lll
})
}
func TestPeriodicTasks(t *testing.T) {
Convey("Both periodic gc and periodic scrub enabled for default store with scrubInterval < gcInterval", t, func() {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
conf.Log.Output = logFile.Name()
defer os.Remove(logFile.Name()) // clean up
ctlr := api.NewController(conf)
dir := t.TempDir()
ctlr.Config.Storage.RootDirectory = dir
ctlr.Config.Storage.GC = true
ctlr.Config.Storage.GCInterval = 12 * time.Hour
ctlr.Config.Storage.GCDelay = 1 * time.Second
ctlr.Config.Extensions = &extconf.ExtensionConfig{Scrub: &extconf.ScrubConfig{Interval: 8 * time.Hour}}
go startServer(ctlr)
defer stopServer(ctlr)
test.WaitTillServerReady(baseURL)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring,
fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
So(string(data), ShouldNotContainSubstring,
fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir()))
So(string(data), ShouldContainSubstring,
fmt.Sprintf("Finishing periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
So(string(data), ShouldContainSubstring,
fmt.Sprintf("Periodic interval for %s set to %s",
ctlr.StoreController.DefaultStore.RootDir(), ctlr.Config.Extensions.Scrub.Interval))
})
Convey("Both periodic gc and periodic scrub enabled for default store with gcInterval < scrubInterval", t, func() {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
conf.Log.Output = logFile.Name()
defer os.Remove(logFile.Name()) // clean up
ctlr := api.NewController(conf)
dir := t.TempDir()
ctlr.Config.Storage.RootDirectory = dir
ctlr.Config.Storage.GC = true
ctlr.Config.Storage.GCInterval = 8 * time.Hour
ctlr.Config.Storage.GCDelay = 1 * time.Second
ctlr.Config.Extensions = &extconf.ExtensionConfig{Scrub: &extconf.ScrubConfig{Interval: 12 * time.Hour}}
go startServer(ctlr)
defer stopServer(ctlr)
test.WaitTillServerReady(baseURL)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring,
fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
So(string(data), ShouldNotContainSubstring,
fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir()))
So(string(data), ShouldContainSubstring,
fmt.Sprintf("Finishing periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
So(string(data), ShouldContainSubstring,
fmt.Sprintf("Periodic interval for %s set to %s",
ctlr.StoreController.DefaultStore.RootDir(), ctlr.Config.Storage.GCInterval))
})
}

View file

@ -444,8 +444,6 @@ func TestServeScrubExtension(t *testing.T) {
"\"Extensions\":{\"Search\":null,\"Sync\":null,\"Metrics\":null,\"Scrub\":{\"Interval\":3600000000000},\"Lint\":null") //nolint:lll // gofumpt conflicts with lll
So(dataStr, ShouldContainSubstring,
"Scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.")
So(dataStr, ShouldContainSubstring, "Starting periodic background tasks for")
So(dataStr, ShouldContainSubstring, "Finishing periodic background tasks for")
})
Convey("scrub not enabled - scrub interval param not set", t, func(c C) {

View file

@ -4,17 +4,21 @@
package extensions
import (
"errors"
"io"
"time"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/extensions/scrub"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
"zotregistry.io/zot/pkg/storage"
)
// EnableScrubExtension enables scrub extension.
func EnableScrubExtension(config *config.Config, log log.Logger, run bool, imgStore storage.ImageStore, repo string) {
if !run {
func EnableScrubExtension(config *config.Config, log log.Logger, storeController storage.StoreController,
sch *scheduler.Scheduler,
) {
if config.Extensions.Scrub != nil &&
config.Extensions.Scrub.Interval != 0 {
minScrubInterval, _ := time.ParseDuration("2h")
@ -24,10 +28,57 @@ func EnableScrubExtension(config *config.Config, log log.Logger, run bool, imgSt
log.Warn().Msg("Scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.") //nolint:lll // gofumpt conflicts with lll
}
generator := &taskGenerator{
imgStore: storeController.DefaultStore,
log: log,
}
sch.SubmitGenerator(generator, config.Extensions.Scrub.Interval, scheduler.LowPriority)
if config.Storage.SubPaths != nil {
for route := range config.Storage.SubPaths {
generator := &taskGenerator{
imgStore: storeController.SubStore[route],
log: log,
}
sch.SubmitGenerator(generator, config.Extensions.Scrub.Interval, scheduler.LowPriority)
}
}
} else {
log.Info().Msg("Scrub config not provided, skipping scrub")
}
} else {
scrub.RunScrubRepo(imgStore, repo, log)
}
type taskGenerator struct {
imgStore storage.ImageStore
log log.Logger
lastRepo string
done bool
}
func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) {
repo, err := gen.imgStore.GetNextRepository(gen.lastRepo)
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}
if repo == "" {
gen.done = true
return nil, nil
}
gen.lastRepo = repo
return scrub.NewTask(gen.imgStore, repo, gen.log), nil
}
func (gen *taskGenerator) IsDone() bool {
return gen.done
}
func (gen *taskGenerator) Reset() {
gen.lastRepo = ""
gen.done = false
}

View file

@ -6,13 +6,13 @@ package extensions
import (
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
"zotregistry.io/zot/pkg/storage"
)
// EnableScrubExtension ...
func EnableScrubExtension(config *config.Config,
log log.Logger, run bool,
imgStore storage.ImageStore, repo string,
func EnableScrubExtension(config *config.Config, log log.Logger, storeController storage.StoreController,
sch *scheduler.Scheduler,
) {
log.Warn().Msg("skipping enabling scrub extension because given zot binary doesn't include this feature," +
"please build a binary that does so")

View file

@ -12,7 +12,7 @@ import (
)
// Scrub Extension for repo...
func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) {
func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) error {
execMsg := fmt.Sprintf("executing scrub to check manifest/blob integrity for %s", path.Join(imgStore.RootDir(), repo))
log.Info().Msg(execMsg)
@ -20,6 +20,9 @@ func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) {
if err != nil {
errMessage := fmt.Sprintf("error while running scrub for %s", path.Join(imgStore.RootDir(), repo))
log.Error().Err(err).Msg(errMessage)
log.Info().Msg(fmt.Sprintf("scrub unsuccessfully completed for %s", path.Join(imgStore.RootDir(), repo)))
return err
}
for _, result := range results {
@ -39,5 +42,21 @@ func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) {
}
}
log.Info().Msg(fmt.Sprintf("scrub completed for %s", path.Join(imgStore.RootDir(), repo)))
log.Info().Msg(fmt.Sprintf("scrub successfully completed for %s", path.Join(imgStore.RootDir(), repo)))
return nil
}
type Task struct {
imgStore storage.ImageStore
repo string
log log.Logger
}
func NewTask(imgStore storage.ImageStore, repo string, log log.Logger) *Task {
return &Task{imgStore, repo, log}
}
func (scrubT *Task) DoWork() error {
return RunScrubRepo(scrubT.imgStore, scrubT.repo, scrubT.log)
}

View file

@ -42,8 +42,11 @@ func TestScrubExtension(t *testing.T) {
conf.HTTP.Port = port
dir := t.TempDir()
subdir := t.TempDir()
conf.Storage.RootDirectory = dir
substore := config.StorageConfig{RootDirectory: subdir}
conf.Storage.SubPaths = map[string]config.StorageConfig{"/a": substore}
conf.Log.Output = logFile.Name()
scrubConfig := &extconf.ScrubConfig{
Interval: 2,
@ -75,7 +78,7 @@ func TestScrubExtension(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
time.Sleep(1 * time.Second)
time.Sleep(6 * time.Second)
defer func(controller *api.Controller) {
ctx := context.Background()
@ -140,7 +143,7 @@ func TestScrubExtension(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
time.Sleep(6 * time.Second)
defer func(controller *api.Controller) {
ctx := context.Background()
@ -152,7 +155,7 @@ func TestScrubExtension(t *testing.T) {
So(string(data), ShouldContainSubstring, "scrub: blobs/manifest affected")
})
Convey("RunBackgroundTasks error - not enough permissions to access root directory", t, func(c C) {
Convey("Generator error - not enough permissions to access root directory", t, func(c C) {
port := test.GetFreePort()
url := test.GetBaseURL(port)
@ -200,7 +203,7 @@ func TestScrubExtension(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
time.Sleep(6 * time.Second)
defer func(controller *api.Controller) {
ctx := context.Background()
@ -209,8 +212,7 @@ func TestScrubExtension(t *testing.T) {
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring,
fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir()))
So(string(data), ShouldContainSubstring, "error while executing generator")
So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil)
})
@ -238,7 +240,8 @@ func TestRunScrubRepo(t *testing.T) {
panic(err)
}
scrub.RunScrubRepo(imgStore, repoName, log)
err = scrub.RunScrubRepo(imgStore, repoName, log)
So(err, ShouldBeNil)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
@ -274,7 +277,8 @@ func TestRunScrubRepo(t *testing.T) {
panic(err)
}
scrub.RunScrubRepo(imgStore, repoName, log)
err = scrub.RunScrubRepo(imgStore, repoName, log)
So(err, ShouldBeNil)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
@ -304,7 +308,8 @@ func TestRunScrubRepo(t *testing.T) {
So(os.Chmod(path.Join(dir, repoName), 0o000), ShouldBeNil)
scrub.RunScrubRepo(imgStore, repoName, log)
err = scrub.RunScrubRepo(imgStore, repoName, log)
So(err, ShouldNotBeNil)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)

40
pkg/scheduler/README.md Normal file
View file

@ -0,0 +1,40 @@
# How to submit a Generator to the scheduler
## What is a generator and how should it be implemented?
In order to create a new generator (which will generate new tasks one by one) and add it to the scheduler there are 3 methods which should be implemented:
1. ***GenerateTask() (Task, error)***
```
This method should implement the logic for generating a new task.
Basically, when this method is called by the scheduler it should return the next task until there are no more tasks to be generated.
Also, the Task returned by this method should implement DoWork() method which should contain the logic that should be executed when this task is run by the scheduler.
```
2. ***IsDone() bool***
```
This method should return true after the generator finished all the work and has no more tasks to generate.
```
3. ***Reset()***
```
When this method is called the generator should reset to its initial state.
After the generator is reset, it will generate new tasks as if it hadn't been used before.
This is useful for periodic generators, because the scheduler will call this method when the generator is done and has to wait a specific interval of time for this generator to become ready to run again.
```
## Submit a generator
The scheduler accepts both periodic and non-periodic generators.
To submit a generator to the scheduler, ***SubmitGenerator*** should be called with the implemented generator, interval of time (which should be time.Duration(0) in case of non-periodic generator, or the interval for the periodic generator) and the priority of the tasks which will be generated by this generator as parameters.
Notes:
- A generator should submit only tasks having the same priority
- The priority of a task can be: LowPriorirty, MediumPriority or HighPriority
# How to submit a Task to the scheduler
In order to create a new task and add it to the scheduler ***DoWork() error*** is the method that should be implemented. This should contain the logic that should be executed when this task is run by the scheduler.
To submit a task to the scheduler ***SubmitTask*** should be called with the implemented task and the priority of the task as parameters.
Note:
- A task can not be periodic. In order to add a periodic task, it can be created a generator which will generate periodically the same task.

318
pkg/scheduler/scheduler.go Normal file
View file

@ -0,0 +1,318 @@
package scheduler
import (
"container/heap"
"context"
"sync"
"time"
"zotregistry.io/zot/pkg/log"
)
type Task interface {
DoWork() error
}
type generatorsPriorityQueue []*generator
func (pq generatorsPriorityQueue) Len() int {
return len(pq)
}
func (pq generatorsPriorityQueue) Less(i, j int) bool {
return pq[i].priority > pq[j].priority
}
func (pq generatorsPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *generatorsPriorityQueue) Push(x any) {
n := len(*pq)
item, ok := x.(*generator)
if !ok {
return
}
item.index = n
*pq = append(*pq, item)
}
func (pq *generatorsPriorityQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil
item.index = -1
*pq = old[0 : n-1]
return item
}
const rateLimiterScheduler = 400
type Scheduler struct {
tasksQLow chan Task
tasksQMedium chan Task
tasksQHigh chan Task
generators generatorsPriorityQueue
waitingGenerators []*generator
generatorsLock *sync.Mutex
log log.Logger
stopCh chan struct{}
}
func NewScheduler(logC log.Logger) *Scheduler {
chLow := make(chan Task, rateLimiterScheduler)
chMedium := make(chan Task, rateLimiterScheduler)
chHigh := make(chan Task, rateLimiterScheduler)
generatorPQ := make(generatorsPriorityQueue, 0)
sublogger := logC.With().Str("component", "scheduler").Logger()
heap.Init(&generatorPQ)
return &Scheduler{
tasksQLow: chLow,
tasksQMedium: chMedium,
tasksQHigh: chHigh,
generators: generatorPQ,
generatorsLock: new(sync.Mutex),
log: log.Logger{Logger: sublogger},
stopCh: make(chan struct{}),
}
}
const rateLimit = 5 * time.Second
func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
throttle := time.NewTicker(rateLimit).C
go func() {
for {
select {
case <-ctx.Done():
close(scheduler.stopCh)
return
default:
task := scheduler.getTask()
if task != nil {
if err := task.DoWork(); err != nil {
scheduler.log.Error().Err(err).Msg("error while executing task")
}
}
}
<-throttle
}
}()
}
func (scheduler *Scheduler) pushReadyGenerators() {
// iterate through waiting generators list and resubmit those which become ready to run
for i, gen := range scheduler.waitingGenerators {
if gen.getState() == ready {
gen.done = false
heap.Push(&scheduler.generators, gen)
scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...)
}
}
}
func (scheduler *Scheduler) generateTasks() {
scheduler.generatorsLock.Lock()
defer scheduler.generatorsLock.Unlock()
// resubmit ready generators(which were in a waiting state) to generators priority queue
scheduler.pushReadyGenerators()
// get the highest priority generator from queue
if scheduler.generators.Len() == 0 {
return
}
var gen *generator
// check if the generator with highest prioriy is ready to run
if scheduler.generators[0].getState() == ready {
gen = scheduler.generators[0]
} else {
gen, _ = heap.Pop(&scheduler.generators).(*generator)
if gen.getState() == waiting {
scheduler.waitingGenerators = append(scheduler.waitingGenerators, gen)
}
return
}
// run generator to generate a new task which will be added to a channel by priority
gen.generate(scheduler)
}
func (scheduler *Scheduler) getTask() Task {
// first, generate a task with highest possible priority
scheduler.generateTasks()
// then, return a task with highest possible priority
select {
case t := <-scheduler.tasksQHigh:
return t
default:
}
select {
case t := <-scheduler.tasksQMedium:
return t
default:
}
select {
case t := <-scheduler.tasksQLow:
return t
default:
}
return nil
}
func (scheduler *Scheduler) getTasksChannelByPriority(priority Priority) chan Task {
switch priority {
case LowPriority:
return scheduler.tasksQLow
case MediumPriority:
return scheduler.tasksQMedium
case HighPriority:
return scheduler.tasksQHigh
}
return nil
}
func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) {
// get by priority the channel where the task should be added to
tasksQ := scheduler.getTasksChannelByPriority(priority)
if tasksQ == nil {
return
}
// check if the scheduler it's still running in order to add the task to the channel
select {
case <-scheduler.stopCh:
return
default:
}
select {
case <-scheduler.stopCh:
return
case tasksQ <- task:
scheduler.log.Info().Msg("Adding a new task to the scheduler")
}
}
type Priority int
const (
LowPriority Priority = iota
MediumPriority
HighPriority
)
type state int
const (
ready state = iota
waiting
done
)
type Generator interface {
GenerateTask() (Task, error)
IsDone() bool
Reset()
}
type generator struct {
interval time.Duration
lastRun time.Time
done bool
priority Priority
taskGenerator Generator
remainingTask Task
index int
}
func (gen *generator) generate(sch *Scheduler) {
// get by priority the channel where the new generated task should be added to
taskQ := sch.getTasksChannelByPriority(gen.priority)
task := gen.remainingTask
// in case there is no task already generated, generate a new task
if gen.remainingTask == nil {
nextTask, err := gen.taskGenerator.GenerateTask()
if err != nil {
sch.log.Error().Err(err).Msg("error while executing generator")
return
}
task = nextTask
// check if the generator is done
if gen.taskGenerator.IsDone() {
gen.done = true
gen.lastRun = time.Now()
gen.taskGenerator.Reset()
return
}
}
// check if it's possible to add a new task to the channel
// if not, keep the generated task and retry to add it next time
select {
case taskQ <- task:
gen.remainingTask = nil
return
default:
gen.remainingTask = task
}
}
// getState() returns the state of a generator.
// if the generator is not periodic then it can be done or ready to generate a new task.
// if the generator is periodic then it can be waiting (finished its work and wait for its interval to pass)
// or ready to generate a new task.
func (gen *generator) getState() state {
if gen.interval == time.Duration(0) {
if gen.done && gen.remainingTask == nil {
return done
}
} else {
if gen.done && time.Since(gen.lastRun) < gen.interval && gen.remainingTask == nil {
return waiting
}
}
return ready
}
func (scheduler *Scheduler) SubmitGenerator(taskGenerator Generator, interval time.Duration, priority Priority) {
newGenerator := &generator{
interval: interval,
done: false,
priority: priority,
taskGenerator: taskGenerator,
remainingTask: nil,
}
scheduler.generatorsLock.Lock()
defer scheduler.generatorsLock.Unlock()
// add generator to the generators priority queue
heap.Push(&scheduler.generators, newGenerator)
}

View file

@ -0,0 +1,177 @@
package scheduler_test
import (
"context"
"errors"
"fmt"
"os"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
)
type task struct {
log log.Logger
msg string
err bool
}
var errInternal = errors.New("task: internal error")
func (t *task) DoWork() error {
if t.err {
return errInternal
}
t.log.Info().Msg(t.msg)
return nil
}
type generator struct {
log log.Logger
priority string
done bool
index int
step int
}
func (g *generator) GenerateTask() (scheduler.Task, error) {
if g.step > 1 {
g.done = true
}
g.step++
g.index++
return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil
}
func (g *generator) IsDone() bool {
return g.done
}
func (g *generator) Reset() {
g.done = false
g.step = 0
}
func TestScheduler(t *testing.T) {
Convey("Test order of generators in queue", t, func() {
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
genL := &generator{log: logger, priority: "low priority"}
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
genH := &generator{log: logger, priority: "high priority"}
sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)
time.Sleep(500 * time.Millisecond)
cancel()
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "executing high priority task; index: 1")
So(string(data), ShouldNotContainSubstring, "executing low priority task; index: 1")
So(string(data), ShouldNotContainSubstring, "error while executing task")
})
Convey("Test task returning an error", t, func() {
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
t := &task{log: logger, msg: "", err: true}
sch.SubmitTask(t, scheduler.MediumPriority)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)
time.Sleep(500 * time.Millisecond)
cancel()
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "Adding a new task to the scheduler")
So(string(data), ShouldContainSubstring, "error while executing task")
})
Convey("Test resubmit generator", t, func() {
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
genL := &generator{log: logger, priority: "low priority"}
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)
time.Sleep(6 * time.Second)
cancel()
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "executing low priority task; index: 1")
So(string(data), ShouldContainSubstring, "executing low priority task; index: 2")
})
Convey("Try to add a task with wrong priority", t, func() {
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
t := &task{log: logger, msg: "", err: false}
sch.SubmitTask(t, -1)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldNotContainSubstring, "Adding a new task to the scheduler")
})
Convey("Test adding a new task when context is done", t, func() {
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(logger)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)
cancel()
time.Sleep(500 * time.Millisecond)
t := &task{log: logger, msg: "", err: false}
sch.SubmitTask(t, scheduler.LowPriority)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldNotContainSubstring, "Adding a new task to the scheduler")
})
}

View file

@ -31,6 +31,7 @@ import (
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/extensions/monitoring"
zlog "zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
"zotregistry.io/zot/pkg/test"
)
@ -360,6 +361,65 @@ func (is *ImageStoreLocal) GetRepositories() ([]string, error) {
return stores, err
}
// GetNextRepository returns next repository under this store.
func (is *ImageStoreLocal) GetNextRepository(repo string) (string, error) {
var lockLatency time.Time
dir := is.rootDir
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
_, err := os.ReadDir(dir)
if err != nil {
is.log.Error().Err(err).Msg("failure walking storage root-dir")
return "", err
}
found := false
store := ""
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
return nil
}
rel, err := filepath.Rel(is.rootDir, path)
if err != nil {
return nil // nolint:nilerr // ignore paths not relative to root dir
}
ok, err := is.ValidateRepo(rel)
if !ok || err != nil {
return nil // nolint:nilerr // ignore invalid repos
}
if repo == "" && ok && err == nil {
store = rel
return io.EOF
}
if found {
store = rel
return io.EOF
}
if rel == repo {
found = true
}
return nil
})
return store, err
}
// GetImageTags returns a list of image tags available in the specified repository.
func (is *ImageStoreLocal) GetImageTags(repo string) ([]string, error) {
var lockLatency time.Time
@ -1973,13 +2033,71 @@ func (is *ImageStoreLocal) gcRepo(repo string) error {
return nil
}
func (is *ImageStoreLocal) RunGCRepo(repo string) {
func (is *ImageStoreLocal) RunGCRepo(repo string) error {
is.log.Info().Msg(fmt.Sprintf("executing GC of orphaned blobs for %s", path.Join(is.RootDir(), repo)))
if err := is.gcRepo(repo); err != nil {
errMessage := fmt.Sprintf("error while running GC for %s", path.Join(is.RootDir(), repo))
is.log.Error().Err(err).Msg(errMessage)
is.log.Info().Msg(fmt.Sprintf("GC unsuccessfully completed for %s", path.Join(is.RootDir(), repo)))
return err
}
is.log.Info().Msg(fmt.Sprintf("GC completed for %s", path.Join(is.RootDir(), repo)))
is.log.Info().Msg(fmt.Sprintf("GC successfully completed for %s", path.Join(is.RootDir(), repo)))
return nil
}
func (is *ImageStoreLocal) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) {
generator := &taskGenerator{
imgStore: is,
}
sch.SubmitGenerator(generator, interval, scheduler.MediumPriority)
}
type taskGenerator struct {
imgStore *ImageStoreLocal
lastRepo string
done bool
}
func (gen *taskGenerator) GenerateTask() (scheduler.Task, error) {
repo, err := gen.imgStore.GetNextRepository(gen.lastRepo)
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}
if repo == "" {
gen.done = true
return nil, nil
}
gen.lastRepo = repo
return newGCTask(gen.imgStore, repo), nil
}
func (gen *taskGenerator) IsDone() bool {
return gen.done
}
func (gen *taskGenerator) Reset() {
gen.lastRepo = ""
gen.done = false
}
type gcTask struct {
imgStore *ImageStoreLocal
repo string
}
func newGCTask(imgStore *ImageStoreLocal, repo string) *gcTask {
return &gcTask{imgStore, repo}
}
func (gcT *gcTask) DoWork() error {
return gcT.imgStore.RunGCRepo(gcT.repo)
}

View file

@ -918,7 +918,9 @@ func FuzzRunGCRepo(f *testing.F) {
imgStore := storage.NewImageStore(dir, true, storage.DefaultGCDelay, true, true, *log, metrics, nil)
imgStore.RunGCRepo(data)
if err := imgStore.RunGCRepo(data); err != nil {
t.Error(err)
}
})
}
@ -1943,7 +1945,8 @@ func TestGarbageCollectForImageStore(t *testing.T) {
panic(err)
}
imgStore.RunGCRepo(repoName)
err = imgStore.RunGCRepo(repoName)
So(err, ShouldNotBeNil)
time.Sleep(500 * time.Millisecond)
@ -1970,7 +1973,8 @@ func TestGarbageCollectForImageStore(t *testing.T) {
So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o000), ShouldBeNil)
imgStore.RunGCRepo(repoName)
err = imgStore.RunGCRepo(repoName)
So(err, ShouldNotBeNil)
time.Sleep(500 * time.Millisecond)
@ -2055,6 +2059,41 @@ func TestGetRepositoriesError(t *testing.T) {
})
}
func TestGetNextRepository(t *testing.T) {
dir := t.TempDir()
log := log.Logger{Logger: zerolog.New(os.Stdout)}
metrics := monitoring.NewMetricsServer(false, log)
imgStore := storage.NewImageStore(dir, true, storage.DefaultGCDelay,
true, true, log, metrics, nil,
)
firstRepoName := "repo1"
secondRepoName := "repo2"
err := test.CopyFiles("../../test/data/zot-test", path.Join(dir, firstRepoName))
if err != nil {
panic(err)
}
err = test.CopyFiles("../../test/data/zot-test", path.Join(dir, secondRepoName))
if err != nil {
panic(err)
}
Convey("Return first repository", t, func() {
firstRepo, err := imgStore.GetNextRepository("")
So(firstRepo, ShouldEqual, firstRepoName)
So(err, ShouldNotBeNil)
So(err, ShouldEqual, io.EOF)
})
Convey("Return second repository", t, func() {
secondRepo, err := imgStore.GetNextRepository(firstRepoName)
So(secondRepo, ShouldEqual, secondRepoName)
So(err, ShouldNotBeNil)
So(err, ShouldEqual, io.EOF)
})
}
func TestPutBlobChunkStreamed(t *testing.T) {
Convey("Get error on opening file", t, func() {
dir := t.TempDir()

View file

@ -28,6 +28,7 @@ import (
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/extensions/monitoring"
zlog "zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/test"
)
@ -292,6 +293,11 @@ func (is *ObjectStorage) GetRepositories() ([]string, error) {
return stores, err
}
// GetNextRepository returns next repository under this store.
func (is *ObjectStorage) GetNextRepository(repo string) (string, error) {
return "", nil
}
// GetImageTags returns a list of image tags available in the specified repository.
func (is *ObjectStorage) GetImageTags(repo string) ([]string, error) {
dir := path.Join(is.rootDir, repo)
@ -1303,7 +1309,11 @@ retry:
return nil
}
func (is *ObjectStorage) RunGCRepo(repo string) {
func (is *ObjectStorage) RunGCRepo(repo string) error {
return nil
}
func (is *ObjectStorage) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) {
}
// DeleteBlobUpload deletes an existing blob upload that is currently in progress.

View file

@ -6,6 +6,7 @@ import (
"github.com/opencontainers/go-digest"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"zotregistry.io/zot/pkg/scheduler"
)
const (
@ -23,6 +24,7 @@ type ImageStore interface {
InitRepo(name string) error
ValidateRepo(name string) (bool, error)
GetRepositories() ([]string, error)
GetNextRepository(repo string) (string, error)
GetImageTags(repo string) ([]string, error)
GetImageManifest(repo, reference string) ([]byte, string, string, error)
PutImageManifest(repo, reference, mediaType string, body []byte) (string, error)
@ -45,5 +47,6 @@ type ImageStore interface {
GetIndexContent(repo string) ([]byte, error)
GetBlobContent(repo, digest string) ([]byte, error)
GetReferrers(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error)
RunGCRepo(repo string)
RunGCRepo(repo string) error
RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler)
}

View file

@ -6,6 +6,7 @@ import (
"github.com/opencontainers/go-digest"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"zotregistry.io/zot/pkg/scheduler"
)
type MockedImageStore struct {
@ -14,6 +15,7 @@ type MockedImageStore struct {
InitRepoFn func(name string) error
ValidateRepoFn func(name string) (bool, error)
GetRepositoriesFn func() ([]string, error)
GetNextRepositoryFn func(repo string) (string, error)
GetImageTagsFn func(repo string) ([]string, error)
GetImageManifestFn func(repo string, reference string) ([]byte, string, string, error)
PutImageManifestFn func(repo string, reference string, mediaType string, body []byte) (string, error)
@ -38,7 +40,8 @@ type MockedImageStore struct {
GetBlobContentFn func(repo, digest string) ([]byte, error)
GetReferrersFn func(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error)
URLForPathFn func(path string) (string, error)
RunGCRepoFn func(repo string)
RunGCRepoFn func(repo string) error
RunGCPeriodicallyFn func(interval time.Duration, sch *scheduler.Scheduler)
}
func (is MockedImageStore) Lock(t *time.Time) {
@ -93,6 +96,14 @@ func (is MockedImageStore) GetRepositories() ([]string, error) {
return []string{}, nil
}
func (is MockedImageStore) GetNextRepository(repo string) (string, error) {
if is.GetNextRepositoryFn != nil {
return is.GetNextRepositoryFn(repo)
}
return "", nil
}
func (is MockedImageStore) GetImageManifest(repo string, reference string) ([]byte, string, string, error) {
if is.GetImageManifestFn != nil {
return is.GetImageManifestFn(repo, reference)
@ -293,8 +304,16 @@ func (is MockedImageStore) URLForPath(path string) (string, error) {
return "", nil
}
func (is MockedImageStore) RunGCRepo(repo string) {
func (is MockedImageStore) RunGCRepo(repo string) error {
if is.RunGCRepoFn != nil {
is.RunGCRepoFn(repo)
return is.RunGCRepoFn(repo)
}
return nil
}
func (is MockedImageStore) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) {
if is.RunGCPeriodicallyFn != nil {
is.RunGCPeriodicallyFn(interval, sch)
}
}

View file

@ -59,7 +59,7 @@ function teardown() {
wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog"
# wait for scrub to be done and logs to get populated
run sleep 5s
run sleep 10s
run not_affected
[ "$status" -eq 0 ]
[ $(echo "${lines[0]}" ) = 'true' ]
@ -74,7 +74,7 @@ function teardown() {
wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog"
# wait for scrub to be done and logs to get populated
run sleep 5s
run sleep 10s
run affected
[ "$status" -eq 0 ]
[ $(echo "${lines[0]}" ) = 'true' ]