diff --git a/backend/deps.edn b/backend/deps.edn index b57ee55ec..b75718a73 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -29,8 +29,6 @@ org.postgresql/postgresql {:mvn/version "42.4.0"} com.zaxxer/HikariCP {:mvn/version "5.0.1"} - funcool/datoteka {:mvn/version "3.0.64"} - buddy/buddy-hashers {:mvn/version "1.8.158"} buddy/buddy-sign {:mvn/version "3.4.333"} diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml index 7cc9abe3b..fe69b8197 100644 --- a/backend/resources/log4j2-devenv.xml +++ b/backend/resources/log4j2-devenv.xml @@ -30,6 +30,8 @@ <Logger name="app.msgbus" level="info" /> <Logger name="app.http.websocket" level="info" /> <Logger name="app.util.websocket" level="info" /> + <Logger name="app.redis" level="info" /> + <Logger name="app.rpc.rlimit" level="info" /> <Logger name="app.cli" level="debug" additivity="false"> <AppenderRef ref="console"/> diff --git a/backend/scripts/repl b/backend/scripts/repl index d200ae3f3..984c87bdd 100755 --- a/backend/scripts/repl +++ b/backend/scripts/repl @@ -2,7 +2,7 @@ export PENPOT_HOST=devenv export PENPOT_TENANT=dev -export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enable-transit-readable-response enable-demo-users disable-secure-session-cookies" +export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enable-transit-readable-response enable-demo-users disable-secure-session-cookies enable-rpc-rate-limit enable-warn-rpc-rate-limits" # export PENPOT_DATABASE_URI="postgresql://172.17.0.1:5432/penpot" # export PENPOT_DATABASE_USERNAME="penpot" @@ -16,6 +16,8 @@ export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enabl # export PENPOT_LOGGERS_LOKI_URI="http://172.17.0.1:3100/loki/api/v1/push" # export PENPOT_AUDIT_LOG_ARCHIVE_URI="http://localhost:6070/api/audit" +export PENPOT_DEFAULT_RATE_LIMIT="default,window,10000/h" + # Initialize MINIO config mc alias set penpot-s3/ http://minio:9000 minioadmin minioadmin mc admin user add penpot-s3 penpot-devenv penpot-devenv diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 5bb0119b2..0d6fd2e2a 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -20,6 +20,7 @@ [clojure.pprint :as pprint] [clojure.spec.alpha :as s] [cuerdas.core :as str] + [datoteka.fs :as fs] [environ.core :refer [env]] [integrant.core :as ig])) @@ -83,16 +84,18 @@ ;; a server prop key where initial project is stored. :initial-project-skey "initial-project"}) +(s/def ::default-rpc-rlimit ::us/vector-of-strings) +(s/def ::rpc-rlimit-config ::fs/path) (s/def ::media-max-file-size ::us/integer) -(s/def ::flags ::us/vec-of-valid-keywords) +(s/def ::flags ::us/vector-of-keywords) (s/def ::telemetry-enabled ::us/boolean) (s/def ::audit-log-archive-uri ::us/string) (s/def ::audit-log-gc-max-age ::dt/duration) -(s/def ::admins ::us/set-of-non-empty-strings) +(s/def ::admins ::us/set-of-strings) (s/def ::file-change-snapshot-every ::us/integer) (s/def ::file-change-snapshot-timeout ::dt/duration) @@ -131,8 +134,8 @@ (s/def ::oidc-token-uri ::us/string) (s/def ::oidc-auth-uri ::us/string) (s/def ::oidc-user-uri ::us/string) -(s/def ::oidc-scopes ::us/set-of-non-empty-strings) -(s/def ::oidc-roles ::us/set-of-non-empty-strings) +(s/def ::oidc-scopes ::us/set-of-strings) +(s/def ::oidc-roles ::us/set-of-strings) (s/def ::oidc-roles-attr ::us/keyword) (s/def ::oidc-email-attr ::us/keyword) (s/def ::oidc-name-attr ::us/keyword) @@ -165,11 +168,14 @@ (s/def ::profile-complaint-threshold ::us/integer) (s/def ::public-uri ::us/string) (s/def ::redis-uri ::us/string) -(s/def ::registration-domain-whitelist ::us/set-of-non-empty-strings) -(s/def ::rlimit-font ::us/integer) -(s/def ::rlimit-file-update ::us/integer) -(s/def ::rlimit-image ::us/integer) -(s/def ::rlimit-password ::us/integer) +(s/def ::registration-domain-whitelist ::us/set-of-strings) + + + +(s/def ::rpc-semaphore-permits-font ::us/integer) +(s/def ::rpc-semaphore-permits-file-update ::us/integer) +(s/def ::rpc-semaphore-permits-image ::us/integer) +(s/def ::rpc-semaphore-permits-password ::us/integer) (s/def ::smtp-default-from ::us/string) (s/def ::smtp-default-reply-to ::us/string) (s/def ::smtp-host ::us/string) @@ -217,6 +223,7 @@ ::database-min-pool-size ::database-max-pool-size ::default-blob-version + ::default-rpc-rlimit ::error-report-webhook ::default-executor-parallelism ::blocking-executor-parallelism @@ -272,10 +279,11 @@ ::public-uri ::redis-uri ::registration-domain-whitelist - ::rlimit-font - ::rlimit-file-update - ::rlimit-image - ::rlimit-password + ::rpc-semaphore-permits-font + ::rpc-semaphore-permits-file-update + ::rpc-semaphore-permits-image + ::rpc-semaphore-permits-password + ::rpc-rlimit-config ::sentry-dsn ::sentry-debug ::sentry-attach-stack-trace diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj index 852cd1caf..7939b309b 100644 --- a/backend/src/app/http/errors.clj +++ b/backend/src/app/http/errors.clj @@ -10,6 +10,7 @@ [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] + [app.http :as-alias http] [clojure.spec.alpha :as s] [cuerdas.core :as str] [yetti.request :as yrq] @@ -50,6 +51,11 @@ [err _] (yrs/response 400 (ex-data err))) +(defmethod handle-exception :rate-limit + [err _] + (let [headers (-> err ex-data ::http/headers)] + (yrs/response :status 429 :body "" :headers headers))) + (defmethod handle-exception :validation [err _] (let [{:keys [code] :as data} (ex-data err)] diff --git a/backend/src/app/http/session.clj b/backend/src/app/http/session.clj index 6241b680e..20ec1b94b 100644 --- a/backend/src/app/http/session.clj +++ b/backend/src/app/http/session.clj @@ -69,7 +69,6 @@ {:id (:id data)}) (assoc data :updated-at updated-at)))) - (delete-session [_ token] (px/with-dispatch executor (db/delete! pool :http-session {:id token}) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index a29e5acd8..45636727a 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -64,10 +64,14 @@ :app.migrations/all {:main (ig/ref :app.migrations/migrations)} + :app.redis/redis + {:uri (cf/get :redis-uri) + :metrics (ig/ref :app.metrics/metrics)} + :app.msgbus/msgbus {:backend (cf/get :msgbus-backend :redis) :executor (ig/ref [::default :app.worker/executor]) - :redis-uri (cf/get :redis-uri)} + :redis (ig/ref :app.redis/redis)} :app.storage.tmp/cleaner {:executor (ig/ref [::worker :app.worker/executor]) @@ -220,6 +224,7 @@ :storage (ig/ref :app.storage/storage) :msgbus (ig/ref :app.msgbus/msgbus) :public-uri (cf/get :public-uri) + :redis (ig/ref :app.redis/redis) :audit (ig/ref :app.loggers.audit/collector) :ldap (ig/ref :app.auth.ldap/provider) :http-client (ig/ref :app.http/client) @@ -290,9 +295,6 @@ {:pool (ig/ref :app.db/pool) :key (cf/get :secret-key)} - ;; :app.setup/keys - ;; {:props (ig/ref :app.setup/props)} - :app.loggers.zmq/receiver {:endpoint (cf/get :loggers-zmq-uri)} diff --git a/backend/src/app/media.clj b/backend/src/app/media.clj index 99cbe15cd..796047f10 100644 --- a/backend/src/app/media.clj +++ b/backend/src/app/media.clj @@ -20,7 +20,7 @@ [clojure.java.shell :as sh] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [datoteka.core :as fs]) + [datoteka.fs :as fs]) (:import org.im4java.core.ConvertCmd org.im4java.core.IMOperation diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj index 4c5e10e19..8641e9f07 100644 --- a/backend/src/app/metrics.clj +++ b/backend/src/app/metrics.clj @@ -37,51 +37,51 @@ (def default-metrics {:update-file-changes - {:name "rpc_update_file_changes_total" + {:name "penpot_rpc_update_file_changes_total" :help "A total number of changes submitted to update-file." :type :counter} :update-file-bytes-processed - {:name "rpc_update_file_bytes_processed_total" + {:name "penpot_rpc_update_file_bytes_processed_total" :help "A total number of bytes processed by update-file." :type :counter} :rpc-mutation-timing - {:name "rpc_mutation_timing" + {:name "penpot_rpc_mutation_timing" :help "RPC mutation method call timming." :labels ["name"] :type :histogram} :rpc-command-timing - {:name "rpc_command_timing" + {:name "penpot_rpc_command_timing" :help "RPC command method call timming." :labels ["name"] :type :histogram} :rpc-query-timing - {:name "rpc_query_timing" + {:name "penpot_rpc_query_timing" :help "RPC query method call timing." :labels ["name"] :type :histogram} :websocket-active-connections - {:name "websocket_active_connections" + {:name "penpot_websocket_active_connections" :help "Active websocket connections gauge" :type :gauge} :websocket-messages-total - {:name "websocket_message_total" + {:name "penpot_websocket_message_total" :help "Counter of processed messages." :labels ["op"] :type :counter} :websocket-session-timing - {:name "websocket_session_timing" + {:name "penpot_websocket_session_timing" :help "Websocket session timing (seconds)." :type :summary} :session-update-total - {:name "http_session_update_total" + {:name "penpot_http_session_update_total" :help "A counter of session update batch events." :type :counter} @@ -91,21 +91,27 @@ :labels ["name"] :type :summary} - :rlimit-queued-submissions - {:name "penpot_rlimit_queued_submissions" - :help "Current number of queued submissions on RLIMIT." + :redis-eval-timing + {:name "penpot_redis_eval_timing" + :help "Redis EVAL commands execution timings (ms)" + :labels ["name"] + :type :summary} + + :rpc-semaphore-queued-submissions + {:name "penpot_rpc_semaphore_queued_submissions" + :help "Current number of queued submissions on RPC-SEMAPHORE." :labels ["name"] :type :gauge} - :rlimit-used-permits - {:name "penpot_rlimit_used_permits" - :help "Current number of used permits on RLIMIT." + :rpc-semaphore-used-permits + {:name "penpot_rpc_semaphore_used_permits" + :help "Current number of used permits on RPC-SEMAPHORE." :labels ["name"] :type :gauge} - :rlimit-acquires-total - {:name "penpot_rlimit_acquires_total" - :help "Total number of acquire operations on RLIMIT." + :rpc-semaphore-acquires-total + {:name "penpot_rpc_semaphore_acquires_total" + :help "Total number of acquire operations on RPC-SEMAPHORE." :labels ["name"] :type :counter} @@ -147,6 +153,8 @@ :definitions definitions :registry registry})) + +;; TODO: revisit (s/def ::handler fn?) (s/def ::registry #(instance? CollectorRegistry %)) (s/def ::metrics diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index e14bf9e12..b4c1a6a77 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -13,28 +13,14 @@ [app.common.spec :as us] [app.common.transit :as t] [app.config :as cfg] + [app.redis :as redis] [app.util.async :as aa] [app.util.time :as dt] [app.worker :as wrk] [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] - [promesa.core :as p]) - (:import - io.lettuce.core.RedisClient - io.lettuce.core.RedisURI - io.lettuce.core.api.StatefulConnection - io.lettuce.core.api.StatefulRedisConnection - io.lettuce.core.api.async.RedisAsyncCommands - io.lettuce.core.codec.ByteArrayCodec - io.lettuce.core.codec.RedisCodec - io.lettuce.core.codec.StringCodec - io.lettuce.core.pubsub.RedisPubSubListener - io.lettuce.core.pubsub.StatefulRedisPubSubConnection - io.lettuce.core.pubsub.api.sync.RedisPubSubCommands - io.lettuce.core.resource.ClientResources - io.lettuce.core.resource.DefaultClientResources - java.time.Duration)) + [promesa.core :as p])) (set! *warn-on-reflection* true) @@ -62,18 +48,14 @@ :timeout (dt/duration {:seconds 30})} (d/without-nils cfg))) -(s/def ::timeout ::dt/duration) -(s/def ::redis-uri ::us/string) (s/def ::buffer-size ::us/integer) (defmethod ig/pre-init-spec ::msgbus [_] - (s/keys :req-un [::buffer-size ::redis-uri ::timeout ::wrk/executor])) + (s/keys :req-un [::buffer-size ::redis/timeout ::redis/redis ::wrk/executor])) (defmethod ig/init-key ::msgbus - [_ {:keys [buffer-size redis-uri] :as cfg}] - (l/info :hint "initialize msgbus" - :buffer-size buffer-size - :redis-uri redis-uri) + [_ {:keys [buffer-size] :as cfg}] + (l/info :hint "initialize msgbus" :buffer-size buffer-size) (let [cmd-ch (a/chan buffer-size) rcv-ch (a/chan (a/dropping-buffer buffer-size)) pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic) @@ -106,33 +88,17 @@ ;; --- IMPL (defn- redis-connect - [{:keys [redis-uri timeout] :as cfg}] - (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) - - resources (.. (DefaultClientResources/builder) - (ioThreadPoolSize 4) - (computationThreadPoolSize 4) - (build)) - - uri (RedisURI/create redis-uri) - rclient (RedisClient/create ^ClientResources resources ^RedisURI uri) - - pconn (.connect ^RedisClient rclient ^RedisCodec codec) - sconn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)] - - (.setTimeout ^StatefulRedisConnection pconn ^Duration timeout) - (.setTimeout ^StatefulRedisPubSubConnection sconn ^Duration timeout) - + [{:keys [timeout redis] :as cfg}] + (let [pconn (redis/connect redis :timeout timeout) + sconn (redis/connect redis :type :pubsub :timeout timeout)] (-> cfg - (assoc ::resources resources) (assoc ::pconn pconn) (assoc ::sconn sconn)))) (defn- redis-disconnect - [{:keys [::pconn ::sconn ::resources] :as cfg}] - (.. ^StatefulConnection pconn close) - (.. ^StatefulConnection sconn close) - (.shutdown ^ClientResources resources)) + [{:keys [::pconn ::sconn] :as cfg}] + (redis/close! pconn) + (redis/close! sconn)) (defn- conj-subscription "A low level function that is responsible to create on-demand @@ -204,27 +170,18 @@ (defn- create-listener [rcv-ch] - (reify RedisPubSubListener - (message [_ _pattern _topic _message]) - (message [_ topic message] - ;; There are no back pressure, so we use a slidding - ;; buffer for cases when the pubsub broker sends - ;; more messages that we can process. - (let [val {:topic topic :message (t/decode message)}] - (when-not (a/offer! rcv-ch val) - (l/warn :msg "dropping message on subscription loop")))) - (psubscribed [_ _pattern _count]) - (punsubscribed [_ _pattern _count]) - (subscribed [_ _topic _count]) - (unsubscribed [_ _topic _count]))) + (redis/pubsub-listener + :on-message (fn [_ topic message] + ;; There are no back pressure, so we use a slidding + ;; buffer for cases when the pubsub broker sends + ;; more messages that we can process. + (let [val {:topic topic :message (t/decode message)}] + (when-not (a/offer! rcv-ch val) + (l/warn :msg "dropping message on subscription loop")))))) (defn start-io-loop [{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}] - - ;; Add a single listener to the pubsub connection - (.addListener ^StatefulRedisPubSubConnection sconn - ^RedisPubSubListener (create-listener rcv-ch)) - + (redis/add-listener! sconn (create-listener rcv-ch)) (letfn [(send-to-topic [topic message] (a/go-loop [chans (seq (get-in @state [:topics topic])) closed #{}] @@ -270,11 +227,10 @@ intended to be used in core.async go blocks." [{:keys [::pconn] :as cfg} {:keys [topic message]}] (let [message (t/encode message) - res (a/chan 1) - pcomm (.async ^StatefulRedisConnection pconn)] - (-> (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message) + res (a/chan 1)] + (-> (redis/publish! pconn topic message) (p/finally (fn [_ cause] - (when (and cause (.isOpen ^StatefulConnection pconn)) + (when (and cause (redis/open? pconn)) (a/offer! res cause)) (a/close! res)))) res)) @@ -283,14 +239,10 @@ "Create redis subscription. Blocking operation, intended to be used inside an agent." [{:keys [::sconn] :as cfg} topic] - (let [topic (into-array String [topic]) - scomm (.sync ^StatefulRedisPubSubConnection sconn)] - (.subscribe ^RedisPubSubCommands scomm topic))) + (redis/subscribe! sconn topic)) (defn redis-unsub "Removes redis subscription. Blocking operation, intended to be used inside an agent." [{:keys [::sconn] :as cfg} topic] - (let [topic (into-array String [topic]) - scomm (.sync ^StatefulRedisPubSubConnection sconn)] - (.unsubscribe ^RedisPubSubCommands scomm topic))) + (redis/unsubscribe! sconn topic)) diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj new file mode 100644 index 000000000..4ec815e24 --- /dev/null +++ b/backend/src/app/redis.clj @@ -0,0 +1,319 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.redis + "The msgbus abstraction implemented using redis as underlying backend." + (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.common.spec :as us] + [app.metrics :as mtx] + [app.redis.script :as-alias rscript] + [app.util.time :as dt] + [clojure.core :as c] + [clojure.java.io :as io] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] + [integrant.core :as ig] + [promesa.core :as p]) + (:import + clojure.lang.IDeref + io.lettuce.core.RedisClient + io.lettuce.core.RedisURI + io.lettuce.core.ScriptOutputType + io.lettuce.core.api.StatefulConnection + io.lettuce.core.api.StatefulRedisConnection + io.lettuce.core.api.async.RedisAsyncCommands + io.lettuce.core.api.async.RedisScriptingAsyncCommands + io.lettuce.core.codec.ByteArrayCodec + io.lettuce.core.codec.RedisCodec + io.lettuce.core.codec.StringCodec + io.lettuce.core.pubsub.RedisPubSubListener + io.lettuce.core.pubsub.StatefulRedisPubSubConnection + io.lettuce.core.pubsub.api.sync.RedisPubSubCommands + io.lettuce.core.resource.ClientResources + io.lettuce.core.resource.DefaultClientResources + io.netty.util.HashedWheelTimer + io.netty.util.Timer + java.lang.AutoCloseable + java.time.Duration)) + +(set! *warn-on-reflection* true) + +(declare initialize-resources) +(declare shutdown-resources) +(declare connect) +(declare close!) + +(s/def ::timer + #(instance? Timer %)) + +(s/def ::connection + #(or (instance? StatefulRedisConnection %) + (and (instance? IDeref %) + (instance? StatefulRedisConnection (deref %))))) + +(s/def ::pubsub-connection + #(or (instance? StatefulRedisPubSubConnection %) + (and (instance? IDeref %) + (instance? StatefulRedisPubSubConnection (deref %))))) + +(s/def ::redis-uri + #(instance? RedisURI %)) + +(s/def ::resources + #(instance? ClientResources %)) + +(s/def ::pubsub-listener + #(instance? RedisPubSubListener %)) + +(s/def ::uri ::us/not-empty-string) +(s/def ::timeout ::dt/duration) +(s/def ::connect? ::us/boolean) +(s/def ::io-threads ::us/integer) +(s/def ::worker-threads ::us/integer) + +(s/def ::redis + (s/keys :req [::resources ::redis-uri ::timer ::mtx/metrics] + :opt [::connection])) + +(defmethod ig/pre-init-spec ::redis [_] + (s/keys :req-un [::uri ::mtx/metrics] + :opt-un [::timeout + ::connect? + ::io-threads + ::worker-threads])) + +(defmethod ig/prep-key ::redis + [_ cfg] + (let [runtime (Runtime/getRuntime) + cpus (.availableProcessors ^Runtime runtime)] + (merge {:timeout (dt/duration 5000) + :io-threads (max 3 cpus) + :worker-threads (max 3 cpus)} + (d/without-nils cfg)))) + +(defmethod ig/init-key ::redis + [_ {:keys [connect?] :as cfg}] + (let [cfg (initialize-resources cfg)] + (cond-> cfg + connect? (assoc ::connection (connect cfg))))) + +(defmethod ig/halt-key! ::redis + [_ state] + (shutdown-resources state)) + +(def default-codec + (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE)) + +(def string-codec + (RedisCodec/of StringCodec/UTF8 StringCodec/UTF8)) + +(defn- initialize-resources + "Initialize redis connection resources" + [{:keys [uri io-threads worker-threads connect? metrics] :as cfg}] + (l/info :hint "initialize redis resources" + :uri uri + :io-threads io-threads + :worker-threads worker-threads + :connect? connect?) + + (let [timer (HashedWheelTimer.) + resources (.. (DefaultClientResources/builder) + (ioThreadPoolSize ^long io-threads) + (computationThreadPoolSize ^long worker-threads) + (timer ^Timer timer) + (build)) + + redis-uri (RedisURI/create ^String uri)] + + (-> cfg + (assoc ::mtx/metrics metrics) + (assoc ::cache (atom {})) + (assoc ::timer timer) + (assoc ::redis-uri redis-uri) + (assoc ::resources resources)))) + +(defn- shutdown-resources + [{:keys [::resources ::cache ::timer]}] + (run! close! (vals @cache)) + (when resources + (.shutdown ^ClientResources resources)) + (when timer + (.stop ^Timer timer))) + +(defn connect + [{:keys [::resources ::redis-uri] :as cfg} + & {:keys [timeout codec type] :or {codec default-codec type :default}}] + + (us/assert! ::resources resources) + + (let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri) + timeout (or timeout (:timeout cfg)) + conn (case type + :default (.connect ^RedisClient client ^RedisCodec codec) + :pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))] + + (.setTimeout ^StatefulConnection conn ^Duration timeout) + + (reify + IDeref + (deref [_] conn) + + AutoCloseable + (close [_] + (.close ^StatefulConnection conn) + (.shutdown ^RedisClient client))))) + +(defn get-or-connect + [{:keys [::cache] :as state} key options] + (assoc state ::connection + (or (get @cache key) + (-> (swap! cache (fn [cache] + (when-let [prev (get cache key)] + (close! prev)) + (assoc cache key (connect state options)))) + (get key))))) + +(defn add-listener! + [conn listener] + (us/assert! ::pubsub-connection @conn) + (us/assert! ::pubsub-listener listener) + + (.addListener ^StatefulRedisPubSubConnection @conn + ^RedisPubSubListener listener) + conn) + +(defn publish! + [conn topic message] + (us/assert! ::us/string topic) + (us/assert! ::us/bytes message) + (us/assert! ::connection @conn) + + (let [pcomm (.async ^StatefulRedisConnection @conn)] + (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message))) + +(defn subscribe! + "Blocking operation, intended to be used on a worker/agent thread." + [conn & topics] + (us/assert! ::pubsub-connection @conn) + (let [topics (into-array String (map str topics)) + cmd (.sync ^StatefulRedisPubSubConnection @conn)] + (.subscribe ^RedisPubSubCommands cmd topics))) + +(defn unsubscribe! + "Blocking operation, intended to be used on a worker/agent thread." + [conn & topics] + (us/assert! ::pubsub-connection @conn) + (let [topics (into-array String (map str topics)) + cmd (.sync ^StatefulRedisPubSubConnection @conn)] + (.unsubscribe ^RedisPubSubCommands cmd topics))) + +(defn open? + [conn] + (.isOpen ^StatefulConnection @conn)) + +(defn pubsub-listener + [& {:keys [on-message on-subscribe on-unsubscribe]}] + (reify RedisPubSubListener + (message [_ pattern topic message] + (when on-message + (on-message pattern topic message))) + + (message [_ topic message] + (when on-message + (on-message nil topic message))) + + (psubscribed [_ pattern count] + (when on-subscribe + (on-subscribe pattern nil count))) + + (punsubscribed [_ pattern count] + (when on-unsubscribe + (on-unsubscribe pattern nil count))) + + (subscribed [_ topic count] + (when on-subscribe + (on-subscribe nil topic count))) + + (unsubscribed [_ topic count] + (when on-unsubscribe + (on-unsubscribe nil topic count))))) + +(defn close! + [o] + (.close ^AutoCloseable o)) + +(def ^:private scripts-cache (atom {})) +(def noop-fn (constantly nil)) + +(s/def ::rscript/name qualified-keyword?) +(s/def ::rscript/path ::us/not-empty-string) +(s/def ::rscript/keys (s/every any? :kind vector?)) +(s/def ::rscript/vals (s/every any? :kind vector?)) + +(s/def ::rscript/script + (s/keys :req [::rscript/name + ::rscript/path] + :opt [::rscript/keys + ::rscript/vals])) + +(defn eval! + [{:keys [::mtx/metrics] :as state} script] + (us/assert! ::rscript/script script) + (us/assert! ::redis state) + + (let [rconn (-> state ::connection deref) + cmd (.async ^StatefulRedisConnection rconn) + keys (into-array String (map str (::rscript/keys script))) + vals (into-array String (map str (::rscript/vals script))) + sname (::rscript/name script)] + + (letfn [(on-error [cause] + (if (instance? io.lettuce.core.RedisNoScriptException cause) + (do + (l/error :hint "no script found" :name sname :cause cause) + (-> (load-script) + (p/then eval-script))) + (if-let [on-error (::rscript/on-error script)] + (on-error cause) + (p/rejected cause)))) + + (eval-script [sha] + (let [start-ts (System/nanoTime)] + (-> (.evalsha ^RedisScriptingAsyncCommands cmd + ^String sha + ^ScriptOutputType ScriptOutputType/MULTI + ^"[Ljava.lang.String;" keys + ^"[Ljava.lang.String;" vals) + (p/then (fn [result] + (let [elapsed (dt/duration {:nanos (- (System/nanoTime) start-ts)})] + (mtx/run! metrics {:id :redis-eval-timing + :labels [(name sname)] + :val (inst-ms elapsed)}) + (l/trace :hint "eval script" + :name (name sname) + :sha sha + :params (str/join "," (::rscript/vals script)) + :elapsed (dt/format-duration elapsed)) + result))) + (p/catch on-error)))) + + (read-script [] + (-> script ::rscript/path io/resource slurp)) + + (load-script [] + (l/trace :hint "load script" :name sname) + (-> (.scriptLoad ^RedisScriptingAsyncCommands cmd + ^String (read-script)) + (p/then (fn [sha] + (swap! scripts-cache assoc sname sha) + sha))))] + + (if-let [sha (get @scripts-cache sname)] + (eval-script sha) + (-> (load-script) + (p/then eval-script)))))) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 548dda581..162d48444 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -10,10 +10,12 @@ [app.common.logging :as l] [app.common.spec :as us] [app.db :as db] + [app.http :as-alias http] [app.loggers.audit :as audit] [app.metrics :as mtx] [app.rpc.retry :as retry] [app.rpc.rlimit :as rlimit] + [app.rpc.semaphore :as rsem] [app.util.async :as async] [app.util.services :as sv] [app.worker :as wrk] @@ -39,81 +41,72 @@ (ex/ignoring (hook-fn))) response) +(defn- handle-response + [request result] + (let [mdata (meta result)] + (p/-> (yrs/response 200 result (::http/headers mdata {})) + (handle-response-transformation request mdata) + (handle-before-comple-hook mdata)))) + (defn- rpc-query-handler "Ring handler that dispatches query requests and convert between internal async flow into ring async flow." [methods {:keys [profile-id session-id params] :as request} respond raise] - (letfn [(handle-response [result] - (let [mdata (meta result)] - (-> (yrs/response 200 result) - (handle-response-transformation request mdata))))] + (let [type (keyword (:type params)) + data (into {::http/request request} params) + data (if profile-id + (assoc data :profile-id profile-id ::session-id session-id) + (dissoc data :profile-id)) + method (get methods type default-handler)] - (let [type (keyword (:type params)) - data (into {::request request} params) - data (if profile-id - (assoc data :profile-id profile-id ::session-id session-id) - (dissoc data :profile-id)) - method (get methods type default-handler)] - - (-> (method data) - (p/then handle-response) - (p/then respond) - (p/catch (fn [cause] - (let [context {:profile-id profile-id}] - (raise (ex/wrap-with-context cause context))))))))) + (-> (method data) + (p/then (partial handle-response request)) + (p/then respond) + (p/catch (fn [cause] + (let [context {:profile-id profile-id}] + (raise (ex/wrap-with-context cause context)))))))) (defn- rpc-mutation-handler "Ring handler that dispatches mutation requests and convert between internal async flow into ring async flow." [methods {:keys [profile-id session-id params] :as request} respond raise] - (letfn [(handle-response [result] - (let [mdata (meta result)] - (p/-> (yrs/response 200 result) - (handle-response-transformation request mdata) - (handle-before-comple-hook mdata))))] + (let [type (keyword (:type params)) + data (into {::request request} params) + data (if profile-id + (assoc data :profile-id profile-id ::session-id session-id) + (dissoc data :profile-id)) - (let [type (keyword (:type params)) - data (into {::request request} params) - data (if profile-id - (assoc data :profile-id profile-id ::session-id session-id) - (dissoc data :profile-id)) - - method (get methods type default-handler)] - (-> (method data) - (p/then handle-response) - (p/then respond) - (p/catch (fn [cause] - (let [context {:profile-id profile-id}] - (raise (ex/wrap-with-context cause context))))))))) + method (get methods type default-handler)] + (-> (method data) + (p/then (partial handle-response request)) + (p/then respond) + (p/catch (fn [cause] + (let [context {:profile-id profile-id}] + (raise (ex/wrap-with-context cause context)))))))) (defn- rpc-command-handler "Ring handler that dispatches cmd requests and convert between internal async flow into ring async flow." [methods {:keys [profile-id session-id params] :as request} respond raise] - (letfn [(handle-response [result] - (let [mdata (meta result)] - (p/-> (yrs/response 200 result) - (handle-response-transformation request mdata) - (handle-before-comple-hook mdata))))] + (let [cmd (keyword (:command params)) + data (into {::request request} params) + data (if profile-id + (assoc data :profile-id profile-id ::session-id session-id) + (dissoc data :profile-id)) - (let [cmd (keyword (:command params)) - data (into {::request request} params) - data (if profile-id - (assoc data :profile-id profile-id ::session-id session-id) - (dissoc data :profile-id)) - - method (get methods cmd default-handler)] - (-> (method data) - (p/then handle-response) - (p/then respond) - (p/catch (fn [cause] - (let [context {:profile-id profile-id}] - (raise (ex/wrap-with-context cause context))))))))) + method (get methods cmd default-handler)] + (-> (method data) + (p/then (partial handle-response request)) + (p/then respond) + (p/catch (fn [cause] + (let [context {:profile-id profile-id}] + (raise (ex/wrap-with-context cause context)))))))) (defn- wrap-metrics "Wrap service method with metrics measurement." [{:keys [metrics ::metrics-id]} f mdata] (let [labels (into-array String [(::sv/name mdata)])] + (fn [cfg params] (let [start (System/nanoTime)] (p/finally @@ -177,7 +170,8 @@ [cfg f mdata] (let [f (as-> f $ (wrap-dispatch cfg $ mdata) - (rlimit/wrap-rlimit cfg $ mdata) + (rsem/wrap cfg $ mdata) + (rlimit/wrap cfg $ mdata) (retry/wrap-retry cfg $ mdata) (wrap-audit cfg $ mdata) (wrap-metrics cfg $ mdata) diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index 625bf6cf5..d052c6b20 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -16,7 +16,7 @@ [app.rpc.doc :as-alias doc] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] - [app.rpc.rlimit :as rlimit] + [app.rpc.semaphore :as rsem] [app.tokens :as tokens] [app.util.services :as sv] [app.util.time :as dt] @@ -136,7 +136,7 @@ (sv/defmethod ::login-with-password "Performs authentication using penpot password." {:auth false - ::rlimit/permits (cf/get :rlimit-password) + ::rsem/permits (cf/get :rpc-semaphore-permits-password) ::doc/added "1.15"} [cfg params] (login-with-password cfg params)) @@ -177,7 +177,7 @@ (sv/defmethod ::recover-profile {:auth false - ::rlimit/permits (cf/get :rlimit-password) + ::rsem/permits (cf/get :rpc-semaphore-permits-password) ::doc/added "1.15"} [cfg params] (recover-profile cfg params)) @@ -368,7 +368,7 @@ (sv/defmethod ::register-profile {:auth false - ::rlimit/permits (cf/get :rlimit-password) + ::rsem/permits (cf/get :rpc-semaphore-permits-password) ::doc/added "1.15"} [{:keys [pool] :as cfg} params] (db/with-atomic [conn pool] diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index ff45fb6c6..305ac8e2c 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -20,7 +20,7 @@ [app.rpc.permissions :as perms] [app.rpc.queries.files :as files] [app.rpc.queries.projects :as proj] - [app.rpc.rlimit :as rlimit] + [app.rpc.semaphore :as rsem] [app.storage.impl :as simpl] [app.util.blob :as blob] [app.util.services :as sv] @@ -318,7 +318,7 @@ (contains? o :changes-with-metadata))))) (sv/defmethod ::update-file - {::rlimit/permits (cf/get :rlimit-file-update)} + {::rsem/permits (cf/get :rpc-semaphore-permits-file-update)} [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] (db/with-atomic [conn pool] (db/xact-lock! conn id) diff --git a/backend/src/app/rpc/mutations/fonts.clj b/backend/src/app/rpc/mutations/fonts.clj index b24544a88..2fa930b6e 100644 --- a/backend/src/app/rpc/mutations/fonts.clj +++ b/backend/src/app/rpc/mutations/fonts.clj @@ -15,7 +15,7 @@ [app.media :as media] [app.rpc.doc :as-alias doc] [app.rpc.queries.teams :as teams] - [app.rpc.rlimit :as rlimit] + [app.rpc.semaphore :as rsem] [app.storage :as sto] [app.util.services :as sv] [app.util.time :as dt] @@ -42,7 +42,7 @@ ::font-id ::font-family ::font-weight ::font-style])) (sv/defmethod ::create-font-variant - {::rlimit/permits (cf/get :rlimit-font)} + {::rsem/permits (cf/get :rpc-semaphore-permits-font)} [{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}] (let [cfg (update cfg :storage media/configure-assets-storage)] (teams/check-edition-permissions! pool profile-id team-id) diff --git a/backend/src/app/rpc/mutations/media.clj b/backend/src/app/rpc/mutations/media.clj index 25d894d77..50ab06e0f 100644 --- a/backend/src/app/rpc/mutations/media.clj +++ b/backend/src/app/rpc/mutations/media.clj @@ -15,7 +15,7 @@ [app.db :as db] [app.media :as media] [app.rpc.queries.teams :as teams] - [app.rpc.rlimit :as rlimit] + [app.rpc.semaphore :as rsem] [app.storage :as sto] [app.storage.tmp :as tmp] [app.util.bytes :as bs] @@ -53,7 +53,7 @@ :opt-un [::id])) (sv/defmethod ::upload-file-media-object - {::rlimit/permits (cf/get :rlimit-image)} + {::rsem/permits (cf/get :rpc-semaphore-permits-image)} [{:keys [pool] :as cfg} {:keys [profile-id file-id content] :as params}] (let [file (select-file pool file-id) cfg (update cfg :storage media/configure-assets-storage)] @@ -181,7 +181,7 @@ :opt-un [::id ::name])) (sv/defmethod ::create-file-media-object-from-url - {::rlimit/permits (cf/get :rlimit-image)} + {::rsem/permits (cf/get :rpc-semaphore-permits-image)} [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] (let [file (select-file pool file-id) cfg (update cfg :storage media/configure-assets-storage)] diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj index f8c0582c9..a72952be9 100644 --- a/backend/src/app/rpc/mutations/profile.clj +++ b/backend/src/app/rpc/mutations/profile.clj @@ -17,7 +17,7 @@ [app.rpc.commands.auth :as cmd.auth] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] - [app.rpc.rlimit :as rlimit] + [app.rpc.semaphore :as rsem] [app.storage :as sto] [app.tokens :as tokens] [app.util.services :as sv] @@ -87,7 +87,7 @@ (s/keys :req-un [::profile-id ::password ::old-password])) (sv/defmethod ::update-profile-password - {::rlimit/permits (cf/get :rlimit-password)} + {::rsem/permits (cf/get :rpc-semaphore-permits-password)} [{:keys [pool] :as cfg} {:keys [password] :as params}] (db/with-atomic [conn pool] (let [profile (validate-password! conn params) @@ -130,7 +130,7 @@ (s/keys :req-un [::profile-id ::file])) (sv/defmethod ::update-profile-photo - {::rlimit/permits (cf/get :rlimit-image)} + {::rsem/permits (cf/get :rpc-semaphore-permits-image)} [cfg {:keys [file] :as params}] ;; Validate incoming mime type (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) @@ -305,7 +305,7 @@ (s/def ::login ::cmd.auth/login-with-password) (sv/defmethod ::login - {:auth false ::rlimit/permits (cf/get :rlimit-password)} + {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)} [cfg params] (cmd.auth/login-with-password cfg params)) @@ -323,7 +323,7 @@ (s/def ::recover-profile ::cmd.auth/recover-profile) (sv/defmethod ::recover-profile - {:auth false ::rlimit/permits (cf/get :rlimit-password)} + {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)} [cfg params] (cmd.auth/recover-profile cfg params)) @@ -340,7 +340,7 @@ (s/def ::register-profile ::cmd.auth/register-profile) (sv/defmethod ::register-profile - {:auth false ::rlimit/permits (cf/get :rlimit-password)} + {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)} [{:keys [pool] :as cfg} params] (db/with-atomic [conn pool] (-> (assoc cfg :conn conn) diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj index c9ceae6ff..8e9c1d2c5 100644 --- a/backend/src/app/rpc/mutations/teams.clj +++ b/backend/src/app/rpc/mutations/teams.clj @@ -20,7 +20,7 @@ [app.rpc.permissions :as perms] [app.rpc.queries.profile :as profile] [app.rpc.queries.teams :as teams] - [app.rpc.rlimit :as rlimit] + [app.rpc.semaphore :as rsem] [app.storage :as sto] [app.tokens :as tokens] [app.util.services :as sv] @@ -290,7 +290,7 @@ (s/keys :req-un [::profile-id ::team-id ::file])) (sv/defmethod ::update-team-photo - {::rlimit/permits (cf/get :rlimit-image)} + {::rsem/permits (cf/get :rpc-semaphore-permits-image)} [cfg {:keys [file] :as params}] ;; Validate incoming mime type (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj index af04af269..2a90a8aa1 100644 --- a/backend/src/app/rpc/rlimit.clj +++ b/backend/src/app/rpc/rlimit.clj @@ -5,63 +5,266 @@ ;; Copyright (c) UXBOX Labs SL (ns app.rpc.rlimit - "Resource usage limits (in other words: semaphores)." + "Rate limit strategies implementation for RPC services. + + It mainly implements two strategies: fixed window and bucket. You + can use one of them or both to create a combination of limits. All + limits are updated in each request and the most restrictive one + blocks the user activity. + + On the HTTP layer it translates to the 429 http response. + + The limits are defined as vector of 3 elements: + [<name:keyword> <strategy:keyword> <opts:string>] + + The opts format is strategy dependent. With fixed `:window` strategy + you have the following format: + [:somename :window \"1000/m\"] + + Where the first number means the quantity of allowed request and the + letter indicates the window unit, that can be `w` for weeks, `h` for + hours, `m` for minutes and `s` for seconds. + + The the `:bucket` strategy you will have something like this: + [:somename :bucket \"100/10/15s] + + Where the first number indicates the total tokens capacity (or + available burst), the second number indicates the refill rate and + the last number suffixed with the unit indicates the time window (or + interval) of the refill. This means that this limit configurations + allow burst of 100 elements and will refill 10 tokens each 15s (1 + token each 1.5segons). + + The bucket strategy works well for small intervals and window + strategy works better for large intervals. + + All limits uses the profile-id as user identifier. In case of the + profile-id is not available, the IP address is used as fallback + value. + " (:require [app.common.data :as d] + [app.common.data.macros :as dm] + [app.common.exceptions :as ex] [app.common.logging :as l] - [app.metrics :as mtx] - [app.util.services :as sv] + [app.common.spec :as us] + [app.common.uri :as uri] + [app.common.uuid :as uuid] + [app.config :as cf] + [app.http :as-alias http] + [app.loggers.audit :refer [parse-client-ip]] + [app.redis :as redis] + [app.redis.script :as-alias rscript] + [app.rpc.rlimit.result :as-alias lresult] + [app.util.services :as-alias sv] + [app.util.time :as dt] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] [promesa.core :as p])) -(defprotocol IAsyncSemaphore - (acquire! [_]) - (release! [_])) +(def ^:private default-timeout + (dt/duration 400)) -(defn semaphore - [{:keys [permits metrics name]}] - (let [name (d/name name) - used (volatile! 0) - queue (volatile! (d/queue)) - labels (into-array String [name])] - (reify IAsyncSemaphore - (acquire! [this] - (let [d (p/deferred)] - (locking this - (if (< @used permits) - (do - (vswap! used inc) - (p/resolve! d)) - (vswap! queue conj d))) +(def ^:private default-options + {:codec redis/string-codec + :timeout default-timeout}) - (mtx/run! metrics {:id :rlimit-used-permits :val @used :labels labels }) - (mtx/run! metrics {:id :rlimit-queued-submissions :val (count @queue) :labels labels}) - (mtx/run! metrics {:id :rlimit-acquires-total :inc 1 :labels labels}) - d)) +(def ^:private bucket-rate-limit-script + {::rscript/name ::bucket-rate-limit + ::rscript/path "app/rpc/rlimit/bucket.lua"}) - (release! [this] - (locking this - (if-let [item (peek @queue)] - (do - (vswap! queue pop) - (p/resolve! item)) - (when (pos? @used) - (vswap! used dec)))) +(def ^:private window-rate-limit-script + {::rscript/name ::window-rate-limit + ::rscript/path "app/rpc/rlimit/window.lua"}) - (mtx/run! metrics {:id :rlimit-used-permits :val @used :labels labels}) - (mtx/run! metrics {:id :rlimit-queued-submissions :val (count @queue) :labels labels}) - )))) +(def enabled? + "Allows on runtime completly disable rate limiting." + (atom true)) -(defn wrap-rlimit - [{:keys [metrics executors] :as cfg} f mdata] - (if-let [permits (::permits mdata)] - (let [sem (semaphore {:permits permits - :metrics metrics - :name (::sv/name mdata)})] - (l/debug :hint "wrapping rlimit" :handler (::sv/name mdata) :permits permits) - (fn [cfg params] - (-> (acquire! sem) - (p/then (fn [_] (f cfg params)) (:default executors)) - (p/finally (fn [_ _] (release! sem)))))) - f)) +(def ^:private window-opts-re + #"^(\d+)/([wdhms])$") +(def ^:private bucket-opts-re + #"^(\d+)/(\d+)/(\d+[hms])$") +(s/def ::strategy (s/and ::us/keyword #{:window :bucket})) + +(s/def ::limit-definition + (s/tuple ::us/keyword ::strategy string?)) + +(defmulti parse-limit (fn [[_ strategy _]] strategy)) +(defmulti process-limit (fn [_ _ _ o] (::strategy o))) + +(defmethod parse-limit :window + [[name strategy opts :as vlimit]] + (us/assert! ::limit-definition vlimit) + (merge + {::name name + ::strategy strategy} + (if-let [[_ nreq unit] (re-find window-opts-re opts)] + (let [nreq (parse-long nreq)] + {::nreq nreq + ::unit (case unit + "d" :days + "h" :hours + "m" :minutes + "s" :seconds + "w" :weeks) + ::key (dm/str "ratelimit.window." (d/name name)) + ::opts opts}) + (ex/raise :type :validation + :code :invalid-window-limit-opts + :hint (str/ffmt "looks like '%' does not have a valid format" opts))))) + +(defmethod parse-limit :bucket + [[name strategy opts :as vlimit]] + (us/assert! ::limit-definition vlimit) + (merge + {::name name + ::strategy strategy} + (if-let [[_ capacity rate interval] (re-find bucket-opts-re opts)] + (let [interval (dt/duration interval) + rate (parse-long rate) + capacity (parse-long capacity)] + {::capacity capacity + ::rate rate + ::interval interval + ::opts opts + ::params [(dt/->seconds interval) rate capacity] + ::key (dm/str "ratelimit.bucket." (d/name name))}) + (ex/raise :type :validation + :code :invalid-bucket-limit-opts + :hint (str/ffmt "looks like '%' does not have a valid format" opts))))) + +(defmethod process-limit :bucket + [redis user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}] + (let [script (-> bucket-rate-limit-script + (assoc ::rscript/keys [(dm/str key "." service "." user-id)]) + (assoc ::rscript/vals (conj params (dt/->seconds now))))] + (-> (redis/eval! redis script) + (p/then (fn [result] + (let [allowed? (boolean (nth result 0)) + remaining (nth result 1) + reset (* (/ (inst-ms interval) rate) + (- capacity remaining))] + (l/trace :hint "limit processed" + :service service + :limit (name (::name limit)) + :strategy (name (::strategy limit)) + :opts (::opts limit) + :allowed? allowed? + :remaining remaining) + (-> limit + (assoc ::lresult/allowed? allowed?) + (assoc ::lresult/reset (dt/plus now reset)) + (assoc ::lresult/remaining remaining)))))))) + +(defmethod process-limit :window + [redis user-id now {:keys [::nreq ::unit ::key ::service] :as limit}] + (let [ts (dt/truncate now unit) + ttl (dt/diff now (dt/plus ts {unit 1})) + script (-> window-rate-limit-script + (assoc ::rscript/keys [(dm/str key "." service "." user-id "." (dt/format-instant ts))]) + (assoc ::rscript/vals [nreq (dt/->seconds ttl)]))] + (-> (redis/eval! redis script) + (p/then (fn [result] + (let [allowed? (boolean (nth result 0)) + remaining (nth result 1)] + (l/trace :hint "limit processed" + :service service + :limit (name (::name limit)) + :strategy (name (::strategy limit)) + :opts (::opts limit) + :allowed? allowed? + :remaining remaining) + (-> limit + (assoc ::lresult/allowed? allowed?) + (assoc ::lresult/remaining remaining) + (assoc ::lresult/reset (dt/plus ts {unit 1}))))))))) + +(defn- process-limits + [redis user-id limits now] + (-> (p/all (map (partial process-limit redis user-id now) (reverse limits))) + (p/then (fn [results] + (let [remaining (->> results + (d/index-by ::name ::lresult/remaining) + (uri/map->query-string)) + reset (->> results + (d/index-by ::name (comp dt/->seconds ::lresult/reset)) + (uri/map->query-string)) + rejected (->> results + (filter (complement ::lresult/allowed?)) + (first))] + (when (and rejected (contains? cf/flags :warn-rpc-rate-limits)) + (l/warn :hint "rejected rate limit" + :user-id (dm/str user-id) + :limit-service (-> rejected ::service name) + :limit-name (-> rejected ::name name) + :limit-strategy (-> rejected ::strategy name))) + + {:enabled? true + :allowed? (some? rejected) + :headers {"x-rate-limit-remaining" remaining + "x-rate-limit-reset" reset}}))))) + +(defn- parse-limits + [service limits] + (let [default (some->> (cf/get :default-rpc-rlimit) + (us/conform ::limit-definition)) + + limits (cond->> limits + (some? default) (cons default))] + + (->> (reverse limits) + (sequence (comp (map parse-limit) + (map #(assoc % ::service service)) + (d/distinct-xf ::name)))))) + +(defn- handle-response + [f cfg params rres] + (if (:enabled? rres) + (let [headers {"x-rate-limit-remaining" (:remaining rres) + "x-rate-limit-reset" (:reset rres)}] + (when-not (:allowed? rres) + (ex/raise :type :rate-limit + :code :request-blocked + :hint "rate limit reached" + ::http/headers headers)) + (-> (f cfg params) + (p/then (fn [response] + (with-meta response + {::http/headers headers}))))) + + (f cfg params))) + +(defn wrap + [{:keys [redis] :as cfg} f {service ::sv/name :as mdata}] + (let [limits (parse-limits service (::limits mdata)) + default-rresp (p/resolved {:enabled? false})] + + (if (and (seq limits) + (or (contains? cf/flags :rpc-rate-limit) + (contains? cf/flags :soft-rpc-rate-limit))) + (fn [cfg {:keys [::http/request] :as params}] + (let [user-id (or (:profile-id params) + (some-> request parse-client-ip) + uuid/zero) + + rresp (when (and user-id @enabled?) + (let [redis (redis/get-or-connect redis ::rlimit default-options) + rresp (-> (process-limits redis user-id limits (dt/now)) + (p/catch (fn [cause] + ;; If we have an error on processing the + ;; rate-limit we just skip it for do not cause + ;; service interruption because of redis downtime + ;; or similar situation. + (l/error :hint "error on processing rate-limit" :cause cause) + {:enabled? false})))] + + ;; If soft rate are enabled, we process the rate-limit but return + ;; unprotected response. + (and (contains? cf/flags :soft-rpc-rate-limit) rresp)))] + + (p/then (or rresp default-rresp) + (partial handle-response f cfg params)))) + f))) diff --git a/backend/src/app/rpc/rlimit/bucket.lua b/backend/src/app/rpc/rlimit/bucket.lua new file mode 100644 index 000000000..4200dec4d --- /dev/null +++ b/backend/src/app/rpc/rlimit/bucket.lua @@ -0,0 +1,33 @@ +local tokensKey = KEYS[1] + +local interval = tonumber(ARGV[1]) +local rate = tonumber(ARGV[2]) +local capacity = tonumber(ARGV[3]) +local timestamp = tonumber(ARGV[4]) +local requested = tonumber(ARGV[5] or 1) + +local fillTime = capacity / (rate / interval); +local ttl = math.floor(fillTime * 2) + +local lastTokens = tonumber(redis.call("hget", tokensKey, "tokens")) +if lastTokens == nil then + lastTokens = capacity +end + +local lastRefreshed = tonumber(redis.call("hget", tokensKey, "timestamp")) +if lastRefreshed == nil then + lastRefreshed = 0 +end + +local delta = math.max(0, (timestamp - lastRefreshed) / interval) +local filled = math.min(capacity, lastTokens + math.floor(delta * rate)); +local allowed = filled >= requested +local newTokens = filled +if allowed then + newTokens = filled - requested +end + +redis.call("hset", tokensKey, "tokens", newTokens, "timestamp", timestamp) +redis.call("expire", tokensKey, ttl) + +return { allowed, newTokens } diff --git a/backend/src/app/rpc/rlimit/window.lua b/backend/src/app/rpc/rlimit/window.lua new file mode 100644 index 000000000..d5e8e8af6 --- /dev/null +++ b/backend/src/app/rpc/rlimit/window.lua @@ -0,0 +1,18 @@ +local windowKey = KEYS[1] + +local nreq = tonumber(ARGV[1]) +local ttl = tonumber(ARGV[2]) + +local total = tonumber(redis.call("incr", windowKey)) +redis.call("expire", windowKey, ttl) + +local allowed = total <= nreq +local remaining = nreq - total + +if remaining < 0 then + remaining = 0 +end + +return {allowed, remaining} + + diff --git a/backend/src/app/rpc/semaphore.clj b/backend/src/app/rpc/semaphore.clj new file mode 100644 index 000000000..45f90839d --- /dev/null +++ b/backend/src/app/rpc/semaphore.clj @@ -0,0 +1,68 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.rpc.semaphore + "Resource usage limits (in other words: semaphores)." + (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.metrics :as mtx] + [app.util.locks :as locks] + [app.util.services :as-alias sv] + [promesa.core :as p])) + +(defprotocol IAsyncSemaphore + (acquire! [_]) + (release! [_])) + +(defn create + [& {:keys [permits metrics name]}] + (let [name (d/name name) + used (volatile! 0) + queue (volatile! (d/queue)) + labels (into-array String [name]) + lock (locks/create)] + + (reify IAsyncSemaphore + (acquire! [_] + (let [d (p/deferred)] + (locks/locking lock + (if (< @used permits) + (do + (vswap! used inc) + (p/resolve! d)) + (vswap! queue conj d))) + + (mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels }) + (mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels}) + (mtx/run! metrics {:id :rpc-semaphore-acquires-total :inc 1 :labels labels}) + d)) + + (release! [_] + (locks/locking lock + (if-let [item (peek @queue)] + (do + (vswap! queue pop) + (p/resolve! item)) + (when (pos? @used) + (vswap! used dec)))) + + (mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels}) + (mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels}))))) + +(defn wrap + [{:keys [metrics executors] :as cfg} f mdata] + (if-let [permits (::permits mdata)] + (let [sem (create {:permits permits + :metrics metrics + :name (::sv/name mdata)})] + (l/debug :hint "wrapping semaphore" :handler (::sv/name mdata) :permits permits) + (fn [cfg params] + (-> (acquire! sem) + (p/then (fn [_] (f cfg params)) (:default executors)) + (p/finally (fn [_ _] (release! sem)))))) + f)) + diff --git a/backend/src/app/setup/builtin_templates.clj b/backend/src/app/setup/builtin_templates.clj index d05255058..11cfe0fa9 100644 --- a/backend/src/app/setup/builtin_templates.clj +++ b/backend/src/app/setup/builtin_templates.clj @@ -14,7 +14,7 @@ [clojure.edn :as edn] [clojure.java.io :as io] [clojure.spec.alpha :as s] - [datoteka.core :as fs] + [datoteka.fs :as fs] [integrant.core :as ig])) (declare download-all!) diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index 4fbf05a5a..a4e7209ea 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -20,7 +20,7 @@ [app.util.time :as dt] [app.worker :as wrk] [clojure.spec.alpha :as s] - [datoteka.core :as fs] + [datoteka.fs :as fs] [integrant.core :as ig] [promesa.core :as p] [promesa.exec :as px])) diff --git a/backend/src/app/storage/fs.clj b/backend/src/app/storage/fs.clj index 4feaaf624..fbfbc8369 100644 --- a/backend/src/app/storage/fs.clj +++ b/backend/src/app/storage/fs.clj @@ -14,7 +14,7 @@ [clojure.java.io :as io] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [datoteka.core :as fs] + [datoteka.fs :as fs] [integrant.core :as ig] [promesa.core :as p] [promesa.exec :as px]) diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index 72480dd53..99113f833 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -17,7 +17,7 @@ [app.worker :as wrk] [clojure.java.io :as io] [clojure.spec.alpha :as s] - [datoteka.core :as fs] + [datoteka.fs :as fs] [integrant.core :as ig] [promesa.core :as p] [promesa.exec :as px]) diff --git a/backend/src/app/storage/tmp.clj b/backend/src/app/storage/tmp.clj index cdb1b0cc7..69503a455 100644 --- a/backend/src/app/storage/tmp.clj +++ b/backend/src/app/storage/tmp.clj @@ -16,7 +16,7 @@ [app.worker :as wrk] [clojure.core.async :as a] [clojure.spec.alpha :as s] - [datoteka.core :as fs] + [datoteka.fs :as fs] [integrant.core :as ig] [promesa.exec :as px])) diff --git a/backend/src/app/tokens.clj b/backend/src/app/tokens.clj index ff4fd998a..3a991609e 100644 --- a/backend/src/app/tokens.clj +++ b/backend/src/app/tokens.clj @@ -12,11 +12,14 @@ [app.common.spec :as us] [app.common.transit :as t] [app.util.time :as dt] - [buddy.sign.jwe :as jwe])) + [buddy.sign.jwe :as jwe] + [clojure.spec.alpha :as s])) + +(s/def ::tokens-key bytes?) (defn generate [{:keys [tokens-key]} claims] - (us/assert! ::us/not-empty-string tokens-key) + (us/assert! ::tokens-key tokens-key) (let [payload (-> claims (assoc :iat (dt/now)) (d/without-nils) diff --git a/backend/src/app/util/bytes.clj b/backend/src/app/util/bytes.clj index 50a73d335..5e5c2dca4 100644 --- a/backend/src/app/util/bytes.clj +++ b/backend/src/app/util/bytes.clj @@ -8,7 +8,7 @@ "Bytes & Byte Streams helpers" (:require [clojure.java.io :as io] - [datoteka.core :as fs] + [datoteka.fs :as fs] [yetti.adapter :as yt]) (:import com.github.luben.zstd.ZstdInputStream @@ -23,6 +23,8 @@ org.apache.commons.io.IOUtils org.apache.commons.io.input.BoundedInputStream)) +;; TODO: migrate to datoteka.io + (set! *warn-on-reflection* true) (def ^:const default-buffer-size diff --git a/backend/src/app/util/locks.clj b/backend/src/app/util/locks.clj new file mode 100644 index 000000000..05a69166d --- /dev/null +++ b/backend/src/app/util/locks.clj @@ -0,0 +1,26 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.util.locks + "A syntactic helpers for using locks." + (:refer-clojure :exclude [locking]) + (:import + java.util.concurrent.locks.ReentrantLock + java.util.concurrent.locks.Lock)) + +(defn create + [] + (ReentrantLock.)) + +(defmacro locking + [lsym & body] + (let [lsym (vary-meta lsym assoc :tag `Lock)] + `(do + (.lock ~lsym) + (try + ~@body + (finally + (.unlock ~lsym)))))) diff --git a/backend/src/app/util/time.clj b/backend/src/app/util/time.clj index 422c92fb3..f51706cce 100644 --- a/backend/src/app/util/time.clj +++ b/backend/src/app/util/time.clj @@ -27,16 +27,29 @@ ;; Instant & Duration ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(defn temporal-unit + [o] + (if (instance? TemporalUnit o) + o + (case o + :nanos ChronoUnit/NANOS + :millis ChronoUnit/MILLIS + :micros ChronoUnit/MICROS + :seconds ChronoUnit/SECONDS + :minutes ChronoUnit/MINUTES + :hours ChronoUnit/HOURS + :days ChronoUnit/DAYS + :weeks ChronoUnit/WEEKS + :monts ChronoUnit/MONTHS))) + ;; --- DURATION (defn- obj->duration - [{:keys [days minutes seconds hours nanos millis]}] - (cond-> (Duration/ofMillis (if (int? millis) ^long millis 0)) - (int? days) (.plusDays ^long days) - (int? hours) (.plusHours ^long hours) - (int? minutes) (.plusMinutes ^long minutes) - (int? seconds) (.plusSeconds ^long seconds) - (int? nanos) (.plusNanos ^long nanos))) + [params] + (reduce-kv (fn [o k v] + (.plus ^Duration o ^long v ^TemporalUnit (temporal-unit k))) + (Duration/ofMillis 0) + params)) (defn duration? [v] @@ -57,20 +70,17 @@ :else (obj->duration ms-or-obj))) +(defn ->seconds + [d] + (-> d inst-ms (/ 1000) int)) + (defn diff [t1 t2] (Duration/between t1 t2)) (defn truncate [o unit] - (let [unit (if (instance? TemporalUnit unit) - unit - (case unit - :nanos ChronoUnit/NANOS - :millis ChronoUnit/MILLIS - :micros ChronoUnit/MICROS - :seconds ChronoUnit/SECONDS - :minutes ChronoUnit/MINUTES))] + (let [unit (temporal-unit unit)] (cond (instance? Instant o) (.truncatedTo ^Instant o ^TemporalUnit unit) @@ -159,11 +169,11 @@ (defn in-future [v] - (plus (now) (duration v))) + (plus (now) v)) (defn in-past [v] - (minus (now) (duration v))) + (minus (now) v)) (defn instant->zoned-date-time [v] diff --git a/backend/test/app/bounce_handling_test.clj b/backend/test/app/bounce_handling_test.clj index 3d423f73f..490a8c1b9 100644 --- a/backend/test/app/bounce_handling_test.clj +++ b/backend/test/app/bounce_handling_test.clj @@ -10,6 +10,7 @@ [app.emails :as emails] [app.http.awsns :as awsns] [app.test-helpers :as th] + [app.tokens :as tokens] [app.util.time :as dt] [clojure.pprint :refer [pprint]] [clojure.test :as t] @@ -100,11 +101,11 @@ (t/deftest test-parse-bounce-report (let [profile (th/create-profile* 1) - tokens (:app.tokens/tokens th/*system*) - cfg {:tokens tokens} - report (bounce-report {:token (tokens :generate-predefined - {:iss :profile-identity - :profile-id (:id profile)})}) + sprops (:app.setup/props th/*system*) + cfg {:sprops sprops} + report (bounce-report {:token (tokens/generate sprops + {:iss :profile-identity + :profile-id (:id profile)})}) result (#'awsns/parse-notification cfg report)] ;; (pprint result) @@ -117,11 +118,11 @@ (t/deftest test-parse-complaint-report (let [profile (th/create-profile* 1) - tokens (:app.tokens/tokens th/*system*) - cfg {:tokens tokens} - report (complaint-report {:token (tokens :generate-predefined - {:iss :profile-identity - :profile-id (:id profile)})}) + sprops (:app.setup/props th/*system*) + cfg {:sprops sprops} + report (complaint-report {:token (tokens/generate sprops + {:iss :profile-identity + :profile-id (:id profile)})}) result (#'awsns/parse-notification cfg report)] ;; (pprint result) (t/is (= "complaint" (:type result))) @@ -132,8 +133,8 @@ )) (t/deftest test-parse-complaint-report-without-token - (let [tokens (:app.tokens/tokens th/*system*) - cfg {:tokens tokens} + (let [sprops (:app.setup/props th/*system*) + cfg {:sprops sprops} report (complaint-report {:token ""}) result (#'awsns/parse-notification cfg report)] (t/is (= "complaint" (:type result))) @@ -145,12 +146,12 @@ (t/deftest test-process-bounce-report (let [profile (th/create-profile* 1) - tokens (:app.tokens/tokens th/*system*) + sprops (:app.setup/props th/*system*) pool (:app.db/pool th/*system*) - cfg {:tokens tokens :pool pool} - report (bounce-report {:token (tokens :generate-predefined - {:iss :profile-identity - :profile-id (:id profile)})}) + cfg {:sprops sprops :pool pool} + report (bounce-report {:token (tokens/generate sprops + {:iss :profile-identity + :profile-id (:id profile)})}) report (#'awsns/parse-notification cfg report)] (#'awsns/process-report cfg report) @@ -174,12 +175,12 @@ (t/deftest test-process-complaint-report (let [profile (th/create-profile* 1) - tokens (:app.tokens/tokens th/*system*) + sprops (:app.setup/props th/*system*) pool (:app.db/pool th/*system*) - cfg {:tokens tokens :pool pool} - report (complaint-report {:token (tokens :generate-predefined - {:iss :profile-identity - :profile-id (:id profile)})}) + cfg {:sprops sprops :pool pool} + report (complaint-report {:token (tokens/generate sprops + {:iss :profile-identity + :profile-id (:id profile)})}) report (#'awsns/parse-notification cfg report)] (#'awsns/process-report cfg report) @@ -205,13 +206,13 @@ (t/deftest test-process-bounce-report-to-self (let [profile (th/create-profile* 1) - tokens (:app.tokens/tokens th/*system*) + sprops (:app.setup/props th/*system*) pool (:app.db/pool th/*system*) - cfg {:tokens tokens :pool pool} + cfg {:sprops sprops :pool pool} report (bounce-report {:email (:email profile) - :token (tokens :generate-predefined - {:iss :profile-identity - :profile-id (:id profile)})}) + :token (tokens/generate sprops + {:iss :profile-identity + :profile-id (:id profile)})}) report (#'awsns/parse-notification cfg report)] (#'awsns/process-report cfg report) @@ -227,13 +228,13 @@ (t/deftest test-process-complaint-report-to-self (let [profile (th/create-profile* 1) - tokens (:app.tokens/tokens th/*system*) + sprops (:app.setup/props th/*system*) pool (:app.db/pool th/*system*) - cfg {:tokens tokens :pool pool} + cfg {:sprops sprops :pool pool} report (complaint-report {:email (:email profile) - :token (tokens :generate-predefined - {:iss :profile-identity - :profile-id (:id profile)})}) + :token (tokens/generate sprops + {:iss :profile-identity + :profile-id (:id profile)})}) report (#'awsns/parse-notification cfg report)] (#'awsns/process-report cfg report) diff --git a/backend/test/app/services_profile_test.clj b/backend/test/app/services_profile_test.clj index 68f14c3b4..e750ea6d2 100644 --- a/backend/test/app/services_profile_test.clj +++ b/backend/test/app/services_profile_test.clj @@ -9,9 +9,10 @@ [app.common.uuid :as uuid] [app.config :as cf] [app.db :as db] - [app.rpc.mutations.profile :as profile] [app.rpc.commands.auth :as cauth] + [app.rpc.mutations.profile :as profile] [app.test-helpers :as th] + [app.tokens :as tokens] [app.util.time :as dt] [clojure.java.io :as io] [clojure.test :as t] @@ -196,13 +197,13 @@ (t/deftest prepare-and-register-with-invitation-and-disabled-registration-1 (with-redefs [app.config/flags [:disable-registration]] - (let [tokens-fn (:app.tokens/tokens th/*system*) - itoken (tokens-fn :generate - {:iss :team-invitation - :exp (dt/in-future "48h") - :role :editor - :team-id uuid/zero - :member-email "user@example.com"}) + (let [sprops (:app.setup/props th/*system*) + itoken (tokens/generate sprops + {:iss :team-invitation + :exp (dt/in-future "48h") + :role :editor + :team-id uuid/zero + :member-email "user@example.com"}) data {::th/type :prepare-register-profile :invitation-token itoken :email "user@example.com" @@ -226,13 +227,13 @@ (t/deftest prepare-and-register-with-invitation-and-disabled-registration-2 (with-redefs [app.config/flags [:disable-registration]] - (let [tokens-fn (:app.tokens/tokens th/*system*) - itoken (tokens-fn :generate - {:iss :team-invitation - :exp (dt/in-future "48h") - :role :editor - :team-id uuid/zero - :member-email "user2@example.com"}) + (let [sprops (:app.setup/props th/*system*) + itoken (tokens/generate sprops + {:iss :team-invitation + :exp (dt/in-future "48h") + :role :editor + :team-id uuid/zero + :member-email "user2@example.com"}) data {::th/type :prepare-register-profile :invitation-token itoken diff --git a/backend/test/app/test_helpers.clj b/backend/test/app/test_helpers.clj index 6b4e0ec47..a3dec3285 100644 --- a/backend/test/app/test_helpers.clj +++ b/backend/test/app/test_helpers.clj @@ -59,7 +59,7 @@ :path (-> "app/test_files/template.penpot" io/resource fs/path)}] config (-> main/system-config (merge main/worker-config) - (assoc-in [:app.msgbus/msgbus :redis-uri] (:redis-uri config)) + (assoc-in [:app.redis/redis :uri] (:redis-uri config)) (assoc-in [:app.db/pool :uri] (:database-uri config)) (assoc-in [:app.db/pool :username] (:database-username config)) (assoc-in [:app.db/pool :password] (:database-password config)) diff --git a/common/deps.edn b/common/deps.edn index 202f57b3e..64ad27bf0 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -28,7 +28,8 @@ :exclusions [org.clojure/data.json]} frankiesardo/linked {:mvn/version "1.3.0"} - commons-io/commons-io {:mvn/version "2.11.0"} + + funcool/datoteka {:mvn/version "3.0.65"} com.sun.mail/jakarta.mail {:mvn/version "2.0.1"} ;; exception printing diff --git a/common/src/app/common/data.cljc b/common/src/app/common/data.cljc index d03a935a2..6c0a195d2 100644 --- a/common/src/app/common/data.cljc +++ b/common/src/app/common/data.cljc @@ -10,6 +10,7 @@ parse-double group-by iteration]) #?(:cljs (:require-macros [app.common.data])) + (:require [app.common.math :as mth] [clojure.set :as set] diff --git a/common/src/app/common/spec.cljc b/common/src/app/common/spec.cljc index 654270206..29d747aa0 100644 --- a/common/src/app/common/spec.cljc +++ b/common/src/app/common/spec.cljc @@ -133,9 +133,9 @@ (dm/str v))] (s/def ::rgb-color-str (s/conformer conformer unformer))) -;; --- SPEC: set/vec of valid Keywords +;; --- SPEC: set/vector of Keywords -(letfn [(conform-fn [dest s] +(letfn [(conformer-fn [dest s] (let [xform (keep (fn [s] (cond (string? s) (keyword s) @@ -144,17 +144,38 @@ (cond (set? s) (into dest xform s) (string? s) (into dest xform (str/words s)) - :else ::s/invalid)))] + :else ::s/invalid))) + (unformer-fn [v] + (str/join " " (map name v)))] - (s/def ::set-of-valid-keywords - (s/conformer - (fn [s] (conform-fn #{} s)) - (fn [s] (str/join " " (map name s))))) + (s/def ::set-of-keywords + (s/conformer (partial conformer-fn #{}) unformer-fn)) - (s/def ::vec-of-valid-keywords - (s/conformer - (fn [s] (conform-fn [] s)) - (fn [s] (str/join " " (map name s)))))) + (s/def ::vector-of-keywords + (s/conformer (partial conformer-fn []) unformer-fn))) + +;; --- SPEC: set/vector of strings + +(def non-empty-strings-xf + (comp + (filter string?) + (remove str/empty?) + (remove str/blank?))) + +(letfn [(conformer-fn [dest v] + (cond + (string? v) (into dest non-empty-strings-xf (str/split v #"[\s,]+")) + (vector? v) (into dest non-empty-strings-xf v) + (set? v) (into dest non-empty-strings-xf v) + :else ::s/invalid)) + (unformer-fn [v] + (str/join "," v))] + + (s/def ::set-of-strings + (s/conformer (partial conformer-fn #{}) unformer-fn)) + + (s/def ::vector-of-strings + (s/conformer (partial conformer-fn []) unformer-fn))) ;; --- SPEC: set-of-valid-emails @@ -173,23 +194,15 @@ (str/join " " v))] (s/def ::set-of-valid-emails (s/conformer conformer unformer))) -;; --- SPEC: set-of-non-empty-strings - -(def non-empty-strings-xf - (comp - (filter string?) - (remove str/empty?) - (remove str/blank?))) +;; --- SPEC: query-string (letfn [(conformer [s] - (cond - (string? s) (->> (str/split s #"\s*,\s*") - (into #{} non-empty-strings-xf)) - (set? s) (into #{} non-empty-strings-xf s) - :else ::s/invalid)) + (if (string? s) + (ex/try* #(u/query-string->map s) (constantly ::s/invalid)) + s)) (unformer [s] - (str/join "," s))] - (s/def ::set-of-non-empty-strings (s/conformer conformer unformer))) + (u/map->query-string s))] + (s/def ::query-string (s/conformer conformer unformer))) ;; --- SPECS WITHOUT CONFORMER