mirror of
https://github.com/project-zot/zot.git
synced 2025-03-18 02:22:53 -05:00
fix periodic background tasks - gc and scrub
Signed-off-by: Andreea-Lupu <andreealupu1470@yahoo.com>
This commit is contained in:
parent
d0b52612a2
commit
081ba0b2f2
13 changed files with 384 additions and 118 deletions
|
@ -20,6 +20,7 @@ import (
|
|||
"zotregistry.io/zot/errors"
|
||||
"zotregistry.io/zot/pkg/api/config"
|
||||
ext "zotregistry.io/zot/pkg/extensions"
|
||||
extconf "zotregistry.io/zot/pkg/extensions/config"
|
||||
"zotregistry.io/zot/pkg/extensions/monitoring"
|
||||
"zotregistry.io/zot/pkg/log"
|
||||
"zotregistry.io/zot/pkg/storage"
|
||||
|
@ -350,23 +351,13 @@ func (c *Controller) Shutdown() {
|
|||
}
|
||||
|
||||
func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
|
||||
// Enable running garbage-collect periodically for DefaultStore
|
||||
if c.Config.Storage.GC && c.Config.Storage.GCInterval != 0 {
|
||||
c.StoreController.DefaultStore.RunGCPeriodically(c.Config.Storage.GCInterval)
|
||||
}
|
||||
|
||||
// Enable extensions if extension config is provided for DefaultStore
|
||||
if c.Config != nil && c.Config.Extensions != nil {
|
||||
ext.EnableExtensions(c.Config, c.Log, c.Config.Storage.RootDirectory)
|
||||
}
|
||||
|
||||
if c.Config.Storage.SubPaths != nil {
|
||||
for route, storageConfig := range c.Config.Storage.SubPaths {
|
||||
// Enable running garbage-collect periodically for subImageStore
|
||||
if storageConfig.GC && storageConfig.GCInterval != 0 {
|
||||
c.StoreController.SubStore[route].RunGCPeriodically(storageConfig.GCInterval)
|
||||
}
|
||||
|
||||
for _, storageConfig := range c.Config.Storage.SubPaths {
|
||||
// Enable extensions if extension config is provided for subImageStore
|
||||
if c.Config != nil && c.Config.Extensions != nil {
|
||||
ext.EnableExtensions(c.Config, c.Log, storageConfig.RootDirectory)
|
||||
|
@ -382,6 +373,131 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
|
|||
}
|
||||
|
||||
if c.Config.Extensions != nil {
|
||||
ext.EnableScrubExtension(c.Config, c.StoreController, c.Log)
|
||||
ext.EnableScrubExtension(c.Config, c.Log, false, nil, "")
|
||||
}
|
||||
|
||||
go StartPeriodicTasks(c.StoreController.DefaultStore, c.StoreController.SubStore, c.Config.Storage.SubPaths,
|
||||
c.Config.Storage.GC, c.Config.Storage.GCInterval, c.Config.Extensions, c.Log)
|
||||
}
|
||||
|
||||
func StartPeriodicTasks(defaultStore storage.ImageStore, subStore map[string]storage.ImageStore,
|
||||
subPaths map[string]config.StorageConfig, gcEnabled bool, gcInterval time.Duration,
|
||||
extensions *extconf.ExtensionConfig, log log.Logger,
|
||||
) {
|
||||
// start periodic gc and/or scrub for DefaultStore
|
||||
StartPeriodicTasksForImageStore(defaultStore, gcEnabled, gcInterval, extensions, log)
|
||||
|
||||
for route, storageConfig := range subPaths {
|
||||
// Enable running garbage-collect or/and scrub periodically for subImageStore
|
||||
StartPeriodicTasksForImageStore(subStore[route], storageConfig.GC, storageConfig.GCInterval, extensions, log)
|
||||
}
|
||||
}
|
||||
|
||||
func StartPeriodicTasksForImageStore(imageStore storage.ImageStore, configGC bool, configGCInterval time.Duration,
|
||||
extensions *extconf.ExtensionConfig, log log.Logger,
|
||||
) {
|
||||
scrubInterval := time.Duration(0)
|
||||
gcInterval := time.Duration(0)
|
||||
|
||||
gc := false
|
||||
scrub := false
|
||||
|
||||
if configGC && configGCInterval != 0 {
|
||||
gcInterval = configGCInterval
|
||||
gc = true
|
||||
}
|
||||
|
||||
if extensions != nil && extensions.Scrub != nil && extensions.Scrub.Interval != 0 {
|
||||
scrubInterval = extensions.Scrub.Interval
|
||||
scrub = true
|
||||
}
|
||||
|
||||
interval := minPeriodicInterval(scrub, gc, scrubInterval, gcInterval)
|
||||
if interval == time.Duration(0) {
|
||||
return
|
||||
}
|
||||
|
||||
log.Info().Msg(fmt.Sprintf("Periodic interval for %s set to %s", imageStore.RootDir(), interval))
|
||||
|
||||
var lastGC, lastScrub time.Time
|
||||
|
||||
for {
|
||||
log.Info().Msg(fmt.Sprintf("Starting periodic background tasks for %s", imageStore.RootDir()))
|
||||
|
||||
// Enable running garbage-collect or/and scrub periodically for imageStore
|
||||
RunBackgroundTasks(imageStore, gc, scrub, log)
|
||||
|
||||
log.Info().Msg(fmt.Sprintf("Finishing periodic background tasks for %s", imageStore.RootDir()))
|
||||
|
||||
if gc {
|
||||
lastGC = time.Now()
|
||||
}
|
||||
|
||||
if scrub {
|
||||
lastScrub = time.Now()
|
||||
}
|
||||
|
||||
time.Sleep(interval)
|
||||
|
||||
if !lastGC.IsZero() && time.Since(lastGC) >= gcInterval {
|
||||
gc = true
|
||||
}
|
||||
|
||||
if !lastScrub.IsZero() && time.Since(lastScrub) >= scrubInterval {
|
||||
scrub = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func RunBackgroundTasks(imgStore storage.ImageStore, gc, scrub bool, log log.Logger) {
|
||||
repos, err := imgStore.GetRepositories()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg(fmt.Sprintf("error while running background task for %s", imgStore.RootDir()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
for _, repo := range repos {
|
||||
if gc {
|
||||
start := time.Now()
|
||||
|
||||
// run gc for this repo
|
||||
imgStore.RunGCRepo(repo)
|
||||
|
||||
elapsed := time.Since(start)
|
||||
log.Info().Msg(fmt.Sprintf("gc for %s executed in %s", repo, elapsed))
|
||||
time.Sleep(1 * time.Minute)
|
||||
}
|
||||
|
||||
if scrub {
|
||||
start := time.Now()
|
||||
|
||||
// run scrub for this repo
|
||||
ext.EnableScrubExtension(nil, log, true, imgStore, repo)
|
||||
|
||||
elapsed := time.Since(start)
|
||||
log.Info().Msg(fmt.Sprintf("scrub for %s executed in %s", repo, elapsed))
|
||||
time.Sleep(1 * time.Minute)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func minPeriodicInterval(scrub, gc bool, scrubInterval, gcInterval time.Duration) time.Duration {
|
||||
if scrub && gc {
|
||||
if scrubInterval <= gcInterval {
|
||||
return scrubInterval
|
||||
}
|
||||
|
||||
return gcInterval
|
||||
}
|
||||
|
||||
if scrub {
|
||||
return scrubInterval
|
||||
}
|
||||
|
||||
if gc {
|
||||
return gcInterval
|
||||
}
|
||||
|
||||
return time.Duration(0)
|
||||
}
|
||||
|
|
|
@ -4741,6 +4741,8 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
|
|||
|
||||
func TestPeriodicGC(t *testing.T) {
|
||||
Convey("Periodic gc enabled for default store", t, func() {
|
||||
repoName := "test"
|
||||
|
||||
port := test.GetFreePort()
|
||||
baseURL := test.GetBaseURL(port)
|
||||
conf := config.New()
|
||||
|
@ -4748,7 +4750,6 @@ func TestPeriodicGC(t *testing.T) {
|
|||
|
||||
logFile, err := ioutil.TempFile("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
conf.Log.Level = "debug"
|
||||
conf.Log.Output = logFile.Name()
|
||||
defer os.Remove(logFile.Name()) // clean up
|
||||
|
||||
|
@ -4759,20 +4760,29 @@ func TestPeriodicGC(t *testing.T) {
|
|||
ctlr.Config.Storage.GCInterval = 1 * time.Hour
|
||||
ctlr.Config.Storage.GCDelay = 1 * time.Second
|
||||
|
||||
err = test.CopyFiles("../../test/data/zot-test", path.Join(dir, repoName))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go startServer(ctlr)
|
||||
defer stopServer(ctlr)
|
||||
test.WaitTillServerReady(baseURL)
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring,
|
||||
"\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":3600000000000")
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("executing GC of orphaned blobs for %s", ctlr.StoreController.DefaultStore.RootDir()))
|
||||
fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
|
||||
So(string(data), ShouldNotContainSubstring,
|
||||
fmt.Sprintf("error while running GC for %s", ctlr.StoreController.DefaultStore.RootDir()))
|
||||
fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir()))
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("GC completed for %s, next GC scheduled after", ctlr.StoreController.DefaultStore.RootDir()))
|
||||
fmt.Sprintf("executing GC of orphaned blobs for %s", path.Join(ctlr.StoreController.DefaultStore.RootDir(), repoName))) //nolint:lll
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("GC completed for %s", path.Join(ctlr.StoreController.DefaultStore.RootDir(), repoName))) //nolint:lll
|
||||
})
|
||||
|
||||
Convey("Periodic GC enabled for substore", t, func() {
|
||||
|
@ -4783,7 +4793,6 @@ func TestPeriodicGC(t *testing.T) {
|
|||
|
||||
logFile, err := ioutil.TempFile("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
conf.Log.Level = "debug"
|
||||
conf.Log.Output = logFile.Name()
|
||||
defer os.Remove(logFile.Name()) // clean up
|
||||
|
||||
|
@ -4811,7 +4820,81 @@ func TestPeriodicGC(t *testing.T) {
|
|||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"GC\":true,\"Dedupe\":false,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) //nolint:lll // gofumpt conflicts with lll
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("executing GC of orphaned blobs for %s", ctlr.StoreController.SubStore["/a"].RootDir()))
|
||||
fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.SubStore["/a"].RootDir())) //nolint:lll
|
||||
})
|
||||
}
|
||||
|
||||
func TestPeriodicTasks(t *testing.T) {
|
||||
Convey("Both periodic gc and periodic scrub enabled for default store with scrubInterval < gcInterval", t, func() {
|
||||
port := test.GetFreePort()
|
||||
baseURL := test.GetBaseURL(port)
|
||||
conf := config.New()
|
||||
conf.HTTP.Port = port
|
||||
|
||||
logFile, err := ioutil.TempFile("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
conf.Log.Output = logFile.Name()
|
||||
defer os.Remove(logFile.Name()) // clean up
|
||||
|
||||
ctlr := api.NewController(conf)
|
||||
dir := t.TempDir()
|
||||
ctlr.Config.Storage.RootDirectory = dir
|
||||
ctlr.Config.Storage.GC = true
|
||||
ctlr.Config.Storage.GCInterval = 12 * time.Hour
|
||||
ctlr.Config.Storage.GCDelay = 1 * time.Second
|
||||
ctlr.Config.Extensions = &extconf.ExtensionConfig{Scrub: &extconf.ScrubConfig{Interval: 8 * time.Hour}}
|
||||
|
||||
go startServer(ctlr)
|
||||
defer stopServer(ctlr)
|
||||
test.WaitTillServerReady(baseURL)
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
|
||||
So(string(data), ShouldNotContainSubstring,
|
||||
fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir()))
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("Finishing periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("Periodic interval for %s set to %s",
|
||||
ctlr.StoreController.DefaultStore.RootDir(), ctlr.Config.Extensions.Scrub.Interval))
|
||||
})
|
||||
|
||||
Convey("Both periodic gc and periodic scrub enabled for default store with gcInterval < scrubInterval", t, func() {
|
||||
port := test.GetFreePort()
|
||||
baseURL := test.GetBaseURL(port)
|
||||
conf := config.New()
|
||||
conf.HTTP.Port = port
|
||||
|
||||
logFile, err := ioutil.TempFile("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
conf.Log.Output = logFile.Name()
|
||||
defer os.Remove(logFile.Name()) // clean up
|
||||
|
||||
ctlr := api.NewController(conf)
|
||||
dir := t.TempDir()
|
||||
ctlr.Config.Storage.RootDirectory = dir
|
||||
ctlr.Config.Storage.GC = true
|
||||
ctlr.Config.Storage.GCInterval = 8 * time.Hour
|
||||
ctlr.Config.Storage.GCDelay = 1 * time.Second
|
||||
ctlr.Config.Extensions = &extconf.ExtensionConfig{Scrub: &extconf.ScrubConfig{Interval: 12 * time.Hour}}
|
||||
|
||||
go startServer(ctlr)
|
||||
defer stopServer(ctlr)
|
||||
test.WaitTillServerReady(baseURL)
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("Starting periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
|
||||
So(string(data), ShouldNotContainSubstring,
|
||||
fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir()))
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("Finishing periodic background tasks for %s", ctlr.StoreController.DefaultStore.RootDir())) //nolint:lll
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("Periodic interval for %s set to %s",
|
||||
ctlr.StoreController.DefaultStore.RootDir(), ctlr.Config.Storage.GCInterval))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ type MockedImageStore struct {
|
|||
getBlobContentFn func(repo, digest string) ([]byte, error)
|
||||
getReferrersFn func(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error)
|
||||
urlForPathFn func(path string) (string, error)
|
||||
runGCPeriodicallyFn func(gcInterval time.Duration)
|
||||
runGCRepoFn func(repo string)
|
||||
}
|
||||
|
||||
func (is *MockedImageStore) Lock(t *time.Time) {
|
||||
|
@ -302,9 +302,9 @@ func (is *MockedImageStore) URLForPath(path string) (string, error) {
|
|||
return "", nil
|
||||
}
|
||||
|
||||
func (is *MockedImageStore) RunGCPeriodically(gcInterval time.Duration) {
|
||||
if is != nil && is.runGCPeriodicallyFn != nil {
|
||||
is.runGCPeriodicallyFn(gcInterval)
|
||||
func (is *MockedImageStore) RunGCRepo(repo string) {
|
||||
if is != nil && is.runGCRepoFn != nil {
|
||||
is.runGCRepoFn(repo)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -509,9 +509,10 @@ func TestServeScrubExtension(t *testing.T) {
|
|||
// Even if in config we specified scrub interval=1h, the minimum interval is 2h
|
||||
So(string(data), ShouldContainSubstring,
|
||||
"\"Extensions\":{\"Search\":null,\"Sync\":null,\"Metrics\":null,\"Scrub\":{\"Interval\":3600000000000}") //nolint:lll // gofumpt conflicts with lll
|
||||
So(string(data), ShouldContainSubstring, "executing scrub to check manifest/blob integrity")
|
||||
So(string(data), ShouldContainSubstring,
|
||||
"Scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.")
|
||||
So(string(data), ShouldContainSubstring, "Starting periodic background tasks for")
|
||||
So(string(data), ShouldContainSubstring, "Finishing periodic background tasks for")
|
||||
})
|
||||
|
||||
Convey("scrub not enabled - scrub interval param not set", t, func(c C) {
|
||||
|
|
|
@ -88,27 +88,22 @@ func EnableSyncExtension(ctx context.Context, config *config.Config, wg *goSync.
|
|||
}
|
||||
|
||||
// EnableScrubExtension enables scrub extension.
|
||||
func EnableScrubExtension(config *config.Config, storeController storage.StoreController,
|
||||
log log.Logger,
|
||||
) {
|
||||
if config.Extensions.Scrub != nil &&
|
||||
config.Extensions.Scrub.Interval != 0 {
|
||||
minScrubInterval, _ := time.ParseDuration("2h")
|
||||
func EnableScrubExtension(config *config.Config, log log.Logger, run bool, imgStore storage.ImageStore, repo string) {
|
||||
if !run {
|
||||
if config.Extensions.Scrub != nil &&
|
||||
config.Extensions.Scrub.Interval != 0 {
|
||||
minScrubInterval, _ := time.ParseDuration("2h")
|
||||
|
||||
if config.Extensions.Scrub.Interval < minScrubInterval {
|
||||
config.Extensions.Scrub.Interval = minScrubInterval
|
||||
if config.Extensions.Scrub.Interval < minScrubInterval {
|
||||
config.Extensions.Scrub.Interval = minScrubInterval
|
||||
|
||||
log.Warn().Msg("Scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.") //nolint:lll // gofumpt conflicts with lll
|
||||
}
|
||||
|
||||
go func() {
|
||||
err := scrub.Run(log, config.Extensions.Scrub.Interval, storeController)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error while trying to scrub")
|
||||
log.Warn().Msg("Scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.") //nolint:lll // gofumpt conflicts with lll
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
log.Info().Msg("Scrub config not provided, skipping scrub")
|
||||
}
|
||||
} else {
|
||||
log.Info().Msg("Scrub config not provided, skipping scrub")
|
||||
scrub.RunScrubRepo(imgStore, repo, log)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,9 +40,7 @@ func EnableSyncExtension(ctx context.Context, config *config.Config, wg *goSync.
|
|||
}
|
||||
|
||||
// EnableScrubExtension ...
|
||||
func EnableScrubExtension(config *config.Config, storeController storage.StoreController,
|
||||
log log.Logger,
|
||||
) {
|
||||
func EnableScrubExtension(config *config.Config, log log.Logger, run bool, imgStore storage.ImageStore, repo string) {
|
||||
log.Warn().Msg("skipping enabling scrub extension because given zot binary doesn't support any extensions," +
|
||||
"please build zot full binary for this feature")
|
||||
}
|
||||
|
|
|
@ -4,41 +4,40 @@
|
|||
package scrub
|
||||
|
||||
import (
|
||||
"time"
|
||||
"fmt"
|
||||
"path"
|
||||
|
||||
"zotregistry.io/zot/pkg/log"
|
||||
"zotregistry.io/zot/pkg/storage"
|
||||
)
|
||||
|
||||
// Scrub Extension...
|
||||
func Run(log log.Logger, scrubInterval time.Duration, storeController storage.StoreController) error {
|
||||
for {
|
||||
log.Info().Msg("executing scrub to check manifest/blob integrity")
|
||||
// Scrub Extension for repo...
|
||||
func RunScrubRepo(imgStore storage.ImageStore, repo string, log log.Logger) {
|
||||
execMsg := fmt.Sprintf("executing scrub to check manifest/blob integrity for %s", path.Join(imgStore.RootDir(), repo))
|
||||
log.Info().Msg(execMsg)
|
||||
|
||||
results, err := storeController.CheckAllBlobsIntegrity()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, result := range results.ScrubResults {
|
||||
if result.Status == "ok" {
|
||||
log.Info().
|
||||
Str("image", result.ImageName).
|
||||
Str("tag", result.Tag).
|
||||
Str("status", result.Status).
|
||||
Msg("scrub: blobs/manifest ok")
|
||||
} else {
|
||||
log.Warn().
|
||||
Str("image", result.ImageName).
|
||||
Str("tag", result.Tag).
|
||||
Str("status", result.Status).
|
||||
Str("error", result.Error).
|
||||
Msg("scrub: blobs/manifest affected")
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Str("Scrub completed, next scrub scheduled after", scrubInterval.String()).Msg("")
|
||||
|
||||
time.Sleep(scrubInterval)
|
||||
results, err := storage.CheckRepo(repo, imgStore)
|
||||
if err != nil {
|
||||
errMessage := fmt.Sprintf("error while running scrub for %s", path.Join(imgStore.RootDir(), repo))
|
||||
log.Error().Err(err).Msg(errMessage)
|
||||
}
|
||||
|
||||
for _, result := range results {
|
||||
if result.Status == "ok" {
|
||||
log.Info().
|
||||
Str("image", result.ImageName).
|
||||
Str("tag", result.Tag).
|
||||
Str("status", result.Status).
|
||||
Msg("scrub: blobs/manifest ok")
|
||||
} else {
|
||||
log.Warn().
|
||||
Str("image", result.ImageName).
|
||||
Str("tag", result.Tag).
|
||||
Str("status", result.Status).
|
||||
Str("error", result.Error).
|
||||
Msg("scrub: blobs/manifest affected")
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Msg(fmt.Sprintf("scrub completed for %s", path.Join(imgStore.RootDir(), repo)))
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ package scrub_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -17,6 +18,10 @@ import (
|
|||
"zotregistry.io/zot/pkg/api"
|
||||
"zotregistry.io/zot/pkg/api/config"
|
||||
extconf "zotregistry.io/zot/pkg/extensions/config"
|
||||
"zotregistry.io/zot/pkg/extensions/monitoring"
|
||||
"zotregistry.io/zot/pkg/extensions/scrub"
|
||||
"zotregistry.io/zot/pkg/log"
|
||||
"zotregistry.io/zot/pkg/storage"
|
||||
"zotregistry.io/zot/pkg/test"
|
||||
)
|
||||
|
||||
|
@ -148,7 +153,7 @@ func TestScrubExtension(t *testing.T) {
|
|||
So(string(data), ShouldContainSubstring, "scrub: blobs/manifest affected")
|
||||
})
|
||||
|
||||
Convey("CheckAllBlobsIntegrity error - not enough permissions to access root directory", t, func(c C) {
|
||||
Convey("RunBackgroundTasks error - not enough permissions to access root directory", t, func(c C) {
|
||||
port := test.GetFreePort()
|
||||
url := test.GetBaseURL(port)
|
||||
|
||||
|
@ -205,8 +210,91 @@ func TestScrubExtension(t *testing.T) {
|
|||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring, "error while trying to scrub")
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("error while running background task for %s", ctlr.StoreController.DefaultStore.RootDir()))
|
||||
|
||||
So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRunScrubRepo(t *testing.T) {
|
||||
Convey("Blobs integrity not affected", t, func(c C) {
|
||||
logFile, err := ioutil.TempFile("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
defer os.Remove(logFile.Name()) // clean up
|
||||
|
||||
dir := t.TempDir()
|
||||
log := log.NewLogger("debug", logFile.Name())
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
imgStore := storage.NewImageStore(dir, true, 1*time.Second, true, true, log, metrics)
|
||||
|
||||
err = test.CopyFiles("../../../test/data/zot-test", path.Join(dir, repoName))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
scrub.RunScrubRepo(imgStore, repoName, log)
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring, "scrub: blobs/manifest ok")
|
||||
})
|
||||
|
||||
Convey("Blobs integrity affected", t, func(c C) {
|
||||
logFile, err := ioutil.TempFile("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
defer os.Remove(logFile.Name()) // clean up
|
||||
|
||||
dir := t.TempDir()
|
||||
log := log.NewLogger("debug", logFile.Name())
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
imgStore := storage.NewImageStore(dir, true, 1*time.Second, true, true, log, metrics)
|
||||
|
||||
err = test.CopyFiles("../../../test/data/zot-test", path.Join(dir, repoName))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var manifestDigest digest.Digest
|
||||
manifestDigest, _, _ = test.GetOciLayoutDigests("../../../test/data/zot-test")
|
||||
|
||||
err = os.Remove(path.Join(dir, repoName, "blobs/sha256", manifestDigest.Encoded()))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
scrub.RunScrubRepo(imgStore, repoName, log)
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring, "scrub: blobs/manifest affected")
|
||||
})
|
||||
|
||||
Convey("CheckRepo error - not enough permissions to access root directory", t, func(c C) {
|
||||
logFile, err := ioutil.TempFile("", "zot-log*.txt")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
defer os.Remove(logFile.Name()) // clean up
|
||||
|
||||
dir := t.TempDir()
|
||||
log := log.NewLogger("debug", logFile.Name())
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
imgStore := storage.NewImageStore(dir, true, 1*time.Second, true, true, log, metrics)
|
||||
|
||||
err = test.CopyFiles("../../../test/data/zot-test", path.Join(dir, repoName))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
So(os.Chmod(path.Join(dir, repoName), 0o000), ShouldBeNil)
|
||||
|
||||
scrub.RunScrubRepo(imgStore, repoName, log)
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("error while running scrub for %s", imgStore.RootDir()))
|
||||
So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1117,7 +1117,7 @@ retry:
|
|||
return nil
|
||||
}
|
||||
|
||||
func (is *ObjectStorage) RunGCPeriodically(gcInterval time.Duration) {
|
||||
func (is *ObjectStorage) RunGCRepo(repo string) {
|
||||
}
|
||||
|
||||
// DeleteBlobUpload deletes an existing blob upload that is currently in progress.
|
||||
|
|
|
@ -73,7 +73,7 @@ func CheckImageStoreBlobsIntegrity(imgStore ImageStore) ([]ScrubImageResult, err
|
|||
}
|
||||
|
||||
for _, repo := range repos {
|
||||
imageResults, err := checkRepo(repo, imgStore)
|
||||
imageResults, err := CheckRepo(repo, imgStore)
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ func CheckImageStoreBlobsIntegrity(imgStore ImageStore) ([]ScrubImageResult, err
|
|||
return results, nil
|
||||
}
|
||||
|
||||
func checkRepo(imageName string, imgStore ImageStore) ([]ScrubImageResult, error) {
|
||||
func CheckRepo(imageName string, imgStore ImageStore) ([]ScrubImageResult, error) {
|
||||
results := []ScrubImageResult{}
|
||||
|
||||
dir := path.Join(imgStore.RootDir(), imageName)
|
||||
|
|
|
@ -44,5 +44,5 @@ type ImageStore interface {
|
|||
GetIndexContent(repo string) ([]byte, error)
|
||||
GetBlobContent(repo, digest string) ([]byte, error)
|
||||
GetReferrers(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error)
|
||||
RunGCPeriodically(gcInterval time.Duration)
|
||||
RunGCRepo(repo string)
|
||||
}
|
||||
|
|
|
@ -1647,47 +1647,31 @@ func DirExists(d string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func gcAllRepos(imgStore *ImageStoreFS) error {
|
||||
repos, err := imgStore.GetRepositories()
|
||||
func (is *ImageStoreFS) gcRepo(repo string) error {
|
||||
dir := path.Join(is.RootDir(), repo)
|
||||
|
||||
var lockLatency time.Time
|
||||
|
||||
is.Lock(&lockLatency)
|
||||
|
||||
err := is.garbageCollect(dir, repo)
|
||||
|
||||
is.Unlock(&lockLatency)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, repo := range repos {
|
||||
dir := path.Join(imgStore.RootDir(), repo)
|
||||
|
||||
var lockLatency time.Time
|
||||
|
||||
imgStore.Lock(&lockLatency)
|
||||
|
||||
err := imgStore.garbageCollect(dir, repo)
|
||||
|
||||
imgStore.Unlock(&lockLatency)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (is *ImageStoreFS) RunGCPeriodically(gcInterval time.Duration) {
|
||||
go func() {
|
||||
for {
|
||||
execMessage := fmt.Sprintf("executing GC of orphaned blobs for %s", is.RootDir())
|
||||
is.log.Info().Msg(execMessage)
|
||||
func (is *ImageStoreFS) RunGCRepo(repo string) {
|
||||
is.log.Info().Msg(fmt.Sprintf("executing GC of orphaned blobs for %s", path.Join(is.RootDir(), repo)))
|
||||
|
||||
err := gcAllRepos(is)
|
||||
if err != nil {
|
||||
errMessage := fmt.Sprintf("error while running GC for %s", is.RootDir())
|
||||
is.log.Error().Err(err).Msg(errMessage)
|
||||
}
|
||||
if err := is.gcRepo(repo); err != nil {
|
||||
errMessage := fmt.Sprintf("error while running GC for %s", path.Join(is.RootDir(), repo))
|
||||
is.log.Error().Err(err).Msg(errMessage)
|
||||
}
|
||||
|
||||
completedMessage := fmt.Sprintf("GC completed for %s, next GC scheduled after", is.RootDir())
|
||||
is.log.Info().Str(completedMessage, gcInterval.String()).Msg("")
|
||||
|
||||
time.Sleep(gcInterval)
|
||||
}
|
||||
}()
|
||||
is.log.Info().Msg(fmt.Sprintf("GC completed for %s", path.Join(is.RootDir(), repo)))
|
||||
}
|
||||
|
|
|
@ -1090,7 +1090,7 @@ func TestGarbageCollect(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGarbageCollectForImageStore(t *testing.T) {
|
||||
Convey("Garbage collect for all repos from an ImageStore", t, func(c C) {
|
||||
Convey("Garbage collect for a specific repo from an ImageStore", t, func(c C) {
|
||||
dir := t.TempDir()
|
||||
|
||||
Convey("Garbage collect error for repo with config removed", func() {
|
||||
|
@ -1115,13 +1115,14 @@ func TestGarbageCollectForImageStore(t *testing.T) {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
imgStore.RunGCPeriodically(24 * time.Hour)
|
||||
imgStore.RunGCRepo(repoName)
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring, fmt.Sprintf("error while running GC for %s", imgStore.RootDir()))
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("error while running GC for %s", path.Join(imgStore.RootDir(), repoName)))
|
||||
})
|
||||
|
||||
Convey("Garbage collect error - not enough permissions to access index.json", func() {
|
||||
|
@ -1141,13 +1142,14 @@ func TestGarbageCollectForImageStore(t *testing.T) {
|
|||
|
||||
So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o000), ShouldBeNil)
|
||||
|
||||
imgStore.RunGCPeriodically(24 * time.Hour)
|
||||
imgStore.RunGCRepo(repoName)
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
data, err := os.ReadFile(logFile.Name())
|
||||
So(err, ShouldBeNil)
|
||||
So(string(data), ShouldContainSubstring, fmt.Sprintf("error while running GC for %s", imgStore.RootDir()))
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("error while running GC for %s", path.Join(imgStore.RootDir(), repoName)))
|
||||
So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o755), ShouldBeNil)
|
||||
})
|
||||
})
|
||||
|
|
Loading…
Add table
Reference in a new issue