From f86f93deea4a3928098b7f0d59f05927b4c5d2e0 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 1 Sep 2022 08:38:17 +0200 Subject: [PATCH] :sparkles: Make the rate limit configuration automatically reloadable --- backend/resources/rlimit.edn | 6 + backend/src/app/config.clj | 1 + backend/src/app/main.clj | 5 + backend/src/app/rpc.clj | 1 + backend/src/app/rpc/rlimit.clj | 205 ++++++++++++++++++++++++++------- backend/src/app/util/time.clj | 9 +- 6 files changed, 185 insertions(+), 42 deletions(-) create mode 100644 backend/resources/rlimit.edn diff --git a/backend/resources/rlimit.edn b/backend/resources/rlimit.edn new file mode 100644 index 000000000..bba62dfc9 --- /dev/null +++ b/backend/resources/rlimit.edn @@ -0,0 +1,6 @@ +^{:refresh "30s"} +{:default + [[:default :window "200000/h"]] + + #{:query/profile} + [[:burst :bucket "100/60/1m"]]} diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 0d6fd2e2a..e5ea68123 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -49,6 +49,7 @@ :default-blob-version 4 :loggers-zmq-uri "tcp://localhost:45556" + :rpc-rlimit-config (fs/path "resources/rlimit.edn") :file-change-snapshot-every 5 :file-change-snapshot-timeout "3h" diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 863cf8812..da3ff106c 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -216,6 +216,10 @@ {:pool (ig/ref :app.db/pool) :executor (ig/ref [::default :app.worker/executor])} + :app.rpc/rlimit + {:executor (ig/ref [::worker :app.worker/executor]) + :scheduler (ig/ref :app.worker/scheduler)} + :app.rpc/methods {:pool (ig/ref :app.db/pool) :session (ig/ref :app.http/session) @@ -228,6 +232,7 @@ :audit (ig/ref :app.loggers.audit/collector) :ldap (ig/ref :app.auth.ldap/provider) :http-client (ig/ref :app.http/client) + :rlimit (ig/ref :app.rpc/rlimit) :executors (ig/ref :app.worker/executors) :templates (ig/ref :app.setup/builtin-templates)} diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index ff0739949..91b84211b 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -264,6 +264,7 @@ ::public-uri ::msgbus ::http-client + ::rlimit/rlimit ::mtx/metrics ::db/pool ::ldap])) diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj index 2a90a8aa1..3b0450de0 100644 --- a/backend/src/app/rpc/rlimit.clj +++ b/backend/src/app/rpc/rlimit.clj @@ -55,12 +55,18 @@ [app.loggers.audit :refer [parse-client-ip]] [app.redis :as redis] [app.redis.script :as-alias rscript] + [app.rpc :as-alias rpc] [app.rpc.rlimit.result :as-alias lresult] [app.util.services :as-alias sv] [app.util.time :as dt] + [app.worker :as wrk] + [clojure.edn :as edn] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [promesa.core :as p])) + [datoteka.fs :as fs] + [integrant.core :as ig] + [promesa.core :as p] + [promesa.exec :as px])) (def ^:private default-timeout (dt/duration 400)) @@ -87,17 +93,12 @@ (def ^:private bucket-opts-re #"^(\d+)/(\d+)/(\d+[hms])$") -(s/def ::strategy (s/and ::us/keyword #{:window :bucket})) - -(s/def ::limit-definition - (s/tuple ::us/keyword ::strategy string?)) - (defmulti parse-limit (fn [[_ strategy _]] strategy)) (defmulti process-limit (fn [_ _ _ o] (::strategy o))) (defmethod parse-limit :window [[name strategy opts :as vlimit]] - (us/assert! ::limit-definition vlimit) + (us/assert! ::limit-tuple vlimit) (merge {::name name ::strategy strategy} @@ -118,7 +119,7 @@ (defmethod parse-limit :bucket [[name strategy opts :as vlimit]] - (us/assert! ::limit-definition vlimit) + (us/assert! ::limit-tuple vlimit) (merge {::name name ::strategy strategy} @@ -184,7 +185,7 @@ (defn- process-limits [redis user-id limits now] - (-> (p/all (map (partial process-limit redis user-id now) (reverse limits))) + (-> (p/all (map (partial process-limit redis user-id now) limits)) (p/then (fn [results] (let [remaining (->> results (d/index-by ::name ::lresult/remaining) @@ -195,6 +196,7 @@ rejected (->> results (filter (complement ::lresult/allowed?)) (first))] + (when (and rejected (contains? cf/flags :warn-rpc-rate-limits)) (l/warn :hint "rejected rate limit" :user-id (dm/str user-id) @@ -207,19 +209,6 @@ :headers {"x-rate-limit-remaining" remaining "x-rate-limit-reset" reset}}))))) -(defn- parse-limits - [service limits] - (let [default (some->> (cf/get :default-rpc-rlimit) - (us/conform ::limit-definition)) - - limits (cond->> limits - (some? default) (cons default))] - - (->> (reverse limits) - (sequence (comp (map parse-limit) - (map #(assoc % ::service service)) - (d/distinct-xf ::name)))))) - (defn- handle-response [f cfg params rres] (if (:enabled? rres) @@ -238,33 +227,169 @@ (f cfg params))) (defn wrap - [{:keys [redis] :as cfg} f {service ::sv/name :as mdata}] - (let [limits (parse-limits service (::limits mdata)) + [{:keys [rlimit redis] :as cfg} f mdata] + (let [skey (keyword (::rpc/type cfg) (->> mdata ::sv/spec name)) + sname (dm/str (::rpc/type cfg) "." (->> mdata ::sv/spec name)) default-rresp (p/resolved {:enabled? false})] - - (if (and (seq limits) - (or (contains? cf/flags :rpc-rate-limit) - (contains? cf/flags :soft-rpc-rate-limit))) + (if (or (contains? cf/flags :rpc-rate-limit) + (contains? cf/flags :soft-rpc-rate-limit)) (fn [cfg {:keys [::http/request] :as params}] (let [user-id (or (:profile-id params) (some-> request parse-client-ip) uuid/zero) rresp (when (and user-id @enabled?) - (let [redis (redis/get-or-connect redis ::rlimit default-options) - rresp (-> (process-limits redis user-id limits (dt/now)) - (p/catch (fn [cause] - ;; If we have an error on processing the - ;; rate-limit we just skip it for do not cause - ;; service interruption because of redis downtime - ;; or similar situation. - (l/error :hint "error on processing rate-limit" :cause cause) - {:enabled? false})))] + (when-let [limits (get-in @rlimit [::limits skey])] + (let [redis (redis/get-or-connect redis ::rlimit default-options) + limits (map #(assoc % ::service sname) limits) + rresp (-> (process-limits redis user-id limits (dt/now)) + (p/catch (fn [cause] + ;; If we have an error on processing the + ;; rate-limit we just skip it for do not cause + ;; service interruption because of redis downtime + ;; or similar situation. + (l/error :hint "error on processing rate-limit" :cause cause) + {:enabled? false})))] - ;; If soft rate are enabled, we process the rate-limit but return - ;; unprotected response. - (and (contains? cf/flags :soft-rpc-rate-limit) rresp)))] + ;; If soft rate are enabled, we process the rate-limit but return + ;; unprotected response. + (and (contains? cf/flags :soft-rpc-rate-limit) rresp))))] (p/then (or rresp default-rresp) (partial handle-response f cfg params)))) f))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; CONFIG WATCHER +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(s/def ::strategy (s/and ::us/keyword #{:window :bucket})) +(s/def ::capacity ::us/integer) +(s/def ::rate ::us/integer) +(s/def ::interval ::dt/duration) +(s/def ::key ::us/string) +(s/def ::opts ::us/string) +(s/def ::params vector?) +(s/def ::unit #{:days :hours :minutes :seconds :weeks}) +(s/def ::nreq ::us/integer) +(s/def ::refresh ::dt/duration) + +(s/def ::limit-tuple + (s/tuple ::us/keyword ::strategy string?)) + +(s/def ::limits + (s/map-of keyword? (s/every ::limit :kind vector?))) + +(s/def ::limit + (s/and + (s/keys :req [::name ::strategy ::key ::opts]) + (s/or :bucket + (s/keys :req [::capacity + ::rate + ::interval + ::params]) + :window + (s/keys :req [::nreq + ::unit])))) + +(s/def ::rlimit + #(instance? clojure.lang.Agent %)) + +(s/def ::config + (s/map-of (s/or :kw keyword? :set set?) + (s/every ::limit-tuple :kind vector?))) + +(defn read-config + [path] + (letfn [(compile-pass-1 [config] + (reduce-kv (fn [o k v] + (cond + (keyword? k) + (assoc o k (mapv parse-limit v)) + + (set? k) + (let [limits (mapv parse-limit v)] + (reduce #(assoc %1 %2 limits) o k)) + + :else + (throw (ex-info "invalid arguments" {})))) + {} + config)) + + (compile-pass-2 [config] + (let [default (:default config)] + (reduce-kv (fn [o k v] + (assoc o k (into [] (d/distinct-xf ::name) (concat v default)))) + {} + config)))] + + (when-let [config (some->> path slurp edn/read-string)] + (us/verify! ::config config) + (let [refresh (->> config meta :refresh dt/duration) + limits (->> config compile-pass-1 compile-pass-2)] + + (us/verify! ::limits limits) + (us/verify! ::refresh refresh) + + {::refresh refresh + ::limits limits})))) + +(defn- refresh-config + [{:keys [state path executor scheduler] :as params}] + (letfn [(update-config [{:keys [::updated-at] :as state}] + (let [updated-at' (fs/last-modified-time path)] + (merge state + {::updated-at updated-at'} + (when (or (nil? updated-at) + (not= (inst-ms updated-at') + (inst-ms updated-at))) + (let [state (read-config path)] + (l/info :hint "config refreshed" + :loaded-limits (count (::limits state)) + ::l/async false) + state))))) + + (schedule-next [state] + (px/schedule! scheduler + (inst-ms (::refresh state)) + (partial refresh-config params)) + state)] + + (send-via executor state update-config) + (send-via executor state schedule-next))) + +(defn- on-refresh-error + [_ cause] + (when-not (instance? java.util.concurrent.RejectedExecutionException cause) + (if-let [explain (-> cause ex-data us/pretty-explain)] + (l/warn ::l/raw (str "unable to refresh config, invalid format:\n" explain) + ::l/async false) + (l/warn :hint "unexpected exception on loading config" + :cause cause + ::l/async false)))) + +(defn- get-config-path + [] + (when-let [path (cf/get :rpc-rlimit-config)] + (and (fs/exists? path) (fs/regular-file? path) path))) + +(defmethod ig/pre-init-spec :app.rpc/rlimit [_] + (s/keys :req-un [::wrk/executor ::wrk/scheduler])) + +(defmethod ig/init-key :app.rpc/rlimit + [_ {:keys [executor] :as params}] + (let [state (agent {})] + + (set-error-handler! state on-refresh-error) + (set-error-mode! state :continue) + + (when-let [path (get-config-path)] + (l/info :hint "initializing rlimit config reader" :path (str path)) + + ;; Initialize the state with initial refresh value + (send-via executor state (constantly {::refresh (dt/duration "5s")})) + + ;; Force a refresh + (refresh-config (assoc params :path path :state state))) + + state)) diff --git a/backend/src/app/util/time.clj b/backend/src/app/util/time.clj index 5858a46e6..5c0551634 100644 --- a/backend/src/app/util/time.clj +++ b/backend/src/app/util/time.clj @@ -11,15 +11,16 @@ [cuerdas.core :as str] [fipp.ednize :as fez]) (:import + java.nio.file.attribute.FileTime java.time.Duration java.time.Instant java.time.OffsetDateTime java.time.ZoneId java.time.ZonedDateTime java.time.format.DateTimeFormatter + java.time.temporal.ChronoUnit java.time.temporal.TemporalAmount java.time.temporal.TemporalUnit - java.time.temporal.ChronoUnit java.util.Date org.apache.logging.log4j.core.util.CronExpression)) @@ -113,7 +114,11 @@ (inst-ms* [v] (.toMillis ^Duration v)) OffsetDateTime - (inst-ms* [v] (.toEpochMilli (.toInstant ^OffsetDateTime v)))) + (inst-ms* [v] (.toEpochMilli (.toInstant ^OffsetDateTime v))) + + FileTime + (inst-ms* [v] (.toMillis ^FileTime v))) + (defmethod print-method Duration [mv ^java.io.Writer writer]