From 7642e5af98267dcbdb180f6184617850ce4741a5 Mon Sep 17 00:00:00 2001
From: peusebiu
Date: Mon, 11 Dec 2023 20:00:34 +0200
Subject: [PATCH] fix(scheduler): fix data race (#2085)

* fix(scheduler): data race when pushing new tasks

The problem here is that the scheduler can be closed in two ways:
- canceling the context given as argument to scheduler.RunScheduler()
- running scheduler.Shutdown()

Because of this, a shutdown can trigger a data race between calling
scheduler.inShutdown() and actually pushing tasks into the worker pool.

Solved by keeping a quit channel and listening on both the quit channel
and ctx.Done(), closing the worker channel and the scheduler afterwards.

Signed-off-by: Petu Eusebiu

* refactor(scheduler): refactor into a single shutdown

Before this change we could stop the scheduler either by canceling the
context provided to RunScheduler(ctx) or by running Shutdown().

Simplify things by getting rid of the external context in RunScheduler():
keep an internal context in the scheduler itself and pass it down to all
tasks.

Signed-off-by: Petu Eusebiu

---------

Signed-off-by: Petu Eusebiu
---
 Makefile                                     |   2 +-
 pkg/api/authn_test.go                        |  88 +++++++++++
 pkg/api/controller.go                        |  25 ++--
 pkg/api/controller_test.go                   |  22 +--
 pkg/api/cookiestore.go                       |   4 +-
 pkg/cli/client/cve_cmd_test.go               |  18 +--
 pkg/cli/client/image_cmd_test.go             |   2 +-
 pkg/cli/server/config_reloader.go            |  36 ++---
 pkg/cli/server/root.go                       |  10 +-
 pkg/compliance/v1_0_0/check_test.go          |   4 +-
 pkg/exporter/api/controller_test.go          |   4 +-
 pkg/extensions/extension_image_trust_test.go |  10 +-
 pkg/extensions/monitoring/monitoring_test.go |   6 +-
 pkg/extensions/scrub/scrub_test.go           |  18 +--
 pkg/extensions/search/cve/scan_test.go       |  12 +-
 pkg/extensions/search/cve/update_test.go     |   6 +-
 pkg/extensions/search/search_test.go         |  18 +--
 pkg/extensions/sync/service.go               |   2 +-
 pkg/extensions/sync/sync_internal_test.go    |  23 +++
 pkg/extensions/sync/sync_test.go             | 145 +++++++++----------
 pkg/meta/meta_test.go                        |   4 +-
 pkg/scheduler/scheduler.go                   |  51 ++++---
 pkg/scheduler/scheduler_test.go              |  79 ++++++----
 pkg/storage/local/local_test.go              |  38 +++--
 pkg/storage/s3/s3_test.go                    |  84 ++++++-----
 pkg/test/common/utils.go                     |  23 +--
 pkg/test/common/utils_test.go                |   7 +-
 pkg/test/mocks/sync_remote_mock.go           |  67 +++++++++
 test/blackbox/sync.bats                      |   2 +-
 test/blackbox/sync_cloud.bats                |   6 +-
 test/blackbox/sync_replica_cluster.bats      |   4 +-
 31 files changed, 494 insertions(+), 326 deletions(-)
 create mode 100644 pkg/test/mocks/sync_remote_mock.go

diff --git a/Makefile b/Makefile
index a77e3357..fccde17e 100644
--- a/Makefile
+++ b/Makefile
@@ -195,7 +195,7 @@ test-prereq: check-skopeo $(TESTDATA) $(ORAS)
 .PHONY: test-extended
 test-extended: $(if $(findstring ui,$(BUILD_LABELS)), ui)
 test-extended: test-prereq
-	go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./...
+	go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 20m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./...
rm -rf /tmp/getter*; rm -rf /tmp/trivy* .PHONY: test-minimal diff --git a/pkg/api/authn_test.go b/pkg/api/authn_test.go index e7e89a87..ac8145bd 100644 --- a/pkg/api/authn_test.go +++ b/pkg/api/authn_test.go @@ -11,6 +11,7 @@ import ( "net/http" "net/http/httptest" "os" + "path" "testing" "time" @@ -23,9 +24,14 @@ import ( "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/api/constants" extconf "zotregistry.io/zot/pkg/extensions/config" + "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" mTypes "zotregistry.io/zot/pkg/meta/types" reqCtx "zotregistry.io/zot/pkg/requestcontext" + "zotregistry.io/zot/pkg/scheduler" + "zotregistry.io/zot/pkg/storage" + storageConstants "zotregistry.io/zot/pkg/storage/constants" + "zotregistry.io/zot/pkg/storage/local" authutils "zotregistry.io/zot/pkg/test/auth" test "zotregistry.io/zot/pkg/test/common" "zotregistry.io/zot/pkg/test/mocks" @@ -922,6 +928,88 @@ func TestAPIKeysGeneratorErrors(t *testing.T) { }) } +func TestCookiestoreCleanup(t *testing.T) { + log := log.Logger{} + metrics := monitoring.NewMetricsServer(true, log) + + Convey("Test cookiestore cleanup works", t, func() { + taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) + taskScheduler.RateLimit = 50 * time.Millisecond + taskScheduler.RunScheduler() + + rootDir := t.TempDir() + + err := os.MkdirAll(path.Join(rootDir, "_sessions"), storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + + sessionPath := path.Join(rootDir, "_sessions", "session_1234") + + err = os.WriteFile(sessionPath, []byte("session"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + err = os.Chtimes(sessionPath, time.Time{}, time.Time{}) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(rootDir, false, false, log, metrics, nil, nil) + + storeController := storage.StoreController{ + DefaultStore: imgStore, + } + + cookieStore, err := api.NewCookieStore(storeController) + So(err, ShouldBeNil) + + cookieStore.RunSessionCleaner(taskScheduler) + + time.Sleep(2 * time.Second) + + taskScheduler.Shutdown() + + // make sure session is removed + _, err = os.Stat(sessionPath) + So(err, ShouldNotBeNil) + }) + + Convey("Test cookiestore cleanup without permissions on rootDir", t, func() { + taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) + taskScheduler.RateLimit = 50 * time.Millisecond + taskScheduler.RunScheduler() + + rootDir := t.TempDir() + + err := os.MkdirAll(path.Join(rootDir, "_sessions"), storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + + sessionPath := path.Join(rootDir, "_sessions", "session_1234") + + err = os.WriteFile(sessionPath, []byte("session"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(rootDir, false, false, log, metrics, nil, nil) + + storeController := storage.StoreController{ + DefaultStore: imgStore, + } + + cookieStore, err := api.NewCookieStore(storeController) + So(err, ShouldBeNil) + + err = os.Chmod(rootDir, 0o000) + So(err, ShouldBeNil) + + defer func() { + err = os.Chmod(rootDir, storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + }() + + cookieStore.RunSessionCleaner(taskScheduler) + + time.Sleep(1 * time.Second) + + taskScheduler.Shutdown() + }) +} + type mockUUIDGenerator struct { guuid.Generator succeedAttempts int diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 36cbd104..fda220f5 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -94,12 +94,12 @@ func (c *Controller) GetPort() int { return c.chosenPort } 
-func (c *Controller) Run(reloadCtx context.Context) error {
+func (c *Controller) Run() error {
 	if err := c.initCookieStore(); err != nil {
 		return err
 	}
 
-	c.StartBackgroundTasks(reloadCtx)
+	c.StartBackgroundTasks()
 
 	// setup HTTP API router
 	engine := mux.NewRouter()
@@ -216,7 +216,7 @@
 	return server.Serve(listener)
 }
 
-func (c *Controller) Init(reloadCtx context.Context) error {
+func (c *Controller) Init() error {
 	// print the current configuration, but strip secrets
 	c.Log.Info().Interface("params", c.Config.Sanitize()).Msg("configuration settings")
 
@@ -237,7 +237,7 @@
 		return err
 	}
 
-	if err := c.InitMetaDB(reloadCtx); err != nil {
+	if err := c.InitMetaDB(); err != nil {
 		return err
 	}
 
@@ -280,7 +280,7 @@
 	return nil
 }
 
-func (c *Controller) InitMetaDB(reloadCtx context.Context) error {
+func (c *Controller) InitMetaDB() error {
 	// init metaDB if search is enabled or we need to store user profiles, api keys or signatures
 	if c.Config.IsSearchEnabled() || c.Config.IsBasicAuthnEnabled() || c.Config.IsImageTrustEnabled() ||
 		c.Config.IsRetentionEnabled() {
@@ -310,7 +310,7 @@
 	return nil
 }
 
-func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.Config) {
+func (c *Controller) LoadNewConfig(newConfig *config.Config) {
 	// reload access control config
 	c.Config.HTTP.AccessControl = newConfig.HTTP.AccessControl
 
@@ -364,21 +364,24 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.
 
 	c.InitCVEInfo()
 
-	c.StartBackgroundTasks(reloadCtx)
-
 	c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).
 		Msg("loaded new configuration settings")
 }
 
 func (c *Controller) Shutdown() {
-	c.taskScheduler.Shutdown()
+	c.StopBackgroundTasks()
 	ctx := context.Background()
 	_ = c.Server.Shutdown(ctx)
 }
 
+// StopBackgroundTasks stops the scheduler and waits for all running tasks to finish their work.
+func (c *Controller) StopBackgroundTasks() { + c.taskScheduler.Shutdown() +} + +func (c *Controller) StartBackgroundTasks() { c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log) - c.taskScheduler.RunScheduler(reloadCtx) + c.taskScheduler.RunScheduler() // Enable running garbage-collect periodically for DefaultStore if c.Config.Storage.GC { diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 87c0c084..ca63430d 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -300,10 +300,10 @@ func TestRunAlreadyRunningServer(t *testing.T) { cm.StartAndWait(port) defer cm.StopServer() - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) - err = ctlr.Run(context.Background()) + err = ctlr.Run() So(err, ShouldNotBeNil) }) } @@ -377,7 +377,7 @@ func TestObjectStorageController(t *testing.T) { ctlr := makeController(conf, tmp) So(ctlr, ShouldNotBeNil) - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) }) @@ -1218,7 +1218,7 @@ func TestMultipleInstance(t *testing.T) { } ctlr := api.NewController(conf) ctlr.Log.Info().Int64("seedUser", seedUser).Int64("seedPass", seedPass).Msg("random seed for username & password") - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldEqual, errors.ErrImgStoreNotFound) globalDir := t.TempDir() @@ -1311,7 +1311,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) // subpath root directory does not exist. @@ -1320,7 +1320,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) subPathMap["/a"] = config.StorageConfig{RootDirectory: subDir, Dedupe: true, GC: true} @@ -1328,7 +1328,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) }) } @@ -1826,12 +1826,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() @@ -1866,12 +1866,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() diff --git a/pkg/api/cookiestore.go b/pkg/api/cookiestore.go index d66b971a..86c109e5 100644 --- a/pkg/api/cookiestore.go +++ b/pkg/api/cookiestore.go @@ -152,7 +152,9 @@ type CleanTask struct { func (cleanTask *CleanTask) DoWork(ctx context.Context) error { for _, session := range cleanTask.sessions { if err := os.Remove(session); err != nil { - return err + if !os.IsNotExist(err) { + return err + } } } diff --git a/pkg/cli/client/cve_cmd_test.go b/pkg/cli/client/cve_cmd_test.go index 91a922bd..d0876b28 100644 --- a/pkg/cli/client/cve_cmd_test.go +++ b/pkg/cli/client/cve_cmd_test.go @@ -163,16 +163,14 @@ func TestNegativeServerResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } 
ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -239,16 +237,14 @@ func TestServerCVEResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -578,9 +574,7 @@ func TestCVESort(t *testing.T) { t.FailNow() } - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } @@ -617,7 +611,7 @@ func TestCVESort(t *testing.T) { } go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/cli/client/image_cmd_test.go b/pkg/cli/client/image_cmd_test.go index df56f1af..496ff026 100644 --- a/pkg/cli/client/image_cmd_test.go +++ b/pkg/cli/client/image_cmd_test.go @@ -866,7 +866,7 @@ func TestServerResponseGQLWithoutPermissions(t *testing.T) { } ctlr := api.NewController(conf) - if err := ctlr.Init(context.Background()); err != nil { + if err := ctlr.Init(); err != nil { So(err, ShouldNotBeNil) } }) diff --git a/pkg/cli/server/config_reloader.go b/pkg/cli/server/config_reloader.go index f81f7633..43967a27 100644 --- a/pkg/cli/server/config_reloader.go +++ b/pkg/cli/server/config_reloader.go @@ -1,7 +1,6 @@ package server import ( - "context" "os" "os/signal" "syscall" @@ -35,28 +34,22 @@ func NewHotReloader(ctlr *api.Controller, filePath string) (*HotReloader, error) return hotReloader, nil } -func signalHandler(ctlr *api.Controller, sigCh chan os.Signal, ctx context.Context, cancel context.CancelFunc) { - select { +func signalHandler(ctlr *api.Controller, sigCh chan os.Signal) { // if signal then shutdown - case sig := <-sigCh: - defer cancel() - + if sig, ok := <-sigCh; ok { ctlr.Log.Info().Interface("signal", sig).Msg("received signal") // gracefully shutdown http server ctlr.Shutdown() //nolint: contextcheck close(sigCh) - // if reload then return - case <-ctx.Done(): - return } } -func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel context.CancelFunc) { +func initShutDownRoutine(ctlr *api.Controller) { sigCh := make(chan os.Signal, 1) - go signalHandler(ctlr, sigCh, ctx, cancel) + go signalHandler(ctlr, sigCh) // block all async signals to this server signal.Ignore() @@ -65,12 +58,10 @@ func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel conte signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) } -func (hr *HotReloader) Start() context.Context { - reloadCtx, cancelFunc := context.WithCancel(context.Background()) - +func (hr *HotReloader) Start() { done := make(chan bool) - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + initShutDownRoutine(hr.ctlr) // run watcher go func() { @@ -92,16 +83,15 @@ func (hr *HotReloader) Start() context.Context { continue } - // if valid config then reload - cancelFunc() - // create new context - reloadCtx, cancelFunc = context.WithCancel(context.Background()) + // stop background tasks gracefully + hr.ctlr.StopBackgroundTasks() - // init shutdown routine - initShutDownRoutine(hr.ctlr, 
reloadCtx, cancelFunc) + // load new config + hr.ctlr.LoadNewConfig(newConfig) - hr.ctlr.LoadNewConfig(reloadCtx, newConfig) + // start background tasks based on new loaded config + hr.ctlr.StartBackgroundTasks() } // watch for errors case err := <-hr.watcher.Errors: @@ -116,6 +106,4 @@ func (hr *HotReloader) Start() context.Context { <-done }() - - return reloadCtx } diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index d3e63845..afdc1d15 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -61,20 +61,16 @@ func newServeCmd(conf *config.Config) *cobra.Command { return err } - /* context used to cancel go routines so that - we can change their config on the fly (restart routines with different config) */ - reloaderCtx := hotReloader.Start() + hotReloader.Start() - if err := ctlr.Init(reloaderCtx); err != nil { + if err := ctlr.Init(); err != nil { ctlr.Log.Error().Err(err).Msg("failed to init controller") return err } - if err := ctlr.Run(reloaderCtx); err != nil { + if err := ctlr.Run(); err != nil { log.Error().Err(err).Msg("failed to start controller, exiting") - - return err } return nil diff --git a/pkg/compliance/v1_0_0/check_test.go b/pkg/compliance/v1_0_0/check_test.go index 3b104f3a..1ed06f74 100644 --- a/pkg/compliance/v1_0_0/check_test.go +++ b/pkg/compliance/v1_0_0/check_test.go @@ -81,12 +81,12 @@ func startServer(t *testing.T) (*api.Controller, string) { ctrl.Config.Storage.SubPaths = subPaths go func() { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { return } // this blocks - if err := ctrl.Run(context.Background()); err != nil { + if err := ctrl.Run(); err != nil { return } }() diff --git a/pkg/exporter/api/controller_test.go b/pkg/exporter/api/controller_test.go index 387470b8..f9bd0852 100644 --- a/pkg/exporter/api/controller_test.go +++ b/pkg/exporter/api/controller_test.go @@ -140,12 +140,12 @@ func TestNewExporter(t *testing.T) { dir := t.TempDir() serverController.Config.Storage.RootDirectory = dir go func(ctrl *zotapi.Controller) { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { panic(err) } // this blocks - if err := ctrl.Run(context.Background()); !errors.Is(err, http.ErrServerClosed) { + if err := ctrl.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }(serverController) diff --git a/pkg/extensions/extension_image_trust_test.go b/pkg/extensions/extension_image_trust_test.go index a0dc6d30..5db9a606 100644 --- a/pkg/extensions/extension_image_trust_test.go +++ b/pkg/extensions/extension_image_trust_test.go @@ -257,7 +257,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ []string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())}) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -369,7 +369,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ err = signature.SignWithNotation(certName, imageURL, rootDir, true) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -502,7 +502,7 @@ func 
RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ err = signature.SignWithNotation(certName, imageURL, rootDir, false) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -672,7 +672,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ []string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())}) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -883,7 +883,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ So(err, ShouldBeNil) So(found, ShouldBeTrue) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) diff --git a/pkg/extensions/monitoring/monitoring_test.go b/pkg/extensions/monitoring/monitoring_test.go index 90f96681..e5ad1395 100644 --- a/pkg/extensions/monitoring/monitoring_test.go +++ b/pkg/extensions/monitoring/monitoring_test.go @@ -4,7 +4,6 @@ package monitoring_test import ( - "context" "fmt" "io" "math/rand" @@ -463,8 +462,7 @@ func TestPopulateStorageMetrics(t *testing.T) { metrics := monitoring.NewMetricsServer(true, ctlr.Log) sch := scheduler.NewScheduler(conf, metrics, ctlr.Log) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) + sch.RunScheduler() generator := &common.StorageMetricsInitGenerator{ ImgStore: ctlr.StoreController.DefaultStore, @@ -485,7 +483,7 @@ func TestPopulateStorageMetrics(t *testing.T) { So(err, ShouldBeNil) So(found, ShouldBeTrue) - cancel() + sch.Shutdown() alpineSize, err := monitoring.GetDirSize(path.Join(rootDir, "alpine")) So(err, ShouldBeNil) busyboxSize, err := monitoring.GetDirSize(path.Join(rootDir, "busybox")) diff --git a/pkg/extensions/scrub/scrub_test.go b/pkg/extensions/scrub/scrub_test.go index 69af3d52..2939722f 100644 --- a/pkg/extensions/scrub/scrub_test.go +++ b/pkg/extensions/scrub/scrub_test.go @@ -70,13 +70,11 @@ func TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "blobs/manifest ok", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "blobs/manifest ok") }) Convey("Blobs integrity affected", t, func(c C) { @@ -122,13 +120,11 @@ func TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "blobs/manifest affected", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "blobs/manifest affected") }) Convey("Generator error - not enough permissions to access root directory", t, func(c C) { @@ -170,13 +166,11 @@ func 
TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "failed to execute generator", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "failed to execute generator") So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil) }) diff --git a/pkg/extensions/search/cve/scan_test.go b/pkg/extensions/search/cve/scan_test.go index ac05e725..13af44a8 100644 --- a/pkg/extensions/search/cve/scan_test.go +++ b/pkg/extensions/search/cve/scan_test.go @@ -432,11 +432,9 @@ func TestScanGeneratorWithMockedData(t *testing.T) { //nolint: gocyclo sch.SubmitGenerator(generator, 10*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed despite errors found, err := test.ReadLogFileAndSearchString(logPath, @@ -529,11 +527,9 @@ func TestScanGeneratorWithRealData(t *testing.T) { // Start the generator sch.SubmitGenerator(generator, 120*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed found, err := test.ReadLogFileAndSearchString(logPath, diff --git a/pkg/extensions/search/cve/update_test.go b/pkg/extensions/search/cve/update_test.go index 9efbdee7..7545ae6d 100644 --- a/pkg/extensions/search/cve/update_test.go +++ b/pkg/extensions/search/cve/update_test.go @@ -63,11 +63,9 @@ func TestCVEDBGenerator(t *testing.T) { sch.SubmitGenerator(generator, 12000*time.Millisecond, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Wait for trivy db to download found, err := test.ReadLogFileAndCountStringOccurence(logPath, diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index 4fac01d3..b13dd893 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -681,16 +681,14 @@ func TestRepoListWithNewestImage(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -3373,16 +3371,14 @@ func TestGlobalSearch(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -6205,16 +6201,14 @@ func TestImageSummary(t *testing.T) { configDigest := godigest.FromBytes(configBlob) So(errConfig, ShouldBeNil) // marshall success, config is valid JSON - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil 
{ + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index afcd4890..2cecf3bb 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -44,7 +44,7 @@ func New( storeController storage.StoreController, metadb mTypes.MetaDB, log log.Logger, -) (Service, error) { +) (*BaseService, error) { service := &BaseService{} service.config = opts diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 50c572ca..9e985180 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -170,6 +170,29 @@ func TestService(t *testing.T) { }) } +func TestSyncRepo(t *testing.T) { + Convey("trigger context error", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + } + + service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) + So(err, ShouldBeNil) + + service.remote = mocks.SyncRemote{ + GetRepoTagsFn: func(repo string) ([]string, error) { + return []string{"repo1", "repo2"}, nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = service.SyncRepo(ctx, "repo") + So(err, ShouldEqual, ctx.Err()) + }) +} + func TestDestinationRegistry(t *testing.T) { Convey("make StoreController", t, func() { dir := t.TempDir() diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 6574dea7..344ad739 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -1901,15 +1901,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() @@ -2051,15 +2051,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() @@ -3938,6 +3938,12 @@ func TestPeriodicallySignaturesErr(t *testing.T) { } } + // start downstream server + updateDuration, err = time.ParseDuration("1s") + So(err, ShouldBeNil) + + syncConfig.Registries[0].PollInterval = updateDuration + // start downstream server dctlr, destBaseURL, _, _ := makeDownstreamServer(t, false, syncConfig) @@ -4030,6 +4036,61 @@ func TestPeriodicallySignaturesErr(t *testing.T) { So(err, ShouldBeNil) So(len(index.Manifests), ShouldEqual, 0) }) + + Convey("of type OCI image, error on downstream in canSkipReference()", func() { //nolint: dupl + // start downstream server + updateDuration, err = time.ParseDuration("1s") + So(err, ShouldBeNil) + + syncConfig.Registries[0].PollInterval = updateDuration + dctlr, _, destDir, _ := makeDownstreamServer(t, false, syncConfig) + + dcm := test.NewControllerManager(dctlr) + 
dcm.StartAndWait(dctlr.Config.HTTP.Port) + defer dcm.StopServer() + + found, err := test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, + "finished syncing all repos", 15*time.Second) + if err != nil { + panic(err) + } + + if !found { + data, err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + } + + So(found, ShouldBeTrue) + + time.Sleep(time.Second) + + blob := referrers.Manifests[0] + blobsDir := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm())) + blobPath := path.Join(blobsDir, blob.Digest.Encoded()) + err = os.MkdirAll(blobsDir, storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + err = os.WriteFile(blobPath, []byte("blob"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + err = os.Chmod(blobPath, 0o000) + So(err, ShouldBeNil) + + found, err = test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, + "couldn't check if the upstream oci references for image can be skipped", 30*time.Second) + if err != nil { + panic(err) + } + + if !found { + data, err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + } + + So(found, ShouldBeTrue) + }) }) }) } @@ -5200,78 +5261,6 @@ func TestOnDemandPullsOnce(t *testing.T) { }) } -func TestError(t *testing.T) { - Convey("Verify periodically sync pushSyncedLocalImage() error", t, func() { - updateDuration, _ := time.ParseDuration("30m") - - sctlr, srcBaseURL, _, _, _ := makeUpstreamServer(t, false, false) - - scm := test.NewControllerManager(sctlr) - scm.StartAndWait(sctlr.Config.HTTP.Port) - defer scm.StopServer() - - regex := ".*" - semver := true - var tlsVerify bool - - syncRegistryConfig := syncconf.RegistryConfig{ - Content: []syncconf.Content{ - { - Prefix: testImage, - Tags: &syncconf.Tags{ - Regex: ®ex, - Semver: &semver, - }, - }, - }, - URLs: []string{srcBaseURL}, - PollInterval: updateDuration, - TLSVerify: &tlsVerify, - CertDir: "", - } - - defaultVal := true - syncConfig := &syncconf.Config{ - Enable: &defaultVal, - Registries: []syncconf.RegistryConfig{syncRegistryConfig}, - } - - dctlr, _, destDir, _ := makeDownstreamServer(t, false, syncConfig) - - dcm := test.NewControllerManager(dctlr) - dcm.StartAndWait(dctlr.Config.HTTP.Port) - defer dcm.StopServer() - - // give permission denied on pushSyncedLocalImage() - localRepoPath := path.Join(destDir, testImage, "blobs") - err := os.MkdirAll(localRepoPath, 0o755) - So(err, ShouldBeNil) - - err = os.Chmod(localRepoPath, 0o000) - So(err, ShouldBeNil) - - defer func() { - err = os.Chmod(localRepoPath, 0o755) - So(err, ShouldBeNil) - }() - - found, err := test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, - "couldn't commit image to local image store", 30*time.Second) - if err != nil { - panic(err) - } - - if !found { - data, err := os.ReadFile(dctlr.Config.Log.Output) - So(err, ShouldBeNil) - - t.Logf("downstream log: %s", string(data)) - } - - So(found, ShouldBeTrue) - }) -} - func TestSignaturesOnDemand(t *testing.T) { Convey("Verify sync signatures on demand feature", t, func() { sctlr, srcBaseURL, srcDir, _, _ := makeUpstreamServer(t, false, false) diff --git a/pkg/meta/meta_test.go b/pkg/meta/meta_test.go index 59e2101e..990f1df4 100644 --- a/pkg/meta/meta_test.go +++ b/pkg/meta/meta_test.go @@ -413,7 +413,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func }) Convey("Test API keys with short expiration date", func() { - expirationDate := time.Now().Add(500 * 
time.Millisecond).Local().Round(time.Millisecond)
+		expirationDate := time.Now().Add(1 * time.Second)
 		apiKeyDetails.ExpirationDate = expirationDate
 
 		userAc := reqCtx.NewUserAccessControl()
@@ -435,7 +435,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
 			So(isExpired, ShouldBeFalse)
 			So(err, ShouldBeNil)
 
-			time.Sleep(600 * time.Millisecond)
+			time.Sleep(1 * time.Second)
 
 			Convey("GetUserAPIKeys detects api key expired", func() {
 				storedAPIKeys, err = metaDB.GetUserAPIKeys(ctx)
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 8ea89d54..174559d4 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -83,6 +83,7 @@ type Scheduler struct {
 	workerWg       *sync.WaitGroup
 	isShuttingDown atomic.Bool
 	metricServer   monitoring.MetricServer
+	cancelFunc     context.CancelFunc
 }
 
 func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen
@@ -107,12 +108,12 @@ func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logge
 		generatorsLock: new(sync.Mutex),
 		log:            log.Logger{Logger: sublogger},
 		// default value
+		metricServer: ms,
 		RateLimit:    rateLimit,
 		NumWorkers:   numWorkers,
 		workerChan:   make(chan Task, numWorkers),
 		metricsChan:  make(chan struct{}, 1),
 		workerWg:     new(sync.WaitGroup),
-		metricServer: ms,
 	}
 }
 
@@ -210,12 +211,16 @@ func (scheduler *Scheduler) metricsWorker() {
 	}
 }
 
+/*
+The scheduler can be stopped by calling Shutdown();
+it waits for all running tasks to finish their work before exiting.
+*/
 func (scheduler *Scheduler) Shutdown() {
+	defer scheduler.workerWg.Wait()
+
 	if !scheduler.inShutdown() {
 		scheduler.shutdown()
 	}
-
-	scheduler.workerWg.Wait()
 }
 
 func (scheduler *Scheduler) inShutdown() bool {
@@ -223,12 +228,19 @@
 }
 
 func (scheduler *Scheduler) shutdown() {
-	close(scheduler.workerChan)
-	close(scheduler.metricsChan)
 	scheduler.isShuttingDown.Store(true)
+
+	scheduler.cancelFunc()
+	close(scheduler.metricsChan)
 }
 
-func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
+func (scheduler *Scheduler) RunScheduler() {
+	/* This context is passed down to all task generators.
+	Calling scheduler.Shutdown() cancels it and then waits for all
+	tasks to finish their work gracefully. */
+	ctx, cancel := context.WithCancel(context.Background())
+	scheduler.cancelFunc = cancel
+
 	throttle := time.NewTicker(rateLimit).C
 
 	numWorkers := scheduler.NumWorkers
@@ -243,6 +255,9 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
 	go scheduler.metricsWorker()
 
 	go func() {
+		// close the worker channel once ctx is canceled by scheduler.Shutdown()
+		defer close(scheduler.workerChan)
+
 		for {
 			select {
 			case <-ctx.Done():
@@ -254,22 +269,20 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
 
 				return
 			default:
-				i := 0
-				for i < numWorkers {
-					task := scheduler.getTask()
+				// don't block on sending to workerChan; if it is full, wait and retry.
+				if len(scheduler.workerChan) == scheduler.NumWorkers {
+					<-throttle
 
-					if task != nil {
-						// push tasks into worker pool
-						if !scheduler.inShutdown() {
-							scheduler.log.Debug().Str("task", task.String()).Msg("pushing task into worker pool")
-							scheduler.workerChan <- task
-						}
-					}
-					i++
+					continue
+				}
+
+				task := scheduler.getTask()
+
+				if task != nil {
+					// push tasks into the worker pool until workerChan is full.
+ scheduler.workerChan <- task } } - - <-throttle } }() } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 89a39246..9980cd31 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -30,6 +30,14 @@ func (t *task) DoWork(ctx context.Context) error { return errInternal } + for idx := 0; idx < 5; idx++ { + if ctx.Err() != nil { + return ctx.Err() + } + + time.Sleep(100 * time.Millisecond) + } + t.log.Info().Msg(t.msg) return nil @@ -52,12 +60,20 @@ type generator struct { } func (g *generator) Next() (scheduler.Task, error) { - if g.step > 1 { + if g.step > 100 { g.done = true } g.step++ g.index++ + if g.step%11 == 0 { + return nil, nil + } + + if g.step%13 == 0 { + return nil, errInternal + } + return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil } @@ -114,13 +130,11 @@ func TestScheduler(t *testing.T) { genH := &shortGenerator{log: logger, priority: "high priority"} // interval has to be higher than throttle value to simulate - sch.SubmitGenerator(genH, 12000*time.Millisecond, scheduler.HighPriority) + sch.SubmitGenerator(genH, 6*time.Second, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - - time.Sleep(16 * time.Second) - cancel() + sch.RunScheduler() + time.Sleep(7 * time.Second) + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -148,12 +162,9 @@ func TestScheduler(t *testing.T) { genH := &generator{log: logger, priority: "high priority"} sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(4 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -177,11 +188,9 @@ func TestScheduler(t *testing.T) { t := &task{log: logger, msg: "", err: true} sch.SubmitTask(t, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(500 * time.Millisecond) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -202,11 +211,9 @@ func TestScheduler(t *testing.T) { genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - - time.Sleep(6 * time.Second) - cancel() + sch.RunScheduler() + time.Sleep(4 * time.Second) + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -242,10 +249,8 @@ func TestScheduler(t *testing.T) { metrics := monitoring.NewMetricsServer(true, logger) sch := scheduler.NewScheduler(config.New(), metrics, logger) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - cancel() + sch.RunScheduler() + sch.Shutdown() time.Sleep(500 * time.Millisecond) t := &task{log: logger, msg: "", err: false} @@ -256,6 +261,30 @@ func TestScheduler(t *testing.T) { So(string(data), ShouldNotContainSubstring, "adding a new task") }) + Convey("Test stopping scheduler by calling Shutdown()", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, 
logger) + + genL := &generator{log: logger, priority: "medium priority"} + sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority) + + sch.RunScheduler() + time.Sleep(4 * time.Second) + sch.Shutdown() + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "executing medium priority task; index: 1") + So(string(data), ShouldContainSubstring, "executing medium priority task; index: 2") + So(string(data), ShouldContainSubstring, "received stop signal, gracefully shutting down...") + }) + Convey("Test scheduler Priority.String() method", t, func() { var p scheduler.Priority //nolint: varnamelen // test invalid priority diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 0409b68a..c825eb68 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -62,16 +62,15 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals var errCache = errors.New("new cache error") -func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { +func runAndGetScheduler() *scheduler.Scheduler { log := zlog.Logger{} metrics := monitoring.NewMetricsServer(true, log) taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() - return taskScheduler, cancel + return taskScheduler } func TestStorageFSAPIs(t *testing.T) { @@ -1195,14 +1194,15 @@ func TestDedupeLinks(t *testing.T) { // run on empty image store // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() // rebuild with dedupe true imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(1 * time.Second) - cancel() + taskScheduler.Shutdown() // manifest1 upload, err := imgStore.NewBlobUpload("dedupe1") @@ -1367,7 +1367,9 @@ func TestDedupeLinks(t *testing.T) { Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { for i := 0; i < 10; i++ { - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() + // rebuild with dedupe true imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) @@ -1375,10 +1377,11 @@ func TestDedupeLinks(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() // rebuild with dedupe true imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) @@ -1387,7 +1390,7 @@ func TestDedupeLinks(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1398,7 +1401,8 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index error cache nil", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil) @@ -1408,7 +1412,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(3 * time.Second) - cancel() + 
taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1420,7 +1424,8 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index cache error on original blob", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{ HasBlobFn: func(digest godigest.Digest, path string) bool { @@ -1436,7 +1441,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1448,7 +1453,8 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index cache error on duplicate blob", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{ HasBlobFn: func(digest godigest.Digest, path string) bool { @@ -1468,7 +1474,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(15 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index d9e3b907..67b8b686 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -186,16 +186,15 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl return store, il, err } -func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { +func runAndGetScheduler() *scheduler.Scheduler { logger := log.Logger{} metrics := monitoring.NewMetricsServer(false, logger) taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 50 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() - return taskScheduler, cancel + return taskScheduler } type FileInfoMock struct { @@ -1587,7 +1586,8 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -1598,7 +1598,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded())) @@ -1615,9 +1615,8 @@ func TestS3Dedupe(t *testing.T) { So(len(blobContent), ShouldEqual, fi1.Size()) Convey("rebuild s3 dedupe index from false to true", func() { - taskScheduler, cancel := runAndGetScheduler() - - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -1628,6 +1627,8 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) + taskScheduler.Shutdown() + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) So(err, 
ShouldBeNil) @@ -1816,7 +1817,8 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -1827,7 +1829,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded())) @@ -1861,7 +1863,8 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from false to true", func() { - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -1872,7 +1875,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2055,9 +2058,8 @@ func TestRebuildDedupeIndex(t *testing.T) { taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -2067,17 +2069,18 @@ func TestRebuildDedupeIndex(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2096,10 +2099,8 @@ func TestRebuildDedupeIndex(t *testing.T) { taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2110,10 +2111,11 @@ func TestRebuildDedupeIndex(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel = runAndGetScheduler() + taskScheduler = runAndGetScheduler() + defer taskScheduler.Shutdown() // rebuild with dedupe false, should have all blobs with content imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2121,7 +2123,7 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2140,8 +2142,8 @@ func 
TestRebuildDedupeIndex(t *testing.T) { storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) defer cleanupStorage(storeDriver, testDir) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2150,8 +2152,8 @@ func TestRebuildDedupeIndex(t *testing.T) { }) Convey("Rebuild dedupe index already rebuilt", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2171,8 +2173,8 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2185,8 +2187,8 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.Delete(context.Background(), fi1.Path()) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2202,8 +2204,8 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2224,8 +2226,8 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.Delete(context.Background(), imgStore.RootDir()) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() // rebuild with dedupe false, should have all blobs with content imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2235,8 +2237,8 @@ func TestRebuildDedupeIndex(t *testing.T) { }) Convey("Rebuild from true to false", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -2247,6 +2249,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) + taskScheduler.Shutdown() + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) So(err, ShouldBeNil) diff --git a/pkg/test/common/utils.go b/pkg/test/common/utils.go index 51e17b72..d2f1d57c 100644 --- a/pkg/test/common/utils.go +++ b/pkg/test/common/utils.go @@ -1,7 +1,6 @@ package common import ( - "context" "errors" "fmt" "math/rand" @@ -62,44 +61,34 @@ func Location(baseURL string, resp *resty.Response) string { } type Controller interface { - Init(ctx context.Context) error - Run(ctx context.Context) error + Init() error + Run() error Shutdown() GetPort() int } type ControllerManager struct { controller Controller - // used to stop background 
tasks(goroutines) - cancelRoutinesFunc context.CancelFunc } -func (cm *ControllerManager) RunServer(ctx context.Context) { +func (cm *ControllerManager) RunServer() { // Useful to be able to call in the same goroutine for testing purposes - if err := cm.controller.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := cm.controller.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } } func (cm *ControllerManager) StartServer() { - ctx, cancel := context.WithCancel(context.Background()) - cm.cancelRoutinesFunc = cancel - - if err := cm.controller.Init(ctx); err != nil { + if err := cm.controller.Init(); err != nil { panic(err) } go func() { - cm.RunServer(ctx) + cm.RunServer() }() } func (cm *ControllerManager) StopServer() { - // stop background tasks - if cm.cancelRoutinesFunc != nil { - cm.cancelRoutinesFunc() - } - cm.controller.Shutdown() } diff --git a/pkg/test/common/utils_test.go b/pkg/test/common/utils_test.go index f9e485b8..9c547d10 100644 --- a/pkg/test/common/utils_test.go +++ b/pkg/test/common/utils_test.go @@ -1,7 +1,6 @@ package common_test import ( - "context" "os" "path" "testing" @@ -53,11 +52,9 @@ func TestControllerManager(t *testing.T) { ctlr := api.NewController(conf) ctlrManager := tcommon.NewControllerManager(ctlr) - ctx := context.Background() - - err := ctlr.Init(ctx) + err := ctlr.Init() So(err, ShouldBeNil) - So(func() { ctlrManager.RunServer(ctx) }, ShouldPanic) + So(func() { ctlrManager.RunServer() }, ShouldPanic) }) } diff --git a/pkg/test/mocks/sync_remote_mock.go b/pkg/test/mocks/sync_remote_mock.go new file mode 100644 index 00000000..d7500029 --- /dev/null +++ b/pkg/test/mocks/sync_remote_mock.go @@ -0,0 +1,67 @@ +package mocks + +import ( + "context" + + "github.com/containers/image/v5/types" + "github.com/opencontainers/go-digest" +) + +type SyncRemote struct { + // Get temporary ImageReference, is used by functions in containers/image package + GetImageReferenceFn func(repo string, tag string) (types.ImageReference, error) + + // Get local oci layout context, is used by functions in containers/image package + GetContextFn func() *types.SystemContext + + // Get a list of repos (catalog) + GetRepositoriesFn func(ctx context.Context) ([]string, error) + + // Get a list of tags given a repo + GetRepoTagsFn func(repo string) ([]string, error) + + // Get manifest content, mediaType, digest given an ImageReference + GetManifestContentFn func(imageReference types.ImageReference) ([]byte, string, digest.Digest, error) +} + +func (remote SyncRemote) GetImageReference(repo string, tag string) (types.ImageReference, error) { + if remote.GetImageReferenceFn != nil { + return remote.GetImageReferenceFn(repo, tag) + } + + return nil, nil +} + +func (remote SyncRemote) GetContext() *types.SystemContext { + if remote.GetContextFn != nil { + return remote.GetContextFn() + } + + return nil +} + +func (remote SyncRemote) GetRepositories(ctx context.Context) ([]string, error) { + if remote.GetRepositoriesFn != nil { + return remote.GetRepositoriesFn(ctx) + } + + return []string{}, nil +} + +func (remote SyncRemote) GetRepoTags(repo string) ([]string, error) { + if remote.GetRepoTagsFn != nil { + return remote.GetRepoTagsFn(repo) + } + + return []string{}, nil +} + +func (remote SyncRemote) GetManifestContent(imageReference types.ImageReference) ( + []byte, string, digest.Digest, error, +) { + if remote.GetManifestContentFn != nil { + return remote.GetManifestContentFn(imageReference) + } + + return nil, "", "", nil +} diff --git a/test/blackbox/sync.bats 
b/test/blackbox/sync.bats index 6ec24f70..218e21f1 100644 --- a/test/blackbox/sync.bats +++ b/test/blackbox/sync.bats @@ -76,7 +76,7 @@ function setup_file() { ], "onDemand": false, "tlsVerify": false, - "PollInterval": "1s", + "PollInterval": "10s", "content": [ { "prefix": "**" diff --git a/test/blackbox/sync_cloud.bats b/test/blackbox/sync_cloud.bats index a35e7670..0a78e490 100644 --- a/test/blackbox/sync_cloud.bats +++ b/test/blackbox/sync_cloud.bats @@ -30,7 +30,7 @@ function setup_file() { exit 1 fi - # Download test data to folder common for the entire suite, not just this file + # Download test data to folder common for the entire suite, not just this file skopeo --insecure-policy copy --format=oci docker://ghcr.io/project-zot/golang:1.20 oci:${TEST_DATA_DIR}/golang:1.20 # Setup zot server local zot_sync_per_root_dir=${BATS_FILE_TMPDIR}/zot-per @@ -88,7 +88,7 @@ function setup_file() { ], "onDemand": false, "tlsVerify": false, - "PollInterval": "1s", + "PollInterval": "10s", "content": [ { "prefix": "**" @@ -197,7 +197,7 @@ function teardown_file() { [ "$status" -eq 0 ] [ $(echo "${lines[-1]}" | jq '.tags[]') = '"1.20"' ] - run sleep 20s + run sleep 30s run curl http://127.0.0.1:${zot_port1}/v2/_catalog [ "$status" -eq 0 ] diff --git a/test/blackbox/sync_replica_cluster.bats b/test/blackbox/sync_replica_cluster.bats index 37291d04..1032f11a 100644 --- a/test/blackbox/sync_replica_cluster.bats +++ b/test/blackbox/sync_replica_cluster.bats @@ -69,7 +69,7 @@ function setup_file() { ], "onDemand": false, "tlsVerify": false, - "PollInterval": "1s", + "PollInterval": "10s", "content": [ { "prefix": "**" @@ -105,7 +105,7 @@ EOF ], "onDemand": false, "tlsVerify": false, - "PollInterval": "1s", + "PollInterval": "10s", "content": [ { "prefix": "**"
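
Taken together, the scheduler now owns its whole lifecycle: RunScheduler() creates an
internal context, and Shutdown() is the single stop path that cancels that context,
closes the worker channel, and waits for in-flight tasks to return. Below is a minimal
sketch of the new calling convention, based only on the signatures visible in this
patch; helloTask is a hypothetical task written for illustration, and the Task
interface is assumed to be just DoWork plus String, as the diff suggests.

package main

import (
	"context"
	"time"

	"zotregistry.io/zot/pkg/api/config"
	"zotregistry.io/zot/pkg/extensions/monitoring"
	"zotregistry.io/zot/pkg/log"
	"zotregistry.io/zot/pkg/scheduler"
)

// helloTask is a hypothetical Task used only for illustration.
type helloTask struct{}

// DoWork receives the scheduler's internal context; cooperative tasks
// poll ctx.Err() and return early once Shutdown() cancels the context.
func (t *helloTask) DoWork(ctx context.Context) error {
	for i := 0; i < 5; i++ {
		if ctx.Err() != nil {
			return ctx.Err() // scheduler is shutting down
		}

		time.Sleep(100 * time.Millisecond)
	}

	return nil
}

func (t *helloTask) String() string { return "helloTask" }

func main() {
	logger := log.Logger{}
	metrics := monitoring.NewMetricsServer(true, logger)
	sch := scheduler.NewScheduler(config.New(), metrics, logger)

	// RunScheduler no longer takes a context; the scheduler creates an
	// internal one and passes it down to every task and generator.
	sch.RunScheduler()

	sch.SubmitTask(&helloTask{}, scheduler.MediumPriority)
	time.Sleep(time.Second)

	// Single shutdown path: cancels the internal context, closes the
	// worker channel, and blocks until running tasks have returned.
	sch.Shutdown()
}

The config hot-reload flow follows the same pattern: on a config change the
HotReloader now calls StopBackgroundTasks(), then LoadNewConfig(newConfig), then
StartBackgroundTasks(), instead of juggling a reload context and cancel function.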