make gc periodic
Signed-off-by: Andreea-Lupu <andreealupu1470@yahoo.com>
parent 89c5f4f604
commit 5e35dfa28f
10 changed files with 327 additions and 36 deletions
examples/config-gc-periodic.json (new file, 24 lines)
@@ -0,0 +1,24 @@
{
    "distSpecVersion":"1.0.1",
    "storage": {
        "rootDirectory": "/tmp/zot",
        "gc": true,
        "gcDelay": "1h",
        "gcInterval": "24h",
        "subPaths": {
            "/a": {
                "rootDirectory": "/tmp/zot1",
                "gc": true,
                "gcDelay": "1h",
                "gcInterval": "24h"
            }
        }
    },
    "http": {
        "address": "127.0.0.1",
        "port": "8080"
    },
    "log": {
        "level": "debug"
    }
}
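Both gcDelay and the new gcInterval knob are Go duration strings (gcDelay already existed; gcInterval is what this commit adds). A minimal illustrative sketch of how such values behave, assuming the config loader decodes them with time.ParseDuration semantics into the GCDelay/GCInterval fields; the snippet below is not part of this change:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Same syntax as time.ParseDuration: "300ms", "1h30m", "24h", ...
    gcDelay, _ := time.ParseDuration("1h")
    gcInterval, _ := time.ParseDuration("24h")

    // Blobs older than gcDelay are eligible for collection; a zero
    // gcInterval leaves periodic GC off, only positive values schedule it.
    fmt.Println(gcDelay, gcInterval) // 1h0m0s 24h0m0s
}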
@@ -25,6 +25,7 @@ type StorageConfig struct {
    Dedupe bool
    Commit bool
    GCDelay time.Duration
+   GCInterval time.Duration
    StorageDriver map[string]interface{} `mapstructure:",omitempty"`
}

@@ -99,6 +100,7 @@ type GlobalStorageConfig struct {
    GC bool
    Commit bool
    GCDelay time.Duration
+   GCInterval time.Duration
    RootDirectory string
    StorageDriver map[string]interface{} `mapstructure:",omitempty"`
    SubPaths map[string]StorageConfig
@@ -259,11 +259,6 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error {
        }

        c.StoreController.DefaultStore = defaultStore
-
-       // Enable extensions if extension config is provided
-       if c.Config != nil && c.Config.Extensions != nil {
-           ext.EnableExtensions(c.Config, c.Log, c.Config.Storage.RootDirectory)
-       }
    } else {
        // we can't proceed without global storage
        c.Log.Error().Err(errors.ErrImgStoreNotFound).Msg("controller: no storage config provided")

@@ -309,25 +304,13 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error {
                    subImageStore[route] = s3.NewImageStore(storageConfig.RootDirectory,
                        storageConfig.GC, storageConfig.GCDelay, storageConfig.Dedupe, storageConfig.Commit, c.Log, c.Metrics, store)
                }
-
-               // Enable extensions if extension config is provided
-               if c.Config != nil && c.Config.Extensions != nil {
-                   ext.EnableExtensions(c.Config, c.Log, storageConfig.RootDirectory)
-               }
            }

            c.StoreController.SubStore = subImageStore
        }
    }
-
-   // Enable extensions if extension config is provided
-   if c.Config.Extensions != nil && c.Config.Extensions.Sync != nil && *c.Config.Extensions.Sync.Enable {
-       ext.EnableSyncExtension(reloadCtx, c.Config, c.wgShutDown, c.StoreController, c.Log)
-   }

-   if c.Config.Extensions != nil {
-       ext.EnableScrubExtension(c.Config, c.StoreController, c.Log)
-   }
+   c.StartBackgroundTasks(reloadCtx)

    return nil
}
@@ -356,3 +339,40 @@ func (c *Controller) Shutdown() {
    ctx := context.Background()
    _ = c.Server.Shutdown(ctx)
}

func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
    // Enable running garbage-collect periodically for DefaultStore
    if c.Config.Storage.GC && c.Config.Storage.GCInterval != 0 {
        c.StoreController.DefaultStore.RunGCPeriodically(c.Config.Storage.GCInterval)
    }

    // Enable extensions if extension config is provided for DefaultStore
    if c.Config != nil && c.Config.Extensions != nil {
        ext.EnableExtensions(c.Config, c.Log, c.Config.Storage.RootDirectory)
    }

    if c.Config.Storage.SubPaths != nil {
        for route, storageConfig := range c.Config.Storage.SubPaths {
            // Enable running garbage-collect periodically for subImageStore
            if storageConfig.GC && storageConfig.GCInterval != 0 {
                c.StoreController.SubStore[route].RunGCPeriodically(storageConfig.GCInterval)
            }

            // Enable extensions if extension config is provided for subImageStore
            if c.Config != nil && c.Config.Extensions != nil {
                ext.EnableExtensions(c.Config, c.Log, storageConfig.RootDirectory)
            }
        }
    }

    // Enable extensions if extension config is provided for storeController
    if c.Config.Extensions != nil {
        if c.Config.Extensions.Sync != nil && *c.Config.Extensions.Sync.Enable {
            ext.EnableSyncExtension(reloadCtx, c.Config, c.wgShutDown, c.StoreController, c.Log)
        }
    }

    if c.Config.Extensions != nil {
        ext.EnableScrubExtension(c.Config, c.StoreController, c.Log)
    }
}
@@ -4472,6 +4472,82 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
    })
}

func TestPeriodicGC(t *testing.T) {
    Convey("Periodic gc enabled for default store", t, func() {
        port := test.GetFreePort()
        baseURL := test.GetBaseURL(port)
        conf := config.New()
        conf.HTTP.Port = port

        logFile, err := ioutil.TempFile("", "zot-log*.txt")
        So(err, ShouldBeNil)
        conf.Log.Level = "debug"
        conf.Log.Output = logFile.Name()
        defer os.Remove(logFile.Name()) // clean up

        ctlr := api.NewController(conf)
        dir := t.TempDir()
        ctlr.Config.Storage.RootDirectory = dir
        ctlr.Config.Storage.GC = true
        ctlr.Config.Storage.GCInterval = 1 * time.Hour
        ctlr.Config.Storage.GCDelay = 1 * time.Second

        go startServer(ctlr)
        defer stopServer(ctlr)
        test.WaitTillServerReady(baseURL)

        data, err := os.ReadFile(logFile.Name())
        So(err, ShouldBeNil)
        So(string(data), ShouldContainSubstring,
            "\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":3600000000000")
        So(string(data), ShouldContainSubstring,
            fmt.Sprintf("executing GC of orphaned blobs for %s", ctlr.StoreController.DefaultStore.RootDir()))
        So(string(data), ShouldNotContainSubstring,
            fmt.Sprintf("error while running GC for %s", ctlr.StoreController.DefaultStore.RootDir()))
        So(string(data), ShouldContainSubstring,
            fmt.Sprintf("GC completed for %s, next GC scheduled after", ctlr.StoreController.DefaultStore.RootDir()))
    })

    Convey("Periodic GC enabled for substore", t, func() {
        port := test.GetFreePort()
        baseURL := test.GetBaseURL(port)
        conf := config.New()
        conf.HTTP.Port = port

        logFile, err := ioutil.TempFile("", "zot-log*.txt")
        So(err, ShouldBeNil)
        conf.Log.Level = "debug"
        conf.Log.Output = logFile.Name()
        defer os.Remove(logFile.Name()) // clean up

        ctlr := api.NewController(conf)
        dir := t.TempDir()
        subDir := t.TempDir()

        subPaths := make(map[string]config.StorageConfig)

        subPaths["/a"] = config.StorageConfig{RootDirectory: subDir, GC: true, GCDelay: 1 * time.Second, GCInterval: 24 * time.Hour} // nolint:lll

        ctlr.Config.Storage.SubPaths = subPaths
        ctlr.Config.Storage.RootDirectory = dir

        go startServer(ctlr)
        defer stopServer(ctlr)
        test.WaitTillServerReady(baseURL)

        data, err := os.ReadFile(logFile.Name())
        So(err, ShouldBeNil)
        // periodic GC is not enabled for default store
        So(string(data), ShouldContainSubstring,
            "\"GCDelay\":3600000000000,\"GCInterval\":0,\"RootDirectory\":\""+dir+"\"")
        // periodic GC is enabled for sub store
        So(string(data), ShouldContainSubstring,
            fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"GC\":true,\"Dedupe\":false,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) // nolint:lll
        So(string(data), ShouldContainSubstring,
            fmt.Sprintf("executing GC of orphaned blobs for %s", ctlr.StoreController.SubStore["/a"].RootDir()))
    })
}

func getAllBlobs(imagePath string) []string {
    blobList := make([]string, 0)

@@ -209,9 +209,23 @@ func validateConfiguration(config *config.Config) error {
        return errors.ErrBadConfig
    }

-   if !config.Storage.GC && config.Storage.GCDelay != 0 {
-       log.Warn().Err(errors.ErrBadConfig).
-           Msg("garbage-collect delay specified without enabling garbage-collect, will be ignored")
+   if config.Storage.GCInterval < 0 {
+       log.Error().Err(errors.ErrBadConfig).
+           Msgf("invalid garbage-collect interval %v specified", config.Storage.GCInterval)
+
+       return errors.ErrBadConfig
+   }
+
+   if !config.Storage.GC {
+       if config.Storage.GCDelay != 0 {
+           log.Warn().Err(errors.ErrBadConfig).
+               Msg("garbage-collect delay specified without enabling garbage-collect, will be ignored")
+       }
+
+       if config.Storage.GCInterval != 0 {
+           log.Warn().Err(errors.ErrBadConfig).
+               Msg("periodic garbage-collect interval specified without enabling garbage-collect, will be ignored")
+       }
    }

    // check authorization config, it should have basic auth enabled or ldap
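Taken together, the new checks mean: a negative gcInterval is a hard configuration error, while a non-zero gcDelay or gcInterval with gc disabled only produces a warning and is otherwise ignored. The same rules expressed as a standalone predicate, purely as an illustrative sketch (validateGC is a hypothetical helper, not the zot function):

package main

import (
    "fmt"
    "time"
)

// validateGC mirrors the checks added to validateConfiguration above.
func validateGC(gc bool, gcDelay, gcInterval time.Duration) (ok bool, warnings []string) {
    if gcInterval < 0 {
        // hard error: negative intervals are rejected
        return false, nil
    }

    if !gc {
        if gcDelay != 0 {
            warnings = append(warnings, "gcDelay ignored because gc is disabled")
        }

        if gcInterval != 0 {
            warnings = append(warnings, "gcInterval ignored because gc is disabled")
        }
    }

    return true, warnings
}

func main() {
    ok, warnings := validateGC(false, time.Hour, 24*time.Hour)
    fmt.Println(ok, warnings) // true, two "ignored" warnings

    ok, _ = validateGC(true, time.Hour, -time.Second)
    fmt.Println(ok) // false
}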
@@ -312,6 +312,8 @@ func TestGC(t *testing.T) {
        err = cli.LoadConfiguration(config, "../../examples/config-gc.json")
        So(err, ShouldBeNil)
        So(config.Storage.GCDelay, ShouldNotEqual, storage.DefaultGCDelay)
        err = cli.LoadConfiguration(config, "../../examples/config-gc-periodic.json")
        So(err, ShouldBeNil)
    })

    Convey("Test GC config corner cases", t, func(c C) {

@@ -336,6 +338,26 @@ func TestGC(t *testing.T) {
            So(err, ShouldBeNil)
        })

        Convey("GC interval without GC", func() {
            config := config.New()
            err = json.Unmarshal(contents, config)
            config.Storage.GC = false
            config.Storage.GCDelay = 0
            config.Storage.GCInterval = 24 * time.Hour

            file, err := ioutil.TempFile("", "gc-config-*.json")
            So(err, ShouldBeNil)
            defer os.Remove(file.Name())

            contents, err = json.MarshalIndent(config, "", " ")
            So(err, ShouldBeNil)

            err = ioutil.WriteFile(file.Name(), contents, 0o600)
            So(err, ShouldBeNil)
            err = cli.LoadConfiguration(config, file.Name())
            So(err, ShouldBeNil)
        })

        Convey("Negative GC delay", func() {
            config := config.New()
            err = json.Unmarshal(contents, config)

@@ -371,6 +393,24 @@ func TestGC(t *testing.T) {
            So(err, ShouldBeNil)
            So(config.Storage.GCDelay, ShouldEqual, 0)
        })

        Convey("Negative GC interval", func() {
            config := config.New()
            err = json.Unmarshal(contents, config)
            config.Storage.GCInterval = -1 * time.Second

            file, err := ioutil.TempFile("", "gc-config-*.json")
            So(err, ShouldBeNil)
            defer os.Remove(file.Name())

            contents, err = json.MarshalIndent(config, "", " ")
            So(err, ShouldBeNil)

            err = ioutil.WriteFile(file.Name(), contents, 0o600)
            So(err, ShouldBeNil)
            err = cli.LoadConfiguration(config, file.Name())
            So(err, ShouldNotBeNil)
        })
    })
}

@@ -1006,6 +1006,9 @@ func (is *ObjectStorage) DedupeBlob(src string, dstDigest godigest.Digest, dst s
    return nil
}

func (is *ObjectStorage) RunGCPeriodically(gcInterval time.Duration) {
}

// DeleteBlobUpload deletes an existing blob upload that is currently in progress.
func (is *ObjectStorage) DeleteBlobUpload(repo string, uuid string) error {
    blobUploadPath := is.BlobUploadPath(repo, uuid)

@@ -44,4 +44,5 @@ type ImageStore interface {
    GetIndexContent(repo string) ([]byte, error)
    GetBlobContent(repo, digest string) ([]byte, error)
    GetReferrers(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error)
    RunGCPeriodically(gcInterval time.Duration)
}

@@ -691,14 +691,7 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp
    }

    if is.gc {
-       oci, err := umoci.OpenLayout(dir)
-       if err := test.Error(err); err != nil {
-           return "", err
-       }
-       defer oci.Close()
-
-       err = oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay))
-       if err := test.Error(err); err != nil {
+       if err := is.garbageCollect(dir, repo); err != nil {
            return "", err
        }
    }

@@ -793,13 +786,7 @@ func (is *ImageStoreFS) DeleteImageManifest(repo string, reference string) error
    }

    if is.gc {
-       oci, err := umoci.OpenLayout(dir)
-       if err != nil {
-           return err
-       }
-       defer oci.Close()
-
-       if err := oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay)); err != nil {
+       if err := is.garbageCollect(dir, repo); err != nil {
            return err
        }
    }

@@ -1610,6 +1597,21 @@ func ensureDir(dir string, log zerolog.Logger) error {
    return nil
}

func (is *ImageStoreFS) garbageCollect(dir string, repo string) error {
    oci, err := umoci.OpenLayout(dir)
    if err := test.Error(err); err != nil {
        return err
    }
    defer oci.Close()

    err = oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay))
    if err := test.Error(err); err != nil {
        return err
    }

    return nil
}

func ifOlderThan(imgStore *ImageStoreFS, repo string, delay time.Duration) casext.GCPolicy {
    return func(ctx context.Context, digest godigest.Digest) (bool, error) {
        blobPath := imgStore.BlobPath(repo, digest)

@@ -1641,3 +1643,48 @@ func DirExists(d string) bool {

    return true
}

func gcAllRepos(imgStore *ImageStoreFS) error {
    repos, err := imgStore.GetRepositories()
    if err != nil {
        return err
    }

    for _, repo := range repos {
        dir := path.Join(imgStore.RootDir(), repo)

        var lockLatency time.Time

        imgStore.Lock(&lockLatency)

        err := imgStore.garbageCollect(dir, repo)

        imgStore.Unlock(&lockLatency)

        if err != nil {
            return err
        }
    }

    return nil
}

func (is *ImageStoreFS) RunGCPeriodically(gcInterval time.Duration) {
    go func() {
        for {
            execMessage := fmt.Sprintf("executing GC of orphaned blobs for %s", is.RootDir())
            is.log.Info().Msg(execMessage)

            err := gcAllRepos(is)
            if err != nil {
                errMessage := fmt.Sprintf("error while running GC for %s", is.RootDir())
                is.log.Error().Err(err).Msg(errMessage)
            }

            completedMessage := fmt.Sprintf("GC completed for %s, next GC scheduled after", is.RootDir())
            is.log.Info().Str(completedMessage, gcInterval.String()).Msg("")

            time.Sleep(gcInterval)
        }
    }()
}

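RunGCPeriodically kicks off the first GC pass immediately and then sleeps for gcInterval between passes, on a goroutine that lives for the remainder of the process (nothing in this change stops it, which matches the controller starting it once at startup). The scheduling pattern, reduced to a generic sketch that is not zot code:

package main

import (
    "log"
    "time"
)

// runPeriodically executes task right away and then once per interval,
// on a detached goroutine; failures are logged and the loop continues.
func runPeriodically(interval time.Duration, task func() error) {
    go func() {
        for {
            if err := task(); err != nil {
                log.Printf("periodic task failed: %v", err)
            }

            time.Sleep(interval)
        }
    }()
}

func main() {
    runPeriodically(time.Second, func() error {
        log.Println("tick")
        return nil
    })

    time.Sleep(3 * time.Second) // let a few iterations run before exiting
}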
@@ -5,6 +5,7 @@ import (
    "crypto/rand"
    _ "crypto/sha256"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "math/big"
    "os"

@@ -1079,6 +1080,69 @@ func TestGarbageCollect(t *testing.T) {
    })
}

func TestGarbageCollectForImageStore(t *testing.T) {
    Convey("Garbage collect for all repos from an ImageStore", t, func(c C) {
        dir := t.TempDir()

        Convey("Garbage collect error for repo with config removed", func() {
            logFile, _ := ioutil.TempFile("", "zot-log*.txt")

            defer os.Remove(logFile.Name()) // clean up

            log := log.NewLogger("debug", logFile.Name())
            metrics := monitoring.NewMetricsServer(false, log)
            imgStore := storage.NewImageStore(dir, true, 1*time.Second, true, true, log, metrics)
            repoName := "gc-all-repos-short"

            err := test.CopyFiles("../../test/data/zot-test", path.Join(dir, repoName))
            if err != nil {
                panic(err)
            }

            err = os.Remove(path.Join(dir, repoName, "blobs/sha256",
                "2bacca16b9df395fc855c14ccf50b12b58d35d468b8e7f25758aff90f89bf396"))
            if err != nil {
                panic(err)
            }

            imgStore.RunGCPeriodically(24 * time.Hour)

            time.Sleep(500 * time.Millisecond)

            data, err := os.ReadFile(logFile.Name())
            So(err, ShouldBeNil)
            So(string(data), ShouldContainSubstring, fmt.Sprintf("error while running GC for %s", imgStore.RootDir()))
        })

        Convey("Garbage collect error - not enough permissions to access index.json", func() {
            logFile, _ := ioutil.TempFile("", "zot-log*.txt")

            defer os.Remove(logFile.Name()) // clean up

            log := log.NewLogger("debug", logFile.Name())
            metrics := monitoring.NewMetricsServer(false, log)
            imgStore := storage.NewImageStore(dir, true, 1*time.Second, true, true, log, metrics)
            repoName := "gc-all-repos-short"

            err := test.CopyFiles("../../test/data/zot-test", path.Join(dir, repoName))
            if err != nil {
                panic(err)
            }

            So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o000), ShouldBeNil)

            imgStore.RunGCPeriodically(24 * time.Hour)

            time.Sleep(500 * time.Millisecond)

            data, err := os.ReadFile(logFile.Name())
            So(err, ShouldBeNil)
            So(string(data), ShouldContainSubstring, fmt.Sprintf("error while running GC for %s", imgStore.RootDir()))
            So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o755), ShouldBeNil)
        })
    })
}

func randSeq(n int) string {
    letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
