0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-30 22:34:13 -05:00

fix: remove inline GC and schedule a background task instead (#1610)

* fix: remove inline GC and set a default value of gc interval

- remove inline GC
- add a default value of GC interval
- run the GC periodically by default with the default value if no interval provided
- generate GC tasks with a random delay(0-30s) between
- add IsReady() method to scheduler.TaskGenerator interface

Signed-off-by: Andreea-Lupu <andreealupu1470@yahoo.com>

* ci: add test for gc with short interval

Signed-off-by: Andreea-Lupu <andreealupu1470@yahoo.com>

---------

Signed-off-by: Andreea-Lupu <andreealupu1470@yahoo.com>
This commit is contained in:
Andreea Lupu 2023-08-07 22:55:19 +03:00 committed by GitHub
parent fce9a02ed5
commit 76277f5ebd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 411 additions and 151 deletions

37
.github/workflows/gc-stress-test.yaml vendored Normal file
View file

@ -0,0 +1,37 @@
name: "GC stress test"
on:
push:
branches:
- main
pull_request:
branches: [main]
release:
types:
- published
permissions: read-all
jobs:
client-tools:
name: GC with short interval
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: ./.github/actions/clean-runner
- uses: actions/setup-go@v4
with:
cache: false
go-version: 1.20.x
- name: Run zb
run: |
make binary
make bench
./bin/zot-linux-amd64 serve examples/config-gc-bench.json &
sleep 10
bin/zb-linux-amd64 -c 10 -n 100 -o ci-cd http://localhost:8080
killall -r zot-*
# clean zot storage
sudo rm -rf /tmp/zot

View file

@ -2,8 +2,8 @@
"distSpecVersion": "1.1.0-dev", "distSpecVersion": "1.1.0-dev",
"storage": { "storage": {
"rootDirectory": "/tmp/zot", "rootDirectory": "/tmp/zot",
"gc": false, "gc": true,
"dedupe": false "dedupe": true
}, },
"http": { "http": {
"address": "0.0.0.0", "address": "0.0.0.0",

View file

@ -0,0 +1,17 @@
{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "/tmp/zot",
"gc": true,
"gcDelay": "10s",
"gcInterval": "1s"
},
"http": {
"address": "127.0.0.1",
"port": "8080"
},
"log": {
"level": "debug",
"output": "/dev/null"
}
}

View file

@ -185,7 +185,10 @@ func New() *Config {
ReleaseTag: ReleaseTag, ReleaseTag: ReleaseTag,
BinaryType: BinaryType, BinaryType: BinaryType,
Storage: GlobalStorageConfig{ Storage: GlobalStorageConfig{
StorageConfig: StorageConfig{GC: true, GCDelay: storageConstants.DefaultGCDelay, Dedupe: true}, StorageConfig: StorageConfig{
GC: true, GCDelay: storageConstants.DefaultGCDelay,
GCInterval: storageConstants.DefaultGCInterval, Dedupe: true,
},
}, },
HTTP: HTTPConfig{Address: "127.0.0.1", Port: "8080", Auth: &AuthConfig{FailDelay: 0}}, HTTP: HTTPConfig{Address: "127.0.0.1", Port: "8080", Auth: &AuthConfig{FailDelay: 0}},
Log: &LogConfig{Level: "debug"}, Log: &LogConfig{Level: "debug"},

View file

@ -323,7 +323,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
taskScheduler.RunScheduler(reloadCtx) taskScheduler.RunScheduler(reloadCtx)
// Enable running garbage-collect periodically for DefaultStore // Enable running garbage-collect periodically for DefaultStore
if c.Config.Storage.GC && c.Config.Storage.GCInterval != 0 { if c.Config.Storage.GC {
c.StoreController.DefaultStore.RunGCPeriodically(c.Config.Storage.GCInterval, taskScheduler) c.StoreController.DefaultStore.RunGCPeriodically(c.Config.Storage.GCInterval, taskScheduler)
} }
@ -339,7 +339,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
if c.Config.Storage.SubPaths != nil { if c.Config.Storage.SubPaths != nil {
for route, storageConfig := range c.Config.Storage.SubPaths { for route, storageConfig := range c.Config.Storage.SubPaths {
// Enable running garbage-collect periodically for subImageStore // Enable running garbage-collect periodically for subImageStore
if storageConfig.GC && storageConfig.GCInterval != 0 { if storageConfig.GC {
c.StoreController.SubStore[route].RunGCPeriodically(storageConfig.GCInterval, taskScheduler) c.StoreController.SubStore[route].RunGCPeriodically(storageConfig.GCInterval, taskScheduler)
} }

View file

@ -4514,6 +4514,7 @@ func TestCrossRepoMount(t *testing.T) {
cm.StopServer() cm.StopServer()
ctlr.Config.Storage.Dedupe = true ctlr.Config.Storage.Dedupe = true
ctlr.Config.Storage.GC = false
ctlr.Config.Storage.RootDirectory = newDir ctlr.Config.Storage.RootDirectory = newDir
cm = test.NewControllerManager(ctlr) //nolint: varnamelen cm = test.NewControllerManager(ctlr) //nolint: varnamelen
cm.StartAndWait(port) cm.StartAndWait(port)
@ -7363,48 +7364,6 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
So(resp.StatusCode, ShouldEqual, http.StatusCreated) So(resp.StatusCode, ShouldEqual, http.StatusCreated)
} }
}) })
Convey("code coverage: error inside PutImageManifest method of img store (umoci.OpenLayout error)", func() {
injected := inject.InjectFailure(3)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
response := httptest.NewRecorder()
rthdlr.UpdateManifest(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
if injected {
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
So(resp.StatusCode, ShouldEqual, http.StatusCreated)
}
})
Convey("code coverage: error inside PutImageManifest method of img store (oci.GC)", func() {
injected := inject.InjectFailure(4)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
response := httptest.NewRecorder()
rthdlr.UpdateManifest(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
if injected {
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
So(resp.StatusCode, ShouldEqual, http.StatusCreated)
}
})
Convey("when index.json is not in json format", func() { Convey("when index.json is not in json format", func() {
resp, err = resty.R().SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). resp, err = resty.R().SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).Put(baseURL + "/v2/repotest/manifests/v1.0") SetBody(content).Put(baseURL + "/v2/repotest/manifests/v1.0")
@ -7447,6 +7406,8 @@ func TestGCSignaturesAndUntaggedManifests(t *testing.T) {
ctlr.Config.Storage.GC = true ctlr.Config.Storage.GC = true
ctlr.Config.Storage.GCDelay = 1 * time.Millisecond ctlr.Config.Storage.GCDelay = 1 * time.Millisecond
ctlr.Config.Storage.Dedupe = false
test.CopyTestFiles("../../test/data/zot-test", path.Join(dir, repoName)) test.CopyTestFiles("../../test/data/zot-test", path.Join(dir, repoName))
cm := test.NewControllerManager(ctlr) cm := test.NewControllerManager(ctlr)
@ -7530,6 +7491,9 @@ func TestGCSignaturesAndUntaggedManifests(t *testing.T) {
img := test.CreateRandomImage() img := test.CreateRandomImage()
err = test.UploadImage(img, baseURL, repoName, img.DigestStr()) err = test.UploadImage(img, baseURL, repoName, img.DigestStr())
So(err, ShouldBeNil)
err = ctlr.StoreController.DefaultStore.RunGCRepo(repoName)
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
err = os.Chmod(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), 0o755) err = os.Chmod(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), 0o755)
@ -7541,6 +7505,9 @@ func TestGCSignaturesAndUntaggedManifests(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = test.UploadImage(img, baseURL, repoName, tag) err = test.UploadImage(img, baseURL, repoName, tag)
So(err, ShouldBeNil)
err = ctlr.StoreController.DefaultStore.RunGCRepo(repoName)
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
err = os.WriteFile(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), content, 0o600) err = os.WriteFile(path.Join(dir, repoName, "blobs", "sha256", refs.Manifests[0].Digest.Encoded()), content, 0o600)
@ -7579,6 +7546,9 @@ func TestGCSignaturesAndUntaggedManifests(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
newManifestDigest := godigest.FromBytes(manifestBuf) newManifestDigest := godigest.FromBytes(manifestBuf)
err = ctlr.StoreController.DefaultStore.RunGCRepo(repoName)
So(err, ShouldBeNil)
// both signatures should be gc'ed // both signatures should be gc'ed
resp, err = resty.R().Get(baseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, cosignTag)) resp, err = resty.R().Get(baseURL + fmt.Sprintf("/v2/%s/manifests/%s", repoName, cosignTag))
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -7669,6 +7639,9 @@ func TestGCSignaturesAndUntaggedManifests(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated) So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
err = ctlr.StoreController.DefaultStore.RunGCRepo(repoName)
So(err, ShouldBeNil)
resp, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageIndex). resp, err = resty.R().SetHeader("Content-Type", ispec.MediaTypeImageIndex).
Get(baseURL + fmt.Sprintf("/v2/%s/manifests/latest", repoName)) Get(baseURL + fmt.Sprintf("/v2/%s/manifests/latest", repoName))
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -7752,9 +7725,9 @@ func TestPeriodicGC(t *testing.T) {
data, err := os.ReadFile(logFile.Name()) data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil) So(err, ShouldBeNil)
// periodic GC is not enabled for default store // periodic GC is enabled by default for default store with a default interval
So(string(data), ShouldContainSubstring, So(string(data), ShouldContainSubstring,
"\"GCDelay\":3600000000000,\"GCInterval\":0,\"") "\"GCDelay\":3600000000000,\"GCInterval\":3600000000000,\"")
// periodic GC is enabled for sub store // periodic GC is enabled for sub store
So(string(data), ShouldContainSubstring, So(string(data), ShouldContainSubstring,
fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"Dedupe\":false,\"RemoteCache\":false,\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) //nolint:lll // gofumpt conflicts with lll fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"Dedupe\":false,\"RemoteCache\":false,\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) //nolint:lll // gofumpt conflicts with lll

View file

@ -614,10 +614,16 @@ func applyDefaultValues(config *config.Config, viperInstance *viper.Viper) {
} }
} }
if !config.Storage.GC && viperInstance.Get("storage::gcdelay") == nil { if !config.Storage.GC {
if viperInstance.Get("storage::gcdelay") == nil {
config.Storage.GCDelay = 0 config.Storage.GCDelay = 0
} }
if viperInstance.Get("storage::gcinterval") == nil {
config.Storage.GCInterval = 0
}
}
// cache settings // cache settings
// global storage // global storage
@ -662,9 +668,18 @@ func applyDefaultValues(config *config.Config, viperInstance *viper.Viper) {
} }
} }
// if gc is enabled and gcDelay is not set, it is set to default value // if gc is enabled
if storageConfig.GC && !viperInstance.IsSet("storage::subpaths::"+name+"::gcdelay") { if storageConfig.GC {
// and gcDelay is not set, it is set to default value
if !viperInstance.IsSet("storage::subpaths::" + name + "::gcdelay") {
storageConfig.GCDelay = storageConstants.DefaultGCDelay storageConfig.GCDelay = storageConstants.DefaultGCDelay
}
// and gcInterval is not set, it is set to default value
if !viperInstance.IsSet("storage::subpaths::" + name + "::gcinterval") {
storageConfig.GCInterval = storageConstants.DefaultGCInterval
}
config.Storage.SubPaths[name] = storageConfig config.Storage.SubPaths[name] = storageConfig
} }
} }

View file

@ -21,9 +21,8 @@ var playgroundHTML embed.FS
// SetupGQLPlaygroundRoutes ... // SetupGQLPlaygroundRoutes ...
func SetupGQLPlaygroundRoutes(router *mux.Router, func SetupGQLPlaygroundRoutes(router *mux.Router,
storeController storage.StoreController, l log.Logger, storeController storage.StoreController, log log.Logger,
) { ) {
log := log.Logger{Logger: l.With().Caller().Timestamp().Logger()}
log.Info().Msg("setting up graphql playground route") log.Info().Msg("setting up graphql playground route")
templ, err := template.ParseFS(playgroundHTML, "index.html.tmpl") templ, err := template.ParseFS(playgroundHTML, "index.html.tmpl")

View file

@ -218,6 +218,16 @@ func TestSignatureUploadAndVerification(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(found, ShouldBeTrue) So(found, ShouldBeTrue)
found, err = test.ReadLogFileAndSearchString(logFile.Name(),
"finished generating tasks for updating signatures validity", 10*time.Second)
So(err, ShouldBeNil)
So(found, ShouldBeTrue)
found, err = test.ReadLogFileAndSearchString(logFile.Name(),
"finished resetting task generator for updating signatures validity", 10*time.Second)
So(err, ShouldBeNil)
So(found, ShouldBeTrue)
resp, err = client.R().SetHeader("Content-type", "application/octet-stream"). resp, err = client.R().SetHeader("Content-type", "application/octet-stream").
SetBody([]byte("wrong content")).Post(baseURL + constants.FullCosign) SetBody([]byte("wrong content")).Post(baseURL + constants.FullCosign)
So(err, ShouldBeNil) So(err, ShouldBeNil)

View file

@ -79,6 +79,10 @@ func (gen *taskGenerator) IsDone() bool {
return gen.done return gen.done
} }
func (gen *taskGenerator) IsReady() bool {
return true
}
func (gen *taskGenerator) Reset() { func (gen *taskGenerator) Reset() {
gen.lastRepo = "" gen.lastRepo = ""
gen.done = false gen.done = false

View file

@ -108,6 +108,10 @@ func (gen *TrivyTaskGenerator) IsDone() bool {
return status == done return status == done
} }
func (gen *TrivyTaskGenerator) IsReady() bool {
return true
}
func (gen *TrivyTaskGenerator) Reset() { func (gen *TrivyTaskGenerator) Reset() {
gen.lock.Lock() gen.lock.Lock()
gen.status = pending gen.status = pending

View file

@ -46,6 +46,7 @@ func TestScrubExtension(t *testing.T) {
conf.Storage.RootDirectory = dir conf.Storage.RootDirectory = dir
conf.Storage.Dedupe = false conf.Storage.Dedupe = false
conf.Storage.GC = false
substore := config.StorageConfig{RootDirectory: subdir} substore := config.StorageConfig{RootDirectory: subdir}
conf.Storage.SubPaths = map[string]config.StorageConfig{"/a": substore} conf.Storage.SubPaths = map[string]config.StorageConfig{"/a": substore}
@ -89,6 +90,7 @@ func TestScrubExtension(t *testing.T) {
conf.Storage.RootDirectory = dir conf.Storage.RootDirectory = dir
conf.Storage.Dedupe = false conf.Storage.Dedupe = false
conf.Storage.GC = false
conf.Log.Output = logFile.Name() conf.Log.Output = logFile.Name()
trueValue := true trueValue := true
@ -137,6 +139,7 @@ func TestScrubExtension(t *testing.T) {
conf.Storage.RootDirectory = dir conf.Storage.RootDirectory = dir
conf.Storage.Dedupe = false conf.Storage.Dedupe = false
conf.Storage.GC = false
conf.Log.Output = logFile.Name() conf.Log.Output = logFile.Name()
trueValue := true trueValue := true

View file

@ -114,6 +114,10 @@ func (gen *TaskGenerator) IsDone() bool {
return gen.done return gen.done
} }
func (gen *TaskGenerator) IsReady() bool {
return true
}
func (gen *TaskGenerator) Reset() { func (gen *TaskGenerator) Reset() {
gen.lastRepo = "" gen.lastRepo = ""
gen.Service.ResetCatalog() gen.Service.ResetCatalog()

View file

@ -131,6 +131,7 @@ func makeUpstreamServer(
} }
srcConfig.HTTP.Port = srcPort srcConfig.HTTP.Port = srcPort
srcConfig.Storage.GC = false
srcDir := t.TempDir() srcDir := t.TempDir()
@ -1663,6 +1664,7 @@ func TestPermsDenied(t *testing.T) {
destDir := t.TempDir() destDir := t.TempDir()
destConfig.Storage.GC = false
destConfig.Storage.RootDirectory = destDir destConfig.Storage.RootDirectory = destDir
destConfig.Extensions = &extconf.ExtensionConfig{} destConfig.Extensions = &extconf.ExtensionConfig{}
@ -3038,6 +3040,8 @@ func TestSubPaths(t *testing.T) {
srcConfig.HTTP.Port = srcPort srcConfig.HTTP.Port = srcPort
srcConfig.Storage.GC = false
srcDir := t.TempDir() srcDir := t.TempDir()
subpath := "/subpath" subpath := "/subpath"
@ -4506,6 +4510,7 @@ func TestOnDemandRetryGoroutine(t *testing.T) {
srcBaseURL := test.GetBaseURL(srcPort) srcBaseURL := test.GetBaseURL(srcPort)
srcConfig.HTTP.Port = srcPort srcConfig.HTTP.Port = srcPort
srcConfig.Storage.GC = false
srcDir := t.TempDir() srcDir := t.TempDir()
@ -4712,6 +4717,7 @@ func TestOnDemandMultipleImage(t *testing.T) {
srcBaseURL := test.GetBaseURL(srcPort) srcBaseURL := test.GetBaseURL(srcPort)
srcConfig.HTTP.Port = srcPort srcConfig.HTTP.Port = srcPort
srcConfig.Storage.GC = false
srcDir := t.TempDir() srcDir := t.TempDir()

View file

@ -99,6 +99,8 @@ func (gen *sigValidityTaskGenerator) Next() (scheduler.Task, error) {
if gen.repoIndex >= len(gen.repos) { if gen.repoIndex >= len(gen.repos) {
gen.done = true gen.done = true
gen.log.Info().Msg("finished generating tasks for updating signatures validity")
return nil, nil return nil, nil
} }
@ -109,10 +111,16 @@ func (gen *sigValidityTaskGenerator) IsDone() bool {
return gen.done return gen.done
} }
func (gen *sigValidityTaskGenerator) IsReady() bool {
return true
}
func (gen *sigValidityTaskGenerator) Reset() { func (gen *sigValidityTaskGenerator) Reset() {
gen.done = false gen.done = false
gen.repoIndex = -1 gen.repoIndex = -1
gen.repos = []mTypes.RepoMetadata{} gen.repos = []mTypes.RepoMetadata{}
gen.log.Info().Msg("finished resetting task generator for updating signatures validity")
} }
type validityTask struct { type validityTask struct {

View file

@ -1,7 +1,7 @@
# How to submit a Generator to the scheduler # How to submit a Generator to the scheduler
## What is a generator and how should it be implemented? ## What is a generator and how should it be implemented?
In order to create a new generator (which will generate new tasks one by one) and add it to the scheduler there are 3 methods which should be implemented: In order to create a new generator (which will generate new tasks one by one) and add it to the scheduler there are 4 methods which should be implemented:
1. ***GenerateTask() (Task, error)*** 1. ***GenerateTask() (Task, error)***
``` ```
This method should implement the logic for generating a new task. This method should implement the logic for generating a new task.
@ -12,7 +12,11 @@ In order to create a new generator (which will generate new tasks one by one) an
``` ```
This method should return true after the generator finished all the work and has no more tasks to generate. This method should return true after the generator finished all the work and has no more tasks to generate.
``` ```
3. ***Reset()*** 3. ***IsReady() bool***
```
This method should return true if the generator is ready to generate a new task and should be used when it is needed to generate tasks with some delay between.
```
4. ***Reset()***
``` ```
When this method is called the generator should reset to its initial state. When this method is called the generator should reset to its initial state.
After the generator is reset, it will generate new tasks as if it hadn't been used before. After the generator is reset, it will generate new tasks as if it hadn't been used before.

View file

@ -284,6 +284,7 @@ const (
type TaskGenerator interface { type TaskGenerator interface {
Next() (Task, error) Next() (Task, error)
IsDone() bool IsDone() bool
IsReady() bool
Reset() Reset()
} }
@ -351,6 +352,10 @@ func (gen *generator) getState() state {
} }
} }
if !gen.taskGenerator.IsReady() {
return waiting
}
return ready return ready
} }

View file

@ -56,6 +56,10 @@ func (g *generator) IsDone() bool {
return g.done return g.done
} }
func (g *generator) IsReady() bool {
return true
}
func (g *generator) Reset() { func (g *generator) Reset() {
g.done = false g.done = false
g.step = 0 g.step = 0
@ -79,6 +83,10 @@ func (g *shortGenerator) IsDone() bool {
return g.done return g.done
} }
func (g *shortGenerator) IsReady() bool {
return true
}
func (g *shortGenerator) Reset() { func (g *shortGenerator) Reset() {
g.done = true g.done = true
g.step = 0 g.step = 0

View file

@ -4,8 +4,11 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"io"
"math/rand"
"path" "path"
"strings" "strings"
"time"
notreg "github.com/notaryproject/notation-go/registry" notreg "github.com/notaryproject/notation-go/registry"
godigest "github.com/opencontainers/go-digest" godigest "github.com/opencontainers/go-digest"
@ -853,6 +856,10 @@ func (gen *DedupeTaskGenerator) IsDone() bool {
return gen.done return gen.done
} }
func (gen *DedupeTaskGenerator) IsReady() bool {
return true
}
func (gen *DedupeTaskGenerator) Reset() { func (gen *DedupeTaskGenerator) Reset() {
gen.lastDigests = []godigest.Digest{} gen.lastDigests = []godigest.Digest{}
gen.duplicateBlobs = []string{} gen.duplicateBlobs = []string{}
@ -886,3 +893,78 @@ func (dt *dedupeTask) DoWork() error {
return err return err
} }
/*
GCTaskGenerator takes all repositories found in the storage.imagestore
and it will execute garbage collection for each repository by creating a task
for each repository and pushing it to the task scheduler.
*/
type GCTaskGenerator struct {
ImgStore storageTypes.ImageStore
lastRepo string
nextRun time.Time
done bool
rand *rand.Rand
}
func (gen *GCTaskGenerator) getRandomDelay() int {
maxDelay := 30
return gen.rand.Intn(maxDelay)
}
func (gen *GCTaskGenerator) Next() (scheduler.Task, error) {
if gen.lastRepo == "" && gen.nextRun.IsZero() {
gen.rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) //nolint: gosec
}
delay := gen.getRandomDelay()
gen.nextRun = time.Now().Add(time.Duration(delay) * time.Second)
repo, err := gen.ImgStore.GetNextRepository(gen.lastRepo)
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}
if repo == "" {
gen.done = true
return nil, nil
}
gen.lastRepo = repo
return NewGCTask(gen.ImgStore, repo), nil
}
func (gen *GCTaskGenerator) IsDone() bool {
return gen.done
}
func (gen *GCTaskGenerator) IsReady() bool {
return time.Now().After(gen.nextRun)
}
func (gen *GCTaskGenerator) Reset() {
gen.lastRepo = ""
gen.done = false
gen.nextRun = time.Time{}
}
type gcTask struct {
imgStore storageTypes.ImageStore
repo string
}
func NewGCTask(imgStore storageTypes.ImageStore, repo string,
) *gcTask {
return &gcTask{imgStore, repo}
}
func (gct *gcTask) DoWork() error {
// run task
return gct.imgStore.RunGCRepo(gct.repo)
}

View file

@ -77,36 +77,6 @@ func TestValidateManifest(t *testing.T) {
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
}) })
Convey("bad config blob", func() {
manifest := ispec.Manifest{
Config: ispec.Descriptor{
MediaType: ispec.MediaTypeImageConfig,
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: ispec.MediaTypeImageLayer,
Digest: digest,
Size: int64(len(content)),
},
},
}
manifest.SchemaVersion = 2
configBlobPath := imgStore.BlobPath("test", cdigest)
err := os.WriteFile(configBlobPath, []byte("bad config blob"), 0o000)
So(err, ShouldBeNil)
body, err := json.Marshal(manifest)
So(err, ShouldBeNil)
_, _, err = imgStore.PutImageManifest("test", "1.0", ispec.MediaTypeImageManifest, body)
So(err, ShouldNotBeNil)
})
Convey("manifest with non-distributable layers", func() { Convey("manifest with non-distributable layers", func() {
content := []byte("this blob doesn't exist") content := []byte("this blob doesn't exist")
digest := godigest.FromBytes(content) digest := godigest.FromBytes(content)

View file

@ -20,5 +20,6 @@ const (
BoltdbName = "cache" BoltdbName = "cache"
DynamoDBDriverName = "dynamodb" DynamoDBDriverName = "dynamodb"
DefaultGCDelay = 1 * time.Hour DefaultGCDelay = 1 * time.Hour
DefaultGCInterval = 1 * time.Hour
S3StorageDriverName = "s3" S3StorageDriverName = "s3"
) )

View file

@ -593,12 +593,6 @@ func (is *ImageStoreLocal) PutImageManifest(repo, reference, mediaType string, /
return "", "", err return "", "", err
} }
if is.gc {
if err := is.garbageCollect(dir, repo); err != nil {
return "", "", err
}
}
return desc.Digest, subjectDigest, nil return desc.Digest, subjectDigest, nil
} }
@ -650,12 +644,6 @@ func (is *ImageStoreLocal) DeleteImageManifest(repo, reference string, detectCol
return err return err
} }
if is.gc {
if err := is.garbageCollect(dir, repo); err != nil {
return err
}
}
// Delete blob only when blob digest not present in manifest entry. // Delete blob only when blob digest not present in manifest entry.
// e.g. 1.0.1 & 1.0.2 have same blob digest so if we delete 1.0.1, blob should not be removed. // e.g. 1.0.1 & 1.0.2 have same blob digest so if we delete 1.0.1, blob should not be removed.
toDelete := true toDelete := true
@ -1812,58 +1800,12 @@ func (is *ImageStoreLocal) RunGCRepo(repo string) error {
} }
func (is *ImageStoreLocal) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) { func (is *ImageStoreLocal) RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler) {
generator := &taskGenerator{ generator := &common.GCTaskGenerator{
imgStore: is, ImgStore: is,
} }
sch.SubmitGenerator(generator, interval, scheduler.MediumPriority) sch.SubmitGenerator(generator, interval, scheduler.MediumPriority)
} }
type taskGenerator struct {
imgStore *ImageStoreLocal
lastRepo string
done bool
}
func (gen *taskGenerator) Next() (scheduler.Task, error) {
repo, err := gen.imgStore.GetNextRepository(gen.lastRepo)
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}
if repo == "" {
gen.done = true
return nil, nil
}
gen.lastRepo = repo
return newGCTask(gen.imgStore, repo), nil
}
func (gen *taskGenerator) IsDone() bool {
return gen.done
}
func (gen *taskGenerator) Reset() {
gen.lastRepo = ""
gen.done = false
}
type gcTask struct {
imgStore *ImageStoreLocal
repo string
}
func newGCTask(imgStore *ImageStoreLocal, repo string) *gcTask {
return &gcTask{imgStore, repo}
}
func (gcT *gcTask) DoWork() error {
return gcT.imgStore.RunGCRepo(gcT.repo)
}
func (is *ImageStoreLocal) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest, func (is *ImageStoreLocal) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest,
) (godigest.Digest, []string, error) { ) (godigest.Digest, []string, error) {
var lockLatency time.Time var lockLatency time.Time

View file

@ -2040,6 +2040,98 @@ func TestInjectWriteFile(t *testing.T) {
}) })
} }
func TestGCInjectFailure(t *testing.T) {
Convey("code coverage: error inside garbageCollect method of img store", t, func() {
dir := t.TempDir()
logFile, _ := os.CreateTemp("", "zot-log*.txt")
defer os.Remove(logFile.Name()) // clean up
log := log.NewLogger("debug", logFile.Name())
metrics := monitoring.NewMetricsServer(false, log)
cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
RootDir: dir,
Name: "cache",
UseRelPaths: true,
}, log)
imgStore := local.NewImageStore(dir, true, storageConstants.DefaultGCDelay,
true, true, log, metrics, nil, cacheDriver)
repoName := "test-gc"
upload, err := imgStore.NewBlobUpload(repoName)
So(err, ShouldBeNil)
So(upload, ShouldNotBeEmpty)
content := []byte("test-data1")
buf := bytes.NewBuffer(content)
buflen := buf.Len()
bdigest := godigest.FromBytes(content)
blob, err := imgStore.PutBlobChunk(repoName, upload, 0, int64(buflen), buf)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
err = imgStore.FinishBlobUpload(repoName, upload, buf, bdigest)
So(err, ShouldBeNil)
annotationsMap := make(map[string]string)
annotationsMap[ispec.AnnotationRefName] = tag
cblob, cdigest := test.GetRandomImageConfig()
_, clen, err := imgStore.FullBlobUpload(repoName, bytes.NewReader(cblob), cdigest)
So(err, ShouldBeNil)
So(clen, ShouldEqual, len(cblob))
hasBlob, _, err := imgStore.CheckBlob(repoName, cdigest)
So(err, ShouldBeNil)
So(hasBlob, ShouldEqual, true)
manifest := ispec.Manifest{
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: bdigest,
Size: int64(buflen),
},
},
Annotations: annotationsMap,
}
manifest.SchemaVersion = 2
manifestBuf, err := json.Marshal(manifest)
So(err, ShouldBeNil)
_, _, err = imgStore.PutImageManifest(repoName, tag, ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil)
// umoci.OpenLayout error
injected := inject.InjectFailure(0)
err = imgStore.RunGCRepo(repoName)
if injected {
So(err, ShouldNotBeNil)
} else {
So(err, ShouldBeNil)
}
// oci.GC
injected = inject.InjectFailure(1)
err = imgStore.RunGCRepo(repoName)
if injected {
So(err, ShouldNotBeNil)
} else {
So(err, ShouldBeNil)
}
})
}
func TestGarbageCollect(t *testing.T) { func TestGarbageCollect(t *testing.T) {
Convey("Repo layout", t, func(c C) { Convey("Repo layout", t, func(c C) {
dir := t.TempDir() dir := t.TempDir()
@ -2108,6 +2200,9 @@ func TestGarbageCollect(t *testing.T) {
_, _, err = imgStore.PutImageManifest(repoName, tag, ispec.MediaTypeImageManifest, manifestBuf) _, _, err = imgStore.PutImageManifest(repoName, tag, ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = imgStore.RunGCRepo(repoName)
So(err, ShouldBeNil)
hasBlob, _, err = imgStore.CheckBlob(repoName, bdigest) hasBlob, _, err = imgStore.CheckBlob(repoName, bdigest)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(hasBlob, ShouldEqual, true) So(hasBlob, ShouldEqual, true)
@ -2115,6 +2210,9 @@ func TestGarbageCollect(t *testing.T) {
err = imgStore.DeleteImageManifest(repoName, digest.String(), false) err = imgStore.DeleteImageManifest(repoName, digest.String(), false)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = imgStore.RunGCRepo(repoName)
So(err, ShouldBeNil)
hasBlob, _, err = imgStore.CheckBlob(repoName, bdigest) hasBlob, _, err = imgStore.CheckBlob(repoName, bdigest)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(hasBlob, ShouldEqual, true) So(hasBlob, ShouldEqual, true)
@ -2201,6 +2299,9 @@ func TestGarbageCollect(t *testing.T) {
_, _, err = imgStore.PutImageManifest(repoName, tag, ispec.MediaTypeImageManifest, manifestBuf) _, _, err = imgStore.PutImageManifest(repoName, tag, ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = imgStore.RunGCRepo(repoName)
So(err, ShouldBeNil)
hasBlob, _, err = imgStore.CheckBlob(repoName, odigest) hasBlob, _, err = imgStore.CheckBlob(repoName, odigest)
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(hasBlob, ShouldEqual, false) So(hasBlob, ShouldEqual, false)
@ -2223,6 +2324,9 @@ func TestGarbageCollect(t *testing.T) {
err = imgStore.DeleteImageManifest(repoName, digest.String(), false) err = imgStore.DeleteImageManifest(repoName, digest.String(), false)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = imgStore.RunGCRepo(repoName)
So(err, ShouldBeNil)
hasBlob, _, err = imgStore.CheckBlob(repoName, bdigest) hasBlob, _, err = imgStore.CheckBlob(repoName, bdigest)
So(err, ShouldNotBeNil) So(err, ShouldNotBeNil)
So(hasBlob, ShouldEqual, false) So(hasBlob, ShouldEqual, false)
@ -2360,7 +2464,7 @@ func TestGarbageCollect(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(hasBlob, ShouldEqual, true) So(hasBlob, ShouldEqual, true)
// immediately upload any other image to second repo which should invoke GC inline, but expect layers to persist // immediately upload any other image to second repo and run GC, but expect layers to persist
upload, err = imgStore.NewBlobUpload(repo2Name) upload, err = imgStore.NewBlobUpload(repo2Name)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -2413,6 +2517,9 @@ func TestGarbageCollect(t *testing.T) {
_, _, err = imgStore.PutImageManifest(repo2Name, tag, ispec.MediaTypeImageManifest, manifestBuf) _, _, err = imgStore.PutImageManifest(repo2Name, tag, ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil) So(err, ShouldBeNil)
err = imgStore.RunGCRepo(repo2Name)
So(err, ShouldBeNil)
// original blob should exist // original blob should exist
hasBlob, _, err = imgStore.CheckBlob(repo2Name, tdigest) hasBlob, _, err = imgStore.CheckBlob(repo2Name, tdigest)
@ -2494,6 +2601,64 @@ func TestGarbageCollectForImageStore(t *testing.T) {
fmt.Sprintf("error while running GC for %s", path.Join(imgStore.RootDir(), repoName))) fmt.Sprintf("error while running GC for %s", path.Join(imgStore.RootDir(), repoName)))
So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o755), ShouldBeNil) So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o755), ShouldBeNil)
}) })
Convey("Garbage collect - the manifest which the reference points to can be found", func() {
logFile, _ := os.CreateTemp("", "zot-log*.txt")
defer os.Remove(logFile.Name()) // clean up
log := log.NewLogger("debug", logFile.Name())
metrics := monitoring.NewMetricsServer(false, log)
cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
RootDir: dir,
Name: "cache",
UseRelPaths: true,
}, log)
imgStore := local.NewImageStore(dir, true, 1*time.Second, true, true, log, metrics, nil, cacheDriver)
repoName := "gc-sig"
storeController := storage.StoreController{DefaultStore: imgStore}
img := test.CreateRandomImage()
err := test.WriteImageToFileSystem(img, repoName, "tag1", storeController)
So(err, ShouldBeNil)
// add fake signature for tag1
cosignTag, err := test.GetCosignSignatureTagForManifest(img.Manifest)
So(err, ShouldBeNil)
cosignSig := test.CreateRandomImage()
So(err, ShouldBeNil)
err = test.WriteImageToFileSystem(cosignSig, repoName, cosignTag, storeController)
So(err, ShouldBeNil)
// add sbom
manifestBlob, err := json.Marshal(img.Manifest)
So(err, ShouldBeNil)
manifestDigest := godigest.FromBytes(manifestBlob)
sbomTag := fmt.Sprintf("sha256-%s.%s", manifestDigest.Encoded(), "sbom")
So(err, ShouldBeNil)
sbomImg := test.CreateRandomImage()
So(err, ShouldBeNil)
err = test.WriteImageToFileSystem(sbomImg, repoName, sbomTag, storeController)
So(err, ShouldBeNil)
// add fake signature for tag1
notationSig := test.CreateImageWith().
RandomLayers(1, 10).
ArtifactConfig("application/vnd.cncf.notary.signature").
Subject(img.DescriptorRef()).Build()
err = test.WriteImageToFileSystem(notationSig, repoName, "notation", storeController)
So(err, ShouldBeNil)
err = imgStore.RunGCRepo(repoName)
So(err, ShouldBeNil)
})
}) })
} }

View file

@ -61,7 +61,7 @@ function teardown() {
wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog" wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog"
# wait for scrub to be done and logs to get populated # wait for scrub to be done and logs to get populated
run sleep 10s run sleep 15s
run not_affected run not_affected
[ "$status" -eq 0 ] [ "$status" -eq 0 ]
[ $(echo "${lines[0]}" ) = 'true' ] [ $(echo "${lines[0]}" ) = 'true' ]
@ -76,7 +76,7 @@ function teardown() {
wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog" wait_zot_reachable "http://127.0.0.1:8080/v2/_catalog"
# wait for scrub to be done and logs to get populated # wait for scrub to be done and logs to get populated
run sleep 10s run sleep 15s
run affected run affected
[ "$status" -eq 0 ] [ "$status" -eq 0 ]
[ $(echo "${lines[0]}" ) = 'true' ] [ $(echo "${lines[0]}" ) = 'true' ]