🐛 Fix many bugs on rlimit module

This commit is contained in:
Andrey Antukh 2022-10-18 17:13:00 +02:00
parent 9c33dc529d
commit 6ad9a5aadb
4 changed files with 79 additions and 62 deletions

@ -1,6 +1,10 @@
;; Example rlimit.edn file
^{:refresh "30s"}
[[:default :window "200000/h"]]
[[:burst :bucket "5/1/5s"]]
[[:burst :bucket "100/60/1m"]]}

@ -126,7 +126,8 @@
(fn [cfg params]
(-> (px/submit! executor #(f cfg params))
(p/bind p/wrap)))
(p/bind p/wrap)
(p/then' sv/wrap)))
(defn- wrap-audit
@ -237,6 +238,8 @@
(s/def ::http-client fn?)
(s/def ::ldap (s/nilable map?))
(s/def ::msgbus ::mbus/msgbus)
(s/def ::rlimit (s/nilable ::rlimit/rlimit))
(s/def ::public-uri ::us/not-empty-string)
(s/def ::sprops map?)
@ -249,7 +252,7 @@

@ -44,7 +44,6 @@
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.spec :as us]
@ -111,7 +110,7 @@
"m" :minutes
"s" :seconds
"w" :weeks)
::key (dm/str "ratelimit.window." (d/name name))
::key (str "ratelimit.window." (d/name name))
::opts opts})
(ex/raise :type :validation
:code :invalid-window-limit-opts
@ -132,7 +131,7 @@
::interval interval
::opts opts
::params [(dt/->seconds interval) rate capacity]
::key (dm/str "ratelimit.bucket." (d/name name))})
::key (str "ratelimit.bucket." (d/name name))})
(ex/raise :type :validation
:code :invalid-bucket-limit-opts
:hint (str/ffmt "looks like '%' does not have a valid format" opts)))))
@ -140,7 +139,7 @@
(defmethod process-limit :bucket
[redis user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}]
(let [script (-> bucket-rate-limit-script
(assoc ::rscript/keys [(dm/str key "." service "." user-id)])
(assoc ::rscript/keys [(str key "." service "." user-id)])
(assoc ::rscript/vals (conj params (dt/->seconds now))))]
(-> (redis/eval! redis script)
(p/then (fn [result]
@ -165,7 +164,7 @@
(let [ts (dt/truncate now unit)
ttl (dt/diff now (dt/plus ts {unit 1}))
script (-> window-rate-limit-script
(assoc ::rscript/keys [(dm/str key "." service "." user-id "." (dt/format-instant ts))])
(assoc ::rscript/keys [(str key "." service "." user-id "." (dt/format-instant ts))])
(assoc ::rscript/vals [nreq (dt/->seconds ttl)]))]
(-> (redis/eval! redis script)
(p/then (fn [result]
@ -197,67 +196,65 @@
(filter (complement ::lresult/allowed?))
(when (and rejected (contains? cf/flags :warn-rpc-rate-limits))
(when rejected
(l/warn :hint "rejected rate limit"
:user-id (dm/str user-id)
:user-id (str user-id)
:limit-service (-> rejected ::service name)
:limit-name (-> rejected ::name name)
:limit-strategy (-> rejected ::strategy name)))
{:enabled? true
:allowed? (some? rejected)
:allowed? (not (some? rejected))
:headers {"x-rate-limit-remaining" remaining
"x-rate-limit-reset" reset}})))))
(defn- handle-response
[f cfg params rres]
(if (:enabled? rres)
(let [headers {"x-rate-limit-remaining" (:remaining rres)
"x-rate-limit-reset" (:reset rres)}]
(when-not (:allowed? rres)
[f cfg params result]
(if (:enabled? result)
(let [headers (:headers result)]
(when-not (:allowed? result)
(ex/raise :type :rate-limit
:code :request-blocked
:hint "rate limit reached"
::http/headers headers))
(-> (f cfg params)
(p/then (fn [response]
(with-meta response
{::http/headers headers})))))
(vary-meta response update ::http/headers merge headers)))))
(f cfg params)))
(defn wrap
[{:keys [rlimit redis] :as cfg} f mdata]
(let [skey (keyword (::rpc/type cfg) (->> mdata ::sv/spec name))
sname (dm/str (::rpc/type cfg) "." (->> mdata ::sv/spec name))
default-rresp (p/resolved {:enabled? false})]
(if (or (contains? cf/flags :rpc-rate-limit)
(contains? cf/flags :soft-rpc-rate-limit))
(if rlimit
(let [skey (keyword (::rpc/type cfg) (->> mdata ::sv/spec name))
sname (str (::rpc/type cfg) "." (->> mdata ::sv/spec name))]
(fn [cfg {:keys [::http/request] :as params}]
(let [user-id (or (:profile-id params)
(some-> request parse-client-ip)
(let [uid (or (:profile-id params)
(some-> request parse-client-ip)
rresp (when (and user-id @enabled?)
(when-let [limits (get-in @rlimit [::limits skey])]
(let [redis (redis/get-or-connect redis ::rlimit default-options)
limits (map #(assoc % ::service sname) limits)
rresp (-> (process-limits redis user-id limits (dt/now))
(p/catch (fn [cause]
;; If we have an error on processing the
;; rate-limit we just skip it for do not cause
;; service interruption because of redis downtime
;; or similar situation.
(l/error :hint "error on processing rate-limit" :cause cause)
{:enabled? false})))]
rsp (when (and uid @enabled?)
(when-let [limits (or (get-in @rlimit [::limits skey])
(get-in @rlimit [::limits :default]))]
(let [redis (redis/get-or-connect redis ::rlimit default-options)
limits (map #(assoc % ::service sname) limits)
resp (-> (process-limits redis uid limits (dt/now))
(p/catch (fn [cause]
;; If we have an error on processing the rate-limit we just skip
;; it for do not cause service interruption because of redis
;; downtime or similar situation.
(l/error :hint "error on processing rate-limit" :cause cause)
{:enabled? false})))]
;; If soft rate are enabled, we process the rate-limit but return
;; unprotected response.
(and (contains? cf/flags :soft-rpc-rate-limit) rresp))))]
;; If soft rate are enabled, we process the rate-limit but return unprotected
;; response.
(if (contains? cf/flags :soft-rpc-rlimit)
(p/resolved {:enabled? false})
(p/then (or rresp default-rresp)
(partial handle-response f cfg params))))
rsp (or rsp (p/resolved {:enabled? false}))]
(p/then rsp (partial handle-response f cfg params)))))
@ -376,20 +373,20 @@
(defmethod ig/pre-init-spec :app.rpc/rlimit [_]
(s/keys :req-un [::wrk/executor ::wrk/scheduler]))
(defmethod ig/init-key :app.rpc/rlimit
(defmethod ig/init-key ::rpc/rlimit
[_ {:keys [executor] :as params}]
(let [state (agent {})]
(when (contains? cf/flags :rpc-rlimit)
(let [state (agent {})]
(set-error-handler! state on-refresh-error)
(set-error-mode! state :continue)
(set-error-handler! state on-refresh-error)
(set-error-mode! state :continue)
(when-let [path (get-config-path)]
(l/info :hint "initializing rlimit config reader" :path (str path))
(when-let [path (get-config-path)]
(l/info :hint "initializing rlimit config reader" :path (str path))
;; Initialize the state with initial refresh value
(send-via executor state (constantly {::refresh (dt/duration "5s")}))
;; Initialize the state with initial refresh value
(send-via executor state (constantly {::refresh (dt/duration "5s")}))
;; Force a refresh
(refresh-config (assoc params :path path :state state)))
;; Force a refresh
(refresh-config (assoc params :path path :state state)))

@ -11,19 +11,32 @@
[app.common.data :as d]
[cuerdas.core :as str]))
(defrecord WrappedValue [obj]
;; A utilty wrapper object for wrap service responses that does not
;; implements the IObj interface that make possible attach metadata to
;; it.
(deftype MetadataWrapper [obj ^:unsynchronized-mutable metadata]
(deref [_] obj))
(deref [_] obj)
(withMeta [_ meta]
(MetadataWrapper. obj meta))
(meta [_] metadata))
(defn wrap
(WrappedValue. nil))
"Conditionally wrap a value into MetadataWrapper instance. If the
object already implements IObj interface it will be returned as is."
([] (wrap nil))
(WrappedValue. o)))
(if (instance? clojure.lang.IObj o)
(MetadataWrapper. o {}))))
(defn wrapped?
(instance? WrappedValue o))
(instance? MetadataWrapper o))
(defmacro defmethod
[sname & body]