0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-08 07:50:43 -05:00

Make the rate limit configuration automatically reloadable

This commit is contained in:
Andrey Antukh 2022-09-01 08:38:17 +02:00 committed by Alonso Torres
parent e657c1bbfa
commit f86f93deea
6 changed files with 185 additions and 42 deletions

View file

@ -0,0 +1,6 @@
^{:refresh "30s"}
{:default
[[:default :window "200000/h"]]
#{:query/profile}
[[:burst :bucket "100/60/1m"]]}

View file

@ -49,6 +49,7 @@
:default-blob-version 4
:loggers-zmq-uri "tcp://localhost:45556"
:rpc-rlimit-config (fs/path "resources/rlimit.edn")
:file-change-snapshot-every 5
:file-change-snapshot-timeout "3h"

View file

@ -216,6 +216,10 @@
{:pool (ig/ref :app.db/pool)
:executor (ig/ref [::default :app.worker/executor])}
:app.rpc/rlimit
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)}
:app.rpc/methods
{:pool (ig/ref :app.db/pool)
:session (ig/ref :app.http/session)
@ -228,6 +232,7 @@
:audit (ig/ref :app.loggers.audit/collector)
:ldap (ig/ref :app.auth.ldap/provider)
:http-client (ig/ref :app.http/client)
:rlimit (ig/ref :app.rpc/rlimit)
:executors (ig/ref :app.worker/executors)
:templates (ig/ref :app.setup/builtin-templates)}

View file

@ -264,6 +264,7 @@
::public-uri
::msgbus
::http-client
::rlimit/rlimit
::mtx/metrics
::db/pool
::ldap]))

View file

@ -55,12 +55,18 @@
[app.loggers.audit :refer [parse-client-ip]]
[app.redis :as redis]
[app.redis.script :as-alias rscript]
[app.rpc :as-alias rpc]
[app.rpc.rlimit.result :as-alias lresult]
[app.util.services :as-alias sv]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.edn :as edn]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[promesa.core :as p]))
[datoteka.fs :as fs]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]))
(def ^:private default-timeout
(dt/duration 400))
@ -87,17 +93,12 @@
(def ^:private bucket-opts-re
#"^(\d+)/(\d+)/(\d+[hms])$")
(s/def ::strategy (s/and ::us/keyword #{:window :bucket}))
(s/def ::limit-definition
(s/tuple ::us/keyword ::strategy string?))
(defmulti parse-limit (fn [[_ strategy _]] strategy))
(defmulti process-limit (fn [_ _ _ o] (::strategy o)))
(defmethod parse-limit :window
[[name strategy opts :as vlimit]]
(us/assert! ::limit-definition vlimit)
(us/assert! ::limit-tuple vlimit)
(merge
{::name name
::strategy strategy}
@ -118,7 +119,7 @@
(defmethod parse-limit :bucket
[[name strategy opts :as vlimit]]
(us/assert! ::limit-definition vlimit)
(us/assert! ::limit-tuple vlimit)
(merge
{::name name
::strategy strategy}
@ -184,7 +185,7 @@
(defn- process-limits
[redis user-id limits now]
(-> (p/all (map (partial process-limit redis user-id now) (reverse limits)))
(-> (p/all (map (partial process-limit redis user-id now) limits))
(p/then (fn [results]
(let [remaining (->> results
(d/index-by ::name ::lresult/remaining)
@ -195,6 +196,7 @@
rejected (->> results
(filter (complement ::lresult/allowed?))
(first))]
(when (and rejected (contains? cf/flags :warn-rpc-rate-limits))
(l/warn :hint "rejected rate limit"
:user-id (dm/str user-id)
@ -207,19 +209,6 @@
:headers {"x-rate-limit-remaining" remaining
"x-rate-limit-reset" reset}})))))
(defn- parse-limits
[service limits]
(let [default (some->> (cf/get :default-rpc-rlimit)
(us/conform ::limit-definition))
limits (cond->> limits
(some? default) (cons default))]
(->> (reverse limits)
(sequence (comp (map parse-limit)
(map #(assoc % ::service service))
(d/distinct-xf ::name))))))
(defn- handle-response
[f cfg params rres]
(if (:enabled? rres)
@ -238,33 +227,169 @@
(f cfg params)))
(defn wrap
[{:keys [redis] :as cfg} f {service ::sv/name :as mdata}]
(let [limits (parse-limits service (::limits mdata))
[{:keys [rlimit redis] :as cfg} f mdata]
(let [skey (keyword (::rpc/type cfg) (->> mdata ::sv/spec name))
sname (dm/str (::rpc/type cfg) "." (->> mdata ::sv/spec name))
default-rresp (p/resolved {:enabled? false})]
(if (and (seq limits)
(or (contains? cf/flags :rpc-rate-limit)
(contains? cf/flags :soft-rpc-rate-limit)))
(if (or (contains? cf/flags :rpc-rate-limit)
(contains? cf/flags :soft-rpc-rate-limit))
(fn [cfg {:keys [::http/request] :as params}]
(let [user-id (or (:profile-id params)
(some-> request parse-client-ip)
uuid/zero)
rresp (when (and user-id @enabled?)
(let [redis (redis/get-or-connect redis ::rlimit default-options)
rresp (-> (process-limits redis user-id limits (dt/now))
(p/catch (fn [cause]
;; If we have an error on processing the
;; rate-limit we just skip it for do not cause
;; service interruption because of redis downtime
;; or similar situation.
(l/error :hint "error on processing rate-limit" :cause cause)
{:enabled? false})))]
(when-let [limits (get-in @rlimit [::limits skey])]
(let [redis (redis/get-or-connect redis ::rlimit default-options)
limits (map #(assoc % ::service sname) limits)
rresp (-> (process-limits redis user-id limits (dt/now))
(p/catch (fn [cause]
;; If we have an error on processing the
;; rate-limit we just skip it for do not cause
;; service interruption because of redis downtime
;; or similar situation.
(l/error :hint "error on processing rate-limit" :cause cause)
{:enabled? false})))]
;; If soft rate are enabled, we process the rate-limit but return
;; unprotected response.
(and (contains? cf/flags :soft-rpc-rate-limit) rresp)))]
;; If soft rate are enabled, we process the rate-limit but return
;; unprotected response.
(and (contains? cf/flags :soft-rpc-rate-limit) rresp))))]
(p/then (or rresp default-rresp)
(partial handle-response f cfg params))))
f)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; CONFIG WATCHER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::strategy (s/and ::us/keyword #{:window :bucket}))
(s/def ::capacity ::us/integer)
(s/def ::rate ::us/integer)
(s/def ::interval ::dt/duration)
(s/def ::key ::us/string)
(s/def ::opts ::us/string)
(s/def ::params vector?)
(s/def ::unit #{:days :hours :minutes :seconds :weeks})
(s/def ::nreq ::us/integer)
(s/def ::refresh ::dt/duration)
(s/def ::limit-tuple
(s/tuple ::us/keyword ::strategy string?))
(s/def ::limits
(s/map-of keyword? (s/every ::limit :kind vector?)))
(s/def ::limit
(s/and
(s/keys :req [::name ::strategy ::key ::opts])
(s/or :bucket
(s/keys :req [::capacity
::rate
::interval
::params])
:window
(s/keys :req [::nreq
::unit]))))
(s/def ::rlimit
#(instance? clojure.lang.Agent %))
(s/def ::config
(s/map-of (s/or :kw keyword? :set set?)
(s/every ::limit-tuple :kind vector?)))
(defn read-config
[path]
(letfn [(compile-pass-1 [config]
(reduce-kv (fn [o k v]
(cond
(keyword? k)
(assoc o k (mapv parse-limit v))
(set? k)
(let [limits (mapv parse-limit v)]
(reduce #(assoc %1 %2 limits) o k))
:else
(throw (ex-info "invalid arguments" {}))))
{}
config))
(compile-pass-2 [config]
(let [default (:default config)]
(reduce-kv (fn [o k v]
(assoc o k (into [] (d/distinct-xf ::name) (concat v default))))
{}
config)))]
(when-let [config (some->> path slurp edn/read-string)]
(us/verify! ::config config)
(let [refresh (->> config meta :refresh dt/duration)
limits (->> config compile-pass-1 compile-pass-2)]
(us/verify! ::limits limits)
(us/verify! ::refresh refresh)
{::refresh refresh
::limits limits}))))
(defn- refresh-config
[{:keys [state path executor scheduler] :as params}]
(letfn [(update-config [{:keys [::updated-at] :as state}]
(let [updated-at' (fs/last-modified-time path)]
(merge state
{::updated-at updated-at'}
(when (or (nil? updated-at)
(not= (inst-ms updated-at')
(inst-ms updated-at)))
(let [state (read-config path)]
(l/info :hint "config refreshed"
:loaded-limits (count (::limits state))
::l/async false)
state)))))
(schedule-next [state]
(px/schedule! scheduler
(inst-ms (::refresh state))
(partial refresh-config params))
state)]
(send-via executor state update-config)
(send-via executor state schedule-next)))
(defn- on-refresh-error
[_ cause]
(when-not (instance? java.util.concurrent.RejectedExecutionException cause)
(if-let [explain (-> cause ex-data us/pretty-explain)]
(l/warn ::l/raw (str "unable to refresh config, invalid format:\n" explain)
::l/async false)
(l/warn :hint "unexpected exception on loading config"
:cause cause
::l/async false))))
(defn- get-config-path
[]
(when-let [path (cf/get :rpc-rlimit-config)]
(and (fs/exists? path) (fs/regular-file? path) path)))
(defmethod ig/pre-init-spec :app.rpc/rlimit [_]
(s/keys :req-un [::wrk/executor ::wrk/scheduler]))
(defmethod ig/init-key :app.rpc/rlimit
[_ {:keys [executor] :as params}]
(let [state (agent {})]
(set-error-handler! state on-refresh-error)
(set-error-mode! state :continue)
(when-let [path (get-config-path)]
(l/info :hint "initializing rlimit config reader" :path (str path))
;; Initialize the state with initial refresh value
(send-via executor state (constantly {::refresh (dt/duration "5s")}))
;; Force a refresh
(refresh-config (assoc params :path path :state state)))
state))

View file

@ -11,15 +11,16 @@
[cuerdas.core :as str]
[fipp.ednize :as fez])
(:import
java.nio.file.attribute.FileTime
java.time.Duration
java.time.Instant
java.time.OffsetDateTime
java.time.ZoneId
java.time.ZonedDateTime
java.time.format.DateTimeFormatter
java.time.temporal.ChronoUnit
java.time.temporal.TemporalAmount
java.time.temporal.TemporalUnit
java.time.temporal.ChronoUnit
java.util.Date
org.apache.logging.log4j.core.util.CronExpression))
@ -113,7 +114,11 @@
(inst-ms* [v] (.toMillis ^Duration v))
OffsetDateTime
(inst-ms* [v] (.toEpochMilli (.toInstant ^OffsetDateTime v))))
(inst-ms* [v] (.toEpochMilli (.toInstant ^OffsetDateTime v)))
FileTime
(inst-ms* [v] (.toMillis ^FileTime v)))
(defmethod print-method Duration
[mv ^java.io.Writer writer]