diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj index 5eb0f85ee..67be94684 100644 --- a/backend/src/app/rpc/rlimit.clj +++ b/backend/src/app/rpc/rlimit.clj @@ -182,78 +182,94 @@ (assoc ::lresult/remaining remaining) (assoc ::lresult/reset (dt/plus ts {unit 1}))))))))) -(defn- process-limits +(defn- process-limits! [redis user-id limits now] - (-> (p/all (map (partial process-limit redis user-id now) limits)) - (p/then (fn [results] - (let [remaining (->> results - (d/index-by ::name ::lresult/remaining) - (uri/map->query-string)) - reset (->> results - (d/index-by ::name (comp dt/->seconds ::lresult/reset)) - (uri/map->query-string)) - rejected (->> results - (filter (complement ::lresult/allowed?)) - (first))] + (->> (p/all (map (partial process-limit redis user-id now) limits)) + (p/fmap (fn [results] + (let [remaining (->> results + (d/index-by ::name ::lresult/remaining) + (uri/map->query-string)) + reset (->> results + (d/index-by ::name (comp dt/->seconds ::lresult/reset)) + (uri/map->query-string)) + rejected (->> results + (filter (complement ::lresult/allowed?)) + (first))] - (when rejected - (l/warn :hint "rejected rate limit" - :user-id (str user-id) - :limit-service (-> rejected ::service name) - :limit-name (-> rejected ::name name) - :limit-strategy (-> rejected ::strategy name))) + (when rejected + (l/warn :hint "rejected rate limit" + :user-id (str user-id) + :limit-service (-> rejected ::service name) + :limit-name (-> rejected ::name name) + :limit-strategy (-> rejected ::strategy name))) - {:enabled? true - :allowed? (not (some? rejected)) - :headers {"x-rate-limit-remaining" remaining - "x-rate-limit-reset" reset}}))))) + {:enabled? true + :allowed? (not (some? rejected)) + :headers {"x-rate-limit-remaining" remaining + "x-rate-limit-reset" reset}}))))) (defn- handle-response [f cfg params result] (if (:enabled? result) (let [headers (:headers result)] - (when-not (:allowed? result) - (ex/raise :type :rate-limit - :code :request-blocked - :hint "rate limit reached" - ::http/headers headers)) - (-> (f cfg params) - (p/then (fn [response] - (vary-meta response update ::http/headers merge headers))))) + (if (:allowed? result) + (->> (f cfg params) + (p/fmap (fn [response] + (vary-meta response update ::http/headers merge headers)))) + (p/rejected + (ex/error :type :rate-limit + :code :request-blocked + :hint "rate limit reached" + ::http/headers headers)))) (f cfg params))) +(defn- get-limits + [state skey sname] + (some->> (or (get-in @state [::limits skey]) + (get-in @state [::limits :default])) + (map #(assoc % ::service sname)) + (seq))) + +(defn- get-uid + [{:keys [::http/request] :as params}] + (or (::rpc/profile-id params) + (some-> request parse-client-ip) + uuid/zero)) + (defn wrap [{:keys [rlimit redis] :as cfg} f mdata] (if rlimit (let [skey (keyword (::rpc/type cfg) (->> mdata ::sv/spec name)) sname (str (::rpc/type cfg) "." (->> mdata ::sv/spec name))] - (fn [cfg {:keys [::http/request] :as params}] - (let [uid (or (:profile-id params) - (some-> request parse-client-ip) - uuid/zero) - rsp (when (and uid @enabled?) - (when-let [limits (or (get-in @rlimit [::limits skey]) - (get-in @rlimit [::limits :default]))] - (let [redis (redis/get-or-connect redis ::rlimit default-options) - limits (map #(assoc % ::service sname) limits) - resp (-> (process-limits redis uid limits (dt/now)) - (p/catch (fn [cause] - ;; If we have an error on processing the rate-limit we just skip - ;; it for do not cause service interruption because of redis - ;; downtime or similar situation. - (l/error :hint "error on processing rate-limit" :cause cause) - {:enabled? false})))] + (fn [cfg params] + (if @enabled? + (try + (let [uid (get-uid params) + rsp (when-let [limits (get-limits rlimit skey sname)] + (let [redis (redis/get-or-connect redis ::rpc/rlimit default-options) + rsp (->> (process-limits! redis uid limits (dt/now)) + (p/merr (fn [cause] + ;; If we have an error on processing the rate-limit we just skip + ;; it for do not cause service interruption because of redis + ;; downtime or similar situation. + (l/error :hint "error on processing rate-limit" :cause cause) + (p/resolved {:enabled? false}))))] - ;; If soft rate are enabled, we process the rate-limit but return unprotected - ;; response. - (if (contains? cf/flags :soft-rpc-rlimit) - (p/resolved {:enabled? false}) - resp)))) + ;; If soft rate are enabled, we process the rate-limit but return unprotected + ;; response. + (if (contains? cf/flags :soft-rpc-rlimit) + {:enabled? false} + rsp)))] - rsp (or rsp (p/resolved {:enabled? false}))] + (->> (p/promise rsp) + (p/fmap #(or % {:enabled? false})) + (p/mcat #(handle-response f cfg params %)))) - (p/then rsp (partial handle-response f cfg params))))) + (catch Throwable cause + (p/rejected cause))) + + (f cfg params)))) f)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;