0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-03-11 02:17:43 -05:00

refactor: more work on locks

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
This commit is contained in:
Andrei Aaron 2024-08-17 10:00:52 +00:00
parent 54465e93df
commit 501c79b72d
No known key found for this signature in database
GPG key ID: B095BFF89FD83228
8 changed files with 83 additions and 59 deletions

View file

@ -825,9 +825,6 @@ type DedupeTaskGenerator struct {
ImgStore storageTypes.ImageStore
// storage dedupe value
Dedupe bool
// store blobs paths grouped by digest
digest godigest.Digest
duplicateBlobs []string
/* store processed digest, used for iterating duplicateBlobs one by one
and generating a task for each unprocessed one*/
lastDigests []godigest.Digest
@ -866,7 +863,7 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
}
// get all blobs from storage.imageStore and group them by digest
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
digest, duplicateBlobs, duplicateRepos, err := gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
if err != nil {
gen.Log.Error().Err(err).Str("component", "dedupe").Msg("failed to get next digest")
@ -874,7 +871,7 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
}
// if no digests left, then mark the task generator as done
if gen.digest == "" {
if digest == "" {
gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished")
gen.done = true
@ -883,10 +880,10 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
}
// mark digest as processed before running its task
gen.lastDigests = append(gen.lastDigests, gen.digest)
gen.lastDigests = append(gen.lastDigests, digest)
// generate rebuild dedupe task for this digest
return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil
return newDedupeTask(gen.ImgStore, digest, gen.Dedupe, duplicateBlobs, duplicateRepos, gen.Log), nil
}
func (gen *DedupeTaskGenerator) IsDone() bool {
@ -899,9 +896,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool {
func (gen *DedupeTaskGenerator) Reset() {
gen.lastDigests = []godigest.Digest{}
gen.duplicateBlobs = []string{}
gen.repos = []string{}
gen.digest = ""
gen.done = false
}
@ -911,19 +906,21 @@ type dedupeTask struct {
digest godigest.Digest
// blobs paths with the same digest ^
duplicateBlobs []string
duplicateRepos []string
dedupe bool
log zlog.Logger
}
func newDedupeTask(imgStore storageTypes.ImageStore, digest godigest.Digest, dedupe bool,
duplicateBlobs []string, log zlog.Logger,
duplicateBlobs, duplicateRepos []string, log zlog.Logger,
) *dedupeTask {
return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log}
return &dedupeTask{imgStore, digest, duplicateBlobs, duplicateRepos, dedupe, log}
}
func (dt *dedupeTask) DoWork(ctx context.Context) error {
// run task
err := dt.imgStore.RunDedupeForDigest(ctx, dt.digest, dt.dedupe, dt.duplicateBlobs) //nolint: contextcheck
err := dt.imgStore.RunDedupeForDigest(ctx, dt.digest, dt.dedupe, dt.duplicateBlobs, //nolint: contextcheck
dt.duplicateRepos)
if err != nil {
// log it
dt.log.Error().Err(err).Str("digest", dt.digest.String()).Str("component", "dedupe").

View file

@ -489,9 +489,9 @@ func TestDedupeGeneratorErrors(t *testing.T) {
return []string{"repo1", "repo2"}, nil
},
GetNextDigestWithBlobPathsFn: func(repos []string, lastDigests []godigest.Digest) (
godigest.Digest, []string, error,
godigest.Digest, []string, []string, error,
) {
return "sha256:123", []string{}, ErrTestError
return "sha256:123", []string{}, []string{}, ErrTestError
},
}

View file

@ -123,7 +123,7 @@ func (is *ImageStore) Unlock(lockStart *time.Time) {
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram
}
// RLock read-lock for specific repo
// RLock read-lock for specific repo.
func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) {
*lockStart = time.Now()
@ -140,14 +140,14 @@ func (is *ImageStore) RUnlockRepo(repo string, lockStart *time.Time) {
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram
}
// Lock write-lock for specific repo..
// Lock write-lock for specific repo.
func (is *ImageStore) LockRepo(repo string, lockStart *time.Time) {
*lockStart = time.Now()
is.lock.LockRepo(repo)
}
// Unlock write-unlock for specific repo..
// Unlock write-unlock for specific repo.
func (is *ImageStore) UnlockRepo(repo string, lockStart *time.Time) {
is.lock.UnlockRepo(repo)
@ -448,6 +448,7 @@ func (is *ImageStore) GetImageManifest(repo, reference string) ([]byte, godigest
var err error
is.RLockRepo(repo, &lockLatency)
defer func() {
is.RUnlockRepo(repo, &lockLatency)
@ -500,6 +501,7 @@ func (is *ImageStore) PutImageManifest(repo, reference, mediaType string, //noli
var err error
is.LockRepo(repo, &lockLatency)
defer func() {
is.UnlockRepo(repo, &lockLatency)
@ -1798,7 +1800,7 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]godigest.Digest, error) {
}
func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest,
) (godigest.Digest, []string, error) {
) (godigest.Digest, []string, []string, error) {
var lockLatency time.Time
dir := is.rootDir
@ -1806,10 +1808,12 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
var duplicateBlobs []string
var duplicateBlobs, duplicateRepos []string
var digest godigest.Digest
var repo string
err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
// skip blobs under .sync and .uploads
if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) ||
@ -1819,7 +1823,7 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
if fileInfo.IsDir() {
// skip repositories not found in repos
repo := path.Base(fileInfo.Path())
repo = path.Base(fileInfo.Path())
if !zcommon.Contains(repos, repo) && repo != ispec.ImageBlobsDir {
candidateAlgorithm := godigest.Algorithm(repo)
@ -1829,6 +1833,7 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
}
}
repo = path.Dir(path.Dir(fileInfo.Path()))
digestHash := path.Base(fileInfo.Path())
digestAlgorithm := godigest.Algorithm(path.Base(path.Dir(fileInfo.Path())))
@ -1847,6 +1852,10 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
if blobDigest == digest {
duplicateBlobs = append(duplicateBlobs, fileInfo.Path())
if !zcommon.Contains(duplicateRepos, repo) {
duplicateRepos = append(duplicateRepos, repo)
}
}
return nil
@ -1856,10 +1865,10 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
var perr driver.PathNotFoundError
if errors.As(err, &perr) {
return digest, duplicateBlobs, nil
return digest, duplicateBlobs, duplicateRepos, nil
}
return digest, duplicateBlobs, err
return digest, duplicateBlobs, duplicateRepos, err
}
func (is *ImageStore) getOriginalBlobFromDisk(duplicateBlobs []string) (string, error) {
@ -2034,7 +2043,7 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D
}
func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool,
duplicateBlobs []string,
duplicateBlobs []string, duplicateRepos []string,
) error {
var lockLatency time.Time

View file

@ -48,7 +48,7 @@ func (sl *ImageStoreLock) RLockRepo(repo string) {
val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
// lock individual repo
repoLock := val.(*sync.RWMutex)
repoLock, _ := val.(*sync.RWMutex)
repoLock.RLock()
}
@ -60,7 +60,7 @@ func (sl *ImageStoreLock) RUnlockRepo(repo string) {
}
// read-unlock individual repo
repoLock := val.(*sync.RWMutex)
repoLock, _ := val.(*sync.RWMutex)
repoLock.RUnlock()
// decrement the global read counter after the one for the individual repo is decremented
@ -78,7 +78,7 @@ func (sl *ImageStoreLock) LockRepo(repo string) {
val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
// write-lock individual repo
repoLock := val.(*sync.RWMutex)
repoLock, _ := val.(*sync.RWMutex)
repoLock.Lock()
}
@ -90,7 +90,7 @@ func (sl *ImageStoreLock) UnlockRepo(repo string) {
}
// write-unlock individual repo
repoLock := val.(*sync.RWMutex)
repoLock, _ := val.(*sync.RWMutex)
repoLock.Unlock()
// decrement the global read counter after the individual repo was unlocked

View file

@ -1280,11 +1280,16 @@ func TestDedupeLinks(t *testing.T) {
path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest2),
}
duplicateRepos := []string{
path.Join(dir, "dedupe1"),
}
// remove original blob so that it can not be statted
err := os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), godigest.Digest(blobDigest1), true, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), godigest.Digest(blobDigest1), true,
duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})

View file

@ -2252,10 +2252,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
@ -2301,10 +2302,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
@ -2350,10 +2352,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
@ -2396,10 +2399,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
Convey("Trigger Stat() error in dedupeBlobs()", func() {
@ -2441,10 +2445,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
})
@ -2492,10 +2497,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
@ -2540,10 +2546,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
@ -2588,10 +2595,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
@ -2603,7 +2611,7 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
_, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
_, _, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldNotBeNil)
})
@ -2695,10 +2703,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
@ -2717,10 +2726,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
@ -2735,10 +2745,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"},
[]godigest.Digest{})
So(err, ShouldBeNil)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs)
err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos)
So(err, ShouldNotBeNil)
})
})

View file

@ -64,8 +64,9 @@ type ImageStore interface { //nolint:interfacebloat
GetBlobContent(repo string, digest godigest.Digest) ([]byte, error)
GetReferrers(repo string, digest godigest.Digest, artifactTypes []string) (ispec.Index, error)
RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler)
RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, duplicateBlobs []string) error
GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error)
RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool,
duplicateBlobs, duplicateRepos []string) error
GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, []string, error)
GetAllBlobs(repo string) ([]godigest.Digest, error)
PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler)
VerifyBlobDigestValue(repo string, digest godigest.Digest) error

View file

@ -51,8 +51,9 @@ type MockedImageStore struct {
RunGCPeriodicallyFn func(interval time.Duration, sch *scheduler.Scheduler)
RunDedupeBlobsFn func(interval time.Duration, sch *scheduler.Scheduler)
RunDedupeForDigestFn func(ctx context.Context, digest godigest.Digest, dedupe bool,
duplicateBlobs []string) error
GetNextDigestWithBlobPathsFn func(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error)
duplicateBlobs []string, duplicateRepos []string) error
GetNextDigestWithBlobPathsFn func(repos []string, lastDigests []godigest.Digest,
) (godigest.Digest, []string, []string, error)
GetAllBlobsFn func(repo string) ([]godigest.Digest, error)
CleanupRepoFn func(repo string, blobs []godigest.Digest, removeRepo bool) (int, error)
PutIndexContentFn func(repo string, index ispec.Index) error
@ -403,22 +404,22 @@ func (is MockedImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler
}
func (is MockedImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool,
duplicateBlobs []string,
duplicateBlobs []string, duplicateRepos []string,
) error {
if is.RunDedupeForDigestFn != nil {
return is.RunDedupeForDigestFn(ctx, digest, dedupe, duplicateBlobs)
return is.RunDedupeForDigestFn(ctx, digest, dedupe, duplicateBlobs, duplicateRepos)
}
return nil
}
func (is MockedImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest,
) (godigest.Digest, []string, error) {
) (godigest.Digest, []string, []string, error) {
if is.GetNextDigestWithBlobPathsFn != nil {
return is.GetNextDigestWithBlobPathsFn(repos, lastDigests)
}
return "", []string{}, nil
return "", []string{}, []string{}, nil
}
func (is MockedImageStore) CleanupRepo(repo string, blobs []godigest.Digest, removeRepo bool) (int, error) {