0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-06 22:40:28 -05:00

zb: replace map with sync.Map to avoid concurrent writes closes #582

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
Petu Eusebiu 2022-06-20 11:46:04 +03:00 committed by Ramkumar Chinchani
parent eed48c1715
commit 616d5f8a6d
2 changed files with 54 additions and 22 deletions

View file

@ -10,6 +10,7 @@ import (
"net/http" "net/http"
"os" "os"
"path" "path"
"sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@ -71,11 +72,14 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
switch idx { switch idx {
case smallSizeIdx: case smallSizeIdx:
statusRequests["1MB"]++ current := loadOrStore(&statusRequests, "1MB", 0)
statusRequests.Store("1MB", current+1)
case mediumSizeIdx: case mediumSizeIdx:
statusRequests["10MB"]++ current := loadOrStore(&statusRequests, "10MB", 0)
statusRequests.Store("10MB", current+1)
case largeSizeIdx: case largeSizeIdx:
statusRequests["100MB"]++ current := loadOrStore(&statusRequests, "100MB", 0)
statusRequests.Store("100MB", current+1)
} }
manifestHash = manifestBySizeHash[idx] manifestHash = manifestBySizeHash[idx]
@ -469,13 +473,16 @@ func pushMonolithAndCollect(workdir, url, trepo string, count int,
switch idx { switch idx {
case smallSizeIdx: case smallSizeIdx:
size = smallBlob size = smallBlob
statusRequests["1MB"]++ current := loadOrStore(&statusRequests, "1MB", 0)
statusRequests.Store("1MB", current+1)
case mediumSizeIdx: case mediumSizeIdx:
size = mediumBlob size = mediumBlob
statusRequests["10MB"]++ current := loadOrStore(&statusRequests, "10MB", 0)
statusRequests.Store("10MB", current+1)
case largeSizeIdx: case largeSizeIdx:
size = largeBlob size = largeBlob
statusRequests["100MB"]++ current := loadOrStore(&statusRequests, "100MB", 0)
statusRequests.Store("100MB", current+1)
default: default:
size = config.size size = config.size
} }
@ -690,14 +697,16 @@ func pushChunkAndCollect(workdir, url, trepo string, count int,
switch idx { switch idx {
case smallSizeIdx: case smallSizeIdx:
size = smallBlob size = smallBlob
statusRequests["1MB"]++ current := loadOrStore(&statusRequests, "1MB", 0)
statusRequests.Store("1MB", current+1)
case mediumSizeIdx: case mediumSizeIdx:
size = mediumBlob size = mediumBlob
statusRequests["10MB"]++ current := loadOrStore(&statusRequests, "10MB", 0)
statusRequests.Store("10MB", current+1)
case largeSizeIdx: case largeSizeIdx:
size = largeBlob size = largeBlob
statusRequests["100MB"]++ current := loadOrStore(&statusRequests, "100MB", 0)
statusRequests.Store("100MB", current+1)
default: default:
size = config.size size = config.size
} }
@ -910,3 +919,14 @@ func pushChunkAndCollect(workdir, url, trepo string, count int,
return repos return repos
} }
func loadOrStore(statusRequests *sync.Map, key string, value int) int {
val, _ := statusRequests.LoadOrStore(key, value)
intValue, ok := val.(int)
if !ok {
log.Fatalf("invalid type: %#v, should be int", val)
}
return intValue
}

View file

@ -48,7 +48,7 @@ const (
var blobHash map[string]godigest.Digest = map[string]godigest.Digest{} var blobHash map[string]godigest.Digest = map[string]godigest.Digest{}
// nolint:gochecknoglobals // used only in this test // nolint:gochecknoglobals // used only in this test
var statusRequests map[string]int var statusRequests sync.Map
func setup(workingDir string) { func setup(workingDir string) {
_ = os.MkdirAll(workingDir, defaultDirPerms) _ = os.MkdirAll(workingDir, defaultDirPerms)
@ -215,15 +215,25 @@ func printStats(requests int, summary *statsSummary, outFmt string) {
log.Printf("\n") log.Printf("\n")
if summary.mixedSize { if summary.mixedSize {
log.Printf("1MB:\t%v", statusRequests["1MB"]) current := loadOrStore(&statusRequests, "1MB", 0)
log.Printf("10MB:\t%v", statusRequests["10MB"]) log.Printf("1MB:\t%v", current)
log.Printf("100MB:\t%v", statusRequests["100MB"])
current = loadOrStore(&statusRequests, "10MB", 0)
log.Printf("10MB:\t%v", current)
current = loadOrStore(&statusRequests, "100MB", 0)
log.Printf("100MB:\t%v", current)
log.Printf("\n") log.Printf("\n")
} }
if summary.mixedType { if summary.mixedType {
log.Printf("Pull:\t%v", statusRequests["Pull"]) pull := loadOrStore(&statusRequests, "Pull", 0)
log.Printf("Push:\t%v", statusRequests["Push"]) log.Printf("Pull:\t%v", pull)
push := loadOrStore(&statusRequests, "Push", 0)
log.Printf("Push:\t%v", push)
log.Printf("\n") log.Printf("\n")
} }
@ -358,7 +368,7 @@ func PushMonolithStreamed(
var repos []string var repos []string
if config.mixedSize { if config.mixedSize {
statusRequests = make(map[string]int) statusRequests = sync.Map{}
} }
for count := 0; count < requests; count++ { for count := 0; count < requests; count++ {
@ -385,7 +395,7 @@ func PushChunkStreamed(
var repos []string var repos []string
if config.mixedSize { if config.mixedSize {
statusRequests = make(map[string]int) statusRequests = sync.Map{}
} }
for count := 0; count < requests; count++ { for count := 0; count < requests; count++ {
@ -416,7 +426,7 @@ func Pull(
manifestBySizeHash := make(map[int](map[string]string)) manifestBySizeHash := make(map[int](map[string]string))
if config.mixedSize { if config.mixedSize {
statusRequests = make(map[string]int) statusRequests = sync.Map{}
} }
if config.mixedSize { if config.mixedSize {
@ -487,7 +497,7 @@ func MixedPullAndPush(
) error { ) error {
var repos []string var repos []string
statusRequests = make(map[string]int) statusRequests = sync.Map{}
// Push blob given size // Push blob given size
manifestHash, repos, err := pushMonolithImage(workdir, url, trepo, repos, config.size, client) manifestHash, repos, err := pushMonolithImage(workdir, url, trepo, repos, config.size, client)
@ -507,10 +517,12 @@ func MixedPullAndPush(
if idx == readTestIdx { if idx == readTestIdx {
repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh) repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
statusRequests["Pull"]++ current := loadOrStore(&statusRequests, "Pull", 0)
statusRequests.Store("Pull", current+1)
} else if idx == writeTestIdx { } else if idx == writeTestIdx {
repos = pushMonolithAndCollect(workdir, url, trepo, count, repos, config, client, statsCh) repos = pushMonolithAndCollect(workdir, url, trepo, count, repos, config, client, statsCh)
statusRequests["Push"]++ current := loadOrStore(&statusRequests, "Push", 0)
statusRequests.Store("Pull", current+1)
} }
} }