diff --git a/Makefile b/Makefile index a77e3357..fccde17e 100644 --- a/Makefile +++ b/Makefile @@ -195,7 +195,7 @@ test-prereq: check-skopeo $(TESTDATA) $(ORAS) .PHONY: test-extended test-extended: $(if $(findstring ui,$(BUILD_LABELS)), ui) test-extended: test-prereq - go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./... + go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 20m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./... rm -rf /tmp/getter*; rm -rf /tmp/trivy* .PHONY: test-minimal diff --git a/pkg/api/authn_test.go b/pkg/api/authn_test.go index e7e89a87..ac8145bd 100644 --- a/pkg/api/authn_test.go +++ b/pkg/api/authn_test.go @@ -11,6 +11,7 @@ import ( "net/http" "net/http/httptest" "os" + "path" "testing" "time" @@ -23,9 +24,14 @@ import ( "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/api/constants" extconf "zotregistry.io/zot/pkg/extensions/config" + "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" mTypes "zotregistry.io/zot/pkg/meta/types" reqCtx "zotregistry.io/zot/pkg/requestcontext" + "zotregistry.io/zot/pkg/scheduler" + "zotregistry.io/zot/pkg/storage" + storageConstants "zotregistry.io/zot/pkg/storage/constants" + "zotregistry.io/zot/pkg/storage/local" authutils "zotregistry.io/zot/pkg/test/auth" test "zotregistry.io/zot/pkg/test/common" "zotregistry.io/zot/pkg/test/mocks" @@ -922,6 +928,88 @@ func TestAPIKeysGeneratorErrors(t *testing.T) { }) } +func TestCookiestoreCleanup(t *testing.T) { + log := log.Logger{} + metrics := monitoring.NewMetricsServer(true, log) + + Convey("Test cookiestore cleanup works", t, func() { + taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) + taskScheduler.RateLimit = 50 * time.Millisecond + taskScheduler.RunScheduler() + + rootDir := t.TempDir() + + err := os.MkdirAll(path.Join(rootDir, "_sessions"), storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + + sessionPath := path.Join(rootDir, "_sessions", "session_1234") + + err = os.WriteFile(sessionPath, []byte("session"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + err = os.Chtimes(sessionPath, time.Time{}, time.Time{}) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(rootDir, false, false, log, metrics, nil, nil) + + storeController := storage.StoreController{ + DefaultStore: imgStore, + } + + cookieStore, err := api.NewCookieStore(storeController) + So(err, ShouldBeNil) + + cookieStore.RunSessionCleaner(taskScheduler) + + time.Sleep(2 * time.Second) + + taskScheduler.Shutdown() + + // make sure session is removed + _, err = os.Stat(sessionPath) + So(err, ShouldNotBeNil) + }) + + Convey("Test cookiestore cleanup without permissions on rootDir", t, func() { + taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) + taskScheduler.RateLimit = 50 * time.Millisecond + taskScheduler.RunScheduler() + + rootDir := t.TempDir() + + err := os.MkdirAll(path.Join(rootDir, "_sessions"), storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + + sessionPath := path.Join(rootDir, "_sessions", "session_1234") + + err = os.WriteFile(sessionPath, []byte("session"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(rootDir, false, false, log, metrics, nil, nil) + + storeController := storage.StoreController{ + DefaultStore: imgStore, + } + + cookieStore, err := api.NewCookieStore(storeController) + So(err, ShouldBeNil) + + err = os.Chmod(rootDir, 0o000) + So(err, ShouldBeNil) + + defer func() { + err = os.Chmod(rootDir, storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + }() + + cookieStore.RunSessionCleaner(taskScheduler) + + time.Sleep(1 * time.Second) + + taskScheduler.Shutdown() + }) +} + type mockUUIDGenerator struct { guuid.Generator succeedAttempts int diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 36cbd104..fda220f5 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -94,12 +94,12 @@ func (c *Controller) GetPort() int { return c.chosenPort } -func (c *Controller) Run(reloadCtx context.Context) error { +func (c *Controller) Run() error { if err := c.initCookieStore(); err != nil { return err } - c.StartBackgroundTasks(reloadCtx) + c.StartBackgroundTasks() // setup HTTP API router engine := mux.NewRouter() @@ -216,7 +216,7 @@ func (c *Controller) Run(reloadCtx context.Context) error { return server.Serve(listener) } -func (c *Controller) Init(reloadCtx context.Context) error { +func (c *Controller) Init() error { // print the current configuration, but strip secrets c.Log.Info().Interface("params", c.Config.Sanitize()).Msg("configuration settings") @@ -237,7 +237,7 @@ func (c *Controller) Init(reloadCtx context.Context) error { return err } - if err := c.InitMetaDB(reloadCtx); err != nil { + if err := c.InitMetaDB(); err != nil { return err } @@ -280,7 +280,7 @@ func (c *Controller) initCookieStore() error { return nil } -func (c *Controller) InitMetaDB(reloadCtx context.Context) error { +func (c *Controller) InitMetaDB() error { // init metaDB if search is enabled or we need to store user profiles, api keys or signatures if c.Config.IsSearchEnabled() || c.Config.IsBasicAuthnEnabled() || c.Config.IsImageTrustEnabled() || c.Config.IsRetentionEnabled() { @@ -310,7 +310,7 @@ func (c *Controller) InitMetaDB(reloadCtx context.Context) error { return nil } -func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.Config) { +func (c *Controller) LoadNewConfig(newConfig *config.Config) { // reload access control config c.Config.HTTP.AccessControl = newConfig.HTTP.AccessControl @@ -364,21 +364,24 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config. c.InitCVEInfo() - c.StartBackgroundTasks(reloadCtx) - c.Log.Info().Interface("reloaded params", c.Config.Sanitize()). Msg("loaded new configuration settings") } func (c *Controller) Shutdown() { - c.taskScheduler.Shutdown() + c.StopBackgroundTasks() ctx := context.Background() _ = c.Server.Shutdown(ctx) } -func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { +// Will stop scheduler and wait for all tasks to finish their work. +func (c *Controller) StopBackgroundTasks() { + c.taskScheduler.Shutdown() +} + +func (c *Controller) StartBackgroundTasks() { c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log) - c.taskScheduler.RunScheduler(reloadCtx) + c.taskScheduler.RunScheduler() // Enable running garbage-collect periodically for DefaultStore if c.Config.Storage.GC { diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 87c0c084..ca63430d 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -300,10 +300,10 @@ func TestRunAlreadyRunningServer(t *testing.T) { cm.StartAndWait(port) defer cm.StopServer() - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) - err = ctlr.Run(context.Background()) + err = ctlr.Run() So(err, ShouldNotBeNil) }) } @@ -377,7 +377,7 @@ func TestObjectStorageController(t *testing.T) { ctlr := makeController(conf, tmp) So(ctlr, ShouldNotBeNil) - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) }) @@ -1218,7 +1218,7 @@ func TestMultipleInstance(t *testing.T) { } ctlr := api.NewController(conf) ctlr.Log.Info().Int64("seedUser", seedUser).Int64("seedPass", seedPass).Msg("random seed for username & password") - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldEqual, errors.ErrImgStoreNotFound) globalDir := t.TempDir() @@ -1311,7 +1311,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) // subpath root directory does not exist. @@ -1320,7 +1320,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) subPathMap["/a"] = config.StorageConfig{RootDirectory: subDir, Dedupe: true, GC: true} @@ -1328,7 +1328,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) }) } @@ -1826,12 +1826,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() @@ -1866,12 +1866,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() diff --git a/pkg/api/cookiestore.go b/pkg/api/cookiestore.go index d66b971a..86c109e5 100644 --- a/pkg/api/cookiestore.go +++ b/pkg/api/cookiestore.go @@ -152,7 +152,9 @@ type CleanTask struct { func (cleanTask *CleanTask) DoWork(ctx context.Context) error { for _, session := range cleanTask.sessions { if err := os.Remove(session); err != nil { - return err + if !os.IsNotExist(err) { + return err + } } } diff --git a/pkg/cli/client/cve_cmd_test.go b/pkg/cli/client/cve_cmd_test.go index 91a922bd..d0876b28 100644 --- a/pkg/cli/client/cve_cmd_test.go +++ b/pkg/cli/client/cve_cmd_test.go @@ -163,16 +163,14 @@ func TestNegativeServerResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -239,16 +237,14 @@ func TestServerCVEResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -578,9 +574,7 @@ func TestCVESort(t *testing.T) { t.FailNow() } - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } @@ -617,7 +611,7 @@ func TestCVESort(t *testing.T) { } go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/cli/client/image_cmd_test.go b/pkg/cli/client/image_cmd_test.go index df56f1af..496ff026 100644 --- a/pkg/cli/client/image_cmd_test.go +++ b/pkg/cli/client/image_cmd_test.go @@ -866,7 +866,7 @@ func TestServerResponseGQLWithoutPermissions(t *testing.T) { } ctlr := api.NewController(conf) - if err := ctlr.Init(context.Background()); err != nil { + if err := ctlr.Init(); err != nil { So(err, ShouldNotBeNil) } }) diff --git a/pkg/cli/server/config_reloader.go b/pkg/cli/server/config_reloader.go index f81f7633..43967a27 100644 --- a/pkg/cli/server/config_reloader.go +++ b/pkg/cli/server/config_reloader.go @@ -1,7 +1,6 @@ package server import ( - "context" "os" "os/signal" "syscall" @@ -35,28 +34,22 @@ func NewHotReloader(ctlr *api.Controller, filePath string) (*HotReloader, error) return hotReloader, nil } -func signalHandler(ctlr *api.Controller, sigCh chan os.Signal, ctx context.Context, cancel context.CancelFunc) { - select { +func signalHandler(ctlr *api.Controller, sigCh chan os.Signal) { // if signal then shutdown - case sig := <-sigCh: - defer cancel() - + if sig, ok := <-sigCh; ok { ctlr.Log.Info().Interface("signal", sig).Msg("received signal") // gracefully shutdown http server ctlr.Shutdown() //nolint: contextcheck close(sigCh) - // if reload then return - case <-ctx.Done(): - return } } -func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel context.CancelFunc) { +func initShutDownRoutine(ctlr *api.Controller) { sigCh := make(chan os.Signal, 1) - go signalHandler(ctlr, sigCh, ctx, cancel) + go signalHandler(ctlr, sigCh) // block all async signals to this server signal.Ignore() @@ -65,12 +58,10 @@ func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel conte signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) } -func (hr *HotReloader) Start() context.Context { - reloadCtx, cancelFunc := context.WithCancel(context.Background()) - +func (hr *HotReloader) Start() { done := make(chan bool) - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + initShutDownRoutine(hr.ctlr) // run watcher go func() { @@ -92,16 +83,15 @@ func (hr *HotReloader) Start() context.Context { continue } - // if valid config then reload - cancelFunc() - // create new context - reloadCtx, cancelFunc = context.WithCancel(context.Background()) + // stop background tasks gracefully + hr.ctlr.StopBackgroundTasks() - // init shutdown routine - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + // load new config + hr.ctlr.LoadNewConfig(newConfig) - hr.ctlr.LoadNewConfig(reloadCtx, newConfig) + // start background tasks based on new loaded config + hr.ctlr.StartBackgroundTasks() } // watch for errors case err := <-hr.watcher.Errors: @@ -116,6 +106,4 @@ func (hr *HotReloader) Start() context.Context { <-done }() - - return reloadCtx } diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index d3e63845..afdc1d15 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -61,20 +61,16 @@ func newServeCmd(conf *config.Config) *cobra.Command { return err } - /* context used to cancel go routines so that - we can change their config on the fly (restart routines with different config) */ - reloaderCtx := hotReloader.Start() + hotReloader.Start() - if err := ctlr.Init(reloaderCtx); err != nil { + if err := ctlr.Init(); err != nil { ctlr.Log.Error().Err(err).Msg("failed to init controller") return err } - if err := ctlr.Run(reloaderCtx); err != nil { + if err := ctlr.Run(); err != nil { log.Error().Err(err).Msg("failed to start controller, exiting") - - return err } return nil diff --git a/pkg/compliance/v1_0_0/check_test.go b/pkg/compliance/v1_0_0/check_test.go index 3b104f3a..1ed06f74 100644 --- a/pkg/compliance/v1_0_0/check_test.go +++ b/pkg/compliance/v1_0_0/check_test.go @@ -81,12 +81,12 @@ func startServer(t *testing.T) (*api.Controller, string) { ctrl.Config.Storage.SubPaths = subPaths go func() { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { return } // this blocks - if err := ctrl.Run(context.Background()); err != nil { + if err := ctrl.Run(); err != nil { return } }() diff --git a/pkg/exporter/api/controller_test.go b/pkg/exporter/api/controller_test.go index 387470b8..f9bd0852 100644 --- a/pkg/exporter/api/controller_test.go +++ b/pkg/exporter/api/controller_test.go @@ -140,12 +140,12 @@ func TestNewExporter(t *testing.T) { dir := t.TempDir() serverController.Config.Storage.RootDirectory = dir go func(ctrl *zotapi.Controller) { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { panic(err) } // this blocks - if err := ctrl.Run(context.Background()); !errors.Is(err, http.ErrServerClosed) { + if err := ctrl.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }(serverController) diff --git a/pkg/extensions/extension_image_trust_test.go b/pkg/extensions/extension_image_trust_test.go index a0dc6d30..5db9a606 100644 --- a/pkg/extensions/extension_image_trust_test.go +++ b/pkg/extensions/extension_image_trust_test.go @@ -257,7 +257,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ []string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())}) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -369,7 +369,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ err = signature.SignWithNotation(certName, imageURL, rootDir, true) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -502,7 +502,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ err = signature.SignWithNotation(certName, imageURL, rootDir, false) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -672,7 +672,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ []string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())}) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -883,7 +883,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ So(err, ShouldBeNil) So(found, ShouldBeTrue) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) diff --git a/pkg/extensions/monitoring/monitoring_test.go b/pkg/extensions/monitoring/monitoring_test.go index 90f96681..e5ad1395 100644 --- a/pkg/extensions/monitoring/monitoring_test.go +++ b/pkg/extensions/monitoring/monitoring_test.go @@ -4,7 +4,6 @@ package monitoring_test import ( - "context" "fmt" "io" "math/rand" @@ -463,8 +462,7 @@ func TestPopulateStorageMetrics(t *testing.T) { metrics := monitoring.NewMetricsServer(true, ctlr.Log) sch := scheduler.NewScheduler(conf, metrics, ctlr.Log) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) + sch.RunScheduler() generator := &common.StorageMetricsInitGenerator{ ImgStore: ctlr.StoreController.DefaultStore, @@ -485,7 +483,7 @@ func TestPopulateStorageMetrics(t *testing.T) { So(err, ShouldBeNil) So(found, ShouldBeTrue) - cancel() + sch.Shutdown() alpineSize, err := monitoring.GetDirSize(path.Join(rootDir, "alpine")) So(err, ShouldBeNil) busyboxSize, err := monitoring.GetDirSize(path.Join(rootDir, "busybox")) diff --git a/pkg/extensions/scrub/scrub_test.go b/pkg/extensions/scrub/scrub_test.go index 69af3d52..2939722f 100644 --- a/pkg/extensions/scrub/scrub_test.go +++ b/pkg/extensions/scrub/scrub_test.go @@ -70,13 +70,11 @@ func TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "blobs/manifest ok", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "blobs/manifest ok") }) Convey("Blobs integrity affected", t, func(c C) { @@ -122,13 +120,11 @@ func TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "blobs/manifest affected", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "blobs/manifest affected") }) Convey("Generator error - not enough permissions to access root directory", t, func(c C) { @@ -170,13 +166,11 @@ func TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "failed to execute generator", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "failed to execute generator") So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil) }) diff --git a/pkg/extensions/search/cve/scan_test.go b/pkg/extensions/search/cve/scan_test.go index ac05e725..13af44a8 100644 --- a/pkg/extensions/search/cve/scan_test.go +++ b/pkg/extensions/search/cve/scan_test.go @@ -432,11 +432,9 @@ func TestScanGeneratorWithMockedData(t *testing.T) { //nolint: gocyclo sch.SubmitGenerator(generator, 10*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed despite errors found, err := test.ReadLogFileAndSearchString(logPath, @@ -529,11 +527,9 @@ func TestScanGeneratorWithRealData(t *testing.T) { // Start the generator sch.SubmitGenerator(generator, 120*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed found, err := test.ReadLogFileAndSearchString(logPath, diff --git a/pkg/extensions/search/cve/update_test.go b/pkg/extensions/search/cve/update_test.go index 9efbdee7..7545ae6d 100644 --- a/pkg/extensions/search/cve/update_test.go +++ b/pkg/extensions/search/cve/update_test.go @@ -63,11 +63,9 @@ func TestCVEDBGenerator(t *testing.T) { sch.SubmitGenerator(generator, 12000*time.Millisecond, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Wait for trivy db to download found, err := test.ReadLogFileAndCountStringOccurence(logPath, diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index 4fac01d3..b13dd893 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -681,16 +681,14 @@ func TestRepoListWithNewestImage(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -3373,16 +3371,14 @@ func TestGlobalSearch(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -6205,16 +6201,14 @@ func TestImageSummary(t *testing.T) { configDigest := godigest.FromBytes(configBlob) So(errConfig, ShouldBeNil) // marshall success, config is valid JSON - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index afcd4890..2cecf3bb 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -44,7 +44,7 @@ func New( storeController storage.StoreController, metadb mTypes.MetaDB, log log.Logger, -) (Service, error) { +) (*BaseService, error) { service := &BaseService{} service.config = opts diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 50c572ca..9e985180 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -170,6 +170,29 @@ func TestService(t *testing.T) { }) } +func TestSyncRepo(t *testing.T) { + Convey("trigger context error", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + } + + service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) + So(err, ShouldBeNil) + + service.remote = mocks.SyncRemote{ + GetRepoTagsFn: func(repo string) ([]string, error) { + return []string{"repo1", "repo2"}, nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = service.SyncRepo(ctx, "repo") + So(err, ShouldEqual, ctx.Err()) + }) +} + func TestDestinationRegistry(t *testing.T) { Convey("make StoreController", t, func() { dir := t.TempDir() diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 6574dea7..344ad739 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -1901,15 +1901,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() @@ -2051,15 +2051,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() @@ -3938,6 +3938,12 @@ func TestPeriodicallySignaturesErr(t *testing.T) { } } + // start downstream server + updateDuration, err = time.ParseDuration("1s") + So(err, ShouldBeNil) + + syncConfig.Registries[0].PollInterval = updateDuration + // start downstream server dctlr, destBaseURL, _, _ := makeDownstreamServer(t, false, syncConfig) @@ -4030,6 +4036,61 @@ func TestPeriodicallySignaturesErr(t *testing.T) { So(err, ShouldBeNil) So(len(index.Manifests), ShouldEqual, 0) }) + + Convey("of type OCI image, error on downstream in canSkipReference()", func() { //nolint: dupl + // start downstream server + updateDuration, err = time.ParseDuration("1s") + So(err, ShouldBeNil) + + syncConfig.Registries[0].PollInterval = updateDuration + dctlr, _, destDir, _ := makeDownstreamServer(t, false, syncConfig) + + dcm := test.NewControllerManager(dctlr) + dcm.StartAndWait(dctlr.Config.HTTP.Port) + defer dcm.StopServer() + + found, err := test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, + "finished syncing all repos", 15*time.Second) + if err != nil { + panic(err) + } + + if !found { + data, err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + } + + So(found, ShouldBeTrue) + + time.Sleep(time.Second) + + blob := referrers.Manifests[0] + blobsDir := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm())) + blobPath := path.Join(blobsDir, blob.Digest.Encoded()) + err = os.MkdirAll(blobsDir, storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + err = os.WriteFile(blobPath, []byte("blob"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + err = os.Chmod(blobPath, 0o000) + So(err, ShouldBeNil) + + found, err = test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, + "couldn't check if the upstream oci references for image can be skipped", 30*time.Second) + if err != nil { + panic(err) + } + + if !found { + data, err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + } + + So(found, ShouldBeTrue) + }) }) }) } @@ -5200,78 +5261,6 @@ func TestOnDemandPullsOnce(t *testing.T) { }) } -func TestError(t *testing.T) { - Convey("Verify periodically sync pushSyncedLocalImage() error", t, func() { - updateDuration, _ := time.ParseDuration("30m") - - sctlr, srcBaseURL, _, _, _ := makeUpstreamServer(t, false, false) - - scm := test.NewControllerManager(sctlr) - scm.StartAndWait(sctlr.Config.HTTP.Port) - defer scm.StopServer() - - regex := ".*" - semver := true - var tlsVerify bool - - syncRegistryConfig := syncconf.RegistryConfig{ - Content: []syncconf.Content{ - { - Prefix: testImage, - Tags: &syncconf.Tags{ - Regex: ®ex, - Semver: &semver, - }, - }, - }, - URLs: []string{srcBaseURL}, - PollInterval: updateDuration, - TLSVerify: &tlsVerify, - CertDir: "", - } - - defaultVal := true - syncConfig := &syncconf.Config{ - Enable: &defaultVal, - Registries: []syncconf.RegistryConfig{syncRegistryConfig}, - } - - dctlr, _, destDir, _ := makeDownstreamServer(t, false, syncConfig) - - dcm := test.NewControllerManager(dctlr) - dcm.StartAndWait(dctlr.Config.HTTP.Port) - defer dcm.StopServer() - - // give permission denied on pushSyncedLocalImage() - localRepoPath := path.Join(destDir, testImage, "blobs") - err := os.MkdirAll(localRepoPath, 0o755) - So(err, ShouldBeNil) - - err = os.Chmod(localRepoPath, 0o000) - So(err, ShouldBeNil) - - defer func() { - err = os.Chmod(localRepoPath, 0o755) - So(err, ShouldBeNil) - }() - - found, err := test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, - "couldn't commit image to local image store", 30*time.Second) - if err != nil { - panic(err) - } - - if !found { - data, err := os.ReadFile(dctlr.Config.Log.Output) - So(err, ShouldBeNil) - - t.Logf("downstream log: %s", string(data)) - } - - So(found, ShouldBeTrue) - }) -} - func TestSignaturesOnDemand(t *testing.T) { Convey("Verify sync signatures on demand feature", t, func() { sctlr, srcBaseURL, srcDir, _, _ := makeUpstreamServer(t, false, false) diff --git a/pkg/meta/meta_test.go b/pkg/meta/meta_test.go index 59e2101e..990f1df4 100644 --- a/pkg/meta/meta_test.go +++ b/pkg/meta/meta_test.go @@ -413,7 +413,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func }) Convey("Test API keys with short expiration date", func() { - expirationDate := time.Now().Add(500 * time.Millisecond).Local().Round(time.Millisecond) + expirationDate := time.Now().Add(1 * time.Second) apiKeyDetails.ExpirationDate = expirationDate userAc := reqCtx.NewUserAccessControl() @@ -435,7 +435,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func So(isExpired, ShouldBeFalse) So(err, ShouldBeNil) - time.Sleep(600 * time.Millisecond) + time.Sleep(1 * time.Second) Convey("GetUserAPIKeys detects api key expired", func() { storedAPIKeys, err = metaDB.GetUserAPIKeys(ctx) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8ea89d54..174559d4 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -83,6 +83,7 @@ type Scheduler struct { workerWg *sync.WaitGroup isShuttingDown atomic.Bool metricServer monitoring.MetricServer + cancelFunc context.CancelFunc } func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen @@ -107,12 +108,12 @@ func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logge generatorsLock: new(sync.Mutex), log: log.Logger{Logger: sublogger}, // default value + metricServer: ms, RateLimit: rateLimit, NumWorkers: numWorkers, workerChan: make(chan Task, numWorkers), metricsChan: make(chan struct{}, 1), workerWg: new(sync.WaitGroup), - metricServer: ms, } } @@ -210,12 +211,16 @@ func (scheduler *Scheduler) metricsWorker() { } } +/* +Scheduler can be stopped by calling Shutdown(). +it will wait for all tasks being run to finish their work before exiting. +*/ func (scheduler *Scheduler) Shutdown() { + defer scheduler.workerWg.Wait() + if !scheduler.inShutdown() { scheduler.shutdown() } - - scheduler.workerWg.Wait() } func (scheduler *Scheduler) inShutdown() bool { @@ -223,12 +228,19 @@ func (scheduler *Scheduler) inShutdown() bool { } func (scheduler *Scheduler) shutdown() { - close(scheduler.workerChan) - close(scheduler.metricsChan) scheduler.isShuttingDown.Store(true) + + scheduler.cancelFunc() + close(scheduler.metricsChan) } -func (scheduler *Scheduler) RunScheduler(ctx context.Context) { +func (scheduler *Scheduler) RunScheduler() { + /*This context is passed to all task generators + calling scheduler.Shutdown() will cancel this context and will wait for all tasks + to finish their work gracefully.*/ + ctx, cancel := context.WithCancel(context.Background()) + scheduler.cancelFunc = cancel + throttle := time.NewTicker(rateLimit).C numWorkers := scheduler.NumWorkers @@ -243,6 +255,9 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { go scheduler.metricsWorker() go func() { + // will close workers chan when either ctx is canceled or scheduler.Shutdown() + defer close(scheduler.workerChan) + for { select { case <-ctx.Done(): @@ -254,22 +269,20 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { return default: - i := 0 - for i < numWorkers { - task := scheduler.getTask() + // we don't want to block on sending task in workerChan. + if len(scheduler.workerChan) == scheduler.NumWorkers { + <-throttle - if task != nil { - // push tasks into worker pool - if !scheduler.inShutdown() { - scheduler.log.Debug().Str("task", task.String()).Msg("pushing task into worker pool") - scheduler.workerChan <- task - } - } - i++ + continue + } + + task := scheduler.getTask() + + if task != nil { + // push tasks into worker pool until workerChan is full. + scheduler.workerChan <- task } } - - <-throttle } }() } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 89a39246..9980cd31 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -30,6 +30,14 @@ func (t *task) DoWork(ctx context.Context) error { return errInternal } + for idx := 0; idx < 5; idx++ { + if ctx.Err() != nil { + return ctx.Err() + } + + time.Sleep(100 * time.Millisecond) + } + t.log.Info().Msg(t.msg) return nil @@ -52,12 +60,20 @@ type generator struct { } func (g *generator) Next() (scheduler.Task, error) { - if g.step > 1 { + if g.step > 100 { g.done = true } g.step++ g.index++ + if g.step%11 == 0 { + return nil, nil + } + + if g.step%13 == 0 { + return nil, errInternal + } + return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil } @@ -114,13 +130,11 @@ func TestScheduler(t *testing.T) { genH := &shortGenerator{log: logger, priority: "high priority"} // interval has to be higher than throttle value to simulate - sch.SubmitGenerator(genH, 12000*time.Millisecond, scheduler.HighPriority) + sch.SubmitGenerator(genH, 6*time.Second, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - - time.Sleep(16 * time.Second) - cancel() + sch.RunScheduler() + time.Sleep(7 * time.Second) + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -148,12 +162,9 @@ func TestScheduler(t *testing.T) { genH := &generator{log: logger, priority: "high priority"} sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(4 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -177,11 +188,9 @@ func TestScheduler(t *testing.T) { t := &task{log: logger, msg: "", err: true} sch.SubmitTask(t, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(500 * time.Millisecond) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -202,11 +211,9 @@ func TestScheduler(t *testing.T) { genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - - time.Sleep(6 * time.Second) - cancel() + sch.RunScheduler() + time.Sleep(4 * time.Second) + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -242,10 +249,8 @@ func TestScheduler(t *testing.T) { metrics := monitoring.NewMetricsServer(true, logger) sch := scheduler.NewScheduler(config.New(), metrics, logger) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - cancel() + sch.RunScheduler() + sch.Shutdown() time.Sleep(500 * time.Millisecond) t := &task{log: logger, msg: "", err: false} @@ -256,6 +261,30 @@ func TestScheduler(t *testing.T) { So(string(data), ShouldNotContainSubstring, "adding a new task") }) + Convey("Test stopping scheduler by calling Shutdown()", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) + + genL := &generator{log: logger, priority: "medium priority"} + sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority) + + sch.RunScheduler() + time.Sleep(4 * time.Second) + sch.Shutdown() + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "executing medium priority task; index: 1") + So(string(data), ShouldContainSubstring, "executing medium priority task; index: 2") + So(string(data), ShouldContainSubstring, "received stop signal, gracefully shutting down...") + }) + Convey("Test scheduler Priority.String() method", t, func() { var p scheduler.Priority //nolint: varnamelen // test invalid priority diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 0409b68a..c825eb68 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -62,16 +62,15 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals var errCache = errors.New("new cache error") -func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { +func runAndGetScheduler() *scheduler.Scheduler { log := zlog.Logger{} metrics := monitoring.NewMetricsServer(true, log) taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() - return taskScheduler, cancel + return taskScheduler } func TestStorageFSAPIs(t *testing.T) { @@ -1195,14 +1194,15 @@ func TestDedupeLinks(t *testing.T) { // run on empty image store // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() // rebuild with dedupe true imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(1 * time.Second) - cancel() + taskScheduler.Shutdown() // manifest1 upload, err := imgStore.NewBlobUpload("dedupe1") @@ -1367,7 +1367,9 @@ func TestDedupeLinks(t *testing.T) { Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { for i := 0; i < 10; i++ { - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() + // rebuild with dedupe true imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) @@ -1375,10 +1377,11 @@ func TestDedupeLinks(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() // rebuild with dedupe true imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) @@ -1387,7 +1390,7 @@ func TestDedupeLinks(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1398,7 +1401,8 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index error cache nil", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil) @@ -1408,7 +1412,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(3 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1420,7 +1424,8 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index cache error on original blob", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{ HasBlobFn: func(digest godigest.Digest, path string) bool { @@ -1436,7 +1441,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1448,7 +1453,8 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index cache error on duplicate blob", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{ HasBlobFn: func(digest godigest.Digest, path string) bool { @@ -1468,7 +1474,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(15 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index d9e3b907..67b8b686 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -186,16 +186,15 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl return store, il, err } -func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { +func runAndGetScheduler() *scheduler.Scheduler { logger := log.Logger{} metrics := monitoring.NewMetricsServer(false, logger) taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 50 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() - return taskScheduler, cancel + return taskScheduler } type FileInfoMock struct { @@ -1587,7 +1586,8 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -1598,7 +1598,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded())) @@ -1615,9 +1615,8 @@ func TestS3Dedupe(t *testing.T) { So(len(blobContent), ShouldEqual, fi1.Size()) Convey("rebuild s3 dedupe index from false to true", func() { - taskScheduler, cancel := runAndGetScheduler() - - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -1628,6 +1627,8 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) + taskScheduler.Shutdown() + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) So(err, ShouldBeNil) @@ -1816,7 +1817,8 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -1827,7 +1829,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded())) @@ -1861,7 +1863,8 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from false to true", func() { - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -1872,7 +1875,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2055,9 +2058,8 @@ func TestRebuildDedupeIndex(t *testing.T) { taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -2067,17 +2069,18 @@ func TestRebuildDedupeIndex(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2096,10 +2099,8 @@ func TestRebuildDedupeIndex(t *testing.T) { taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2110,10 +2111,11 @@ func TestRebuildDedupeIndex(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel = runAndGetScheduler() + taskScheduler = runAndGetScheduler() + defer taskScheduler.Shutdown() // rebuild with dedupe false, should have all blobs with content imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2121,7 +2123,7 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2140,8 +2142,8 @@ func TestRebuildDedupeIndex(t *testing.T) { storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) defer cleanupStorage(storeDriver, testDir) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2150,8 +2152,8 @@ func TestRebuildDedupeIndex(t *testing.T) { }) Convey("Rebuild dedupe index already rebuilt", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2171,8 +2173,8 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2185,8 +2187,8 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.Delete(context.Background(), fi1.Path()) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2202,8 +2204,8 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2224,8 +2226,8 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.Delete(context.Background(), imgStore.RootDir()) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() // rebuild with dedupe false, should have all blobs with content imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2235,8 +2237,8 @@ func TestRebuildDedupeIndex(t *testing.T) { }) Convey("Rebuild from true to false", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -2247,6 +2249,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) + taskScheduler.Shutdown() + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) So(err, ShouldBeNil) diff --git a/pkg/test/common/utils.go b/pkg/test/common/utils.go index 51e17b72..d2f1d57c 100644 --- a/pkg/test/common/utils.go +++ b/pkg/test/common/utils.go @@ -1,7 +1,6 @@ package common import ( - "context" "errors" "fmt" "math/rand" @@ -62,44 +61,34 @@ func Location(baseURL string, resp *resty.Response) string { } type Controller interface { - Init(ctx context.Context) error - Run(ctx context.Context) error + Init() error + Run() error Shutdown() GetPort() int } type ControllerManager struct { controller Controller - // used to stop background tasks(goroutines) - cancelRoutinesFunc context.CancelFunc } -func (cm *ControllerManager) RunServer(ctx context.Context) { +func (cm *ControllerManager) RunServer() { // Useful to be able to call in the same goroutine for testing purposes - if err := cm.controller.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := cm.controller.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } } func (cm *ControllerManager) StartServer() { - ctx, cancel := context.WithCancel(context.Background()) - cm.cancelRoutinesFunc = cancel - - if err := cm.controller.Init(ctx); err != nil { + if err := cm.controller.Init(); err != nil { panic(err) } go func() { - cm.RunServer(ctx) + cm.RunServer() }() } func (cm *ControllerManager) StopServer() { - // stop background tasks - if cm.cancelRoutinesFunc != nil { - cm.cancelRoutinesFunc() - } - cm.controller.Shutdown() } diff --git a/pkg/test/common/utils_test.go b/pkg/test/common/utils_test.go index f9e485b8..9c547d10 100644 --- a/pkg/test/common/utils_test.go +++ b/pkg/test/common/utils_test.go @@ -1,7 +1,6 @@ package common_test import ( - "context" "os" "path" "testing" @@ -53,11 +52,9 @@ func TestControllerManager(t *testing.T) { ctlr := api.NewController(conf) ctlrManager := tcommon.NewControllerManager(ctlr) - ctx := context.Background() - - err := ctlr.Init(ctx) + err := ctlr.Init() So(err, ShouldBeNil) - So(func() { ctlrManager.RunServer(ctx) }, ShouldPanic) + So(func() { ctlrManager.RunServer() }, ShouldPanic) }) } diff --git a/pkg/test/mocks/sync_remote_mock.go b/pkg/test/mocks/sync_remote_mock.go new file mode 100644 index 00000000..d7500029 --- /dev/null +++ b/pkg/test/mocks/sync_remote_mock.go @@ -0,0 +1,67 @@ +package mocks + +import ( + "context" + + "github.com/containers/image/v5/types" + "github.com/opencontainers/go-digest" +) + +type SyncRemote struct { + // Get temporary ImageReference, is used by functions in containers/image package + GetImageReferenceFn func(repo string, tag string) (types.ImageReference, error) + + // Get local oci layout context, is used by functions in containers/image package + GetContextFn func() *types.SystemContext + + // Get a list of repos (catalog) + GetRepositoriesFn func(ctx context.Context) ([]string, error) + + // Get a list of tags given a repo + GetRepoTagsFn func(repo string) ([]string, error) + + // Get manifest content, mediaType, digest given an ImageReference + GetManifestContentFn func(imageReference types.ImageReference) ([]byte, string, digest.Digest, error) +} + +func (remote SyncRemote) GetImageReference(repo string, tag string) (types.ImageReference, error) { + if remote.GetImageReferenceFn != nil { + return remote.GetImageReferenceFn(repo, tag) + } + + return nil, nil +} + +func (remote SyncRemote) GetContext() *types.SystemContext { + if remote.GetContextFn != nil { + return remote.GetContextFn() + } + + return nil +} + +func (remote SyncRemote) GetRepositories(ctx context.Context) ([]string, error) { + if remote.GetRepositoriesFn != nil { + return remote.GetRepositoriesFn(ctx) + } + + return []string{}, nil +} + +func (remote SyncRemote) GetRepoTags(repo string) ([]string, error) { + if remote.GetRepoTagsFn != nil { + return remote.GetRepoTagsFn(repo) + } + + return []string{}, nil +} + +func (remote SyncRemote) GetManifestContent(imageReference types.ImageReference) ( + []byte, string, digest.Digest, error, +) { + if remote.GetManifestContentFn != nil { + return remote.GetManifestContentFn(imageReference) + } + + return nil, "", "", nil +} diff --git a/test/blackbox/sync.bats b/test/blackbox/sync.bats index 6ec24f70..218e21f1 100644 --- a/test/blackbox/sync.bats +++ b/test/blackbox/sync.bats @@ -76,7 +76,7 @@ function setup_file() { ], "onDemand": false, "tlsVerify": false, - "PollInterval": "1s", + "PollInterval": "10s", "content": [ { "prefix": "**" diff --git a/test/blackbox/sync_cloud.bats b/test/blackbox/sync_cloud.bats index a35e7670..0a78e490 100644 --- a/test/blackbox/sync_cloud.bats +++ b/test/blackbox/sync_cloud.bats @@ -30,7 +30,7 @@ function setup_file() { exit 1 fi - # Download test data to folder common for the entire suite, not just this file + # Download test data to folder common for the entire suite, not just this file skopeo --insecure-policy copy --format=oci docker://ghcr.io/project-zot/golang:1.20 oci:${TEST_DATA_DIR}/golang:1.20 # Setup zot server local zot_sync_per_root_dir=${BATS_FILE_TMPDIR}/zot-per @@ -88,7 +88,7 @@ function setup_file() { ], "onDemand": false, "tlsVerify": false, - "PollInterval": "1s", + "PollInterval": "10s", "content": [ { "prefix": "**" @@ -197,7 +197,7 @@ function teardown_file() { [ "$status" -eq 0 ] [ $(echo "${lines[-1]}" | jq '.tags[]') = '"1.20"' ] - run sleep 20s + run sleep 30s run curl http://127.0.0.1:${zot_port1}/v2/_catalog [ "$status" -eq 0 ] diff --git a/test/blackbox/sync_replica_cluster.bats b/test/blackbox/sync_replica_cluster.bats index 37291d04..1032f11a 100644 --- a/test/blackbox/sync_replica_cluster.bats +++ b/test/blackbox/sync_replica_cluster.bats @@ -69,7 +69,7 @@ function setup_file() { ], "onDemand": false, "tlsVerify": false, - "PollInterval": "1s", + "PollInterval": "10s", "content": [ { "prefix": "**" @@ -105,7 +105,7 @@ EOF ], "onDemand": false, "tlsVerify": false, - "PollInterval": "1s", + "PollInterval": "10s", "content": [ { "prefix": "**"