mirror of
https://github.com/project-zot/zot.git
synced 2025-01-06 22:40:28 -05:00
test: stop task scheduler between test runs (#1311)
sync: remove sync WaitGroup, it's stopped with context sync: onDemand will always try to sync newest image when a tag is used if a digest is used then onDemand will serve local image test(sync): fix flaky coverage in sync package closes #1294 Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
parent
0ae35e973a
commit
3dd3c46ee3
8 changed files with 499 additions and 489 deletions
|
@ -11,7 +11,6 @@ import (
|
|||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
goSync "sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -49,7 +48,6 @@ type Controller struct {
|
|||
Server *http.Server
|
||||
Metrics monitoring.MetricServer
|
||||
CveInfo ext.CveInfo
|
||||
wgShutDown *goSync.WaitGroup // use it to gracefully shutdown goroutines
|
||||
// runtime params
|
||||
chosenPort int // kernel-chosen port
|
||||
}
|
||||
|
@ -60,7 +58,6 @@ func NewController(config *config.Config) *Controller {
|
|||
logger := log.NewLogger(config.Log.Level, config.Log.Output)
|
||||
controller.Config = config
|
||||
controller.Log = logger
|
||||
controller.wgShutDown = new(goSync.WaitGroup)
|
||||
|
||||
if config.Log.Audit != "" {
|
||||
audit := log.NewAuditLogger(config.Log.Level, config.Log.Audit)
|
||||
|
@ -529,7 +526,7 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, config *config.Con
|
|||
if config.Extensions != nil && config.Extensions.Sync != nil {
|
||||
// reload sync config
|
||||
c.Config.Extensions.Sync = config.Extensions.Sync
|
||||
ext.EnableSyncExtension(reloadCtx, c.Config, c.wgShutDown, c.RepoDB, c.StoreController, c.Log)
|
||||
ext.EnableSyncExtension(reloadCtx, c.Config, c.RepoDB, c.StoreController, c.Log)
|
||||
} else if c.Config.Extensions != nil {
|
||||
c.Config.Extensions.Sync = nil
|
||||
}
|
||||
|
@ -538,9 +535,6 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, config *config.Con
|
|||
}
|
||||
|
||||
func (c *Controller) Shutdown() {
|
||||
// wait gracefully
|
||||
c.wgShutDown.Wait()
|
||||
|
||||
ctx := context.Background()
|
||||
_ = c.Server.Shutdown(ctx)
|
||||
}
|
||||
|
@ -577,7 +571,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
|
|||
// Enable extensions if extension config is provided for storeController
|
||||
if c.Config.Extensions != nil {
|
||||
if c.Config.Extensions.Sync != nil {
|
||||
ext.EnableSyncExtension(reloadCtx, c.Config, c.wgShutDown, c.RepoDB, c.StoreController, c.Log)
|
||||
ext.EnableSyncExtension(reloadCtx, c.Config, c.RepoDB, c.StoreController, c.Log)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1632,12 +1632,26 @@ func (rh *RouteHandler) getImageStore(name string) storage.ImageStore {
|
|||
}
|
||||
|
||||
// will sync on demand if an image is not found, in case sync extensions is enabled.
|
||||
func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore storage.ImageStore, name,
|
||||
reference string,
|
||||
func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore storage.ImageStore,
|
||||
name, reference string,
|
||||
) ([]byte, godigest.Digest, string, error) {
|
||||
syncEnabled := false
|
||||
if routeHandler.c.Config.Extensions != nil &&
|
||||
routeHandler.c.Config.Extensions.Sync != nil &&
|
||||
*routeHandler.c.Config.Extensions.Sync.Enable {
|
||||
syncEnabled = true
|
||||
}
|
||||
|
||||
_, digestErr := godigest.Parse(reference)
|
||||
if digestErr == nil {
|
||||
// if it's a digest then return local cached image, if not found and sync enabled, then try to sync
|
||||
content, digest, mediaType, err := imgStore.GetImageManifest(name, reference)
|
||||
if err == nil || !syncEnabled {
|
||||
return content, digest, mediaType, err
|
||||
}
|
||||
}
|
||||
|
||||
if syncEnabled {
|
||||
routeHandler.c.Log.Info().Msgf("trying to get updated image %s:%s by syncing on demand",
|
||||
name, reference)
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ package extensions
|
|||
|
||||
import (
|
||||
"context"
|
||||
goSync "sync"
|
||||
|
||||
"zotregistry.io/zot/pkg/api/config"
|
||||
"zotregistry.io/zot/pkg/extensions/sync"
|
||||
|
@ -14,11 +13,11 @@ import (
|
|||
"zotregistry.io/zot/pkg/storage"
|
||||
)
|
||||
|
||||
func EnableSyncExtension(ctx context.Context, config *config.Config, wg *goSync.WaitGroup,
|
||||
func EnableSyncExtension(ctx context.Context, config *config.Config,
|
||||
repoDB repodb.RepoDB, storeController storage.StoreController, log log.Logger,
|
||||
) {
|
||||
if config.Extensions.Sync != nil && *config.Extensions.Sync.Enable {
|
||||
if err := sync.Run(ctx, *config.Extensions.Sync, repoDB, storeController, wg, log); err != nil {
|
||||
if err := sync.Run(ctx, *config.Extensions.Sync, repoDB, storeController, log); err != nil {
|
||||
log.Error().Err(err).Msg("Error encountered while setting up syncing")
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -5,7 +5,6 @@ package extensions
|
|||
|
||||
import (
|
||||
"context"
|
||||
goSync "sync"
|
||||
|
||||
"zotregistry.io/zot/pkg/api/config"
|
||||
"zotregistry.io/zot/pkg/log"
|
||||
|
@ -14,8 +13,7 @@ import (
|
|||
)
|
||||
|
||||
// EnableSyncExtension ...
|
||||
func EnableSyncExtension(ctx context.Context,
|
||||
config *config.Config, wg *goSync.WaitGroup, repoDB repodb.RepoDB,
|
||||
func EnableSyncExtension(ctx context.Context, config *config.Config, repoDB repodb.RepoDB,
|
||||
storeController storage.StoreController, log log.Logger,
|
||||
) {
|
||||
log.Warn().Msg("skipping enabling sync extension because given zot binary doesn't include this feature," +
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
goSync "sync"
|
||||
"time"
|
||||
|
||||
"github.com/containers/common/pkg/retry"
|
||||
|
@ -325,7 +324,7 @@ func getLocalContexts(log log.Logger) (*types.SystemContext, *signature.PolicyCo
|
|||
}
|
||||
|
||||
func Run(ctx context.Context, cfg syncconf.Config, repoDB repodb.RepoDB,
|
||||
storeController storage.StoreController, wtgrp *goSync.WaitGroup, logger log.Logger,
|
||||
storeController storage.StoreController, logger log.Logger,
|
||||
) error {
|
||||
var credentialsFile syncconf.CredentialsFile
|
||||
|
||||
|
@ -376,9 +375,6 @@ func Run(ctx context.Context, cfg syncconf.Config, repoDB repodb.RepoDB,
|
|||
// schedule each registry sync
|
||||
go func(ctx context.Context, regCfg syncconf.RegistryConfig, logger log.Logger) {
|
||||
for {
|
||||
// increment reference since will be busy, so shutdown has to wait
|
||||
wtgrp.Add(1)
|
||||
|
||||
for _, upstreamURL := range regCfg.URLs {
|
||||
upstreamAddr := StripRegistryTransport(upstreamURL)
|
||||
// first try syncing main registry
|
||||
|
@ -392,8 +388,6 @@ func Run(ctx context.Context, cfg syncconf.Config, repoDB repodb.RepoDB,
|
|||
break
|
||||
}
|
||||
}
|
||||
// mark as done after a single sync run
|
||||
wtgrp.Done()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
goSync "sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -156,8 +155,7 @@ func TestSyncInternal(t *testing.T) {
|
|||
}
|
||||
ctx := context.Background()
|
||||
|
||||
So(Run(ctx, cfg, mocks.RepoDBMock{}, storage.StoreController{},
|
||||
new(goSync.WaitGroup), log.NewLogger("debug", "")), ShouldNotBeNil)
|
||||
So(Run(ctx, cfg, mocks.RepoDBMock{}, storage.StoreController{}, log.NewLogger("debug", "")), ShouldNotBeNil)
|
||||
|
||||
_, err = getFileCredentials("/invalid/path/to/file")
|
||||
So(err, ShouldNotBeNil)
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -283,6 +283,8 @@ type Controller interface {
|
|||
|
||||
type ControllerManager struct {
|
||||
controller Controller
|
||||
// used to stop background tasks(goroutines) - task scheduler
|
||||
cancelRoutinesFunc context.CancelFunc
|
||||
}
|
||||
|
||||
func (cm *ControllerManager) RunServer(ctx context.Context) {
|
||||
|
@ -293,7 +295,8 @@ func (cm *ControllerManager) RunServer(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (cm *ControllerManager) StartServer() {
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cm.cancelRoutinesFunc = cancel
|
||||
|
||||
if err := cm.controller.Init(ctx); err != nil {
|
||||
panic(err)
|
||||
|
@ -305,6 +308,11 @@ func (cm *ControllerManager) StartServer() {
|
|||
}
|
||||
|
||||
func (cm *ControllerManager) StopServer() {
|
||||
// stop background tasks - task scheduler
|
||||
if cm.cancelRoutinesFunc != nil {
|
||||
cm.cancelRoutinesFunc()
|
||||
}
|
||||
|
||||
cm.controller.Shutdown()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue