From 5ae7a028d973cc70c3a6284ad334bead020a8193 Mon Sep 17 00:00:00 2001 From: Vishwas R <30438425+vrajashkr@users.noreply.github.com> Date: Mon, 20 May 2024 21:35:21 +0530 Subject: [PATCH] feat(cluster): Add support for request proxying for scale out (#2385) * feat(cluster): initial commit for scale-out cluster Signed-off-by: Ramkumar Chinchani * feat(cluster): support shared storage scale out This change introduces support for shared storage backed zot cluster scale out. New feature Multiple stateless zot instances can run using the same shared storage backend where each instance looks at a specific set of repositories based on a siphash of the repository name to improve scale as the load is distributed across multiple instances. For a given config, there will only be one instance that can perform dist-spec read/write on a given repository. What's changed? - introduced a transparent request proxy for dist-spec endpoints based on siphash of repository name. - new config for scale out cluster that specifies list of cluster members. Signed-off-by: Vishwas Rajashekar --------- Signed-off-by: Ramkumar Chinchani Signed-off-by: Vishwas Rajashekar Co-authored-by: Ramkumar Chinchani --- .github/workflows/ecosystem-tools.yaml | 35 +- .github/workflows/nightly.yaml | 83 +++- Makefile | 11 + .../config-cluster-member0.json | 44 ++ .../config-cluster-member1.json | 44 ++ .../config-cluster-member2.json | 44 ++ examples/scale-out-cluster-cloud/haproxy.cfg | 26 + .../tls/config-cluster-member0.json | 51 ++ .../tls/config-cluster-member1.json | 51 ++ .../tls/config-cluster-member2.json | 51 ++ .../scale-out-cluster-cloud/tls/haproxy.cfg | 25 + go.mod | 1 + go.sum | 2 + pkg/api/config/config.go | 27 ++ pkg/api/constants/consts.go | 5 + pkg/api/controller.go | 48 +- pkg/api/controller_test.go | 454 ++++++++++++++++++ pkg/api/proxy.go | 258 ++++++++++ pkg/api/proxy_test.go | 59 +++ pkg/api/routes.go | 60 ++- pkg/cli/server/root.go | 29 ++ pkg/cli/server/root_test.go | 144 ++++++ pkg/common/common.go | 91 ++++ pkg/common/common_test.go | 104 ++++ .../cloud_scale_out_basic_auth_tls.bats | 75 +++ .../cloud_scale_out_basic_auth_tls_scale.bats | 74 +++ test/scale-out/cloud_scale_out_no_auth.bats | 69 +++ test/scale-out/helpers_cloud.bash | 35 ++ test/scale-out/helpers_haproxy.bash | 71 +++ test/scale-out/helpers_zot.bash | 273 +++++++++++ 30 files changed, 2320 insertions(+), 24 deletions(-) create mode 100644 examples/scale-out-cluster-cloud/config-cluster-member0.json create mode 100644 examples/scale-out-cluster-cloud/config-cluster-member1.json create mode 100644 examples/scale-out-cluster-cloud/config-cluster-member2.json create mode 100644 examples/scale-out-cluster-cloud/haproxy.cfg create mode 100644 examples/scale-out-cluster-cloud/tls/config-cluster-member0.json create mode 100644 examples/scale-out-cluster-cloud/tls/config-cluster-member1.json create mode 100644 examples/scale-out-cluster-cloud/tls/config-cluster-member2.json create mode 100644 examples/scale-out-cluster-cloud/tls/haproxy.cfg create mode 100644 pkg/api/proxy.go create mode 100644 pkg/api/proxy_test.go create mode 100644 test/scale-out/cloud_scale_out_basic_auth_tls.bats create mode 100644 test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats create mode 100644 test/scale-out/cloud_scale_out_no_auth.bats create mode 100644 test/scale-out/helpers_cloud.bash create mode 100644 test/scale-out/helpers_haproxy.bash create mode 100644 test/scale-out/helpers_zot.bash diff --git a/.github/workflows/ecosystem-tools.yaml 
b/.github/workflows/ecosystem-tools.yaml index 05dd8c71..0c268534 100644 --- a/.github/workflows/ecosystem-tools.yaml +++ b/.github/workflows/ecosystem-tools.yaml @@ -27,7 +27,7 @@ jobs: go install github.com/swaggo/swag/cmd/swag@v1.16.2 go mod download sudo apt-get update - sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap + sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap haproxy jq # install skopeo git clone -b v1.12.0 https://github.com/containers/skopeo.git cd skopeo @@ -80,4 +80,37 @@ jobs: env: AWS_ACCESS_KEY_ID: fake AWS_SECRET_ACCESS_KEY: fake + - name: Run cloud scale-out tests + id: scale + run: | + make run-cloud-scale-out-tests + env: + AWS_ACCESS_KEY_ID: fake + AWS_SECRET_ACCESS_KEY: fake + continue-on-error: true + - name: print service logs for scale-out + run: | + find /tmp/zot-ft-logs -name '*.log' -print0 | xargs -0 cat + - name: multi-hop detection + id: multihop + run: | + if find /tmp/zot-ft-logs -name '*.log' -print0 | xargs -0 cat | grep 'cannot proxy an already proxied request'; then + echo "detected multi-hop" + exit 1 + else + exit 0 + fi + continue-on-error: true + - name: clean up scale-out logs + run: | + rm -r /tmp/zot-ft-logs + - name: fail job if error + if: ${{ steps.scale.outcome != 'success' || steps.multihop.outcome != 'success' }} + run: | + exit 1 + - name: Upload zb test results zip as build artifact + uses: actions/upload-artifact@v4 + with: + name: zb-cloud-scale-out-functional-results-${{ github.sha }} + path: ./zb-results/ - uses: ./.github/actions/teardown-localstack diff --git a/.github/workflows/nightly.yaml b/.github/workflows/nightly.yaml index 3df6e3b2..c8ff4c18 100644 --- a/.github/workflows/nightly.yaml +++ b/.github/workflows/nightly.yaml @@ -6,11 +6,13 @@ on: permissions: read-all -# Here we are running two tests: +# The following tests are run: # 1. run zot with local storage and dedupe disabled, push images, restart zot with dedupe enabled # task scheduler will start a dedupe all blobs process at zot startup and it shouldn't interfere with clients. # 2. run zot with s3 storage and dynamodb and dedupe enabled, push images, restart zot with dedupe false and no cache # task scheduler will start a restore all blobs process at zot startup, after it finishes all blobs should be restored to their original state (have content) +# 3. run many, many, many instances of zot with shared storage and metadata front-ended by HAProxy. start a long-running zb run with high concurrency and number of requests +# to achieve a long-running sustained load on the system. The system is expected to perform well without errors and return performance data after the test. 
jobs: dedupe: name: Dedupe/restore blobs @@ -195,3 +197,82 @@ jobs: - name: Run tests run: | ./examples/kind/kind-ci.sh + + cloud-scale-out: + name: s3+dynamodb scale-out + runs-on: ubuntu-latest-16-cores + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + cache: false + go-version: 1.22.x + - name: Install dependencies + run: | + cd $GITHUB_WORKSPACE + go install github.com/swaggo/swag/cmd/swag@v1.16.2 + go mod download + sudo apt-get update + sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap haproxy jq + # install skopeo + git clone -b v1.12.0 https://github.com/containers/skopeo.git + cd skopeo + make bin/skopeo + sudo cp bin/skopeo /usr/bin + skopeo -v + cd $GITHUB_WORKSPACE + - name: Log in to GitHub Docker Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ github.token }} + - uses: actions/setup-python@v5 + with: + python-version: '3.11' + - name: Install localstack + run: | + pip install --upgrade pyopenssl + pip install localstack==3.3.0 awscli-local[ver1] # install LocalStack cli and awslocal + docker pull ghcr.io/project-zot/ci-images/localstack:3.3.0 # Make sure to pull a working version of the image + localstack start -d # Start LocalStack in the background + + echo "Waiting for LocalStack startup..." # Wait 30 seconds for the LocalStack container + localstack wait -t 30 # to become ready before timing out + echo "Startup complete" + - name: Run cloud scale-out high scale performance tests + id: scale + run: | + make run-cloud-scale-out-high-scale-tests + env: + AWS_ACCESS_KEY_ID: fake + AWS_SECRET_ACCESS_KEY: fake + continue-on-error: true + - name: print service logs + run: | + sudo dmesg + cat /tmp/zot-logs/*.log + - name: multi-hop detection + id: multihop + run: | + if cat /tmp/zot-logs/*.log | grep 'cannot proxy an already proxied request'; then + echo "detected multi-hop" + exit 1 + else + exit 0 + fi + continue-on-error: true + - name: clean up logs + run: | + rm -r /tmp/zot-logs + - name: fail job if error + if: ${{ steps.scale.outcome != 'success' || steps.multihop.outcome != 'success' }} + run: | + exit 1 + - name: Upload zb test results zip as build artifact + if: steps.scale.outcome == 'success' + uses: actions/upload-artifact@v4 + with: + name: zb-cloud-scale-out-perf-results-${{ github.sha }} + path: ./zb-results/ + - uses: ./.github/actions/teardown-localstack diff --git a/Makefile b/Makefile index 75cb7092..2afab068 100644 --- a/Makefile +++ b/Makefile @@ -489,6 +489,17 @@ run-blackbox-tests: $(BATS_TEST_FILE_PATH) check-blackbox-prerequisites binary b echo running bats test "$(BATS_TEST_FILE_PATH)"; \ $(BATS) $(BATS_FLAGS) $(BATS_TEST_FILE_PATH) +.PHONY: run-cloud-scale-out-tests +run-cloud-scale-out-tests: check-blackbox-prerequisites check-awslocal binary bench test-prereq + echo running scale out bats test; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_no_auth.bats; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_basic_auth_tls.bats + +.PHONY: run-cloud-scale-out-high-scale-tests +run-cloud-scale-out-high-scale-tests: check-blackbox-prerequisites check-awslocal binary bench test-prereq + echo running cloud scale out bats high scale test; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats + .PHONY: run-blackbox-ci run-blackbox-ci: check-blackbox-prerequisites binary binary-minimal cli echo running CI bats tests concurently diff --git 
a/examples/scale-out-cluster-cloud/config-cluster-member0.json b/examples/scale-out-cluster-cloud/config-cluster-member0.json new file mode 100644 index 00000000..f00b969c --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member0.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9000" + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/config-cluster-member1.json b/examples/scale-out-cluster-cloud/config-cluster-member1.json new file mode 100644 index 00000000..468dabb4 --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member1.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9001" + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/config-cluster-member2.json b/examples/scale-out-cluster-cloud/config-cluster-member2.json new file mode 100644 index 00000000..84981d74 --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member2.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9002" + }, + 
"log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/haproxy.cfg b/examples/scale-out-cluster-cloud/haproxy.cfg new file mode 100644 index 00000000..ffb5664e --- /dev/null +++ b/examples/scale-out-cluster-cloud/haproxy.cfg @@ -0,0 +1,26 @@ +global + log /tmp/log local0 + log /tmp/log local1 notice + maxconn 2000 + stats timeout 30s + daemon + +defaults + log global + mode http + option httplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend zot + bind *:8080 + default_backend zot-cluster + +backend zot-cluster + balance roundrobin + cookie SERVER insert indirect nocache + server zot0 127.0.0.1:9000 cookie zot0 + server zot1 127.0.0.1:9001 cookie zot1 + server zot2 127.0.0.1:9002 cookie zot2 diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json new file mode 100644 index 00000000..5d7f2196 --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9000", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json new file mode 100644 index 00000000..bd269e3f --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9001", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + 
"127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json new file mode 100644 index 00000000..5b87b5bb --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9002", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/haproxy.cfg b/examples/scale-out-cluster-cloud/tls/haproxy.cfg new file mode 100644 index 00000000..5a39636b --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/haproxy.cfg @@ -0,0 +1,25 @@ +global + log /tmp/log local0 + log /tmp/log local1 notice + maxconn 2000 + stats timeout 30s + +defaults + log global + mode tcp + option tcplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend zot + bind *:8080 + default_backend zot-cluster + +backend zot-cluster + balance roundrobin + cookie SERVER insert indirect nocache + server zot0 127.0.0.1:9000 cookie zot0 + server zot1 127.0.0.1:9001 cookie zot1 + server zot2 127.0.0.1:9002 cookie zot2 diff --git a/go.mod b/go.mod index 31441332..73f9ef07 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6 github.com/aws/aws-secretsmanager-caching-go v1.1.3 github.com/containers/image/v5 v5.30.0 + github.com/dchest/siphash v1.2.3 github.com/google/go-github/v52 v52.0.0 github.com/gorilla/securecookie v1.1.2 github.com/gorilla/sessions v1.2.2 diff --git a/go.sum b/go.sum index d24ccf77..666d7269 100644 --- a/go.sum +++ b/go.sum @@ -591,6 +591,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/depcheck-test/depcheck-test v0.0.0-20220607135614-199033aaa936 h1:foGzavPWwtoyBvjWyKJYDYsyzy+23iBV7NKTwdk+LRY= github.com/depcheck-test/depcheck-test 
v0.0.0-20220607135614-199033aaa936/go.mod h1:ttKPnOepYt4LLzD+loXQ1rT6EmpyIYHro7TAJuIIlHo= github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= diff --git a/pkg/api/config/config.go b/pkg/api/config/config.go index f474843f..12892a65 100644 --- a/pkg/api/config/config.go +++ b/pkg/api/config/config.go @@ -121,6 +121,32 @@ type SchedulerConfig struct { NumWorkers int } +// contains the scale-out configuration which is identical for all zot replicas. +type ClusterConfig struct { + // contains the "host:port" of all the zot instances participating + // in the cluster. + Members []string `json:"members" mapstructure:"members"` + + // contains the hash key that is required for siphash. + // must be a 128-bit (16-byte) key + // https://github.com/dchest/siphash?tab=readme-ov-file#func-newkey-byte-hashhash64 + HashKey string `json:"hashKey" mapstructure:"hashKey"` + + // contains client TLS config. + TLS *TLSConfig `json:"tls" mapstructure:"tls"` + + // private field for storing Proxy details such as internal socket list. + Proxy *ClusterRequestProxyConfig `json:"-" mapstructure:"-"` +} + +type ClusterRequestProxyConfig struct { + // holds the cluster socket (IP:port) derived from the host's + // interface configuration and the listening port of the HTTP server. + LocalMemberClusterSocket string + // index of the local member cluster socket in the members array. + LocalMemberClusterSocketIndex uint64 +} + type LDAPCredentials struct { BindDN string BindPassword string @@ -230,6 +256,7 @@ type Config struct { Log *LogConfig Extensions *extconf.ExtensionConfig Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"` + Cluster *ClusterConfig `json:"cluster" mapstructure:",omitempty"` } func New() *Config { diff --git a/pkg/api/constants/consts.go b/pkg/api/constants/consts.go index df3cec8e..5afe00a9 100644 --- a/pkg/api/constants/consts.go +++ b/pkg/api/constants/consts.go @@ -31,4 +31,9 @@ const ( DeletePermission = "delete" // behaviour actions. DetectManifestCollisionPermission = "detectManifestCollision" + // zot scale-out hop count header. + ScaleOutHopCountHeader = "X-Zot-Cluster-Hop-Count" + // log string keys. + // these can be used together with the logger to add context to a log message. + RepositoryLogKey = "repository" ) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index e8c167c3..d86d697a 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -19,6 +19,7 @@ import ( "zotregistry.dev/zot/errors" "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/common" ext "zotregistry.dev/zot/pkg/extensions" extconf "zotregistry.dev/zot/pkg/extensions/config" "zotregistry.dev/zot/pkg/extensions/monitoring" @@ -54,15 +55,52 @@ type Controller struct { chosenPort int // kernel-chosen port } -func NewController(config *config.Config) *Controller { +func NewController(appConfig *config.Config) *Controller { var controller Controller - logger := log.NewLogger(config.Log.Level, config.Log.Output) - controller.Config = config + logger := log.NewLogger(appConfig.Log.Level, appConfig.Log.Output) + + if appConfig.Cluster != nil { + // we need the set of local sockets (IP address:port) for identifying + // the local member cluster socket for logging and lookup. 
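// as an illustration (hypothetical values): on a host whose interfaces carry
// 127.0.0.1 and 192.168.1.10 and whose HTTP port is 9000, GetLocalSockets
// returns ["127.0.0.1:9000", "192.168.1.10:9000"]; the entry in
// appConfig.Cluster.Members that matches one of these sockets identifies this instance.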
+ localSockets, err := common.GetLocalSockets(appConfig.HTTP.Port) + if err != nil { + logger.Error().Err(err).Msg("failed to get local sockets") + panic("failed to get local sockets") + } + + // memberSocket is the local member's socket + // the index is also fetched for quick lookups during proxying + memberSocketIdx, memberSocket, err := GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets) + if err != nil { + logger.Error().Err(err).Msg("failed to get member socket") + panic("failed to get member socket") + } + + if memberSocket == "" { + // there is a misconfiguration if the memberSocket cannot be identified + logger.Error(). + Str("members", strings.Join(appConfig.Cluster.Members, ",")). + Str("localSockets", strings.Join(localSockets, ",")). + Msg("failed to determine the local cluster socket") + panic("failed to determine the local cluster socket") + } + + internalProxyConfig := &config.ClusterRequestProxyConfig{ + LocalMemberClusterSocket: memberSocket, + LocalMemberClusterSocketIndex: uint64(memberSocketIdx), + } + appConfig.Cluster.Proxy = internalProxyConfig + + logger.Logger = logger.Logger.With(). + Str("clusterMember", memberSocket). + Str("clusterMemberIndex", strconv.Itoa(memberSocketIdx)).Logger() + } + controller.Config = appConfig controller.Log = logger - if config.Log.Audit != "" { - audit := log.NewAuditLogger(config.Log.Level, config.Log.Audit) + if appConfig.Log.Audit != "" { + audit := log.NewAuditLogger(appConfig.Log.Level, appConfig.Log.Audit) controller.Audit = audit } diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 4d312cee..4f3652a4 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -95,6 +95,32 @@ func TestNew(t *testing.T) { So(conf, ShouldNotBeNil) So(api.NewController(conf), ShouldNotBeNil) }) + + Convey("Given a scale out cluster config where the local cluster socket cannot be found", t, func() { + conf := config.New() + So(conf, ShouldNotBeNil) + conf.HTTP = config.HTTPConfig{ + Address: "127.0.0.2", + Port: "9000", + } + conf.Cluster = &config.ClusterConfig{ + Members: []string{}, + } + So(func() { api.NewController(conf) }, ShouldPanicWith, "failed to determine the local cluster socket") + }) + + Convey("Given a scale out cluster config where the local cluster socket cannot be found due to an error", t, func() { + conf := config.New() + So(conf, ShouldNotBeNil) + conf.HTTP = config.HTTPConfig{ + Address: "127.0.0.2", + Port: "9000", + } + conf.Cluster = &config.ClusterConfig{ + Members: []string{"127.0.0.1"}, + } + So(func() { api.NewController(conf) }, ShouldPanicWith, "failed to get member socket") + }) } func TestCreateCacheDatabaseDriver(t *testing.T) { @@ -958,6 +984,434 @@ func TestBlobReferenced(t *testing.T) { }) } +// tests for shared-storage scale-out cluster. +func TestScaleOutRequestProxy(t *testing.T) { + // when there is only one member, no proxying is expected and the responses should be correct. 
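// (the proxy wrapper short-circuits when the cluster has a single member, so every
// request is served locally regardless of which repository name it hashes to.)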
+ Convey("Given a zot scale out cluster in http mode with only 1 member", t, func() { + port := test.GetFreePort() + clusterMembers := make([]string, 1) + clusterMembers[0] = fmt.Sprintf("127.0.0.1:%s", port) + + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + + Convey("Controller should start up and respond without error", func() { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Should upload images and fetch valid responses for repo tags list", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for _, repoName := range reposToTest { + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(port), repoName, "1.0") + So(err, ShouldBeNil) + + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + }) + }) + + // when only one member in the cluster is online, an error is expected when there is a + // request proxied to an offline member. + Convey("Given a scale out http cluster with only 1 online member", t, func() { + port := test.GetFreePort() + clusterMembers := make([]string, 3) + clusterMembers[0] = fmt.Sprintf("127.0.0.1:%s", port) + clusterMembers[1] = "127.0.0.1:1" + clusterMembers[2] = "127.0.0.1:2" + + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + + Convey("Controller should start up and respond without error", func() { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Should fail to upload an image that is proxied to another instance", func() { + repoName := "alpine" + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(port), repoName, "1.0") + So(err, ShouldNotBeNil) + So(err.Error(), ShouldEqual, "can't post blob") + + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when there are multiple members in a cluster, requests are expected to return + // the same data for any member due to proxying. 
+ Convey("Given a zot scale out cluster in http mode with 3 members", t, func() { + numMembers := 3 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + Convey("All 3 controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Should upload images to repos and fetch same response from all 3 members", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for idx, repoName := range reposToTest { + img := CreateRandomImage() + + // Upload to each instance based on loop counter + err := UploadImage(img, test.GetBaseURL(ports[idx]), repoName, "1.0") + So(err, ShouldBeNil) + + // Query all 3 instances and expect the same response + for _, port := range ports { + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + } + }) + }) + + // this test checks for functionality when TLS and htpasswd auth are enabled. + // it primarily checks that headers are correctly copied over during the proxying process. 
+ Convey("Given a zot scale out cluster in https mode with auth enabled", t, func() { + numMembers := 3 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + username, _ := test.GenerateRandomString() + password, _ := test.GenerateRandomString() + htpasswdPath := test.MakeHtpasswdFileFromString(test.GetCredString(username, password)) + defer os.Remove(htpasswdPath) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.HTTP.Auth = &config.AuthConfig{ + HTPasswd: config.AuthHTPasswd{ + Path: htpasswdPath, + }, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: CACert, + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + Convey("All 3 controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().SetBasicAuth(username, password).Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Should upload images to repos and fetch same response from all 3 instances", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for idx, repoName := range reposToTest { + img := CreateRandomImage() + + // Upload to each instance based on loop counter + err := UploadImageWithBasicAuth(img, test.GetSecureBaseURL(ports[idx]), repoName, "1.0", username, password) + So(err, ShouldBeNil) + + // Query all 3 instances and expect the same response + for _, port := range ports { + resp, err := resty.R().SetBasicAuth(username, password).Get( + fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(port), repoName), + ) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + } + }) + }) + + // when the RootCA file does not exist, expect an error + Convey("Given a zot scale out cluster in with 2 members and an incorrect RootCACert", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := 
test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when the server cert file does not exist, expect an error while proxying + Convey("Given a zot scale out cluster in with 2 members and an incorrect server cert", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: CACert, + Cert: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when the server key file does not exist, expect an error while proxying + Convey("Given a zot scale out cluster in with 2 members and an incorrect server key", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: 
&config.TLSConfig{ + CACert: CACert, + Cert: ServerCert, + Key: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) +} + func TestPrintTracebackOnPanic(t *testing.T) { Convey("Run server on unavailable port", t, func() { port := test.GetFreePort() diff --git a/pkg/api/proxy.go b/pkg/api/proxy.go new file mode 100644 index 00000000..d3c6224d --- /dev/null +++ b/pkg/api/proxy.go @@ -0,0 +1,258 @@ +package api + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "net/http" + + "github.com/dchest/siphash" + "github.com/gorilla/mux" + + "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/api/constants" + "zotregistry.dev/zot/pkg/common" +) + +// ClusterProxy wraps an http.HandlerFunc which requires proxying between zot instances to ensure +// that a given repository only has a single writer and reader for dist-spec operations in a scale-out cluster. +// based on the hash value of the repository name, the request will either be handled locally +// or proxied to another zot member in the cluster to get the data before sending a response to the client. +func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc { + return func(next http.HandlerFunc) http.HandlerFunc { + return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { + config := ctrlr.Config + logger := ctrlr.Log + + // if no cluster or single-node cluster, handle locally. + if config.Cluster == nil || len(config.Cluster.Members) == 1 { + next.ServeHTTP(response, request) + + return + } + + // since the handler has been wrapped, it should be possible to get the name + // of the repository from the mux. + vars := mux.Vars(request) + name, ok := vars["name"] + + if !ok || name == "" { + response.WriteHeader(http.StatusNotFound) + + return + } + + // the target member is the only one which should do read/write for the dist-spec APIs + // for the given repository. + targetMemberIndex, targetMember := computeTargetMember(config, name) + + logger.Debug().Str(constants.RepositoryLogKey, name). + Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex)) + + // if the target member is the same as the local member, the current member should handle the request. 
+ // since the instances have the same config, a quick index lookup is sufficient + if targetMemberIndex == config.Cluster.Proxy.LocalMemberClusterSocketIndex { + logger.Debug().Str(constants.RepositoryLogKey, name).Msg("handling the request locally") + next.ServeHTTP(response, request) + + return + } + + // if the header contains a hop-count, return an error response as there should be no multi-hop + if request.Header.Get(constants.ScaleOutHopCountHeader) != "" { + logger.Fatal().Str("url", request.URL.String()). + Msg("failed to process request - cannot proxy an already proxied request") + + return + } + + logger.Debug().Str(constants.RepositoryLogKey, name).Msg("proxying the request") + + proxyResponse, err := proxyHTTPRequest(request.Context(), request, targetMember, ctrlr) + if err != nil { + logger.Error().Err(err).Str(constants.RepositoryLogKey, name).Msg("failed to proxy the request") + http.Error(response, err.Error(), http.StatusInternalServerError) + + return + } + defer proxyResponse.Body.Close() + + copyHeader(response.Header(), proxyResponse.Header) + response.WriteHeader(proxyResponse.StatusCode) + _, _ = io.Copy(response, proxyResponse.Body) + }) + } +} + +// computes the target member using siphash and returns the index and the member +// siphash was chosen to prevent against hash attacks where an attacker +// can target all requests to one given instance instead of balancing across the cluster +// resulting in a Denial-of-Service (DOS). +// ref: https://en.wikipedia.org/wiki/SipHash +func computeTargetMember(config *config.Config, name string) (uint64, string) { + h := siphash.New([]byte(config.Cluster.HashKey)) + h.Write([]byte(name)) + sum64 := h.Sum64() + targetIdx := sum64 % uint64(len(config.Cluster.Members)) + + return targetIdx, config.Cluster.Members[targetIdx] +} + +// gets all the server sockets of a target member - IP:Port. +// for IPv6, the socket is [IPv6]:Port. +// if the input is an IP address, returns the same targetMember in an array. +// if the input is a host name, performs a lookup and returns the server sockets. +func getTargetMemberServerSockets(targetMemberSocket string) ([]string, error) { + targetHost, targetPort, err := net.SplitHostPort(targetMemberSocket) + if err != nil { + return []string{}, err + } + + addr := net.ParseIP(targetHost) + if addr != nil { + // this is an IP address, return as is + return []string{targetMemberSocket}, nil + } + // this is a hostname - try to resolve to an IP + resolvedAddrs, err := common.GetIPFromHostName(targetHost) + if err != nil { + return []string{}, err + } + + targetSockets := make([]string, len(resolvedAddrs)) + for idx, resolvedAddr := range resolvedAddrs { + targetSockets[idx] = net.JoinHostPort(resolvedAddr, targetPort) + } + + return targetSockets, nil +} + +// proxy the request to the target member and return a pointer to the response or an error. +func proxyHTTPRequest(ctx context.Context, req *http.Request, + targetMember string, ctrlr *Controller, +) (*http.Response, error) { + cloneURL := *req.URL + + proxyQueryScheme := "http" + if ctrlr.Config.HTTP.TLS != nil { + proxyQueryScheme = "https" + } + + cloneURL.Scheme = proxyQueryScheme + cloneURL.Host = targetMember + + clonedBody := cloneRequestBody(req) + + fwdRequest, err := http.NewRequestWithContext(ctx, req.Method, cloneURL.String(), clonedBody) + if err != nil { + return nil, err + } + + copyHeader(fwdRequest.Header, req.Header) + + // always set hop count to 1 for now. 
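// (the receiving member only checks for the presence of this header, not its value,
// so "1" simply marks the request as having been proxied once already.)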
+ // the handler wrapper above will terminate the process if it sees a request that + // already has a hop count but is due for proxying. + fwdRequest.Header.Set(constants.ScaleOutHopCountHeader, "1") + + clientOpts := common.HTTPClientOptions{ + TLSEnabled: ctrlr.Config.HTTP.TLS != nil, + VerifyTLS: ctrlr.Config.HTTP.TLS != nil, // for now, always verify TLS when TLS mode is enabled + Host: targetMember, + } + + tlsConfig := ctrlr.Config.Cluster.TLS + if tlsConfig != nil { + clientOpts.CertOptions.ClientCertFile = tlsConfig.Cert + clientOpts.CertOptions.ClientKeyFile = tlsConfig.Key + clientOpts.CertOptions.RootCaCertFile = tlsConfig.CACert + } + + httpClient, err := common.CreateHTTPClient(&clientOpts) + if err != nil { + return nil, err + } + + resp, err := httpClient.Do(fwdRequest) + if err != nil { + return nil, err + } + + var clonedRespBody bytes.Buffer + + // copy out the contents into a new buffer as the response body + // stream should be closed to get all the data out. + _, _ = io.Copy(&clonedRespBody, resp.Body) + resp.Body.Close() + + // after closing the original body, substitute it with a new reader + // using the buffer that was just created. + // this buffer should be closed later by the consumer of the response. + resp.Body = io.NopCloser(bytes.NewReader(clonedRespBody.Bytes())) + + return resp, nil +} + +func cloneRequestBody(src *http.Request) io.Reader { + var bCloneForOriginal, bCloneForCopy bytes.Buffer + multiWriter := io.MultiWriter(&bCloneForOriginal, &bCloneForCopy) + numBytesCopied, _ := io.Copy(multiWriter, src.Body) + + // if the body is a type of io.NopCloser and length is 0, + // the Content-Length header is not sent in the proxied request. + // explicitly returning http.NoBody allows the implementation + // to set the header. + // ref: https://github.com/golang/go/issues/34295 + if numBytesCopied == 0 { + src.Body = http.NoBody + + return http.NoBody + } + + src.Body = io.NopCloser(&bCloneForOriginal) + + return bytes.NewReader(bCloneForCopy.Bytes()) +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +} + +// identifies and returns the cluster socket and index. +// this is the socket which the scale out cluster members will use for +// proxying and communication among each other. +// returns index, socket, error. +// returns an empty string and index value -1 if the cluster socket is not found. +func GetLocalMemberClusterSocket(members []string, localSockets []string) (int, string, error) { + for memberIdx, member := range members { + // for each member, get the full list of sockets, including DNS resolution + memberSockets, err := getTargetMemberServerSockets(member) + if err != nil { + return -1, "", err + } + + // for each member socket that we have, compare all the local sockets with + // it to see if there is any match. 
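// as an illustration (hypothetical values): with members ["zot-a:9000", "zot-b:9000"]
// where "zot-a" resolves to 10.0.0.5, a host whose local sockets include
// "10.0.0.5:9000" is identified as member index 0.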
+ for _, memberSocket := range memberSockets { + for _, localSocket := range localSockets { + // this checks if the sockets are equal at a host port level + areSocketsEqual, err := common.AreSocketsEqual(memberSocket, localSocket) + if err != nil { + return -1, "", err + } + + if areSocketsEqual { + return memberIdx, member, nil + } + } + } + } + + return -1, "", nil +} diff --git a/pkg/api/proxy_test.go b/pkg/api/proxy_test.go new file mode 100644 index 00000000..f11daede --- /dev/null +++ b/pkg/api/proxy_test.go @@ -0,0 +1,59 @@ +//go:build sync && scrub && metrics && search && lint && userprefs && mgmt && imagetrust && ui +// +build sync,scrub,metrics,search,lint,userprefs,mgmt,imagetrust,ui + +package api_test + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + + "zotregistry.dev/zot/pkg/api" +) + +func TestGetLocalMemberClusterSocket(t *testing.T) { + Convey("Should return an error if a domain name doesn't exist", t, func() { + localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9001", "thisdoesnotexist:9000", "127.0.0.1:9000"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err.Error(), ShouldContainSubstring, "lookup thisdoesnotexist") + So(index, ShouldEqual, -1) + So(socket, ShouldEqual, "") + }) + + Convey("Should return an error if a local socket is missing a port", t, func() { + localSockets := []string{"127.0.0.1", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9001", "www.github.com:443", "127.0.0.1:9000"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err.Error(), ShouldEqual, "address 127.0.0.1: missing port in address") + So(index, ShouldEqual, -1) + So(socket, ShouldEqual, "") + }) + + Convey("Should return an error if a member socket is missing a port", t, func() { + localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9001", "www.github.com", "127.0.0.1:9000"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err.Error(), ShouldEqual, "address www.github.com: missing port in address") + So(index, ShouldEqual, -1) + So(socket, ShouldEqual, "") + }) + + Convey("Should return the right socket when a local socket is part of members", t, func() { + localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9001", "www.github.com:443", "127.0.0.1:9000"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err, ShouldBeNil) + So(index, ShouldEqual, 2) + So(socket, ShouldEqual, "127.0.0.1:9000") + }) + + Convey("Should return empty when no local socket is part of members", t, func() { + localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9002", "127.0.0.1:9001", "www.github.com:443"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err, ShouldBeNil) + So(index, ShouldEqual, -1) + So(socket, ShouldBeEmpty) + }) +} diff --git a/pkg/api/routes.go b/pkg/api/routes.go index b9001b5f..f71d7ae5 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -127,40 +127,66 @@ func (rh *RouteHandler) SetupRoutes() { prefixedDistSpecRouter.Use(DistSpecAuthzHandler(rh.c)) } + clusterRouteProxy := ClusterProxy(rh.c) + // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#endpoints + // dist-spec APIs that need to be proxied are wrapped in clusterRouteProxy for scale-out proxying. 
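// the proxy wrapper is applied outermost, so when a request is forwarded, the wrapped
// UI-header and CORS handlers run only on the member that actually serves it.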
+ // these are handlers that have a repository name. { prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/tags/list", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( - applyCORSHeaders(rh.ListTags))).Methods(http.MethodGet, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( + applyCORSHeaders(rh.ListTags), + ), + ), + ).Methods(http.MethodGet, http.MethodOptions) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodHead, http.MethodGet, http.MethodDelete, http.MethodOptions)( - applyCORSHeaders(rh.CheckManifest))).Methods(http.MethodHead, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodHead, http.MethodGet, http.MethodDelete, http.MethodOptions)( + applyCORSHeaders(rh.CheckManifest), + ), + ), + ).Methods(http.MethodHead, http.MethodOptions) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - applyCORSHeaders(rh.GetManifest)).Methods(http.MethodGet) + clusterRouteProxy( + applyCORSHeaders(rh.GetManifest), + ), + ).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - rh.UpdateManifest).Methods(http.MethodPut) + clusterRouteProxy(rh.UpdateManifest)).Methods(http.MethodPut) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - applyCORSHeaders(rh.DeleteManifest)).Methods(http.MethodDelete) + clusterRouteProxy( + applyCORSHeaders(rh.DeleteManifest), + ), + ).Methods(http.MethodDelete) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.CheckBlob).Methods(http.MethodHead) + clusterRouteProxy(rh.CheckBlob)).Methods(http.MethodHead) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.GetBlob).Methods(http.MethodGet) + clusterRouteProxy(rh.GetBlob)).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.DeleteBlob).Methods(http.MethodDelete) + clusterRouteProxy(rh.DeleteBlob)).Methods(http.MethodDelete) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/", zreg.NameRegexp.String()), - rh.CreateBlobUpload).Methods(http.MethodPost) + clusterRouteProxy(rh.CreateBlobUpload)).Methods(http.MethodPost) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.GetBlobUpload).Methods(http.MethodGet) + clusterRouteProxy(rh.GetBlobUpload)).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.PatchBlobUpload).Methods(http.MethodPatch) + clusterRouteProxy(rh.PatchBlobUpload)).Methods(http.MethodPatch) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.UpdateBlobUpload).Methods(http.MethodPut) + clusterRouteProxy(rh.UpdateBlobUpload)).Methods(http.MethodPut) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.DeleteBlobUpload).Methods(http.MethodDelete) + clusterRouteProxy(rh.DeleteBlobUpload)).Methods(http.MethodDelete) // support for OCI artifact references 
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/referrers/{digest}", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( - applyCORSHeaders(rh.GetReferrers))).Methods(http.MethodGet, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( + applyCORSHeaders(rh.GetReferrers), + ), + ), + ).Methods(http.MethodGet, http.MethodOptions) + + // handlers which work fine with a single node do not need proxying. + // catalog handler doesn't require proxying as the metadata and storage are shared. + // discover and the default path handlers are node-specific so do not require proxying. prefixedRouter.HandleFunc(constants.ExtCatalogPrefix, getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( applyCORSHeaders(rh.ListRepositories))).Methods(http.MethodGet, http.MethodOptions) diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index 4916cd20..46f16b0d 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -457,6 +457,11 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error { } } + // check validity of scale out cluster config + if err := validateClusterConfig(config, log); err != nil { + return err + } + return nil } @@ -1103,3 +1108,27 @@ func validateSync(config *config.Config, log zlog.Logger) error { return nil } + +func validateClusterConfig(config *config.Config, log zlog.Logger) error { + if config.Cluster != nil { + if len(config.Cluster.Members) == 0 { + log.Error().Err(zerr.ErrBadConfig). + Msg("cannot have 0 members in a scale out cluster") + + return zerr.ErrBadConfig + } + + // the allowed length is 16 as the siphash requires a 128 bit key. + // that translates to 16 characters * 8 bits each. + allowedHashKeyLength := 16 + if len(config.Cluster.HashKey) != allowedHashKeyLength { + log.Error().Err(zerr.ErrBadConfig). + Str("hashkey", config.Cluster.HashKey). 
+ Msg(fmt.Sprintf("hashKey for scale out cluster must have %d characters", allowedHashKeyLength)) + + return zerr.ErrBadConfig + } + } + + return nil +} diff --git a/pkg/cli/server/root_test.go b/pkg/cli/server/root_test.go index b0f7e94f..9e7fab4e 100644 --- a/pkg/cli/server/root_test.go +++ b/pkg/cli/server/root_test.go @@ -2028,6 +2028,150 @@ func TestUpdateLDAPConfig(t *testing.T) { }) } +func TestClusterConfig(t *testing.T) { + baseExamplePath := "../../../examples/scale-out-cluster-cloud/" + + Convey("Should successfully load example configs for cloud", t, func() { + for memberIdx := 0; memberIdx < 3; memberIdx++ { + cfgFileToLoad := fmt.Sprintf("%s/config-cluster-member%d.json", baseExamplePath, memberIdx) + cfg := config.New() + err := cli.LoadConfiguration(cfg, cfgFileToLoad) + So(err, ShouldBeNil) + } + }) + + Convey("Should successfully load example TLS configs for cloud", t, func() { + for memberIdx := 0; memberIdx < 3; memberIdx++ { + cfgFileToLoad := fmt.Sprintf("%s/tls/config-cluster-member%d.json", baseExamplePath, memberIdx) + cfg := config.New() + err := cli.LoadConfiguration(cfg, cfgFileToLoad) + So(err, ShouldBeNil) + } + }) + + Convey("Should reject scale out cluster invalid cases", t, func() { + cfgFileContents, err := os.ReadFile(baseExamplePath + "config-cluster-member0.json") + So(err, ShouldBeNil) + + Convey("Should reject empty members list", func() { + cfg := config.New() + err := json.Unmarshal(cfgFileContents, cfg) + So(err, ShouldBeNil) + + // set the members to an empty list + cfg.Cluster.Members = []string{} + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + cfgFileContents, err := json.MarshalIndent(cfg, "", " ") + So(err, ShouldBeNil) + + err = os.WriteFile(file.Name(), cfgFileContents, 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject missing members list", func() { + cfg := config.New() + + configStr := ` + { + "storage": { + "RootDirectory": "/tmp/example" + }, + "http": { + "address": "127.0.0.1", + "port": "800" + }, + "cluster" { + "hashKey": "loremipsumdolors" + } + }` + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + err = os.WriteFile(file.Name(), []byte(configStr), 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject missing hashkey", func() { + cfg := config.New() + + configStr := ` + { + "storage": { + "RootDirectory": "/tmp/example" + }, + "http": { + "address": "127.0.0.1", + "port": "800" + }, + "cluster" { + "members": ["127.0.0.1:9000"] + } + }` + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + err = os.WriteFile(file.Name(), []byte(configStr), 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject a hashkey that is too short", func() { + cfg := config.New() + err := json.Unmarshal(cfgFileContents, cfg) + So(err, ShouldBeNil) + + // set the hashkey to a string shorter than 16 characters + cfg.Cluster.HashKey = "fifteencharacte" + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + cfgFileContents, err := json.MarshalIndent(cfg, "", " ") + So(err, ShouldBeNil) + + err = os.WriteFile(file.Name(), cfgFileContents, 0o600) + So(err, 
ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject a hashkey that is too long", func() { + cfg := config.New() + err := json.Unmarshal(cfgFileContents, cfg) + So(err, ShouldBeNil) + + // set the hashkey to a string longer than 16 characters + cfg.Cluster.HashKey = "seventeencharacte" + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + cfgFileContents, err := json.MarshalIndent(cfg, "", " ") + So(err, ShouldBeNil) + + err = os.WriteFile(file.Name(), cfgFileContents, 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + }) +} + // run cli and return output. func runCLIWithConfig(tempDir string, config string) (string, error) { port := GetFreePort() diff --git a/pkg/common/common.go b/pkg/common/common.go index 3b9abe8b..cc215d1a 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io/fs" + "net" "os" "regexp" "strings" @@ -145,3 +146,93 @@ func IsContextDone(ctx context.Context) bool { return false } } + +// get a list of IP addresses configured on the host's +// interfaces. +func GetLocalIPs() ([]string, error) { + var localIPs []string + + ifaces, err := net.Interfaces() + if err != nil { + return []string{}, err + } + + for _, i := range ifaces { + addrs, err := i.Addrs() + if err != nil { + return localIPs, err + } + + for _, addr := range addrs { + if localIP, ok := addr.(*net.IPNet); ok { + localIPs = append(localIPs, localIP.IP.String()) + } + } + } + + return localIPs, nil +} + +// get a list of listening sockets on the host (IP:port). +// IPv6 is returned as [host]:port. +func GetLocalSockets(port string) ([]string, error) { + localIPs, err := GetLocalIPs() + if err != nil { + return []string{}, err + } + + localSockets := make([]string, len(localIPs)) + + for idx, ip := range localIPs { + // JoinHostPort automatically wraps IPv6 addresses in [] + localSockets[idx] = net.JoinHostPort(ip, port) + } + + return localSockets, nil +} + +func GetIPFromHostName(host string) ([]string, error) { + addrs, err := net.LookupIP(host) + if err != nil { + return []string{}, err + } + + ips := make([]string, 0, len(addrs)) + + for _, ip := range addrs { + ips = append(ips, ip.String()) + } + + return ips, nil +} + +// checks if 2 sockets are equal at the host port level. +func AreSocketsEqual(socketA string, socketB string) (bool, error) { + hostA, portA, err := net.SplitHostPort(socketA) + if err != nil { + return false, err + } + + hostB, portB, err := net.SplitHostPort(socketB) + if err != nil { + return false, err + } + + hostAIP := net.ParseIP(hostA) + if hostAIP == nil { + // this could be a fully-qualified domain name (FQDN) + // for FQDN, just a normal compare is enough + return hostA == hostB, nil + } + + hostBIP := net.ParseIP(hostB) + if hostBIP == nil { + // if the host part of socketA was parsed successfully, it was an IP + // if the host part of socketA was an FQDN, then the comparison is + // already done as the host of socketB is also assumed to be an FQDN. + // since the parsing failed, assume that A and B are not equal. 
+ return false, nil + } + + return (hostAIP.Equal(hostBIP) && (portA == portB)), nil +} diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index a59ea8df..3d7b5e9e 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -3,6 +3,7 @@ package common_test import ( "os" "path" + "strings" "testing" notreg "github.com/notaryproject/notation-go/registry" @@ -61,4 +62,107 @@ func TestCommon(t *testing.T) { Convey("Test ArtifactTypeNotation const has same value as in notaryproject", t, func() { So(common.ArtifactTypeNotation, ShouldEqual, notreg.ArtifactTypeNotation) }) + + Convey("Test GetLocalIPs", t, func() { + localIPs, err := common.GetLocalIPs() + So(err, ShouldBeNil) + So(localIPs, ShouldNotBeEmpty) + So(localIPs, ShouldContain, "127.0.0.1") + }) + + Convey("Test GetLocalSockets IPv4", t, func() { + localSockets, err := common.GetLocalSockets("8765") + So(err, ShouldBeNil) + So(localSockets, ShouldNotBeEmpty) + So(localSockets, ShouldContain, "127.0.0.1:8765") + for _, socket := range localSockets { + lastColonIndex := strings.LastIndex(socket, ":") + So(socket[lastColonIndex+1:], ShouldEqual, "8765") + } + }) + + Convey("Test GetLocalSockets IPv6", t, func() { + localSockets, err := common.GetLocalSockets("8766") + So(err, ShouldBeNil) + So(localSockets, ShouldNotBeEmpty) + So(localSockets, ShouldContain, "[::1]:8766") + for _, socket := range localSockets { + lastColonIndex := strings.LastIndex(socket, ":") + So(socket[lastColonIndex+1:], ShouldEqual, "8766") + } + }) + + Convey("Test GetIPFromHostName with valid hostname", t, func() { + addrs, err := common.GetIPFromHostName("github.com") + So(err, ShouldBeNil) + So(addrs, ShouldNotBeEmpty) + // we can't check the actual addresses here as they can change + }) + + Convey("Test GetIPFromHostName with non-existent hostname", t, func() { + addrs, err := common.GetIPFromHostName("thisdoesnotexist") + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "lookup thisdoesnotexist") + So(addrs, ShouldBeEmpty) + }) + + Convey("Test AreSocketsEqual with equal IPv4 sockets", t, func() { + result, err := common.AreSocketsEqual("127.0.0.1:9000", "127.0.0.1:9000") + So(err, ShouldBeNil) + So(result, ShouldBeTrue) + }) + + Convey("Test AreSocketsEqual with equal IPv6 sockets", t, func() { + result, err := common.AreSocketsEqual("[::1]:9000", "[0000:0000:0000:0000:0000:0000:0000:0001]:9000") + So(err, ShouldBeNil) + So(result, ShouldBeTrue) + }) + + Convey("Test AreSocketsEqual with different IPv4 socket ports", t, func() { + result, err := common.AreSocketsEqual("127.0.0.1:9000", "127.0.0.1:9001") + So(err, ShouldBeNil) + So(result, ShouldBeFalse) + }) + + Convey("Test AreSocketsEqual with different IPv4 socket hosts", t, func() { + result, err := common.AreSocketsEqual("127.0.0.1:9000", "127.0.0.2:9000") + So(err, ShouldBeNil) + So(result, ShouldBeFalse) + }) + + Convey("Test AreSocketsEqual with 2 equal host names", t, func() { + result, err := common.AreSocketsEqual("localhost:9000", "localhost:9000") + So(err, ShouldBeNil) + So(result, ShouldBeTrue) + }) + + Convey("Test AreSocketsEqual with 2 different host names", t, func() { + result, err := common.AreSocketsEqual("localhost:9000", "notlocalhost:9000") + So(err, ShouldBeNil) + So(result, ShouldBeFalse) + }) + + Convey("Test AreSocketsEqual with hostname and IP address", t, func() { + result, err := common.AreSocketsEqual("localhost:9000", "127.0.0.1:9000") + So(err, ShouldBeNil) + So(result, ShouldBeFalse) + }) + + Convey("Test AreSocketsEqual with
IP address and hostname", t, func() { + result, err := common.AreSocketsEqual("127.0.0.1:9000", "localhost:9000") + So(err, ShouldBeNil) + So(result, ShouldBeFalse) + }) + + Convey("Test AreSocketsEqual with invalid first socket", t, func() { + result, err := common.AreSocketsEqual("127.0.0.1", "localhost:9000") + So(err, ShouldNotBeNil) + So(result, ShouldBeFalse) + }) + + Convey("Test AreSocketsEqual with invalid second socket", t, func() { + result, err := common.AreSocketsEqual("localhost:9000", "127.0.0.1") + So(err, ShouldNotBeNil) + So(result, ShouldBeFalse) + }) } diff --git a/test/scale-out/cloud_scale_out_basic_auth_tls.bats b/test/scale-out/cloud_scale_out_basic_auth_tls.bats new file mode 100644 index 00000000..bf232d5c --- /dev/null +++ b/test/scale-out/cloud_scale_out_basic_auth_tls.bats @@ -0,0 +1,75 @@ +# note: intended to be run as "make run-cloud-scale-out-tests". +# makefile target installs & checks all necessary tooling +# extra tools that are not covered in Makefile target needs to be added in verify_prerequisites() + +NUM_ZOT_INSTANCES=6 +ZOT_LOG_DIR=/tmp/zot-ft-logs/auth-tls + +load helpers_zot +load helpers_cloud +load helpers_haproxy + +function launch_zot_server() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir=${ZOT_ROOT_DIR} + + mkdir -p ${zot_root_dir} + mkdir -p ${ZOT_LOG_DIR} + + local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json" + local zot_log_file="${ZOT_LOG_DIR}/zot-${zot_server_address}-${zot_server_port}.log" + + create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file} + update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + update_zot_cfg_set_htpasswd_auth "${zot_config_file}" ${ZOT_HTPASSWD_PATH} + update_zot_cfg_set_tls "${zot_config_file}" ${ZOT_TLS_CERT_PATH} ${ZOT_TLS_KEY_PATH} ${ZOT_TLS_CA_CERT_PATH} + + echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3 + echo "config file: ${zot_config_file}" >&3 + echo "log file: ${zot_log_file}" >&3 + + zot_serve ${zot_config_file} + wait_zot_reachable ${zot_server_port} "https" +} + +function setup() { + # verify prerequisites are available + if ! 
$(verify_prerequisites); then + exit 1 + fi + + # setup S3 bucket and DynamoDB tables + setup_cloud_services + # setup htpasswd for local auth + setup_local_htpasswd + + generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do + launch_zot_server 127.0.0.1 $(( 10000 + $i )) + done + + # list all zot processes that were started + ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3 + + generate_haproxy_config ${HAPROXY_CFG_FILE} "https" + haproxy_start ${HAPROXY_CFG_FILE} + + # list haproxy processes that were started + ps -ef | grep "haproxy" | grep -v grep >&3 +} + +function teardown() { + local zot_root_dir=${ZOT_ROOT_DIR} + haproxy_stop_all + zot_stop_all + rm -rf ${zot_root_dir} + teardown_cloud_services +} + +@test "Check for successful zb run on haproxy frontend" { + # zb_run + zb_run "cloud-scale-out-basic-auth-tls-bats" "https://127.0.0.1:8000" 3 5 "${ZOT_AUTH_USER}:${ZOT_AUTH_PASS}" +} diff --git a/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats b/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats new file mode 100644 index 00000000..446aa21a --- /dev/null +++ b/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats @@ -0,0 +1,74 @@ +# note: intended to be run as "make run-cloud-scale-out-high-scale-tests" +# makefile target installs & checks all necessary tooling +# extra tools that are not covered in Makefile target needs to be added in verify_prerequisites() + +NUM_ZOT_INSTANCES=6 + +load helpers_zot +load helpers_cloud +load helpers_haproxy + +function launch_zot_server() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir=${ZOT_ROOT_DIR} + + mkdir -p ${zot_root_dir} + mkdir -p /tmp/zot-logs + + local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json" + local zot_log_file="/tmp/zot-logs/zot-${zot_server_address}-${zot_server_port}.log" + + create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file} + update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + update_zot_cfg_set_htpasswd_auth "${zot_config_file}" ${ZOT_HTPASSWD_PATH} + update_zot_cfg_set_tls "${zot_config_file}" ${ZOT_TLS_CERT_PATH} ${ZOT_TLS_KEY_PATH} ${ZOT_TLS_CA_CERT_PATH} + + echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3 + echo "config file: ${zot_config_file}" >&3 + echo "log file: ${zot_log_file}" >&3 + + zot_serve ${zot_config_file} + wait_zot_reachable ${zot_server_port} "https" +} + +function setup() { + # verify prerequisites are available + if ! 
$(verify_prerequisites); then + exit 1 + fi + + # setup S3 bucket and DynamoDB tables + setup_cloud_services + # setup htpasswd for local auth + setup_local_htpasswd + + generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do + launch_zot_server 127.0.0.1 $(( 10000 + $i )) + done + + # list all zot processes that were started + ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3 + + generate_haproxy_config ${HAPROXY_CFG_FILE} "https" + haproxy_start ${HAPROXY_CFG_FILE} + + # list haproxy processes that were started + ps -ef | grep "haproxy" | grep -v grep >&3 +} + +function teardown() { + local zot_root_dir=${ZOT_ROOT_DIR} + haproxy_stop_all + zot_stop_all + rm -rf ${zot_root_dir} + teardown_cloud_services +} + +@test "Check for successful zb run on haproxy frontend" { + # zb_run + zb_run "cloud-scale-out-high-scale-bats" "https://127.0.0.1:8000" 10 100 "${ZOT_AUTH_USER}:${ZOT_AUTH_PASS}" +} diff --git a/test/scale-out/cloud_scale_out_no_auth.bats b/test/scale-out/cloud_scale_out_no_auth.bats new file mode 100644 index 00000000..938a02a4 --- /dev/null +++ b/test/scale-out/cloud_scale_out_no_auth.bats @@ -0,0 +1,69 @@ +# note: intended to be run as "make run-cloud-scale-out-tests" +# makefile target installs & checks all necessary tooling +# extra tools that are not covered in Makefile target needs to be added in verify_prerequisites() + +NUM_ZOT_INSTANCES=6 +ZOT_LOG_DIR=/tmp/zot-ft-logs/no-auth + +load helpers_zot +load helpers_cloud +load helpers_haproxy + +function launch_zot_server() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir=${ZOT_ROOT_DIR} + + mkdir -p ${zot_root_dir} + mkdir -p ${ZOT_LOG_DIR} + + local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json" + local zot_log_file="${ZOT_LOG_DIR}/zot-${zot_server_address}-${zot_server_port}.log" + + create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file} + update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3 + echo "config file: ${zot_config_file}" >&3 + echo "log file: ${zot_log_file}" >&3 + + zot_serve ${zot_config_file} + wait_zot_reachable ${zot_server_port} +} + +function setup() { + # verify prerequisites are available + if ! 
$(verify_prerequisites); then + exit 1 + fi + + # setup S3 bucket and DynamoDB tables + setup_cloud_services + generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE} + + for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do + launch_zot_server 127.0.0.1 $(( 10000 + $i )) + done + + # list all zot processes that were started + ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3 + + generate_haproxy_config ${HAPROXY_CFG_FILE} "http" + haproxy_start ${HAPROXY_CFG_FILE} + + # list HAproxy processes that were started + ps -ef | grep "haproxy" | grep -v grep >&3 +} + +function teardown() { + local zot_root_dir=${ZOT_ROOT_DIR} + haproxy_stop_all + zot_stop_all + rm -rf ${zot_root_dir} + teardown_cloud_services +} + +@test "Check for successful zb run on haproxy frontend" { + # zb_run + zb_run "cloud-scale-out-no-auth-bats" "http://127.0.0.1:8000" 3 5 +} diff --git a/test/scale-out/helpers_cloud.bash b/test/scale-out/helpers_cloud.bash new file mode 100644 index 00000000..a747e8d0 --- /dev/null +++ b/test/scale-out/helpers_cloud.bash @@ -0,0 +1,35 @@ +function setup_cloud_services() { + setup_s3 "us-east-2" "zot-storage-test" + setup_dynamodb "us-east-2" +} + +function teardown_cloud_services() { + delete_s3_bucket "zot-storage-test" + teardown_dynamodb "us-east-2" +} + +function setup_s3() { + local region=${1} + local bucket=${2} + awslocal s3 --region ${region} mb s3://${bucket} +} + +function delete_s3_bucket() { + local bucket=${1} + awslocal s3 rb s3://${bucket} --force +} + +function setup_dynamodb() { + local region=${1} + awslocal dynamodb --region ${region} \ + create-table \ + --table-name "BlobTable" \ + --attribute-definitions AttributeName=Digest,AttributeType=S \ + --key-schema AttributeName=Digest,KeyType=HASH \ + --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=5 +} + +function teardown_dynamodb() { + local region=${1} + awslocal dynamodb --region ${region} delete-table --table-name "BlobTable" +} diff --git a/test/scale-out/helpers_haproxy.bash b/test/scale-out/helpers_haproxy.bash new file mode 100644 index 00000000..68cafa4a --- /dev/null +++ b/test/scale-out/helpers_haproxy.bash @@ -0,0 +1,71 @@ +HAPROXY_CFG_FILE="${BATS_FILE_TMPDIR}/haproxy/haproxy-test.cfg" + +function generate_haproxy_server_list() { + local num_instances=${1} + for ((i=0;i<${num_instances};i++)) do + local port=$(( 10000 + $i )) + echo " server zot${i} 127.0.0.1:${port}" + done +} + +# stops all haproxy instances started by the test +function haproxy_stop_all() { + pkill haproxy +} + +# starts one haproxy instance with the given config file +# expects the haproxy config to specify daemon mode +function haproxy_start() { + local haproxy_cfg_file=${1} + + # Check the config file + haproxy -f ${haproxy_cfg_file} -c >&3 + + # Start haproxy + haproxy -f ${haproxy_cfg_file} +} + +# generates HAproxy config for use in the test +function generate_haproxy_config() { + local haproxy_cfg_file="${1}" + local haproxy_root_dir="$(dirname ${haproxy_cfg_file})" + # can be either http or https + local protocol="${2}" + + mkdir -p ${haproxy_root_dir} + + local haproxy_mode='http' + if [ "$protocol" == 'https' ]; then + haproxy_mode='tcp' + fi + + cat > ${haproxy_cfg_file}<> ${haproxy_cfg_file} + + cat ${haproxy_cfg_file} >&3 +} diff --git a/test/scale-out/helpers_zot.bash b/test/scale-out/helpers_zot.bash new file mode 100644 index 00000000..22dc5b92 --- /dev/null +++ b/test/scale-out/helpers_zot.bash @@ -0,0 +1,273 @@ +ROOT_DIR=$(git rev-parse --show-toplevel) +OS=$(go env GOOS) 
+ARCH=$(go env GOARCH) +ZOT_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH} +ZLI_PATH=${ROOT_DIR}/bin/zli-${OS}-${ARCH} +ZOT_MINIMAL_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH}-minimal + +# basic auth +ZOT_AUTH_USER=poweruser +ZOT_AUTH_PASS=sup*rSecr9T +ZOT_CREDS_PATH="${BATS_FILE_TMPDIR}/creds" +ZOT_HTPASSWD_PATH="${ZOT_CREDS_PATH}/htpasswd" + +# zb +ZB_PATH=${ROOT_DIR}/bin/zb-${OS}-${ARCH} +ZB_RESULTS_PATH=${ROOT_DIR}/zb-results +ZB_CI_CD_OUTPUT_FILE=${ROOT_DIR}/ci-cd.json + +# zot scale out cluster +ZOT_CLUSTER_MEMBERS_PATCH_FILE="${BATS_FILE_TMPDIR}/members-patch.json" +ZOT_ROOT_DIR="${BATS_FILE_TMPDIR}/zot" +ZOT_TLS_CERT_PATH="${ROOT_DIR}/test/data/server.cert" +ZOT_TLS_KEY_PATH="${ROOT_DIR}/test/data/server.key" +ZOT_TLS_CA_CERT_PATH="${ROOT_DIR}/test/data/ca.crt" + +function verify_prerequisites { + if [ ! -f ${ZOT_PATH} ]; then + echo "you need to build ${ZOT_PATH} before running the tests" >&3 + return 1 + fi + + if [ ! -f ${ZB_PATH} ]; then + echo "you need to build ${ZB_PATH} before running the tests" >&3 + return 1 + fi + + if [ ! $(command -v skopeo) ]; then + echo "you need to install skopeo as a prerequisite to running the tests" >&3 + return 1 + fi + + if [ ! $(command -v awslocal) ] &>/dev/null; then + echo "you need to install aws cli as a prerequisite to running the tests" >&3 + return 1 + fi + + if [ ! $(command -v haproxy) ] &>/dev/null; then + echo "you need to install haproxy as a prerequisite to running the tests" >&3 + return 1 + fi + + return 0 +} + +function get_free_port(){ + while true + do + random_port=$(( ((RANDOM<<15)|RANDOM) % 49152 + 10000 )) + status="$(nc -z 127.0.0.1 $random_port < /dev/null &>/dev/null; echo $?)" + if [ "${status}" != "0" ]; then + free_port=${random_port}; + break; + fi + done + + echo ${free_port} +} + +function zot_serve() { + local config_file=${1} + ${ZOT_PATH} serve ${config_file} & +} + +# stops all zot instances started by the test +function zot_stop_all() { + pkill zot +} + +# waits for the zot server to be reachable +# leaving argument 2 blank or specifying "http" causes the function to use HTTP +# specifying "https" for argument 2 causes the function to use TLS +function wait_zot_reachable() { + local zot_port=${1} + local protocol=${2} + if [ -z "${protocol}" ]; then + protocol="http" + fi + + local zot_url="${protocol}://127.0.0.1:${zot_port}/v2/_catalog" + + local curl_opts=( + --connect-timeout 3 + --max-time 5 + --retry 20 + --retry-delay 1 + --retry-max-time 180 + --retry-connrefused + ) + + # since this is only a reachability check, we can disable cert verification + if [ "${protocol}" == "https" ]; then + curl_opts=(--insecure "${curl_opts[@]}") + fi + + curl "${curl_opts[@]}" ${zot_url} +} + +function zb_run() { + local test_name=${1} + local zot_address=${2} + local concurrent_reqs=${3} + local num_requests=${4} + local credentials=${5} + + if [ ! -d "${ZB_RESULTS_PATH}" ]; then + mkdir -p "${ZB_RESULTS_PATH}" + fi + + local zb_args=( + -c ${concurrent_reqs} + -n ${num_requests} + --src-cidr 127.0.10.0/24 + -o ci-cd + --skip-cleanup + ) + + if [ ! 
-z "${credentials}" ]; then + zb_args=(-A ${credentials} "${zb_args[@]}") + fi + + start=$(date +%s) + ${ZB_PATH} "${zb_args[@]}" ${zot_address} + stop=$(date +%s) + + runtime=$((stop-start)) + echo "Duration: ${runtime} seconds" >&3 + + if [ -f "${ZB_CI_CD_OUTPUT_FILE}" ]; then + mv "${ZB_CI_CD_OUTPUT_FILE}" "${ZB_RESULTS_PATH}/${test_name}-results.json" + fi +} + +function setup_local_htpasswd() { + create_htpasswd_file "${ZOT_CREDS_PATH}" "${ZOT_HTPASSWD_PATH}" ${ZOT_AUTH_USER} ${ZOT_AUTH_PASS} +} + +function create_htpasswd_file() { + local creds_dir_path="${1}" + local htpasswd_file_path="${2}" + local user=${3} + local password=${4} + + mkdir -p "${creds_dir_path}" + htpasswd -b -c -B "${htpasswd_file_path}" ${user} ${password} +} + +# given the number of zot instances, computes a list of cluster members +# and saves them as a JSON to a file that can be used with jq later. +function generate_zot_cluster_member_list() { + local num_zot_instances=${1} + local patch_file_path=${2} + local temp_file="${BATS_FILE_TMPDIR}/jq-member-dump.json" + echo "{\"cluster\":{\"members\":[]}}" > ${patch_file_path} + + for ((i=0;i<${num_zot_instances};i++)); do + local member="127.0.0.1:$(( 10000 + $i ))" + jq ".cluster.members += [\"${member}\"]" ${patch_file_path} > ${temp_file} && \ + mv ${temp_file} ${patch_file_path} + done + + echo "cluster members patch file" >&3 + cat ${patch_file_path} >&3 +} + +# patches an existing zot config file to add all the cluster members. +function update_zot_cluster_member_list_in_config_file() { + local zot_config_file=${1} + local zot_members_patch_file=${2} + local temp_file="${BATS_FILE_TMPDIR}/jq-mem-update-dump.json" + + jq -s '.[0] * .[1]' ${zot_config_file} ${zot_members_patch_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +} + +# generates and saves a base cloud config with shared storage +# given some basic parameters about the zot instance. +function create_zot_cloud_base_config_file() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir="${3}" + local zot_config_file="${4}" + local zot_log_file="${5}" + + cat > ${zot_config_file}< ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +} + +# updates an existing zot config file that already has an HTTP config +# to include TLS configuration. +# intended for use with create_zot_cloud_base_config_file() above. +function update_zot_cfg_set_tls() { + local zot_config_file="${1}" + local zot_cert_path="${2}" + local zot_key_path="${3}" + local zot_cacert_path="${4}" + local temp_file="${BATS_FILE_TMPDIR}/jq-tls-dump.json" + + # set zot TLS config + jq --arg zot_cert_path "${zot_cert_path}" --arg zot_key_path "${zot_key_path}" '(.http) += {tls: {cert: $zot_cert_path, key: $zot_key_path}}' \ + ${zot_config_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} + + jq --arg zot_cacert_path "${zot_cacert_path}" '(.cluster) += {tls: {cacert: $zot_cacert_path}}' \ + ${zot_config_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +}