mirror of https://github.com/project-zot/zot.git
refactor: remove global locking feature from the imagestore
Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
This commit is contained in:
parent 501c79b72d
commit 2ae3f0e26f
7 changed files with 118 additions and 172 deletions
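At a glance, the commit drops the store-wide RWMutex (and its RLock/RUnlock/Lock/Unlock plumbing) and keeps only per-repository locks, with the lock map now guarded by a plain mutex instead of sync.Map. As a quick orientation before the hunks below, here is a minimal, self-contained Go sketch of the per-repository lock-registry pattern the commit moves to; the type and function names are illustrative, not the actual zot identifiers (the real implementation is the ImageStoreLock hunk further down):

```go
// Minimal sketch of a per-repository lock registry, loosely mirroring the
// ImageStoreLock introduced by this commit. Names are illustrative only.
package main

import (
	"fmt"
	"sync"
)

type repoLockRegistry struct {
	mu    sync.Mutex               // guards the map below
	locks map[string]*sync.RWMutex // one RW lock per repository
}

func newRepoLockRegistry() *repoLockRegistry {
	return &repoLockRegistry{locks: map[string]*sync.RWMutex{}}
}

// load returns the lock for a repository, creating it on first use.
func (r *repoLockRegistry) load(repo string) *sync.RWMutex {
	r.mu.Lock()
	defer r.mu.Unlock()

	lock, ok := r.locks[repo]
	if !ok {
		lock = &sync.RWMutex{}
		r.locks[repo] = lock
	}

	return lock
}

func (r *repoLockRegistry) rLockRepo(repo string)   { r.load(repo).RLock() }
func (r *repoLockRegistry) rUnlockRepo(repo string) { r.load(repo).RUnlock() }
func (r *repoLockRegistry) lockRepo(repo string)    { r.load(repo).Lock() }

// unlockRepo releases the write lock and drops the map entry if no other
// goroutine holds or waits on it (the same idea as discardLockIfPossible below).
func (r *repoLockRegistry) unlockRepo(repo string) {
	lock := r.load(repo)
	lock.Unlock()

	r.mu.Lock()
	defer r.mu.Unlock()

	if lock.TryLock() { // succeeds only when the lock is completely free
		delete(r.locks, repo)
		lock.Unlock()
	}
}

func main() {
	reg := newRepoLockRegistry()

	var wg sync.WaitGroup

	for _, repo := range []string{"alpine", "ubuntu", "alpine"} {
		wg.Add(1)

		go func(repo string) {
			defer wg.Done()

			reg.lockRepo(repo) // writers to different repos do not block each other
			defer reg.unlockRepo(repo)

			fmt.Println("writing to", repo)
		}(repo)
	}

	wg.Wait()
}
```

The cleanup step mirrors discardLockIfPossible in the real code: a map entry is only deleted when TryLock succeeds, i.e. no goroutine currently holds or shares that repository's lock.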
@@ -89,40 +89,6 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo
     return imgStore
 }
 
-// RLock read-lock.
-func (is *ImageStore) RLock(lockStart *time.Time) {
-    *lockStart = time.Now()
-
-    is.lock.RLock()
-}
-
-// RUnlock read-unlock.
-func (is *ImageStore) RUnlock(lockStart *time.Time) {
-    is.lock.RUnlock()
-
-    lockEnd := time.Now()
-    // includes time spent in acquiring and holding a lock
-    latency := lockEnd.Sub(*lockStart)
-    monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram
-}
-
-// Lock write-lock.
-func (is *ImageStore) Lock(lockStart *time.Time) {
-    *lockStart = time.Now()
-
-    is.lock.Lock()
-}
-
-// Unlock write-unlock.
-func (is *ImageStore) Unlock(lockStart *time.Time) {
-    is.lock.Unlock()
-
-    lockEnd := time.Now()
-    // includes time spent in acquiring and holding a lock
-    latency := lockEnd.Sub(*lockStart)
-    monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram
-}
-
 // RLock read-lock for specific repo.
 func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) {
     *lockStart = time.Now()
@@ -296,13 +262,12 @@ func (is *ImageStore) ValidateRepo(name string) (bool, error) {
 
 // GetRepositories returns a list of all the repositories under this store.
 func (is *ImageStore) GetRepositories() ([]string, error) {
-    var lockLatency time.Time
-
+    // Ideally this function would lock while walking in order to avoid concurrency issues
+    // but we can't lock everything as we don't have a valid list of all repositories
+    // let's assume the result of this function is a best effort and some repos may be
+    // added or removed by the time it returns
     dir := is.rootDir
 
-    is.RLock(&lockLatency)
-    defer is.RUnlock(&lockLatency)
-
     stores := make([]string, 0)
 
     err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
@@ -326,7 +291,9 @@ func (is *ImageStore) GetRepositories() ([]string, error) {
             return nil //nolint:nilerr // ignore invalid repos
         }
 
-        stores = append(stores, rel)
+        if !zcommon.Contains(stores, rel) {
+            stores = append(stores, rel)
+        }
 
         return nil
     })
@@ -342,13 +309,12 @@ func (is *ImageStore) GetRepositories() ([]string, error) {
 
 // GetNextRepository returns next repository under this store.
 func (is *ImageStore) GetNextRepository(repo string) (string, error) {
-    var lockLatency time.Time
-
+    // Ideally this function would lock while walking in order to avoid concurrency issues
+    // but we can't lock everything as we don't have a valid list of all repositories
+    // let's assume the result of this function is a best effort and some repos may be
+    // added or removed by the time it returns
    dir := is.rootDir
 
-    is.RLock(&lockLatency)
-    defer is.RUnlock(&lockLatency)
-
     _, err := is.storeDriver.List(dir)
     if err != nil {
         if errors.As(err, &driver.PathNotFoundError{}) {
@@ -1190,8 +1156,6 @@ func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string {
 }
 
 func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]string, error) {
-    var lockLatency time.Time
-
     if err := digest.Validate(); err != nil {
         return nil, err
     }
@@ -1200,9 +1164,6 @@ func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]str
         return nil, nil //nolint:nilnil
     }
 
-    is.RLock(&lockLatency)
-    defer is.RUnlock(&lockLatency)
-
     blobsPaths, err := is.cache.GetAllBlobs(digest)
     if err != nil {
         return nil, err
@@ -1799,21 +1760,21 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]godigest.Digest, error) {
     return ret, nil
 }
 
-func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest,
+func (is *ImageStore) GetNextDigestWithBlobPaths(allRepos []string, lastDigests []godigest.Digest,
 ) (godigest.Digest, []string, []string, error) {
     var lockLatency time.Time
 
     dir := is.rootDir
 
-    is.RLock(&lockLatency)
-    defer is.RUnlock(&lockLatency)
+    for _, repo := range allRepos {
+        is.RLockRepo(repo, &lockLatency)
+        defer is.RUnlockRepo(repo, &lockLatency)
+    }
 
     var duplicateBlobs, duplicateRepos []string
 
     var digest godigest.Digest
 
-    var repo string
-
     err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
         // skip blobs under .sync and .uploads
         if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) ||
@@ -1821,19 +1782,40 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
             return driver.ErrSkipDir
         }
 
-        if fileInfo.IsDir() {
-            // skip repositories not found in repos
-            repo = path.Base(fileInfo.Path())
-            if !zcommon.Contains(repos, repo) && repo != ispec.ImageBlobsDir {
-                candidateAlgorithm := godigest.Algorithm(repo)
+        if strings.HasSuffix(fileInfo.Path(), ispec.ImageLayoutFile) ||
+            strings.HasSuffix(fileInfo.Path(), ispec.ImageIndexFile) ||
+            strings.HasSuffix(fileInfo.Path(), ".db") {
+            return nil
+        }
 
-                if !candidateAlgorithm.Available() {
-                    return driver.ErrSkipDir
-                }
+        // the path is always under root dir because the walk function walks the root dir
+        rel, _ := filepath.Rel(is.rootDir, fileInfo.Path())
+
+        if fileInfo.IsDir() {
+            if fileInfo.Path() == is.rootDir || zcommon.Contains(allRepos, rel) {
+                // this is the root directory or a repo, go deeped into subfolders
+                return nil
+            }
+
+            // attempt to determine is the base folder
+            lastFolderInPath := path.Base(rel)
+            if lastFolderInPath == ispec.ImageBlobsDir {
+                // this is the blobs dir, go deeper into subfolders
+                return nil
+            }
+
+            // this is not the root dir, a repo, or a blobs dir
+            // it is also unclear if we are under a repo, as this could be .trivy
+            // skip entire directory if the base name does not match a valid hash algorithm
+            candidateAlgorithm := godigest.Algorithm(lastFolderInPath)
+            if !candidateAlgorithm.Available() {
+                return driver.ErrSkipDir
+            } else {
+                // this is the folder sha256 or similar
+                return nil
+            }
+        }
 
-        repo = path.Dir(path.Dir(fileInfo.Path()))
         digestHash := path.Base(fileInfo.Path())
         digestAlgorithm := godigest.Algorithm(path.Base(path.Dir(fileInfo.Path())))
@@ -1853,6 +1835,7 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
         if blobDigest == digest {
             duplicateBlobs = append(duplicateBlobs, fileInfo.Path())
 
+            repo := path.Dir(path.Dir(path.Dir(rel)))
             if !zcommon.Contains(duplicateRepos, repo) {
                 duplicateRepos = append(duplicateRepos, repo)
             }
@@ -2047,8 +2030,10 @@ func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Di
 ) error {
     var lockLatency time.Time
 
-    is.Lock(&lockLatency)
-    defer is.Unlock(&lockLatency)
+    for _, repo := range duplicateRepos {
+        is.LockRepo(repo, &lockLatency)
+        defer is.UnlockRepo(repo, &lockLatency)
+    }
 
     if dedupe {
         return is.dedupeBlobs(ctx, digest, duplicateBlobs)
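The two hunks above show how call sites that used to take the global lock now iterate over the repositories they touch. Because the unlocks are deferred inside the loop, they only run when the enclosing function returns, so every repository in the slice stays locked for the whole operation. A hypothetical helper (the helper name is made up, written as if it lived in the same package as ImageStore) illustrates the same pattern:

```go
// Hypothetical helper illustrating the defer-in-a-loop locking used by
// GetNextDigestWithBlobPaths and RunDedupeForDigest above: deferred unlocks run
// when the *function* returns, so all repositories in the slice stay read-locked
// for the entire duration of work().
func (is *ImageStore) withReposReadLocked(repos []string, work func() error) error {
	var lockLatency time.Time

	for _, repo := range repos {
		is.RLockRepo(repo, &lockLatency)
		defer is.RUnlockRepo(repo, &lockLatency) // runs at function exit, not per iteration
	}

	return work() // all repos are still read-locked here
}
```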
@@ -6,93 +6,92 @@ import (
 
 type ImageStoreLock struct {
     // locks per repository paths
-    repoLocks sync.Map
-    // lock for the entire storage, needed in case all repos need to be processed
-    // including blocking creating new repos
-    globalLock *sync.RWMutex
+    repoLocks map[string]*sync.RWMutex
+    // lock for managing the content of the repo lock map
+    internalLock *sync.Mutex
 }
 
 func NewImageStoreLock() *ImageStoreLock {
     return &ImageStoreLock{
-        repoLocks:  sync.Map{},
-        globalLock: &sync.RWMutex{},
+        repoLocks:    map[string]*sync.RWMutex{},
+        internalLock: &sync.Mutex{},
     }
 }
 
-func (sl *ImageStoreLock) RLock() {
-    // block reads and writes to the entire storage, including new repos
-    sl.globalLock.RLock()
-}
-
-func (sl *ImageStoreLock) RUnlock() {
-    // unlock to the storage in general
-    sl.globalLock.RUnlock()
-}
-
-func (sl *ImageStoreLock) Lock() {
-    // block reads and writes to the entire storage, including new repos
-    sl.globalLock.Lock()
-}
-
-func (sl *ImageStoreLock) Unlock() {
-    // unlock to the storage in general
-    sl.globalLock.Unlock()
-}
-
 func (sl *ImageStoreLock) RLockRepo(repo string) {
-    // besides the individual repo increment the read counter for the
-    // global lock, this will make sure the storage cannot be
-    // write-locked at global level while individual repos are accessed
-    sl.globalLock.RLock()
-
-    val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
+    repoLock, _ := sl.loadLock(repo)
 
     // lock individual repo
-    repoLock, _ := val.(*sync.RWMutex)
     repoLock.RLock()
 }
 
 func (sl *ImageStoreLock) RUnlockRepo(repo string) {
-    val, ok := sl.repoLocks.Load(repo)
+    repoLock, ok := sl.loadLock(repo)
     if !ok {
-        // somehow the unlock is called for repo that was never locked
+        // somehow the unlock is called for a repo that was not locked
         return
     }
 
     // read-unlock individual repo
-    repoLock, _ := val.(*sync.RWMutex)
     repoLock.RUnlock()
-
-    // decrement the global read counter after the one for the individual repo is decremented
-    sl.globalLock.RUnlock()
 }
 
 func (sl *ImageStoreLock) LockRepo(repo string) {
-    // besides the individual repo increment the read counter for the
-    // global lock, this will make sure the storage cannot be
-    // write-locked at global level while individual repos are accessed
-    // we are not using the write lock here, as that would make all repos
-    // wait for one another
-    sl.globalLock.RLock()
-
-    val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
+    repoLock, _ := sl.loadLock(repo)
 
     // write-lock individual repo
-    repoLock, _ := val.(*sync.RWMutex)
     repoLock.Lock()
 }
 
 func (sl *ImageStoreLock) UnlockRepo(repo string) {
-    val, ok := sl.repoLocks.Load(repo)
+    repoLock, ok := sl.loadLock(repo)
     if !ok {
-        // somehow the unlock is called for a repo that was never locked
+        // somehow the unlock is called for a repo that was not locked
         return
     }
 
     // write-unlock individual repo
-    repoLock, _ := val.(*sync.RWMutex)
     repoLock.Unlock()
 
-    // decrement the global read counter after the individual repo was unlocked
-    sl.globalLock.RUnlock()
+    // attempt to clean up the map of unused locks
+    sl.discardLockIfPossible(repo)
+}
+
+func (sl *ImageStoreLock) loadLock(repo string) (*sync.RWMutex, bool) {
+    sl.internalLock.Lock()
+    defer sl.internalLock.Unlock()
+
+    repoLock, ok := sl.repoLocks[repo]
+    if !ok {
+        sl.repoLocks[repo] = &sync.RWMutex{}
+        repoLock = sl.repoLocks[repo]
+    }
+
+    return repoLock, ok
+}
+
+func (sl *ImageStoreLock) discardLockIfPossible(repo string) {
+    sl.internalLock.Lock()
+    defer sl.internalLock.Unlock()
+
+    repoLock, ok := sl.repoLocks[repo]
+    if !ok {
+        // the lock is not set, no need to do anything else
+        return
+    }
+
+    // check if the lock is in use
+    // this is a non-blocking operation if someone else is already blocking the lock
+    // the internalLock prevents the case where someone else attempts
+    // to load/block the lock after this function started executing
+    ok = repoLock.TryLock()
+    if !ok {
+        // if someone else is using this lock, it is still needed, keep it as is
+        return
+    }
+    // final unlock
+    defer repoLock.Unlock()
+
+    // nobody else is using this lock, remove it from the map
+    delete(sl.repoLocks, repo)
 }
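Taken together, the reworked lock file keeps one RWMutex per repository in a plain map guarded by internalLock, and UnlockRepo opportunistically prunes idle entries: discardLockIfPossible deletes a repository's mutex only when TryLock succeeds, meaning nobody holds or shares it at that moment, and internalLock prevents loadLock from handing the entry out while it is being discarded. A hypothetical usage sketch, written as if it were a test in the same package as ImageStoreLock (repository names are made up):

```go
// Hypothetical usage sketch for the reworked ImageStoreLock (same-package code,
// made-up repository names).
func ExampleImageStoreLock_usage() {
	sl := NewImageStoreLock()

	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			sl.RLockRepo("repo-a") // concurrent readers of the same repo proceed together
			defer sl.RUnlockRepo("repo-a")
			// ... read the repo's index and blobs ...
		}()
	}

	sl.LockRepo("repo-b") // a writer to a different repo is not blocked by the readers above
	// ... mutate repo-b ...
	sl.UnlockRepo("repo-b") // also prunes repo-b's map entry when nobody else is using it

	wg.Wait()
}
```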
@@ -189,7 +189,13 @@ func (driver *Driver) WriteFile(filepath string, content []byte) (int, error) {
 func (driver *Driver) Walk(path string, walkFn storagedriver.WalkFn) error {
     children, err := driver.List(path)
     if err != nil {
-        return err
+        switch errors.As(err, &storagedriver.PathNotFoundError{}) {
+        case true:
+            // repository was removed in between listing and enumeration. Ignore it.
+            return nil
+        default:
+            return err
+        }
     }
 
     sort.Stable(sort.StringSlice(children))
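The Walk change above makes the local driver tolerate repositories that are deleted between List and enumeration instead of failing the whole walk. The switch is just a branch on the boolean returned by errors.As; an equivalent if-form (a sketch, not part of the commit, assuming the same imports as the surrounding file) reads:

```go
// Equivalent if-form of the error handling above (sketch, not part of the commit).
children, err := driver.List(path)
if err != nil {
	if errors.As(err, &storagedriver.PathNotFoundError{}) {
		// the repository was removed between listing and enumeration, ignore it
		return nil
	}

	return err
}
```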
@@ -12,7 +12,6 @@ import (
     "path"
     "slices"
     "strings"
-    "sync"
     "testing"
     "time"
 
@@ -889,33 +888,6 @@ func TestStorageAPIs(t *testing.T) {
             _, _, err = imgStore.PutImageManifest("replace", "1.0", ispec.MediaTypeImageManifest, manifestBuf)
             So(err, ShouldBeNil)
         })
-
-        Convey("Locks", func() {
-            // in parallel, a mix of read and write locks - mainly for coverage
-            var wg sync.WaitGroup
-            for i := 0; i < 1000; i++ {
-                wg.Add(2)
-
-                go func() {
-                    var lockLatency time.Time
-
-                    defer wg.Done()
-                    imgStore.Lock(&lockLatency)
-                    func() {}()
-                    imgStore.Unlock(&lockLatency)
-                }()
-                go func() {
-                    var lockLatency time.Time
-
-                    defer wg.Done()
-                    imgStore.RLock(&lockLatency)
-                    func() {}()
-                    imgStore.RUnlock(&lockLatency)
-                }()
-            }
-
-            wg.Wait()
-        })
     })
 })
 }
@@ -22,10 +22,6 @@ type ImageStore interface { //nolint:interfacebloat
     Name() string
     DirExists(d string) bool
     RootDir() string
-    RLock(*time.Time)
-    RUnlock(*time.Time)
-    Lock(*time.Time)
-    Unlock(*time.Time)
     RLockRepo(repo string, lockStart *time.Time)
     RUnlockRepo(repo string, lockStart *time.Time)
     LockRepo(repo string, lockStart *time.Time)
@@ -71,18 +71,6 @@ func (is MockedImageStore) StatIndex(repo string) (bool, int64, time.Time, error
     return true, 0, time.Time{}, nil
 }
 
-func (is MockedImageStore) Lock(t *time.Time) {
-}
-
-func (is MockedImageStore) Unlock(t *time.Time) {
-}
-
-func (is MockedImageStore) RUnlock(t *time.Time) {
-}
-
-func (is MockedImageStore) RLock(t *time.Time) {
-}
-
 func (is MockedImageStore) LockRepo(repo string, t *time.Time) {
 }
 
@@ -102,8 +102,8 @@ func (olu BaseOciLayoutUtils) GetImageManifests(repo string) ([]ispec.Descriptor
 
     imageStore := olu.StoreController.GetImageStore(repo)
 
-    imageStore.RLock(&lockLatency)
-    defer imageStore.RUnlock(&lockLatency)
+    imageStore.RLockRepo(repo, &lockLatency)
+    defer imageStore.RUnlockRepo(repo, &lockLatency)
 
     buf, err := imageStore.GetIndexContent(repo)
     if err != nil {
@@ -137,8 +137,8 @@ func (olu BaseOciLayoutUtils) GetImageBlobManifest(repo string, digest godigest.
 
     imageStore := olu.StoreController.GetImageStore(repo)
 
-    imageStore.RLock(&lockLatency)
-    defer imageStore.RUnlock(&lockLatency)
+    imageStore.RLockRepo(repo, &lockLatency)
+    defer imageStore.RUnlockRepo(repo, &lockLatency)
 
     blobBuf, err := imageStore.GetBlobContent(repo, digest)
     if err != nil {
@@ -163,8 +163,8 @@ func (olu BaseOciLayoutUtils) GetImageInfo(repo string, configDigest godigest.Di
 
     imageStore := olu.StoreController.GetImageStore(repo)
 
-    imageStore.RLock(&lockLatency)
-    defer imageStore.RUnlock(&lockLatency)
+    imageStore.RLockRepo(repo, &lockLatency)
+    defer imageStore.RUnlockRepo(repo, &lockLatency)
 
     blobBuf, err := imageStore.GetBlobContent(repo, configDigest)
     if err != nil {
@@ -323,8 +323,8 @@ func (olu BaseOciLayoutUtils) GetImageManifestSize(repo string, manifestDigest g
 
     var lockLatency time.Time
 
-    imageStore.RLock(&lockLatency)
-    defer imageStore.RUnlock(&lockLatency)
+    imageStore.RLockRepo(repo, &lockLatency)
+    defer imageStore.RUnlockRepo(repo, &lockLatency)
 
     manifestBlob, err := imageStore.GetBlobContent(repo, manifestDigest)
     if err != nil {