diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn index 6bb330927..34d218415 100644 --- a/backend/resources/climit.edn +++ b/backend/resources/climit.edn @@ -3,15 +3,26 @@ ;; Optional: queue, ommited means Integer/MAX_VALUE ;; Optional: timeout, ommited means no timeout ;; Note: queue and timeout are excluding -{:update-file/by-profile +{:update-file/global {:permits 20} + :update-file/by-profile {:permits 1 :queue 5} - :update-file/global {:permits 20} + :process-font/global {:permits 4} + :process-font/by-profile {:permits 1} - :derive-password/global {:permits 8} - :process-font/global {:permits 4} :process-image/global {:permits 8} + :process-image/by-profile {:permits 1} + :auth/global {:permits 8} + + :root/global + {:permits 40} + + :root/by-profile + {:permits 10} + + :file-thumbnail-ops/global + {:permits 20} :file-thumbnail-ops/by-profile {:permits 2} diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index 6e7407e17..097ada50a 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -237,8 +237,7 @@ (jdbc/get-connection system-or-pool) (if (map? system-or-pool) (open (::pool system-or-pool)) - (ex/raise :type :internal - :code :unable-resolve-pool)))) + (throw (IllegalArgumentException. "unable to resolve connection pool"))))) (defn get-update-count [result] @@ -250,9 +249,7 @@ cfg-or-conn (if (map? cfg-or-conn) (get-connection (::conn cfg-or-conn)) - (ex/raise :type :internal - :code :unable-resolve-connection - :hint "expected conn or system map")))) + (throw (IllegalArgumentException. "unable to resolve connection"))))) (defn connection-map? "Check if the provided value is a map like data structure that @@ -260,15 +257,15 @@ [o] (and (map? o) (connection? (::conn o)))) -(defn- get-connectable +(defn get-connectable + "Resolve to a connection or connection pool instance; if it is not + possible, raises an exception" [o] (cond (connection? o) o (pool? o) o (map? o) (get-connectable (or (::conn o) (::pool o))) - :else (ex/raise :type :internal - :code :unable-resolve-connectable - :hint "expected conn, pool or system"))) + :else (throw (IllegalArgumentException. "unable to resolve connectable")))) (def ^:private params-mapping {::return-keys? :return-keys diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index 3626971fc..e7a0184ac 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -200,22 +200,15 @@ ;; NOTE: this operation may cause primary key conflicts on inserts ;; because of the timestamp precission (two concurrent requests), in ;; this case we just retry the operation. - (let [cfg (-> cfg - (assoc ::rtry/when rtry/conflict-exception?) - (assoc ::rtry/max-retries 6) - (assoc ::rtry/label "persist-audit-log")) + (let [tnow (dt/now) params (-> params + (assoc :created-at tnow) + (assoc :tracked-at tnow) (update :props db/tjson) (update :context db/tjson) (update :ip-addr db/inet) (assoc :source "backend"))] - - (rtry/invoke cfg (fn [cfg] - (let [tnow (dt/now) - params (-> params - (assoc :created-at tnow) - (assoc :tracked-at tnow))] - (db/insert! cfg :audit-log params)))))) + (db/insert! cfg :audit-log params))) (when (and (contains? cf/flags :webhooks) (::webhooks/event? event)) @@ -246,9 +239,13 @@ "Submit audit event to the collector." [cfg params] (try - (let [event (d/without-nils params)] + (let [event (d/without-nils params) + cfg (-> cfg + (assoc ::rtry/when rtry/conflict-exception?) + (assoc ::rtry/max-retries 6) + (assoc ::rtry/label "persist-audit-log"))] (us/verify! ::event event) - (db/tx-run! cfg handle-event! event)) + (rtry/invoke! cfg db/tx-run! handle-event! event)) (catch Throwable cause (l/error :hint "unexpected error processing event" :cause cause)))) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 7028be8bf..47e43f5cf 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -322,9 +322,7 @@ ::rpc/climit (ig/ref ::rpc/climit) ::rpc/rlimit (ig/ref ::rpc/rlimit) ::setup/templates (ig/ref ::setup/templates) - ::props (ig/ref ::setup/props) - - :pool (ig/ref ::db/pool)} + ::props (ig/ref ::setup/props)} :app.rpc.doc/routes {:methods (ig/ref :app.rpc/methods)} diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 69f9d84fb..08ccd8cdb 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -139,15 +139,10 @@ (f cfg (us/conform spec params))) f))) -;; TODO: integrate with sm/define - (defn- wrap-params-validation [_ f mdata] (if-let [schema (::sm/params mdata)] - (let [schema (if (sm/lazy-schema? schema) - schema - (sm/define schema)) - validate (sm/validator schema) + (let [validate (sm/validator schema) explain (sm/explainer schema) decode (sm/decoder schema)] (fn [cfg params] @@ -245,8 +240,7 @@ ::mtx/metrics ::main/props] :opt [::climit - ::rlimit] - :req-un [::db/pool])) + ::rlimit])) (defmethod ig/init-key ::methods [_ cfg] diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index 71c64b596..cf2942c22 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -21,26 +21,31 @@ [app.worker :as-alias wrk] [clojure.edn :as edn] [clojure.spec.alpha :as s] + [cuerdas.core :as str] [datoteka.fs :as fs] [integrant.core :as ig] - [promesa.core :as p] [promesa.exec :as px] [promesa.exec.bulkhead :as pbh]) (:import - clojure.lang.ExceptionInfo)) + clojure.lang.ExceptionInfo + java.util.concurrent.atomic.AtomicLong)) (set! *warn-on-reflection* true) (defn- id->str - [id] - (-> (str id) - (subs 1))) + ([id] + (-> (str id) + (subs 1))) + ([id key] + (if key + (str (-> (str id) (subs 1)) "/" key) + (id->str id)))) (defn- create-cache [{:keys [::wrk/executor]}] (letfn [(on-remove [key _ cause] (let [[id skey] key] - (l/dbg :hint "destroy limiter" :id (id->str id) :key skey :reason (str cause))))] + (l/dbg :hint "disposed" :id (id->str id skey) :reason (str cause))))] (cache/create :executor executor :on-remove on-remove :keepalive "5m"))) @@ -81,132 +86,179 @@ (defn- create-limiter [config [id skey]] - (l/dbg :hint "create limiter" :id (id->str id) :key skey) + (l/dbg :hint "created" :id (id->str id skey)) (pbh/create :permits (or (:permits config) (:concurrency config)) :queue (or (:queue config) (:queue-size config)) :timeout (:timeout config) :type :semaphore)) -(defn- invoke! - [config cache metrics id key f] - (if-let [limiter (cache/get cache [id key] (partial create-limiter config))] - (let [tpoint (dt/tpoint) - labels (into-array String [(id->str id)]) - wrapped (fn [] - (let [elapsed (tpoint) - stats (pbh/get-stats limiter)] - (l/trc :hint "acquired" - :id (id->str id) - :key key - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats) - :elapsed (dt/format-duration elapsed)) +(defmacro ^:private measure-and-log! + [metrics mlabels stats id action limit-id limit-label profile-id elapsed] + `(let [mpermits# (:max-permits ~stats) + mqueue# (:max-queue ~stats) + permits# (:permits ~stats) + queue# (:queue ~stats) + queue# (- queue# mpermits#) + queue# (if (neg? queue#) 0 queue#) + level# (if (pos? queue#) :warn :trace)] - (mtx/run! metrics - :id :rpc-climit-timing - :val (inst-ms elapsed) - :labels labels) - (try - (f) - (finally - (let [elapsed (tpoint)] - (l/trc :hint "finished" - :id (id->str id) - :key key - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats) - :elapsed (dt/format-duration elapsed))))))) - measure! - (fn [stats] - (mtx/run! metrics - :id :rpc-climit-queue - :val (:queue stats) - :labels labels) - (mtx/run! metrics - :id :rpc-climit-permits - :val (:permits stats) - :labels labels))] + (mtx/run! ~metrics + :id :rpc-climit-queue + :val queue# + :labels ~mlabels) - (try - (let [stats (pbh/get-stats limiter)] - (measure! stats) - (l/trc :hint "enqueued" - :id (id->str id) - :key key - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats)) - (px/invoke! limiter wrapped)) - (catch ExceptionInfo cause - (let [{:keys [type code]} (ex-data cause)] - (if (= :bulkhead-error type) + (mtx/run! ~metrics + :id :rpc-climit-permits + :val permits# + :labels ~mlabels) + + (l/log level# + :hint ~action + :req ~id + :id ~limit-id + :label ~limit-label + :profile-id (str ~profile-id) + :permits permits# + :queue queue# + :max-permits mpermits# + :max-queue mqueue# + ~@(if (some? elapsed) + [:elapsed `(dt/format-duration ~elapsed)] + [])))) + +(def ^:private idseq (AtomicLong. 0)) + +(defn- invoke + [limiter metrics limit-id limit-key limit-label profile-id f params] + (let [tpoint (dt/tpoint) + limit-id (id->str limit-id limit-key) + mlabels (into-array String [limit-id]) + stats (pbh/get-stats limiter) + id (.incrementAndGet ^AtomicLong idseq)] + + (try + (measure-and-log! metrics mlabels stats id "enqueued" limit-id limit-label profile-id nil) + (px/invoke! limiter (fn [] + (let [elapsed (tpoint) + stats (pbh/get-stats limiter)] + (measure-and-log! metrics mlabels stats id "acquired" limit-id limit-label profile-id elapsed) + (mtx/run! metrics + :id :rpc-climit-timing + :val (inst-ms elapsed) + :labels mlabels) + (apply f params)))) + + (catch ExceptionInfo cause + (let [{:keys [type code]} (ex-data cause)] + (if (= :bulkhead-error type) + (let [elapsed (tpoint)] + (measure-and-log! metrics mlabels stats id "reject" limit-id limit-label profile-id elapsed) (ex/raise :type :concurrency-limit :code code - :hint "concurrency limit reached") - (throw cause)))) + :hint "concurrency limit reached" + :cause cause)) + (throw cause)))) - (finally - (measure! (pbh/get-stats limiter))))) - - (do - (l/wrn :hint "no limiter found" :id (id->str id)) - (f)))) + (finally + (let [elapsed (tpoint) + stats (pbh/get-stats limiter)] + (measure-and-log! metrics mlabels stats id "finished" limit-id limit-label profile-id elapsed)))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MIDDLEWARE ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(def noop-fn (constantly nil)) +(def ^:private noop-fn (constantly nil)) +(def ^:private global-limits + [[:root/global noop-fn] + [:root/by-profile ::rpc/profile-id]]) + +(defn- get-limits + [cfg] + (when-let [ref (get cfg ::id)] + (cond + (keyword? ref) + [[ref]] + + (and (vector? ref) + (keyword (first ref))) + [ref] + + (and (vector? ref) + (vector? (first ref))) + (rseq ref) + + :else + (throw (IllegalArgumentException. "unable to normalize limit"))))) (defn wrap - [{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}] - (if (and (some? climit) (some? id)) - (let [cache (::cache climit) - config (::config climit)] - (if-let [config (get config id)] - (do - (l/dbg :hint "instrumenting method" - :limit (id->str id) - :service-name (::sv/name mdata) - :timeout (:timeout config) - :permits (:permits config) - :queue (:queue config) - :keyed? (not= key-fn noop-fn)) + [{:keys [::rpc/climit ::mtx/metrics]} handler mdata] + (let [cache (::cache climit) + config (::config climit) + label (::sv/name mdata)] - (fn [cfg params] - (invoke! config cache metrics id (key-fn params) (partial f cfg params)))) + (reduce (fn [handler [limit-id key-fn]] + (if-let [config (get config limit-id)] + (let [key-fn (or key-fn noop-fn)] + (l/dbg :hint "instrumenting method" + :method label + :limit (id->str limit-id) + :timeout (:timeout config) + :permits (:permits config) + :queue (:queue config) + :keyed (not= key-fn noop-fn)) - (do - (l/wrn :hint "no config found for specified queue" :id (id->str id)) - f))) - f)) + (if (and (= key-fn ::rpc/profile-id) + (false? (::rpc/auth mdata true))) + + ;; We don't enforce by-profile limit on methods that does + ;; not require authentication + handler + + (fn [cfg params] + (let [limit-key (key-fn params) + cache-key [limit-id limit-key] + limiter (cache/get cache cache-key (partial create-limiter config)) + profile-id (if (= key-fn ::rpc/profile-id) + limit-key + (get params ::rpc/profile-id))] + (invoke limiter metrics limit-id limit-key label profile-id handler [cfg params]))))) + + (do + (l/wrn :hint "no config found for specified queue" :id (id->str limit-id)) + handler))) + + handler + (concat global-limits (get-limits mdata))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; PUBLIC API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn configure - [{:keys [::rpc/climit]} id] - (us/assert! ::rpc/climit climit) - (assoc climit ::id id)) +(defn- build-exec-chain + [{:keys [::label ::profile-id ::rpc/climit ::mtx/metrics] :as cfg} f] + (let [config (get climit ::config) + cache (get climit ::cache)] -(defn run! + (reduce (fn [handler [limit-id limit-key :as ckey]] + (let [config (get config limit-id)] + (when-not config + (throw (IllegalArgumentException. + (str/ffmt "config not found for: %" limit-id)))) + + (fn [& params] + (let [limiter (cache/get cache ckey (partial create-limiter config))] + (invoke limiter metrics limit-id limit-key label profile-id handler params))))) + f + (get-limits cfg)))) + +(defn invoke! "Run a function in context of climit. Intended to be used in virtual threads." - ([{:keys [::id ::cache ::config ::mtx/metrics]} f] - (if-let [config (get config id)] - (invoke! config cache metrics id nil f) - (f))) - - ([{:keys [::id ::cache ::config ::mtx/metrics]} f executor] - (let [f #(p/await! (px/submit! executor f))] - (if-let [config (get config id)] - (invoke! config cache metrics id nil f) - (f))))) - + [{:keys [::executor] :as cfg} f & params] + (let [f (if (some? executor) + (fn [& params] (px/await! (px/submit! executor (fn [] (apply f params))))) + f) + f (build-exec-chain cfg f)] + (apply f params))) diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index c9b55b599..2e82e5640 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -21,6 +21,7 @@ [app.loggers.audit :as audit] [app.main :as-alias main] [app.rpc :as-alias rpc] + [app.rpc.climit :as-alias climit] [app.rpc.commands.profile :as profile] [app.rpc.commands.teams :as teams] [app.rpc.doc :as-alias doc] @@ -39,7 +40,7 @@ ;; ---- COMMAND: login with password (defn login-with-password - [{:keys [::db/pool] :as cfg} {:keys [email password] :as params}] + [cfg {:keys [email password] :as params}] (when-not (or (contains? cf/flags :login) (contains? cf/flags :login-with-password)) @@ -47,7 +48,7 @@ :code :login-disabled :hint "login is disabled in this instance")) - (letfn [(check-password [conn profile password] + (letfn [(check-password [cfg profile password] (if (= (:password profile) "!") (ex/raise :type :validation :code :account-without-password @@ -57,10 +58,10 @@ (l/trc :hint "updating profile password" :id (str (:id profile)) :email (:email profile)) - (profile/update-profile-password! conn (assoc profile :password password))) + (profile/update-profile-password! cfg (assoc profile :password password))) (:valid result)))) - (validate-profile [conn profile] + (validate-profile [cfg profile] (when-not profile (ex/raise :type :validation :code :wrong-credentials)) @@ -70,7 +71,7 @@ (when (:is-blocked profile) (ex/raise :type :restriction :code :profile-blocked)) - (when-not (check-password conn profile password) + (when-not (check-password cfg profile password) (ex/raise :type :validation :code :wrong-credentials)) (when-let [deleted-at (:deleted-at profile)] @@ -78,27 +79,29 @@ (ex/raise :type :validation :code :wrong-credentials))) - profile)] + profile) - (db/with-atomic [conn pool] - (let [profile (->> (profile/get-profile-by-email conn email) - (validate-profile conn) - (profile/strip-private-attrs)) + (login [{:keys [::db/conn] :as cfg}] + (let [profile (->> (profile/get-profile-by-email conn email) + (validate-profile cfg) + (profile/strip-private-attrs)) - invitation (when-let [token (:invitation-token params)] - (tokens/verify (::main/props cfg) {:token token :iss :team-invitation})) + invitation (when-let [token (:invitation-token params)] + (tokens/verify (::main/props cfg) {:token token :iss :team-invitation})) - ;; If invitation member-id does not matches the profile-id, we just proceed to ignore the - ;; invitation because invitations matches exactly; and user can't login with other email and - ;; accept invitation with other email - response (if (and (some? invitation) (= (:id profile) (:member-id invitation))) - {:invitation-token (:invitation-token params)} - (assoc profile :is-admin (let [admins (cf/get :admins)] - (contains? admins (:email profile)))))] - (-> response - (rph/with-transform (session/create-fn cfg (:id profile))) - (rph/with-meta {::audit/props (audit/profile->props profile) - ::audit/profile-id (:id profile)})))))) + ;; If invitation member-id does not matches the profile-id, we just proceed to ignore the + ;; invitation because invitations matches exactly; and user can't login with other email and + ;; accept invitation with other email + response (if (and (some? invitation) (= (:id profile) (:member-id invitation))) + {:invitation-token (:invitation-token params)} + (assoc profile :is-admin (let [admins (cf/get :admins)] + (contains? admins (:email profile)))))] + (-> response + (rph/with-transform (session/create-fn cfg (:id profile))) + (rph/with-meta {::audit/props (audit/profile->props profile) + ::audit/profile-id (:id profile)}))))] + + (db/tx-run! cfg login))) (def schema:login-with-password [:map {:title "login-with-password"} @@ -110,6 +113,7 @@ "Performs authentication using penpot password." {::rpc/auth false ::doc/added "1.15" + ::climit/id :auth/global ::sm/params schema:login-with-password} [cfg params] (login-with-password cfg params)) @@ -149,7 +153,8 @@ (sv/defmethod ::recover-profile {::rpc/auth false ::doc/added "1.15" - ::sm/params schema:recover-profile} + ::sm/params schema:recover-profile + ::climit/id :auth/global} [cfg params] (recover-profile cfg params)) @@ -360,7 +365,6 @@ {::audit/type "fact" ::audit/name "register-profile-retry" ::audit/profile-id id})) - (cond ;; If invitation token comes in params, this is because the ;; user comes from team-invitation process; in this case, @@ -402,7 +406,6 @@ {::audit/replace-props (audit/profile->props profile) ::audit/profile-id (:id profile)}))))) - (def schema:register-profile [:map {:title "register-profile"} [:token schema:token] @@ -411,7 +414,8 @@ (sv/defmethod ::register-profile {::rpc/auth false ::doc/added "1.15" - ::sm/params schema:register-profile} + ::sm/params schema:register-profile + ::climit/id :auth/global} [{:keys [::db/pool] :as cfg} params] (db/with-atomic [conn pool] (-> (assoc cfg ::db/conn conn) diff --git a/backend/src/app/rpc/commands/comments.clj b/backend/src/app/rpc/commands/comments.clj index 9e1a9d436..4949f1a43 100644 --- a/backend/src/app/rpc/commands/comments.clj +++ b/backend/src/app/rpc/commands/comments.clj @@ -9,7 +9,7 @@ [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.geom.point :as gpt] - [app.common.spec :as us] + [app.common.schema :as sm] [app.common.uuid :as uuid] [app.db :as db] [app.db.sql :as sql] @@ -24,18 +24,21 @@ [app.rpc.retry :as rtry] [app.util.pointer-map :as pmap] [app.util.services :as sv] - [app.util.time :as dt] - [clojure.spec.alpha :as s])) + [app.util.time :as dt])) ;; --- GENERAL PURPOSE INTERNAL HELPERS -(defn decode-row +(defn- decode-row [{:keys [participants position] :as row}] (cond-> row (db/pgpoint? position) (assoc :position (db/decode-pgpoint position)) (db/pgobject? participants) (assoc :participants (db/decode-transit-pgobject participants)))) -(def sql:get-file +(def xf-decode-row + (map decode-row)) + +(def ^:privateqpage-name + sql:get-file "select f.id, f.modified_at, f.revn, f.features, f.project_id, p.team_id, f.data from file as f @@ -45,17 +48,19 @@ (defn- get-file "A specialized version of get-file for comments module." - [{:keys [::db/conn] :as cfg} file-id page-id] - (if-let [{:keys [data] :as file} (some-> (db/exec-one! conn [sql:get-file file-id]) - (files/decode-row))] - (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg file-id)] - (-> file - (assoc :page-name (dm/get-in data [:pages-index page-id :name])) - (assoc :page-id page-id))) + [cfg file-id page-id] + (let [file (db/exec-one! cfg [sql:get-file file-id])] + (when-not file + (ex/raise :type :not-found + :code :object-not-found + :hint "file not found")) - (ex/raise :type :not-found - :code :object-not-found - :hint "file not found"))) + (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg file-id)] + (let [{:keys [data] :as file} (files/decode-row file)] + (-> file + (assoc :page-name (dm/get-in data [:pages-index page-id :name])) + (assoc :page-id page-id) + (dissoc :data)))))) (defn- get-comment-thread [conn thread-id & {:as opts}] @@ -93,23 +98,25 @@ (declare ^:private get-comment-threads) -(s/def ::team-id ::us/uuid) -(s/def ::file-id ::us/uuid) -(s/def ::share-id (s/nilable ::us/uuid)) - -(s/def ::get-comment-threads - (s/and (s/keys :req [::rpc/profile-id] - :opt-un [::file-id ::share-id ::team-id]) - #(or (:file-id %) (:team-id %)))) +(def ^:private + schema:get-comment-threads + [:and + [:map {:title "get-comment-threads"} + [:file-id {:optional true} ::sm/uuid] + [:team-id {:optional true} ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]] + [::sm/contains-any #{:file-id :team-id}]]) (sv/defmethod ::get-comment-threads - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id share-id] :as params}] - (dm/with-open [conn (db/open pool)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (get-comment-threads conn profile-id file-id))) + {::doc/added "1.15" + ::sm/params schema:get-comment-threads} + [cfg {:keys [::rpc/profile-id file-id share-id] :as params}] -(def sql:comment-threads + (db/run! cfg (fn [{:keys [::db/conn]}] + (files/check-comment-permissions! conn profile-id file-id share-id) + (get-comment-threads conn profile-id file-id)))) + +(def ^:private sql:comment-threads "select distinct on (ct.id) ct.*, f.name as file_name, @@ -134,23 +141,24 @@ (defn- get-comment-threads [conn profile-id file-id] (->> (db/exec! conn [sql:comment-threads profile-id file-id]) - (into [] (map decode-row)))) + (into [] xf-decode-row))) ;; --- COMMAND: Get Unread Comment Threads (declare ^:private get-unread-comment-threads) -(s/def ::team-id ::us/uuid) -(s/def ::get-unread-comment-threads - (s/keys :req [::rpc/profile-id] - :req-un [::team-id])) +(def ^:private + schema:get-unread-comment-threads + [:map {:title "get-unread-comment-threads"} + [:team-id ::sm/uuid]]) (sv/defmethod ::get-unread-comment-threads - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id] :as params}] - (dm/with-open [conn (db/open pool)] - (teams/check-read-permissions! conn profile-id team-id) - (get-unread-comment-threads conn profile-id team-id))) + {::doc/added "1.15" + ::sm/params schema:get-unread-comment-threads} + [cfg {:keys [::rpc/profile-id team-id] :as params}] + (db/run! cfg (fn [{:keys [::db/conn]}] + (teams/check-read-permissions! conn profile-id team-id) + (get-unread-comment-threads conn profile-id team-id)))) (def sql:comment-threads-by-team "select distinct on (ct.id) @@ -182,62 +190,60 @@ (defn- get-unread-comment-threads [conn profile-id team-id] (->> (db/exec! conn [sql:unread-comment-threads-by-team profile-id team-id]) - (into [] (map decode-row)))) - + (into [] xf-decode-row))) ;; --- COMMAND: Get Single Comment Thread -(s/def ::get-comment-thread - (s/keys :req [::rpc/profile-id] - :req-un [::file-id ::us/id] - :opt-un [::share-id])) +(def ^:private + schema:get-comment-thread + [:map {:title "get-comment-thread"} + [:file-id ::sm/uuid] + [:id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::get-comment-thread - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id id share-id] :as params}] - (dm/with-open [conn (db/open pool)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (let [sql (str "with threads as (" sql:comment-threads ")" - "select * from threads where id = ?")] - (-> (db/exec-one! conn [sql profile-id file-id id]) - (decode-row))))) + {::doc/added "1.15" + ::sm/params schema:get-comment-thread} + [cfg {:keys [::rpc/profile-id file-id id share-id] :as params}] + (db/run! cfg (fn [{:keys [::db/conn]}] + (files/check-comment-permissions! conn profile-id file-id share-id) + (let [sql (str "with threads as (" sql:comment-threads ")" + "select * from threads where id = ?")] + (-> (db/exec-one! conn [sql profile-id file-id id]) + (decode-row)))))) ;; --- COMMAND: Retrieve Comments (declare ^:private get-comments) -(s/def ::thread-id ::us/uuid) -(s/def ::get-comments - (s/keys :req [::rpc/profile-id] - :req-un [::thread-id] - :opt-un [::share-id])) +(def ^:private + schema:get-comments + [:map {:title "get-comments"} + [:thread-id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::get-comments - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id thread-id share-id] :as params}] - (dm/with-open [conn (db/open pool)] - (let [{:keys [file-id] :as thread} (get-comment-thread conn thread-id)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (get-comments conn thread-id)))) - -(def sql:comments - "select c.* from comment as c - where c.thread_id = ? - order by c.created_at asc") + {::doc/added "1.15" + ::sm/params schema:get-comments} + [cfg {:keys [::rpc/profile-id thread-id share-id]}] + (db/run! cfg (fn [{:keys [::db/conn]}] + (let [{:keys [file-id] :as thread} (get-comment-thread conn thread-id)] + (files/check-comment-permissions! conn profile-id file-id share-id) + (get-comments conn thread-id))))) (defn- get-comments [conn thread-id] (->> (db/query conn :comment {:thread-id thread-id} {:order-by [[:created-at :asc]]}) - (into [] (map decode-row)))) + (into [] xf-decode-row))) ;; --- COMMAND: Get file comments users ;; All the profiles that had comment the file, plus the current ;; profile. -(def sql:file-comment-users +(def ^:private sql:file-comment-users "WITH available_profiles AS ( SELECT DISTINCT owner_id AS id FROM comment @@ -256,20 +262,22 @@ [conn file-id profile-id] (db/exec! conn [sql:file-comment-users file-id profile-id])) -(s/def ::get-profiles-for-file-comments - (s/keys :req [::rpc/profile-id] - :req-un [::file-id] - :opt-un [::share-id])) +(def ^:private + schema:get-profiles-for-file-comments + [:map {:title "get-profiles-for-file-comments"} + [:file-id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::get-profiles-for-file-comments "Retrieves a list of profiles with limited set of properties of all participants on comment threads of the file." {::doc/added "1.15" - ::doc/changes ["1.15" "Imported from queries and renamed."]} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id share-id]}] - (dm/with-open [conn (db/open pool)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (get-file-comments-users conn file-id profile-id))) + ::doc/changes ["1.15" "Imported from queries and renamed."] + ::sm/params schema:get-profiles-for-file-comments} + [cfg {:keys [::rpc/profile-id file-id share-id]}] + (db/run! cfg (fn [{:keys [::db/conn]}] + (files/check-comment-permissions! conn profile-id file-id share-id) + (get-file-comments-users conn file-id profile-id)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MUTATION COMMANDS @@ -279,52 +287,52 @@ ;; --- COMMAND: Create Comment Thread -(s/def ::page-id ::us/uuid) -(s/def ::position ::gpt/point) -(s/def ::content ::us/string) -(s/def ::frame-id ::us/uuid) - -(s/def ::create-comment-thread - (s/keys :req [::rpc/profile-id] - :req-un [::file-id ::position ::content ::page-id ::frame-id] - :opt-un [::share-id])) +(def ^:private + schema:create-comment-thread + [:map {:title "create-comment-thread"} + [:file-id ::sm/uuid] + [:position ::gpt/point] + [:content :string] + [:page-id ::sm/uuid] + [:frame-id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::create-comment-thread {::doc/added "1.15" - ::webhooks/event? true} + ::webhooks/event? true + ::rtry/enabled true + ::rtry/when rtry/conflict-exception? + ::sm/params schema:create-comment-thread} [cfg {:keys [::rpc/profile-id ::rpc/request-at file-id page-id share-id position content frame-id]}] - (db/tx-run! cfg - (fn [{:keys [::db/conn] :as cfg}] - (files/check-comment-permissions! conn profile-id file-id share-id) - (let [{:keys [team-id project-id page-name] :as file} (get-file cfg file-id page-id)] - (run! (partial quotes/check-quote! conn) - (list {::quotes/id ::quotes/comment-threads-per-file - ::quotes/profile-id profile-id - ::quotes/team-id team-id - ::quotes/project-id project-id - ::quotes/file-id file-id} - {::quotes/id ::quotes/comments-per-file - ::quotes/profile-id profile-id - ::quotes/team-id team-id - ::quotes/project-id project-id - ::quotes/file-id file-id})) + (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] + (files/check-comment-permissions! cfg profile-id file-id share-id) + (let [{:keys [team-id project-id page-name]} (get-file conn file-id page-id)] + (run! (partial quotes/check-quote! cfg) + (list {::quotes/id ::quotes/comment-threads-per-file + ::quotes/profile-id profile-id + ::quotes/team-id team-id + ::quotes/project-id project-id + ::quotes/file-id file-id} + {::quotes/id ::quotes/comments-per-file + ::quotes/profile-id profile-id + ::quotes/team-id team-id + ::quotes/project-id project-id + ::quotes/file-id file-id})) - (-> cfg - (assoc ::rtry/when rtry/conflict-exception?) - (assoc ::rtry/label "create-comment-thread") - (rtry/invoke create-comment-thread {:created-at request-at - :profile-id profile-id - :file-id file-id - :page-id page-id - :page-name page-name - :position position - :content content - :frame-id frame-id})))))) + (create-comment-thread conn {:created-at request-at + :profile-id profile-id + :file-id file-id + :page-id page-id + :page-name page-name + :position position + :content content + :frame-id frame-id}))))) (defn- create-comment-thread - [{:keys [::db/conn]} {:keys [profile-id file-id page-id page-name created-at position content frame-id]}] + [conn {:keys [profile-id file-id page-id page-name created-at position content frame-id]}] + (let [;; NOTE: we take the next seq number from a separate query because the whole ;; operation can be retried on conflict, and in this case the new seq shold be ;; retrieved from the database. @@ -364,68 +372,72 @@ ;; --- COMMAND: Update Comment Thread Status -(s/def ::id ::us/uuid) -(s/def ::share-id (s/nilable ::us/uuid)) - -(s/def ::update-comment-thread-status - (s/keys :req [::rpc/profile-id] - :req-un [::id] - :opt-un [::share-id])) +(def ^:private + schema:update-comment-thread-status + [:map {:title "update-comment-thread-status"} + [:id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::update-comment-thread-status - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id share-id] :as params}] - (db/with-atomic [conn pool] - (let [{:keys [file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (upsert-comment-thread-status! conn profile-id id)))) + {::doc/added "1.15" + ::sm/params schema:update-comment-thread-status} + [cfg {:keys [::rpc/profile-id id share-id]}] + (db/tx-run! cfg (fn [{:keys [::db/conn]}] + (let [{:keys [file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] + (files/check-comment-permissions! conn profile-id file-id share-id) + (upsert-comment-thread-status! conn profile-id id))))) ;; --- COMMAND: Update Comment Thread -(s/def ::is-resolved ::us/boolean) -(s/def ::update-comment-thread - (s/keys :req [::rpc/profile-id] - :req-un [::id ::is-resolved] - :opt-un [::share-id])) +(def ^:private + schema:update-comment-thread + [:map {:title "update-comment-thread"} + [:id ::sm/uuid] + [:is-resolved :boolean] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::update-comment-thread - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id is-resolved share-id] :as params}] - (db/with-atomic [conn pool] - (let [{:keys [file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (db/update! conn :comment-thread - {:is-resolved is-resolved} - {:id id}) - nil))) + {::doc/added "1.15" + ::sm/params schema:update-comment-thread} + [cfg {:keys [::rpc/profile-id id is-resolved share-id]}] + (db/tx-run! cfg (fn [{:keys [::db/conn]}] + (let [{:keys [file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] + (files/check-comment-permissions! conn profile-id file-id share-id) + (db/update! conn :comment-thread + {:is-resolved is-resolved} + {:id id}) + nil)))) ;; --- COMMAND: Add Comment (declare ^:private get-comment-thread) -(s/def ::create-comment - (s/keys :req [::rpc/profile-id] - :req-un [::thread-id ::content] - :opt-un [::share-id])) +(def ^:private + schema:create-comment + [:map {:title "create-comment"} + [:thread-id ::sm/uuid] + [:content :string] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::create-comment {::doc/added "1.15" - ::webhooks/event? true} + ::webhooks/event? true + ::sm/params schema:create-comment} [cfg {:keys [::rpc/profile-id ::rpc/request-at thread-id share-id content]}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (let [{:keys [file-id page-id] :as thread} (get-comment-thread conn thread-id ::sql/for-update true) {:keys [team-id project-id page-name] :as file} (get-file cfg file-id page-id)] - (files/check-comment-permissions! conn profile-id (:id file) share-id) + (files/check-comment-permissions! conn profile-id file-id share-id) (quotes/check-quote! conn {::quotes/id ::quotes/comments-per-file ::quotes/profile-id profile-id ::quotes/team-id team-id ::quotes/project-id project-id - ::quotes/file-id (:id file)}) + ::quotes/file-id file-id}) ;; Update the page-name cached attribute on comment thread table. (when (not= page-name (:page-name thread)) @@ -461,15 +473,17 @@ ;; --- COMMAND: Update Comment -(s/def ::update-comment - (s/keys :req [::rpc/profile-id] - :req-un [::id ::content] - :opt-un [::share-id])) +(def ^:private + schema:update-comment + [:map {:title "update-comment"} + [:id ::sm/uuid] + [:content :string] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::update-comment - {::doc/added "1.15"} + {::doc/added "1.15" + ::sm/params schema:update-comment} [cfg {:keys [::rpc/profile-id ::rpc/request-at id share-id content]}] - (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (let [{:keys [thread-id owner-id] :as comment} (get-comment conn id ::sql/for-update true) @@ -482,7 +496,7 @@ (ex/raise :type :validation :code :not-allowed)) - (let [{:keys [page-name] :as file} (get-file cfg file-id page-id)] + (let [{:keys [page-name]} (get-file cfg file-id page-id)] (db/update! conn :comment {:content content :modified-at request-at} @@ -496,79 +510,90 @@ ;; --- COMMAND: Delete Comment Thread -(s/def ::delete-comment-thread - (s/keys :req [::rpc/profile-id] - :req-un [::id] - :opt-un [::share-id])) +(def ^:private + schema:delete-comment-thread + [:map {:title "delete-comment-thread"} + [:id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::delete-comment-thread - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id share-id]}] - (db/with-atomic [conn pool] - (let [{:keys [owner-id file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (when-not (= owner-id profile-id) - (ex/raise :type :validation - :code :not-allowed)) + {::doc/added "1.15" + ::sm/params schema:delete-comment-thread} + [cfg {:keys [::rpc/profile-id id share-id]}] + (db/tx-run! cfg (fn [{:keys [::db/conn]}] + (let [{:keys [owner-id file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] + (files/check-comment-permissions! conn profile-id file-id share-id) + (when-not (= owner-id profile-id) + (ex/raise :type :validation + :code :not-allowed)) - (db/delete! conn :comment-thread {:id id}) - nil))) + (db/delete! conn :comment-thread {:id id}) + nil)))) ;; --- COMMAND: Delete comment -(s/def ::delete-comment - (s/keys :req [::rpc/profile-id] - :req-un [::id] - :opt-un [::share-id])) +(def ^:private + schema:delete-comment + [:map {:title "delete-comment"} + [:id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::delete-comment - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id share-id] :as params}] - (db/with-atomic [conn pool] - (let [{:keys [owner-id thread-id] :as comment} (get-comment conn id ::sql/for-update true) - {:keys [file-id] :as thread} (get-comment-thread conn thread-id)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (when-not (= owner-id profile-id) - (ex/raise :type :validation - :code :not-allowed)) - (db/delete! conn :comment {:id id}) - nil))) + {::doc/added "1.15" + ::sm/params schema:delete-comment} + [cfg {:keys [::rpc/profile-id id share-id]}] + (db/tx-run! cfg (fn [{:keys [::db/conn]}] + (let [{:keys [owner-id thread-id] :as comment} (get-comment conn id ::sql/for-update true) + {:keys [file-id] :as thread} (get-comment-thread conn thread-id)] + (files/check-comment-permissions! conn profile-id file-id share-id) + (when-not (= owner-id profile-id) + (ex/raise :type :validation + :code :not-allowed)) + (db/delete! conn :comment {:id id}) + nil)))) ;; --- COMMAND: Update comment thread position -(s/def ::update-comment-thread-position - (s/keys :req [::rpc/profile-id] - :req-un [::id ::position ::frame-id] - :opt-un [::share-id])) +(def ^:private + schema:update-comment-thread-position + [:map {:title "update-comment-thread-position"} + [:id ::sm/uuid] + [:position ::gpt/point] + [:frame-id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::update-comment-thread-position - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id ::rpc/request-at id position frame-id share-id]}] - (db/with-atomic [conn pool] - (let [{:keys [file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (db/update! conn :comment-thread - {:modified-at request-at - :position (db/pgpoint position) - :frame-id frame-id} - {:id (:id thread)}) - nil))) + {::doc/added "1.15" + ::sm/params schema:update-comment-thread-position} + [cfg {:keys [::rpc/profile-id ::rpc/request-at id position frame-id share-id]}] + (db/tx-run! cfg (fn [{:keys [::db/conn]}] + (let [{:keys [file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] + (files/check-comment-permissions! conn profile-id file-id share-id) + (db/update! conn :comment-thread + {:modified-at request-at + :position (db/pgpoint position) + :frame-id frame-id} + {:id (:id thread)}) + nil)))) ;; --- COMMAND: Update comment frame -(s/def ::update-comment-thread-frame - (s/keys :req [::rpc/profile-id] - :req-un [::id ::frame-id] - :opt-un [::share-id])) +(def ^:private + schema:update-comment-thread-frame + [:map {:title "update-comment-thread-frame"} + [:id ::sm/uuid] + [:frame-id ::sm/uuid] + [:share-id {:optional true} [:maybe ::sm/uuid]]]) (sv/defmethod ::update-comment-thread-frame - {::doc/added "1.15"} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id ::rpc/request-at id frame-id share-id]}] - (db/with-atomic [conn pool] - (let [{:keys [file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] - (files/check-comment-permissions! conn profile-id file-id share-id) - (db/update! conn :comment-thread - {:modified-at request-at - :frame-id frame-id} - {:id id}) - nil))) + {::doc/added "1.15" + ::sm/params schema:update-comment-thread-frame} + [cfg {:keys [::rpc/profile-id ::rpc/request-at id frame-id share-id]}] + (db/tx-run! cfg (fn [{:keys [::db/conn]}] + (let [{:keys [file-id] :as thread} (get-comment-thread conn id ::sql/for-update true)] + (files/check-comment-permissions! conn profile-id file-id share-id) + (db/update! conn :comment-thread + {:modified-at request-at + :frame-id frame-id} + {:id id}) + nil)))) diff --git a/backend/src/app/rpc/commands/files_thumbnails.clj b/backend/src/app/rpc/commands/files_thumbnails.clj index 712c21204..a44a8bdbd 100644 --- a/backend/src/app/rpc/commands/files_thumbnails.clj +++ b/backend/src/app/rpc/commands/files_thumbnails.clj @@ -285,26 +285,27 @@ (sv/defmethod ::create-file-object-thumbnail {::doc/added "1.19" ::doc/module :files - ::climit/id :file-thumbnail-ops - ::climit/key-fn ::rpc/profile-id + ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id] + [:file-thumbnail-ops/global]] + ::rtry/enabled true + ::rtry/when rtry/conflict-exception? ::audit/skip true ::sm/params schema:create-file-object-thumbnail} [cfg {:keys [::rpc/profile-id file-id object-id media tag]}] + (media/validate-media-type! media) + (media/validate-media-size! media) + (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (files/check-edition-permissions! conn profile-id file-id) - (media/validate-media-type! media) - (media/validate-media-size! media) - (when-not (db/read-only? conn) (let [cfg (-> cfg (update ::sto/storage media/configure-assets-storage) (assoc ::rtry/when rtry/conflict-exception?) (assoc ::rtry/max-retries 5) (assoc ::rtry/label "create-file-object-thumbnail"))] - (rtry/invoke cfg create-file-object-thumbnail! - file-id object-id media (or tag "frame"))))))) + (create-file-object-thumbnail! cfg file-id object-id media (or tag "frame"))))))) ;; --- MUTATION COMMAND: delete-file-object-thumbnail @@ -329,8 +330,8 @@ {::doc/added "1.19" ::doc/module :files ::doc/deprecated "1.20" - ::climit/id :file-thumbnail-ops - ::climit/key-fn ::rpc/profile-id + ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id] + [:file-thumbnail-ops/global]] ::audit/skip true} [cfg {:keys [::rpc/profile-id file-id object-id]}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] @@ -392,27 +393,29 @@ media)) +(def ^:private + schema:create-file-thumbnail + [:map {:title "create-file-thumbnail"} + [:file-id ::sm/uuid] + [:revn :int] + [:media ::media/upload]]) + (sv/defmethod ::create-file-thumbnail "Creates or updates the file thumbnail. Mainly used for paint the grid thumbnails." {::doc/added "1.19" ::doc/module :files ::audit/skip true - ::climit/id :file-thumbnail-ops - ::climit/key-fn ::rpc/profile-id - ::sm/params [:map {:title "create-file-thumbnail"} - [:file-id ::sm/uuid] - [:revn :int] - [:media ::media/upload]]} + ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id] + [:file-thumbnail-ops/global]] + ::rtry/enabled true + ::rtry/when rtry/conflict-exception? + ::sm/params schema:create-file-thumbnail} [cfg {:keys [::rpc/profile-id file-id] :as params}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (files/check-edition-permissions! conn profile-id file-id) (when-not (db/read-only? conn) - (let [cfg (-> cfg - (update ::sto/storage media/configure-assets-storage) - (assoc ::rtry/when rtry/conflict-exception?) - (assoc ::rtry/max-retries 5) - (assoc ::rtry/label "create-thumbnail")) - media (rtry/invoke cfg create-file-thumbnail! params)] + (let [cfg (update cfg ::sto/storage media/configure-assets-storage) + media (create-file-thumbnail! cfg params)] {:uri (files/resolve-public-uri (:id media))}))))) diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index 134a12794..fade957e0 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -35,7 +35,8 @@ [app.util.services :as sv] [app.util.time :as dt] [app.worker :as-alias wrk] - [clojure.set :as set])) + [clojure.set :as set] + [promesa.exec :as px])) ;; --- SCHEMA @@ -132,8 +133,8 @@ ;; database. (sv/defmethod ::update-file - {::climit/id :update-file/by-profile - ::climit/key-fn ::rpc/profile-id + {::climit/id [[:update-file/by-profile ::rpc/profile-id] + [:update-file/global]] ::webhooks/event? true ::webhooks/batch-timeout (dt/duration "2m") ::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id) @@ -232,13 +233,9 @@ (defn- update-file* [{:keys [::db/conn ::wrk/executor] :as cfg} {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] - (let [;; Process the file data in the CLIMIT context; scheduling it - ;; to be executed on a separated executor for avoid to do the - ;; CPU intensive operation on vthread. - - update-fdata-fn (partial update-file-data cfg file changes skip-validate) - file (-> (climit/configure cfg :update-file/global) - (climit/run! update-fdata-fn executor))] + (let [;; Process the file data on separated thread for avoid to do + ;; the CPU intensive operation on vthread. + file (px/invoke! executor (partial update-file-data cfg file changes skip-validate))] (db/insert! conn :file-change {:id (uuid/next) @@ -306,7 +303,6 @@ (fmg/migrate-file)) file) - ;; WARNING: this ruins performance; maybe we need to find ;; some other way to do general validation libs (when (and (or (contains? cf/flags :file-validation) diff --git a/backend/src/app/rpc/commands/fonts.clj b/backend/src/app/rpc/commands/fonts.clj index c19b8a285..0942da601 100644 --- a/backend/src/app/rpc/commands/fonts.clj +++ b/backend/src/app/rpc/commands/fonts.clj @@ -16,7 +16,7 @@ [app.loggers.webhooks :as-alias webhooks] [app.media :as media] [app.rpc :as-alias rpc] - [app.rpc.climit :as climit] + [app.rpc.climit :as-alias climit] [app.rpc.commands.files :as files] [app.rpc.commands.projects :as projects] [app.rpc.commands.teams :as teams] @@ -26,7 +26,8 @@ [app.storage :as sto] [app.util.services :as sv] [app.util.time :as dt] - [app.worker :as-alias wrk])) + [app.worker :as-alias wrk] + [promesa.exec :as px])) (def valid-weight #{100 200 300 400 500 600 700 800 900 950}) (def valid-style #{"normal" "italic"}) @@ -87,6 +88,8 @@ (sv/defmethod ::create-font-variant {::doc/added "1.18" + ::climit/id [[:process-font/by-profile ::rpc/profile-id] + [:process-font/global]] ::webhooks/event? true ::sm/params schema:create-font-variant} [cfg {:keys [::rpc/profile-id team-id] :as params}] @@ -100,7 +103,7 @@ (create-font-variant cfg (assoc params :profile-id profile-id)))))) (defn create-font-variant - [{:keys [::sto/storage ::db/conn] :as cfg} {:keys [data] :as params}] + [{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [data] :as params}] (letfn [(generate-missing! [data] (let [data (media/run {:cmd :generate-fonts :input data})] (when (and (not (contains? data "font/otf")) @@ -152,9 +155,7 @@ :otf-file-id (:id otf) :ttf-file-id (:id ttf)}))] - (let [data (-> (climit/configure cfg :process-font/global) - (climit/run! (partial generate-missing! data) - (::wrk/executor cfg))) + (let [data (px/invoke! executor (partial generate-missing! data)) assets (persist-fonts-files! data) result (insert-font-variant! assets)] (vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys)))))) diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index a3dc357db..1bdcd3c50 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -27,7 +27,8 @@ [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [datoteka.io :as io])) + [datoteka.io :as io] + [promesa.exec :as px])) (def default-max-file-size (* 1024 1024 10)) ; 10 MiB @@ -56,20 +57,25 @@ :opt-un [::id])) (sv/defmethod ::upload-file-media-object - {::doc/added "1.17"} + {::doc/added "1.17" + ::climit/id [[:process-image/by-profile ::rpc/profile-id] + [:process-image/global]]} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}] (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] + (files/check-edition-permissions! pool profile-id file-id) (media/validate-media-type! content) (media/validate-media-size! content) - (let [object (db/run! cfg #(create-file-media-object % params)) - props {:name (:name params) - :file-id file-id - :is-local (:is-local params) - :size (:size content) - :mtype (:mtype content)}] - (with-meta object - {::audit/replace-props props})))) + + (db/run! cfg (fn [cfg] + (let [object (create-file-media-object cfg params) + props {:name (:name params) + :file-id file-id + :is-local (:is-local params) + :size (:size content) + :mtype (:mtype content)}] + (with-meta object + {::audit/replace-props props})))))) (defn- big-enough-for-thumbnail? "Checks if the provided image info is big enough for @@ -144,12 +150,10 @@ (assoc ::image (process-main-image info))))) (defn create-file-media-object - [{:keys [::sto/storage ::db/conn ::wrk/executor] :as cfg} + [{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [id file-id is-local name content]}] - (let [result (-> (climit/configure cfg :process-image/global) - (climit/run! (partial process-image content) executor)) - + (let [result (px/invoke! executor (partial process-image content)) image (sto/put-object! storage (::image result)) thumb (when-let [params (::thumb result)] (sto/put-object! storage params))] @@ -183,7 +187,7 @@ [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (files/check-edition-permissions! pool profile-id file-id) - (db/run! cfg #(create-file-media-object-from-url % params)))) + (create-file-media-object-from-url cfg (assoc params :profile-id profile-id)))) (defn download-image [{:keys [::http/client]} uri] @@ -235,7 +239,16 @@ params (-> params (assoc :content content) (assoc :name (or name (:filename content))))] - (create-file-media-object cfg params))) + + ;; NOTE: we use the climit here in a dynamic invocation because we + ;; don't want saturate the process-image limit with IO (download + ;; of external image) + (-> cfg + (assoc ::climit/id [[:process-image/by-profile (:profile-id params)] + [:process-image/global]]) + (assoc ::climit/profile-id (:profile-id params)) + (assoc ::climit/label "create-file-media-object-from-url") + (climit/invoke! db/run! cfg create-file-media-object params)))) ;; --- Clone File Media object (Upload and create from url) diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index 5b814abe6..a2fa82ba4 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -28,7 +28,8 @@ [app.util.services :as sv] [app.util.time :as dt] [app.worker :as-alias wrk] - [cuerdas.core :as str])) + [cuerdas.core :as str] + [promesa.exec :as px])) (declare check-profile-existence!) (declare decode-row) @@ -137,25 +138,24 @@ [:old-password {:optional true} [:maybe [::sm/word-string {:max 500}]]]])) (sv/defmethod ::update-profile-password - {:doc/added "1.0" + {::doc/added "1.0" ::sm/params schema:update-profile-password - ::sm/result :nil} + ::climit/id :auth/global} + [cfg {:keys [::rpc/profile-id password] :as params}] - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id password] :as params}] - (db/with-atomic [conn pool] - (let [cfg (assoc cfg ::db/conn conn) - profile (validate-password! cfg (assoc params :profile-id profile-id)) - session-id (::session/id params)] + (db/tx-run! cfg (fn [cfg] + (let [profile (validate-password! cfg (assoc params :profile-id profile-id)) + session-id (::session/id params)] - (when (= (str/lower (:email profile)) - (str/lower (:password params))) - (ex/raise :type :validation - :code :email-as-password - :hint "you can't use your email as password")) + (when (= (str/lower (:email profile)) + (str/lower (:password params))) + (ex/raise :type :validation + :code :email-as-password + :hint "you can't use your email as password")) - (update-profile-password! conn (assoc profile :password password)) - (invalidate-profile-session! cfg profile-id session-id) - nil))) + (update-profile-password! cfg (assoc profile :password password)) + (invalidate-profile-session! cfg profile-id session-id) + nil)))) (defn- invalidate-profile-session! "Removes all sessions except the current one." @@ -173,10 +173,10 @@ profile)) (defn update-profile-password! - [conn {:keys [id password] :as profile}] + [{:keys [::db/conn] :as cfg} {:keys [id password] :as profile}] (when-not (db/read-only? conn) (db/update! conn :profile - {:password (auth/derive-password password)} + {:password (derive-password cfg password)} {:id id}) nil)) @@ -203,6 +203,7 @@ (defn update-profile-photo [{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id file] :as params}] + (let [photo (upload-photo cfg params) profile (db/get-by-id pool :profile profile-id ::sql/for-update true)] @@ -241,8 +242,11 @@ (defn upload-photo [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}] - (let [params (-> (climit/configure cfg :process-image/global) - (climit/run! (partial generate-thumbnail! file) executor))] + (let [params (-> cfg + (assoc ::climit/id :process-image/global) + (assoc ::climit/label "upload-photo") + (assoc ::climit/executor executor) + (climit/invoke! generate-thumbnail! file))] (sto/put-object! storage params))) @@ -438,17 +442,13 @@ (into {} (filter (fn [[k _]] (simple-ident? k))) props)) (defn derive-password - [cfg password] + [{:keys [::wrk/executor]} password] (when password - (-> (climit/configure cfg :derive-password/global) - (climit/run! (partial auth/derive-password password) - (::wrk/executor cfg))))) + (px/invoke! executor (partial auth/derive-password password)))) (defn verify-password - [cfg password password-data] - (-> (climit/configure cfg :derive-password/global) - (climit/run! (partial auth/verify-password password password-data) - (::wrk/executor cfg)))) + [{:keys [::wrk/executor]} password password-data] + (px/invoke! executor (partial auth/verify-password password password-data))) (defn decode-row [{:keys [props] :as row}] diff --git a/backend/src/app/rpc/quotes.clj b/backend/src/app/rpc/quotes.clj index 4cdc3800d..3244bd03f 100644 --- a/backend/src/app/rpc/quotes.clj +++ b/backend/src/app/rpc/quotes.clj @@ -7,8 +7,10 @@ (ns app.rpc.quotes "Penpot resource usage quotes." (:require + [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.logging :as l] + [app.common.schema :as sm] [app.common.spec :as us] [app.config :as cf] [app.db :as db] @@ -23,21 +25,15 @@ ;; PUBLIC API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::conn ::db/pool-or-conn) -(s/def ::file-id ::us/uuid) -(s/def ::team-id ::us/uuid) -(s/def ::project-id ::us/uuid) -(s/def ::profile-id ::us/uuid) -(s/def ::incr (s/and int? pos?)) -(s/def ::target ::us/string) - -(s/def ::quote - (s/keys :req [::id ::profile-id] - :opt [::conn - ::team-id - ::project-id - ::file-id - ::incr])) +(def ^:private schema:quote + (sm/define + [:map {:title "Quote"} + [::team-id {:optional true} ::sm/uuid] + [::project-id {:optional true} ::sm/uuid] + [::file-id {:optional true} ::sm/uuid] + [::incr {:optional true} [:int {:min 0}]] + [::id :keyword] + [::profile-id ::sm/uuid]])) (def ^:private enabled (volatile! true)) @@ -52,15 +48,22 @@ (vswap! enabled (constantly false))) (defn check-quote! - [conn quote] - (us/assert! ::db/pool-or-conn conn) - (us/assert! ::quote quote) + [ds quote] + (dm/assert! + "expected valid quote map" + (sm/validate schema:quote quote)) + (when (contains? cf/flags :quotes) (when @enabled - (check-quote (assoc quote ::conn conn ::target (name (::id quote))))))) + ;; This approach add flexibility on how and where the + ;; check-quote! can be called (in or out of transaction) + (db/run! ds (fn [cfg] + (-> (merge cfg quote) + (assoc ::target (name (::id quote))) + (check-quote))))))) (defn- send-notification! - [{:keys [::conn] :as params}] + [{:keys [::db/conn] :as params}] (l/warn :hint "max quote reached" :target (::target params) :profile-id (some-> params ::profile-id str) @@ -93,7 +96,7 @@ :content content}]})))) (defn- generic-check! - [{:keys [::conn ::incr ::quote-sql ::count-sql ::default ::target] :or {incr 1} :as params}] + [{:keys [::db/conn ::incr ::quote-sql ::count-sql ::default ::target] :or {incr 1} :as params}] (let [quote (->> (db/exec! conn quote-sql) (map :quote) (reduce max (- Integer/MAX_VALUE))) @@ -347,7 +350,6 @@ (assoc ::count-sql [sql:get-comments-per-file file-id]) (generic-check!))) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; QUOTE: DEFAULT ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/app/rpc/retry.clj b/backend/src/app/rpc/retry.clj index bd9c3ea07..3745b9d8f 100644 --- a/backend/src/app/rpc/retry.clj +++ b/backend/src/app/rpc/retry.clj @@ -6,8 +6,8 @@ (ns app.rpc.retry (:require + [app.common.exceptions :as ex] [app.common.logging :as l] - [app.db :as db] [app.util.services :as sv]) (:import org.postgresql.util.PSQLException)) @@ -15,12 +15,29 @@ (defn conflict-exception? "Check if exception matches a insertion conflict on postgresql." [e] - (and (instance? PSQLException e) - (= "23505" (.getSQLState ^PSQLException e)))) + (when-let [cause (ex/instance? PSQLException e)] + (= "23505" (.getSQLState ^PSQLException cause)))) (def ^:private always-false (constantly false)) +(defn invoke! + [{:keys [::max-retries] :or {max-retries 3} :as cfg} f & args] + (loop [rnum 1] + (let [match? (get cfg ::when always-false) + result (try + (apply f cfg args) + (catch Throwable cause + (if (and (match? cause) (<= rnum max-retries)) + ::retry + (throw cause))))] + (if (= ::retry result) + (let [label (get cfg ::label "anonymous")] + (l/warn :hint "retrying operation" :label label :retry rnum) + (recur (inc rnum))) + result)))) + + (defn wrap-retry [_ f {:keys [::sv/name] :as mdata}] @@ -29,36 +46,10 @@ matches? (get mdata ::when always-false)] (l/dbg :hint "wrapping retry" :name name :max-retries max-retries) (fn [cfg params] - ((fn recursive-invoke [retry] - (try - (f cfg params) - (catch Throwable cause - (if (matches? cause) - (let [current-retry (inc retry)] - (l/wrn :hint "retrying operation" :retry current-retry :service name) - (if (<= current-retry max-retries) - (recursive-invoke current-retry) - (throw cause))) - (throw cause))))) 1))) + (-> cfg + (assoc ::max-retries max-retries) + (assoc ::when matches?) + (assoc ::label name) + (invoke! f params)))) f)) -(defn invoke - [{:keys [::db/conn ::max-retries] :or {max-retries 3} :as cfg} f & args] - (assert (db/connection? conn) "invalid database connection") - (loop [rnum 1] - (let [match? (get cfg ::when always-false) - result (let [spoint (db/savepoint conn)] - (try - (let [result (apply f cfg args)] - (db/release! conn spoint) - result) - (catch Throwable cause - (db/rollback! conn spoint) - (if (and (match? cause) (<= rnum max-retries)) - ::retry - (throw cause)))))] - (if (= ::retry result) - (let [label (get cfg ::label "anonymous")] - (l/warn :hint "retrying operation" :label label :retry rnum) - (recur (inc rnum))) - result)))) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 8073c40a7..ad08d5b62 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -71,6 +71,7 @@ :enable-email-verification :enable-smtp :enable-quotes + :enable-rpc-climit :enable-feature-fdata-pointer-map :enable-feature-fdata-objets-map :enable-feature-components-v2 diff --git a/common/src/app/common/exceptions.cljc b/common/src/app/common/exceptions.cljc index 2070986fe..5cceeb722 100644 --- a/common/src/app/common/exceptions.cljc +++ b/common/src/app/common/exceptions.cljc @@ -74,10 +74,9 @@ [class cause] (loop [cause cause] (if (c/instance? class cause) - true - (if-let [cause (ex-cause cause)] - (recur cause) - false))))) + cause + (when-let [cause (ex-cause cause)] + (recur cause)))))) ;; NOTE: idea for a macro for error handling ;; (pu/try-let [cause (p/await (get-object-data backend object))] diff --git a/common/src/app/common/logging.cljc b/common/src/app/common/logging.cljc index fe7a0e8f5..d7780ef70 100644 --- a/common/src/app/common/logging.cljc +++ b/common/src/app/common/logging.cljc @@ -319,6 +319,12 @@ ::message (delay ~message)}) nil))) +(defmacro log + [level & params] + `(do + (log! ::logger ~(str *ns*) ::level ~level ~@params) + nil)) + (defmacro info [& params] `(do diff --git a/frontend/src/app/main/ui/dashboard/pin_button.cljs b/frontend/src/app/main/ui/dashboard/pin_button.cljs index 9319be947..be27a05f0 100644 --- a/frontend/src/app/main/ui/dashboard/pin_button.cljs +++ b/frontend/src/app/main/ui/dashboard/pin_button.cljs @@ -12,15 +12,21 @@ (:require [app.main.ui.icons :as i] [app.util.i18n :as i18n :refer [tr]] + [app.util.object :as obj] [rumext.v2 :as mf])) -(def pin-icon (icon-xref :pin-refactor (stl/css :icon))) +(def ^:private pin-icon + (icon-xref :pin-refactor (stl/css :icon))) (mf/defc pin-button* {::mf/props :obj} [{:keys [aria-label is-pinned class] :as props}] (let [aria-label (or aria-label (tr "dashboard.pin-unpin")) class (dm/str (or class "") " " (stl/css-case :button true :button-active is-pinned)) - props (mf/spread-props props {:class class - :aria-label aria-label})] - [:> "button" props pin-icon])) \ No newline at end of file + + props (-> (obj/clone props) + (obj/unset! "isPinned") + (obj/set! "className" class) + (obj/set! "aria-label" aria-label))] + + [:> "button" props pin-icon]))