From 72da8303c5a2e89c57c205f8d81f6e70c6e3e3ff Mon Sep 17 00:00:00 2001
From: Ramkumar Chinchani
Date: Fri, 10 Dec 2021 22:27:40 +0000
Subject: [PATCH] perf: add a 'zb' binary for perf testing of dist-spec registries

'make bench' produces a bin/zb binary

bin/zb --help

Signed-off-by: Ramkumar Chinchani
---
 Makefile            |  14 +-
 cmd/zb/main.go      |  75 +++++
 cmd/zb/main_test.go |  22 ++
 cmd/zb/perf.go      | 696 ++++++++++++++++++++++++++++++++++++++++++++
 codecov.yml         |   3 +-
 5 files changed, 806 insertions(+), 4 deletions(-)
 create mode 100644 cmd/zb/main.go
 create mode 100644 cmd/zb/main_test.go
 create mode 100644 cmd/zb/perf.go

diff --git a/Makefile b/Makefile
index 950010dd..20302685 100644
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ OS ?= linux
 ARCH ?= amd64
 
 .PHONY: all
-all: swagger binary binary-minimal binary-debug binary-arch binary-arch-minimal cli cli-arch exporter-minimal verify-config test test-clean check
+all: swagger binary binary-minimal binary-debug binary-arch binary-arch-minimal cli cli-arch bench bench-arch exporter-minimal verify-config test test-clean check
 
 .PHONY: binary-minimal
 binary-minimal: swagger
@@ -41,9 +41,17 @@ cli:
 	env CGO_ENABLED=0 GOOS=$(OS) GOARCH=$(ARCH) go build -o bin/zli -tags extended,containers_image_openpgp -v -trimpath -ldflags "-X zotregistry.io/zot/pkg/api/config.Commit=${COMMIT} -X zotregistry.io/zot/pkg/api/config.BinaryType=extended -X zotregistry.io/zot/pkg/api/config.GoVersion=${GO_VERSION} -s -w" ./cmd/zli
 
 .PHONY: cli-arch
-cli-arch: swagger
+cli-arch:
 	env CGO_ENABLED=0 GOOS=$(OS) GOARCH=$(ARCH) go build -o bin/zli-$(ARCH) -tags extended,containers_image_openpgp -v -trimpath -ldflags "-X zotregistry.io/zot/pkg/api/config.Commit=${COMMIT} -X zotregistry.io/zot/pkg/api/config.BinaryType=extended -X zotregistry.io/zot/pkg/api/config.GoVersion=${GO_VERSION} -s -w" ./cmd/zli
 
+.PHONY: bench
+bench:
+	env CGO_ENABLED=0 GOOS=$(OS) GOARCH=$(ARCH) go build -o bin/zb -tags extended,containers_image_openpgp -v -trimpath -ldflags "-X zotregistry.io/zot/pkg/api/config.Commit=${COMMIT} -X zotregistry.io/zot/pkg/api/config.BinaryType=extended -X zotregistry.io/zot/pkg/api/config.GoVersion=${GO_VERSION} -s -w" ./cmd/zb
+
+.PHONY: bench-arch
+bench-arch:
+	env CGO_ENABLED=0 GOOS=$(OS) GOARCH=$(ARCH) go build -o bin/zb-$(ARCH) -tags extended,containers_image_openpgp -v -trimpath -ldflags "-X zotregistry.io/zot/pkg/api/config.Commit=${COMMIT} -X zotregistry.io/zot/pkg/api/config.BinaryType=extended -X zotregistry.io/zot/pkg/api/config.GoVersion=${GO_VERSION} -s -w" ./cmd/zb
+
 .PHONY: exporter-minimal
 exporter-minimal: swagger
 	env CGO_ENABLED=0 GOOS=$(OS) GOARCH=$(ARCH) go build -o bin/zot-exporter -tags minimal,containers_image_openpgp -v -trimpath ./cmd/exporter
@@ -100,7 +108,7 @@ update-licenses:
 
 .PHONY: clean
 clean:
-	rm -f bin/zot*
+	rm -f bin/z*
 	rm -rf hack
 
 .PHONY: run
diff --git a/cmd/zb/main.go b/cmd/zb/main.go
new file mode 100644
index 00000000..d97d75ac
--- /dev/null
+++ b/cmd/zb/main.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+	"errors"
+	"os"
+
+	distspec "github.com/opencontainers/distribution-spec/specs-go"
+	"github.com/rs/zerolog/log"
+	"github.com/spf13/cobra"
+	"zotregistry.io/zot/pkg/api/config"
+)
+
+// Argument-validation errors returned by the root command.
+var (
+	errInvalidConcurrency = errors.New("concurrency must be at least 1")
+	errTooFewRequests     = errors.New("requests cannot be less than concurrency")
+)
+
+// NewPerfRootCmd builds the root command of "zb", the performance benchmark
+// and stress tool.
+//
+// The registry URL is the first positional argument. Without arguments the
+// command prints its usage text and succeeds. With --version it logs the
+// version information and returns without running any test. Invalid
+// concurrency/requests combinations are returned as errors (and therefore
+// surface through Execute) instead of being raised as panics.
+func NewPerfRootCmd() *cobra.Command {
+	showVersion := false
+
+	// output is bound to --output-format but is not consumed yet.
+	var auth, workdir, repo, output string
+
+	var concurrency, requests int
+
+	rootCmd := &cobra.Command{
+		Use:   "zb [options] <url>",
+		Short: "`zb`",
+		Long:  "`zb`",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			if showVersion {
+				log.Info().Str("distribution-spec", distspec.Version).Str("commit", config.Commit).
+					Str("binary-type", config.BinaryType).Str("go version", config.GoVersion).Msg("version")
+
+				// the flag is documented as "show the version and exit"
+				return nil
+			}
+
+			if len(args) == 0 {
+				_ = cmd.Usage()
+				cmd.SilenceErrors = false
+
+				return nil
+			}
+
+			url := args[0]
+
+			// Perf divides the requests among the clients, so a zero or
+			// negative concurrency must be rejected before it is used.
+			if concurrency < 1 {
+				return errInvalidConcurrency
+			}
+
+			if requests < concurrency {
+				return errTooFewRequests
+			}
+
+			// round down so that every client issues the same number of requests
+			requests = concurrency * (requests / concurrency)
+
+			Perf(workdir, url, auth, repo, concurrency, requests)
+
+			return nil
+		},
+	}
+
+	rootCmd.Flags().StringVarP(&auth, "auth-creds", "A", "",
+		"Use colon-separated BASIC auth creds")
+	rootCmd.Flags().StringVarP(&workdir, "working-dir", "d", "",
+		"Use specified directory to store test data")
+	rootCmd.Flags().StringVarP(&repo, "repo", "r", "",
+		"Use specified repo on remote registry for test data")
+	rootCmd.Flags().IntVarP(&concurrency, "concurrency", "c", 1,
+		"Number of multiple requests to make at a time")
+	rootCmd.Flags().IntVarP(&requests, "requests", "n", 1,
+		"Number of requests to perform")
+	rootCmd.Flags().StringVarP(&output, "output-format", "o", "",
+		"Output format of test results [default: stdout]")
+
+	// "version"
+	rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "show the version and exit")
+
+	return rootCmd
+}
+
+// main runs the root command and exits non-zero when it reports an error.
+func main() {
+	if err := NewPerfRootCmd().Execute(); err != nil {
+		os.Exit(1)
+	}
+}
diff --git a/cmd/zb/main_test.go b/cmd/zb/main_test.go
new file mode 100644
index 00000000..4507fccb
--- /dev/null
+++ b/cmd/zb/main_test.go
@@ -0,0 +1,22 @@
+package main //nolint:testpackage // separate binary
+
+import (
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+	"zotregistry.io/zot/pkg/api"
+	"zotregistry.io/zot/pkg/api/config"
+)
+
+// TestIntegration checks that the root command can be built and that running
+// it without arguments (the usage path) succeeds.
+func TestIntegration(t *testing.T) {
+	Convey("Make a new controller", t, func() {
+		conf := config.New()
+		c := api.NewController(conf)
+		So(c, ShouldNotBeNil)
+
+		cl := NewPerfRootCmd()
+		So(cl, ShouldNotBeNil)
+
+		// Without explicit args cobra parses os.Args, i.e. the flags that
+		// "go test" passes to the test binary.
+		cl.SetArgs([]string{})
+
+		So(cl.Execute(), ShouldBeNil)
+	})
+}
diff --git a/cmd/zb/perf.go b/cmd/zb/perf.go
new file mode 100644
index 00000000..82aa55fd
--- /dev/null
+++ b/cmd/zb/perf.go
@@ -0,0 +1,696 @@
+package main
+
+import (
+	"crypto/rand"
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"sort"
+	"strings"
+	"sync"
+	"text/tabwriter"
+	"time"
+
+	"github.com/google/uuid"
+	godigest "github.com/opencontainers/go-digest"
+	imeta "github.com/opencontainers/image-spec/specs-go"
+	ispec "github.com/opencontainers/image-spec/specs-go/v1"
+	"gopkg.in/resty.v1"
+)
+
+const (
+	KiB                  = 1 * 1024
+	MiB                  = 1 * KiB * 1024
+	GiB                  = 1 * MiB * 1024
+	maxSize              = 1 * GiB // 1GiB
+	defaultDirPerms      = 0o700
+	defaultFilePerms     = 0o600
+	defaultSchemaVersion = 2
+	smallBlob            = 1 * MiB
+	mediumBlob           = 10 * MiB
+	largeBlob            = 100 * MiB
+)
+
+// helper routines
+
+// location builds the URL to use for the next request of an upload session
+// from the Location header of resp: the header's path and query are appended
+// to baseURL, any scheme/host it carries is dropped. It returns "" when the
+// header cannot be parsed.
+func location(baseURL string, resp *resty.Response) string {
+	// For some API responses, the Location header is set and is supposed to
+	// indicate an opaque value. However, it is not clear if this value is an
+	// absolute URL (https://server:port/v2/...) or just a path (/v2/...)
+	// zot implements the latter as per the spec, but some registries appear to
+	// return the former - this needs to be clarified
+	loc := resp.Header().Get("Location")
+
+	uloc, err := url.Parse(loc)
+	if err != nil {
+		return ""
+	}
+
+	// named "target" so that the "path" package is not shadowed
+	target := uloc.Path
+	if query := uloc.RawQuery; query != "" {
+		target += "?" + query
+	}
+
+	return baseURL + target
+}
+
+// blobHash maps the file name of each generated test blob to its digest.
+//nolint:gochecknoglobals // used only in this test
+var blobHash map[string]godigest.Digest = map[string]godigest.Digest{}
+
+// blobSizes lists the blob sizes for which setup generates a file.
+//nolint:gochecknoglobals // used only in this test
+var blobSizes = []int{smallBlob, mediumBlob, largeBlob}
+
+// blobPath returns the file name of the test blob of the given size inside
+// workingDir.
+func blobPath(workingDir string, size int) string {
+	return path.Join(workingDir, fmt.Sprintf("%d.blob", size))
+}
+
+// createBlob creates fname as a file of the given size whose first page is
+// random, and returns the digest of the complete file content.
+func createBlob(fname string, size int) (godigest.Digest, error) {
+	const rndPageSize = 4 * KiB
+
+	fhandle, err := os.OpenFile(fname, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultFilePerms)
+	if err != nil {
+		return "", err
+	}
+
+	// covers the error paths; the success path closes explicitly below so
+	// that a failing Close is reported
+	defer fhandle.Close()
+
+	if err := fhandle.Truncate(int64(size)); err != nil {
+		return "", err
+	}
+
+	// write a random first page so every test run has different blob content
+	rnd := make([]byte, rndPageSize)
+	if _, err := rand.Read(rnd); err != nil {
+		return "", err
+	}
+
+	if _, err := fhandle.Write(rnd); err != nil {
+		return "", err
+	}
+
+	// rewind to the start of the file before hashing it
+	if _, err := fhandle.Seek(0, 0); err != nil {
+		return "", err
+	}
+
+	// pre-compute the SHA256
+	digest, err := godigest.FromReader(fhandle)
+	if err != nil {
+		return "", err
+	}
+
+	return digest, fhandle.Close()
+}
+
+// setup creates workingDir (when it is not empty) and one test blob per entry
+// of blobSizes inside it, recording each blob's digest in blobHash. Any
+// failure terminates the program.
+func setup(workingDir string) {
+	// an empty workingDir makes blobPath return bare file names, and
+	// MkdirAll("") would only fail
+	if workingDir != "" {
+		if err := os.MkdirAll(workingDir, defaultDirPerms); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	for _, size := range blobSizes {
+		fname := blobPath(workingDir, size)
+
+		digest, err := createBlob(fname, size)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		blobHash[fname] = digest
+	}
+}
+
+// teardown deletes the blobs generated by setup. The directory is removed
+// only if that leaves it empty, so unrelated content of a user-supplied
+// working directory is never deleted.
+func teardown(workingDir string) {
+	for _, size := range blobSizes {
+		fname := blobPath(workingDir, size)
+
+		_ = os.Remove(fname) // best effort
+
+		delete(blobHash, fname)
+	}
+
+	if workingDir != "" {
+		// os.Remove fails on a non-empty directory, which is the intent here
+		_ = os.Remove(workingDir)
+	}
+}
+
+// statistics handling.
+
+// Durations is a list of latencies that can be ordered with sort.Sort.
+type Durations []time.Duration
+
+func (a Durations) Len() int           { return len(a) }
+func (a Durations) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a Durations) Less(i, j int) bool { return a[i] < a[j] }
+
+// statsSummary accumulates the results of all requests of one test.
+type statsSummary struct {
+	latencies       []time.Duration
+	name            string
+	min, max, total time.Duration
+	rps             float32
+	statusHist      map[string]int
+	errors          int
+}
+
+// newStatsSummary returns an empty summary for the named test. min and max
+// start at -1, which updateStats treats as "not set yet".
+func newStatsSummary(name string) statsSummary {
+	summary := statsSummary{
+		name:       name,
+		min:        -1,
+		max:        -1,
+		statusHist: make(map[string]int),
+	}
+
+	return summary
+}
+
+// statsRecord is the outcome of a single request sequence.
+type statsRecord struct {
+	latency    time.Duration
+	statusCode int
+	isConnFail bool
+	isErr      bool
+}
+
+// updateStats folds one record into summary: error count, min/max latency,
+// the per-class status histogram and the list of latencies.
+func updateStats(summary *statsSummary, record statsRecord) {
+	if record.isConnFail || record.isErr {
+		summary.errors++
+	}
+
+	if summary.min < 0 || record.latency < summary.min {
+		summary.min = record.latency
+	}
+
+	if summary.max < 0 || record.latency > summary.max {
+		summary.max = record.latency
+	}
+
+	// Bucket by status class so that every code of a class is counted. A
+	// status code of 0 (no response received) falls into no bucket.
+	const statusClass = 100
+
+	switch record.statusCode / statusClass {
+	case http.StatusOK / statusClass:
+		summary.statusHist["2xx"]++
+	case http.StatusMultipleChoices / statusClass:
+		summary.statusHist["3xx"]++
+	case http.StatusBadRequest / statusClass:
+		summary.statusHist["4xx"]++
+	case http.StatusInternalServerError / statusClass:
+		summary.statusHist["5xx"]++
+	}
+
+	summary.latencies = append(summary.latencies, record.latency)
+}
+
+// printStats logs the report of one test. requests is the number of requests
+// that were issued; summary.latencies is expected to be sorted in ascending
+// order by the caller.
+func printStats(requests int, summary *statsSummary) {
+	log.Printf("============\n")
+	log.Printf("Test name:\t%s", summary.name)
+	log.Printf("Time taken for tests:\t%v", summary.total)
+	log.Printf("Complete requests:\t%v", requests-summary.errors)
+	log.Printf("Failed requests:\t%v", summary.errors)
+	log.Printf("Requests per second:\t%v", summary.rps)
+	log.Printf("\n")
+
+	// map iteration order is random; sort the classes for a stable report
+	classes := make([]string, 0, len(summary.statusHist))
+	for class := range summary.statusHist {
+		classes = append(classes, class)
+	}
+
+	sort.Strings(classes)
+
+	for _, class := range classes {
+		log.Printf("%s responses:\t%v", class, summary.statusHist[class])
+	}
+
+	log.Printf("\n")
+	log.Printf("min: %v", summary.min)
+	log.Printf("max: %v", summary.max)
+
+	// index by the number of recorded latencies rather than by requests, so
+	// that a mismatch between the two (or an empty run) cannot index out of
+	// range
+	if count := len(summary.latencies); count > 0 {
+		log.Printf("%s:\t%v", "p50", summary.latencies[count/2])
+		log.Printf("%s:\t%v", "p75", summary.latencies[count*3/4])
+		log.Printf("%s:\t%v", "p90", summary.latencies[count*9/10])
+		log.Printf("%s:\t%v", "p99", summary.latencies[count*99/100])
+	}
+
+	log.Printf("\n")
+}
+
+// test suites/funcs.
+
+// testFunc is the signature shared by all tests: issue `requests` request
+// sequences against the registry at url and send one statsRecord per
+// sequence on statsCh.
+type testFunc func(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error
+
+// newClient returns a resty client; when auth is not empty it is taken as
+// "user:password" BASIC credentials.
+func newClient(auth string) *resty.Client {
+	client := resty.New()
+
+	if auth != "" {
+		const credParts = 2
+
+		// split on the first colon only: the password may itself contain
+		// colons, and a missing colon means an empty password
+		creds := strings.SplitN(auth, ":", credParts)
+
+		password := ""
+		if len(creds) == credParts {
+			password = creds[1]
+		}
+
+		client.SetBasicAuth(creds[0], password)
+	}
+
+	return client
+}
+
+// newRepoName returns a fresh, UUID-based repository name, nested under
+// trepo when trepo is not empty.
+func newRepoName(trepo string) (string, error) {
+	ruid, err := uuid.NewUUID()
+	if err != nil {
+		return "", err
+	}
+
+	if trepo != "" {
+		return trepo + "/" + ruid.String(), nil
+	}
+
+	return ruid.String(), nil
+}
+
+// manifestFor returns the JSON of an image manifest whose config and single
+// layer both reference the blob with the given digest and size.
+func manifestFor(digest godigest.Digest, size int) ([]byte, error) {
+	manifest := ispec.Manifest{
+		Versioned: imeta.Versioned{
+			SchemaVersion: defaultSchemaVersion,
+		},
+		Config: ispec.Descriptor{
+			Digest: digest,
+			Size:   int64(size),
+		},
+		Layers: []ispec.Descriptor{
+			{
+				MediaType: "application/vnd.oci.image.layer.v1.tar",
+				Digest:    digest,
+				Size:      int64(size),
+			},
+		},
+	}
+
+	return json.Marshal(manifest)
+}
+
+// GetCatalog issues `requests` sequential GET /v2/_catalog requests and
+// sends one record per request on statsCh. Any status other than 200 is
+// recorded as an error. It always returns nil.
+func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error {
+	client := newClient(auth)
+
+	for count := 0; count < requests; count++ {
+		func() {
+			start := time.Now()
+
+			var isConnFail, isErr bool
+
+			var statusCode int
+
+			var latency time.Duration
+
+			defer func() {
+				// send a stats record
+				statsCh <- statsRecord{
+					latency:    latency,
+					statusCode: statusCode,
+					isConnFail: isConnFail,
+					isErr:      isErr,
+				}
+			}()
+
+			// send request and get response
+			resp, err := client.R().Get(url + "/v2/_catalog")
+
+			latency = time.Since(start)
+
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			// request specific check
+			statusCode = resp.StatusCode()
+			if statusCode != http.StatusOK {
+				isErr = true
+
+				return
+			}
+		}()
+	}
+
+	return nil
+}
+
+// PushMonolithStreamed pushes `requests` images, one after the other, each to
+// a fresh repository: start an upload (POST), send the whole blob of
+// config.size bytes in a single PUT, then PUT a manifest referencing it.
+// One record per image is sent on statsCh; its latency covers the sequence up
+// to the last request that was attempted. It always returns nil.
+func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
+	config testConfig, statsCh chan statsRecord) error {
+	client := newClient(auth)
+
+	for count := 0; count < requests; count++ {
+		func() {
+			start := time.Now()
+
+			var isConnFail, isErr bool
+
+			var statusCode int
+
+			var latency time.Duration
+
+			defer func() {
+				// send a stats record
+				statsCh <- statsRecord{
+					latency:    latency,
+					statusCode: statusCode,
+					isConnFail: isConnFail,
+					isErr:      isErr,
+				}
+			}()
+
+			repo, err := newRepoName(trepo)
+			if err != nil {
+				// record the failure instead of terminating the process
+				// from inside a worker, which would skip the stats record
+				isErr = true
+
+				return
+			}
+
+			// create a new upload; every request goes through the configured
+			// client so that the credentials are sent
+			resp, err := client.R().Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
+
+			latency = time.Since(start)
+
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			// request specific check
+			statusCode = resp.StatusCode()
+			if statusCode != http.StatusAccepted {
+				isErr = true
+
+				return
+			}
+
+			loc := location(url, resp)
+
+			size := config.size
+			blob := path.Join(workdir, fmt.Sprintf("%d.blob", size))
+
+			fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms)
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			defer fhandle.Close()
+
+			// stream the entire blob
+			digest := blobHash[blob]
+
+			resp, err = client.R().
+				SetContentLength(true).
+				SetQueryParam("digest", digest.String()).
+				SetHeader("Content-Length", fmt.Sprintf("%d", size)).
+				SetHeader("Content-Type", "application/octet-stream").SetBody(fhandle).Put(loc)
+
+			latency = time.Since(start)
+
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			// request specific check
+			statusCode = resp.StatusCode()
+			if statusCode != http.StatusCreated {
+				isErr = true
+
+				return
+			}
+
+			// create a manifest
+			content, err := manifestFor(digest, size)
+			if err != nil {
+				isErr = true
+
+				return
+			}
+
+			// the manifest is pushed by its own digest
+			reference := godigest.FromBytes(content).String()
+
+			resp, err = client.R().SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
+				SetBody(content).Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, reference))
+
+			latency = time.Since(start)
+
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			// request specific check
+			statusCode = resp.StatusCode()
+			if statusCode != http.StatusCreated {
+				isErr = true
+
+				return
+			}
+		}()
+	}
+
+	return nil
+}
+
+// PushChunkStreamed pushes `requests` images, one after the other, each to a
+// fresh repository: start an upload (POST), send the blob of config.size
+// bytes with a PATCH, close the upload with a body-less PUT carrying the
+// digest, then PUT a manifest referencing the blob. One record per image is
+// sent on statsCh; its latency covers the sequence up to the last request
+// that was attempted. It always returns nil.
+func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
+	config testConfig, statsCh chan statsRecord) error {
+	client := newClient(auth)
+
+	for count := 0; count < requests; count++ {
+		func() {
+			start := time.Now()
+
+			var isConnFail, isErr bool
+
+			var statusCode int
+
+			var latency time.Duration
+
+			defer func() {
+				// send a stats record
+				statsCh <- statsRecord{
+					latency:    latency,
+					statusCode: statusCode,
+					isConnFail: isConnFail,
+					isErr:      isErr,
+				}
+			}()
+
+			repo, err := newRepoName(trepo)
+			if err != nil {
+				// record the failure instead of terminating the process
+				// from inside a worker, which would skip the stats record
+				isErr = true
+
+				return
+			}
+
+			// create a new upload; every request goes through the configured
+			// client so that the credentials are sent
+			resp, err := client.R().Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))
+
+			latency = time.Since(start)
+
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			// request specific check
+			statusCode = resp.StatusCode()
+			if statusCode != http.StatusAccepted {
+				isErr = true
+
+				return
+			}
+
+			loc := location(url, resp)
+
+			size := config.size
+			blob := path.Join(workdir, fmt.Sprintf("%d.blob", size))
+
+			fhandle, err := os.OpenFile(blob, os.O_RDONLY, defaultFilePerms)
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			defer fhandle.Close()
+
+			digest := blobHash[blob]
+
+			// upload blob
+			resp, err = client.R().
+				SetContentLength(true).
+				SetHeader("Content-Type", "application/octet-stream").SetBody(fhandle).Patch(loc)
+
+			latency = time.Since(start)
+
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			// the PATCH response announces where to continue the session
+			loc = location(url, resp)
+
+			// request specific check
+			statusCode = resp.StatusCode()
+			if statusCode != http.StatusAccepted {
+				isErr = true
+
+				return
+			}
+
+			// finish upload; this request has no body, so it must not
+			// advertise the blob size as its Content-Length
+			resp, err = client.R().
+				SetContentLength(true).
+				SetQueryParam("digest", digest.String()).
+				SetHeader("Content-Type", "application/octet-stream").Put(loc)
+
+			latency = time.Since(start)
+
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			// request specific check
+			statusCode = resp.StatusCode()
+			if statusCode != http.StatusCreated {
+				isErr = true
+
+				return
+			}
+
+			// create a manifest
+			content, err := manifestFor(digest, size)
+			if err != nil {
+				isErr = true
+
+				return
+			}
+
+			// the manifest is pushed by its own digest
+			reference := godigest.FromBytes(content).String()
+
+			resp, err = client.R().SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
+				SetBody(content).Put(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, reference))
+
+			latency = time.Since(start)
+
+			if err != nil {
+				isConnFail = true
+
+				return
+			}
+
+			// request specific check
+			statusCode = resp.StatusCode()
+			if statusCode != http.StatusCreated {
+				isErr = true
+
+				return
+			}
+		}()
+	}
+
+	return nil
+}
+
+// test driver.
+
+// testConfig describes one entry of the test suite.
+type testConfig struct {
+	name  string
+	tfunc testFunc
+	// test-specific params
+	size int
+}
+
+// testSuite lists the tests in the order in which Perf runs them.
+var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this test
+	{
+		name:  "Get Catalog",
+		tfunc: GetCatalog,
+	},
+	{
+		name:  "Push Monolith 1MB",
+		tfunc: PushMonolithStreamed,
+		size:  smallBlob,
+	},
+	{
+		name:  "Push Monolith 10MB",
+		tfunc: PushMonolithStreamed,
+		size:  mediumBlob,
+	},
+	{
+		name:  "Push Monolith 100MB",
+		tfunc: PushMonolithStreamed,
+		size:  largeBlob,
+	},
+	{
+		name:  "Push Chunk Streamed 1MB",
+		tfunc: PushChunkStreamed,
+		size:  smallBlob,
+	},
+	{
+		name:  "Push Chunk Streamed 10MB",
+		tfunc: PushChunkStreamed,
+		size:  mediumBlob,
+	},
+	{
+		name:  "Push Chunk Streamed 100MB",
+		tfunc: PushChunkStreamed,
+		size:  largeBlob,
+	},
+}
+
+// Perf runs every test of testSuite against the registry at url and logs a
+// report per test on stdout.
+//
+// Each test is run by `concurrency` parallel clients, each issuing
+// requests/concurrency requests. requests is rounded down to a multiple of
+// concurrency, a concurrency below 1 is treated as 1 and a concurrency above
+// requests is lowered to requests. Nothing is run when requests is below 1.
+// Test data is generated in workdir before the first test and cleaned up
+// afterwards.
+func Perf(workdir, url, auth, repo string, concurrency int, requests int) {
+	// logging
+	log.SetFlags(0)
+
+	tabw := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.TabIndent)
+	log.SetOutput(tabw)
+
+	// the tabwriter buffers cells until it can align them; write out
+	// whatever is still pending when done
+	defer tabw.Flush()
+
+	if requests < 1 {
+		log.Printf("no requests to perform")
+
+		return
+	}
+
+	// keep the division below away from zero and give every client at least
+	// one request
+	if concurrency < 1 {
+		concurrency = 1
+	}
+
+	if concurrency > requests {
+		concurrency = requests
+	}
+
+	// The clients send exactly perClient*concurrency records in total; the
+	// channel capacity and the drain loop below must use that same number,
+	// otherwise the drain would wait forever for records that never come.
+	perClient := requests / concurrency
+	requests = perClient * concurrency
+
+	// initialize test data
+	setup(workdir)
+	defer teardown(workdir)
+
+	// common header
+	log.Printf("Registry URL:\t%s", url)
+	log.Printf("\n")
+	log.Printf("Concurrency Level:\t%v", concurrency)
+	log.Printf("Total requests:\t%v", requests)
+	log.Printf("\n")
+
+	for _, tconfig := range testSuite {
+		// buffered for all records, so that clients never block on send
+		statsCh := make(chan statsRecord, requests)
+
+		var wg sync.WaitGroup
+
+		summary := newStatsSummary(tconfig.name)
+
+		start := time.Now()
+
+		for c := 0; c < concurrency; c++ {
+			// parallelize with clients
+			wg.Add(1)
+
+			// the config is passed as an argument rather than captured from
+			// the loop variable
+			go func(tconfig testConfig) {
+				defer wg.Done()
+
+				_ = tconfig.tfunc(workdir, url, auth, repo, perClient, tconfig, statsCh)
+			}(tconfig)
+		}
+		wg.Wait()
+
+		summary.total = time.Since(start)
+		summary.rps = float32(requests) / float32(summary.total.Seconds())
+
+		for count := 0; count < requests; count++ {
+			record := <-statsCh
+			updateStats(&summary, record)
+		}
+
+		// printStats reads percentiles by position
+		sort.Sort(Durations(summary.latencies))
+
+		printStats(requests, &summary)
+	}
+}
diff --git a/codecov.yml b/codecov.yml
index 475ebafa..3f2fcdfd 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -3,4 +3,5 @@ ignore:
   - "./pkg/extensions/search/*_gen.go"
   - "./pkg/extensions/search/generated.go"
   - "./pkg/extensions/minimal.go"
-  - "./pkg/cli/minimal.go"
\ No newline at end of file
+  - "./pkg/cli/minimal.go"
+  - "./cmd/zb/*.go"