0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2024-12-30 22:34:13 -05:00

Add reading tests for zb binary: read-only tests and mixed read-only tests

Add mixed write-only tests
Add mixed read-write tests

Signed-off-by: Roxana Nemulescu <roxana.nemulescu@gmail.com>
This commit is contained in:
Roxana Nemulescu 2022-01-20 18:03:13 +02:00 committed by Ramkumar Chinchani
parent e739cce983
commit efc55b013e
5 changed files with 1232 additions and 427 deletions

View file

@ -136,7 +136,7 @@ jobs:
./bin/zot-linux-amd64 serve test/cluster/config-minio3.json &
sleep 10
# run zb
bin/zb-linux-amd64 -c 10 -n 100 -o ci-cd http://localhost:8080
bin/zb-linux-amd64 -c 10 -n 50 -o ci-cd http://localhost:8080
env:
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin

908
cmd/zb/helper.go Normal file
View file

@ -0,0 +1,908 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"time"
"github.com/google/uuid"
imeta "github.com/opencontainers/image-spec/specs-go"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"gopkg.in/resty.v1"
"zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/test"
)
func deleteTestRepo(repos []string, url string, client *resty.Client) error {
for _, repo := range repos {
resp, err := client.R().Delete((fmt.Sprintf("%s/v2/%s/", url, repo)))
if err != nil {
return err
}
// request specific check
statusCode := resp.StatusCode()
if statusCode != http.StatusAccepted {
return errors.ErrUnknownCode
}
}
return nil
}
func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
config testConfig, client *resty.Client, statsCh chan statsRecord) []string {
manifestHash := manifestItem.manifestHash
manifestBySizeHash := manifestItem.manifestBySizeHash
func() {
start := time.Now()
var isConnFail, isErr bool
var statusCode int
var latency time.Duration
defer func() {
// send a stats record
statsCh <- statsRecord{
latency: latency,
statusCode: statusCode,
isConnFail: isConnFail,
isErr: isErr,
}
}()
if config.mixedSize {
smallSizeIdx := 0
mediumSizeIdx := 1
largeSizeIdx := 2
idx := flipFunc(config.probabilityRange)
switch idx {
case smallSizeIdx:
statusRequests["1MB"]++
case mediumSizeIdx:
statusRequests["10MB"]++
case largeSizeIdx:
statusRequests["100MB"]++
}
manifestHash = manifestBySizeHash[idx]
}
for repo, manifestTag := range manifestHash {
manifestLoc := fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, manifestTag)
// check manifest
resp, err := client.R().
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
Head(manifestLoc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusOK {
isErr = true
return
}
// send request and get the manifest
resp, err = client.R().
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
Get(manifestLoc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusOK {
isErr = true
return
}
manifestBody := resp.Body()
// file copy simulation
_, err = io.Copy(ioutil.Discard, bytes.NewReader(manifestBody))
latency = time.Since(start)
if err != nil {
log.Fatal(err)
}
var pulledManifest ispec.Manifest
err = json.Unmarshal(manifestBody, &pulledManifest)
if err != nil {
log.Fatal(err)
}
// check config
configDigest := pulledManifest.Config.Digest
configLoc := fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, configDigest)
resp, err = client.R().Head(configLoc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusOK {
isErr = true
return
}
// send request and get the config
resp, err = client.R().Get(configLoc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusOK {
isErr = true
return
}
configBody := resp.Body()
// file copy simulation
_, err = io.Copy(ioutil.Discard, bytes.NewReader(configBody))
latency = time.Since(start)
if err != nil {
log.Fatal(err)
}
// download blobs
for _, layer := range pulledManifest.Layers {
blobDigest := layer.Digest
blobLoc := fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, blobDigest)
// check blob
resp, err := client.R().Head(blobLoc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusOK {
isErr = true
return
}
// send request and get response the blob
resp, err = client.R().Get(blobLoc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusOK {
isErr = true
return
}
blobBody := resp.Body()
// file copy simulation
_, err = io.Copy(ioutil.Discard, bytes.NewReader(blobBody))
if err != nil {
log.Fatal(err)
}
}
}
}()
return repos
}
func pushMonolithImage(workdir, url, trepo string, repos []string, size int,
client *resty.Client) (map[string]string, []string, error) {
var statusCode int
// key: repository name. value: manifest name
manifestHash := make(map[string]string)
ruid, err := uuid.NewUUID()
if err != nil {
return nil, repos, err
}
var repo string
if trepo != "" {
repo = trepo + "/" + ruid.String()
} else {
repo = ruid.String()
}
repos = append(repos, repo)
// upload blob
resp, err := client.R().Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
if err != nil {
return nil, repos, err
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
return nil, repos, errors.ErrUnknownCode
}
loc := test.Location(url, resp)
blob := path.Join(workdir, fmt.Sprintf("%d.blob", size))
fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms)
if err != nil {
return nil, repos, err
}
defer fhandle.Close()
// stream the entire blob
digest := blobHash[blob]
resp, err = client.R().
SetContentLength(true).
SetQueryParam("digest", digest.String()).
SetHeader("Content-Length", fmt.Sprintf("%d", size)).
SetHeader("Content-Type", "application/octet-stream").SetBody(fhandle).Put(loc)
if err != nil {
return nil, repos, err
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
}
// upload image config blob
resp, err = client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
if err != nil {
return nil, repos, err
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
return nil, repos, errors.ErrUnknownCode
}
loc = test.Location(url, resp)
cblob, cdigest := test.GetRandomImageConfig()
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", cdigest.String()).
SetBody(cblob).
Put(loc)
if err != nil {
return nil, repos, err
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
}
// create a manifest
manifest := ispec.Manifest{
Versioned: imeta.Versioned{
SchemaVersion: defaultSchemaVersion,
},
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(size),
},
},
}
content, err := json.MarshalIndent(&manifest, "", "\t")
if err != nil {
return nil, repos, err
}
manifestTag := fmt.Sprintf("tag%d", size)
// finish upload
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).
Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, manifestTag))
if err != nil {
return nil, repos, err
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
}
manifestHash[repo] = manifestTag
return manifestHash, repos, nil
}
func pushMonolithAndCollect(workdir, url, trepo string, count int,
repos []string, config testConfig, client *resty.Client,
statsCh chan statsRecord) []string {
func() {
start := time.Now()
var isConnFail, isErr bool
var statusCode int
var latency time.Duration
defer func() {
// send a stats record
statsCh <- statsRecord{
latency: latency,
statusCode: statusCode,
isConnFail: isConnFail,
isErr: isErr,
}
}()
ruid, err := uuid.NewUUID()
if err != nil {
log.Fatal(err)
}
var repo string
if trepo != "" {
repo = trepo + "/" + ruid.String()
} else {
repo = ruid.String()
}
repos = append(repos, repo)
// create a new upload
resp, err := client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
loc := test.Location(url, resp)
var size int
if config.mixedSize {
idx := flipFunc(config.probabilityRange)
smallSizeIdx := 0
mediumSizeIdx := 1
largeSizeIdx := 2
switch idx {
case smallSizeIdx:
size = smallBlob
statusRequests["1MB"]++
case mediumSizeIdx:
size = mediumBlob
statusRequests["10MB"]++
case largeSizeIdx:
size = largeBlob
statusRequests["100MB"]++
default:
size = config.size
}
} else {
size = config.size
}
blob := path.Join(workdir, fmt.Sprintf("%d.blob", size))
fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms)
if err != nil {
isConnFail = true
return
}
defer fhandle.Close()
// stream the entire blob
digest := blobHash[blob]
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", size)).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", digest.String()).
SetBody(fhandle).
Put(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
// upload image config blob
resp, err = client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
loc = test.Location(url, resp)
cblob, cdigest := test.GetRandomImageConfig()
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", cdigest.String()).
SetBody(cblob).
Put(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
// create a manifest
manifest := ispec.Manifest{
Versioned: imeta.Versioned{
SchemaVersion: defaultSchemaVersion,
},
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(size),
},
},
}
content, err := json.MarshalIndent(&manifest, "", "\t")
if err != nil {
log.Fatal(err)
}
manifestTag := fmt.Sprintf("tag%d", count)
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).
Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, manifestTag))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
}()
return repos
}
func pushChunkAndCollect(workdir, url, trepo string, count int,
repos []string, config testConfig, client *resty.Client,
statsCh chan statsRecord) []string {
func() {
start := time.Now()
var isConnFail, isErr bool
var statusCode int
var latency time.Duration
defer func() {
// send a stats record
statsCh <- statsRecord{
latency: latency,
statusCode: statusCode,
isConnFail: isConnFail,
isErr: isErr,
}
}()
ruid, err := uuid.NewUUID()
if err != nil {
log.Fatal(err)
}
var repo string
if trepo != "" {
repo = trepo + "/" + ruid.String()
} else {
repo = ruid.String()
}
repos = append(repos, repo)
// create a new upload
resp, err := client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
loc := test.Location(url, resp)
var size int
if config.mixedSize {
idx := flipFunc(config.probabilityRange)
smallSizeIdx := 0
mediumSizeIdx := 1
largeSizeIdx := 2
switch idx {
case smallSizeIdx:
size = smallBlob
statusRequests["1MB"]++
case mediumSizeIdx:
size = mediumBlob
statusRequests["10MB"]++
case largeSizeIdx:
size = largeBlob
statusRequests["100MB"]++
default:
size = config.size
}
} else {
size = config.size
}
blob := path.Join(workdir, fmt.Sprintf("%d.blob", size))
fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms)
if err != nil {
isConnFail = true
return
}
defer fhandle.Close()
digest := blobHash[blob]
// upload blob
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/octet-stream").
SetBody(fhandle).
Patch(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
loc = test.Location(url, resp)
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
// finish upload
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", size)).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", digest.String()).
Put(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
// upload image config blob
resp, err = client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
loc = test.Location(url, resp)
cblob, cdigest := test.GetRandomImageConfig()
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/octet-stream").
SetBody(fhandle).
Patch(loc)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
// upload blob
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/octet-stream").
SetBody(cblob).
Patch(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
loc = test.Location(url, resp)
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
// finish upload
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", cdigest.String()).
Put(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
// create a manifest
manifest := ispec.Manifest{
Versioned: imeta.Versioned{
SchemaVersion: defaultSchemaVersion,
},
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(size),
},
},
}
content, err := json.Marshal(manifest)
if err != nil {
log.Fatal(err)
}
manifestTag := fmt.Sprintf("tag%d", count)
// finish upload
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).
Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, manifestTag))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
}()
return repos
}

View file

@ -63,7 +63,7 @@ func NewPerfRootCmd() *cobra.Command {
"Output format of test results: stdout (default), json, ci-cd")
// "version"
rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "show the version and exit")
rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "Show the version and exit")
return rootCmd
}

View file

@ -1,4 +1,4 @@
package main //nolint:testpackage // separate binary
package main // nolint:testpackage // separate binary
import (
"testing"

View file

@ -1,12 +1,14 @@
package main
import (
"crypto/rand"
"encoding/json"
crand "crypto/rand"
"crypto/tls"
"fmt"
"io/ioutil"
"log"
mrand "math/rand"
"net/http"
urlparser "net/url"
"os"
"path"
"sort"
@ -15,13 +17,9 @@ import (
"text/tabwriter"
"time"
"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
godigest "github.com/opencontainers/go-digest"
imeta "github.com/opencontainers/image-spec/specs-go"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"gopkg.in/resty.v1"
"zotregistry.io/zot/pkg/test"
)
const (
@ -36,11 +34,15 @@ const (
mediumBlob = 10 * MiB
largeBlob = 100 * MiB
cicdFmt = "ci-cd"
secureProtocol = "https"
)
//nolint:gochecknoglobals // used only in this test
// nolint:gochecknoglobals // used only in this test
var blobHash map[string]godigest.Digest = map[string]godigest.Digest{}
// nolint:gochecknoglobals // used only in this test
var statusRequests map[string]int
func setup(workingDir string) {
_ = os.MkdirAll(workingDir, defaultDirPerms)
@ -68,7 +70,7 @@ func setup(workingDir string) {
// write a random first page so every test run has different blob content
rnd := make([]byte, rndPageSize)
if _, err := rand.Read(rnd); err != nil {
if _, err := crand.Read(rnd); err != nil {
log.Fatal(err)
}
@ -92,7 +94,7 @@ func setup(workingDir string) {
digest, err := godigest.FromReader(fhandle)
if err != nil {
log.Fatal(err) //nolint:gocritic // file closed on exit
log.Fatal(err) // nolint:gocritic // file closed on exit
}
blobHash[fname] = digest
@ -112,12 +114,13 @@ func (a Durations) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a Durations) Less(i, j int) bool { return a[i] < a[j] }
type statsSummary struct {
latencies []time.Duration
name string
min, max, total time.Duration
rps float32
statusHist map[string]int
errors int
latencies []time.Duration
name string
min, max, total time.Duration
statusHist map[string]int
rps float32
mixedSize, mixedType bool
errors int
}
func newStatsSummary(name string) statsSummary {
@ -126,6 +129,8 @@ func newStatsSummary(name string) statsSummary {
min: -1,
max: -1,
statusHist: make(map[string]int),
mixedSize: false,
mixedType: false,
}
return summary
@ -185,7 +190,12 @@ type cicdTestSummary struct {
Range string `json:"range,omitempty"`
}
//nolint:gochecknoglobals // used only in this test
type manifestStruct struct {
manifestHash map[string]string
manifestBySizeHash map[int](map[string]string)
}
// nolint:gochecknoglobals // used only in this test
var cicdSummary = []cicdTestSummary{}
func printStats(requests int, summary *statsSummary, outFmt string) {
@ -197,6 +207,19 @@ func printStats(requests int, summary *statsSummary, outFmt string) {
log.Printf("Requests per second:\t%v", summary.rps)
log.Printf("\n")
if summary.mixedSize {
log.Printf("1MB:\t%v", statusRequests["1MB"])
log.Printf("10MB:\t%v", statusRequests["10MB"])
log.Printf("100MB:\t%v", statusRequests["100MB"])
log.Printf("\n")
}
if summary.mixedType {
log.Printf("Pull:\t%v", statusRequests["Pull"])
log.Printf("Push:\t%v", statusRequests["Push"])
log.Printf("\n")
}
for k, v := range summary.statusHist {
log.Printf("%s responses:\t%v", k, v)
}
@ -223,6 +246,40 @@ func printStats(requests int, summary *statsSummary, outFmt string) {
}
}
// nolint:gosec
func flipFunc(probabilityRange []float64) int {
mrand.Seed(time.Now().UTC().UnixNano())
toss := mrand.Float64()
for idx, r := range probabilityRange {
if toss < r {
return idx
}
}
return len(probabilityRange) - 1
}
// pbty - probabilities.
func normalizeProbabilityRange(pbty []float64) []float64 {
dim := len(pbty)
// npd - normalized probability density
npd := make([]float64, dim)
for idx := range pbty {
npd[idx] = 0.0
}
// [0.2, 0.7, 0.1] -> [0.2, 0.9, 1]
npd[0] = pbty[0]
for i := 1; i < dim; i++ {
npd[i] = npd[i-1] + pbty[i]
}
return npd
}
// test suites/funcs.
type testFunc func(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error
@ -235,6 +292,16 @@ func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig
client.SetBasicAuth(creds[0], creds[1])
}
parsedURL, err := urlparser.Parse(url)
if err != nil {
log.Fatal(err)
}
// nolint: gosec
if parsedURL.Scheme == secureProtocol {
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
}
for count := 0; count < requests; count++ {
func() {
start := time.Now()
@ -280,7 +347,8 @@ func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig
}
func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
config testConfig, statsCh chan statsRecord) error {
config testConfig, statsCh chan statsRecord,
) error {
client := resty.New()
if auth != "" {
@ -288,199 +356,39 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
client.SetBasicAuth(creds[0], creds[1])
}
parsedURL, err := urlparser.Parse(url)
if err != nil {
log.Fatal(err)
}
// nolint: gosec
if parsedURL.Scheme == secureProtocol {
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
}
var repos []string
if config.mixedSize {
statusRequests = make(map[string]int)
}
for count := 0; count < requests; count++ {
func() {
start := time.Now()
repos = pushMonolithAndCollect(workdir, url, trepo, count,
repos, config, client, statsCh)
}
var isConnFail, isErr bool
var statusCode int
var latency time.Duration
defer func() {
// send a stats record
statsCh <- statsRecord{
latency: latency,
statusCode: statusCode,
isConnFail: isConnFail,
isErr: isErr,
}
}()
ruid, err := uuid.NewUUID()
if err != nil {
log.Fatal(err)
}
var repo string
if trepo != "" {
repo = trepo + "/" + ruid.String()
} else {
repo = ruid.String()
}
// create a new upload
resp, err := resty.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
loc := test.Location(url, resp)
size := config.size
blob := path.Join(workdir, fmt.Sprintf("%d.blob", size))
fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms)
if err != nil {
isConnFail = true
return
}
defer fhandle.Close()
// stream the entire blob
digest := blobHash[blob]
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", size)).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", digest.String()).
SetBody(fhandle).
Put(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
// upload image config blob
resp, err = resty.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
loc = test.Location(url, resp)
cblob, cdigest := test.GetRandomImageConfig()
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", cdigest.String()).
SetBody(cblob).
Put(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
// create a manifest
manifest := ispec.Manifest{
Versioned: imeta.Versioned{
SchemaVersion: defaultSchemaVersion,
},
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(size),
},
},
}
content, err := json.MarshalIndent(&manifest, "", "\t")
if err != nil {
log.Fatal(err)
}
resp, err = resty.R().
SetContentLength(true).
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).
Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, fmt.Sprintf("tag%d", count)))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
}()
// clean up
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
}
return nil
}
func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
config testConfig, statsCh chan statsRecord) error {
config testConfig, statsCh chan statsRecord,
) error {
client := resty.New()
if auth != "" {
@ -488,261 +396,176 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
client.SetBasicAuth(creds[0], creds[1])
}
parsedURL, err := urlparser.Parse(url)
if err != nil {
log.Fatal(err)
}
// nolint: gosec
if parsedURL.Scheme == secureProtocol {
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
}
var repos []string
if config.mixedSize {
statusRequests = make(map[string]int)
}
for count := 0; count < requests; count++ {
func() {
start := time.Now()
repos = pushChunkAndCollect(workdir, url, trepo, count,
repos, config, client, statsCh)
}
var isConnFail, isErr bool
// clean up
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
}
var statusCode int
return nil
}
var latency time.Duration
func Pull(workdir, url, auth, trepo string, requests int,
config testConfig, statsCh chan statsRecord,
) error {
client := resty.New()
defer func() {
// send a stats record
statsCh <- statsRecord{
latency: latency,
statusCode: statusCode,
isConnFail: isConnFail,
isErr: isErr,
}
}()
if auth != "" {
creds := strings.Split(auth, ":")
client.SetBasicAuth(creds[0], creds[1])
}
ruid, err := uuid.NewUUID()
if err != nil {
log.Fatal(err)
}
parsedURL, err := urlparser.Parse(url)
if err != nil {
log.Fatal(err)
}
var repo string
// nolint: gosec
if parsedURL.Scheme == secureProtocol {
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
}
if trepo != "" {
repo = trepo + "/" + ruid.String()
} else {
repo = ruid.String()
}
var repos []string
// create a new upload
resp, err := resty.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
var manifestHash map[string]string
latency = time.Since(start)
manifestBySizeHash := make(map[int](map[string]string))
if err != nil {
isConnFail = true
if config.mixedSize {
statusRequests = make(map[string]int)
}
return
}
if config.mixedSize {
var manifestBySize map[string]string
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
smallSizeIdx := 0
mediumSizeIdx := 1
largeSizeIdx := 2
return
}
// Push small blob
manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, smallBlob, client)
if err != nil {
return err
}
loc := test.Location(url, resp)
manifestBySizeHash[smallSizeIdx] = manifestBySize
size := config.size
blob := path.Join(workdir, fmt.Sprintf("%d.blob", size))
// Push medium blob
manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, mediumBlob, client)
if err != nil {
return err
}
fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms)
if err != nil {
isConnFail = true
manifestBySizeHash[mediumSizeIdx] = manifestBySize
return
}
// Push large blob
manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, largeBlob, client)
if err != nil {
return err
}
defer fhandle.Close()
manifestBySizeHash[largeSizeIdx] = manifestBySize
} else {
// Push blob given size
manifestHash, repos, err = pushMonolithImage(workdir, url, trepo, repos, config.size, client)
if err != nil {
return err
}
}
digest := blobHash[blob]
manifestItem := manifestStruct{
manifestHash: manifestHash,
manifestBySizeHash: manifestBySizeHash,
}
// upload blob
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/octet-stream").
SetBody(fhandle).
Patch(loc)
// download image
for count := 0; count < requests; count++ {
repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
}
latency = time.Since(start)
// clean up
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
}
if err != nil {
isConnFail = true
return nil
}
return
}
func MixedPullAndPush(workdir, url, auth, trepo string, requests int,
config testConfig, statsCh chan statsRecord,
) error {
client := resty.New()
loc = test.Location(url, resp)
if auth != "" {
creds := strings.Split(auth, ":")
client.SetBasicAuth(creds[0], creds[1])
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
parsedURL, err := urlparser.Parse(url)
if err != nil {
log.Fatal(err)
}
return
}
// nolint: gosec
if parsedURL.Scheme == secureProtocol {
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
}
// finish upload
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", size)).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", digest.String()).
Put(loc)
var repos []string
latency = time.Since(start)
statusRequests = make(map[string]int)
if err != nil {
isConnFail = true
// Push blob given size
manifestHash, repos, err := pushMonolithImage(workdir, url, trepo, repos, config.size, client)
if err != nil {
return err
}
return
}
manifestItem := manifestStruct{
manifestHash: manifestHash,
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
for count := 0; count < requests; count++ {
idx := flipFunc(config.probabilityRange)
return
}
readTestIdx := 0
writeTestIdx := 1
// upload image config blob
resp, err = resty.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
if idx == readTestIdx {
repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
statusRequests["Pull"]++
} else if idx == writeTestIdx {
repos = pushMonolithAndCollect(workdir, url, trepo, count, repos, config, client, statsCh)
statusRequests["Push"]++
}
}
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
loc = test.Location(url, resp)
cblob, cdigest := test.GetRandomImageConfig()
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/octet-stream").
SetBody(fhandle).
Patch(loc)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
// upload blob
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/octet-stream").
SetBody(cblob).
Patch(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
loc = test.Location(url, resp)
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
isErr = true
return
}
// finish upload
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", cdigest.String()).
Put(loc)
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
// create a manifest
manifest := ispec.Manifest{
Versioned: imeta.Versioned{
SchemaVersion: defaultSchemaVersion,
},
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(size),
},
},
}
content, err := json.Marshal(manifest)
if err != nil {
log.Fatal(err)
}
resp, err = resty.R().
SetContentLength(true).
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).
Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, fmt.Sprintf("tag%d", count)))
latency = time.Since(start)
if err != nil {
isConnFail = true
return
}
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
isErr = true
return
}
}()
// clean up
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
}
return nil
@ -754,7 +577,9 @@ type testConfig struct {
name string
tfunc testFunc
// test-specific params
size int
size int
probabilityRange []float64
mixedSize, mixedType bool
}
var testSuite = []testConfig{ // nolint:gochecknoglobals // used only in this test
@ -792,6 +617,60 @@ var testSuite = []testConfig{ // nolint:gochecknoglobals // used only in this te
tfunc: PushChunkStreamed,
size: largeBlob,
},
{
name: "Pull 1MB",
tfunc: Pull,
size: smallBlob,
},
{
name: "Pull 10MB",
tfunc: Pull,
size: mediumBlob,
},
{
name: "Pull 100MB",
tfunc: Pull,
size: largeBlob,
},
{
name: "Pull Mixed 20% 1MB, 70% 10MB, 10% 100MB",
tfunc: Pull,
probabilityRange: normalizeProbabilityRange([]float64{0.2, 0.7, 0.1}),
mixedSize: true,
},
{
name: "Push Monolith Mixed 20% 1MB, 70% 10MB, 10% 100MB",
tfunc: PushMonolithStreamed,
probabilityRange: normalizeProbabilityRange([]float64{0.2, 0.7, 0.1}),
mixedSize: true,
},
{
name: "Push Chunk Mixed 33% 1MB, 33% 10MB, 33% 100MB",
tfunc: PushChunkStreamed,
probabilityRange: normalizeProbabilityRange([]float64{0.33, 0.33, 0.33}),
mixedSize: true,
},
{
name: "Pull 75% and Push 25% Mixed 1MB",
tfunc: MixedPullAndPush,
size: smallBlob,
mixedType: true,
probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
},
{
name: "Pull 75% and Push 25% Mixed 10MB",
tfunc: MixedPullAndPush,
size: mediumBlob,
mixedType: true,
probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
},
{
name: "Pull 75% and Push 25% Mixed 100MB",
tfunc: MixedPullAndPush,
size: largeBlob,
mixedType: true,
probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
},
}
func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt string) {
@ -812,6 +691,8 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
log.Printf("Working dir:\t%v", workdir)
log.Printf("\n")
zbError := false
for _, tconfig := range testSuite {
statsCh := make(chan statsRecord, requests)
@ -836,6 +717,14 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
summary.total = time.Since(start)
summary.rps = float32(requests) / float32(summary.total.Seconds())
if tconfig.mixedSize {
summary.mixedSize = true
}
if tconfig.mixedType {
summary.mixedType = true
}
for count := 0; count < requests; count++ {
record := <-statsCh
updateStats(&summary, record)
@ -844,16 +733,24 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
sort.Sort(Durations(summary.latencies))
printStats(requests, &summary, outFmt)
if summary.errors != 0 && !zbError {
zbError = true
}
}
if outFmt == cicdFmt {
jsonOut, err := json.Marshal(cicdSummary)
if err != nil {
log.Fatal(err) //nolint:gocritic // file closed on exit
log.Fatal(err) // nolint:gocritic // file closed on exit
}
if err := ioutil.WriteFile(fmt.Sprintf("%s.json", outFmt), jsonOut, defaultFilePerms); err != nil {
log.Fatal(err)
}
}
if zbError {
os.Exit(1)
}
}