diff --git a/.github/workflows/cluster.yaml b/.github/workflows/cluster.yaml index 846a0e6a..c4b89c53 100644 --- a/.github/workflows/cluster.yaml +++ b/.github/workflows/cluster.yaml @@ -136,7 +136,7 @@ jobs: ./bin/zot-linux-amd64 serve test/cluster/config-minio3.json & sleep 10 # run zb - bin/zb-linux-amd64 -c 10 -n 100 -o ci-cd http://localhost:8080 + bin/zb-linux-amd64 -c 10 -n 50 -o ci-cd http://localhost:8080 env: AWS_ACCESS_KEY_ID: minioadmin AWS_SECRET_ACCESS_KEY: minioadmin diff --git a/cmd/zb/helper.go b/cmd/zb/helper.go new file mode 100644 index 00000000..b66ea5a6 --- /dev/null +++ b/cmd/zb/helper.go @@ -0,0 +1,908 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "path" + "time" + + "github.com/google/uuid" + imeta "github.com/opencontainers/image-spec/specs-go" + ispec "github.com/opencontainers/image-spec/specs-go/v1" + "gopkg.in/resty.v1" + "zotregistry.io/zot/errors" + "zotregistry.io/zot/pkg/test" +) + +func deleteTestRepo(repos []string, url string, client *resty.Client) error { + for _, repo := range repos { + resp, err := client.R().Delete((fmt.Sprintf("%s/v2/%s/", url, repo))) + if err != nil { + return err + } + + // request specific check + statusCode := resp.StatusCode() + if statusCode != http.StatusAccepted { + return errors.ErrUnknownCode + } + } + + return nil +} + +func pullAndCollect(url string, repos []string, manifestItem manifestStruct, + config testConfig, client *resty.Client, statsCh chan statsRecord) []string { + manifestHash := manifestItem.manifestHash + manifestBySizeHash := manifestItem.manifestBySizeHash + + func() { + start := time.Now() + + var isConnFail, isErr bool + + var statusCode int + + var latency time.Duration + + defer func() { + // send a stats record + statsCh <- statsRecord{ + latency: latency, + statusCode: statusCode, + isConnFail: isConnFail, + isErr: isErr, + } + }() + + if config.mixedSize { + smallSizeIdx := 0 + mediumSizeIdx := 1 + largeSizeIdx := 2 + + idx := flipFunc(config.probabilityRange) + + switch idx { + case smallSizeIdx: + statusRequests["1MB"]++ + case mediumSizeIdx: + statusRequests["10MB"]++ + case largeSizeIdx: + statusRequests["100MB"]++ + } + + manifestHash = manifestBySizeHash[idx] + } + + for repo, manifestTag := range manifestHash { + manifestLoc := fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, manifestTag) + + // check manifest + resp, err := client.R(). + SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). + Head(manifestLoc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusOK { + isErr = true + + return + } + + // send request and get the manifest + resp, err = client.R(). + SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). + Get(manifestLoc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusOK { + isErr = true + + return + } + + manifestBody := resp.Body() + + // file copy simulation + _, err = io.Copy(ioutil.Discard, bytes.NewReader(manifestBody)) + + latency = time.Since(start) + + if err != nil { + log.Fatal(err) + } + + var pulledManifest ispec.Manifest + + err = json.Unmarshal(manifestBody, &pulledManifest) + if err != nil { + log.Fatal(err) + } + + // check config + configDigest := pulledManifest.Config.Digest + configLoc := fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, configDigest) + resp, err = client.R().Head(configLoc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusOK { + isErr = true + + return + } + + // send request and get the config + resp, err = client.R().Get(configLoc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusOK { + isErr = true + + return + } + + configBody := resp.Body() + + // file copy simulation + _, err = io.Copy(ioutil.Discard, bytes.NewReader(configBody)) + + latency = time.Since(start) + + if err != nil { + log.Fatal(err) + } + + // download blobs + for _, layer := range pulledManifest.Layers { + blobDigest := layer.Digest + blobLoc := fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, blobDigest) + + // check blob + resp, err := client.R().Head(blobLoc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusOK { + isErr = true + + return + } + + // send request and get response the blob + resp, err = client.R().Get(blobLoc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusOK { + isErr = true + + return + } + + blobBody := resp.Body() + + // file copy simulation + _, err = io.Copy(ioutil.Discard, bytes.NewReader(blobBody)) + if err != nil { + log.Fatal(err) + } + } + } + }() + + return repos +} + +func pushMonolithImage(workdir, url, trepo string, repos []string, size int, + client *resty.Client) (map[string]string, []string, error) { + var statusCode int + + // key: repository name. value: manifest name + manifestHash := make(map[string]string) + + ruid, err := uuid.NewUUID() + if err != nil { + return nil, repos, err + } + + var repo string + + if trepo != "" { + repo = trepo + "/" + ruid.String() + } else { + repo = ruid.String() + } + + repos = append(repos, repo) + + // upload blob + resp, err := client.R().Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) + if err != nil { + return nil, repos, err + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + return nil, repos, errors.ErrUnknownCode + } + + loc := test.Location(url, resp) + blob := path.Join(workdir, fmt.Sprintf("%d.blob", size)) + + fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms) + if err != nil { + return nil, repos, err + } + + defer fhandle.Close() + + // stream the entire blob + digest := blobHash[blob] + + resp, err = client.R(). + SetContentLength(true). + SetQueryParam("digest", digest.String()). + SetHeader("Content-Length", fmt.Sprintf("%d", size)). + SetHeader("Content-Type", "application/octet-stream").SetBody(fhandle).Put(loc) + + if err != nil { + return nil, repos, err + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + return nil, repos, errors.ErrUnknownCode + } + + // upload image config blob + resp, err = client.R(). + Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) + + if err != nil { + return nil, repos, err + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + return nil, repos, errors.ErrUnknownCode + } + + loc = test.Location(url, resp) + cblob, cdigest := test.GetRandomImageConfig() + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", cdigest.String()). + SetBody(cblob). + Put(loc) + + if err != nil { + return nil, repos, err + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + return nil, repos, errors.ErrUnknownCode + } + + // create a manifest + manifest := ispec.Manifest{ + Versioned: imeta.Versioned{ + SchemaVersion: defaultSchemaVersion, + }, + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(size), + }, + }, + } + + content, err := json.MarshalIndent(&manifest, "", "\t") + if err != nil { + return nil, repos, err + } + + manifestTag := fmt.Sprintf("tag%d", size) + + // finish upload + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). + SetBody(content). + Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, manifestTag)) + + if err != nil { + return nil, repos, err + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + return nil, repos, errors.ErrUnknownCode + } + + manifestHash[repo] = manifestTag + + return manifestHash, repos, nil +} + +func pushMonolithAndCollect(workdir, url, trepo string, count int, + repos []string, config testConfig, client *resty.Client, + statsCh chan statsRecord) []string { + func() { + start := time.Now() + + var isConnFail, isErr bool + + var statusCode int + + var latency time.Duration + + defer func() { + // send a stats record + statsCh <- statsRecord{ + latency: latency, + statusCode: statusCode, + isConnFail: isConnFail, + isErr: isErr, + } + }() + + ruid, err := uuid.NewUUID() + if err != nil { + log.Fatal(err) + } + + var repo string + + if trepo != "" { + repo = trepo + "/" + ruid.String() + } else { + repo = ruid.String() + } + + repos = append(repos, repo) + + // create a new upload + resp, err := client.R(). + Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + isErr = true + + return + } + + loc := test.Location(url, resp) + + var size int + + if config.mixedSize { + idx := flipFunc(config.probabilityRange) + smallSizeIdx := 0 + mediumSizeIdx := 1 + largeSizeIdx := 2 + + switch idx { + case smallSizeIdx: + size = smallBlob + statusRequests["1MB"]++ + case mediumSizeIdx: + size = mediumBlob + statusRequests["10MB"]++ + case largeSizeIdx: + size = largeBlob + statusRequests["100MB"]++ + default: + size = config.size + } + } else { + size = config.size + } + + blob := path.Join(workdir, fmt.Sprintf("%d.blob", size)) + + fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms) + if err != nil { + isConnFail = true + + return + } + + defer fhandle.Close() + + // stream the entire blob + digest := blobHash[blob] + + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Length", fmt.Sprintf("%d", size)). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", digest.String()). + SetBody(fhandle). + Put(loc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + isErr = true + + return + } + + // upload image config blob + resp, err = client.R(). + Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + isErr = true + + return + } + + loc = test.Location(url, resp) + cblob, cdigest := test.GetRandomImageConfig() + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", cdigest.String()). + SetBody(cblob). + Put(loc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + isErr = true + + return + } + + // create a manifest + manifest := ispec.Manifest{ + Versioned: imeta.Versioned{ + SchemaVersion: defaultSchemaVersion, + }, + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(size), + }, + }, + } + + content, err := json.MarshalIndent(&manifest, "", "\t") + if err != nil { + log.Fatal(err) + } + + manifestTag := fmt.Sprintf("tag%d", count) + + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). + SetBody(content). + Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, manifestTag)) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + isErr = true + + return + } + }() + + return repos +} + +func pushChunkAndCollect(workdir, url, trepo string, count int, + repos []string, config testConfig, client *resty.Client, + statsCh chan statsRecord) []string { + func() { + start := time.Now() + + var isConnFail, isErr bool + + var statusCode int + + var latency time.Duration + + defer func() { + // send a stats record + statsCh <- statsRecord{ + latency: latency, + statusCode: statusCode, + isConnFail: isConnFail, + isErr: isErr, + } + }() + + ruid, err := uuid.NewUUID() + if err != nil { + log.Fatal(err) + } + + var repo string + + if trepo != "" { + repo = trepo + "/" + ruid.String() + } else { + repo = ruid.String() + } + + repos = append(repos, repo) + + // create a new upload + resp, err := client.R(). + Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + isErr = true + + return + } + + loc := test.Location(url, resp) + + var size int + + if config.mixedSize { + idx := flipFunc(config.probabilityRange) + smallSizeIdx := 0 + mediumSizeIdx := 1 + largeSizeIdx := 2 + + switch idx { + case smallSizeIdx: + size = smallBlob + statusRequests["1MB"]++ + case mediumSizeIdx: + size = mediumBlob + statusRequests["10MB"]++ + case largeSizeIdx: + size = largeBlob + statusRequests["100MB"]++ + + default: + size = config.size + } + } else { + size = config.size + } + + blob := path.Join(workdir, fmt.Sprintf("%d.blob", size)) + + fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms) + if err != nil { + isConnFail = true + + return + } + + defer fhandle.Close() + + digest := blobHash[blob] + + // upload blob + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Type", "application/octet-stream"). + SetBody(fhandle). + Patch(loc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + loc = test.Location(url, resp) + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + isErr = true + + return + } + + // finish upload + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Length", fmt.Sprintf("%d", size)). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", digest.String()). + Put(loc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + isErr = true + + return + } + + // upload image config blob + resp, err = client.R(). + Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + isErr = true + + return + } + + loc = test.Location(url, resp) + cblob, cdigest := test.GetRandomImageConfig() + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Type", "application/octet-stream"). + SetBody(fhandle). + Patch(loc) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + isErr = true + + return + } + + // upload blob + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Type", "application/octet-stream"). + SetBody(cblob). + Patch(loc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + loc = test.Location(url, resp) + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusAccepted { + isErr = true + + return + } + + // finish upload + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", cdigest.String()). + Put(loc) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + isErr = true + + return + } + + // create a manifest + manifest := ispec.Manifest{ + Versioned: imeta.Versioned{ + SchemaVersion: defaultSchemaVersion, + }, + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(size), + }, + }, + } + + content, err := json.Marshal(manifest) + if err != nil { + log.Fatal(err) + } + + manifestTag := fmt.Sprintf("tag%d", count) + + // finish upload + resp, err = client.R(). + SetContentLength(true). + SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). + SetBody(content). + Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, manifestTag)) + + latency = time.Since(start) + + if err != nil { + isConnFail = true + + return + } + + // request specific check + statusCode = resp.StatusCode() + if statusCode != http.StatusCreated { + isErr = true + + return + } + }() + + return repos +} diff --git a/cmd/zb/main.go b/cmd/zb/main.go index 651004f1..9dee562b 100644 --- a/cmd/zb/main.go +++ b/cmd/zb/main.go @@ -63,7 +63,7 @@ func NewPerfRootCmd() *cobra.Command { "Output format of test results: stdout (default), json, ci-cd") // "version" - rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "show the version and exit") + rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "Show the version and exit") return rootCmd } diff --git a/cmd/zb/main_test.go b/cmd/zb/main_test.go index 4507fccb..b0224066 100644 --- a/cmd/zb/main_test.go +++ b/cmd/zb/main_test.go @@ -1,4 +1,4 @@ -package main //nolint:testpackage // separate binary +package main // nolint:testpackage // separate binary import ( "testing" diff --git a/cmd/zb/perf.go b/cmd/zb/perf.go index 5aee153f..21db825b 100644 --- a/cmd/zb/perf.go +++ b/cmd/zb/perf.go @@ -1,12 +1,14 @@ package main import ( - "crypto/rand" - "encoding/json" + crand "crypto/rand" + "crypto/tls" "fmt" "io/ioutil" "log" + mrand "math/rand" "net/http" + urlparser "net/url" "os" "path" "sort" @@ -15,13 +17,9 @@ import ( "text/tabwriter" "time" - "github.com/google/uuid" jsoniter "github.com/json-iterator/go" godigest "github.com/opencontainers/go-digest" - imeta "github.com/opencontainers/image-spec/specs-go" - ispec "github.com/opencontainers/image-spec/specs-go/v1" "gopkg.in/resty.v1" - "zotregistry.io/zot/pkg/test" ) const ( @@ -36,11 +34,15 @@ const ( mediumBlob = 10 * MiB largeBlob = 100 * MiB cicdFmt = "ci-cd" + secureProtocol = "https" ) -//nolint:gochecknoglobals // used only in this test +// nolint:gochecknoglobals // used only in this test var blobHash map[string]godigest.Digest = map[string]godigest.Digest{} +// nolint:gochecknoglobals // used only in this test +var statusRequests map[string]int + func setup(workingDir string) { _ = os.MkdirAll(workingDir, defaultDirPerms) @@ -68,7 +70,7 @@ func setup(workingDir string) { // write a random first page so every test run has different blob content rnd := make([]byte, rndPageSize) - if _, err := rand.Read(rnd); err != nil { + if _, err := crand.Read(rnd); err != nil { log.Fatal(err) } @@ -92,7 +94,7 @@ func setup(workingDir string) { digest, err := godigest.FromReader(fhandle) if err != nil { - log.Fatal(err) //nolint:gocritic // file closed on exit + log.Fatal(err) // nolint:gocritic // file closed on exit } blobHash[fname] = digest @@ -112,12 +114,13 @@ func (a Durations) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a Durations) Less(i, j int) bool { return a[i] < a[j] } type statsSummary struct { - latencies []time.Duration - name string - min, max, total time.Duration - rps float32 - statusHist map[string]int - errors int + latencies []time.Duration + name string + min, max, total time.Duration + statusHist map[string]int + rps float32 + mixedSize, mixedType bool + errors int } func newStatsSummary(name string) statsSummary { @@ -126,6 +129,8 @@ func newStatsSummary(name string) statsSummary { min: -1, max: -1, statusHist: make(map[string]int), + mixedSize: false, + mixedType: false, } return summary @@ -185,7 +190,12 @@ type cicdTestSummary struct { Range string `json:"range,omitempty"` } -//nolint:gochecknoglobals // used only in this test +type manifestStruct struct { + manifestHash map[string]string + manifestBySizeHash map[int](map[string]string) +} + +// nolint:gochecknoglobals // used only in this test var cicdSummary = []cicdTestSummary{} func printStats(requests int, summary *statsSummary, outFmt string) { @@ -197,6 +207,19 @@ func printStats(requests int, summary *statsSummary, outFmt string) { log.Printf("Requests per second:\t%v", summary.rps) log.Printf("\n") + if summary.mixedSize { + log.Printf("1MB:\t%v", statusRequests["1MB"]) + log.Printf("10MB:\t%v", statusRequests["10MB"]) + log.Printf("100MB:\t%v", statusRequests["100MB"]) + log.Printf("\n") + } + + if summary.mixedType { + log.Printf("Pull:\t%v", statusRequests["Pull"]) + log.Printf("Push:\t%v", statusRequests["Push"]) + log.Printf("\n") + } + for k, v := range summary.statusHist { log.Printf("%s responses:\t%v", k, v) } @@ -223,6 +246,40 @@ func printStats(requests int, summary *statsSummary, outFmt string) { } } +// nolint:gosec +func flipFunc(probabilityRange []float64) int { + mrand.Seed(time.Now().UTC().UnixNano()) + toss := mrand.Float64() + + for idx, r := range probabilityRange { + if toss < r { + return idx + } + } + + return len(probabilityRange) - 1 +} + +// pbty - probabilities. +func normalizeProbabilityRange(pbty []float64) []float64 { + dim := len(pbty) + + // npd - normalized probability density + npd := make([]float64, dim) + + for idx := range pbty { + npd[idx] = 0.0 + } + + // [0.2, 0.7, 0.1] -> [0.2, 0.9, 1] + npd[0] = pbty[0] + for i := 1; i < dim; i++ { + npd[i] = npd[i-1] + pbty[i] + } + + return npd +} + // test suites/funcs. type testFunc func(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error @@ -235,6 +292,16 @@ func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig client.SetBasicAuth(creds[0], creds[1]) } + parsedURL, err := urlparser.Parse(url) + if err != nil { + log.Fatal(err) + } + + // nolint: gosec + if parsedURL.Scheme == secureProtocol { + client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + } + for count := 0; count < requests; count++ { func() { start := time.Now() @@ -280,7 +347,8 @@ func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig } func PushMonolithStreamed(workdir, url, auth, trepo string, requests int, - config testConfig, statsCh chan statsRecord) error { + config testConfig, statsCh chan statsRecord, +) error { client := resty.New() if auth != "" { @@ -288,199 +356,39 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int, client.SetBasicAuth(creds[0], creds[1]) } + parsedURL, err := urlparser.Parse(url) + if err != nil { + log.Fatal(err) + } + + // nolint: gosec + if parsedURL.Scheme == secureProtocol { + client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + } + + var repos []string + + if config.mixedSize { + statusRequests = make(map[string]int) + } + for count := 0; count < requests; count++ { - func() { - start := time.Now() + repos = pushMonolithAndCollect(workdir, url, trepo, count, + repos, config, client, statsCh) + } - var isConnFail, isErr bool - - var statusCode int - - var latency time.Duration - - defer func() { - // send a stats record - statsCh <- statsRecord{ - latency: latency, - statusCode: statusCode, - isConnFail: isConnFail, - isErr: isErr, - } - }() - - ruid, err := uuid.NewUUID() - if err != nil { - log.Fatal(err) - } - - var repo string - - if trepo != "" { - repo = trepo + "/" + ruid.String() - } else { - repo = ruid.String() - } - - // create a new upload - resp, err := resty.R(). - Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) - - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusAccepted { - isErr = true - - return - } - - loc := test.Location(url, resp) - - size := config.size - blob := path.Join(workdir, fmt.Sprintf("%d.blob", size)) - - fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms) - if err != nil { - isConnFail = true - - return - } - - defer fhandle.Close() - - // stream the entire blob - digest := blobHash[blob] - - resp, err = client.R(). - SetContentLength(true). - SetHeader("Content-Length", fmt.Sprintf("%d", size)). - SetHeader("Content-Type", "application/octet-stream"). - SetQueryParam("digest", digest.String()). - SetBody(fhandle). - Put(loc) - - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusCreated { - isErr = true - - return - } - - // upload image config blob - resp, err = resty.R(). - Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) - - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusAccepted { - isErr = true - - return - } - - loc = test.Location(url, resp) - cblob, cdigest := test.GetRandomImageConfig() - resp, err = client.R(). - SetContentLength(true). - SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))). - SetHeader("Content-Type", "application/octet-stream"). - SetQueryParam("digest", cdigest.String()). - SetBody(cblob). - Put(loc) - - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusCreated { - isErr = true - - return - } - - // create a manifest - manifest := ispec.Manifest{ - Versioned: imeta.Versioned{ - SchemaVersion: defaultSchemaVersion, - }, - Config: ispec.Descriptor{ - MediaType: "application/vnd.oci.image.config.v1+json", - Digest: cdigest, - Size: int64(len(cblob)), - }, - Layers: []ispec.Descriptor{ - { - MediaType: "application/vnd.oci.image.layer.v1.tar", - Digest: digest, - Size: int64(size), - }, - }, - } - - content, err := json.MarshalIndent(&manifest, "", "\t") - if err != nil { - log.Fatal(err) - } - - resp, err = resty.R(). - SetContentLength(true). - SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). - SetBody(content). - Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, fmt.Sprintf("tag%d", count))) - - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusCreated { - isErr = true - - return - } - }() + // clean up + err = deleteTestRepo(repos, url, client) + if err != nil { + return err } return nil } func PushChunkStreamed(workdir, url, auth, trepo string, requests int, - config testConfig, statsCh chan statsRecord) error { + config testConfig, statsCh chan statsRecord, +) error { client := resty.New() if auth != "" { @@ -488,261 +396,176 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int, client.SetBasicAuth(creds[0], creds[1]) } + parsedURL, err := urlparser.Parse(url) + if err != nil { + log.Fatal(err) + } + + // nolint: gosec + if parsedURL.Scheme == secureProtocol { + client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + } + + var repos []string + + if config.mixedSize { + statusRequests = make(map[string]int) + } + for count := 0; count < requests; count++ { - func() { - start := time.Now() + repos = pushChunkAndCollect(workdir, url, trepo, count, + repos, config, client, statsCh) + } - var isConnFail, isErr bool + // clean up + err = deleteTestRepo(repos, url, client) + if err != nil { + return err + } - var statusCode int + return nil +} - var latency time.Duration +func Pull(workdir, url, auth, trepo string, requests int, + config testConfig, statsCh chan statsRecord, +) error { + client := resty.New() - defer func() { - // send a stats record - statsCh <- statsRecord{ - latency: latency, - statusCode: statusCode, - isConnFail: isConnFail, - isErr: isErr, - } - }() + if auth != "" { + creds := strings.Split(auth, ":") + client.SetBasicAuth(creds[0], creds[1]) + } - ruid, err := uuid.NewUUID() - if err != nil { - log.Fatal(err) - } + parsedURL, err := urlparser.Parse(url) + if err != nil { + log.Fatal(err) + } - var repo string + // nolint: gosec + if parsedURL.Scheme == secureProtocol { + client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + } - if trepo != "" { - repo = trepo + "/" + ruid.String() - } else { - repo = ruid.String() - } + var repos []string - // create a new upload - resp, err := resty.R(). - Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) + var manifestHash map[string]string - latency = time.Since(start) + manifestBySizeHash := make(map[int](map[string]string)) - if err != nil { - isConnFail = true + if config.mixedSize { + statusRequests = make(map[string]int) + } - return - } + if config.mixedSize { + var manifestBySize map[string]string - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusAccepted { - isErr = true + smallSizeIdx := 0 + mediumSizeIdx := 1 + largeSizeIdx := 2 - return - } + // Push small blob + manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, smallBlob, client) + if err != nil { + return err + } - loc := test.Location(url, resp) + manifestBySizeHash[smallSizeIdx] = manifestBySize - size := config.size - blob := path.Join(workdir, fmt.Sprintf("%d.blob", size)) + // Push medium blob + manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, mediumBlob, client) + if err != nil { + return err + } - fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms) - if err != nil { - isConnFail = true + manifestBySizeHash[mediumSizeIdx] = manifestBySize - return - } + // Push large blob + manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, largeBlob, client) + if err != nil { + return err + } - defer fhandle.Close() + manifestBySizeHash[largeSizeIdx] = manifestBySize + } else { + // Push blob given size + manifestHash, repos, err = pushMonolithImage(workdir, url, trepo, repos, config.size, client) + if err != nil { + return err + } + } - digest := blobHash[blob] + manifestItem := manifestStruct{ + manifestHash: manifestHash, + manifestBySizeHash: manifestBySizeHash, + } - // upload blob - resp, err = client.R(). - SetContentLength(true). - SetHeader("Content-Type", "application/octet-stream"). - SetBody(fhandle). - Patch(loc) + // download image + for count := 0; count < requests; count++ { + repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh) + } - latency = time.Since(start) + // clean up + err = deleteTestRepo(repos, url, client) + if err != nil { + return err + } - if err != nil { - isConnFail = true + return nil +} - return - } +func MixedPullAndPush(workdir, url, auth, trepo string, requests int, + config testConfig, statsCh chan statsRecord, +) error { + client := resty.New() - loc = test.Location(url, resp) + if auth != "" { + creds := strings.Split(auth, ":") + client.SetBasicAuth(creds[0], creds[1]) + } - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusAccepted { - isErr = true + parsedURL, err := urlparser.Parse(url) + if err != nil { + log.Fatal(err) + } - return - } + // nolint: gosec + if parsedURL.Scheme == secureProtocol { + client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + } - // finish upload - resp, err = client.R(). - SetContentLength(true). - SetHeader("Content-Length", fmt.Sprintf("%d", size)). - SetHeader("Content-Type", "application/octet-stream"). - SetQueryParam("digest", digest.String()). - Put(loc) + var repos []string - latency = time.Since(start) + statusRequests = make(map[string]int) - if err != nil { - isConnFail = true + // Push blob given size + manifestHash, repos, err := pushMonolithImage(workdir, url, trepo, repos, config.size, client) + if err != nil { + return err + } - return - } + manifestItem := manifestStruct{ + manifestHash: manifestHash, + } - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusCreated { - isErr = true + for count := 0; count < requests; count++ { + idx := flipFunc(config.probabilityRange) - return - } + readTestIdx := 0 + writeTestIdx := 1 - // upload image config blob - resp, err = resty.R(). - Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo)) + if idx == readTestIdx { + repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh) + statusRequests["Pull"]++ + } else if idx == writeTestIdx { + repos = pushMonolithAndCollect(workdir, url, trepo, count, repos, config, client, statsCh) + statusRequests["Push"]++ + } + } - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusAccepted { - isErr = true - - return - } - - loc = test.Location(url, resp) - cblob, cdigest := test.GetRandomImageConfig() - resp, err = client.R(). - SetContentLength(true). - SetHeader("Content-Type", "application/octet-stream"). - SetBody(fhandle). - Patch(loc) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusAccepted { - isErr = true - - return - } - - // upload blob - resp, err = client.R(). - SetContentLength(true). - SetHeader("Content-Type", "application/octet-stream"). - SetBody(cblob). - Patch(loc) - - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - loc = test.Location(url, resp) - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusAccepted { - isErr = true - - return - } - - // finish upload - resp, err = client.R(). - SetContentLength(true). - SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))). - SetHeader("Content-Type", "application/octet-stream"). - SetQueryParam("digest", cdigest.String()). - Put(loc) - - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusCreated { - isErr = true - - return - } - - // create a manifest - manifest := ispec.Manifest{ - Versioned: imeta.Versioned{ - SchemaVersion: defaultSchemaVersion, - }, - Config: ispec.Descriptor{ - MediaType: "application/vnd.oci.image.config.v1+json", - Digest: cdigest, - Size: int64(len(cblob)), - }, - Layers: []ispec.Descriptor{ - { - MediaType: "application/vnd.oci.image.layer.v1.tar", - Digest: digest, - Size: int64(size), - }, - }, - } - - content, err := json.Marshal(manifest) - if err != nil { - log.Fatal(err) - } - - resp, err = resty.R(). - SetContentLength(true). - SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). - SetBody(content). - Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, fmt.Sprintf("tag%d", count))) - - latency = time.Since(start) - - if err != nil { - isConnFail = true - - return - } - - // request specific check - statusCode = resp.StatusCode() - if statusCode != http.StatusCreated { - isErr = true - - return - } - }() + // clean up + err = deleteTestRepo(repos, url, client) + if err != nil { + return err } return nil @@ -754,7 +577,9 @@ type testConfig struct { name string tfunc testFunc // test-specific params - size int + size int + probabilityRange []float64 + mixedSize, mixedType bool } var testSuite = []testConfig{ // nolint:gochecknoglobals // used only in this test @@ -792,6 +617,60 @@ var testSuite = []testConfig{ // nolint:gochecknoglobals // used only in this te tfunc: PushChunkStreamed, size: largeBlob, }, + { + name: "Pull 1MB", + tfunc: Pull, + size: smallBlob, + }, + { + name: "Pull 10MB", + tfunc: Pull, + size: mediumBlob, + }, + { + name: "Pull 100MB", + tfunc: Pull, + size: largeBlob, + }, + { + name: "Pull Mixed 20% 1MB, 70% 10MB, 10% 100MB", + tfunc: Pull, + probabilityRange: normalizeProbabilityRange([]float64{0.2, 0.7, 0.1}), + mixedSize: true, + }, + { + name: "Push Monolith Mixed 20% 1MB, 70% 10MB, 10% 100MB", + tfunc: PushMonolithStreamed, + probabilityRange: normalizeProbabilityRange([]float64{0.2, 0.7, 0.1}), + mixedSize: true, + }, + { + name: "Push Chunk Mixed 33% 1MB, 33% 10MB, 33% 100MB", + tfunc: PushChunkStreamed, + probabilityRange: normalizeProbabilityRange([]float64{0.33, 0.33, 0.33}), + mixedSize: true, + }, + { + name: "Pull 75% and Push 25% Mixed 1MB", + tfunc: MixedPullAndPush, + size: smallBlob, + mixedType: true, + probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}), + }, + { + name: "Pull 75% and Push 25% Mixed 10MB", + tfunc: MixedPullAndPush, + size: mediumBlob, + mixedType: true, + probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}), + }, + { + name: "Pull 75% and Push 25% Mixed 100MB", + tfunc: MixedPullAndPush, + size: largeBlob, + mixedType: true, + probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}), + }, } func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt string) { @@ -812,6 +691,8 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt log.Printf("Working dir:\t%v", workdir) log.Printf("\n") + zbError := false + for _, tconfig := range testSuite { statsCh := make(chan statsRecord, requests) @@ -836,6 +717,14 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt summary.total = time.Since(start) summary.rps = float32(requests) / float32(summary.total.Seconds()) + if tconfig.mixedSize { + summary.mixedSize = true + } + + if tconfig.mixedType { + summary.mixedType = true + } + for count := 0; count < requests; count++ { record := <-statsCh updateStats(&summary, record) @@ -844,16 +733,24 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt sort.Sort(Durations(summary.latencies)) printStats(requests, &summary, outFmt) + + if summary.errors != 0 && !zbError { + zbError = true + } } if outFmt == cicdFmt { jsonOut, err := json.Marshal(cicdSummary) if err != nil { - log.Fatal(err) //nolint:gocritic // file closed on exit + log.Fatal(err) // nolint:gocritic // file closed on exit } if err := ioutil.WriteFile(fmt.Sprintf("%s.json", outFmt), jsonOut, defaultFilePerms); err != nil { log.Fatal(err) } } + + if zbError { + os.Exit(1) + } }