From a775dcabd8b9aa321bafa0c4929500d3b12f3809 Mon Sep 17 00:00:00 2001
From: Andrei Aaron
Date: Sat, 17 Aug 2024 19:43:29 +0000
Subject: [PATCH] refactor: switch back to using a syncmap for locking

Remove the logic to discard unused locks as it produced deadlocks

Signed-off-by: Andrei Aaron
---
 pkg/storage/imagestore/lock.go      | 63 +++++--------------------
 pkg/storage/imagestore/lock_test.go | 71 +++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 52 deletions(-)
 create mode 100644 pkg/storage/imagestore/lock_test.go

diff --git a/pkg/storage/imagestore/lock.go b/pkg/storage/imagestore/lock.go
index 43546952..0ead2d58 100644
--- a/pkg/storage/imagestore/lock.go
+++ b/pkg/storage/imagestore/lock.go
@@ -6,92 +6,51 @@ import (
 
 type ImageStoreLock struct {
 	// locks per repository paths
-	repoLocks map[string]*sync.RWMutex
-	// lock for managing the content of the repo lock map
-	internalLock *sync.Mutex
+	repoLocks sync.Map
 }
 
 func NewImageStoreLock() *ImageStoreLock {
 	return &ImageStoreLock{
-		repoLocks:    map[string]*sync.RWMutex{},
-		internalLock: &sync.Mutex{},
+		repoLocks: sync.Map{},
 	}
 }
 
 func (sl *ImageStoreLock) RLockRepo(repo string) {
-	repoLock, _ := sl.loadLock(repo)
+	val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
 
 	// lock individual repo
+	repoLock, _ := val.(*sync.RWMutex)
 	repoLock.RLock()
 }
 
 func (sl *ImageStoreLock) RUnlockRepo(repo string) {
-	repoLock, ok := sl.loadLock(repo)
+	val, ok := sl.repoLocks.Load(repo)
 	if !ok {
-		// somehow the unlock is called for a repo that was not locked
+		// somehow the unlock is called for repo that was not locked
 		return
 	}
 
 	// read-unlock individual repo
+	repoLock, _ := val.(*sync.RWMutex)
 	repoLock.RUnlock()
 }
 
 func (sl *ImageStoreLock) LockRepo(repo string) {
-	repoLock, _ := sl.loadLock(repo)
+	val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
 
 	// write-lock individual repo
+	repoLock, _ := val.(*sync.RWMutex)
 	repoLock.Lock()
 }
 
 func (sl *ImageStoreLock) UnlockRepo(repo string) {
-	repoLock, ok := sl.loadLock(repo)
+	val, ok := sl.repoLocks.Load(repo)
 	if !ok {
 		// somehow the unlock is called for a repo that was not locked
 		return
 	}
 
 	// write-unlock individual repo
+	repoLock, _ := val.(*sync.RWMutex)
 	repoLock.Unlock()
-
-	// attempt to clean up the map of unused locks
-	sl.discardLockIfPossible(repo)
-}
-
-func (sl *ImageStoreLock) loadLock(repo string) (*sync.RWMutex, bool) {
-	sl.internalLock.Lock()
-	defer sl.internalLock.Unlock()
-
-	repoLock, ok := sl.repoLocks[repo]
-	if !ok {
-		sl.repoLocks[repo] = &sync.RWMutex{}
-		repoLock = sl.repoLocks[repo]
-	}
-
-	return repoLock, ok
-}
-
-func (sl *ImageStoreLock) discardLockIfPossible(repo string) {
-	sl.internalLock.Lock()
-	defer sl.internalLock.Unlock()
-
-	repoLock, ok := sl.repoLocks[repo]
-	if !ok {
-		// the lock is not set, no need to do anything else
-		return
-	}
-
-	// check if the lock is in use
-	// this is a non-blocking operation if someone else is already blocking the lock
-	// the internalLock prevents the case where someone else attempts
-	// to load/block the lock after this function started executing
-	ok = repoLock.TryLock()
-	if !ok {
-		// if someone else is using this lock, it is still needed, keep it as is
-		return
-	}
-	// final unlock
-	defer repoLock.Unlock()
-
-	// nobody else is using this lock, remove it from the map
-	delete(sl.repoLocks, repo)
 }
diff --git a/pkg/storage/imagestore/lock_test.go b/pkg/storage/imagestore/lock_test.go
new file mode 100644
index 00000000..939371a9
--- /dev/null
+++ b/pkg/storage/imagestore/lock_test.go
@@ -0,0 +1,71 @@
+package imagestore_test
+
+import (
+	_ "crypto/sha256"
+	"os"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/rs/zerolog"
+	. "github.com/smartystreets/goconvey/convey"
+
+	"zotregistry.dev/zot/pkg/extensions/monitoring"
+	zlog "zotregistry.dev/zot/pkg/log"
+	"zotregistry.dev/zot/pkg/storage"
+	"zotregistry.dev/zot/pkg/storage/cache"
+	"zotregistry.dev/zot/pkg/storage/local"
+)
+
+func TestStorageLocks(t *testing.T) {
+	dir := t.TempDir()
+
+	log := zlog.Logger{Logger: zerolog.New(os.Stdout)}
+	metrics := monitoring.NewMetricsServer(false, log)
+	cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
+		RootDir:     dir,
+		Name:        "cache",
+		UseRelPaths: true,
+	}, log)
+
+	imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
+
+	Convey("Locks", t, func() {
+		// in parallel, a mix of read and write locks - mainly for coverage
+		var wg sync.WaitGroup
+		for i := 0; i < 1000; i++ {
+			repo := "repo" + strconv.Itoa(i%10)
+
+			wg.Add(2)
+
+			go func() {
+				var lockLatency time.Time
+
+				defer wg.Done()
+
+				t.Logf("Repo %s will be write-locked in loop %d", repo, i)
+				imgStore.LockRepo(repo, &lockLatency)
+				func() {
+					t.Logf("Execute while repo %s is write-locked in loop %d", repo, i)
+				}()
+				imgStore.UnlockRepo(repo, &lockLatency)
+				t.Logf("Repo %s is write-unlocked in loop %d", repo, i)
+			}()
+			go func() {
+				var lockLatency time.Time
+
+				defer wg.Done()
+				t.Logf("Repo %s will be read-locked in loop %d", repo, i)
+				imgStore.RLockRepo(repo, &lockLatency)
+				func() {
+					t.Logf("Execute while repo %s is read-locked in loop %d", repo, i)
+				}()
+				imgStore.RUnlockRepo(repo, &lockLatency)
+				t.Logf("Repo %s is read-unlocked in loop %d", repo, i)
+			}()
+		}
+
+		wg.Wait()
+	})
+}
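
For readers unfamiliar with the idiom, below is a minimal, self-contained sketch (not part of the patch) of the sync.Map/LoadOrStore pattern the new lock.go relies on; the package layout and the names repoLocks, lockRepo, and counter are illustrative only. Because LoadOrStore returns the already-stored value when two goroutines race to create the lock for the same repo, every caller ends up locking the same *sync.RWMutex, which is what lets the patch drop the separate internalLock and the cleanup path that produced deadlocks.

// sketch only - illustrates the per-repo locking idiom used above
package main

import (
	"fmt"
	"sync"
)

// repoLocks maps a repository name to its *sync.RWMutex.
var repoLocks sync.Map

// lockRepo write-locks the per-repo mutex, creating it on first use.
// LoadOrStore guarantees that concurrent first-time callers for the
// same repo all receive the same mutex instance.
func lockRepo(repo string) *sync.RWMutex {
	val, _ := repoLocks.LoadOrStore(repo, &sync.RWMutex{})
	repoLock, _ := val.(*sync.RWMutex)
	repoLock.Lock()

	return repoLock
}

func main() {
	var wg sync.WaitGroup

	counter := 0

	for i := 0; i < 100; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			repoLock := lockRepo("repo0")
			defer repoLock.Unlock()

			counter++ // safe: only one writer holds the repo0 lock at a time
		}()
	}

	wg.Wait()
	fmt.Println(counter) // always prints 100
}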