diff --git a/.github/workflows/ecosystem-tools.yaml b/.github/workflows/ecosystem-tools.yaml index 05dd8c71..0c268534 100644 --- a/.github/workflows/ecosystem-tools.yaml +++ b/.github/workflows/ecosystem-tools.yaml @@ -27,7 +27,7 @@ jobs: go install github.com/swaggo/swag/cmd/swag@v1.16.2 go mod download sudo apt-get update - sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap + sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap haproxy jq # install skopeo git clone -b v1.12.0 https://github.com/containers/skopeo.git cd skopeo @@ -80,4 +80,37 @@ jobs: env: AWS_ACCESS_KEY_ID: fake AWS_SECRET_ACCESS_KEY: fake + - name: Run cloud scale-out tests + id: scale + run: | + make run-cloud-scale-out-tests + env: + AWS_ACCESS_KEY_ID: fake + AWS_SECRET_ACCESS_KEY: fake + continue-on-error: true + - name: print service logs for scale-out + run: | + find /tmp/zot-ft-logs -name '*.log' -print0 | xargs -0 cat + - name: multi-hop detection + id: multihop + run: | + if find /tmp/zot-ft-logs -name '*.log' -print0 | xargs -0 cat | grep 'cannot proxy an already proxied request'; then + echo "detected multi-hop" + exit 1 + else + exit 0 + fi + continue-on-error: true + - name: clean up scale-out logs + run: | + rm -r /tmp/zot-ft-logs + - name: fail job if error + if: ${{ steps.scale.outcome != 'success' || steps.multihop.outcome != 'success' }} + run: | + exit 1 + - name: Upload zb test results zip as build artifact + uses: actions/upload-artifact@v4 + with: + name: zb-cloud-scale-out-functional-results-${{ github.sha }} + path: ./zb-results/ - uses: ./.github/actions/teardown-localstack diff --git a/.github/workflows/nightly.yaml b/.github/workflows/nightly.yaml index 3df6e3b2..c8ff4c18 100644 --- a/.github/workflows/nightly.yaml +++ b/.github/workflows/nightly.yaml @@ -6,11 +6,13 @@ on: permissions: read-all -# Here we are running two tests: +# The following tests are run: # 1. run zot with local storage and dedupe disabled, push images, restart zot with dedupe enabled # task scheduler will start a dedupe all blobs process at zot startup and it shouldn't interfere with clients. # 2. run zot with s3 storage and dynamodb and dedupe enabled, push images, restart zot with dedupe false and no cache # task scheduler will start a restore all blobs process at zot startup, after it finishes all blobs should be restored to their original state (have content) +# 3. run many instances of zot with shared storage and metadata, front-ended by HAProxy. start a long-running zb run with high concurrency and a large number of requests +# to achieve sustained load on the system. The system is expected to perform well without errors and return performance data after the test.
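+# HAProxy (see examples/scale-out-cluster-cloud/haproxy.cfg) round-robins client traffic across the zot replicas, and each replica internally proxies repository-scoped requests to the member that owns the repository hash.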
jobs: dedupe: name: Dedupe/restore blobs @@ -195,3 +197,82 @@ jobs: - name: Run tests run: | ./examples/kind/kind-ci.sh + + cloud-scale-out: + name: s3+dynamodb scale-out + runs-on: ubuntu-latest-16-cores + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + cache: false + go-version: 1.22.x + - name: Install dependencies + run: | + cd $GITHUB_WORKSPACE + go install github.com/swaggo/swag/cmd/swag@v1.16.2 + go mod download + sudo apt-get update + sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config rpm uidmap haproxy jq + # install skopeo + git clone -b v1.12.0 https://github.com/containers/skopeo.git + cd skopeo + make bin/skopeo + sudo cp bin/skopeo /usr/bin + skopeo -v + cd $GITHUB_WORKSPACE + - name: Log in to GitHub Docker Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ github.token }} + - uses: actions/setup-python@v5 + with: + python-version: '3.11' + - name: Install localstack + run: | + pip install --upgrade pyopenssl + pip install localstack==3.3.0 awscli-local[ver1] # install LocalStack cli and awslocal + docker pull ghcr.io/project-zot/ci-images/localstack:3.3.0 # Make sure to pull a working version of the image + localstack start -d # Start LocalStack in the background + + echo "Waiting for LocalStack startup..." # Wait 30 seconds for the LocalStack container + localstack wait -t 30 # to become ready before timing out + echo "Startup complete" + - name: Run cloud scale-out high scale performance tests + id: scale + run: | + make run-cloud-scale-out-high-scale-tests + env: + AWS_ACCESS_KEY_ID: fake + AWS_SECRET_ACCESS_KEY: fake + continue-on-error: true + - name: print service logs + run: | + sudo dmesg + cat /tmp/zot-logs/*.log + - name: multi-hop detection + id: multihop + run: | + if cat /tmp/zot-logs/*.log | grep 'cannot proxy an already proxied request'; then + echo "detected multi-hop" + exit 1 + else + exit 0 + fi + continue-on-error: true + - name: clean up logs + run: | + rm -r /tmp/zot-logs + - name: fail job if error + if: ${{ steps.scale.outcome != 'success' || steps.multihop.outcome != 'success' }} + run: | + exit 1 + - name: Upload zb test results zip as build artifact + if: steps.scale.outcome == 'success' + uses: actions/upload-artifact@v4 + with: + name: zb-cloud-scale-out-perf-results-${{ github.sha }} + path: ./zb-results/ + - uses: ./.github/actions/teardown-localstack diff --git a/Makefile b/Makefile index 75cb7092..2afab068 100644 --- a/Makefile +++ b/Makefile @@ -489,6 +489,17 @@ run-blackbox-tests: $(BATS_TEST_FILE_PATH) check-blackbox-prerequisites binary b echo running bats test "$(BATS_TEST_FILE_PATH)"; \ $(BATS) $(BATS_FLAGS) $(BATS_TEST_FILE_PATH) +.PHONY: run-cloud-scale-out-tests +run-cloud-scale-out-tests: check-blackbox-prerequisites check-awslocal binary bench test-prereq + echo running scale out bats test; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_no_auth.bats; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_basic_auth_tls.bats + +.PHONY: run-cloud-scale-out-high-scale-tests +run-cloud-scale-out-high-scale-tests: check-blackbox-prerequisites check-awslocal binary bench test-prereq + echo running cloud scale out bats high scale test; \ + $(BATS) $(BATS_FLAGS) test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats + .PHONY: run-blackbox-ci run-blackbox-ci: check-blackbox-prerequisites binary binary-minimal cli echo running CI bats tests concurrently diff --git
a/examples/scale-out-cluster-cloud/config-cluster-member0.json b/examples/scale-out-cluster-cloud/config-cluster-member0.json new file mode 100644 index 00000000..f00b969c --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member0.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9000" + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/config-cluster-member1.json b/examples/scale-out-cluster-cloud/config-cluster-member1.json new file mode 100644 index 00000000..468dabb4 --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member1.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9001" + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/config-cluster-member2.json b/examples/scale-out-cluster-cloud/config-cluster-member2.json new file mode 100644 index 00000000..84981d74 --- /dev/null +++ b/examples/scale-out-cluster-cloud/config-cluster-member2.json @@ -0,0 +1,44 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9002" + }, + 
"log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors" + } +} diff --git a/examples/scale-out-cluster-cloud/haproxy.cfg b/examples/scale-out-cluster-cloud/haproxy.cfg new file mode 100644 index 00000000..ffb5664e --- /dev/null +++ b/examples/scale-out-cluster-cloud/haproxy.cfg @@ -0,0 +1,26 @@ +global + log /tmp/log local0 + log /tmp/log local1 notice + maxconn 2000 + stats timeout 30s + daemon + +defaults + log global + mode http + option httplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend zot + bind *:8080 + default_backend zot-cluster + +backend zot-cluster + balance roundrobin + cookie SERVER insert indirect nocache + server zot0 127.0.0.1:9000 cookie zot0 + server zot1 127.0.0.1:9001 cookie zot1 + server zot2 127.0.0.1:9002 cookie zot2 diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json new file mode 100644 index 00000000..5d7f2196 --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member0.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9000", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json new file mode 100644 index 00000000..bd269e3f --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member1.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9001", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + 
"127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json b/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json new file mode 100644 index 00000000..5b87b5bb --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/config-cluster-member2.json @@ -0,0 +1,51 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "/tmp/zot", + "dedupe": false, + "remoteCache": true, + "storageDriver": { + "name": "s3", + "rootdirectory": "/zot", + "region": "us-east-1", + "regionendpoint": "localhost:4566", + "bucket": "zot-storage", + "secure": false, + "skipverify": false + }, + "cacheDriver": { + "name": "dynamodb", + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "cacheTablename": "ZotBlobTable", + "repoMetaTablename": "ZotRepoMetadataTable", + "imageMetaTablename": "ZotImageMetaTable", + "repoBlobsInfoTablename": "ZotRepoBlobsInfoTable", + "userDataTablename": "ZotUserDataTable", + "versionTablename": "ZotVersion", + "apiKeyTablename": "ZotApiKeyTable" + } + }, + "http": { + "address": "127.0.0.1", + "port": "9002", + "tls": { + "cert": "test/data/server.cert", + "key": "test/data/server.key" + } + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001", + "127.0.0.1:9002" + ], + "hashKey": "loremipsumdolors", + "tls": { + "cacert": "test/data/ca.crt" + } + } +} diff --git a/examples/scale-out-cluster-cloud/tls/haproxy.cfg b/examples/scale-out-cluster-cloud/tls/haproxy.cfg new file mode 100644 index 00000000..5a39636b --- /dev/null +++ b/examples/scale-out-cluster-cloud/tls/haproxy.cfg @@ -0,0 +1,25 @@ +global + log /tmp/log local0 + log /tmp/log local1 notice + maxconn 2000 + stats timeout 30s + +defaults + log global + mode tcp + option tcplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend zot + bind *:8080 + default_backend zot-cluster + +backend zot-cluster + balance roundrobin + cookie SERVER insert indirect nocache + server zot0 127.0.0.1:9000 cookie zot0 + server zot1 127.0.0.1:9001 cookie zot1 + server zot2 127.0.0.1:9002 cookie zot2 diff --git a/go.mod b/go.mod index 31441332..73f9ef07 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6 github.com/aws/aws-secretsmanager-caching-go v1.1.3 github.com/containers/image/v5 v5.30.0 + github.com/dchest/siphash v1.2.3 github.com/google/go-github/v52 v52.0.0 github.com/gorilla/securecookie v1.1.2 github.com/gorilla/sessions v1.2.2 diff --git a/go.sum b/go.sum index d24ccf77..666d7269 100644 --- a/go.sum +++ b/go.sum @@ -591,6 +591,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/depcheck-test/depcheck-test v0.0.0-20220607135614-199033aaa936 h1:foGzavPWwtoyBvjWyKJYDYsyzy+23iBV7NKTwdk+LRY= github.com/depcheck-test/depcheck-test 
v0.0.0-20220607135614-199033aaa936/go.mod h1:ttKPnOepYt4LLzD+loXQ1rT6EmpyIYHro7TAJuIIlHo= github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= diff --git a/pkg/api/config/config.go b/pkg/api/config/config.go index f474843f..12892a65 100644 --- a/pkg/api/config/config.go +++ b/pkg/api/config/config.go @@ -121,6 +121,32 @@ type SchedulerConfig struct { NumWorkers int } +// contains the scale-out configuration which is identical for all zot replicas. +type ClusterConfig struct { + // contains the "host:port" of all the zot instances participating + // in the cluster. + Members []string `json:"members" mapstructure:"members"` + + // contains the hash key that is required for siphash. + // must be a 128-bit (16-byte) key + // https://github.com/dchest/siphash?tab=readme-ov-file#func-newkey-byte-hashhash64 + HashKey string `json:"hashKey" mapstructure:"hashKey"` + + // contains client TLS config. + TLS *TLSConfig `json:"tls" mapstructure:"tls"` + + // private field for storing Proxy details such as internal socket list. + Proxy *ClusterRequestProxyConfig `json:"-" mapstructure:"-"` +} + +type ClusterRequestProxyConfig struct { + // holds the cluster socket (IP:port) derived from the host's + // interface configuration and the listening port of the HTTP server. + LocalMemberClusterSocket string + // index of the local member cluster socket in the members array. + LocalMemberClusterSocketIndex uint64 +} + type LDAPCredentials struct { BindDN string BindPassword string @@ -230,6 +256,7 @@ type Config struct { Log *LogConfig Extensions *extconf.ExtensionConfig Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"` + Cluster *ClusterConfig `json:"cluster" mapstructure:",omitempty"` } func New() *Config { diff --git a/pkg/api/constants/consts.go b/pkg/api/constants/consts.go index df3cec8e..5afe00a9 100644 --- a/pkg/api/constants/consts.go +++ b/pkg/api/constants/consts.go @@ -31,4 +31,9 @@ const ( DeletePermission = "delete" // behaviour actions. DetectManifestCollisionPermission = "detectManifestCollision" + // zot scale-out hop count header. + ScaleOutHopCountHeader = "X-Zot-Cluster-Hop-Count" + // log string keys. + // these can be used together with the logger to add context to a log message. + RepositoryLogKey = "repository" ) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index e8c167c3..d86d697a 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -19,6 +19,7 @@ import ( "zotregistry.dev/zot/errors" "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/common" ext "zotregistry.dev/zot/pkg/extensions" extconf "zotregistry.dev/zot/pkg/extensions/config" "zotregistry.dev/zot/pkg/extensions/monitoring" @@ -54,15 +55,52 @@ type Controller struct { chosenPort int // kernel-chosen port } -func NewController(config *config.Config) *Controller { +func NewController(appConfig *config.Config) *Controller { var controller Controller - logger := log.NewLogger(config.Log.Level, config.Log.Output) - controller.Config = config + logger := log.NewLogger(appConfig.Log.Level, appConfig.Log.Output) + + if appConfig.Cluster != nil { + // we need the set of local sockets (IP address:port) for identifying + // the local member cluster socket for logging and lookup. 
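+ // e.g. for HTTP port 9000 this may yield 127.0.0.1:9000, 192.168.0.5:9000, and [::1]:9000 (illustrative addresses).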
+ localSockets, err := common.GetLocalSockets(appConfig.HTTP.Port) + if err != nil { + logger.Error().Err(err).Msg("failed to get local sockets") + panic("failed to get local sockets") + } + + // memberSocket is the local member's socket + // the index is also fetched for quick lookups during proxying + memberSocketIdx, memberSocket, err := GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets) + if err != nil { + logger.Error().Err(err).Msg("failed to get member socket") + panic("failed to get member socket") + } + + if memberSocket == "" { + // there is a misconfiguration if the memberSocket cannot be identified + logger.Error(). + Str("members", strings.Join(appConfig.Cluster.Members, ",")). + Str("localSockets", strings.Join(localSockets, ",")). + Msg("failed to determine the local cluster socket") + panic("failed to determine the local cluster socket") + } + + internalProxyConfig := &config.ClusterRequestProxyConfig{ + LocalMemberClusterSocket: memberSocket, + LocalMemberClusterSocketIndex: uint64(memberSocketIdx), + } + appConfig.Cluster.Proxy = internalProxyConfig + + logger.Logger = logger.Logger.With(). + Str("clusterMember", memberSocket). + Str("clusterMemberIndex", strconv.Itoa(memberSocketIdx)).Logger() + } + controller.Config = appConfig controller.Log = logger - if config.Log.Audit != "" { - audit := log.NewAuditLogger(config.Log.Level, config.Log.Audit) + if appConfig.Log.Audit != "" { + audit := log.NewAuditLogger(appConfig.Log.Level, appConfig.Log.Audit) controller.Audit = audit } diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 4d312cee..4f3652a4 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -95,6 +95,32 @@ func TestNew(t *testing.T) { So(conf, ShouldNotBeNil) So(api.NewController(conf), ShouldNotBeNil) }) + + Convey("Given a scale out cluster config where the local cluster socket cannot be found", t, func() { + conf := config.New() + So(conf, ShouldNotBeNil) + conf.HTTP = config.HTTPConfig{ + Address: "127.0.0.2", + Port: "9000", + } + conf.Cluster = &config.ClusterConfig{ + Members: []string{}, + } + So(func() { api.NewController(conf) }, ShouldPanicWith, "failed to determine the local cluster socket") + }) + + Convey("Given a scale out cluster config where the local cluster socket cannot be found due to an error", t, func() { + conf := config.New() + So(conf, ShouldNotBeNil) + conf.HTTP = config.HTTPConfig{ + Address: "127.0.0.2", + Port: "9000", + } + conf.Cluster = &config.ClusterConfig{ + Members: []string{"127.0.0.1"}, + } + So(func() { api.NewController(conf) }, ShouldPanicWith, "failed to get member socket") + }) } func TestCreateCacheDatabaseDriver(t *testing.T) { @@ -958,6 +984,434 @@ func TestBlobReferenced(t *testing.T) { }) } +// tests for shared-storage scale-out cluster. +func TestScaleOutRequestProxy(t *testing.T) { + // when there is only one member, no proxying is expected and the responses should be correct. 
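+ // (ClusterProxy short-circuits and always serves locally when the members list has a single entry)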
+ Convey("Given a zot scale out cluster in http mode with only 1 member", t, func() { + port := test.GetFreePort() + clusterMembers := make([]string, 1) + clusterMembers[0] = fmt.Sprintf("127.0.0.1:%s", port) + + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + + Convey("Controller should start up and respond without error", func() { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Should upload images and fetch valid responses for repo tags list", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for _, repoName := range reposToTest { + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(port), repoName, "1.0") + So(err, ShouldBeNil) + + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + }) + }) + + // when only one member in the cluster is online, an error is expected when there is a + // request proxied to an offline member. + Convey("Given a scale out http cluster with only 1 online member", t, func() { + port := test.GetFreePort() + clusterMembers := make([]string, 3) + clusterMembers[0] = fmt.Sprintf("127.0.0.1:%s", port) + clusterMembers[1] = "127.0.0.1:1" + clusterMembers[2] = "127.0.0.1:2" + + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + + Convey("Controller should start up and respond without error", func() { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + }) + + Convey("Should fail to upload an image that is proxied to another instance", func() { + repoName := "alpine" + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(port), repoName, "1.0") + So(err, ShouldNotBeNil) + So(err.Error(), ShouldEqual, "can't post blob") + + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when there are multiple members in a cluster, requests are expected to return + // the same data for any member due to proxying. 
+ Convey("Given a zot scale out cluster in http mode with 3 members", t, func() { + numMembers := 3 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + Convey("All 3 controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Should upload images to repos and fetch same response from all 3 members", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for idx, repoName := range reposToTest { + img := CreateRandomImage() + + // Upload to each instance based on loop counter + err := UploadImage(img, test.GetBaseURL(ports[idx]), repoName, "1.0") + So(err, ShouldBeNil) + + // Query all 3 instances and expect the same response + for _, port := range ports { + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName)) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + } + }) + }) + + // this test checks for functionality when TLS and htpasswd auth are enabled. + // it primarily checks that headers are correctly copied over during the proxying process. 
+ Convey("Given a zot scale out cluster in https mode with auth enabled", t, func() { + numMembers := 3 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + username, _ := test.GenerateRandomString() + password, _ := test.GenerateRandomString() + htpasswdPath := test.MakeHtpasswdFileFromString(test.GetCredString(username, password)) + defer os.Remove(htpasswdPath) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.HTTP.Auth = &config.AuthConfig{ + HTPasswd: config.AuthHTPasswd{ + Path: htpasswdPath, + }, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: CACert, + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + Convey("All 3 controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().SetBasicAuth(username, password).Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Should upload images to repos and fetch same response from all 3 instances", func() { + reposToTest := []string{"debian", "alpine", "ubuntu"} + for idx, repoName := range reposToTest { + img := CreateRandomImage() + + // Upload to each instance based on loop counter + err := UploadImageWithBasicAuth(img, test.GetSecureBaseURL(ports[idx]), repoName, "1.0", username, password) + So(err, ShouldBeNil) + + // Query all 3 instances and expect the same response + for _, port := range ports { + resp, err := resty.R().SetBasicAuth(username, password).Get( + fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(port), repoName), + ) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + result := common.ImageTags{} + err = json.Unmarshal(resp.Body(), &result) + if err != nil { + t.Fatalf("Failed to unmarshal") + } + So(result.Name, ShouldEqual, repoName) + So(len(result.Tags), ShouldEqual, 1) + So(result.Tags[0], ShouldEqual, "1.0") + } + } + }) + }) + + // when the RootCA file does not exist, expect an error + Convey("Given a zot scale out cluster in with 2 members and an incorrect RootCACert", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := 
test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when the server cert file does not exist, expect an error while proxying + Convey("Given a zot scale out cluster with 2 members and an incorrect server cert", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS: &config.TLSConfig{ + CACert: CACert, + Cert: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) + + // when the server key file does not exist, expect an error while proxying + Convey("Given a zot scale out cluster with 2 members and an incorrect server key", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port) + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.HTTP.TLS = &config.TLSConfig{ + Cert: ServerCert, + Key: ServerKey, + } + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + TLS:
&config.TLSConfig{ + CACert: CACert, + Cert: ServerCert, + Key: "/tmp/does-not-exist.crt", + }, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + defer cm.StopServer() + } + + caCert, err := os.ReadFile(CACert) + So(err, ShouldBeNil) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}) + defer func() { resty.SetTLSClientConfig(nil) }() + + Convey("Both controllers should start up and respond without error", func() { + for _, port := range ports { + resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + }) + + Convey("Proxying a request should fail with an error", func() { + // debian gets proxied to the second instance + resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian")) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) +} + func TestPrintTracebackOnPanic(t *testing.T) { Convey("Run server on unavailable port", t, func() { port := test.GetFreePort() diff --git a/pkg/api/proxy.go b/pkg/api/proxy.go new file mode 100644 index 00000000..d3c6224d --- /dev/null +++ b/pkg/api/proxy.go @@ -0,0 +1,258 @@ +package api + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "net/http" + + "github.com/dchest/siphash" + "github.com/gorilla/mux" + + "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/api/constants" + "zotregistry.dev/zot/pkg/common" +) + +// ClusterProxy wraps an http.HandlerFunc which requires proxying between zot instances to ensure +// that a given repository only has a single writer and reader for dist-spec operations in a scale-out cluster. +// based on the hash value of the repository name, the request will either be handled locally +// or proxied to another zot member in the cluster to get the data before sending a response to the client. +func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc { + return func(next http.HandlerFunc) http.HandlerFunc { + return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { + config := ctrlr.Config + logger := ctrlr.Log + + // if no cluster or single-node cluster, handle locally. + if config.Cluster == nil || len(config.Cluster.Members) == 1 { + next.ServeHTTP(response, request) + + return + } + + // since the handler has been wrapped, it should be possible to get the name + // of the repository from the mux. + vars := mux.Vars(request) + name, ok := vars["name"] + + if !ok || name == "" { + response.WriteHeader(http.StatusNotFound) + + return + } + + // the target member is the only one which should do read/write for the dist-spec APIs + // for the given repository. + targetMemberIndex, targetMember := computeTargetMember(config, name) + + logger.Debug().Str(constants.RepositoryLogKey, name). + Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex)) + + // if the target member is the same as the local member, the current member should handle the request. 
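+ // (no proxy hop is made in this case)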
+ // since the instances have the same config, a quick index lookup is sufficient + if targetMemberIndex == config.Cluster.Proxy.LocalMemberClusterSocketIndex { + logger.Debug().Str(constants.RepositoryLogKey, name).Msg("handling the request locally") + next.ServeHTTP(response, request) + + return + } + + // if the header already carries a hop count, the request was proxied once before; multi-hop proxying indicates a misconfiguration and is treated as fatal + if request.Header.Get(constants.ScaleOutHopCountHeader) != "" { + logger.Fatal().Str("url", request.URL.String()). + Msg("failed to process request - cannot proxy an already proxied request") + + return + } + + logger.Debug().Str(constants.RepositoryLogKey, name).Msg("proxying the request") + + proxyResponse, err := proxyHTTPRequest(request.Context(), request, targetMember, ctrlr) + if err != nil { + logger.Error().Err(err).Str(constants.RepositoryLogKey, name).Msg("failed to proxy the request") + http.Error(response, err.Error(), http.StatusInternalServerError) + + return + } + defer proxyResponse.Body.Close() + + copyHeader(response.Header(), proxyResponse.Header) + response.WriteHeader(proxyResponse.StatusCode) + _, _ = io.Copy(response, proxyResponse.Body) + }) + } +} + +// computes the target member using siphash and returns the index and the member. +// siphash was chosen to protect against hash attacks where an attacker +// can target all requests to one given instance instead of balancing across the cluster, +// resulting in a Denial-of-Service (DoS). +// ref: https://en.wikipedia.org/wiki/SipHash +func computeTargetMember(config *config.Config, name string) (uint64, string) { + h := siphash.New([]byte(config.Cluster.HashKey)) + h.Write([]byte(name)) + sum64 := h.Sum64() + targetIdx := sum64 % uint64(len(config.Cluster.Members)) + + return targetIdx, config.Cluster.Members[targetIdx] +} + +// gets all the server sockets of a target member - IP:Port. +// for IPv6, the socket is [IPv6]:Port. +// if the input is an IP address, returns the same targetMember in an array. +// if the input is a host name, performs a lookup and returns the server sockets. +func getTargetMemberServerSockets(targetMemberSocket string) ([]string, error) { + targetHost, targetPort, err := net.SplitHostPort(targetMemberSocket) + if err != nil { + return []string{}, err + } + + addr := net.ParseIP(targetHost) + if addr != nil { + // this is an IP address, return as is + return []string{targetMemberSocket}, nil + } + // this is a hostname - try to resolve to an IP + resolvedAddrs, err := common.GetIPFromHostName(targetHost) + if err != nil { + return []string{}, err + } + + targetSockets := make([]string, len(resolvedAddrs)) + for idx, resolvedAddr := range resolvedAddrs { + targetSockets[idx] = net.JoinHostPort(resolvedAddr, targetPort) + } + + return targetSockets, nil +} + +// proxy the request to the target member and return a pointer to the response or an error. +func proxyHTTPRequest(ctx context.Context, req *http.Request, + targetMember string, ctrlr *Controller, +) (*http.Response, error) { + cloneURL := *req.URL + + proxyQueryScheme := "http" + if ctrlr.Config.HTTP.TLS != nil { + proxyQueryScheme = "https" + } + + cloneURL.Scheme = proxyQueryScheme + cloneURL.Host = targetMember + + clonedBody := cloneRequestBody(req) + + fwdRequest, err := http.NewRequestWithContext(ctx, req.Method, cloneURL.String(), clonedBody) + if err != nil { + return nil, err + } + + copyHeader(fwdRequest.Header, req.Header) + + // always set hop count to 1 for now.
+ // the handler wrapper above will terminate the process if it sees a request that + // already has a hop count but is due for proxying. + fwdRequest.Header.Set(constants.ScaleOutHopCountHeader, "1") + + clientOpts := common.HTTPClientOptions{ + TLSEnabled: ctrlr.Config.HTTP.TLS != nil, + VerifyTLS: ctrlr.Config.HTTP.TLS != nil, // for now, always verify TLS when TLS mode is enabled + Host: targetMember, + } + + tlsConfig := ctrlr.Config.Cluster.TLS + if tlsConfig != nil { + clientOpts.CertOptions.ClientCertFile = tlsConfig.Cert + clientOpts.CertOptions.ClientKeyFile = tlsConfig.Key + clientOpts.CertOptions.RootCaCertFile = tlsConfig.CACert + } + + httpClient, err := common.CreateHTTPClient(&clientOpts) + if err != nil { + return nil, err + } + + resp, err := httpClient.Do(fwdRequest) + if err != nil { + return nil, err + } + + var clonedRespBody bytes.Buffer + + // copy out the contents into a new buffer as the response body + // stream should be closed to get all the data out. + _, _ = io.Copy(&clonedRespBody, resp.Body) + resp.Body.Close() + + // after closing the original body, substitute it with a new reader + // using the buffer that was just created. + // this buffer should be closed later by the consumer of the response. + resp.Body = io.NopCloser(bytes.NewReader(clonedRespBody.Bytes())) + + return resp, nil +} + +func cloneRequestBody(src *http.Request) io.Reader { + var bCloneForOriginal, bCloneForCopy bytes.Buffer + multiWriter := io.MultiWriter(&bCloneForOriginal, &bCloneForCopy) + numBytesCopied, _ := io.Copy(multiWriter, src.Body) + + // if the body is a type of io.NopCloser and length is 0, + // the Content-Length header is not sent in the proxied request. + // explicitly returning http.NoBody allows the implementation + // to set the header. + // ref: https://github.com/golang/go/issues/34295 + if numBytesCopied == 0 { + src.Body = http.NoBody + + return http.NoBody + } + + src.Body = io.NopCloser(&bCloneForOriginal) + + return bytes.NewReader(bCloneForCopy.Bytes()) +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +} + +// identifies and returns the cluster socket and index. +// this is the socket which the scale out cluster members will use for +// proxying and communication among each other. +// returns index, socket, error. +// returns an empty string and index value -1 if the cluster socket is not found. +func GetLocalMemberClusterSocket(members []string, localSockets []string) (int, string, error) { + for memberIdx, member := range members { + // for each member, get the full list of sockets, including DNS resolution + memberSockets, err := getTargetMemberServerSockets(member) + if err != nil { + return -1, "", err + } + + // for each member socket that we have, compare all the local sockets with + // it to see if there is any match. 
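+ // e.g. member "localhost:9000" may resolve to both 127.0.0.1:9000 and [::1]:9000, either of which can match a local socket.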
+ for _, memberSocket := range memberSockets { + for _, localSocket := range localSockets { + // this checks if the sockets are equal at a host port level + areSocketsEqual, err := common.AreSocketsEqual(memberSocket, localSocket) + if err != nil { + return -1, "", err + } + + if areSocketsEqual { + return memberIdx, member, nil + } + } + } + } + + return -1, "", nil +} diff --git a/pkg/api/proxy_test.go b/pkg/api/proxy_test.go new file mode 100644 index 00000000..f11daede --- /dev/null +++ b/pkg/api/proxy_test.go @@ -0,0 +1,59 @@ +//go:build sync && scrub && metrics && search && lint && userprefs && mgmt && imagetrust && ui +// +build sync,scrub,metrics,search,lint,userprefs,mgmt,imagetrust,ui + +package api_test + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + + "zotregistry.dev/zot/pkg/api" +) + +func TestGetLocalMemberClusterSocket(t *testing.T) { + Convey("Should return an error if a domain name doesn't exist", t, func() { + localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9001", "thisdoesnotexist:9000", "127.0.0.1:9000"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err.Error(), ShouldContainSubstring, "lookup thisdoesnotexist") + So(index, ShouldEqual, -1) + So(socket, ShouldEqual, "") + }) + + Convey("Should return an error if a local socket is missing a port", t, func() { + localSockets := []string{"127.0.0.1", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9001", "www.github.com:443", "127.0.0.1:9000"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err.Error(), ShouldEqual, "address 127.0.0.1: missing port in address") + So(index, ShouldEqual, -1) + So(socket, ShouldEqual, "") + }) + + Convey("Should return an error if a member socket is missing a port", t, func() { + localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9001", "www.github.com", "127.0.0.1:9000"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err.Error(), ShouldEqual, "address www.github.com: missing port in address") + So(index, ShouldEqual, -1) + So(socket, ShouldEqual, "") + }) + + Convey("Should return the right socket when a local socket is part of members", t, func() { + localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9001", "www.github.com:443", "127.0.0.1:9000"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err, ShouldBeNil) + So(index, ShouldEqual, 2) + So(socket, ShouldEqual, "127.0.0.1:9000") + }) + + Convey("Should return empty when no local socket is part of members", t, func() { + localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} + members := []string{"127.0.0.1:9002", "127.0.0.1:9001", "www.github.com:443"} + index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + So(err, ShouldBeNil) + So(index, ShouldEqual, -1) + So(socket, ShouldBeEmpty) + }) +} diff --git a/pkg/api/routes.go b/pkg/api/routes.go index b9001b5f..f71d7ae5 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -127,40 +127,66 @@ func (rh *RouteHandler) SetupRoutes() { prefixedDistSpecRouter.Use(DistSpecAuthzHandler(rh.c)) } + clusterRouteProxy := ClusterProxy(rh.c) + // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#endpoints + // dist-spec APIs that need to be proxied are wrapped in clusterRouteProxy for scale-out proxying. 
+ // these are handlers that have a repository name. { prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/tags/list", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( - applyCORSHeaders(rh.ListTags))).Methods(http.MethodGet, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( + applyCORSHeaders(rh.ListTags), + ), + ), + ).Methods(http.MethodGet, http.MethodOptions) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodHead, http.MethodGet, http.MethodDelete, http.MethodOptions)( - applyCORSHeaders(rh.CheckManifest))).Methods(http.MethodHead, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodHead, http.MethodGet, http.MethodDelete, http.MethodOptions)( + applyCORSHeaders(rh.CheckManifest), + ), + ), + ).Methods(http.MethodHead, http.MethodOptions) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - applyCORSHeaders(rh.GetManifest)).Methods(http.MethodGet) + clusterRouteProxy( + applyCORSHeaders(rh.GetManifest), + ), + ).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - rh.UpdateManifest).Methods(http.MethodPut) + clusterRouteProxy(rh.UpdateManifest)).Methods(http.MethodPut) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()), - applyCORSHeaders(rh.DeleteManifest)).Methods(http.MethodDelete) + clusterRouteProxy( + applyCORSHeaders(rh.DeleteManifest), + ), + ).Methods(http.MethodDelete) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.CheckBlob).Methods(http.MethodHead) + clusterRouteProxy(rh.CheckBlob)).Methods(http.MethodHead) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.GetBlob).Methods(http.MethodGet) + clusterRouteProxy(rh.GetBlob)).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()), - rh.DeleteBlob).Methods(http.MethodDelete) + clusterRouteProxy(rh.DeleteBlob)).Methods(http.MethodDelete) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/", zreg.NameRegexp.String()), - rh.CreateBlobUpload).Methods(http.MethodPost) + clusterRouteProxy(rh.CreateBlobUpload)).Methods(http.MethodPost) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.GetBlobUpload).Methods(http.MethodGet) + clusterRouteProxy(rh.GetBlobUpload)).Methods(http.MethodGet) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.PatchBlobUpload).Methods(http.MethodPatch) + clusterRouteProxy(rh.PatchBlobUpload)).Methods(http.MethodPatch) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.UpdateBlobUpload).Methods(http.MethodPut) + clusterRouteProxy(rh.UpdateBlobUpload)).Methods(http.MethodPut) prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()), - rh.DeleteBlobUpload).Methods(http.MethodDelete) + clusterRouteProxy(rh.DeleteBlobUpload)).Methods(http.MethodDelete) // support for OCI artifact references 
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/referrers/{digest}", zreg.NameRegexp.String()), - getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( - applyCORSHeaders(rh.GetReferrers))).Methods(http.MethodGet, http.MethodOptions) + clusterRouteProxy( + getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( + applyCORSHeaders(rh.GetReferrers), + ), + ), + ).Methods(http.MethodGet, http.MethodOptions) + + // handlers which work fine with a single node do not need proxying. + // catalog handler doesn't require proxying as the metadata and storage are shared. + // discover and the default path handlers are node-specific so do not require proxying. prefixedRouter.HandleFunc(constants.ExtCatalogPrefix, getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)( applyCORSHeaders(rh.ListRepositories))).Methods(http.MethodGet, http.MethodOptions) diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index 4916cd20..46f16b0d 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -457,6 +457,11 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error { } } + // check validity of scale out cluster config + if err := validateClusterConfig(config, log); err != nil { + return err + } + return nil } @@ -1103,3 +1108,27 @@ func validateSync(config *config.Config, log zlog.Logger) error { return nil } + +func validateClusterConfig(config *config.Config, log zlog.Logger) error { + if config.Cluster != nil { + if len(config.Cluster.Members) == 0 { + log.Error().Err(zerr.ErrBadConfig). + Msg("cannot have 0 members in a scale out cluster") + + return zerr.ErrBadConfig + } + + // the allowed length is 16 as the siphash requires a 128 bit key. + // that translates to 16 characters * 8 bits each. + allowedHashKeyLength := 16 + if len(config.Cluster.HashKey) != allowedHashKeyLength { + log.Error().Err(zerr.ErrBadConfig). + Str("hashkey", config.Cluster.HashKey). 
+ Msg(fmt.Sprintf("hashKey for scale out cluster must have %d characters", allowedHashKeyLength)) + + return zerr.ErrBadConfig + } + } + + return nil +} diff --git a/pkg/cli/server/root_test.go b/pkg/cli/server/root_test.go index b0f7e94f..9e7fab4e 100644 --- a/pkg/cli/server/root_test.go +++ b/pkg/cli/server/root_test.go @@ -2028,6 +2028,150 @@ func TestUpdateLDAPConfig(t *testing.T) { }) } +func TestClusterConfig(t *testing.T) { + baseExamplePath := "../../../examples/scale-out-cluster-cloud/" + + Convey("Should successfully load example configs for cloud", t, func() { + for memberIdx := 0; memberIdx < 3; memberIdx++ { + cfgFileToLoad := fmt.Sprintf("%s/config-cluster-member%d.json", baseExamplePath, memberIdx) + cfg := config.New() + err := cli.LoadConfiguration(cfg, cfgFileToLoad) + So(err, ShouldBeNil) + } + }) + + Convey("Should successfully load example TLS configs for cloud", t, func() { + for memberIdx := 0; memberIdx < 3; memberIdx++ { + cfgFileToLoad := fmt.Sprintf("%s/tls/config-cluster-member%d.json", baseExamplePath, memberIdx) + cfg := config.New() + err := cli.LoadConfiguration(cfg, cfgFileToLoad) + So(err, ShouldBeNil) + } + }) + + Convey("Should reject scale out cluster invalid cases", t, func() { + cfgFileContents, err := os.ReadFile(baseExamplePath + "config-cluster-member0.json") + So(err, ShouldBeNil) + + Convey("Should reject empty members list", func() { + cfg := config.New() + err := json.Unmarshal(cfgFileContents, cfg) + So(err, ShouldBeNil) + + // set the members to an empty list + cfg.Cluster.Members = []string{} + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + cfgFileContents, err := json.MarshalIndent(cfg, "", " ") + So(err, ShouldBeNil) + + err = os.WriteFile(file.Name(), cfgFileContents, 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject missing members list", func() { + cfg := config.New() + + configStr := ` + { + "storage": { + "RootDirectory": "/tmp/example" + }, + "http": { + "address": "127.0.0.1", + "port": "800" + }, + "cluster" { + "hashKey": "loremipsumdolors" + } + }` + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + err = os.WriteFile(file.Name(), []byte(configStr), 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject missing hashkey", func() { + cfg := config.New() + + configStr := ` + { + "storage": { + "RootDirectory": "/tmp/example" + }, + "http": { + "address": "127.0.0.1", + "port": "800" + }, + "cluster" { + "members": ["127.0.0.1:9000"] + } + }` + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + err = os.WriteFile(file.Name(), []byte(configStr), 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(cfg, file.Name()) + So(err, ShouldNotBeNil) + }) + + Convey("Should reject a hashkey that is too short", func() { + cfg := config.New() + err := json.Unmarshal(cfgFileContents, cfg) + So(err, ShouldBeNil) + + // set the hashkey to a string shorter than 16 characters + cfg.Cluster.HashKey = "fifteencharacte" + + file, err := os.CreateTemp("", "cluster-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + cfgFileContents, err := json.MarshalIndent(cfg, "", " ") + So(err, ShouldBeNil) + + err = os.WriteFile(file.Name(), cfgFileContents, 0o600) + So(err, 
+            err = cli.LoadConfiguration(cfg, file.Name())
+            So(err, ShouldNotBeNil)
+        })
+
+        Convey("Should reject a hashkey that is too long", func() {
+            cfg := config.New()
+            err := json.Unmarshal(cfgFileContents, cfg)
+            So(err, ShouldBeNil)
+
+            // set the hashkey to a string longer than 16 characters
+            cfg.Cluster.HashKey = "seventeencharacte"
+
+            file, err := os.CreateTemp("", "cluster-config-*.json")
+            So(err, ShouldBeNil)
+            defer os.Remove(file.Name())
+
+            cfgFileContents, err := json.MarshalIndent(cfg, "", " ")
+            So(err, ShouldBeNil)
+
+            err = os.WriteFile(file.Name(), cfgFileContents, 0o600)
+            So(err, ShouldBeNil)
+            err = cli.LoadConfiguration(cfg, file.Name())
+            So(err, ShouldNotBeNil)
+        })
+    })
+}
+
 // run cli and return output.
 func runCLIWithConfig(tempDir string, config string) (string, error) {
 	port := GetFreePort()
diff --git a/pkg/common/common.go b/pkg/common/common.go
index 3b9abe8b..cc215d1a 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"io/fs"
+	"net"
 	"os"
 	"regexp"
 	"strings"
@@ -145,3 +146,93 @@ func IsContextDone(ctx context.Context) bool {
 		return false
 	}
 }
+
+// get a list of IP addresses configured on the host's
+// network interfaces.
+func GetLocalIPs() ([]string, error) {
+    var localIPs []string
+
+    ifaces, err := net.Interfaces()
+    if err != nil {
+        return []string{}, err
+    }
+
+    for _, i := range ifaces {
+        addrs, err := i.Addrs()
+        if err != nil {
+            return localIPs, err
+        }
+
+        for _, addr := range addrs {
+            if localIP, ok := addr.(*net.IPNet); ok {
+                localIPs = append(localIPs, localIP.IP.String())
+            }
+        }
+    }
+
+    return localIPs, nil
+}
+
+// build the list of local sockets (IP:port) for the given port,
+// one for each IP address configured on the host's interfaces.
+// IPv6 sockets are returned as [host]:port.
+func GetLocalSockets(port string) ([]string, error) {
+    localIPs, err := GetLocalIPs()
+    if err != nil {
+        return []string{}, err
+    }
+
+    localSockets := make([]string, len(localIPs))
+
+    for idx, ip := range localIPs {
+        // JoinHostPort automatically wraps IPv6 addresses in []
+        localSockets[idx] = net.JoinHostPort(ip, port)
+    }
+
+    return localSockets, nil
+}
+
+// resolve a hostname to its IP addresses via DNS lookup.
+func GetIPFromHostName(host string) ([]string, error) {
+    addrs, err := net.LookupIP(host)
+    if err != nil {
+        return []string{}, err
+    }
+
+    ips := make([]string, 0, len(addrs))
+
+    for _, ip := range addrs {
+        ips = append(ips, ip.String())
+    }
+
+    return ips, nil
+}
+
+// checks whether 2 sockets are equal at the host and port level.
+func AreSocketsEqual(socketA string, socketB string) (bool, error) {
+    hostA, portA, err := net.SplitHostPort(socketA)
+    if err != nil {
+        return false, err
+    }
+
+    hostB, portB, err := net.SplitHostPort(socketB)
+    if err != nil {
+        return false, err
+    }
+
+    hostAIP := net.ParseIP(hostA)
+    if hostAIP == nil {
+        // this could be a fully-qualified domain name (FQDN)
+        // for an FQDN, a plain string comparison is enough
+        return hostA == hostB, nil
+    }
+
+    hostBIP := net.ParseIP(hostB)
+    if hostBIP == nil {
+        // if the host part of socketA was parsed successfully, it was an IP
+        // if the host part of socketA was an FQDN, then the comparison is
+        // already done as the host of socketB is also assumed to be an FQDN.
+        // since the parsing failed, assume that A and B are not equal.
+        return false, nil
+    }
+
+    return (hostAIP.Equal(hostBIP) && (portA == portB)), nil
+}
diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go
index a59ea8df..3d7b5e9e 100644
--- a/pkg/common/common_test.go
+++ b/pkg/common/common_test.go
@@ -3,6 +3,7 @@ package common_test
 import (
 	"os"
 	"path"
+	"strings"
 	"testing"
 
 	notreg "github.com/notaryproject/notation-go/registry"
@@ -61,4 +62,107 @@ func TestCommon(t *testing.T) {
 	Convey("Test ArtifactTypeNotation const has same value as in notaryproject", t, func() {
 		So(common.ArtifactTypeNotation, ShouldEqual, notreg.ArtifactTypeNotation)
 	})
+
+    Convey("Test GetLocalIPs", t, func() {
+        localIPs, err := common.GetLocalIPs()
+        So(err, ShouldBeNil)
+        So(localIPs, ShouldNotBeEmpty)
+        So(localIPs, ShouldContain, "127.0.0.1")
+    })
+
+    Convey("Test GetLocalSockets IPv4", t, func() {
+        localSockets, err := common.GetLocalSockets("8765")
+        So(err, ShouldBeNil)
+        So(localSockets, ShouldNotBeEmpty)
+        So(localSockets, ShouldContain, "127.0.0.1:8765")
+        for _, socket := range localSockets {
+            lastColonIndex := strings.LastIndex(socket, ":")
+            So(socket[lastColonIndex+1:], ShouldEqual, "8765")
+        }
+    })
+
+    Convey("Test GetLocalSockets IPv6", t, func() {
+        localSockets, err := common.GetLocalSockets("8766")
+        So(err, ShouldBeNil)
+        So(localSockets, ShouldNotBeEmpty)
+        So(localSockets, ShouldContain, "[::1]:8766")
+        for _, socket := range localSockets {
+            lastColonIndex := strings.LastIndex(socket, ":")
+            So(socket[lastColonIndex+1:], ShouldEqual, "8766")
+        }
+    })
+
+    Convey("Test GetIPFromHostName with valid hostname", t, func() {
+        addrs, err := common.GetIPFromHostName("github.com")
+        So(err, ShouldBeNil)
+        So(addrs, ShouldNotBeEmpty)
+        // we can't check the actual addresses here as they can change
+    })
+
+    Convey("Test GetIPFromHostName with non-existent hostname", t, func() {
+        addrs, err := common.GetIPFromHostName("thisdoesnotexist")
+        So(err, ShouldNotBeNil)
+        So(err.Error(), ShouldContainSubstring, "lookup thisdoesnotexist")
+        So(addrs, ShouldBeEmpty)
+    })
+
+    Convey("Test AreSocketsEqual with equal IPv4 sockets", t, func() {
+        result, err := common.AreSocketsEqual("127.0.0.1:9000", "127.0.0.1:9000")
+        So(err, ShouldBeNil)
+        So(result, ShouldBeTrue)
+    })
+
+    Convey("Test AreSocketsEqual with equal IPv6 sockets", t, func() {
+        result, err := common.AreSocketsEqual("[::1]:9000", "[0000:0000:0000:0000:0000:0000:0000:0001]:9000")
+        So(err, ShouldBeNil)
+        So(result, ShouldBeTrue)
+    })
+
+    Convey("Test AreSocketsEqual with different IPv4 socket ports", t, func() {
+        result, err := common.AreSocketsEqual("127.0.0.1:9000", "127.0.0.1:9001")
+        So(err, ShouldBeNil)
+        So(result, ShouldBeFalse)
+    })
+
+    Convey("Test AreSocketsEqual with different IPv4 socket hosts", t, func() {
+        result, err := common.AreSocketsEqual("127.0.0.1:9000", "127.0.0.2:9000")
+        So(err, ShouldBeNil)
+        So(result, ShouldBeFalse)
+    })
+
+    Convey("Test AreSocketsEqual with 2 equal host names", t, func() {
+        result, err := common.AreSocketsEqual("localhost:9000", "localhost:9000")
+        So(err, ShouldBeNil)
+        So(result, ShouldBeTrue)
+    })
+
+    Convey("Test AreSocketsEqual with 2 different host names", t, func() {
+        result, err := common.AreSocketsEqual("localhost:9000", "notlocalhost:9000")
+        So(err, ShouldBeNil)
+        So(result, ShouldBeFalse)
+    })
+
+    Convey("Test AreSocketsEqual with hostname and IP address", t, func() {
+        result, err := common.AreSocketsEqual("localhost:9000", "127.0.0.1:9000")
+        So(err, ShouldBeNil)
+        So(result, ShouldBeFalse)
+    })
+
+    Convey("Test AreSocketsEqual with IP address and hostname", t, func() {
+        result, err := common.AreSocketsEqual("127.0.0.1:9000", "localhost:9000")
+        So(err, ShouldBeNil)
+        So(result, ShouldBeFalse)
+    })
+
+    Convey("Test AreSocketsEqual with invalid first socket", t, func() {
+        result, err := common.AreSocketsEqual("127.0.0.1", "localhost:9000")
+        So(err, ShouldNotBeNil)
+        So(result, ShouldBeFalse)
+    })
+
+    Convey("Test AreSocketsEqual with invalid second socket", t, func() {
+        result, err := common.AreSocketsEqual("localhost:9000", "127.0.0.1")
+        So(err, ShouldNotBeNil)
+        So(result, ShouldBeFalse)
+    })
+}
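The comparison rules exercised by these tests are easiest to see with a few direct calls. A minimal sketch follows; the import path is assumed from the repository layout and should be adjusted to the module path in go.mod:

    package main

    import (
        "fmt"

        "zotregistry.io/zot/pkg/common" // assumed module path
    )

    func main() {
        // different textual forms of the same IPv6 address compare equal
        eq, _ := common.AreSocketsEqual("[::1]:9000", "[0:0:0:0:0:0:0:1]:9000")
        fmt.Println(eq) // true

        // hostnames are compared as plain strings: no DNS resolution is done,
        // so a hostname never equals an IP literal
        eq, _ = common.AreSocketsEqual("localhost:9000", "127.0.0.1:9000")
        fmt.Println(eq) // false

        // a socket without a port is rejected by net.SplitHostPort
        if _, err := common.AreSocketsEqual("127.0.0.1", "127.0.0.1:9000"); err != nil {
            fmt.Println("missing port:", err)
        }
    }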
diff --git a/test/scale-out/cloud_scale_out_basic_auth_tls.bats b/test/scale-out/cloud_scale_out_basic_auth_tls.bats
new file mode 100644
index 00000000..bf232d5c
--- /dev/null
+++ b/test/scale-out/cloud_scale_out_basic_auth_tls.bats
@@ -0,0 +1,75 @@
+# note: intended to be run as "make run-cloud-scale-out-tests".
+# the makefile target installs & checks all the necessary tooling;
+# extra tools that are not covered by the Makefile target need to be added to verify_prerequisites()
+
+NUM_ZOT_INSTANCES=6
+ZOT_LOG_DIR=/tmp/zot-ft-logs/auth-tls
+
+load helpers_zot
+load helpers_cloud
+load helpers_haproxy
+
+function launch_zot_server() {
+    local zot_server_address=${1}
+    local zot_server_port=${2}
+    local zot_root_dir=${ZOT_ROOT_DIR}
+
+    mkdir -p ${zot_root_dir}
+    mkdir -p ${ZOT_LOG_DIR}
+
+    local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json"
+    local zot_log_file="${ZOT_LOG_DIR}/zot-${zot_server_address}-${zot_server_port}.log"
+
+    create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file}
+    update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE}
+
+    update_zot_cfg_set_htpasswd_auth "${zot_config_file}" ${ZOT_HTPASSWD_PATH}
+    update_zot_cfg_set_tls "${zot_config_file}" ${ZOT_TLS_CERT_PATH} ${ZOT_TLS_KEY_PATH} ${ZOT_TLS_CA_CERT_PATH}
+
+    echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3
+    echo "config file: ${zot_config_file}" >&3
+    echo "log file: ${zot_log_file}" >&3
+
+    zot_serve ${zot_config_file}
+    wait_zot_reachable ${zot_server_port} "https"
+}
+
+function setup() {
+    # verify prerequisites are available
+    if ! $(verify_prerequisites); then
+        exit 1
+    fi
+
+    # setup S3 bucket and DynamoDB tables
+    setup_cloud_services
+    # setup htpasswd for local auth
+    setup_local_htpasswd
+
+    generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE}
+
+    for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do
+        launch_zot_server 127.0.0.1 $(( 10000 + $i ))
+    done
+
+    # list all zot processes that were started
+    ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3
+
+    generate_haproxy_config ${HAPROXY_CFG_FILE} "https"
+    haproxy_start ${HAPROXY_CFG_FILE}
+
+    # list HAProxy processes that were started
+    ps -ef | grep "haproxy" | grep -v grep >&3
+}
+
+function teardown() {
+    local zot_root_dir=${ZOT_ROOT_DIR}
+    haproxy_stop_all
+    zot_stop_all
+    rm -rf ${zot_root_dir}
+    teardown_cloud_services
+}
+
+@test "Check for successful zb run on haproxy frontend" {
+    # 3 concurrent clients, 5 requests in total, authenticating with basic auth
+    zb_run "cloud-scale-out-basic-auth-tls-bats" "https://127.0.0.1:8000" 3 5 "${ZOT_AUTH_USER}:${ZOT_AUTH_PASS}"
+}
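For reference, with the arguments above, zb_run (defined in helpers_zot.bash) expands to roughly the following zb invocation; the binary suffix depends on the host OS and architecture, so the linux-amd64 name here is illustrative:

    bin/zb-linux-amd64 -A poweruser:sup*rSecr9T -c 3 -n 5 --src-cidr 127.0.10.0/24 -o ci-cd --skip-cleanup https://127.0.0.1:8000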
diff --git a/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats b/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats
new file mode 100644
index 00000000..446aa21a
--- /dev/null
+++ b/test/scale-out/cloud_scale_out_basic_auth_tls_scale.bats
@@ -0,0 +1,74 @@
+# note: intended to be run as "make run-cloud-scale-out-high-scale-tests"
+# the makefile target installs & checks all the necessary tooling;
+# extra tools that are not covered by the Makefile target need to be added to verify_prerequisites()
+
+NUM_ZOT_INSTANCES=6
+
+load helpers_zot
+load helpers_cloud
+load helpers_haproxy
+
+function launch_zot_server() {
+    local zot_server_address=${1}
+    local zot_server_port=${2}
+    local zot_root_dir=${ZOT_ROOT_DIR}
+
+    mkdir -p ${zot_root_dir}
+    mkdir -p /tmp/zot-logs
+
+    local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json"
+    local zot_log_file="/tmp/zot-logs/zot-${zot_server_address}-${zot_server_port}.log"
+
+    create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file}
+    update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE}
+
+    update_zot_cfg_set_htpasswd_auth "${zot_config_file}" ${ZOT_HTPASSWD_PATH}
+    update_zot_cfg_set_tls "${zot_config_file}" ${ZOT_TLS_CERT_PATH} ${ZOT_TLS_KEY_PATH} ${ZOT_TLS_CA_CERT_PATH}
+
+    echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3
+    echo "config file: ${zot_config_file}" >&3
+    echo "log file: ${zot_log_file}" >&3
+
+    zot_serve ${zot_config_file}
+    wait_zot_reachable ${zot_server_port} "https"
+}
+
+function setup() {
+    # verify prerequisites are available
+    if ! $(verify_prerequisites); then
+        exit 1
+    fi
+
+    # setup S3 bucket and DynamoDB tables
+    setup_cloud_services
+    # setup htpasswd for local auth
+    setup_local_htpasswd
+
+    generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE}
+
+    for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do
+        launch_zot_server 127.0.0.1 $(( 10000 + $i ))
+    done
+
+    # list all zot processes that were started
+    ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3
+
+    generate_haproxy_config ${HAPROXY_CFG_FILE} "https"
+    haproxy_start ${HAPROXY_CFG_FILE}
+
+    # list HAProxy processes that were started
+    ps -ef | grep "haproxy" | grep -v grep >&3
+}
+
+function teardown() {
+    local zot_root_dir=${ZOT_ROOT_DIR}
+    haproxy_stop_all
+    zot_stop_all
+    rm -rf ${zot_root_dir}
+    teardown_cloud_services
+}
+
+@test "Check for successful zb run on haproxy frontend" {
+    # 10 concurrent clients, 100 requests in total, authenticating with basic auth
+    zb_run "cloud-scale-out-high-scale-bats" "https://127.0.0.1:8000" 10 100 "${ZOT_AUTH_USER}:${ZOT_AUTH_PASS}"
+}
diff --git a/test/scale-out/cloud_scale_out_no_auth.bats b/test/scale-out/cloud_scale_out_no_auth.bats
new file mode 100644
index 00000000..938a02a4
--- /dev/null
+++ b/test/scale-out/cloud_scale_out_no_auth.bats
@@ -0,0 +1,69 @@
+# note: intended to be run as "make run-cloud-scale-out-tests"
+# the makefile target installs & checks all the necessary tooling;
+# extra tools that are not covered by the Makefile target need to be added to verify_prerequisites()
+
+NUM_ZOT_INSTANCES=6
+ZOT_LOG_DIR=/tmp/zot-ft-logs/no-auth
+
+load helpers_zot
+load helpers_cloud
+load helpers_haproxy
+
+function launch_zot_server() {
+    local zot_server_address=${1}
+    local zot_server_port=${2}
+    local zot_root_dir=${ZOT_ROOT_DIR}
+
+    mkdir -p ${zot_root_dir}
+    mkdir -p ${ZOT_LOG_DIR}
+
+    local zot_config_file="${BATS_FILE_TMPDIR}/zot_config_${zot_server_address}_${zot_server_port}.json"
+    local zot_log_file="${ZOT_LOG_DIR}/zot-${zot_server_address}-${zot_server_port}.log"
+
+    create_zot_cloud_base_config_file ${zot_server_address} ${zot_server_port} ${zot_root_dir} ${zot_config_file} ${zot_log_file}
+    update_zot_cluster_member_list_in_config_file ${zot_config_file} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE}
+
+    echo "launching zot server ${zot_server_address}:${zot_server_port}" >&3
+    echo "config file: ${zot_config_file}" >&3
+    echo "log file: ${zot_log_file}" >&3
+
+    zot_serve ${zot_config_file}
+    wait_zot_reachable ${zot_server_port}
+}
+
+function setup() {
+    # verify prerequisites are available
+    if ! $(verify_prerequisites); then
+        exit 1
+    fi
+
+    # setup S3 bucket and DynamoDB tables
+    setup_cloud_services
+    generate_zot_cluster_member_list ${NUM_ZOT_INSTANCES} ${ZOT_CLUSTER_MEMBERS_PATCH_FILE}
+
+    for ((i=0;i<${NUM_ZOT_INSTANCES};i++)); do
+        launch_zot_server 127.0.0.1 $(( 10000 + $i ))
+    done
+
+    # list all zot processes that were started
+    ps -ef | grep ".*zot.*serve.*" | grep -v grep >&3
+
+    generate_haproxy_config ${HAPROXY_CFG_FILE} "http"
+    haproxy_start ${HAPROXY_CFG_FILE}
+
+    # list HAProxy processes that were started
+    ps -ef | grep "haproxy" | grep -v grep >&3
+}
+
+function teardown() {
+    local zot_root_dir=${ZOT_ROOT_DIR}
+    haproxy_stop_all
+    zot_stop_all
+    rm -rf ${zot_root_dir}
+    teardown_cloud_services
+}
+
+@test "Check for successful zb run on haproxy frontend" {
+    # 3 concurrent clients, 5 requests in total, no authentication
+    zb_run "cloud-scale-out-no-auth-bats" "http://127.0.0.1:8000" 3 5
+}
diff --git a/test/scale-out/helpers_cloud.bash b/test/scale-out/helpers_cloud.bash
new file mode 100644
index 00000000..a747e8d0
--- /dev/null
+++ b/test/scale-out/helpers_cloud.bash
@@ -0,0 +1,35 @@
+function setup_cloud_services() {
+    setup_s3 "us-east-2" "zot-storage-test"
+    setup_dynamodb "us-east-2"
+}
+
+function teardown_cloud_services() {
+    delete_s3_bucket "zot-storage-test"
+    teardown_dynamodb "us-east-2"
+}
+
+function setup_s3() {
+    local region=${1}
+    local bucket=${2}
+    awslocal s3 --region ${region} mb s3://${bucket}
+}
+
+function delete_s3_bucket() {
+    local bucket=${1}
+    awslocal s3 rb s3://${bucket} --force
+}
+
+function setup_dynamodb() {
+    local region=${1}
+    awslocal dynamodb --region ${region} \
+        create-table \
+        --table-name "BlobTable" \
+        --attribute-definitions AttributeName=Digest,AttributeType=S \
+        --key-schema AttributeName=Digest,KeyType=HASH \
+        --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=5
+}
+
+function teardown_dynamodb() {
+    local region=${1}
+    awslocal dynamodb --region ${region} delete-table --table-name "BlobTable"
+}
diff --git a/test/scale-out/helpers_haproxy.bash b/test/scale-out/helpers_haproxy.bash
new file mode 100644
index 00000000..68cafa4a
--- /dev/null
+++ b/test/scale-out/helpers_haproxy.bash
@@ -0,0 +1,71 @@
+HAPROXY_CFG_FILE="${BATS_FILE_TMPDIR}/haproxy/haproxy-test.cfg"
+
+function generate_haproxy_server_list() {
+    local num_instances=${1}
+    for ((i=0;i<${num_instances};i++)); do
+        local port=$(( 10000 + $i ))
+        echo "    server zot${i} 127.0.0.1:${port}"
+    done
+}
+
+# stops all haproxy instances started by the test
+function haproxy_stop_all() {
+    pkill haproxy
+}
+
+# starts one haproxy instance with the given config file
+# expects the haproxy config to specify daemon mode
+function haproxy_start() {
+    local haproxy_cfg_file=${1}
+
+    # check the config file
+    haproxy -f ${haproxy_cfg_file} -c >&3
+
+    # start haproxy
+    haproxy -f ${haproxy_cfg_file}
+}
+
+# generates the HAProxy config used in the test
+function generate_haproxy_config() {
+    local haproxy_cfg_file="${1}"
+    local haproxy_root_dir="$(dirname ${haproxy_cfg_file})"
+    # can be either http or https
+    local protocol="${2}"
+
+    mkdir -p ${haproxy_root_dir}
+
+    # plain HTTP traffic is load-balanced in http mode;
+    # TLS traffic is passed through in tcp mode so that the zot
+    # members terminate TLS themselves
+    local haproxy_mode='http'
+    if [ "$protocol" == 'https' ]; then
+        haproxy_mode='tcp'
+    fi
+
+    # the heredoc below was reconstructed from context: the frontend binds
+    # 127.0.0.1:8000 (the address targeted by the tests) and the timeout and
+    # maxconn values are typical defaults, not values mandated by the tests
+    cat > ${haproxy_cfg_file}<<EOF
+global
+    daemon
+    maxconn 2000
+
+defaults
+    mode    ${haproxy_mode}
+    timeout connect 5000ms
+    timeout client  50000ms
+    timeout server  50000ms
+
+frontend zot
+    bind 127.0.0.1:8000
+    default_backend zot-cluster
+
+backend zot-cluster
+    balance roundrobin
+EOF
+
+    # append one server line per zot instance to the backend
+    generate_haproxy_server_list ${NUM_ZOT_INSTANCES} >> ${haproxy_cfg_file}
+
+    cat ${haproxy_cfg_file} >&3
+}
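For reference, with NUM_ZOT_INSTANCES=3 the helper above appends one backend line per instance, following the 10000+i port convention used by the bats setup():

    server zot0 127.0.0.1:10000
    server zot1 127.0.0.1:10001
    server zot2 127.0.0.1:10002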
diff --git a/test/scale-out/helpers_zot.bash b/test/scale-out/helpers_zot.bash
new file mode 100644
index 00000000..22dc5b92
--- /dev/null
+++ b/test/scale-out/helpers_zot.bash
@@ -0,0 +1,273 @@
+ROOT_DIR=$(git rev-parse --show-toplevel)
+OS=$(go env GOOS)
+ARCH=$(go env GOARCH)
+ZOT_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH}
+ZLI_PATH=${ROOT_DIR}/bin/zli-${OS}-${ARCH}
+ZOT_MINIMAL_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH}-minimal
+
+# basic auth
+ZOT_AUTH_USER=poweruser
+ZOT_AUTH_PASS=sup*rSecr9T
+ZOT_CREDS_PATH="${BATS_FILE_TMPDIR}/creds"
+ZOT_HTPASSWD_PATH="${ZOT_CREDS_PATH}/htpasswd"
+
+# zb
+ZB_PATH=${ROOT_DIR}/bin/zb-${OS}-${ARCH}
+ZB_RESULTS_PATH=${ROOT_DIR}/zb-results
+ZB_CI_CD_OUTPUT_FILE=${ROOT_DIR}/ci-cd.json
+
+# zot scale out cluster
+ZOT_CLUSTER_MEMBERS_PATCH_FILE="${BATS_FILE_TMPDIR}/members-patch.json"
+ZOT_ROOT_DIR="${BATS_FILE_TMPDIR}/zot"
+ZOT_TLS_CERT_PATH="${ROOT_DIR}/test/data/server.cert"
+ZOT_TLS_KEY_PATH="${ROOT_DIR}/test/data/server.key"
+ZOT_TLS_CA_CERT_PATH="${ROOT_DIR}/test/data/ca.crt"
+
+function verify_prerequisites {
+    if [ ! -f ${ZOT_PATH} ]; then
+        echo "you need to build ${ZOT_PATH} before running the tests" >&3
+        return 1
+    fi
+
+    if [ ! -f ${ZB_PATH} ]; then
+        echo "you need to build ${ZB_PATH} before running the tests" >&3
+        return 1
+    fi
+
+    if ! command -v skopeo &>/dev/null; then
+        echo "you need to install skopeo as a prerequisite to running the tests" >&3
+        return 1
+    fi
+
+    if ! command -v awslocal &>/dev/null; then
+        echo "you need to install awslocal (awscli-local) as a prerequisite to running the tests" >&3
+        return 1
+    fi
+
+    if ! command -v haproxy &>/dev/null; then
+        echo "you need to install haproxy as a prerequisite to running the tests" >&3
+        return 1
+    fi
+
+    return 0
+}
+
+function get_free_port(){
+    while true
+    do
+        random_port=$(( ((RANDOM<<15)|RANDOM) % 49152 + 10000 ))
+        status="$(nc -z 127.0.0.1 $random_port < /dev/null &>/dev/null; echo $?)"
+        if [ "${status}" != "0" ]; then
+            free_port=${random_port};
+            break;
+        fi
+    done
+
+    echo ${free_port}
+}
+
+function zot_serve() {
+    local config_file=${1}
+    ${ZOT_PATH} serve ${config_file} &
+}
+
+# stops all zot instances started by the test
+function zot_stop_all() {
+    pkill zot
+}
+
+# waits for the zot server to be reachable
+# leaving argument 2 blank or specifying "http" causes the function to use HTTP
+# specifying "https" for argument 2 causes the function to use TLS
+function wait_zot_reachable() {
+    local zot_port=${1}
+    local protocol=${2}
+    if [ -z "${protocol}" ]; then
+        protocol="http"
+    fi
+
+    local zot_url="${protocol}://127.0.0.1:${zot_port}/v2/_catalog"
+
+    local curl_opts=(
+        --connect-timeout 3
+        --max-time 5
+        --retry 20
+        --retry-delay 1
+        --retry-max-time 180
+        --retry-connrefused
+    )
+
+    # since this is only a reachability check, we can disable cert verification
+    if [ "${protocol}" == "https" ]; then
+        curl_opts=(--insecure "${curl_opts[@]}")
+    fi
+
+    curl "${curl_opts[@]}" ${zot_url}
+}
+
+function zb_run() {
+    local test_name=${1}
+    local zot_address=${2}
+    local concurrent_reqs=${3}
+    local num_requests=${4}
+    local credentials=${5}
+
+    if [ ! -d "${ZB_RESULTS_PATH}" ]; then
+        mkdir -p "${ZB_RESULTS_PATH}"
+    fi
+
+    local zb_args=(
+        -c ${concurrent_reqs}
+        -n ${num_requests}
+        --src-cidr 127.0.10.0/24
+        -o ci-cd
+        --skip-cleanup
+    )
+
-z "${credentials}" ]; then + zb_args=(-A ${credentials} "${zb_args[@]}") + fi + + start=$(date +%s) + ${ZB_PATH} "${zb_args[@]}" ${zot_address} + stop=$(date +%s) + + runtime=$((stop-start)) + echo "Duration: ${runtime} seconds" >&3 + + if [ -f "${ZB_CI_CD_OUTPUT_FILE}" ]; then + mv "${ZB_CI_CD_OUTPUT_FILE}" "${ZB_RESULTS_PATH}/${test_name}-results.json" + fi +} + +function setup_local_htpasswd() { + create_htpasswd_file "${ZOT_CREDS_PATH}" "${ZOT_HTPASSWD_PATH}" ${ZOT_AUTH_USER} ${ZOT_AUTH_PASS} +} + +function create_htpasswd_file() { + local creds_dir_path="${1}" + local htpasswd_file_path="${2}" + local user=${3} + local password=${4} + + mkdir -p "${creds_dir_path}" + htpasswd -b -c -B "${htpasswd_file_path}" ${user} ${password} +} + +# given the number of zot instances, computes a list of cluster members +# and saves them as a JSON to a file that can be used with jq later. +function generate_zot_cluster_member_list() { + local num_zot_instances=${1} + local patch_file_path=${2} + local temp_file="${BATS_FILE_TMPDIR}/jq-member-dump.json" + echo "{\"cluster\":{\"members\":[]}}" > ${patch_file_path} + + for ((i=0;i<${num_zot_instances};i++)); do + local member="127.0.0.1:$(( 10000 + $i ))" + jq ".cluster.members += [\"${member}\"]" ${patch_file_path} > ${temp_file} && \ + mv ${temp_file} ${patch_file_path} + done + + echo "cluster members patch file" >&3 + cat ${patch_file_path} >&3 +} + +# patches an existing zot config file to add all the cluster members. +function update_zot_cluster_member_list_in_config_file() { + local zot_config_file=${1} + local zot_members_patch_file=${2} + local temp_file="${BATS_FILE_TMPDIR}/jq-mem-update-dump.json" + + jq -s '.[0] * .[1]' ${zot_config_file} ${zot_members_patch_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +} + +# generates and saves a base cloud config with shared storage +# given some basic parameters about the zot instance. +function create_zot_cloud_base_config_file() { + local zot_server_address=${1} + local zot_server_port=${2} + local zot_root_dir="${3}" + local zot_config_file="${4}" + local zot_log_file="${5}" + + cat > ${zot_config_file}< ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +} + +# updates an existing zot config file that already has an HTTP config +# to include TLS configuration. +# intended for use with create_zot_cloud_base_config_file() above. +function update_zot_cfg_set_tls() { + local zot_config_file="${1}" + local zot_cert_path="${2}" + local zot_key_path="${3}" + local zot_cacert_path="${4}" + local temp_file="${BATS_FILE_TMPDIR}/jq-tls-dump.json" + + # set zot TLS config + jq --arg zot_cert_path "${zot_cert_path}" --arg zot_key_path "${zot_key_path}" '(.http) += {tls: {cert: $zot_cert_path, key: $zot_key_path}}' \ + ${zot_config_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} + + jq --arg zot_cacert_path "${zot_cacert_path}" '(.cluster) += {tls: {cacert: $zot_cacert_path}}' \ + ${zot_config_file} > ${temp_file} && \ + mv ${temp_file} ${zot_config_file} +}