0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-13 22:50:38 -05:00
zot/cmd/zb/perf.go
peusebiu 1b184ceef8
fix(zb): fixed remote repositories cleanup (#1461)
fix(storage/local): also put deduped blobs in cache, not just origin blobs

this caused an error when trying to delete deduped blobs
from multiple repositories

fix(storage/s3): check blob is present in cache before deleting

this is an edge case where dedupe is false but cacheDriver is not nil
(because in s3 we open the cache.db if storage finds it in rootDir)
it caused an error when trying to delete blobs uploaded with dedupe false

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
2023-05-19 09:51:15 -07:00

846 lines
18 KiB
Go

package main
import (
crand "crypto/rand"
"crypto/tls"
"fmt"
"log"
"math/big"
"net"
"net/http"
urlparser "net/url"
"os"
"path"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
jsoniter "github.com/json-iterator/go"
godigest "github.com/opencontainers/go-digest"
"gopkg.in/resty.v1"
"zotregistry.io/zot/pkg/api/constants"
)
// Sizes, permissions and network tuning values shared by the benchmark.
const (
	// binary size units
	KiB = 1024
	MiB = 1024 * KiB
	GiB = 1024 * MiB

	// maxSize bounds the blob sizes generated by setup (its loop runs while size < maxSize).
	maxSize = GiB

	// permissions for the working directory and the files written into it
	defaultDirPerms  = 0o700
	defaultFilePerms = 0o600

	defaultSchemaVersion = 2

	// blob sizes used by testSuite
	smallBlob  = MiB
	mediumBlob = 10 * MiB
	largeBlob  = 100 * MiB

	// cicdFmt is the output format that additionally produces a JSON report.
	cicdFmt = "ci-cd"

	secureProtocol = "https"

	// HTTP transport tuning and the cap on source addresses taken from a CIDR
	httpKeepAlive       = 30 * time.Second
	maxSourceIPs        = 1000
	httpTimeout         = 30 * time.Second
	TLSHandshakeTimeout = 10 * time.Second
)
// blobHash maps the path of every blob written by setup to the digest of its
// content.
//
//nolint:gochecknoglobals
var blobHash = make(map[string]godigest.Digest)

// statusRequests holds per-category request counters; printStats reads the
// "1MB", "10MB", "100MB", "Pull" and "Push" keys from it.
// NOTE(review): several test funcs reset it with `statusRequests = sync.Map{}`
// while Perf runs them concurrently, so resets can race with other clients'
// updates — confirm whether a single reset per test in Perf is intended.
//
//nolint:gochecknoglobals // used only in this test
var statusRequests sync.Map
// setup creates workingDir and writes one test blob per size, starting at
// 1MiB and multiplying by 10 while the size stays below maxSize. With
// maxSize = 1GiB this yields 1MiB, 10MiB, 100MiB and 1000MiB files named
// "<size>.blob". Each file is zero-filled except for a random first page, so
// its content (and digest) differs between runs. The SHA256 digest of every
// file is stored in blobHash, keyed by the file path. Any failure is fatal.
//
// NOTE(review): testSuite only references sizes up to largeBlob (100MiB);
// confirm the 1000MiB blob is needed by the helpers.
func setup(workingDir string) {
	_ = os.MkdirAll(workingDir, defaultDirPerms)

	const multiplier = 10

	const rndPageSize = 4 * KiB

	for size := 1 * MiB; size < maxSize; size *= multiplier {
		fname := path.Join(workingDir, fmt.Sprintf("%d.blob", size))

		fhandle, err := os.OpenFile(fname, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultFilePerms)
		if err != nil {
			log.Fatal(err)
		}

		// extend the file to its final size; the extension reads back as zeros
		if err = fhandle.Truncate(int64(size)); err != nil {
			log.Fatal(err)
		}

		// write a random first page so every test run has different blob content;
		// the offset is still 0 after O_TRUNC and Truncate, so no seek is needed
		rnd := make([]byte, rndPageSize)
		if _, err = crand.Read(rnd); err != nil {
			log.Fatal(err)
		}

		if _, err = fhandle.Write(rnd); err != nil {
			log.Fatal(err)
		}

		// Close explicitly rather than via defer: this runs once per blob, and a
		// failed close on a written file may mean the data did not make it to disk.
		if err = fhandle.Close(); err != nil {
			log.Fatal(err)
		}

		// pre-compute the SHA256
		rhandle, err := os.Open(fname)
		if err != nil {
			log.Fatal(err)
		}

		digest, err := godigest.FromReader(rhandle)

		// read-only handle: a close error cannot lose data
		_ = rhandle.Close()

		if err != nil {
			log.Fatal(err)
		}

		blobHash[fname] = digest
	}
}

// teardown removes workingDir together with everything setup wrote into it.
// Errors are ignored: cleanup is best effort.
func teardown(workingDir string) {
	_ = os.RemoveAll(workingDir)
}
// statistics handling.

// Durations is a list of request latencies that implements sort.Interface,
// ordering from the shortest to the longest latency.
type Durations []time.Duration

// Len reports how many latencies the list holds.
func (d Durations) Len() int {
	return len(d)
}

// Swap exchanges the latencies stored at positions i and j.
func (d Durations) Swap(i, j int) {
	d[i], d[j] = d[j], d[i]
}

// Less reports whether the latency at i is strictly shorter than the one at j.
func (d Durations) Less(i, j int) bool {
	return d[i] < d[j]
}
// statsSummary aggregates the statsRecords of one test for printStats.
type statsSummary struct {
	// latencies holds every recorded latency; Perf sorts it before printing.
	latencies []time.Duration
	name      string
	// min and max start at -1 (see newStatsSummary); total is the test's
	// wall-clock duration, set by Perf.
	min, max, total time.Duration
	// statusHist counts responses per status class ("2xx" .. "5xx").
	statusHist           map[string]int
	rps                  float32
	mixedSize, mixedType bool
	errors               int
}

// newStatsSummary returns an empty summary for the named test. min and max
// start at -1 so that updateStats adopts the first recorded latency for both.
func newStatsSummary(name string) statsSummary {
	summary := statsSummary{
		name:       name,
		min:        -1,
		max:        -1,
		statusHist: make(map[string]int),
		mixedSize:  false,
		mixedType:  false,
	}

	return summary
}

// statsRecord is the outcome of a single request, sent by a test func on its
// stats channel.
type statsRecord struct {
	latency time.Duration
	// statusCode is the HTTP status of the response (GetCatalog leaves it 0
	// when the request itself failed).
	statusCode int
	isConnFail bool
	isErr      bool
}

// updateStats folds one record into summary: it counts failures (connection
// or request errors), tracks the min/max latency, buckets the status code by
// class, and appends the latency for later percentile computation.
func updateStats(summary *statsSummary, record statsRecord) {
	// first code past the 5xx class
	const statusClassEnd = 600

	if record.isConnFail || record.isErr {
		summary.errors++
	}

	if summary.min < 0 || record.latency < summary.min {
		summary.min = record.latency
	}

	if summary.max < 0 || record.latency > summary.max {
		summary.max = record.latency
	}

	// Bucket by whole status class so that e.g. 204 No Content or 206 Partial
	// Content count as 2xx. A zero status code (no response) falls in no bucket.
	code := record.statusCode

	switch {
	case code >= http.StatusOK && code < http.StatusMultipleChoices:
		summary.statusHist["2xx"]++
	case code >= http.StatusMultipleChoices && code < http.StatusBadRequest:
		summary.statusHist["3xx"]++
	case code >= http.StatusBadRequest && code < http.StatusInternalServerError:
		summary.statusHist["4xx"]++
	case code >= http.StatusInternalServerError && code < statusClassEnd:
		summary.statusHist["5xx"]++
	}

	summary.latencies = append(summary.latencies, record.latency)
}
// cicdTestSummary is one entry of the JSON report that Perf writes when the
// output format is cicdFmt. The JSON tags and field order define that
// report's format.
type cicdTestSummary struct {
	Name  string      `json:"name"`
	Unit  string      `json:"unit"`
	Value interface{} `json:"value"`
	Range string      `json:"range,omitempty"`
}

// manifestStruct carries the manifests pushed by a test so they can be pulled
// again: manifestHash for a single-size test, manifestBySizeHash (keyed by the
// size-class index used in Pull) for mixed-size tests.
// NOTE(review): the meaning of the inner map's keys/values is defined by
// pushMonolithImage/pullAndCollect, which are not visible here.
type manifestStruct struct {
	manifestHash       map[string]string
	manifestBySizeHash map[int](map[string]string)
}

// cicdSummary accumulates one entry per test (appended by printStats in
// ci-cd mode); Perf marshals it to JSON at the end of the run.
//
//nolint:gochecknoglobals // used only in this test
var cicdSummary = []cicdTestSummary{}
// printStats writes the report for one test to the log and, when outFmt is
// cicdFmt, appends the test's requests-per-second figure to cicdSummary.
//
// requests is the number of requests the test issued; it is used for the
// completed/failed counts. summary.latencies must already be sorted in
// ascending order, because percentiles are read from it by position.
func printStats(requests int, summary *statsSummary, outFmt string) {
	log.Printf("============\n")
	log.Printf("Test name:\t%s", summary.name)
	log.Printf("Time taken for tests:\t%v", summary.total)
	log.Printf("Complete requests:\t%v", requests-summary.errors)
	log.Printf("Failed requests:\t%v", summary.errors)
	log.Printf("Requests per second:\t%v", summary.rps)
	log.Printf("\n")

	if summary.mixedSize {
		current := loadOrStore(&statusRequests, "1MB", 0)
		log.Printf("1MB:\t%v", current)

		current = loadOrStore(&statusRequests, "10MB", 0)
		log.Printf("10MB:\t%v", current)

		current = loadOrStore(&statusRequests, "100MB", 0)
		log.Printf("100MB:\t%v", current)

		log.Printf("\n")
	}

	if summary.mixedType {
		pull := loadOrStore(&statusRequests, "Pull", 0)
		log.Printf("Pull:\t%v", pull)

		push := loadOrStore(&statusRequests, "Push", 0)
		log.Printf("Push:\t%v", push)

		log.Printf("\n")
	}

	// Map iteration order is random; sort the status classes so reports are
	// stable and comparable between runs.
	statusClasses := make([]string, 0, len(summary.statusHist))
	for class := range summary.statusHist {
		statusClasses = append(statusClasses, class)
	}

	sort.Strings(statusClasses)

	for _, class := range statusClasses {
		log.Printf("%s responses:\t%v", class, summary.statusHist[class])
	}

	log.Printf("\n")
	log.Printf("min: %v", summary.min)
	log.Printf("max: %v", summary.max)

	// Index by the number of latencies actually recorded, not by requests: if
	// fewer records arrived (or none at all), requests-based positions would
	// run past the end of the slice and panic.
	if count := len(summary.latencies); count > 0 {
		log.Printf("%s:\t%v", "p50", summary.latencies[count/2])
		log.Printf("%s:\t%v", "p75", summary.latencies[count*3/4])
		log.Printf("%s:\t%v", "p90", summary.latencies[count*9/10])
		log.Printf("%s:\t%v", "p99", summary.latencies[count*99/100])
	}

	log.Printf("\n")

	// ci/cd
	if outFmt == cicdFmt {
		cicdSummary = append(cicdSummary,
			cicdTestSummary{
				Name:  summary.name,
				Unit:  "requests per sec",
				Value: summary.rps,
				Range: "3",
			},
		)
	}
}
// test suites/funcs.

// testFunc is the signature shared by every benchmark listed in testSuite.
// Perf calls it once per concurrent client with that client's share of the
// requests; an implementation reports per-request results on statsCh and
// returns a non-nil error for failures that should abort the run.
type testFunc func(
	workdir string,
	url string,
	repo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error
// GetCatalog benchmarks the catalog endpoint.
//
// It first pushes `requests` monolithic images so the catalog has content,
// then issues `requests` GET requests against the catalog route, sending one
// statsRecord per request on statsCh. A transport error marks the record as a
// connection failure; any status other than 200 marks it as an error. Unless
// skipCleanup is set, the pushed repositories are deleted afterwards.
//
// NOTE(review): the statusRequests reset below runs concurrently in every
// client goroutine Perf starts.
//
//nolint:gosec
func GetCatalog(
	workdir, url, repo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	var (
		repos []string
		err   error
	)

	statusRequests = sync.Map{}

	for i := 0; i < requests; i++ {
		// Push random blob
		if _, repos, err = pushMonolithImage(workdir, url, repo, repos, config, client); err != nil {
			return err
		}
	}

	catalogURL := url + constants.RoutePrefix + constants.ExtCatalogPrefix

	for i := 0; i < requests; i++ {
		func() {
			start := time.Now()

			var record statsRecord

			// report this request whichever way the closure returns
			defer func() {
				statsCh <- record
			}()

			resp, reqErr := client.R().Get(catalogURL)
			record.latency = time.Since(start)

			if reqErr != nil {
				record.isConnFail = true

				return
			}

			// request specific check
			record.statusCode = resp.StatusCode()
			record.isErr = record.statusCode != http.StatusOK
		}()
	}

	if skipCleanup {
		return nil
	}

	return deleteTestRepo(repos, url, client)
}
// PushMonolithStreamed benchmarks monolithic image uploads: each of the
// `requests` iterations pushes one image through pushMonolithAndCollect, which
// receives statsCh (presumably to report that push). Mixed-size runs reset the
// shared statusRequests counters first. Unless skipCleanup is set, every
// repository pushed to is deleted afterwards.
func PushMonolithStreamed(
	workdir, url, trepo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	if config.mixedSize {
		statusRequests = sync.Map{}
	}

	var repos []string

	for i := 0; i < requests; i++ {
		repos = pushMonolithAndCollect(workdir, url, trepo, i, repos, config, client, statsCh)
	}

	// clean up
	if skipCleanup {
		return nil
	}

	return deleteTestRepo(repos, url, client)
}
// PushChunkStreamed benchmarks chunked image uploads: each of the `requests`
// iterations pushes one image through pushChunkAndCollect, which receives
// statsCh (presumably to report that push). Mixed-size runs reset the shared
// statusRequests counters first. Unless skipCleanup is set, every repository
// pushed to is deleted afterwards.
func PushChunkStreamed(
	workdir, url, trepo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	if config.mixedSize {
		statusRequests = sync.Map{}
	}

	var repos []string

	for iteration := 0; iteration < requests; iteration++ {
		repos = pushChunkAndCollect(workdir, url, trepo, iteration, repos, config, client, statsCh)
	}

	if skipCleanup {
		return nil
	}

	// clean up
	return deleteTestRepo(repos, url, client)
}
// Pull benchmarks image downloads.
//
// It first pushes the content to pull. For a single-size test that is one
// image of config.size. When config.mixedSize is set, it pushes one image per
// size class (smallBlob, mediumBlob, largeBlob), storing the manifests in
// manifestBySizeHash under indices 0, 1 and 2. It then calls pullAndCollect
// `requests` times; pullAndCollect receives statsCh (presumably to report one
// record per pull). Unless skipCleanup is set, every repository pushed to is
// deleted before returning.
func Pull(
	workdir, url, trepo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	var (
		repos        []string
		manifestHash map[string]string
		err          error
	)

	manifestBySizeHash := make(map[int](map[string]string))

	if config.mixedSize {
		statusRequests = sync.Map{}

		const (
			smallSizeIdx  = 0
			mediumSizeIdx = 1
			largeSizeIdx  = 2
		)

		// push small, medium and large images, in that order
		for _, blob := range []struct{ idx, size int }{
			{idx: smallSizeIdx, size: smallBlob},
			{idx: mediumSizeIdx, size: mediumBlob},
			{idx: largeSizeIdx, size: largeBlob},
		} {
			config.size = blob.size

			var manifestBySize map[string]string

			// Assign with '=' rather than ':=': a short variable declaration
			// here would create a new, shadowing repos, and the repositories
			// pushed in this branch would never reach the cleanup below.
			manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, config, client)
			if err != nil {
				return err
			}

			manifestBySizeHash[blob.idx] = manifestBySize
		}
	} else {
		// Push blob given size
		manifestHash, repos, err = pushMonolithImage(workdir, url, trepo, repos, config, client)
		if err != nil {
			return err
		}
	}

	manifestItem := manifestStruct{
		manifestHash:       manifestHash,
		manifestBySizeHash: manifestBySizeHash,
	}

	// download image
	for count := 0; count < requests; count++ {
		repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
	}

	// clean up
	if !skipCleanup {
		err = deleteTestRepo(repos, url, client)
		if err != nil {
			return err
		}
	}

	return nil
}
// MixedPullAndPush benchmarks a mix of downloads and uploads.
//
// It pushes one image of config.size to pull from. Then, for each of the
// `requests` iterations, flipFunc(config.probabilityRange) picks either a pull
// (index 0, via pullAndCollect) or a monolithic push (index 1, via
// pushMonolithAndCollect). The choice is counted under the "Pull" or "Push"
// key of statusRequests, which printStats reports for mixed-type tests. Unless
// skipCleanup is set, every repository pushed to is deleted afterwards.
func MixedPullAndPush(
	workdir, url, trepo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	var repos []string

	statusRequests = sync.Map{}

	// Push blob given size
	manifestHash, repos, err := pushMonolithImage(workdir, url, trepo, repos, config, client)
	if err != nil {
		return err
	}

	manifestItem := manifestStruct{
		manifestHash: manifestHash,
	}

	const (
		readTestIdx  = 0
		writeTestIdx = 1
	)

	for count := 0; count < requests; count++ {
		// NOTE(review): the load+store increments below are not atomic, and
		// Perf runs this function from several goroutines at once, so
		// concurrent updates can be lost.
		switch flipFunc(config.probabilityRange) {
		case readTestIdx:
			repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
			current := loadOrStore(&statusRequests, "Pull", 0)
			statusRequests.Store("Pull", current+1)
		case writeTestIdx:
			repos = pushMonolithAndCollect(workdir, url, trepo, count, repos, config, client, statsCh)
			// count pushes under their own key so printStats' Push line is accurate
			current := loadOrStore(&statusRequests, "Push", 0)
			statusRequests.Store("Push", current+1)
		}
	}

	// clean up
	if !skipCleanup {
		err = deleteTestRepo(repos, url, client)
		if err != nil {
			return err
		}
	}

	return nil
}
// test driver.

// testConfig describes one entry of testSuite; Perf hands it to the entry's
// tfunc.
type testConfig struct {
	// name identifies the test in the report.
	name string
	// tfunc executes the test.
	tfunc testFunc

	// test-specific params

	// size is the blob size in bytes; Perf reports a test with size 0 as mixed-size.
	size int
	// probabilityRange holds the weights passed to flipFunc (e.g. by
	// MixedPullAndPush) to choose between alternatives.
	probabilityRange []float64
	// mixedSize marks a test that exercises several blob sizes.
	mixedSize bool
	// mixedType marks a test that mixes pulls and pushes.
	mixedType bool
}
// testSuite lists the benchmarks Perf runs, in order. Perf reports entries
// with size 0 (or mixedSize set) as mixed-size tests. probabilityRange weights
// are passed through normalizeProbabilityRange.
var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this test
	// catalog listing
	{
		name:             "Get Catalog",
		tfunc:            GetCatalog,
		probabilityRange: normalizeProbabilityRange([]float64{0.7, 0.2, 0.1}),
	},
	// monolithic uploads, one size per test
	{
		name:  "Push Monolith 1MB",
		tfunc: PushMonolithStreamed,
		size:  smallBlob,
	},
	{
		name:  "Push Monolith 10MB",
		tfunc: PushMonolithStreamed,
		size:  mediumBlob,
	},
	{
		name:  "Push Monolith 100MB",
		tfunc: PushMonolithStreamed,
		size:  largeBlob,
	},
	// chunked uploads, one size per test
	{
		name:  "Push Chunk Streamed 1MB",
		tfunc: PushChunkStreamed,
		size:  smallBlob,
	},
	{
		name:  "Push Chunk Streamed 10MB",
		tfunc: PushChunkStreamed,
		size:  mediumBlob,
	},
	{
		name:  "Push Chunk Streamed 100MB",
		tfunc: PushChunkStreamed,
		size:  largeBlob,
	},
	// downloads, one size per test
	{
		name:  "Pull 1MB",
		tfunc: Pull,
		size:  smallBlob,
	},
	{
		name:  "Pull 10MB",
		tfunc: Pull,
		size:  mediumBlob,
	},
	{
		name:  "Pull 100MB",
		tfunc: Pull,
		size:  largeBlob,
	},
	// mixed sizes: the weights follow the small/medium/large order used in the names
	{
		name:             "Pull Mixed 20% 1MB, 70% 10MB, 10% 100MB",
		tfunc:            Pull,
		probabilityRange: normalizeProbabilityRange([]float64{0.2, 0.7, 0.1}),
		mixedSize:        true,
	},
	{
		name:             "Push Monolith Mixed 20% 1MB, 70% 10MB, 10% 100MB",
		tfunc:            PushMonolithStreamed,
		probabilityRange: normalizeProbabilityRange([]float64{0.2, 0.7, 0.1}),
		mixedSize:        true,
	},
	{
		name:             "Push Chunk Mixed 33% 1MB, 33% 10MB, 33% 100MB",
		tfunc:            PushChunkStreamed,
		probabilityRange: normalizeProbabilityRange([]float64{0.33, 0.33, 0.33}),
		mixedSize:        true,
	},
	// mixed operations: weights are {pull, push}, matching MixedPullAndPush's indices 0 and 1
	{
		name:             "Pull 75% and Push 25% Mixed 1MB",
		tfunc:            MixedPullAndPush,
		size:             smallBlob,
		mixedType:        true,
		probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
	},
	{
		name:             "Pull 75% and Push 25% Mixed 10MB",
		tfunc:            MixedPullAndPush,
		size:             mediumBlob,
		mixedType:        true,
		probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
	},
	{
		name:             "Pull 75% and Push 25% Mixed 100MB",
		tfunc:            MixedPullAndPush,
		size:             largeBlob,
		mixedType:        true,
		probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
	},
}
// Perf runs every test in testSuite against the registry at url and logs a
// report per test.
//
// Each test is executed by `concurrency` goroutines. Each goroutine gets its
// own HTTP client, bound to a random source address from srcIPs
// (comma-separated) or srcCIDR when either is given, and issues
// requests/concurrency requests. Test blobs are generated under workdir before
// the run and removed afterwards. When outFmt is cicdFmt, the
// requests-per-second figures are also written to "<outFmt>.json". The process
// exits with status 1 if any request of any test failed.
func Perf(
	workdir, url, auth, repo string,
	concurrency int, requests int,
	outFmt string, srcIPs string, srcCIDR string, skipCleanup bool,
) {
	json := jsoniter.ConfigCompatibleWithStandardLibrary

	// logging
	log.SetFlags(0)
	log.SetOutput(tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.TabIndent))

	// common header
	log.Printf("Registry URL:\t%s", url)
	log.Printf("\n")
	log.Printf("Concurrency Level:\t%v", concurrency)
	log.Printf("Total requests:\t%v", requests)

	if workdir == "" {
		cwd, err := os.Getwd()
		if err != nil {
			log.Fatal("unable to get current working dir")
		}

		log.Printf("Working dir:\t%v", cwd)
	} else {
		log.Printf("Working dir:\t%v", workdir)
	}

	log.Printf("\n")

	// Every client issues requests/concurrency requests, so exactly
	// requestsPerClient*concurrency records will arrive on the stats channel.
	// Reject loads where no client would issue anything: the stats loop would
	// otherwise block forever (or printStats would index an empty slice).
	if concurrency <= 0 || requests < concurrency {
		log.Fatalf("invalid load: concurrency (%d) must be positive and requests (%d) must be at least concurrency",
			concurrency, requests)
	}

	requestsPerClient := requests / concurrency
	totalRequests := requestsPerClient * concurrency

	// initialize test data
	log.Printf("Preparing test data ...\n")

	setup(workdir)
	defer teardown(workdir)

	log.Printf("Starting tests ...\n")

	var err error

	zbError := false

	// get host ips from command line to make requests from
	var ips []string
	if len(srcIPs) > 0 {
		ips = strings.Split(srcIPs, ",")
	} else if len(srcCIDR) > 0 {
		ips, err = getIPsFromCIDR(srcCIDR, maxSourceIPs)
		if err != nil {
			log.Fatal(err) //nolint: gocritic
		}
	}

	for _, tconfig := range testSuite {
		statsCh := make(chan statsRecord, requests)

		var wg sync.WaitGroup

		summary := newStatsSummary(tconfig.name)

		start := time.Now()

		for c := 0; c < concurrency; c++ {
			// parallelize with clients
			wg.Add(1)

			go func() {
				defer wg.Done()

				httpClient, err := getRandomClientIPs(auth, url, ips)
				if err != nil {
					log.Fatal(err)
				}

				err = tconfig.tfunc(workdir, url, repo, requestsPerClient, tconfig, statsCh, httpClient, skipCleanup)
				if err != nil {
					log.Fatal(err)
				}
			}()
		}

		wg.Wait()

		summary.total = time.Since(start)
		summary.rps = float32(totalRequests) / float32(summary.total.Seconds())

		if tconfig.mixedSize || tconfig.size == 0 {
			summary.mixedSize = true
		}

		if tconfig.mixedType {
			summary.mixedType = true
		}

		// collect exactly the records that were sent (one per request)
		for count := 0; count < totalRequests; count++ {
			updateStats(&summary, <-statsCh)
		}

		sort.Sort(Durations(summary.latencies))

		printStats(totalRequests, &summary, outFmt)

		if summary.errors != 0 {
			zbError = true
		}
	}

	if outFmt == cicdFmt {
		jsonOut, err := json.Marshal(cicdSummary)
		if err != nil {
			log.Fatal(err)
		}

		if err := os.WriteFile(fmt.Sprintf("%s.json", outFmt), jsonOut, defaultFilePerms); err != nil {
			log.Fatal(err)
		}
	}

	if zbError {
		// os.Exit does not run deferred calls, so remove the generated blobs
		// explicitly; otherwise a run with failed requests leaves them behind.
		teardown(workdir)
		os.Exit(1)
	}
}
// getRandomClientIPs returns a resty client for url.
//
// auth, when non-empty, must be "<username>:<password>" and is sent as HTTP
// basic auth; it is split on the first ':' only, so the password may itself
// contain ':'. When ips is non-empty, the client's outgoing connections are
// bound to one address picked at random from it. For https URLs, TLS
// certificate verification is disabled. It returns an error if auth is
// malformed, an address cannot be picked or resolved, or url cannot be parsed.
func getRandomClientIPs(auth string, url string, ips []string) (*resty.Client, error) {
	const credParts = 2

	client := resty.New()

	if auth != "" {
		creds := strings.SplitN(auth, ":", credParts)
		if len(creds) != credParts {
			// do not echo auth back: it may contain a secret
			return nil, fmt.Errorf("invalid auth format, expected <username>:<password>")
		}

		client.SetBasicAuth(creds[0], creds[1])
	}

	// get random ip client
	if len(ips) != 0 {
		// get random number
		nBig, err := crand.Int(crand.Reader, big.NewInt(int64(len(ips))))
		if err != nil {
			return nil, err
		}

		// get random ip
		ip := ips[nBig.Int64()]

		// set ip in transport
		localAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", ip))
		if err != nil {
			return nil, err
		}

		transport := &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   httpTimeout,
				KeepAlive: httpKeepAlive,
				LocalAddr: localAddr,
			}).DialContext,
			TLSHandshakeTimeout: TLSHandshakeTimeout,
		}

		client.SetTransport(transport)
	}

	// report a bad URL to the caller instead of exiting from inside a helper
	parsedURL, err := urlparser.Parse(url)
	if err != nil {
		return nil, err
	}

	//nolint: gosec
	if parsedURL.Scheme == secureProtocol {
		client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
	}

	return client, nil
}
// getIPsFromCIDR returns up to maxIPs host addresses from cidr, in ascending
// order.
//
// For networks with more than two addresses, the first (network) address is
// never returned. The last (broadcast) address is dropped only when the
// enumeration actually reached the end of the range; if it stopped early at
// maxIPs, the last entry is an ordinary host and is kept. /31 and /32 networks
// (and their IPv6 equivalents /127 and /128) have no separate network or
// broadcast address, so all of their addresses are returned. Returns the
// net.ParseCIDR error for malformed input.
func getIPsFromCIDR(cidr string, maxIPs int) ([]string, error) {
	//nolint:varnamelen
	ip, ipnet, err := net.ParseCIDR(cidr)
	if err != nil {
		return nil, err
	}

	var ips []string

	cur := ip.Mask(ipnet.Mask)
	for ; ipnet.Contains(cur) && len(ips) < maxIPs; inc(cur) {
		ips = append(ips, cur.String())
	}

	ones, bits := ipnet.Mask.Size()
	if bits-ones <= 1 {
		return ips, nil
	}

	// remove network address
	if len(ips) > 0 {
		ips = ips[1:]
	}

	// remove broadcast address, but only if it was enumerated: cur has moved
	// past the range exactly when the loop consumed the whole network
	if len(ips) > 0 && !ipnet.Contains(cur) {
		ips = ips[:len(ips)-1]
	}

	return ips, nil
}

// inc increments ip in place, treating its bytes as a big-endian unsigned
// integer; the all-ones address wraps around to all zeros.
//
// https://go.dev/play/p/sdzcMvZYWnc
func inc(ip net.IP) {
	for j := len(ip) - 1; j >= 0; j-- {
		ip[j]++
		if ip[j] > 0 {
			break
		}
	}
}