0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-23 23:18:48 -05:00

Make the RPC climit subsystem more robust

This commit is contained in:
Andrey Antukh 2023-11-24 14:53:16 +01:00
parent bb5a4c0fa5
commit 54341d5b22
8 changed files with 135 additions and 134 deletions

View file

@ -3,15 +3,17 @@
;; Optional: queue, ommited means Integer/MAX_VALUE
;; Optional: timeout, ommited means no timeout
;; Note: queue and timeout are excluding
{:update-file-by-id {:permits 1 :queue 3}
:update-file {:permits 20}
{:update-file/by-profile
{:permits 1 :queue 5}
:derive-password {:permits 8}
:process-font {:permits 4 :queue 32}
:process-image {:permits 8 :queue 32}
:update-file/global {:permits 20}
:file-thumbnail-ops
:derive-password/global {:permits 8}
:process-font/global {:permits 4}
:process-image/global {:permits 8}
:file-thumbnail-ops/by-profile
{:permits 2}
:submit-audit-events-by-profile
:submit-audit-events/by-profile
{:permits 1 :queue 3}}

View file

@ -310,8 +310,7 @@
::wrk/executor (ig/ref ::wrk/executor)}
:app.rpc/climit
{::mtx/metrics (ig/ref ::mtx/metrics)
::wrk/executor (ig/ref ::wrk/executor)}
{::mtx/metrics (ig/ref ::mtx/metrics)}
:app.rpc/rlimit
{::wrk/executor (ig/ref ::wrk/executor)}

View file

@ -31,19 +31,24 @@
(set! *warn-on-reflection* true)
(defn- id->str
[id]
(-> (str id)
(subs 1)))
(defn- create-bulkhead-cache
[{:keys [::wrk/executor]} config]
(letfn [(load-fn [key]
(let [config (get config (nth key 0))]
(l/trc :hint "insert into cache" :key key)
[config]
(letfn [(load-fn [[id skey]]
(when-let [config (get config id)]
(l/trc :hint "insert into cache" :id (id->str id) :key skey)
(pbh/create :permits (or (:permits config) (:concurrency config))
:queue (or (:queue config) (:queue-size config))
:timeout (:timeout config)
:executor executor
:type (:type config :semaphore))))
:type :semaphore)))
(on-remove [_ _ cause]
(l/trc :hint "evict from cache" :key key :reason (str cause)))]
(on-remove [key _ cause]
(let [[id skey] key]
(l/trc :hint "evict from cache" :id (id->str id) :key skey :reason (str cause))))]
(cache/create :executor :same-thread
:on-remove on-remove
@ -65,22 +70,21 @@
(s/def ::path ::fs/path)
(defmethod ig/pre-init-spec ::rpc/climit [_]
(s/keys :req [::wrk/executor ::mtx/metrics ::path]))
(s/keys :req [::mtx/metrics ::path]))
(defmethod ig/init-key ::rpc/climit
[_ {:keys [::path ::mtx/metrics ::wrk/executor] :as cfg}]
[_ {:keys [::path ::mtx/metrics] :as cfg}]
(when (contains? cf/flags :rpc-climit)
(when-let [params (some->> path slurp edn/read-string)]
(l/inf :hint "initializing concurrency limit" :config (str path))
(us/verify! ::config params)
{::cache (create-bulkhead-cache cfg params)
{::cache (create-bulkhead-cache params)
::config params
::wrk/executor executor
::mtx/metrics metrics})))
(s/def ::cache cache/cache?)
(s/def ::instance
(s/keys :req [::cache ::config ::wrk/executor]))
(s/keys :req [::cache ::config]))
(s/def ::rpc/climit
(s/nilable ::instance))
@ -91,107 +95,94 @@
(defn invoke!
[cache metrics id key f]
(let [limiter (cache/get cache [id key])
tpoint (dt/tpoint)
labels (into-array String [(name id)])
(if-let [limiter (cache/get cache [id key])]
(let [tpoint (dt/tpoint)
labels (into-array String [(id->str id)])
wrapped (fn []
(let [elapsed (tpoint)
stats (pbh/get-stats limiter)]
(l/trc :hint "acquired"
:id (id->str id)
:key key
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed))
wrapped
(fn []
(let [elapsed (tpoint)
stats (pbh/get-stats limiter)]
(l/trc :hint "executed"
:id (name id)
:key key
:fnh (hash f)
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed))
(mtx/run! metrics
:id :rpc-climit-timing
:val (inst-ms elapsed)
:labels labels)
(try
(f)
(finally
(let [elapsed (tpoint)]
(l/trc :hint "finished"
:id (id->str id)
:key key
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed)))))))
measure!
(fn [stats]
(mtx/run! metrics
:id :rpc-climit-timing
:val (inst-ms elapsed)
:id :rpc-climit-queue
:val (:queue stats)
:labels labels)
(try
(f)
(finally
(let [elapsed (tpoint)]
(l/trc :hint "finished"
:id (name id)
:key key
:fnh (hash f)
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed)))))))
measure!
(fn [stats]
(mtx/run! metrics
:id :rpc-climit-queue
:val (:queue stats)
:labels labels)
(mtx/run! metrics
:id :rpc-climit-permits
:val (:permits stats)
:labels labels))]
(mtx/run! metrics
:id :rpc-climit-permits
:val (:permits stats)
:labels labels))]
(try
(let [stats (pbh/get-stats limiter)]
(measure! stats)
(l/trc :hint "enqueued"
:id (name id)
:key key
:fnh (hash f)
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats))
(pbh/invoke! limiter wrapped))
(catch ExceptionInfo cause
(let [{:keys [type code]} (ex-data cause)]
(if (= :bulkhead-error type)
(ex/raise :type :concurrency-limit
:code code
:hint "concurrency limit reached")
(throw cause))))
(try
(let [stats (pbh/get-stats limiter)]
(measure! stats)
(l/trc :hint "enqueued"
:id (id->str id)
:key key
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats))
(pbh/invoke! limiter wrapped))
(catch ExceptionInfo cause
(let [{:keys [type code]} (ex-data cause)]
(if (= :bulkhead-error type)
(ex/raise :type :concurrency-limit
:code code
:hint "concurrency limit reached")
(throw cause))))
(finally
(measure! (pbh/get-stats limiter))))))
(finally
(measure! (pbh/get-stats limiter)))))
(defn run!
[{:keys [::id ::cache ::mtx/metrics]} f]
(if (and cache id)
(invoke! cache metrics id nil f)
(f)))
(defn submit!
[{:keys [::id ::cache ::wrk/executor ::mtx/metrics]} f]
(let [f (partial px/submit! executor (px/wrap-bindings f))]
(if (and cache id)
(p/await! (invoke! cache metrics id nil f))
(p/await! (f)))))
(do
(l/wrn :hint "unable to load limiter" :id (id->str id))
(f))))
(defn configure
([{:keys [::rpc/climit]} id]
(us/assert! ::rpc/climit climit)
(assoc climit ::id id))
([{:keys [::rpc/climit]} id executor]
(us/assert! ::rpc/climit climit)
(-> climit
(assoc ::id id)
(assoc ::wrk/executor executor))))
[{:keys [::rpc/climit]} id]
(us/assert! ::rpc/climit climit)
(assoc climit ::id id))
(defmacro with-dispatch!
"Dispatch blocking operation to a separated thread protected with the
specified concurrency limiter. If climit is not active, the function
will be scheduled to execute without concurrency monitoring."
[instance & body]
(if (vector? instance)
`(-> (app.rpc.climit/configure ~@instance)
(app.rpc.climit/run! (^:once fn* [] ~@body)))
`(run! ~instance (^:once fn* [] ~@body))))
(defn run!
"Run a function in context of climit.
Intended to be used in virtual threads."
([{:keys [::id ::cache ::mtx/metrics]} f]
(if (and cache id)
(invoke! cache metrics id nil f)
(f)))
([{:keys [::id ::cache ::mtx/metrics]} f executor]
(let [f (fn []
(let [f (px/wrap-bindings f)]
(p/await! (px/submit! executor f))))]
(if (and cache id)
(invoke! cache metrics id nil f)
(f)))))
(def noop-fn (constantly nil))
@ -201,7 +192,7 @@
(if-let [config (get-in climit [::config id])]
(let [cache (::cache climit)]
(l/dbg :hint "instrumenting method"
:limit (name id)
:limit (id->str id)
:service-name (::sv/name mdata)
:timeout (:timeout config)
:permits (:permits config)
@ -212,7 +203,7 @@
(invoke! cache metrics id (key-fn params) (partial f cfg params))))
(do
(l/wrn :hint "no config found for specified queue" :id id)
(l/wrn :hint "no config found for specified queue" :id (id->str id))
f))
f))

View file

@ -64,7 +64,7 @@
[:events [:vector schema:event]]])
(sv/defmethod ::push-audit-events
{::climit/id :submit-audit-events-by-profile
{::climit/id :submit-audit-events/by-profile
::climit/key-fn ::rpc/profile-id
::sm/params schema:push-audit-events
::audit/skip true

View file

@ -34,6 +34,7 @@
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.set :as set]))
;; --- SCHEMA
@ -133,8 +134,8 @@
;; database.
(sv/defmethod ::update-file
{::climit/id :update-file-by-id
::climit/key-fn :id
{::climit/id :update-file/by-profile
::climit/key-fn ::rpc/profile-id
::webhooks/event? true
::webhooks/batch-timeout (dt/duration "2m")
::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id)
@ -231,13 +232,15 @@
:team-id (:team-id file)}))))))
(defn- update-file*
[{:keys [::db/conn] :as cfg}
[{:keys [::db/conn ::wrk/executor] :as cfg}
{:keys [profile-id file changes session-id ::created-at skip-validate] :as params}]
(let [;; Process the file data in the CLIMIT context; scheduling it
;; to be executed on a separated executor for avoid to do the
;; CPU intensive operation on vthread.
file (-> (climit/configure cfg :update-file)
(climit/submit! (partial update-file-data conn file changes skip-validate)))]
update-fdata-fn (partial update-file-data conn file changes skip-validate)
file (-> (climit/configure cfg :update-file/global)
(climit/run! update-fdata-fn executor))]
(db/insert! conn :file-change
{:id (uuid/next)

View file

@ -25,6 +25,7 @@
[app.storage :as sto]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]))
(def valid-weight #{100 200 300 400 500 600 700 800 900 950})
@ -159,8 +160,9 @@
:ttf-file-id (:id ttf)}))
]
(let [data (-> (climit/configure cfg :process-font)
(climit/submit! (partial generate-missing! data)))
(let [data (-> (climit/configure cfg :process-font/global)
(climit/run! (partial generate-missing! data)
(::wrk/executor cfg)))
assets (persist-fonts-files! data)
result (insert-font-variant! assets)]
(vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys))))))

View file

@ -23,6 +23,7 @@
[app.storage :as sto]
[app.storage.tmp :as tmp]
[app.util.services :as sv]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.io :as io]))
@ -142,11 +143,11 @@
(assoc ::image (process-main-image info)))))
(defn create-file-media-object
[{:keys [::sto/storage ::db/conn] :as cfg}
[{:keys [::sto/storage ::db/conn ::wrk/executor] :as cfg}
{:keys [id file-id is-local name content]}]
(let [result (-> (climit/configure cfg :process-image)
(climit/submit! (partial process-image content)))
(let [result (-> (climit/configure cfg :process-image/global)
(climit/run! (partial process-image content) executor))
image (sto/put-object! storage (::image result))
thumb (when-let [params (::thumb result)]

View file

@ -26,6 +26,7 @@
[app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[cuerdas.core :as str]))
(declare check-profile-existence!)
@ -230,9 +231,9 @@
:content-type (:mtype thumb)}))
(defn upload-photo
[{:keys [::sto/storage] :as cfg} {:keys [file]}]
(let [params (-> (climit/configure cfg :process-image)
(climit/submit! (partial generate-thumbnail! file)))]
[{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}]
(let [params (-> (climit/configure cfg :process-image/global)
(climit/run! (partial generate-thumbnail! file) executor))]
(sto/put-object! storage params)))
@ -426,13 +427,15 @@
(defn derive-password
[cfg password]
(when password
(-> (climit/configure cfg :derive-password)
(climit/submit! (partial auth/derive-password password)))))
(-> (climit/configure cfg :derive-password/global)
(climit/run! (partial auth/derive-password password)
(::wrk/executor cfg)))))
(defn verify-password
[cfg password password-data]
(-> (climit/configure cfg :derive-password)
(climit/submit! (partial auth/verify-password password password-data))))
(-> (climit/configure cfg :derive-password/global)
(climit/run! (partial auth/verify-password password password-data)
(::wrk/executor cfg))))
(defn decode-row
[{:keys [props] :as row}]