diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn index 4a485b8f7..6bb330927 100644 --- a/backend/resources/climit.edn +++ b/backend/resources/climit.edn @@ -3,15 +3,17 @@ ;; Optional: queue, ommited means Integer/MAX_VALUE ;; Optional: timeout, ommited means no timeout ;; Note: queue and timeout are excluding -{:update-file-by-id {:permits 1 :queue 3} - :update-file {:permits 20} +{:update-file/by-profile + {:permits 1 :queue 5} - :derive-password {:permits 8} - :process-font {:permits 4 :queue 32} - :process-image {:permits 8 :queue 32} + :update-file/global {:permits 20} - :file-thumbnail-ops + :derive-password/global {:permits 8} + :process-font/global {:permits 4} + :process-image/global {:permits 8} + + :file-thumbnail-ops/by-profile {:permits 2} - :submit-audit-events-by-profile + :submit-audit-events/by-profile {:permits 1 :queue 3}} diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 22c5109de..125f238d3 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -310,8 +310,7 @@ ::wrk/executor (ig/ref ::wrk/executor)} :app.rpc/climit - {::mtx/metrics (ig/ref ::mtx/metrics) - ::wrk/executor (ig/ref ::wrk/executor)} + {::mtx/metrics (ig/ref ::mtx/metrics)} :app.rpc/rlimit {::wrk/executor (ig/ref ::wrk/executor)} diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index 279723d57..f5004d575 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -31,19 +31,24 @@ (set! *warn-on-reflection* true) +(defn- id->str + [id] + (-> (str id) + (subs 1))) + (defn- create-bulkhead-cache - [{:keys [::wrk/executor]} config] - (letfn [(load-fn [key] - (let [config (get config (nth key 0))] - (l/trc :hint "insert into cache" :key key) + [config] + (letfn [(load-fn [[id skey]] + (when-let [config (get config id)] + (l/trc :hint "insert into cache" :id (id->str id) :key skey) (pbh/create :permits (or (:permits config) (:concurrency config)) :queue (or (:queue config) (:queue-size config)) :timeout (:timeout config) - :executor executor - :type (:type config :semaphore)))) + :type :semaphore))) - (on-remove [_ _ cause] - (l/trc :hint "evict from cache" :key key :reason (str cause)))] + (on-remove [key _ cause] + (let [[id skey] key] + (l/trc :hint "evict from cache" :id (id->str id) :key skey :reason (str cause))))] (cache/create :executor :same-thread :on-remove on-remove @@ -65,22 +70,21 @@ (s/def ::path ::fs/path) (defmethod ig/pre-init-spec ::rpc/climit [_] - (s/keys :req [::wrk/executor ::mtx/metrics ::path])) + (s/keys :req [::mtx/metrics ::path])) (defmethod ig/init-key ::rpc/climit - [_ {:keys [::path ::mtx/metrics ::wrk/executor] :as cfg}] + [_ {:keys [::path ::mtx/metrics] :as cfg}] (when (contains? cf/flags :rpc-climit) (when-let [params (some->> path slurp edn/read-string)] (l/inf :hint "initializing concurrency limit" :config (str path)) (us/verify! ::config params) - {::cache (create-bulkhead-cache cfg params) + {::cache (create-bulkhead-cache params) ::config params - ::wrk/executor executor ::mtx/metrics metrics}))) (s/def ::cache cache/cache?) (s/def ::instance - (s/keys :req [::cache ::config ::wrk/executor])) + (s/keys :req [::cache ::config])) (s/def ::rpc/climit (s/nilable ::instance)) @@ -91,107 +95,94 @@ (defn invoke! [cache metrics id key f] - (let [limiter (cache/get cache [id key]) - tpoint (dt/tpoint) - labels (into-array String [(name id)]) + (if-let [limiter (cache/get cache [id key])] + (let [tpoint (dt/tpoint) + labels (into-array String [(id->str id)]) + wrapped (fn [] + (let [elapsed (tpoint) + stats (pbh/get-stats limiter)] + (l/trc :hint "acquired" + :id (id->str id) + :key key + :permits (:permits stats) + :queue (:queue stats) + :max-permits (:max-permits stats) + :max-queue (:max-queue stats) + :elapsed (dt/format-duration elapsed)) - wrapped - (fn [] - (let [elapsed (tpoint) - stats (pbh/get-stats limiter)] - (l/trc :hint "executed" - :id (name id) - :key key - :fnh (hash f) - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats) - :elapsed (dt/format-duration elapsed)) + (mtx/run! metrics + :id :rpc-climit-timing + :val (inst-ms elapsed) + :labels labels) + (try + (f) + (finally + (let [elapsed (tpoint)] + (l/trc :hint "finished" + :id (id->str id) + :key key + :permits (:permits stats) + :queue (:queue stats) + :max-permits (:max-permits stats) + :max-queue (:max-queue stats) + :elapsed (dt/format-duration elapsed))))))) + measure! + (fn [stats] (mtx/run! metrics - :id :rpc-climit-timing - :val (inst-ms elapsed) + :id :rpc-climit-queue + :val (:queue stats) :labels labels) - (try - (f) - (finally - (let [elapsed (tpoint)] - (l/trc :hint "finished" - :id (name id) - :key key - :fnh (hash f) - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats) - :elapsed (dt/format-duration elapsed))))))) - measure! - (fn [stats] - (mtx/run! metrics - :id :rpc-climit-queue - :val (:queue stats) - :labels labels) - (mtx/run! metrics - :id :rpc-climit-permits - :val (:permits stats) - :labels labels))] + (mtx/run! metrics + :id :rpc-climit-permits + :val (:permits stats) + :labels labels))] - (try - (let [stats (pbh/get-stats limiter)] - (measure! stats) - (l/trc :hint "enqueued" - :id (name id) - :key key - :fnh (hash f) - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats)) - (pbh/invoke! limiter wrapped)) - (catch ExceptionInfo cause - (let [{:keys [type code]} (ex-data cause)] - (if (= :bulkhead-error type) - (ex/raise :type :concurrency-limit - :code code - :hint "concurrency limit reached") - (throw cause)))) + (try + (let [stats (pbh/get-stats limiter)] + (measure! stats) + (l/trc :hint "enqueued" + :id (id->str id) + :key key + :permits (:permits stats) + :queue (:queue stats) + :max-permits (:max-permits stats) + :max-queue (:max-queue stats)) + (pbh/invoke! limiter wrapped)) + (catch ExceptionInfo cause + (let [{:keys [type code]} (ex-data cause)] + (if (= :bulkhead-error type) + (ex/raise :type :concurrency-limit + :code code + :hint "concurrency limit reached") + (throw cause)))) - (finally - (measure! (pbh/get-stats limiter)))))) + (finally + (measure! (pbh/get-stats limiter))))) - -(defn run! - [{:keys [::id ::cache ::mtx/metrics]} f] - (if (and cache id) - (invoke! cache metrics id nil f) - (f))) - -(defn submit! - [{:keys [::id ::cache ::wrk/executor ::mtx/metrics]} f] - (let [f (partial px/submit! executor (px/wrap-bindings f))] - (if (and cache id) - (p/await! (invoke! cache metrics id nil f)) - (p/await! (f))))) + (do + (l/wrn :hint "unable to load limiter" :id (id->str id)) + (f)))) (defn configure - ([{:keys [::rpc/climit]} id] - (us/assert! ::rpc/climit climit) - (assoc climit ::id id)) - ([{:keys [::rpc/climit]} id executor] - (us/assert! ::rpc/climit climit) - (-> climit - (assoc ::id id) - (assoc ::wrk/executor executor)))) + [{:keys [::rpc/climit]} id] + (us/assert! ::rpc/climit climit) + (assoc climit ::id id)) -(defmacro with-dispatch! - "Dispatch blocking operation to a separated thread protected with the - specified concurrency limiter. If climit is not active, the function - will be scheduled to execute without concurrency monitoring." - [instance & body] - (if (vector? instance) - `(-> (app.rpc.climit/configure ~@instance) - (app.rpc.climit/run! (^:once fn* [] ~@body))) - `(run! ~instance (^:once fn* [] ~@body)))) +(defn run! + "Run a function in context of climit. + Intended to be used in virtual threads." + ([{:keys [::id ::cache ::mtx/metrics]} f] + (if (and cache id) + (invoke! cache metrics id nil f) + (f))) + + ([{:keys [::id ::cache ::mtx/metrics]} f executor] + (let [f (fn [] + (let [f (px/wrap-bindings f)] + (p/await! (px/submit! executor f))))] + (if (and cache id) + (invoke! cache metrics id nil f) + (f))))) (def noop-fn (constantly nil)) @@ -201,7 +192,7 @@ (if-let [config (get-in climit [::config id])] (let [cache (::cache climit)] (l/dbg :hint "instrumenting method" - :limit (name id) + :limit (id->str id) :service-name (::sv/name mdata) :timeout (:timeout config) :permits (:permits config) @@ -212,7 +203,7 @@ (invoke! cache metrics id (key-fn params) (partial f cfg params)))) (do - (l/wrn :hint "no config found for specified queue" :id id) + (l/wrn :hint "no config found for specified queue" :id (id->str id)) f)) f)) diff --git a/backend/src/app/rpc/commands/audit.clj b/backend/src/app/rpc/commands/audit.clj index 8049595c9..fa5608721 100644 --- a/backend/src/app/rpc/commands/audit.clj +++ b/backend/src/app/rpc/commands/audit.clj @@ -64,7 +64,7 @@ [:events [:vector schema:event]]]) (sv/defmethod ::push-audit-events - {::climit/id :submit-audit-events-by-profile + {::climit/id :submit-audit-events/by-profile ::climit/key-fn ::rpc/profile-id ::sm/params schema:push-audit-events ::audit/skip true diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index f827257cf..5aabe5fce 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -34,6 +34,7 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as-alias wrk] [clojure.set :as set])) ;; --- SCHEMA @@ -133,8 +134,8 @@ ;; database. (sv/defmethod ::update-file - {::climit/id :update-file-by-id - ::climit/key-fn :id + {::climit/id :update-file/by-profile + ::climit/key-fn ::rpc/profile-id ::webhooks/event? true ::webhooks/batch-timeout (dt/duration "2m") ::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id) @@ -231,13 +232,15 @@ :team-id (:team-id file)})))))) (defn- update-file* - [{:keys [::db/conn] :as cfg} + [{:keys [::db/conn ::wrk/executor] :as cfg} {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] (let [;; Process the file data in the CLIMIT context; scheduling it ;; to be executed on a separated executor for avoid to do the ;; CPU intensive operation on vthread. - file (-> (climit/configure cfg :update-file) - (climit/submit! (partial update-file-data conn file changes skip-validate)))] + + update-fdata-fn (partial update-file-data conn file changes skip-validate) + file (-> (climit/configure cfg :update-file/global) + (climit/run! update-fdata-fn executor))] (db/insert! conn :file-change {:id (uuid/next) diff --git a/backend/src/app/rpc/commands/fonts.clj b/backend/src/app/rpc/commands/fonts.clj index 256132e84..f22c8c1ef 100644 --- a/backend/src/app/rpc/commands/fonts.clj +++ b/backend/src/app/rpc/commands/fonts.clj @@ -25,6 +25,7 @@ [app.storage :as sto] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as-alias wrk] [clojure.spec.alpha :as s])) (def valid-weight #{100 200 300 400 500 600 700 800 900 950}) @@ -159,8 +160,9 @@ :ttf-file-id (:id ttf)})) ] - (let [data (-> (climit/configure cfg :process-font) - (climit/submit! (partial generate-missing! data))) + (let [data (-> (climit/configure cfg :process-font/global) + (climit/run! (partial generate-missing! data) + (::wrk/executor cfg))) assets (persist-fonts-files! data) result (insert-font-variant! assets)] (vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys)))))) diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index ef13c969f..04ad8bc9b 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -23,6 +23,7 @@ [app.storage :as sto] [app.storage.tmp :as tmp] [app.util.services :as sv] + [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str] [datoteka.io :as io])) @@ -142,11 +143,11 @@ (assoc ::image (process-main-image info))))) (defn create-file-media-object - [{:keys [::sto/storage ::db/conn] :as cfg} + [{:keys [::sto/storage ::db/conn ::wrk/executor] :as cfg} {:keys [id file-id is-local name content]}] - (let [result (-> (climit/configure cfg :process-image) - (climit/submit! (partial process-image content))) + (let [result (-> (climit/configure cfg :process-image/global) + (climit/run! (partial process-image content) executor)) image (sto/put-object! storage (::image result)) thumb (when-let [params (::thumb result)] diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index 60d181009..1a11e57f2 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -26,6 +26,7 @@ [app.tokens :as tokens] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as-alias wrk] [cuerdas.core :as str])) (declare check-profile-existence!) @@ -230,9 +231,9 @@ :content-type (:mtype thumb)})) (defn upload-photo - [{:keys [::sto/storage] :as cfg} {:keys [file]}] - (let [params (-> (climit/configure cfg :process-image) - (climit/submit! (partial generate-thumbnail! file)))] + [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}] + (let [params (-> (climit/configure cfg :process-image/global) + (climit/run! (partial generate-thumbnail! file) executor))] (sto/put-object! storage params))) @@ -426,13 +427,15 @@ (defn derive-password [cfg password] (when password - (-> (climit/configure cfg :derive-password) - (climit/submit! (partial auth/derive-password password))))) + (-> (climit/configure cfg :derive-password/global) + (climit/run! (partial auth/derive-password password) + (::wrk/executor cfg))))) (defn verify-password [cfg password password-data] - (-> (climit/configure cfg :derive-password) - (climit/submit! (partial auth/verify-password password password-data)))) + (-> (climit/configure cfg :derive-password/global) + (climit/run! (partial auth/verify-password password password-data) + (::wrk/executor cfg)))) (defn decode-row [{:keys [props] :as row}]