From 6ad9a5aadbe89d56fdb338efb94d7fa246edb8a1 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 18 Oct 2022 17:13:00 +0200 Subject: [PATCH] :bug: Fix many bugs on rlimit module --- backend/resources/rlimit.edn | 4 ++ backend/src/app/rpc.clj | 7 +- backend/src/app/rpc/rlimit.clj | 105 +++++++++++++++--------------- backend/src/app/util/services.clj | 25 +++++-- 4 files changed, 79 insertions(+), 62 deletions(-) diff --git a/backend/resources/rlimit.edn b/backend/resources/rlimit.edn index bba62dfc9..c7df92bdf 100644 --- a/backend/resources/rlimit.edn +++ b/backend/resources/rlimit.edn @@ -1,6 +1,10 @@ +;; Example rlimit.edn file ^{:refresh "30s"} {:default [[:default :window "200000/h"]] + #{:query/teams} + [[:burst :bucket "5/1/5s"]] + #{:query/profile} [[:burst :bucket "100/60/1m"]]} diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 21332a10c..01adf6b07 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -126,7 +126,8 @@ (with-meta (fn [cfg params] (-> (px/submit! executor #(f cfg params)) - (p/bind p/wrap))) + (p/bind p/wrap) + (p/then' sv/wrap))) mdata)) (defn- wrap-audit @@ -237,6 +238,8 @@ (s/def ::http-client fn?) (s/def ::ldap (s/nilable map?)) (s/def ::msgbus ::mbus/msgbus) +(s/def ::rlimit (s/nilable ::rlimit/rlimit)) + (s/def ::public-uri ::us/not-empty-string) (s/def ::sprops map?) @@ -249,7 +252,7 @@ ::msgbus ::http-client ::rsem/semaphores - ::rlimit/rlimit + ::rlimit ::mtx/metrics ::db/pool ::ldap])) diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj index e7d80b7a8..390892e33 100644 --- a/backend/src/app/rpc/rlimit.clj +++ b/backend/src/app/rpc/rlimit.clj @@ -44,7 +44,6 @@ " (:require [app.common.data :as d] - [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] @@ -111,7 +110,7 @@ "m" :minutes "s" :seconds "w" :weeks) - ::key (dm/str "ratelimit.window." (d/name name)) + ::key (str "ratelimit.window." (d/name name)) ::opts opts}) (ex/raise :type :validation :code :invalid-window-limit-opts @@ -132,7 +131,7 @@ ::interval interval ::opts opts ::params [(dt/->seconds interval) rate capacity] - ::key (dm/str "ratelimit.bucket." (d/name name))}) + ::key (str "ratelimit.bucket." (d/name name))}) (ex/raise :type :validation :code :invalid-bucket-limit-opts :hint (str/ffmt "looks like '%' does not have a valid format" opts))))) @@ -140,7 +139,7 @@ (defmethod process-limit :bucket [redis user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}] (let [script (-> bucket-rate-limit-script - (assoc ::rscript/keys [(dm/str key "." service "." user-id)]) + (assoc ::rscript/keys [(str key "." service "." user-id)]) (assoc ::rscript/vals (conj params (dt/->seconds now))))] (-> (redis/eval! redis script) (p/then (fn [result] @@ -165,7 +164,7 @@ (let [ts (dt/truncate now unit) ttl (dt/diff now (dt/plus ts {unit 1})) script (-> window-rate-limit-script - (assoc ::rscript/keys [(dm/str key "." service "." user-id "." (dt/format-instant ts))]) + (assoc ::rscript/keys [(str key "." service "." user-id "." (dt/format-instant ts))]) (assoc ::rscript/vals [nreq (dt/->seconds ttl)]))] (-> (redis/eval! redis script) (p/then (fn [result] @@ -197,67 +196,65 @@ (filter (complement ::lresult/allowed?)) (first))] - (when (and rejected (contains? cf/flags :warn-rpc-rate-limits)) + (when rejected (l/warn :hint "rejected rate limit" - :user-id (dm/str user-id) + :user-id (str user-id) :limit-service (-> rejected ::service name) :limit-name (-> rejected ::name name) :limit-strategy (-> rejected ::strategy name))) {:enabled? true - :allowed? (some? rejected) + :allowed? (not (some? rejected)) :headers {"x-rate-limit-remaining" remaining "x-rate-limit-reset" reset}}))))) (defn- handle-response - [f cfg params rres] - (if (:enabled? rres) - (let [headers {"x-rate-limit-remaining" (:remaining rres) - "x-rate-limit-reset" (:reset rres)}] - (when-not (:allowed? rres) + [f cfg params result] + (if (:enabled? result) + (let [headers (:headers result)] + (when-not (:allowed? result) (ex/raise :type :rate-limit :code :request-blocked :hint "rate limit reached" ::http/headers headers)) (-> (f cfg params) (p/then (fn [response] - (with-meta response - {::http/headers headers}))))) - + (vary-meta response update ::http/headers merge headers))))) (f cfg params))) (defn wrap [{:keys [rlimit redis] :as cfg} f mdata] - (let [skey (keyword (::rpc/type cfg) (->> mdata ::sv/spec name)) - sname (dm/str (::rpc/type cfg) "." (->> mdata ::sv/spec name)) - default-rresp (p/resolved {:enabled? false})] - (if (or (contains? cf/flags :rpc-rate-limit) - (contains? cf/flags :soft-rpc-rate-limit)) + (if rlimit + (let [skey (keyword (::rpc/type cfg) (->> mdata ::sv/spec name)) + sname (str (::rpc/type cfg) "." (->> mdata ::sv/spec name))] (fn [cfg {:keys [::http/request] :as params}] - (let [user-id (or (:profile-id params) - (some-> request parse-client-ip) - uuid/zero) + (let [uid (or (:profile-id params) + (some-> request parse-client-ip) + uuid/zero) - rresp (when (and user-id @enabled?) - (when-let [limits (get-in @rlimit [::limits skey])] - (let [redis (redis/get-or-connect redis ::rlimit default-options) - limits (map #(assoc % ::service sname) limits) - rresp (-> (process-limits redis user-id limits (dt/now)) - (p/catch (fn [cause] - ;; If we have an error on processing the - ;; rate-limit we just skip it for do not cause - ;; service interruption because of redis downtime - ;; or similar situation. - (l/error :hint "error on processing rate-limit" :cause cause) - {:enabled? false})))] + rsp (when (and uid @enabled?) + (when-let [limits (or (get-in @rlimit [::limits skey]) + (get-in @rlimit [::limits :default]))] + (let [redis (redis/get-or-connect redis ::rlimit default-options) + limits (map #(assoc % ::service sname) limits) + resp (-> (process-limits redis uid limits (dt/now)) + (p/catch (fn [cause] + ;; If we have an error on processing the rate-limit we just skip + ;; it for do not cause service interruption because of redis + ;; downtime or similar situation. + (l/error :hint "error on processing rate-limit" :cause cause) + {:enabled? false})))] - ;; If soft rate are enabled, we process the rate-limit but return - ;; unprotected response. - (and (contains? cf/flags :soft-rpc-rate-limit) rresp))))] + ;; If soft rate are enabled, we process the rate-limit but return unprotected + ;; response. + (if (contains? cf/flags :soft-rpc-rlimit) + (p/resolved {:enabled? false}) + resp)))) - (p/then (or rresp default-rresp) - (partial handle-response f cfg params)))) - f))) + rsp (or rsp (p/resolved {:enabled? false}))] + + (p/then rsp (partial handle-response f cfg params))))) + f)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; CONFIG WATCHER @@ -376,20 +373,20 @@ (defmethod ig/pre-init-spec :app.rpc/rlimit [_] (s/keys :req-un [::wrk/executor ::wrk/scheduler])) -(defmethod ig/init-key :app.rpc/rlimit +(defmethod ig/init-key ::rpc/rlimit [_ {:keys [executor] :as params}] - (let [state (agent {})] + (when (contains? cf/flags :rpc-rlimit) + (let [state (agent {})] + (set-error-handler! state on-refresh-error) + (set-error-mode! state :continue) - (set-error-handler! state on-refresh-error) - (set-error-mode! state :continue) + (when-let [path (get-config-path)] + (l/info :hint "initializing rlimit config reader" :path (str path)) - (when-let [path (get-config-path)] - (l/info :hint "initializing rlimit config reader" :path (str path)) + ;; Initialize the state with initial refresh value + (send-via executor state (constantly {::refresh (dt/duration "5s")})) - ;; Initialize the state with initial refresh value - (send-via executor state (constantly {::refresh (dt/duration "5s")})) + ;; Force a refresh + (refresh-config (assoc params :path path :state state))) - ;; Force a refresh - (refresh-config (assoc params :path path :state state))) - - state)) + state))) diff --git a/backend/src/app/util/services.clj b/backend/src/app/util/services.clj index f8f8fc004..59a048e1e 100644 --- a/backend/src/app/util/services.clj +++ b/backend/src/app/util/services.clj @@ -11,19 +11,32 @@ [app.common.data :as d] [cuerdas.core :as str])) -(defrecord WrappedValue [obj] +;; A utilty wrapper object for wrap service responses that does not +;; implements the IObj interface that make possible attach metadata to +;; it. + +(deftype MetadataWrapper [obj ^:unsynchronized-mutable metadata] clojure.lang.IDeref - (deref [_] obj)) + (deref [_] obj) + + clojure.lang.IObj + (withMeta [_ meta] + (MetadataWrapper. obj meta)) + + (meta [_] metadata)) (defn wrap - ([] - (WrappedValue. nil)) + "Conditionally wrap a value into MetadataWrapper instance. If the + object already implements IObj interface it will be returned as is." + ([] (wrap nil)) ([o] - (WrappedValue. o))) + (if (instance? clojure.lang.IObj o) + o + (MetadataWrapper. o {})))) (defn wrapped? [o] - (instance? WrappedValue o)) + (instance? MetadataWrapper o)) (defmacro defmethod [sname & body]