0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-12 18:18:24 -05:00

Add improved traceability of climit module

This commit is contained in:
Andrey Antukh 2024-04-15 14:09:22 +02:00
parent 8bf1b9c28e
commit 0135b477ca
3 changed files with 71 additions and 59 deletions

View file

@ -20,6 +20,7 @@
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[clojure.edn :as edn] [clojure.edn :as edn]
[clojure.set :as set]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.fs :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig]
@ -91,67 +92,77 @@
:timeout (:timeout config) :timeout (:timeout config)
:type :semaphore)) :type :semaphore))
(defmacro ^:private measure-and-log!
[metrics mlabels stats id action limit-id limit-label profile-id elapsed]
`(let [mpermits# (:max-permits ~stats)
mqueue# (:max-queue ~stats)
permits# (:permits ~stats)
queue# (:queue ~stats)
queue# (- queue# mpermits#)
queue# (if (neg? queue#) 0 queue#)
level# (if (pos? queue#) :warn :trace)]
(mtx/run! ~metrics (defn measure!
:id :rpc-climit-queue [metrics mlabels stats elapsed]
:val queue# (let [mpermits (:max-permits stats)
:labels ~mlabels) permits (:permits stats)
queue (:queue stats)
queue (- queue mpermits)
queue (if (neg? queue) 0 queue)]
(mtx/run! ~metrics (mtx/run! metrics
:id :rpc-climit-permits :id :rpc-climit-queue
:val permits# :val queue
:labels ~mlabels) :labels mlabels)
(l/log level# (mtx/run! metrics
:hint ~action :id :rpc-climit-permits
:req ~id :val permits
:id ~limit-id :labels mlabels)
:label ~limit-label
:profile-id (str ~profile-id) (when elapsed
:permits permits# (mtx/run! metrics
:queue queue# :id :rpc-climit-timing
:max-permits mpermits# :val (inst-ms elapsed)
:max-queue mqueue# :labels mlabels))))
~@(if (some? elapsed)
[:elapsed `(dt/format-duration ~elapsed)] (defn log!
[])))) [action req-id stats limit-id limit-label params elapsed]
(let [mpermits (:max-permits stats)
queue (:queue stats)
queue (- queue mpermits)
queue (if (neg? queue) 0 queue)
level (if (pos? queue) :warn :trace)]
(l/log level
:hint action
:req req-id
:id limit-id
:label limit-label
:queue queue
:elapsed (some-> elapsed dt/format-duration)
:params (-> (select-keys params [::rpc/profile-id :file-id :profile-id])
(set/rename-keys {::rpc/profile-id :profile-id})
(update-vals str)))))
(def ^:private idseq (AtomicLong. 0)) (def ^:private idseq (AtomicLong. 0))
(defn- invoke (defn- invoke
[limiter metrics limit-id limit-key limit-label profile-id f params] [limiter metrics limit-id limit-key limit-label handler params]
(let [tpoint (dt/tpoint) (let [tpoint (dt/tpoint)
mlabels (into-array String [(id->str limit-id)]) mlabels (into-array String [(id->str limit-id)])
limit-id (id->str limit-id limit-key) limit-id (id->str limit-id limit-key)
stats (pbh/get-stats limiter) stats (pbh/get-stats limiter)
id (.incrementAndGet ^AtomicLong idseq)] req-id (.incrementAndGet ^AtomicLong idseq)]
(try (try
(measure-and-log! metrics mlabels stats id "enqueued" limit-id limit-label profile-id nil) (measure! metrics mlabels stats nil)
(log! "enqueued" req-id stats limit-id limit-label params nil)
(px/invoke! limiter (fn [] (px/invoke! limiter (fn []
(let [elapsed (tpoint) (let [elapsed (tpoint)
stats (pbh/get-stats limiter)] stats (pbh/get-stats limiter)]
(measure-and-log! metrics mlabels stats id "acquired" limit-id limit-label profile-id elapsed)
(mtx/run! metrics (measure! metrics mlabels stats elapsed)
:id :rpc-climit-timing (log! "acquired" req-id stats limit-id limit-label params elapsed)
:val (inst-ms elapsed)
:labels mlabels) (handler params))))
(apply f params))))
(catch ExceptionInfo cause (catch ExceptionInfo cause
(let [{:keys [type code]} (ex-data cause)] (let [{:keys [type code]} (ex-data cause)]
(if (= :bulkhead-error type) (if (= :bulkhead-error type)
(let [elapsed (tpoint)] (let [elapsed (tpoint)]
(measure-and-log! metrics mlabels stats id "reject" limit-id limit-label profile-id elapsed) (log! "rejected" req-id stats limit-id limit-label params elapsed)
(ex/raise :type :concurrency-limit (ex/raise :type :concurrency-limit
:code code :code code
:hint "concurrency limit reached" :hint "concurrency limit reached"
@ -161,7 +172,9 @@
(finally (finally
(let [elapsed (tpoint) (let [elapsed (tpoint)
stats (pbh/get-stats limiter)] stats (pbh/get-stats limiter)]
(measure-and-log! metrics mlabels stats id "finished" limit-id limit-label profile-id elapsed))))))
(measure! metrics mlabels stats nil)
(log! "finished" req-id stats limit-id limit-label params elapsed))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; MIDDLEWARE ;; MIDDLEWARE
@ -219,10 +232,8 @@
(let [limit-key (key-fn params) (let [limit-key (key-fn params)
cache-key [limit-id limit-key] cache-key [limit-id limit-key]
limiter (cache/get cache cache-key (partial create-limiter config)) limiter (cache/get cache cache-key (partial create-limiter config))
profile-id (if (= key-fn ::rpc/profile-id) handler (partial handler cfg)]
limit-key (invoke limiter metrics limit-id limit-key label handler params)))))
(get params ::rpc/profile-id))]
(invoke limiter metrics limit-id limit-key label profile-id handler [cfg params])))))
(do (do
(l/wrn :hint "no config found for specified queue" :id (id->str limit-id)) (l/wrn :hint "no config found for specified queue" :id (id->str limit-id))
@ -237,15 +248,15 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- build-exec-chain (defn- build-exec-chain
[{:keys [::label ::profile-id ::rpc/climit ::mtx/metrics] :as cfg} f] [{:keys [::label ::rpc/climit ::mtx/metrics] :as cfg} f]
(let [config (get climit ::config) (let [config (get climit ::config)
cache (get climit ::cache)] cache (get climit ::cache)]
(reduce (fn [handler [limit-id limit-key :as ckey]] (reduce (fn [handler [limit-id limit-key :as ckey]]
(if-let [config (get config limit-id)] (if-let [config (get config limit-id)]
(fn [& params] (fn [cfg params]
(let [limiter (cache/get cache ckey (partial create-limiter config))] (let [limiter (cache/get cache ckey (partial create-limiter config))
(invoke limiter metrics limit-id limit-key label profile-id handler params))) handler (partial handler cfg)]
(invoke limiter metrics limit-id limit-key label handler params)))
(do (do
(l/wrn :hint "config not found" :label label :id limit-id) (l/wrn :hint "config not found" :label label :id limit-id)
f))) f)))
@ -255,9 +266,9 @@
(defn invoke! (defn invoke!
"Run a function in context of climit. "Run a function in context of climit.
Intended to be used in virtual threads." Intended to be used in virtual threads."
[{:keys [::executor] :as cfg} f & params] [{:keys [::executor] :as cfg} f params]
(let [f (if (some? executor) (let [f (if (some? executor)
(fn [& params] (px/await! (px/submit! executor (fn [] (apply f params))))) (fn [cfg params] (px/await! (px/submit! executor (fn [] (f cfg params)))))
f) f)
f (build-exec-chain cfg f)] f (build-exec-chain cfg f)]
(apply f params))) (f cfg params)))

View file

@ -243,12 +243,13 @@
;; NOTE: we use the climit here in a dynamic invocation because we ;; NOTE: we use the climit here in a dynamic invocation because we
;; don't want saturate the process-image limit with IO (download ;; don't want saturate the process-image limit with IO (download
;; of external image) ;; of external image)
(-> cfg (-> cfg
(assoc ::climit/id [[:process-image/by-profile (:profile-id params)] (assoc ::climit/id [[:process-image/by-profile (:profile-id params)]
[:process-image/global]]) [:process-image/global]])
(assoc ::climit/profile-id (:profile-id params))
(assoc ::climit/label "create-file-media-object-from-url") (assoc ::climit/label "create-file-media-object-from-url")
(climit/invoke! db/run! cfg create-file-media-object params)))) (climit/invoke! #(db/run! %1 create-file-media-object %2) params))))
;; --- Clone File Media object (Upload and create from url) ;; --- Clone File Media object (Upload and create from url)

View file

@ -233,7 +233,7 @@
:file-mtype (:mtype file)}})))) :file-mtype (:mtype file)}}))))
(defn- generate-thumbnail! (defn- generate-thumbnail!
[file] [_ file]
(let [input (media/run {:cmd :info :input file}) (let [input (media/run {:cmd :info :input file})
thumb (media/run {:cmd :profile-thumbnail thumb (media/run {:cmd :profile-thumbnail
:format :jpeg :format :jpeg
@ -250,15 +250,15 @@
:content-type (:mtype thumb)})) :content-type (:mtype thumb)}))
(defn upload-photo (defn upload-photo
[{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}] [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file] :as params}]
(let [params (-> cfg (let [params (-> cfg
(assoc ::climit/id :process-image/global) (assoc ::climit/id [[:process-image/by-profile (:profile-id params)]
[:process-image/global]])
(assoc ::climit/label "upload-photo") (assoc ::climit/label "upload-photo")
(assoc ::climit/executor executor) (assoc ::climit/executor executor)
(climit/invoke! generate-thumbnail! file))] (climit/invoke! generate-thumbnail! file))]
(sto/put-object! storage params))) (sto/put-object! storage params)))
;; --- MUTATION: Request Email Change ;; --- MUTATION: Request Email Change
(declare ^:private request-email-change!) (declare ^:private request-email-change!)