diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index 7dbec0861..3ca348e0b 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -20,6 +20,7 @@ [app.util.time :as dt] [app.worker :as-alias wrk] [clojure.edn :as edn] + [clojure.set :as set] [clojure.spec.alpha :as s] [datoteka.fs :as fs] [integrant.core :as ig] @@ -91,67 +92,77 @@ :timeout (:timeout config) :type :semaphore)) -(defmacro ^:private measure-and-log! - [metrics mlabels stats id action limit-id limit-label profile-id elapsed] - `(let [mpermits# (:max-permits ~stats) - mqueue# (:max-queue ~stats) - permits# (:permits ~stats) - queue# (:queue ~stats) - queue# (- queue# mpermits#) - queue# (if (neg? queue#) 0 queue#) - level# (if (pos? queue#) :warn :trace)] - (mtx/run! ~metrics - :id :rpc-climit-queue - :val queue# - :labels ~mlabels) +(defn measure! + [metrics mlabels stats elapsed] + (let [mpermits (:max-permits stats) + permits (:permits stats) + queue (:queue stats) + queue (- queue mpermits) + queue (if (neg? queue) 0 queue)] - (mtx/run! ~metrics - :id :rpc-climit-permits - :val permits# - :labels ~mlabels) + (mtx/run! metrics + :id :rpc-climit-queue + :val queue + :labels mlabels) - (l/log level# - :hint ~action - :req ~id - :id ~limit-id - :label ~limit-label - :profile-id (str ~profile-id) - :permits permits# - :queue queue# - :max-permits mpermits# - :max-queue mqueue# - ~@(if (some? elapsed) - [:elapsed `(dt/format-duration ~elapsed)] - [])))) + (mtx/run! metrics + :id :rpc-climit-permits + :val permits + :labels mlabels) + + (when elapsed + (mtx/run! metrics + :id :rpc-climit-timing + :val (inst-ms elapsed) + :labels mlabels)))) + +(defn log! + [action req-id stats limit-id limit-label params elapsed] + (let [mpermits (:max-permits stats) + queue (:queue stats) + queue (- queue mpermits) + queue (if (neg? queue) 0 queue) + level (if (pos? queue) :warn :trace)] + + (l/log level + :hint action + :req req-id + :id limit-id + :label limit-label + :queue queue + :elapsed (some-> elapsed dt/format-duration) + :params (-> (select-keys params [::rpc/profile-id :file-id :profile-id]) + (set/rename-keys {::rpc/profile-id :profile-id}) + (update-vals str))))) (def ^:private idseq (AtomicLong. 0)) (defn- invoke - [limiter metrics limit-id limit-key limit-label profile-id f params] + [limiter metrics limit-id limit-key limit-label handler params] (let [tpoint (dt/tpoint) mlabels (into-array String [(id->str limit-id)]) limit-id (id->str limit-id limit-key) stats (pbh/get-stats limiter) - id (.incrementAndGet ^AtomicLong idseq)] + req-id (.incrementAndGet ^AtomicLong idseq)] (try - (measure-and-log! metrics mlabels stats id "enqueued" limit-id limit-label profile-id nil) + (measure! metrics mlabels stats nil) + (log! "enqueued" req-id stats limit-id limit-label params nil) (px/invoke! limiter (fn [] (let [elapsed (tpoint) stats (pbh/get-stats limiter)] - (measure-and-log! metrics mlabels stats id "acquired" limit-id limit-label profile-id elapsed) - (mtx/run! metrics - :id :rpc-climit-timing - :val (inst-ms elapsed) - :labels mlabels) - (apply f params)))) + + (measure! metrics mlabels stats elapsed) + (log! "acquired" req-id stats limit-id limit-label params elapsed) + + (handler params)))) (catch ExceptionInfo cause (let [{:keys [type code]} (ex-data cause)] (if (= :bulkhead-error type) (let [elapsed (tpoint)] - (measure-and-log! metrics mlabels stats id "reject" limit-id limit-label profile-id elapsed) + (log! "rejected" req-id stats limit-id limit-label params elapsed) (ex/raise :type :concurrency-limit :code code :hint "concurrency limit reached" @@ -161,7 +172,9 @@ (finally (let [elapsed (tpoint) stats (pbh/get-stats limiter)] - (measure-and-log! metrics mlabels stats id "finished" limit-id limit-label profile-id elapsed)))))) + + (measure! metrics mlabels stats nil) + (log! "finished" req-id stats limit-id limit-label params elapsed)))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MIDDLEWARE @@ -219,10 +232,8 @@ (let [limit-key (key-fn params) cache-key [limit-id limit-key] limiter (cache/get cache cache-key (partial create-limiter config)) - profile-id (if (= key-fn ::rpc/profile-id) - limit-key - (get params ::rpc/profile-id))] - (invoke limiter metrics limit-id limit-key label profile-id handler [cfg params]))))) + handler (partial handler cfg)] + (invoke limiter metrics limit-id limit-key label handler params))))) (do (l/wrn :hint "no config found for specified queue" :id (id->str limit-id)) @@ -237,15 +248,15 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn- build-exec-chain - [{:keys [::label ::profile-id ::rpc/climit ::mtx/metrics] :as cfg} f] + [{:keys [::label ::rpc/climit ::mtx/metrics] :as cfg} f] (let [config (get climit ::config) cache (get climit ::cache)] (reduce (fn [handler [limit-id limit-key :as ckey]] (if-let [config (get config limit-id)] - (fn [& params] - (let [limiter (cache/get cache ckey (partial create-limiter config))] - (invoke limiter metrics limit-id limit-key label profile-id handler params))) - + (fn [cfg params] + (let [limiter (cache/get cache ckey (partial create-limiter config)) + handler (partial handler cfg)] + (invoke limiter metrics limit-id limit-key label handler params))) (do (l/wrn :hint "config not found" :label label :id limit-id) f))) @@ -255,9 +266,9 @@ (defn invoke! "Run a function in context of climit. Intended to be used in virtual threads." - [{:keys [::executor] :as cfg} f & params] + [{:keys [::executor] :as cfg} f params] (let [f (if (some? executor) - (fn [& params] (px/await! (px/submit! executor (fn [] (apply f params))))) + (fn [cfg params] (px/await! (px/submit! executor (fn [] (f cfg params))))) f) f (build-exec-chain cfg f)] - (apply f params))) + (f cfg params))) diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index 1bdcd3c50..08232e899 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -243,12 +243,13 @@ ;; NOTE: we use the climit here in a dynamic invocation because we ;; don't want saturate the process-image limit with IO (download ;; of external image) + (-> cfg (assoc ::climit/id [[:process-image/by-profile (:profile-id params)] [:process-image/global]]) - (assoc ::climit/profile-id (:profile-id params)) (assoc ::climit/label "create-file-media-object-from-url") - (climit/invoke! db/run! cfg create-file-media-object params)))) + (climit/invoke! #(db/run! %1 create-file-media-object %2) params)))) + ;; --- Clone File Media object (Upload and create from url) diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index ccb6a8b2e..89a50b6ad 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -233,7 +233,7 @@ :file-mtype (:mtype file)}})))) (defn- generate-thumbnail! - [file] + [_ file] (let [input (media/run {:cmd :info :input file}) thumb (media/run {:cmd :profile-thumbnail :format :jpeg @@ -250,15 +250,15 @@ :content-type (:mtype thumb)})) (defn upload-photo - [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}] + [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file] :as params}] (let [params (-> cfg - (assoc ::climit/id :process-image/global) + (assoc ::climit/id [[:process-image/by-profile (:profile-id params)] + [:process-image/global]]) (assoc ::climit/label "upload-photo") (assoc ::climit/executor executor) (climit/invoke! generate-thumbnail! file))] (sto/put-object! storage params))) - ;; --- MUTATION: Request Email Change (declare ^:private request-email-change!) diff --git a/frontend/src/app/main/data/users.cljs b/frontend/src/app/main/data/users.cljs index 392c6e055..8a540317f 100644 --- a/frontend/src/app/main/data/users.cljs +++ b/frontend/src/app/main/data/users.cljs @@ -163,7 +163,7 @@ (ptk/reify ::logged-in ev/Event (-data [_] - {::ev/name "signing" + {::ev/name "signin" ::ev/type "identify" :email (:email profile) :auth-backend (:auth-backend profile)