mirror of
https://github.com/project-zot/zot.git
synced 2024-12-16 21:56:37 -05:00
refactor: switch back to using a syncmap for locking
Remove the logic to discard unused locks as it produced deadlocks Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
This commit is contained in:
parent
2ae3f0e26f
commit
a775dcabd8
2 changed files with 82 additions and 52 deletions
|
@ -6,92 +6,51 @@ import (
|
||||||
|
|
||||||
type ImageStoreLock struct {
|
type ImageStoreLock struct {
|
||||||
// locks per repository paths
|
// locks per repository paths
|
||||||
repoLocks map[string]*sync.RWMutex
|
repoLocks sync.Map
|
||||||
// lock for managing the content of the repo lock map
|
|
||||||
internalLock *sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewImageStoreLock() *ImageStoreLock {
|
func NewImageStoreLock() *ImageStoreLock {
|
||||||
return &ImageStoreLock{
|
return &ImageStoreLock{
|
||||||
repoLocks: map[string]*sync.RWMutex{},
|
repoLocks: sync.Map{},
|
||||||
internalLock: &sync.Mutex{},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *ImageStoreLock) RLockRepo(repo string) {
|
func (sl *ImageStoreLock) RLockRepo(repo string) {
|
||||||
repoLock, _ := sl.loadLock(repo)
|
val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
|
||||||
|
|
||||||
// lock individual repo
|
// lock individual repo
|
||||||
|
repoLock, _ := val.(*sync.RWMutex)
|
||||||
repoLock.RLock()
|
repoLock.RLock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *ImageStoreLock) RUnlockRepo(repo string) {
|
func (sl *ImageStoreLock) RUnlockRepo(repo string) {
|
||||||
repoLock, ok := sl.loadLock(repo)
|
val, ok := sl.repoLocks.Load(repo)
|
||||||
if !ok {
|
if !ok {
|
||||||
// somehow the unlock is called for a repo that was not locked
|
// somehow the unlock is called for repo that was not locked
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// read-unlock individual repo
|
// read-unlock individual repo
|
||||||
|
repoLock, _ := val.(*sync.RWMutex)
|
||||||
repoLock.RUnlock()
|
repoLock.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *ImageStoreLock) LockRepo(repo string) {
|
func (sl *ImageStoreLock) LockRepo(repo string) {
|
||||||
repoLock, _ := sl.loadLock(repo)
|
val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
|
||||||
|
|
||||||
// write-lock individual repo
|
// write-lock individual repo
|
||||||
|
repoLock, _ := val.(*sync.RWMutex)
|
||||||
repoLock.Lock()
|
repoLock.Lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *ImageStoreLock) UnlockRepo(repo string) {
|
func (sl *ImageStoreLock) UnlockRepo(repo string) {
|
||||||
repoLock, ok := sl.loadLock(repo)
|
val, ok := sl.repoLocks.Load(repo)
|
||||||
if !ok {
|
if !ok {
|
||||||
// somehow the unlock is called for a repo that was not locked
|
// somehow the unlock is called for a repo that was not locked
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// write-unlock individual repo
|
// write-unlock individual repo
|
||||||
|
repoLock, _ := val.(*sync.RWMutex)
|
||||||
repoLock.Unlock()
|
repoLock.Unlock()
|
||||||
|
|
||||||
// attempt to clean up the map of unused locks
|
|
||||||
sl.discardLockIfPossible(repo)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sl *ImageStoreLock) loadLock(repo string) (*sync.RWMutex, bool) {
|
|
||||||
sl.internalLock.Lock()
|
|
||||||
defer sl.internalLock.Unlock()
|
|
||||||
|
|
||||||
repoLock, ok := sl.repoLocks[repo]
|
|
||||||
if !ok {
|
|
||||||
sl.repoLocks[repo] = &sync.RWMutex{}
|
|
||||||
repoLock = sl.repoLocks[repo]
|
|
||||||
}
|
|
||||||
|
|
||||||
return repoLock, ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sl *ImageStoreLock) discardLockIfPossible(repo string) {
|
|
||||||
sl.internalLock.Lock()
|
|
||||||
defer sl.internalLock.Unlock()
|
|
||||||
|
|
||||||
repoLock, ok := sl.repoLocks[repo]
|
|
||||||
if !ok {
|
|
||||||
// the lock is not set, no need to do anything else
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if the lock is in use
|
|
||||||
// this is a non-blocking operation if someone else is already blocking the lock
|
|
||||||
// the internalLock prevents the case where someone else attempts
|
|
||||||
// to load/block the lock after this function started executing
|
|
||||||
ok = repoLock.TryLock()
|
|
||||||
if !ok {
|
|
||||||
// if someone else is using this lock, it is still needed, keep it as is
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// final unlock
|
|
||||||
defer repoLock.Unlock()
|
|
||||||
|
|
||||||
// nobody else is using this lock, remove it from the map
|
|
||||||
delete(sl.repoLocks, repo)
|
|
||||||
}
|
}
|
||||||
|
|
71
pkg/storage/imagestore/lock_test.go
Normal file
71
pkg/storage/imagestore/lock_test.go
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
package imagestore_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "crypto/sha256"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
|
|
||||||
|
"zotregistry.dev/zot/pkg/extensions/monitoring"
|
||||||
|
zlog "zotregistry.dev/zot/pkg/log"
|
||||||
|
"zotregistry.dev/zot/pkg/storage"
|
||||||
|
"zotregistry.dev/zot/pkg/storage/cache"
|
||||||
|
"zotregistry.dev/zot/pkg/storage/local"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStorageLocks(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
log := zlog.Logger{Logger: zerolog.New(os.Stdout)}
|
||||||
|
metrics := monitoring.NewMetricsServer(false, log)
|
||||||
|
cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
|
||||||
|
RootDir: dir,
|
||||||
|
Name: "cache",
|
||||||
|
UseRelPaths: true,
|
||||||
|
}, log)
|
||||||
|
|
||||||
|
imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
|
||||||
|
|
||||||
|
Convey("Locks", t, func() {
|
||||||
|
// in parallel, a mix of read and write locks - mainly for coverage
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
repo := "repo" + strconv.Itoa(i%10)
|
||||||
|
|
||||||
|
wg.Add(2)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
var lockLatency time.Time
|
||||||
|
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
t.Logf("Repo %s will be write-locked in loop %d", repo, i)
|
||||||
|
imgStore.LockRepo(repo, &lockLatency)
|
||||||
|
func() {
|
||||||
|
t.Logf("Execute while repo %s is write-locked in loop %d", repo, i)
|
||||||
|
}()
|
||||||
|
imgStore.UnlockRepo(repo, &lockLatency)
|
||||||
|
t.Logf("Repo %s is write-unlocked in loop %d", repo, i)
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
var lockLatency time.Time
|
||||||
|
|
||||||
|
defer wg.Done()
|
||||||
|
t.Logf("Repo %s will be read-locked in loop %d", repo, i)
|
||||||
|
imgStore.RLockRepo(repo, &lockLatency)
|
||||||
|
func() {
|
||||||
|
t.Logf("Execute while repo %s is read-locked in loop %d", repo, i)
|
||||||
|
}()
|
||||||
|
imgStore.RUnlockRepo(repo, &lockLatency)
|
||||||
|
t.Logf("Repo %s is read-unlocked in loop %d", repo, i)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in a new issue