From 37ad04d2a6d20594a7d50bf29abe04b362467805 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Sun, 16 Oct 2022 23:44:16 +0200 Subject: [PATCH] :tada: Add robust concurrency limiter for RPC --- backend/deps.edn | 2 + backend/resources/climit.edn | 7 + backend/resources/log4j2-devenv.xml | 1 + backend/src/app/config.clj | 8 +- backend/src/app/http/errors.clj | 17 ++ backend/src/app/main.clj | 4 +- backend/src/app/metrics.clj | 18 +- backend/src/app/rpc.clj | 9 +- backend/src/app/rpc/climit.clj | 205 ++++++++++++++++++++++ backend/src/app/rpc/commands/auth.clj | 8 +- backend/src/app/rpc/mutations/files.clj | 6 +- backend/src/app/rpc/mutations/fonts.clj | 12 +- backend/src/app/rpc/mutations/media.clj | 30 ++-- backend/src/app/rpc/mutations/profile.clj | 10 +- backend/src/app/rpc/mutations/teams.clj | 19 +- backend/src/app/rpc/semaphore.clj | 149 ---------------- common/deps.edn | 2 +- 17 files changed, 296 insertions(+), 211 deletions(-) create mode 100644 backend/resources/climit.edn create mode 100644 backend/src/app/rpc/climit.clj delete mode 100644 backend/src/app/rpc/semaphore.clj diff --git a/backend/deps.edn b/backend/deps.edn index 92fa269d5..b449a6ecc 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -37,6 +37,8 @@ buddy/buddy-hashers {:mvn/version "1.8.158"} buddy/buddy-sign {:mvn/version "3.4.333"} + com.github.ben-manes.caffeine/caffeine {:mvn/version "3.1.1"} + org.jsoup/jsoup {:mvn/version "1.15.1"} org.im4java/im4java {:git/tag "1.4.0-penpot-2" diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn new file mode 100644 index 000000000..697d16539 --- /dev/null +++ b/backend/resources/climit.edn @@ -0,0 +1,7 @@ +;; Example climit.edn file +;; Required: concurrency +;; Optional: queue-size, ommited means Integer/MAX_VALUE +{:update-file {:concurrency 1 :queue-size 3} + :auth {:concurrency 128} + :process-font {:concurrency 4 :queue-size 32} + :process-image {:concurrency 8 :queue-size 32}} diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml index b653c60fa..6e4c30572 100644 --- a/backend/resources/log4j2-devenv.xml +++ b/backend/resources/log4j2-devenv.xml @@ -32,6 +32,7 @@ + diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 9e65cd3d3..5ce6f52b1 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -52,7 +52,9 @@ :default-blob-version 5 :loggers-zmq-uri "tcp://localhost:45556" + :rpc-rlimit-config (fs/path "resources/rlimit.edn") + :rpc-climit-config (fs/path "resources/climit.edn") :file-change-snapshot-every 5 :file-change-snapshot-timeout "3h" @@ -90,6 +92,7 @@ (s/def ::default-rpc-rlimit ::us/vector-of-strings) (s/def ::rpc-rlimit-config ::fs/path) +(s/def ::rpc-climit-config ::fs/path) (s/def ::media-max-file-size ::us/integer) @@ -172,11 +175,6 @@ (s/def ::redis-uri ::us/string) (s/def ::registration-domain-whitelist ::us/set-of-strings) -(s/def ::semaphore-process-font ::us/integer) -(s/def ::semaphore-process-image ::us/integer) -(s/def ::semaphore-update-file ::us/integer) -(s/def ::semaphore-auth ::us/integer) - (s/def ::smtp-default-from ::us/string) (s/def ::smtp-default-reply-to ::us/string) (s/def ::smtp-host ::us/string) diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj index b66b416ec..f6764dbab 100644 --- a/backend/src/app/http/errors.clj +++ b/backend/src/app/http/errors.clj @@ -94,6 +94,23 @@ [err _] (yrs/response 404 (ex-data err))) +(defmethod handle-exception :internal + [error request] + (let [{:keys [code] :as edata} (ex-data error)] + (cond + (= :concurrency-limit-reached code) + (yrs/response 429) + + :else + (do + (l/error ::l/raw (ex-message error) + ::l/context (get-context request) + :cause error) + (yrs/response 500 {:type :server-error + :code :unhandled + :hint (ex-message error) + :data edata}))))) + (defmethod handle-exception org.postgresql.util.PSQLException [error request] (let [state (.getSQLState ^java.sql.SQLException error)] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 97bcdc7a2..9f3d5a150 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -204,7 +204,7 @@ {:pool (ig/ref :app.db/pool) :executor (ig/ref [::default :app.worker/executor])} - :app.rpc/semaphores + :app.rpc/climit {:metrics (ig/ref :app.metrics/metrics) :executor (ig/ref [::default :app.worker/executor])} @@ -224,11 +224,11 @@ :audit (ig/ref :app.loggers.audit/collector) :ldap (ig/ref :app.auth.ldap/provider) :http-client (ig/ref :app.http/client) + :climit (ig/ref :app.rpc/climit) :rlimit (ig/ref :app.rpc/rlimit) :executors (ig/ref :app.worker/executors) :executor (ig/ref [::default :app.worker/executor]) :templates (ig/ref :app.setup/builtin-templates) - :semaphores (ig/ref :app.rpc/semaphores) } :app.rpc.doc/routes diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj index 1429b2f57..9f4d4a472 100644 --- a/backend/src/app/metrics.clj +++ b/backend/src/app/metrics.clj @@ -100,21 +100,21 @@ ::mdef/labels ["name"] ::mdef/type :summary} - :semaphore-queued-submissions - {::mdef/name "penpot_semaphore_queued_submissions" - ::mdef/help "Current number of queued submissions on SEMAPHORE." + :rpc-climit-queue-size + {::mdef/name "penpot_rpc_climit_queue_size" + ::mdef/help "Current number of queued submissions on the CLIMIT." ::mdef/labels ["name"] ::mdef/type :gauge} - :semaphore-used-permits - {::mdef/name "penpot_semaphore_used_permits" - ::mdef/help "Current number of used permits on SEMAPHORE." + :rpc-climit-concurrency + {::mdef/name "penpot_rpc_climit_concurrency" + ::mdef/help "Current number of used concurrency capacity on the CLIMIT" ::mdef/labels ["name"] ::mdef/type :gauge} - :semaphore-timing - {::mdef/name "penpot_semaphore_timing" - ::mdef/help "Total timing of SEMAPHORE." + :rpc-climit-timing + {::mdef/name "penpot_rpc_climit_timing" + ::mdef/help "Summary of the time between queuing and executing on the CLIMIT" ::mdef/labels ["name"] ::mdef/type :summary} diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 01adf6b07..6a5e28035 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -15,9 +15,9 @@ [app.loggers.audit :as audit] [app.metrics :as mtx] [app.msgbus :as-alias mbus] + [app.rpc.climit :as climit] [app.rpc.retry :as retry] [app.rpc.rlimit :as rlimit] - [app.rpc.semaphore :as-alias rsem] [app.storage :as-alias sto] [app.util.services :as sv] [app.util.time :as ts] @@ -163,7 +163,7 @@ (wrap-dispatch cfg $ mdata) (wrap-metrics cfg $ mdata) (retry/wrap-retry cfg $ mdata) - (rsem/wrap cfg $ mdata) + (climit/wrap cfg $ mdata) (rlimit/wrap cfg $ mdata) (wrap-audit cfg $ mdata)) @@ -175,6 +175,7 @@ (fn [{:keys [::request] :as params}] ;; Raise authentication error when rpc method requires auth but ;; no profile-id is found in the request. + (p/do! (if (and auth? (not (uuid? (:profile-id params)))) (ex/raise :type :authentication @@ -182,7 +183,6 @@ :hint "authentication required for this endpoint") (let [params (us/conform spec (dissoc params ::request))] (f cfg (assoc params ::request request)))))) - mdata))) (defn- process-method @@ -238,6 +238,7 @@ (s/def ::http-client fn?) (s/def ::ldap (s/nilable map?)) (s/def ::msgbus ::mbus/msgbus) +(s/def ::climit (s/nilable ::climit/climit)) (s/def ::rlimit (s/nilable ::rlimit/rlimit)) (s/def ::public-uri ::us/not-empty-string) @@ -251,7 +252,7 @@ ::public-uri ::msgbus ::http-client - ::rsem/semaphores + ::climit ::rlimit ::mtx/metrics ::db/pool diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj new file mode 100644 index 000000000..76c6f44e7 --- /dev/null +++ b/backend/src/app/rpc/climit.clj @@ -0,0 +1,205 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.rpc.climit + "Concurrencly limiter for RPC." + (:require + [app.common.data :as d] + [app.common.exceptions :as ex] + [app.common.logging :as l] + [app.common.spec :as us] + [app.config :as cf] + [app.metrics :as mtx] + [app.rpc :as-alias rpc] + [app.util.services :as-alias sv] + [app.util.time :as dt] + [app.worker :as-alias wrk] + [clojure.edn :as edn] + [clojure.spec.alpha :as s] + [datoteka.fs :as fs] + [integrant.core :as ig] + [promesa.core :as p] + [promesa.exec :as px] + [promesa.exec.bulkhead :as pxb]) + (:import + com.github.benmanes.caffeine.cache.Cache + com.github.benmanes.caffeine.cache.CacheLoader + com.github.benmanes.caffeine.cache.Caffeine + com.github.benmanes.caffeine.cache.RemovalListener)) + +(defn- capacity-exception? + [o] + (and (ex/ex-info? o) + (let [data (ex-data o)] + (and (= :bulkhead-error (:type data)) + (= :capacity-limit-reached (:code data)))))) + +(defn invoke! + [limiter f] + (p/handle + (px/submit! limiter f) + (fn [result cause] + (cond + (capacity-exception? cause) + (p/rejected + (ex/error :type :internal + :code :concurrency-limit-reached + :queue (-> limiter meta :bkey name) + :cause cause)) + + (some? cause) + (p/rejected cause) + + :else + (p/resolved result))))) + +(defn- create-limiter + [{:keys [executor metrics concurrency queue-size bkey skey]}] + (let [labels (into-array String [(name bkey)]) + on-queue (fn [instance] + (l/trace :hint "enqueued" + :key (name bkey) + :skey (str skey) + :queue-size (get instance :current-queue-size) + :concurrency (get instance :current-concurrency) + (mtx/run! metrics + :id :rpc-climit-queue-size + :val (get instance :current-queue-size) + :labels labels) + (mtx/run! metrics + :id :rpc-climit-concurrency + :val (get instance :current-concurrency) + :labels labels))) + + on-run (fn [instance task] + (let [elapsed (- (inst-ms (dt/now)) + (inst-ms task))] + (l/trace :hint "execute" + :key (name bkey) + :skey (str skey) + :elapsed (str elapsed "ms")) + (mtx/run! metrics + :id :rpc-climit-timing + :val elapsed + :labels labels) + (mtx/run! metrics + :id :rpc-climit-queue-size + :val (get instance :current-queue-size) + :labels labels) + (mtx/run! metrics + :id :rpc-climit-concurrency + :val (get instance :current-concurrency) + :labels labels))) + + options {:executor executor + :concurrency concurrency + :queue-size (or queue-size Integer/MAX_VALUE) + :on-queue on-queue + :on-run on-run}] + + (-> (pxb/create options) + (vary-meta assoc :bkey bkey :skey skey)))) + +(defn- create-cache + [{:keys [executor] :as params} config] + (let [listener (reify RemovalListener + (onRemoval [_ key _val cause] + (l/trace :hint "cache: remove" :key key :reason (str cause)))) + + loader (reify CacheLoader + (load [_ key] + (let [[bkey skey] key] + (when-let [config (get config bkey)] + (-> (merge params config) + (assoc :bkey bkey) + (assoc :skey skey) + (create-limiter))))))] + + (.. (Caffeine/newBuilder) + (weakValues) + (executor executor) + (removalListener listener) + (build loader)))) + +(defprotocol IConcurrencyManager) + +(s/def ::concurrency ::us/integer) +(s/def ::queue-size ::us/integer) +(s/def ::config + (s/map-of keyword? + (s/keys :req-un [::concurrency] + :opt-un [::queue-size]))) + +(defmethod ig/prep-key ::rpc/climit + [_ cfg] + (merge {:path (cf/get :rpc-climit-config)} + (d/without-nils cfg))) + +(defmethod ig/pre-init-spec ::rpc/climit [_] + (s/keys :req-un [::wrk/executor ::mtx/metrics ::fs/path])) + +(defmethod ig/init-key ::rpc/climit + [_ {:keys [path] :as params}] + (when (contains? cf/flags :rpc-climit) + (if-let [config (some->> path slurp edn/read-string)] + (do + (l/info :hint "initializing concurrency limit" :config (str path)) + (us/verify! ::config config) + + (let [cache (create-cache params config)] + ^{::cache cache} + (reify + IConcurrencyManager + clojure.lang.IDeref + (deref [_] config) + + clojure.lang.ILookup + (valAt [_ key] + (let [key (if (vector? key) key [key])] + (.get ^Cache cache key)))))) + + (l/warn :hint "unable to load configuration" :config (str path))))) + + +(s/def ::climit #(satisfies? IConcurrencyManager %)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; PUBLIC API +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defmacro with-dispatch + [lim & body] + `(if ~lim + (invoke! ~lim (^:once fn [] (p/wrap (do ~@body)))) + (p/wrap (do ~@body)))) + +(defn wrap + [{:keys [climit]} f {:keys [::queue ::key-fn] :as mdata}] + (if (and (some? climit) + (some? queue)) + (if-let [config (get @climit queue)] + (do + (l/debug :hint "wrap: instrumenting method" + :limit-name (name queue) + :service-name (::sv/name mdata) + :queue-size (or (:queue-size config) Integer/MAX_VALUE) + :concurrency (:concurrency config) + :keyed? (some? key-fn)) + (if (some? key-fn) + (fn [cfg params] + (let [key [queue (key-fn params)] + lim (get climit key)] + (invoke! lim (partial f cfg params)))) + + (let [lim (get climit queue)] + (fn [cfg params] + (invoke! lim (partial f cfg params)))))) + (do + (l/warn :hint "wrap: no config found" + :queue (name queue) + :service (::sv/name mdata)) + f)) + f)) diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index f66ed1358..f41f6bf92 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -16,10 +16,10 @@ [app.http.session :as session] [app.loggers.audit :as audit] [app.rpc :as-alias rpc] + [app.rpc.climit :as climit] [app.rpc.doc :as-alias doc] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] - [app.rpc.semaphore :as rsem] [app.tokens :as tokens] [app.util.services :as sv] [app.util.time :as dt] @@ -147,7 +147,7 @@ (sv/defmethod ::login-with-password "Performs authentication using penpot password." {:auth false - ::rsem/queue :auth + ::climit/queue :auth ::doc/added "1.15"} [cfg params] (login-with-password cfg params)) @@ -188,7 +188,7 @@ (sv/defmethod ::recover-profile {:auth false - ::rsem/queue :auth + ::climit/queue :auth ::doc/added "1.15"} [cfg params] (recover-profile cfg params)) @@ -438,7 +438,7 @@ (sv/defmethod ::register-profile {:auth false - ::rsem/queue :auth + ::climit/queue :auth ::doc/added "1.15"} [{:keys [pool] :as cfg} params] (db/with-atomic [conn pool] diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index 5884a1a43..d29382a37 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -22,10 +22,10 @@ [app.msgbus :as mbus] [app.rpc :as-alias rpc] [app.rpc.doc :as-alias doc] + [app.rpc.climit :as climit] [app.rpc.permissions :as perms] [app.rpc.queries.files :as files] [app.rpc.queries.projects :as proj] - [app.rpc.semaphore :as rsem] [app.storage.impl :as simpl] [app.util.blob :as blob] [app.util.objects-map :as omap] @@ -346,8 +346,8 @@ FOR KEY SHARE") (sv/defmethod ::update-file - {::rsem/queue :update-file - ::doc/added "1.0"} + {::climit/queue :update-file + ::climit/key-fn :id} [{:keys [pool] :as cfg} {:keys [id profile-id components-v2] :as params}] (db/with-atomic [conn pool] (db/xact-lock! conn id) diff --git a/backend/src/app/rpc/mutations/fonts.clj b/backend/src/app/rpc/mutations/fonts.clj index 1868feae2..61f4b3ee5 100644 --- a/backend/src/app/rpc/mutations/fonts.clj +++ b/backend/src/app/rpc/mutations/fonts.clj @@ -12,14 +12,15 @@ [app.common.uuid :as uuid] [app.db :as db] [app.media :as media] + [app.rpc.climit :as-alias climit] [app.rpc.doc :as-alias doc] [app.rpc.queries.teams :as teams] - [app.rpc.semaphore :as rsem] [app.storage :as sto] [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s] - [promesa.core :as p])) + [promesa.core :as p] + [promesa.exec :as px])) (declare create-font-variant) @@ -46,15 +47,15 @@ (create-font-variant cfg params))) (defn create-font-variant - [{:keys [storage pool executor semaphores] :as cfg} {:keys [data] :as params}] + [{:keys [storage pool executor climit] :as cfg} {:keys [data] :as params}] (letfn [(generate-fonts [data] - (rsem/with-dispatch (:process-font semaphores) + (climit/with-dispatch (:process-font climit) (media/run {:cmd :generate-fonts :input data}))) ;; Function responsible of calculating cryptographyc hash of ;; the provided data. (calculate-hash [data] - (rsem/with-dispatch (:process-font semaphores) + (px/with-dispatch executor (sto/calculate-hash data))) (validate-data [data] @@ -120,6 +121,7 @@ and font_id = ?") (sv/defmethod ::update-font + {::climit/queue :process-font} [{:keys [pool] :as cfg} {:keys [team-id profile-id id name] :as params}] (db/with-atomic [conn pool] (teams/check-edition-permissions! conn profile-id team-id) diff --git a/backend/src/app/rpc/mutations/media.clj b/backend/src/app/rpc/mutations/media.clj index 2e0483647..2971d64bc 100644 --- a/backend/src/app/rpc/mutations/media.clj +++ b/backend/src/app/rpc/mutations/media.clj @@ -14,8 +14,8 @@ [app.config :as cf] [app.db :as db] [app.media :as media] + [app.rpc.climit :as climit] [app.rpc.queries.teams :as teams] - [app.rpc.semaphore :as rsem] [app.storage :as sto] [app.storage.tmp :as tmp] [app.util.services :as sv] @@ -23,7 +23,8 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [datoteka.io :as io] - [promesa.core :as p])) + [promesa.core :as p] + [promesa.exec :as px])) (def default-max-file-size (* 1024 1024 10)) ; 10 MiB @@ -104,25 +105,25 @@ ;; inverse, soft referential integrity). (defn create-file-media-object - [{:keys [storage pool semaphores] :as cfg} + [{:keys [storage pool climit executor] :as cfg} {:keys [id file-id is-local name content] :as params}] (letfn [;; Function responsible to retrieve the file information, as ;; it is synchronous operation it should be wrapped into ;; with-dispatch macro. (get-info [content] - (rsem/with-dispatch (:process-image semaphores) + (climit/with-dispatch (:process-image climit) (media/run {:cmd :info :input content}))) ;; Function responsible of calculating cryptographyc hash of ;; the provided data. (calculate-hash [data] - (rsem/with-dispatch (:process-image semaphores) + (px/with-dispatch executor (sto/calculate-hash data))) ;; Function responsible of generating thumnail. As it is synchronous ;; opetation, it should be wrapped into with-dispatch macro (generate-thumbnail [info] - (rsem/with-dispatch (:process-image semaphores) + (climit/with-dispatch (:process-image climit) (media/run (assoc thumbnail-options :cmd :generic-thumbnail :input info)))) @@ -154,14 +155,15 @@ :bucket "file-media-object"}))) (insert-into-database [info image thumb] - (db/exec-one! pool [sql:create-file-media-object - (or id (uuid/next)) - file-id is-local name - (:id image) - (:id thumb) - (:width info) - (:height info) - (:mtype info)]))] + (px/with-dispatch executor + (db/exec-one! pool [sql:create-file-media-object + (or id (uuid/next)) + file-id is-local name + (:id image) + (:id thumb) + (:width info) + (:height info) + (:mtype info)])))] (p/let [info (get-info content) thumb (create-thumbnail info) diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj index 56846c30d..da087c8c1 100644 --- a/backend/src/app/rpc/mutations/profile.clj +++ b/backend/src/app/rpc/mutations/profile.clj @@ -16,11 +16,11 @@ [app.loggers.audit :as audit] [app.media :as media] [app.rpc :as-alias rpc] + [app.rpc.climit :as-alias climit] [app.rpc.commands.auth :as cmd.auth] [app.rpc.doc :as-alias doc] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] - [app.rpc.semaphore :as rsem] [app.storage :as sto] [app.tokens :as tokens] [app.util.services :as sv] @@ -83,11 +83,11 @@ (s/keys :req-un [::profile-id ::password ::old-password])) (sv/defmethod ::update-profile-password - {::rsem/queue :auth} + {::climit/queue :auth} [{:keys [pool] :as cfg} {:keys [password] :as params}] (db/with-atomic [conn pool] (let [profile (validate-password! conn params) - session-id (:app.rpc/session-id params)] + session-id (::rpc/session-id params)] (when (= (str/lower (:email profile)) (str/lower (:password params))) (ex/raise :type :validation @@ -309,7 +309,7 @@ (sv/defmethod ::login {:auth false - ::rsem/queue :auth + ::climit/queue :auth ::doc/added "1.0" ::doc/deprecated "1.15"} [cfg params] @@ -354,7 +354,7 @@ (sv/defmethod ::register-profile {:auth false - ::rsem/queue :auth + ::climit/queue :auth ::doc/added "1.0" ::doc/deprecated "1.15"} [{:keys [pool] :as cfg} params] diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj index da04e4c65..288cfdf76 100644 --- a/backend/src/app/rpc/mutations/teams.clj +++ b/backend/src/app/rpc/mutations/teams.clj @@ -17,11 +17,11 @@ [app.loggers.audit :as audit] [app.media :as media] [app.rpc :as-alias rpc] + [app.rpc.climit :as climit] [app.rpc.mutations.projects :as projects] [app.rpc.permissions :as perms] [app.rpc.queries.profile :as profile] [app.rpc.queries.teams :as teams] - [app.rpc.semaphore :as rsem] [app.storage :as sto] [app.tokens :as tokens] [app.util.services :as sv] @@ -316,13 +316,13 @@ (assoc team :photo-id (:id photo)))) (defn upload-photo - [{:keys [storage semaphores] :as cfg} {:keys [file]}] + [{:keys [storage executor climit] :as cfg} {:keys [file]}] (letfn [(get-info [content] - (rsem/with-dispatch (:process-image semaphores) + (climit/with-dispatch (:process-image climit) (media/run {:cmd :info :input content}))) (generate-thumbnail [info] - (rsem/with-dispatch (:process-image semaphores) + (climit/with-dispatch (:process-image climit) (media/run {:cmd :profile-thumbnail :format :jpeg :quality 85 @@ -333,7 +333,7 @@ ;; Function responsible of calculating cryptographyc hash of ;; the provided data. (calculate-hash [data] - (rsem/with-dispatch (:process-image semaphores) + (px/with-dispatch executor (sto/calculate-hash data)))] (p/let [info (get-info file) @@ -341,11 +341,10 @@ hash (calculate-hash (:data thumb)) content (-> (sto/content (:data thumb) (:size thumb)) (sto/wrap-with-hash hash))] - (rsem/with-dispatch (:process-image semaphores) - (sto/put-object! storage {::sto/content content - ::sto/deduplicate? true - :bucket "profile" - :content-type (:mtype thumb)}))))) + (sto/put-object! storage {::sto/content content + ::sto/deduplicate? true + :bucket "profile" + :content-type (:mtype thumb)})))) ;; --- Mutation: Invite Member diff --git a/backend/src/app/rpc/semaphore.clj b/backend/src/app/rpc/semaphore.clj deleted file mode 100644 index 5e8a5a5ed..000000000 --- a/backend/src/app/rpc/semaphore.clj +++ /dev/null @@ -1,149 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) KALEIDOS INC - -(ns app.rpc.semaphore - "Resource usage limits (in other words: semaphores)." - (:require - [app.common.data :as d] - [app.common.logging :as l] - [app.common.spec :as us] - [app.config :as cf] - [app.metrics :as mtx] - [app.rpc :as-alias rpc] - [app.util.locks :as locks] - [app.util.time :as ts] - [app.worker :as-alias wrk] - [clojure.spec.alpha :as s] - [integrant.core :as ig] - [promesa.core :as p])) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; ASYNC SEMAPHORE IMPL -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defprotocol IAsyncSemaphore - (acquire! [_]) - (release! [_ tp])) - -(defn create - [& {:keys [permits metrics name executor]}] - (let [used (volatile! 0) - queue (volatile! (d/queue)) - labels (into-array String [(d/name name)]) - lock (locks/create) - permits (or permits Long/MAX_VALUE)] - - (when (>= permits Long/MAX_VALUE) - (l/warn :hint "permits value too high" :permits permits :semaphore name)) - - ^{::wrk/executor executor - ::name name} - (reify IAsyncSemaphore - (acquire! [_] - (let [d (p/deferred)] - (locks/locking lock - (if (< @used permits) - (do - (vswap! used inc) - (p/resolve! d)) - (vswap! queue conj d))) - - (mtx/run! metrics - :id :semaphore-used-permits - :val @used - :labels labels) - (mtx/run! metrics - :id :semaphore-queued-submissions - :val (count @queue) - :labels labels) - d)) - - (release! [_ tp] - (locks/locking lock - (if-let [item (peek @queue)] - (do - (vswap! queue pop) - (p/resolve! item)) - (when (pos? @used) - (vswap! used dec)))) - - (mtx/run! metrics - :id :semaphore-timing - :val (inst-ms (tp)) - :labels labels) - (mtx/run! metrics - :id :semaphore-used-permits - :val @used - :labels labels) - (mtx/run! metrics - :id :semaphore-queued-submissions - :val (count @queue) - :labels labels))))) - -(defn semaphore? - [v] - (satisfies? IAsyncSemaphore v)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; PREDEFINED SEMAPHORES -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(s/def ::semaphore semaphore?) -(s/def ::semaphores - (s/map-of ::us/keyword ::semaphore)) - -(defmethod ig/pre-init-spec ::rpc/semaphores [_] - (s/keys :req-un [::mtx/metrics])) - -(defn- create-default-semaphores - [metrics executor] - [(create :permits (cf/get :semaphore-process-font) - :metrics metrics - :name :process-font - :executor executor) - (create :permits (cf/get :semaphore-update-file) - :metrics metrics - :name :update-file - :executor executor) - (create :permits (cf/get :semaphore-process-image) - :metrics metrics - :name :process-image - :executor executor) - (create :permits (cf/get :semaphore-auth) - :metrics metrics - :name :auth - :executor executor)]) - -(defmethod ig/init-key ::rpc/semaphores - [_ {:keys [metrics executor]}] - (->> (create-default-semaphores metrics executor) - (d/index-by (comp ::name meta)))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; PUBLIC API -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defmacro with-dispatch - [queue & body] - `(let [tpoint# (ts/tpoint) - queue# ~queue - executor# (-> queue# meta ::wrk/executor)] - (-> (acquire! queue#) - (p/then (fn [_#] ~@body) executor#) - (p/finally (fn [_# _#] - (release! queue# tpoint#)))))) - -(defn wrap - [{:keys [semaphores]} f {:keys [::queue]}] - (let [queue' (get semaphores queue)] - (if (semaphore? queue') - (fn [cfg params] - (with-dispatch queue' - (f cfg params))) - (do - (when (some? queue) - (l/warn :hint "undefined semaphore" :name queue)) - f)))) diff --git a/common/deps.edn b/common/deps.edn index 25ff87865..c8a61a63e 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -21,7 +21,7 @@ com.cognitect/transit-cljs {:mvn/version "0.8.280"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - funcool/promesa {:mvn/version "8.0.450"} + funcool/promesa {:mvn/version "9.0.507"} funcool/cuerdas {:mvn/version "2022.06.16-403"} lambdaisland/uri {:mvn/version "1.13.95"