diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 76c9f472..358feaa4 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -377,7 +377,7 @@ func (c *Controller) Shutdown() { } func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { - c.taskScheduler = scheduler.NewScheduler(c.Config, c.Log) + c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log) c.taskScheduler.RunScheduler(reloadCtx) // Enable running garbage-collect periodically for DefaultStore diff --git a/pkg/api/cookiestore.go b/pkg/api/cookiestore.go index d065399b..d66b971a 100644 --- a/pkg/api/cookiestore.go +++ b/pkg/api/cookiestore.go @@ -3,6 +3,7 @@ package api import ( "context" "encoding/gob" + "fmt" "io/fs" "os" "path" @@ -157,3 +158,12 @@ func (cleanTask *CleanTask) DoWork(ctx context.Context) error { return nil } + +func (cleanTask *CleanTask) String() string { + return fmt.Sprintf("{Name: %s, sessions: %s}", + cleanTask.Name(), cleanTask.sessions) +} + +func (cleanTask *CleanTask) Name() string { + return "SessionCleanupTask" +} diff --git a/pkg/exporter/api/controller_test.go b/pkg/exporter/api/controller_test.go index ff624a9e..387470b8 100644 --- a/pkg/exporter/api/controller_test.go +++ b/pkg/exporter/api/controller_test.go @@ -10,6 +10,7 @@ import ( "fmt" "math/big" "net/http" + "runtime" "strings" "sync" "testing" @@ -25,6 +26,7 @@ import ( zotcfg "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/exporter/api" "zotregistry.io/zot/pkg/extensions/monitoring" + "zotregistry.io/zot/pkg/scheduler" . 
"zotregistry.io/zot/pkg/test/common" ) @@ -69,12 +71,22 @@ func readDefaultMetrics(collector *api.Collector, chMetric chan prometheus.Metri So(err, ShouldBeNil) So(*metric.Gauge.Value, ShouldEqual, 1) + pmMetric = <-chMetric + So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_scheduler_workers_total"].String()) + + err = pmMetric.Write(&metric) + So(err, ShouldBeNil) + So(*metric.Gauge.Value, ShouldEqual, runtime.NumCPU()*scheduler.NumWorkersMultiplier) + pmMetric = <-chMetric So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_info"].String()) err = pmMetric.Write(&metric) So(err, ShouldBeNil) So(*metric.Gauge.Value, ShouldEqual, 0) + + pmMetric = <-chMetric + So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_scheduler_generators_total"].String()) } func TestNewExporter(t *testing.T) { diff --git a/pkg/extensions/extension_userprefs_disable.go b/pkg/extensions/extension_userprefs_disable.go index 054bb998..6975e248 100644 --- a/pkg/extensions/extension_userprefs_disable.go +++ b/pkg/extensions/extension_userprefs_disable.go @@ -18,6 +18,6 @@ func IsBuiltWithUserPrefsExtension() bool { func SetupUserPreferencesRoutes(config *config.Config, router *mux.Router, metaDB mTypes.MetaDB, log log.Logger, ) { - log.Warn().Msg("userprefs extension is disabled because given zot binary doesn't" + + log.Warn().Msg("userprefs extension is disabled because given zot binary doesn't " + "include this feature please build a binary that does so") } diff --git a/pkg/extensions/imagetrust/image_trust.go b/pkg/extensions/imagetrust/image_trust.go index b56bba3d..3a8f87c6 100644 --- a/pkg/extensions/imagetrust/image_trust.go +++ b/pkg/extensions/imagetrust/image_trust.go @@ -5,6 +5,7 @@ package imagetrust import ( "context" + "fmt" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -274,3 +275,13 @@ func (validityT *validityTask) DoWork(ctx context.Context) error { return nil } + +func (validityT *validityTask) String() string { + return 
fmt.Sprintf("{sigValidityTaskGenerator: %s, repo: %s}", + "signatures validity task", // description of generator's task purpose + validityT.repo.Name) +} + +func (validityT *validityTask) Name() string { + return "SignatureValidityTask" +} diff --git a/pkg/extensions/monitoring/extension.go b/pkg/extensions/monitoring/extension.go index 82f78b93..7bf7a98c 100644 --- a/pkg/extensions/monitoring/extension.go +++ b/pkg/extensions/monitoring/extension.go @@ -83,6 +83,53 @@ var ( }, []string{"storageName", "lockType"}, ) + schedulerGenerators = promauto.NewCounter( //nolint: gochecknoglobals + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "scheduler_generators_total", + Help: "Total number of generators registered in scheduler", + }, + ) + schedulerGeneratorsStatus = promauto.NewGaugeVec( //nolint: gochecknoglobals + prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "scheduler_generators_status", + Help: "Scheduler generators by priority & state", + }, + []string{"priority", "state"}, + ) + schedulerNumWorkers = promauto.NewGauge( //nolint: gochecknoglobals + prometheus.GaugeOpts{ //nolint: promlinter + Namespace: metricsNamespace, + Name: "scheduler_workers_total", + Help: "Total number of available workers to perform scheduler tasks", + }, + ) + schedulerWorkers = promauto.NewGaugeVec( //nolint: gochecknoglobals + prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "scheduler_workers", + Help: "Scheduler workers state", + }, + []string{"state"}, + ) + schedulerTasksQueue = promauto.NewGaugeVec( //nolint: gochecknoglobals + prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "scheduler_tasksqueue_length", + Help: "Number of tasks waiting in the queue to be processed by scheduler workers", + }, + []string{"priority"}, + ) + workersTasksDuration = promauto.NewHistogramVec( //nolint: gochecknoglobals + prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Name: "scheduler_workers_tasks_duration_seconds", + Help: "How
long it takes for a worker to execute a task", + Buckets: GetDefaultBuckets(), + }, + []string{"name"}, + ) ) type metricServer struct { @@ -169,7 +216,7 @@ func IncDownloadCounter(ms MetricServer, repo string) { } func SetStorageUsage(ms MetricServer, rootDir, repo string) { - ms.SendMetric(func() { + ms.ForceSendMetric(func() { dir := path.Join(rootDir, repo) repoSize, err := GetDirSize(dir) @@ -196,3 +243,47 @@ func ObserveStorageLockLatency(ms MetricServer, latency time.Duration, storageNa storageLockLatency.WithLabelValues(storageName, lockType).Observe(latency.Seconds()) }) } + +func IncSchedulerGenerators(ms MetricServer) { + ms.ForceSendMetric(func() { + schedulerGenerators.Inc() + }) +} + +func SetSchedulerGenerators(ms MetricServer, gen map[string]map[string]uint64) { + ms.SendMetric(func() { + for priority, states := range gen { + for state, value := range states { + schedulerGeneratorsStatus.WithLabelValues(priority, state).Set(float64(value)) + } + } + }) +} + +func SetSchedulerNumWorkers(ms MetricServer, total int) { + ms.SendMetric(func() { + schedulerNumWorkers.Set(float64(total)) + }) +} + +func SetSchedulerWorkers(ms MetricServer, w map[string]int) { + ms.SendMetric(func() { + for state, value := range w { + schedulerWorkers.WithLabelValues(state).Set(float64(value)) + } + }) +} + +func SetSchedulerTasksQueue(ms MetricServer, tq map[string]int) { + ms.SendMetric(func() { + for priority, value := range tq { + schedulerTasksQueue.WithLabelValues(priority).Set(float64(value)) + } + }) +} + +func ObserveWorkersTasksDuration(ms MetricServer, taskName string, duration time.Duration) { + ms.SendMetric(func() { + workersTasksDuration.WithLabelValues(taskName).Observe(duration.Seconds()) + }) +} diff --git a/pkg/extensions/monitoring/minimal.go b/pkg/extensions/monitoring/minimal.go index bd38b8c1..1ca2ac21 100644 --- a/pkg/extensions/monitoring/minimal.go +++ b/pkg/extensions/monitoring/minimal.go @@ -18,17 +18,23 @@ import ( const ( metricsNamespace = 
"zot" // Counters. - httpConnRequests = metricsNamespace + ".http.requests" - repoDownloads = metricsNamespace + ".repo.downloads" - repoUploads = metricsNamespace + ".repo.uploads" + httpConnRequests = metricsNamespace + ".http.requests" + repoDownloads = metricsNamespace + ".repo.downloads" + repoUploads = metricsNamespace + ".repo.uploads" + schedulerGenerators = metricsNamespace + ".scheduler.generators" // Gauge. - repoStorageBytes = metricsNamespace + ".repo.storage.bytes" - serverInfo = metricsNamespace + ".info" + repoStorageBytes = metricsNamespace + ".repo.storage.bytes" + serverInfo = metricsNamespace + ".info" + schedulerNumWorkers = metricsNamespace + ".scheduler.workers.total" + schedulerWorkers = metricsNamespace + ".scheduler.workers" + schedulerGeneratorsStatus = metricsNamespace + ".scheduler.generators.status" + schedulerTasksQueue = metricsNamespace + ".scheduler.tasksqueue.length" // Summary. httpRepoLatencySeconds = metricsNamespace + ".http.repo.latency.seconds" // Histogram. httpMethodLatencySeconds = metricsNamespace + ".http.method.latency.seconds" storageLockLatencySeconds = metricsNamespace + ".storage.lock.latency.seconds" + workersTasksDuration = metricsNamespace + ".scheduler.workers.tasks.duration.seconds" metricsScrapeTimeout = 2 * time.Minute metricsScrapeCheckInterval = 30 * time.Second @@ -39,7 +45,7 @@ type metricServer struct { lastCheck time.Time reqChan chan interface{} cache *MetricsInfo - cacheChan chan *MetricsInfo + cacheChan chan MetricsCopy bucketsF2S map[float64]string // float64 to string conversion of buckets label log log.Logger lock *sync.RWMutex @@ -51,6 +57,12 @@ type MetricsInfo struct { Summaries []*SummaryValue Histograms []*HistogramValue } +type MetricsCopy struct { + Counters []CounterValue + Gauges []GaugeValue + Summaries []SummaryValue + Histograms []HistogramValue +} // CounterValue stores info about a metric that is incremented over time, // such as the number of requests to an HTTP endpoint. 
@@ -118,7 +130,7 @@ func (ms *metricServer) ReceiveMetrics() interface{} { ms.enabled = true } ms.lock.Unlock() - ms.cacheChan <- &MetricsInfo{} + ms.cacheChan <- MetricsCopy{} return <-ms.cacheChan } @@ -145,7 +157,29 @@ func (ms *metricServer) Run() { select { case <-ms.cacheChan: ms.lastCheck = time.Now() - ms.cacheChan <- ms.cache + // make a copy of cache values to prevent data race + metrics := MetricsCopy{ + Counters: make([]CounterValue, len(ms.cache.Counters)), + Gauges: make([]GaugeValue, len(ms.cache.Gauges)), + Summaries: make([]SummaryValue, len(ms.cache.Summaries)), + Histograms: make([]HistogramValue, len(ms.cache.Histograms)), + } + for i, cv := range ms.cache.Counters { + metrics.Counters[i] = *cv + } + + for i, gv := range ms.cache.Gauges { + metrics.Gauges[i] = *gv + } + + for i, sv := range ms.cache.Summaries { + metrics.Summaries[i] = *sv + } + + for i, hv := range ms.cache.Histograms { + metrics.Histograms[i] = *hv + } + ms.cacheChan <- metrics case m := <-ms.reqChan: switch v := m.(type) { case CounterValue: @@ -200,7 +234,7 @@ func NewMetricsServer(enabled bool, log log.Logger) MetricServer { ms := &metricServer{ enabled: enabled, reqChan: make(chan interface{}), - cacheChan: make(chan *MetricsInfo), + cacheChan: make(chan MetricsCopy), cache: mi, bucketsF2S: bucketsFloat2String, log: log, @@ -215,16 +249,21 @@ func NewMetricsServer(enabled bool, log log.Logger) MetricServer { // contains a map with key=CounterName and value=CounterLabels. 
func GetCounters() map[string][]string { return map[string][]string{ - httpConnRequests: {"method", "code"}, - repoDownloads: {"repo"}, - repoUploads: {"repo"}, + httpConnRequests: {"method", "code"}, + repoDownloads: {"repo"}, + repoUploads: {"repo"}, + schedulerGenerators: {}, } } func GetGauges() map[string][]string { return map[string][]string{ - repoStorageBytes: {"repo"}, - serverInfo: {"commit", "binaryType", "goVersion", "version"}, + repoStorageBytes: {"repo"}, + serverInfo: {"commit", "binaryType", "goVersion", "version"}, + schedulerNumWorkers: {}, + schedulerGeneratorsStatus: {"priority", "state"}, + schedulerTasksQueue: {"priority"}, + schedulerWorkers: {"state"}, } } @@ -238,6 +277,7 @@ func GetHistograms() map[string][]string { return map[string][]string{ httpMethodLatencySeconds: {"method"}, storageLockLatencySeconds: {"storageName", "lockType"}, + workersTasksDuration: {"name"}, } } @@ -533,3 +573,66 @@ func GetBuckets(metricName string) []float64 { return GetDefaultBuckets() } } + +func SetSchedulerNumWorkers(ms MetricServer, workers int) { + numWorkers := GaugeValue{ + Name: schedulerNumWorkers, + Value: float64(workers), + } + ms.ForceSendMetric(numWorkers) +} + +func IncSchedulerGenerators(ms MetricServer) { + genCounter := CounterValue{ + Name: schedulerGenerators, + } + ms.ForceSendMetric(genCounter) +} + +func ObserveWorkersTasksDuration(ms MetricServer, taskName string, duration time.Duration) { + h := HistogramValue{ + Name: workersTasksDuration, + Sum: duration.Seconds(), // convenient temporary store for Histogram latency value + LabelNames: []string{"name"}, + LabelValues: []string{taskName}, + } + ms.SendMetric(h) +} + +func SetSchedulerGenerators(ms MetricServer, gen map[string]map[string]uint64) { + for priority, states := range gen { + for state, value := range states { + generator := GaugeValue{ + Name: schedulerGeneratorsStatus, + Value: float64(value), + LabelNames: []string{"priority", "state"}, + LabelValues: []string{priority, 
state}, + } + ms.SendMetric(generator) + } + } +} + +func SetSchedulerTasksQueue(ms MetricServer, tq map[string]int) { + for priority, value := range tq { + tasks := GaugeValue{ + Name: schedulerTasksQueue, + Value: float64(value), + LabelNames: []string{"priority"}, + LabelValues: []string{priority}, + } + ms.SendMetric(tasks) + } +} + +func SetSchedulerWorkers(ms MetricServer, w map[string]int) { + for state, value := range w { + workers := GaugeValue{ + Name: schedulerWorkers, + Value: float64(value), + LabelNames: []string{"state"}, + LabelValues: []string{state}, + } + ms.SendMetric(workers) + } +} diff --git a/pkg/extensions/monitoring/monitoring_test.go b/pkg/extensions/monitoring/monitoring_test.go index 948fe553..1a9a4ced 100644 --- a/pkg/extensions/monitoring/monitoring_test.go +++ b/pkg/extensions/monitoring/monitoring_test.go @@ -461,7 +461,8 @@ func TestPopulateStorageMetrics(t *testing.T) { err = WriteImageToFileSystem(CreateDefaultImage(), "busybox", "0.0.1", srcStorageCtlr) So(err, ShouldBeNil) - sch := scheduler.NewScheduler(conf, ctlr.Log) + metrics := monitoring.NewMetricsServer(true, ctlr.Log) + sch := scheduler.NewScheduler(conf, metrics, ctlr.Log) ctx, cancel := context.WithCancel(context.Background()) sch.RunScheduler(ctx) diff --git a/pkg/extensions/scrub/scrub.go b/pkg/extensions/scrub/scrub.go index a683fb62..6c317bfc 100644 --- a/pkg/extensions/scrub/scrub.go +++ b/pkg/extensions/scrub/scrub.go @@ -63,3 +63,13 @@ func NewTask(imgStore storageTypes.ImageStore, repo string, log log.Logger) *Tas func (scrubT *Task) DoWork(ctx context.Context) error { return RunScrubRepo(ctx, scrubT.imgStore, scrubT.repo, scrubT.log) //nolint: contextcheck } + +func (scrubT *Task) String() string { + return fmt.Sprintf("{taskGenerator: \"%s\", repo: \"%s\"}", + "image scrub", // description of generator's task purpose + scrubT.repo) +} + +func (scrubT *Task) Name() string { + return "ScrubTask" +} diff --git a/pkg/extensions/search/cve/scan.go 
b/pkg/extensions/search/cve/scan.go index 422807e9..02ba39dd 100644 --- a/pkg/extensions/search/cve/scan.go +++ b/pkg/extensions/search/cve/scan.go @@ -2,6 +2,7 @@ package cveinfo import ( "context" + "fmt" "sync" "zotregistry.io/zot/pkg/log" @@ -194,3 +195,12 @@ func (st *scanTask) DoWork(ctx context.Context) error { return nil } + +func (st *scanTask) String() string { + return fmt.Sprintf("{Name: \"%s\", repo: \"%s\", digest: \"%s\"}", + st.Name(), st.repo, st.digest) +} + +func (st *scanTask) Name() string { + return "ScanTask" +} diff --git a/pkg/extensions/search/cve/scan_test.go b/pkg/extensions/search/cve/scan_test.go index 41873a7a..f9aa8ed7 100644 --- a/pkg/extensions/search/cve/scan_test.go +++ b/pkg/extensions/search/cve/scan_test.go @@ -57,7 +57,8 @@ func TestScanGeneratorWithMockedData(t *testing.T) { //nolint: gocyclo cfg := config.New() cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3} - sch := scheduler.NewScheduler(cfg, logger) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(cfg, metrics, logger) params := boltdb.DBParameters{ RootDir: t.TempDir(), @@ -502,8 +503,9 @@ func TestScanGeneratorWithRealData(t *testing.T) { metaDB, err := boltdb.New(boltDriver, logger) So(err, ShouldBeNil) + metrics := monitoring.NewMetricsServer(true, logger) imageStore := local.NewImageStore(rootDir, false, false, - logger, monitoring.NewMetricsServer(false, logger), nil, nil) + logger, metrics, nil, nil) storeController := storage.StoreController{DefaultStore: imageStore} image := CreateRandomVulnerableImage() @@ -520,7 +522,7 @@ func TestScanGeneratorWithRealData(t *testing.T) { So(scanner.IsResultCached(image.DigestStr()), ShouldBeFalse) - sch := scheduler.NewScheduler(cfg, logger) + sch := scheduler.NewScheduler(cfg, metrics, logger) generator := cveinfo.NewScanTaskGenerator(metaDB, scanner, logger) diff --git a/pkg/extensions/search/cve/update.go b/pkg/extensions/search/cve/update.go index 4befebf6..2c31f324 100644 --- 
a/pkg/extensions/search/cve/update.go +++ b/pkg/extensions/search/cve/update.go @@ -2,6 +2,7 @@ package cveinfo import ( "context" + "fmt" "sync" "time" @@ -118,3 +119,11 @@ func (dbt *dbUpdateTask) DoWork(ctx context.Context) error { return nil } + +func (dbt *dbUpdateTask) String() string { + return fmt.Sprintf("{Name: %s}", dbt.Name()) +} + +func (dbt *dbUpdateTask) Name() string { + return "DBUpdateTask" +} diff --git a/pkg/extensions/search/cve/update_test.go b/pkg/extensions/search/cve/update_test.go index 8cdf9554..47f9d1f3 100644 --- a/pkg/extensions/search/cve/update_test.go +++ b/pkg/extensions/search/cve/update_test.go @@ -14,6 +14,7 @@ import ( . "github.com/smartystreets/goconvey/convey" "zotregistry.io/zot/pkg/api/config" + "zotregistry.io/zot/pkg/extensions/monitoring" cveinfo "zotregistry.io/zot/pkg/extensions/search/cve" "zotregistry.io/zot/pkg/log" mTypes "zotregistry.io/zot/pkg/meta/types" @@ -37,7 +38,8 @@ func TestCVEDBGenerator(t *testing.T) { cfg := config.New() cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3} - sch := scheduler.NewScheduler(cfg, logger) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(cfg, metrics, logger) metaDB := &mocks.MetaDBMock{ GetRepoMetaFn: func(ctx context.Context, repo string) (mTypes.RepoMeta, error) { diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index c285c572..eba91fc7 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -5,6 +5,7 @@ package sync import ( "context" + "fmt" "github.com/containers/common/pkg/retry" "github.com/containers/image/v5/types" @@ -137,3 +138,12 @@ func newSyncRepoTask(repo string, service Service) *syncRepoTask { func (srt *syncRepoTask) DoWork(ctx context.Context) error { return srt.service.SyncRepo(ctx, srt.repo) } + +func (srt *syncRepoTask) String() string { + return fmt.Sprintf("{Name: \"%s\", repo: \"%s\"}", + srt.Name(), srt.repo) +} + +func (srt *syncRepoTask) Name() string { + return 
"SyncTask" +} diff --git a/pkg/scheduler/README.md b/pkg/scheduler/README.md index c5826f58..1a83b5b3 100644 --- a/pkg/scheduler/README.md +++ b/pkg/scheduler/README.md @@ -2,9 +2,9 @@ ## What is a generator and how should it be implemented? In order to create a new generator (which will generate new tasks one by one) and add it to the scheduler there are 4 methods which should be implemented: -1. ***GenerateTask() (Task, error)*** +1. ***Next() (Task, error)*** ``` - This method should implement the logic for generating a new task. + This method should implement the logic for generating a new task. Basically, when this method is called by the scheduler it should return the next task until there are no more tasks to be generated. Also, the Task returned by this method should implement DoWork(ctx context.Context) method which should contain the logic that should be executed when this task is run by the scheduler. ``` @@ -27,15 +27,28 @@ In order to create a new generator (which will generate new tasks one by one) an The scheduler accepts both periodic and non-periodic generators. To submit a generator to the scheduler, ***SubmitGenerator*** should be called with the implemented generator, interval of time (which should be time.Duration(0) in case of non-periodic generator, or the interval for the periodic generator) and the priority of the tasks which will be generated by this generator as parameters. - + Notes: - A generator should submit only tasks having the same priority - - The priority of a task can be: LowPriorirty, MediumPriority or HighPriority + - The priority of a task can be: LowPriority, MediumPriority or HighPriority # How to submit a Task to the scheduler -In order to create a new task and add it to the scheduler ***DoWork(ctx context.Context) error*** is the method that should be implemented. This should contain the logic that should be executed when this task is run by the scheduler. 
+In order to create a new task and add it to the scheduler we need to implement below methods: + +1. ***DoWork(ctx context.Context) error*** +``` +This should contain the logic that should be executed when this task is run by the scheduler. +``` +2. ***Name() string*** +``` +Name of the task. +``` +3. ***String() string*** +``` +Description of the task. Used in debugging to identify executed task. +``` To submit a task to the scheduler ***SubmitTask*** should be called with the implemented task and the priority of the task as parameters. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 53d50084..a97bbae0 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -9,11 +9,14 @@ import ( "time" "zotregistry.io/zot/pkg/api/config" + "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" ) type Task interface { DoWork(ctx context.Context) error + Name() string + String() string } type generatorsPriorityQueue []*generator @@ -58,25 +61,31 @@ func (pq *generatorsPriorityQueue) Pop() any { const ( rateLimiterScheduler = 400 rateLimit = 5 * time.Second - numWorkersMultiplier = 4 + NumWorkersMultiplier = 4 + sendMetricsInterval = 5 * time.Second ) type Scheduler struct { tasksQLow chan Task tasksQMedium chan Task tasksQHigh chan Task + tasksDoWork int + tasksLock *sync.Mutex generators generatorsPriorityQueue waitingGenerators []*generator + doneGenerators []*generator generatorsLock *sync.Mutex log log.Logger RateLimit time.Duration NumWorkers int workerChan chan Task + metricsChan chan struct{} workerWg *sync.WaitGroup isShuttingDown atomic.Bool + metricServer monitoring.MetricServer } -func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler { +func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen chLow := make(chan Task, rateLimiterScheduler) chMedium := make(chan Task, rateLimiterScheduler) chHigh := make(chan Task, 
rateLimiterScheduler) @@ -85,19 +94,25 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler { sublogger := logC.With().Str("component", "scheduler").Logger() heap.Init(&generatorPQ) + // force pushing this metric (for zot minimal metrics are enabled on first scraping) + monitoring.SetSchedulerNumWorkers(ms, numWorkers) return &Scheduler{ tasksQLow: chLow, tasksQMedium: chMedium, tasksQHigh: chHigh, + tasksDoWork: 0, // number of tasks that are in working state + tasksLock: new(sync.Mutex), generators: generatorPQ, generatorsLock: new(sync.Mutex), log: log.Logger{Logger: sublogger}, // default value - RateLimit: rateLimit, - NumWorkers: numWorkers, - workerChan: make(chan Task, numWorkers), - workerWg: new(sync.WaitGroup), + RateLimit: rateLimit, + NumWorkers: numWorkers, + workerChan: make(chan Task, numWorkers), + metricsChan: make(chan struct{}, 1), + workerWg: new(sync.WaitGroup), + metricServer: ms, } } @@ -106,19 +121,95 @@ func (scheduler *Scheduler) poolWorker(ctx context.Context) { go func(workerID int) { defer scheduler.workerWg.Done() - for task := range scheduler.workerChan { - scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task") + var workStart time.Time - if err := task.DoWork(ctx); err != nil { - scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task") + var workDuration time.Duration + + for task := range scheduler.workerChan { + // leave below line here (for zot minimal metrics can be enabled on first scraping) + metricsEnabled := scheduler.metricServer.IsEnabled() + scheduler.log.Debug().Int("worker", workerID).Str("task", task.String()).Msg("scheduler: starting task") + + if metricsEnabled { + scheduler.tasksLock.Lock() + scheduler.tasksDoWork++ + scheduler.tasksLock.Unlock() + workStart = time.Now() } - scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: finished task") + if err := task.DoWork(ctx); err != nil { + scheduler.log.Error().Int("worker", 
workerID).Str("task", task.String()).Err(err). + Msg("scheduler: error while executing task") + } + + if metricsEnabled { + scheduler.tasksLock.Lock() + scheduler.tasksDoWork-- + scheduler.tasksLock.Unlock() + workDuration = time.Since(workStart) + monitoring.ObserveWorkersTasksDuration(scheduler.metricServer, task.Name(), workDuration) + } + + scheduler.log.Debug().Int("worker", workerID).Str("task", task.String()).Msg("scheduler: finished task") } }(i + 1) } } +func (scheduler *Scheduler) metricsWorker() { + ticker := time.NewTicker(sendMetricsInterval) + + for { + if scheduler.inShutdown() { + return + } + select { + case <-scheduler.metricsChan: + ticker.Stop() + + return + case <-ticker.C: + genMap := make(map[string]map[string]uint64) + tasksMap := make(map[string]int) + // initialize map + for _, p := range []Priority{LowPriority, MediumPriority, HighPriority} { + priority := p.String() + genMap[priority] = make(map[string]uint64) + + for _, s := range []State{Ready, Waiting, Done} { + genMap[priority][s.String()] = 0 + } + } + + scheduler.generatorsLock.Lock() + generators := append(append(scheduler.generators, scheduler.waitingGenerators...), + scheduler.doneGenerators...) 
+ + for _, gen := range generators { + p := gen.priority.String() + s := gen.getState().String() + genMap[p][s]++ + } + + // tasks queue size by priority + tasksMap[LowPriority.String()] = len(scheduler.tasksQLow) + tasksMap[MediumPriority.String()] = len(scheduler.tasksQMedium) + tasksMap[HighPriority.String()] = len(scheduler.tasksQHigh) + scheduler.generatorsLock.Unlock() + + monitoring.SetSchedulerGenerators(scheduler.metricServer, genMap) + monitoring.SetSchedulerTasksQueue(scheduler.metricServer, tasksMap) + workersMap := make(map[string]int) + + scheduler.tasksLock.Lock() + workersMap["idle"] = scheduler.NumWorkers - scheduler.tasksDoWork + workersMap["working"] = scheduler.tasksDoWork + scheduler.tasksLock.Unlock() + monitoring.SetSchedulerWorkers(scheduler.metricServer, workersMap) + } + } +} + func (scheduler *Scheduler) Shutdown() { if !scheduler.inShutdown() { scheduler.shutdown() @@ -133,6 +224,7 @@ func (scheduler *Scheduler) inShutdown() bool { func (scheduler *Scheduler) shutdown() { close(scheduler.workerChan) + close(scheduler.metricsChan) scheduler.isShuttingDown.Store(true) } @@ -147,6 +239,9 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { // start worker pool go scheduler.poolWorker(ctx) + // periodically send metrics + go scheduler.metricsWorker() + go func() { for { select { @@ -166,7 +261,7 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { if task != nil { // push tasks into worker pool if !scheduler.inShutdown() { - scheduler.log.Debug().Msg("scheduler: pushing task into worker pool") + scheduler.log.Debug().Str("task", task.String()).Msg("scheduler: pushing task into worker pool") scheduler.workerChan <- task } } @@ -185,7 +280,7 @@ func (scheduler *Scheduler) pushReadyGenerators() { modified := false for i, gen := range scheduler.waitingGenerators { - if gen.getState() == ready { + if gen.getState() == Ready { gen.done = false heap.Push(&scheduler.generators, gen) scheduler.waitingGenerators = 
append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...) @@ -217,13 +312,15 @@ func (scheduler *Scheduler) generateTasks() { var gen *generator - // check if the generator with highest prioriy is ready to run - if scheduler.generators[0].getState() == ready { + // check if the generator with highest priority is ready to run + if scheduler.generators[0].getState() == Ready { gen = scheduler.generators[0] } else { gen, _ = heap.Pop(&scheduler.generators).(*generator) - if gen.getState() == waiting { + if gen.getState() == Waiting { scheduler.waitingGenerators = append(scheduler.waitingGenerators, gen) + } else if gen.getState() == Done { + scheduler.doneGenerators = append(scheduler.doneGenerators, gen) } return @@ -279,7 +376,7 @@ func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) { return } - // check if the scheduler it's still running in order to add the task to the channel + // check if the scheduler is still running in order to add the task to the channel if scheduler.inShutdown() { return } @@ -302,12 +399,12 @@ const ( HighPriority ) -type state int +type State int const ( - ready state = iota - waiting - done + Ready State = iota + Waiting + Done ) type TaskGenerator interface { @@ -342,8 +439,6 @@ func (gen *generator) generate(sch *Scheduler) { return } - task = nextTask - // check if the generator is done if gen.taskGenerator.IsDone() { gen.done = true @@ -352,6 +447,8 @@ func (gen *generator) generate(sch *Scheduler) { return } + + task = nextTask } // check if it's possible to add a new task to the channel @@ -370,22 +467,22 @@ func (gen *generator) generate(sch *Scheduler) { // if the generator is not periodic then it can be done or ready to generate a new task. // if the generator is periodic then it can be waiting (finished its work and wait for its interval to pass) // or ready to generate a new task. 
-func (gen *generator) getState() state { +func (gen *generator) getState() State { if gen.interval == time.Duration(0) { if gen.done && gen.remainingTask == nil { - return done + return Done } } else { if gen.done && time.Since(gen.lastRun) < gen.interval && gen.remainingTask == nil { - return waiting + return Waiting } } if !gen.taskGenerator.IsReady() { - return waiting + return Waiting } - return ready + return Ready } func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) { @@ -402,6 +499,8 @@ func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interva // add generator to the generators priority queue heap.Push(&scheduler.generators, newGenerator) + // force pushing this metric (for zot minimal metrics are enabled on first scraping) + monitoring.IncSchedulerGenerators(scheduler.metricServer) } func getNumWorkers(cfg *config.Config) int { @@ -409,5 +508,39 @@ func getNumWorkers(cfg *config.Config) int { return cfg.Scheduler.NumWorkers } - return runtime.NumCPU() * numWorkersMultiplier + return runtime.NumCPU() * NumWorkersMultiplier +} + +func (p Priority) String() string { + var priority string + + switch p { + case LowPriority: + priority = "low" + case MediumPriority: + priority = "medium" + case HighPriority: + priority = "high" + default: + priority = "invalid" + } + + return priority +} + +func (s State) String() string { + var status string + + switch s { + case Ready: + status = "ready" + case Waiting: + status = "waiting" + case Done: + status = "done" + default: + status = "invalid" + } + + return status } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 8cb59b74..001973cb 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -12,6 +12,7 @@ import ( . 
"github.com/smartystreets/goconvey/convey" "zotregistry.io/zot/pkg/api/config" + "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/scheduler" ) @@ -34,6 +35,14 @@ func (t *task) DoWork(ctx context.Context) error { return nil } +func (t *task) String() string { + return t.Name() +} + +func (t *task) Name() string { + return "TestTask" +} + type generator struct { log log.Logger priority string @@ -100,7 +109,8 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(config.New(), logger) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) genH := &shortGenerator{log: logger, priority: "high priority"} // interval has to be higher than throttle value to simulate @@ -126,7 +136,8 @@ func TestScheduler(t *testing.T) { logger := log.NewLogger("debug", logFile.Name()) cfg := config.New() cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3} - sch := scheduler.NewScheduler(cfg, logger) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(cfg, metrics, logger) genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority) @@ -160,7 +171,8 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(config.New(), logger) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) t := &task{log: logger, msg: "", err: true} sch.SubmitTask(t, scheduler.MediumPriority) @@ -184,7 +196,8 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(config.New(), logger) + metrics := 
monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) @@ -208,7 +221,8 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(config.New(), logger) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) t := &task{log: logger, msg: "", err: false} sch.SubmitTask(t, -1) @@ -225,7 +239,8 @@ func TestScheduler(t *testing.T) { defer os.Remove(logFile.Name()) // clean up logger := log.NewLogger("debug", logFile.Name()) - sch := scheduler.NewScheduler(config.New(), logger) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) ctx, cancel := context.WithCancel(context.Background()) @@ -240,11 +255,39 @@ func TestScheduler(t *testing.T) { So(err, ShouldBeNil) So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task") }) + + Convey("Test scheduler Priority.String() method", t, func() { + var p scheduler.Priority //nolint: varnamelen + // test invalid priority + p = 6238734 + So(p.String(), ShouldEqual, "invalid") + p = scheduler.LowPriority + So(p.String(), ShouldEqual, "low") + p = scheduler.MediumPriority + So(p.String(), ShouldEqual, "medium") + p = scheduler.HighPriority + So(p.String(), ShouldEqual, "high") + }) + + Convey("Test scheduler State.String() method", t, func() { + var s scheduler.State //nolint: varnamelen + // test invalid state + s = -67 + So(s.String(), ShouldEqual, "invalid") + s = scheduler.Ready + So(s.String(), ShouldEqual, "ready") + s = scheduler.Waiting + So(s.String(), ShouldEqual, "waiting") + s = scheduler.Done + So(s.String(), ShouldEqual, "done") + }) } func TestGetNumWorkers(t *testing.T) { Convey("Test setting the 
number of workers - default value", t, func() { - sch := scheduler.NewScheduler(config.New(), log.NewLogger("debug", "logFile")) + logger := log.NewLogger("debug", "logFile") + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) defer os.Remove("logFile") So(sch.NumWorkers, ShouldEqual, runtime.NumCPU()*4) }) @@ -252,7 +295,9 @@ func TestGetNumWorkers(t *testing.T) { Convey("Test setting the number of workers - getting the value from config", t, func() { cfg := config.New() cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3} - sch := scheduler.NewScheduler(cfg, log.NewLogger("debug", "logFile")) + logger := log.NewLogger("debug", "logFile") + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(cfg, metrics, logger) defer os.Remove("logFile") So(sch.NumWorkers, ShouldEqual, 3) }) diff --git a/pkg/storage/common/common.go b/pkg/storage/common/common.go index 0795b649..9addd893 100644 --- a/pkg/storage/common/common.go +++ b/pkg/storage/common/common.go @@ -1061,6 +1061,15 @@ func (dt *dedupeTask) DoWork(ctx context.Context) error { return err } +func (dt *dedupeTask) String() string { + return fmt.Sprintf("{Name: %s, digest: %s, dedupe: %t}", + dt.Name(), dt.digest, dt.dedupe) +} + +func (dt *dedupeTask) Name() string { + return "DedupeTask" +} + type StorageMetricsInitGenerator struct { ImgStore storageTypes.ImageStore done bool @@ -1132,3 +1141,12 @@ func (smt *smTask) DoWork(ctx context.Context) error { return nil } + +func (smt *smTask) String() string { + return fmt.Sprintf("{Name: \"%s\", repo: \"%s\"}", + smt.Name(), smt.repo) +} + +func (smt *smTask) Name() string { + return "StorageMetricsTask" +} diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index c51b9b26..bc048eec 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -851,3 +851,12 @@ func (gct *gcTask) DoWork(ctx context.Context) error { // run task return gct.gc.CleanRepo(ctx, 
gct.repo) //nolint: contextcheck } + +func (gct *gcTask) String() string { + return fmt.Sprintf("{Name: %s, repo: %s}", + gct.Name(), gct.repo) +} + +func (gct *gcTask) Name() string { + return "GCTask" +} diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 6f5ec1cc..2b81ce97 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -63,7 +63,9 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals var errCache = errors.New("new cache error") func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { - taskScheduler := scheduler.NewScheduler(config.New(), zlog.Logger{}) + log := zlog.Logger{} + metrics := monitoring.NewMetricsServer(true, log) + taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 924f5aa8..d9e3b907 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -62,7 +62,7 @@ func cleanupStorage(store driver.StorageDriver, name string) { func createMockStorage(rootDir string, cacheDir string, dedupe bool, store driver.StorageDriver, ) storageTypes.ImageStore { log := log.Logger{Logger: zerolog.New(os.Stdout)} - metrics := monitoring.NewMetricsServer(false, log) + metrics := monitoring.NewMetricsServer(true, log) var cacheDriver cache.Cache @@ -187,7 +187,9 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl } func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { - taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{}) + logger := log.Logger{} + metrics := monitoring.NewMetricsServer(false, logger) + taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 50 * time.Millisecond ctx, cancel := context.WithCancel(context.Background()) @@ -2048,7 +2050,9 @@ func 
TestRebuildDedupeIndex(t *testing.T) { Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { for i := 0; i < 10; i++ { - taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{}) + logger := log.Logger{} + metrics := monitoring.NewMetricsServer(false, logger) + taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) @@ -2087,7 +2091,9 @@ func TestRebuildDedupeIndex(t *testing.T) { // now from dedupe false to true for i := 0; i < 10; i++ { - taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{}) + logger := log.Logger{} + metrics := monitoring.NewMetricsServer(false, logger) + taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)