From a5c6d78ee5020509a5b40d4bee0983e302af52c9 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Sat, 27 Jan 2024 22:33:52 +0100 Subject: [PATCH] :recycle: Fix some fundamental bugs on climit module The climit previously of this commit is heavily used inside a transactions, so in heavy contention operation such that file thumbnail creation can cause a db pool exhaust. This commit fixes this issue setting up a better resource limiting mechanism that works outside the transactions so, contention will no longer hold an open connection/transaction. It also adds general improvement to the traceability to the climit mechanism: it now properly logs the profile-id that is currently cause some contention on specific resources. It also add a general/root climit that is applied to all requests so if someone start making abussive requests, we can clearly detect it. --- backend/resources/climit.edn | 19 +- backend/src/app/main.clj | 4 +- backend/src/app/rpc.clj | 3 +- backend/src/app/rpc/climit.clj | 266 +++++++++++------- backend/src/app/rpc/commands/auth.clj | 58 ++-- .../src/app/rpc/commands/files_thumbnails.clj | 14 +- backend/src/app/rpc/commands/files_update.clj | 18 +- backend/src/app/rpc/commands/fonts.clj | 13 +- backend/src/app/rpc/commands/media.clj | 45 +-- backend/src/app/rpc/commands/profile.clj | 56 ++-- backend/test/backend_tests/helpers.clj | 1 + common/src/app/common/logging.cljc | 6 + 12 files changed, 291 insertions(+), 212 deletions(-) diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn index 6bb330927..34d218415 100644 --- a/backend/resources/climit.edn +++ b/backend/resources/climit.edn @@ -3,15 +3,26 @@ ;; Optional: queue, ommited means Integer/MAX_VALUE ;; Optional: timeout, ommited means no timeout ;; Note: queue and timeout are excluding -{:update-file/by-profile +{:update-file/global {:permits 20} + :update-file/by-profile {:permits 1 :queue 5} - :update-file/global {:permits 20} + :process-font/global {:permits 4} + :process-font/by-profile {:permits 1} - :derive-password/global {:permits 8} - :process-font/global {:permits 4} :process-image/global {:permits 8} + :process-image/by-profile {:permits 1} + :auth/global {:permits 8} + + :root/global + {:permits 40} + + :root/by-profile + {:permits 10} + + :file-thumbnail-ops/global + {:permits 20} :file-thumbnail-ops/by-profile {:permits 2} diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 7028be8bf..47e43f5cf 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -322,9 +322,7 @@ ::rpc/climit (ig/ref ::rpc/climit) ::rpc/rlimit (ig/ref ::rpc/rlimit) ::setup/templates (ig/ref ::setup/templates) - ::props (ig/ref ::setup/props) - - :pool (ig/ref ::db/pool)} + ::props (ig/ref ::setup/props)} :app.rpc.doc/routes {:methods (ig/ref :app.rpc/methods)} diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index f9c36515a..08ccd8cdb 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -240,8 +240,7 @@ ::mtx/metrics ::main/props] :opt [::climit - ::rlimit] - :req-un [::db/pool])) + ::rlimit])) (defmethod ig/init-key ::methods [_ cfg] diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index 71c64b596..cf2942c22 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -21,26 +21,31 @@ [app.worker :as-alias wrk] [clojure.edn :as edn] [clojure.spec.alpha :as s] + [cuerdas.core :as str] [datoteka.fs :as fs] [integrant.core :as ig] - [promesa.core :as p] [promesa.exec :as px] [promesa.exec.bulkhead :as pbh]) (:import - clojure.lang.ExceptionInfo)) + clojure.lang.ExceptionInfo + java.util.concurrent.atomic.AtomicLong)) (set! *warn-on-reflection* true) (defn- id->str - [id] - (-> (str id) - (subs 1))) + ([id] + (-> (str id) + (subs 1))) + ([id key] + (if key + (str (-> (str id) (subs 1)) "/" key) + (id->str id)))) (defn- create-cache [{:keys [::wrk/executor]}] (letfn [(on-remove [key _ cause] (let [[id skey] key] - (l/dbg :hint "destroy limiter" :id (id->str id) :key skey :reason (str cause))))] + (l/dbg :hint "disposed" :id (id->str id skey) :reason (str cause))))] (cache/create :executor executor :on-remove on-remove :keepalive "5m"))) @@ -81,132 +86,179 @@ (defn- create-limiter [config [id skey]] - (l/dbg :hint "create limiter" :id (id->str id) :key skey) + (l/dbg :hint "created" :id (id->str id skey)) (pbh/create :permits (or (:permits config) (:concurrency config)) :queue (or (:queue config) (:queue-size config)) :timeout (:timeout config) :type :semaphore)) -(defn- invoke! - [config cache metrics id key f] - (if-let [limiter (cache/get cache [id key] (partial create-limiter config))] - (let [tpoint (dt/tpoint) - labels (into-array String [(id->str id)]) - wrapped (fn [] - (let [elapsed (tpoint) - stats (pbh/get-stats limiter)] - (l/trc :hint "acquired" - :id (id->str id) - :key key - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats) - :elapsed (dt/format-duration elapsed)) +(defmacro ^:private measure-and-log! + [metrics mlabels stats id action limit-id limit-label profile-id elapsed] + `(let [mpermits# (:max-permits ~stats) + mqueue# (:max-queue ~stats) + permits# (:permits ~stats) + queue# (:queue ~stats) + queue# (- queue# mpermits#) + queue# (if (neg? queue#) 0 queue#) + level# (if (pos? queue#) :warn :trace)] - (mtx/run! metrics - :id :rpc-climit-timing - :val (inst-ms elapsed) - :labels labels) - (try - (f) - (finally - (let [elapsed (tpoint)] - (l/trc :hint "finished" - :id (id->str id) - :key key - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats) - :elapsed (dt/format-duration elapsed))))))) - measure! - (fn [stats] - (mtx/run! metrics - :id :rpc-climit-queue - :val (:queue stats) - :labels labels) - (mtx/run! metrics - :id :rpc-climit-permits - :val (:permits stats) - :labels labels))] + (mtx/run! ~metrics + :id :rpc-climit-queue + :val queue# + :labels ~mlabels) - (try - (let [stats (pbh/get-stats limiter)] - (measure! stats) - (l/trc :hint "enqueued" - :id (id->str id) - :key key - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats)) - (px/invoke! limiter wrapped)) - (catch ExceptionInfo cause - (let [{:keys [type code]} (ex-data cause)] - (if (= :bulkhead-error type) + (mtx/run! ~metrics + :id :rpc-climit-permits + :val permits# + :labels ~mlabels) + + (l/log level# + :hint ~action + :req ~id + :id ~limit-id + :label ~limit-label + :profile-id (str ~profile-id) + :permits permits# + :queue queue# + :max-permits mpermits# + :max-queue mqueue# + ~@(if (some? elapsed) + [:elapsed `(dt/format-duration ~elapsed)] + [])))) + +(def ^:private idseq (AtomicLong. 0)) + +(defn- invoke + [limiter metrics limit-id limit-key limit-label profile-id f params] + (let [tpoint (dt/tpoint) + limit-id (id->str limit-id limit-key) + mlabels (into-array String [limit-id]) + stats (pbh/get-stats limiter) + id (.incrementAndGet ^AtomicLong idseq)] + + (try + (measure-and-log! metrics mlabels stats id "enqueued" limit-id limit-label profile-id nil) + (px/invoke! limiter (fn [] + (let [elapsed (tpoint) + stats (pbh/get-stats limiter)] + (measure-and-log! metrics mlabels stats id "acquired" limit-id limit-label profile-id elapsed) + (mtx/run! metrics + :id :rpc-climit-timing + :val (inst-ms elapsed) + :labels mlabels) + (apply f params)))) + + (catch ExceptionInfo cause + (let [{:keys [type code]} (ex-data cause)] + (if (= :bulkhead-error type) + (let [elapsed (tpoint)] + (measure-and-log! metrics mlabels stats id "reject" limit-id limit-label profile-id elapsed) (ex/raise :type :concurrency-limit :code code - :hint "concurrency limit reached") - (throw cause)))) + :hint "concurrency limit reached" + :cause cause)) + (throw cause)))) - (finally - (measure! (pbh/get-stats limiter))))) - - (do - (l/wrn :hint "no limiter found" :id (id->str id)) - (f)))) + (finally + (let [elapsed (tpoint) + stats (pbh/get-stats limiter)] + (measure-and-log! metrics mlabels stats id "finished" limit-id limit-label profile-id elapsed)))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MIDDLEWARE ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(def noop-fn (constantly nil)) +(def ^:private noop-fn (constantly nil)) +(def ^:private global-limits + [[:root/global noop-fn] + [:root/by-profile ::rpc/profile-id]]) + +(defn- get-limits + [cfg] + (when-let [ref (get cfg ::id)] + (cond + (keyword? ref) + [[ref]] + + (and (vector? ref) + (keyword (first ref))) + [ref] + + (and (vector? ref) + (vector? (first ref))) + (rseq ref) + + :else + (throw (IllegalArgumentException. "unable to normalize limit"))))) (defn wrap - [{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}] - (if (and (some? climit) (some? id)) - (let [cache (::cache climit) - config (::config climit)] - (if-let [config (get config id)] - (do - (l/dbg :hint "instrumenting method" - :limit (id->str id) - :service-name (::sv/name mdata) - :timeout (:timeout config) - :permits (:permits config) - :queue (:queue config) - :keyed? (not= key-fn noop-fn)) + [{:keys [::rpc/climit ::mtx/metrics]} handler mdata] + (let [cache (::cache climit) + config (::config climit) + label (::sv/name mdata)] - (fn [cfg params] - (invoke! config cache metrics id (key-fn params) (partial f cfg params)))) + (reduce (fn [handler [limit-id key-fn]] + (if-let [config (get config limit-id)] + (let [key-fn (or key-fn noop-fn)] + (l/dbg :hint "instrumenting method" + :method label + :limit (id->str limit-id) + :timeout (:timeout config) + :permits (:permits config) + :queue (:queue config) + :keyed (not= key-fn noop-fn)) - (do - (l/wrn :hint "no config found for specified queue" :id (id->str id)) - f))) - f)) + (if (and (= key-fn ::rpc/profile-id) + (false? (::rpc/auth mdata true))) + + ;; We don't enforce by-profile limit on methods that does + ;; not require authentication + handler + + (fn [cfg params] + (let [limit-key (key-fn params) + cache-key [limit-id limit-key] + limiter (cache/get cache cache-key (partial create-limiter config)) + profile-id (if (= key-fn ::rpc/profile-id) + limit-key + (get params ::rpc/profile-id))] + (invoke limiter metrics limit-id limit-key label profile-id handler [cfg params]))))) + + (do + (l/wrn :hint "no config found for specified queue" :id (id->str limit-id)) + handler))) + + handler + (concat global-limits (get-limits mdata))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; PUBLIC API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn configure - [{:keys [::rpc/climit]} id] - (us/assert! ::rpc/climit climit) - (assoc climit ::id id)) +(defn- build-exec-chain + [{:keys [::label ::profile-id ::rpc/climit ::mtx/metrics] :as cfg} f] + (let [config (get climit ::config) + cache (get climit ::cache)] -(defn run! + (reduce (fn [handler [limit-id limit-key :as ckey]] + (let [config (get config limit-id)] + (when-not config + (throw (IllegalArgumentException. + (str/ffmt "config not found for: %" limit-id)))) + + (fn [& params] + (let [limiter (cache/get cache ckey (partial create-limiter config))] + (invoke limiter metrics limit-id limit-key label profile-id handler params))))) + f + (get-limits cfg)))) + +(defn invoke! "Run a function in context of climit. Intended to be used in virtual threads." - ([{:keys [::id ::cache ::config ::mtx/metrics]} f] - (if-let [config (get config id)] - (invoke! config cache metrics id nil f) - (f))) - - ([{:keys [::id ::cache ::config ::mtx/metrics]} f executor] - (let [f #(p/await! (px/submit! executor f))] - (if-let [config (get config id)] - (invoke! config cache metrics id nil f) - (f))))) - + [{:keys [::executor] :as cfg} f & params] + (let [f (if (some? executor) + (fn [& params] (px/await! (px/submit! executor (fn [] (apply f params))))) + f) + f (build-exec-chain cfg f)] + (apply f params))) diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index c9b55b599..2e82e5640 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -21,6 +21,7 @@ [app.loggers.audit :as audit] [app.main :as-alias main] [app.rpc :as-alias rpc] + [app.rpc.climit :as-alias climit] [app.rpc.commands.profile :as profile] [app.rpc.commands.teams :as teams] [app.rpc.doc :as-alias doc] @@ -39,7 +40,7 @@ ;; ---- COMMAND: login with password (defn login-with-password - [{:keys [::db/pool] :as cfg} {:keys [email password] :as params}] + [cfg {:keys [email password] :as params}] (when-not (or (contains? cf/flags :login) (contains? cf/flags :login-with-password)) @@ -47,7 +48,7 @@ :code :login-disabled :hint "login is disabled in this instance")) - (letfn [(check-password [conn profile password] + (letfn [(check-password [cfg profile password] (if (= (:password profile) "!") (ex/raise :type :validation :code :account-without-password @@ -57,10 +58,10 @@ (l/trc :hint "updating profile password" :id (str (:id profile)) :email (:email profile)) - (profile/update-profile-password! conn (assoc profile :password password))) + (profile/update-profile-password! cfg (assoc profile :password password))) (:valid result)))) - (validate-profile [conn profile] + (validate-profile [cfg profile] (when-not profile (ex/raise :type :validation :code :wrong-credentials)) @@ -70,7 +71,7 @@ (when (:is-blocked profile) (ex/raise :type :restriction :code :profile-blocked)) - (when-not (check-password conn profile password) + (when-not (check-password cfg profile password) (ex/raise :type :validation :code :wrong-credentials)) (when-let [deleted-at (:deleted-at profile)] @@ -78,27 +79,29 @@ (ex/raise :type :validation :code :wrong-credentials))) - profile)] + profile) - (db/with-atomic [conn pool] - (let [profile (->> (profile/get-profile-by-email conn email) - (validate-profile conn) - (profile/strip-private-attrs)) + (login [{:keys [::db/conn] :as cfg}] + (let [profile (->> (profile/get-profile-by-email conn email) + (validate-profile cfg) + (profile/strip-private-attrs)) - invitation (when-let [token (:invitation-token params)] - (tokens/verify (::main/props cfg) {:token token :iss :team-invitation})) + invitation (when-let [token (:invitation-token params)] + (tokens/verify (::main/props cfg) {:token token :iss :team-invitation})) - ;; If invitation member-id does not matches the profile-id, we just proceed to ignore the - ;; invitation because invitations matches exactly; and user can't login with other email and - ;; accept invitation with other email - response (if (and (some? invitation) (= (:id profile) (:member-id invitation))) - {:invitation-token (:invitation-token params)} - (assoc profile :is-admin (let [admins (cf/get :admins)] - (contains? admins (:email profile)))))] - (-> response - (rph/with-transform (session/create-fn cfg (:id profile))) - (rph/with-meta {::audit/props (audit/profile->props profile) - ::audit/profile-id (:id profile)})))))) + ;; If invitation member-id does not matches the profile-id, we just proceed to ignore the + ;; invitation because invitations matches exactly; and user can't login with other email and + ;; accept invitation with other email + response (if (and (some? invitation) (= (:id profile) (:member-id invitation))) + {:invitation-token (:invitation-token params)} + (assoc profile :is-admin (let [admins (cf/get :admins)] + (contains? admins (:email profile)))))] + (-> response + (rph/with-transform (session/create-fn cfg (:id profile))) + (rph/with-meta {::audit/props (audit/profile->props profile) + ::audit/profile-id (:id profile)}))))] + + (db/tx-run! cfg login))) (def schema:login-with-password [:map {:title "login-with-password"} @@ -110,6 +113,7 @@ "Performs authentication using penpot password." {::rpc/auth false ::doc/added "1.15" + ::climit/id :auth/global ::sm/params schema:login-with-password} [cfg params] (login-with-password cfg params)) @@ -149,7 +153,8 @@ (sv/defmethod ::recover-profile {::rpc/auth false ::doc/added "1.15" - ::sm/params schema:recover-profile} + ::sm/params schema:recover-profile + ::climit/id :auth/global} [cfg params] (recover-profile cfg params)) @@ -360,7 +365,6 @@ {::audit/type "fact" ::audit/name "register-profile-retry" ::audit/profile-id id})) - (cond ;; If invitation token comes in params, this is because the ;; user comes from team-invitation process; in this case, @@ -402,7 +406,6 @@ {::audit/replace-props (audit/profile->props profile) ::audit/profile-id (:id profile)}))))) - (def schema:register-profile [:map {:title "register-profile"} [:token schema:token] @@ -411,7 +414,8 @@ (sv/defmethod ::register-profile {::rpc/auth false ::doc/added "1.15" - ::sm/params schema:register-profile} + ::sm/params schema:register-profile + ::climit/id :auth/global} [{:keys [::db/pool] :as cfg} params] (db/with-atomic [conn pool] (-> (assoc cfg ::db/conn conn) diff --git a/backend/src/app/rpc/commands/files_thumbnails.clj b/backend/src/app/rpc/commands/files_thumbnails.clj index f47300bde..a44a8bdbd 100644 --- a/backend/src/app/rpc/commands/files_thumbnails.clj +++ b/backend/src/app/rpc/commands/files_thumbnails.clj @@ -285,12 +285,10 @@ (sv/defmethod ::create-file-object-thumbnail {::doc/added "1.19" ::doc/module :files - ::climit/id :file-thumbnail-ops/by-profile - ::climit/key-fn ::rpc/profile-id - + ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id] + [:file-thumbnail-ops/global]] ::rtry/enabled true ::rtry/when rtry/conflict-exception? - ::audit/skip true ::sm/params schema:create-file-object-thumbnail} @@ -332,8 +330,8 @@ {::doc/added "1.19" ::doc/module :files ::doc/deprecated "1.20" - ::climit/id :file-thumbnail-ops - ::climit/key-fn ::rpc/profile-id + ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id] + [:file-thumbnail-ops/global]] ::audit/skip true} [cfg {:keys [::rpc/profile-id file-id object-id]}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] @@ -408,8 +406,8 @@ {::doc/added "1.19" ::doc/module :files ::audit/skip true - ::climit/id :file-thumbnail-ops - ::climit/key-fn ::rpc/profile-id + ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id] + [:file-thumbnail-ops/global]] ::rtry/enabled true ::rtry/when rtry/conflict-exception? ::sm/params schema:create-file-thumbnail} diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index 134a12794..fade957e0 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -35,7 +35,8 @@ [app.util.services :as sv] [app.util.time :as dt] [app.worker :as-alias wrk] - [clojure.set :as set])) + [clojure.set :as set] + [promesa.exec :as px])) ;; --- SCHEMA @@ -132,8 +133,8 @@ ;; database. (sv/defmethod ::update-file - {::climit/id :update-file/by-profile - ::climit/key-fn ::rpc/profile-id + {::climit/id [[:update-file/by-profile ::rpc/profile-id] + [:update-file/global]] ::webhooks/event? true ::webhooks/batch-timeout (dt/duration "2m") ::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id) @@ -232,13 +233,9 @@ (defn- update-file* [{:keys [::db/conn ::wrk/executor] :as cfg} {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] - (let [;; Process the file data in the CLIMIT context; scheduling it - ;; to be executed on a separated executor for avoid to do the - ;; CPU intensive operation on vthread. - - update-fdata-fn (partial update-file-data cfg file changes skip-validate) - file (-> (climit/configure cfg :update-file/global) - (climit/run! update-fdata-fn executor))] + (let [;; Process the file data on separated thread for avoid to do + ;; the CPU intensive operation on vthread. + file (px/invoke! executor (partial update-file-data cfg file changes skip-validate))] (db/insert! conn :file-change {:id (uuid/next) @@ -306,7 +303,6 @@ (fmg/migrate-file)) file) - ;; WARNING: this ruins performance; maybe we need to find ;; some other way to do general validation libs (when (and (or (contains? cf/flags :file-validation) diff --git a/backend/src/app/rpc/commands/fonts.clj b/backend/src/app/rpc/commands/fonts.clj index c19b8a285..0942da601 100644 --- a/backend/src/app/rpc/commands/fonts.clj +++ b/backend/src/app/rpc/commands/fonts.clj @@ -16,7 +16,7 @@ [app.loggers.webhooks :as-alias webhooks] [app.media :as media] [app.rpc :as-alias rpc] - [app.rpc.climit :as climit] + [app.rpc.climit :as-alias climit] [app.rpc.commands.files :as files] [app.rpc.commands.projects :as projects] [app.rpc.commands.teams :as teams] @@ -26,7 +26,8 @@ [app.storage :as sto] [app.util.services :as sv] [app.util.time :as dt] - [app.worker :as-alias wrk])) + [app.worker :as-alias wrk] + [promesa.exec :as px])) (def valid-weight #{100 200 300 400 500 600 700 800 900 950}) (def valid-style #{"normal" "italic"}) @@ -87,6 +88,8 @@ (sv/defmethod ::create-font-variant {::doc/added "1.18" + ::climit/id [[:process-font/by-profile ::rpc/profile-id] + [:process-font/global]] ::webhooks/event? true ::sm/params schema:create-font-variant} [cfg {:keys [::rpc/profile-id team-id] :as params}] @@ -100,7 +103,7 @@ (create-font-variant cfg (assoc params :profile-id profile-id)))))) (defn create-font-variant - [{:keys [::sto/storage ::db/conn] :as cfg} {:keys [data] :as params}] + [{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [data] :as params}] (letfn [(generate-missing! [data] (let [data (media/run {:cmd :generate-fonts :input data})] (when (and (not (contains? data "font/otf")) @@ -152,9 +155,7 @@ :otf-file-id (:id otf) :ttf-file-id (:id ttf)}))] - (let [data (-> (climit/configure cfg :process-font/global) - (climit/run! (partial generate-missing! data) - (::wrk/executor cfg))) + (let [data (px/invoke! executor (partial generate-missing! data)) assets (persist-fonts-files! data) result (insert-font-variant! assets)] (vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys)))))) diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index a3dc357db..1bdcd3c50 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -27,7 +27,8 @@ [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [datoteka.io :as io])) + [datoteka.io :as io] + [promesa.exec :as px])) (def default-max-file-size (* 1024 1024 10)) ; 10 MiB @@ -56,20 +57,25 @@ :opt-un [::id])) (sv/defmethod ::upload-file-media-object - {::doc/added "1.17"} + {::doc/added "1.17" + ::climit/id [[:process-image/by-profile ::rpc/profile-id] + [:process-image/global]]} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}] (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] + (files/check-edition-permissions! pool profile-id file-id) (media/validate-media-type! content) (media/validate-media-size! content) - (let [object (db/run! cfg #(create-file-media-object % params)) - props {:name (:name params) - :file-id file-id - :is-local (:is-local params) - :size (:size content) - :mtype (:mtype content)}] - (with-meta object - {::audit/replace-props props})))) + + (db/run! cfg (fn [cfg] + (let [object (create-file-media-object cfg params) + props {:name (:name params) + :file-id file-id + :is-local (:is-local params) + :size (:size content) + :mtype (:mtype content)}] + (with-meta object + {::audit/replace-props props})))))) (defn- big-enough-for-thumbnail? "Checks if the provided image info is big enough for @@ -144,12 +150,10 @@ (assoc ::image (process-main-image info))))) (defn create-file-media-object - [{:keys [::sto/storage ::db/conn ::wrk/executor] :as cfg} + [{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [id file-id is-local name content]}] - (let [result (-> (climit/configure cfg :process-image/global) - (climit/run! (partial process-image content) executor)) - + (let [result (px/invoke! executor (partial process-image content)) image (sto/put-object! storage (::image result)) thumb (when-let [params (::thumb result)] (sto/put-object! storage params))] @@ -183,7 +187,7 @@ [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (files/check-edition-permissions! pool profile-id file-id) - (db/run! cfg #(create-file-media-object-from-url % params)))) + (create-file-media-object-from-url cfg (assoc params :profile-id profile-id)))) (defn download-image [{:keys [::http/client]} uri] @@ -235,7 +239,16 @@ params (-> params (assoc :content content) (assoc :name (or name (:filename content))))] - (create-file-media-object cfg params))) + + ;; NOTE: we use the climit here in a dynamic invocation because we + ;; don't want saturate the process-image limit with IO (download + ;; of external image) + (-> cfg + (assoc ::climit/id [[:process-image/by-profile (:profile-id params)] + [:process-image/global]]) + (assoc ::climit/profile-id (:profile-id params)) + (assoc ::climit/label "create-file-media-object-from-url") + (climit/invoke! db/run! cfg create-file-media-object params)))) ;; --- Clone File Media object (Upload and create from url) diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index 5b814abe6..a2fa82ba4 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -28,7 +28,8 @@ [app.util.services :as sv] [app.util.time :as dt] [app.worker :as-alias wrk] - [cuerdas.core :as str])) + [cuerdas.core :as str] + [promesa.exec :as px])) (declare check-profile-existence!) (declare decode-row) @@ -137,25 +138,24 @@ [:old-password {:optional true} [:maybe [::sm/word-string {:max 500}]]]])) (sv/defmethod ::update-profile-password - {:doc/added "1.0" + {::doc/added "1.0" ::sm/params schema:update-profile-password - ::sm/result :nil} + ::climit/id :auth/global} + [cfg {:keys [::rpc/profile-id password] :as params}] - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id password] :as params}] - (db/with-atomic [conn pool] - (let [cfg (assoc cfg ::db/conn conn) - profile (validate-password! cfg (assoc params :profile-id profile-id)) - session-id (::session/id params)] + (db/tx-run! cfg (fn [cfg] + (let [profile (validate-password! cfg (assoc params :profile-id profile-id)) + session-id (::session/id params)] - (when (= (str/lower (:email profile)) - (str/lower (:password params))) - (ex/raise :type :validation - :code :email-as-password - :hint "you can't use your email as password")) + (when (= (str/lower (:email profile)) + (str/lower (:password params))) + (ex/raise :type :validation + :code :email-as-password + :hint "you can't use your email as password")) - (update-profile-password! conn (assoc profile :password password)) - (invalidate-profile-session! cfg profile-id session-id) - nil))) + (update-profile-password! cfg (assoc profile :password password)) + (invalidate-profile-session! cfg profile-id session-id) + nil)))) (defn- invalidate-profile-session! "Removes all sessions except the current one." @@ -173,10 +173,10 @@ profile)) (defn update-profile-password! - [conn {:keys [id password] :as profile}] + [{:keys [::db/conn] :as cfg} {:keys [id password] :as profile}] (when-not (db/read-only? conn) (db/update! conn :profile - {:password (auth/derive-password password)} + {:password (derive-password cfg password)} {:id id}) nil)) @@ -203,6 +203,7 @@ (defn update-profile-photo [{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id file] :as params}] + (let [photo (upload-photo cfg params) profile (db/get-by-id pool :profile profile-id ::sql/for-update true)] @@ -241,8 +242,11 @@ (defn upload-photo [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}] - (let [params (-> (climit/configure cfg :process-image/global) - (climit/run! (partial generate-thumbnail! file) executor))] + (let [params (-> cfg + (assoc ::climit/id :process-image/global) + (assoc ::climit/label "upload-photo") + (assoc ::climit/executor executor) + (climit/invoke! generate-thumbnail! file))] (sto/put-object! storage params))) @@ -438,17 +442,13 @@ (into {} (filter (fn [[k _]] (simple-ident? k))) props)) (defn derive-password - [cfg password] + [{:keys [::wrk/executor]} password] (when password - (-> (climit/configure cfg :derive-password/global) - (climit/run! (partial auth/derive-password password) - (::wrk/executor cfg))))) + (px/invoke! executor (partial auth/derive-password password)))) (defn verify-password - [cfg password password-data] - (-> (climit/configure cfg :derive-password/global) - (climit/run! (partial auth/verify-password password password-data) - (::wrk/executor cfg)))) + [{:keys [::wrk/executor]} password password-data] + (px/invoke! executor (partial auth/verify-password password password-data))) (defn decode-row [{:keys [props] :as row}] diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 8073c40a7..ad08d5b62 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -71,6 +71,7 @@ :enable-email-verification :enable-smtp :enable-quotes + :enable-rpc-climit :enable-feature-fdata-pointer-map :enable-feature-fdata-objets-map :enable-feature-components-v2 diff --git a/common/src/app/common/logging.cljc b/common/src/app/common/logging.cljc index fe7a0e8f5..d7780ef70 100644 --- a/common/src/app/common/logging.cljc +++ b/common/src/app/common/logging.cljc @@ -319,6 +319,12 @@ ::message (delay ~message)}) nil))) +(defmacro log + [level & params] + `(do + (log! ::logger ~(str *ns*) ::level ~level ~@params) + nil)) + (defmacro info [& params] `(do