From aed6a8a5ff243cd0ef668d5b7438ba54e0241490 Mon Sep 17 00:00:00 2001 From: "alonso.torres" Date: Mon, 21 Feb 2022 16:56:41 +0100 Subject: [PATCH 1/3] :bug: Fix problem with double click --- CHANGES.md | 1 + .../main/ui/workspace/viewport/actions.cljs | 26 +++++++++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6b169a44f..c9791e493 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -36,6 +36,7 @@ - Fixed handoff shadow type text [Taiga #2717](https://tree.taiga.io/project/penpot/issue/2717) - Fixed components get "dirty" marker when moved [Taiga #2764](https://tree.taiga.io/project/penpot/issue/2764) - Fixed cannot align objects in a group that is not part of a frame [Taiga #2762](https://tree.taiga.io/project/penpot/issue/2762) +- Fix problem with double click on exit path editing [Taiga #2906](https://tree.taiga.io/project/penpot/issue/2906) ### :arrow_up: Deps updates diff --git a/frontend/src/app/main/ui/workspace/viewport/actions.cljs b/frontend/src/app/main/ui/workspace/viewport/actions.cljs index c454d3698..0d6ca0ba8 100644 --- a/frontend/src/app/main/ui/workspace/viewport/actions.cljs +++ b/frontend/src/app/main/ui/workspace/viewport/actions.cljs @@ -186,19 +186,23 @@ (st/emit! (ms/->MouseEvent :double-click ctrl? shift? alt?)) - (when (and (not drawing-path?) shape) - (cond frame? - (st/emit! (dw/select-shape id shift?)) + ;; Emit asynchronously so the double click to exit shapes won't break + (timers/schedule + #(when (and (not drawing-path?) shape) + (cond + frame? + (st/emit! (dw/select-shape id shift?)) - (and group? (> (count @hover-ids) 1)) - (let [selected (get objects (second @hover-ids))] - (reset! hover selected) - (reset! hover-ids (into [] (rest @hover-ids))) - (st/emit! (dw/select-shape (:id selected)))) + (and group? (> (count @hover-ids) 1)) + (let [selected (get objects (second @hover-ids))] + (reset! hover selected) + (reset! hover-ids (into [] (rest @hover-ids))) - (not= id edition) - (st/emit! (dw/select-shape id) - (dw/start-editing-selected)))))))) + (st/emit! (dw/select-shape (:id selected)))) + + (not= id edition) + (st/emit! (dw/select-shape id) + (dw/start-editing-selected))))))))) (defn on-context-menu [hover hover-ids] From d24f16563fb99b515ba2d9f5101706e12af00913 Mon Sep 17 00:00:00 2001 From: "alonso.torres" Date: Mon, 21 Feb 2022 17:24:45 +0100 Subject: [PATCH 2/3] :sparkles: Use remove to delete guides --- frontend/src/app/main/data/workspace.cljs | 16 ++++++++--- .../src/app/main/data/workspace/guides.cljs | 27 +++++++++++++++++++ .../app/main/data/workspace/shortcuts.cljs | 5 ++-- .../app/main/ui/workspace/context_menu.cljs | 5 ++-- .../main/ui/workspace/viewport/guides.cljs | 4 ++- 5 files changed, 48 insertions(+), 9 deletions(-) diff --git a/frontend/src/app/main/data/workspace.cljs b/frontend/src/app/main/data/workspace.cljs index ad4339d28..ebbb1c692 100644 --- a/frontend/src/app/main/data/workspace.cljs +++ b/frontend/src/app/main/data/workspace.cljs @@ -747,14 +747,21 @@ ;; --- Delete Selected -(def delete-selected +(defn delete-selected "Deselect all and remove all selected shapes." + [] (ptk/reify ::delete-selected ptk/WatchEvent (watch [_ state _] - (let [selected (wsh/lookup-selected state)] - (rx/of (dwc/delete-shapes selected) - (dws/deselect-all)))))) + (let [selected (wsh/lookup-selected state) + hover-guides (get-in state [:workspace-guides :hover])] + (cond + (d/not-empty? selected) + (rx/of (dwc/delete-shapes selected) + (dws/deselect-all)) + + (d/not-empty? hover-guides) + (rx/of (dwgu/remove-guides hover-guides))))))) ;; --- Shape Vertical Ordering @@ -2068,4 +2075,5 @@ ;; Guides (d/export dwgu/update-guides) (d/export dwgu/remove-guide) +(d/export dwgu/set-hover-guide) diff --git a/frontend/src/app/main/data/workspace/guides.cljs b/frontend/src/app/main/data/workspace/guides.cljs index 473d03df0..3fce84bf2 100644 --- a/frontend/src/app/main/data/workspace/guides.cljs +++ b/frontend/src/app/main/data/workspace/guides.cljs @@ -41,6 +41,12 @@ (defn remove-guide [guide] (us/verify ::csp/guide guide) (ptk/reify ::remove-guide + ptk/UpdateEvent + (update [_ state] + (let [sdisj (fnil disj #{})] + (-> state + (update-in [:workspace-guides :hover] sdisj (:id guide))))) + ptk/WatchEvent (watch [it state _] (let [page (wsh/lookup-page state) @@ -53,6 +59,16 @@ (pcb/set-page-option :guides new-guides))] (rx/of (dwc/commit-changes changes)))))) +(defn remove-guides + [ids] + (ptk/reify ::remove-guides + ptk/WatchEvent + (watch [_ state _] + (let [page (wsh/lookup-page state) + guides (get-in page [:options :guides] {}) + guides (-> (select-keys guides ids) (vals))] + (rx/from (->> guides (mapv #(remove-guide %)))))))) + (defn move-frame-guides "Move guides that are inside a frame when that frame is moved" [ids] @@ -86,3 +102,14 @@ (filter (comp frame-ids? :frame-id)) (map build-move-event) (rx/from)))))) + +(defn set-hover-guide + [id hover?] + (ptk/reify ::set-hover-guide + ptk/UpdateEvent + (update [_ state] + (let [sconj (fnil conj #{}) + sdisj (fnil disj #{})] + (if hover? + (update-in state [:workspace-guides :hover] sconj id) + (update-in state [:workspace-guides :hover] sdisj id)))))) diff --git a/frontend/src/app/main/data/workspace/shortcuts.cljs b/frontend/src/app/main/data/workspace/shortcuts.cljs index ec42b727a..64fcba629 100644 --- a/frontend/src/app/main/data/workspace/shortcuts.cljs +++ b/frontend/src/app/main/data/workspace/shortcuts.cljs @@ -190,7 +190,8 @@ :cut {:tooltip (ds/meta "X") :command (ds/c-mod "x") - :fn #(st/emit! (dw/copy-selected) dw/delete-selected)} + :fn #(st/emit! (dw/copy-selected) + (dw/delete-selected))} :paste {:tooltip (ds/meta "V") :disabled true @@ -199,7 +200,7 @@ :delete {:tooltip (ds/supr) :command ["del" "backspace"] - :fn #(st/emit! dw/delete-selected)} + :fn #(st/emit! (dw/delete-selected))} :bring-forward {:tooltip (ds/meta ds/up-arrow) :command (ds/c-mod "up") diff --git a/frontend/src/app/main/ui/workspace/context_menu.cljs b/frontend/src/app/main/ui/workspace/context_menu.cljs index 5f07b7dc9..f020dc1e4 100644 --- a/frontend/src/app/main/ui/workspace/context_menu.cljs +++ b/frontend/src/app/main/ui/workspace/context_menu.cljs @@ -103,7 +103,8 @@ (mf/defc context-menu-edit [] (let [do-copy (st/emitf (dw/copy-selected)) - do-cut (st/emitf (dw/copy-selected) dw/delete-selected) + do-cut (st/emitf (dw/copy-selected) + (dw/delete-selected)) do-paste (st/emitf dw/paste) do-duplicate (st/emitf (dw/duplicate-selected false))] [:* @@ -406,7 +407,7 @@ (mf/defc context-menu-delete [] - (let [do-delete (st/emitf dw/delete-selected)] + (let [do-delete (st/emitf (dw/delete-selected))] [:& menu-entry {:title (tr "workspace.shape.menu.delete") :shortcut (sc/get-tooltip :delete) :on-click do-delete}])) diff --git a/frontend/src/app/main/ui/workspace/viewport/guides.cljs b/frontend/src/app/main/ui/workspace/viewport/guides.cljs index 2a179628c..6aebf468a 100644 --- a/frontend/src/app/main/ui/workspace/viewport/guides.cljs +++ b/frontend/src/app/main/ui/workspace/viewport/guides.cljs @@ -36,7 +36,7 @@ (defn use-guide "Hooks to support drag/drop for existing guides and new guides" - [on-guide-change get-hover-frame zoom {:keys [position axis frame-id]}] + [on-guide-change get-hover-frame zoom {:keys [id position axis frame-id]}] (let [dragging-ref (mf/use-ref false) start-ref (mf/use-ref nil) start-pos-ref (mf/use-ref nil) @@ -52,11 +52,13 @@ on-pointer-enter (mf/use-callback (fn [] + (st/emit! (dw/set-hover-guide id true)) (swap! state assoc :hover true))) on-pointer-leave (mf/use-callback (fn [] + (st/emit! (dw/set-hover-guide id false)) (swap! state assoc :hover false))) on-pointer-down From 7cf27ac86dd84d0ec3b244fd18a99a7c564c1be1 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 18 Feb 2022 18:01:21 +0100 Subject: [PATCH 3/3] :recycle: Refactor general resource and concurrency model on backend --- backend/src/app/config.clj | 20 +- backend/src/app/db.clj | 80 ++--- backend/src/app/http.clj | 58 ++-- backend/src/app/http/assets.clj | 62 ++-- backend/src/app/http/debug.clj | 31 +- backend/src/app/http/doc.clj | 11 +- backend/src/app/http/middleware.clj | 215 ++++++------- backend/src/app/http/oauth.clj | 68 ++-- backend/src/app/http/session.clj | 21 +- backend/src/app/http/websocket.clj | 47 ++- backend/src/app/main.clj | 84 +++-- backend/src/app/metrics.clj | 296 +++++++----------- backend/src/app/msgbus.clj | 26 +- backend/src/app/rpc.clj | 219 ++++++++----- backend/src/app/rpc/mutations/comments.clj | 5 +- backend/src/app/rpc/mutations/files.clj | 11 +- backend/src/app/rpc/mutations/ldap.clj | 2 +- backend/src/app/rpc/mutations/media.clj | 7 + backend/src/app/rpc/mutations/profile.clj | 32 +- backend/src/app/rpc/mutations/teams.clj | 2 +- .../src/app/rpc/mutations/verify_token.clj | 12 +- backend/src/app/rpc/queries/profile.clj | 3 +- backend/src/app/rpc/retry.clj | 52 +++ backend/src/app/rpc/rlimit.clj | 67 ++++ backend/src/app/util/async.clj | 8 +- backend/src/app/util/retry.clj | 43 --- backend/src/app/util/rlimit.clj | 36 --- backend/src/app/util/websocket.clj | 17 +- backend/src/app/worker.clj | 148 ++++----- backend/test/app/test_helpers.clj | 2 +- common/deps.edn | 2 +- common/src/app/common/data.cljc | 27 +- 32 files changed, 917 insertions(+), 797 deletions(-) create mode 100644 backend/src/app/rpc/retry.clj create mode 100644 backend/src/app/rpc/rlimit.clj delete mode 100644 backend/src/app/util/retry.clj delete mode 100644 backend/src/app/util/rlimit.clj diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 7083b490e..c0aa7ddc9 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -41,9 +41,7 @@ data)) (def defaults - {:http-server-port 6060 - :http-server-host "0.0.0.0" - :host "devenv" + {:host "devenv" :tenant "dev" :database-uri "postgresql://postgres/penpot" :database-username "penpot" @@ -106,6 +104,10 @@ (s/def ::file-change-snapshot-every ::us/integer) (s/def ::file-change-snapshot-timeout ::dt/duration) +(s/def ::default-executor-parallelism ::us/integer) +(s/def ::blocking-executor-parallelism ::us/integer) +(s/def ::worker-executor-parallelism ::us/integer) + (s/def ::secret-key ::us/string) (s/def ::allow-demo-users ::us/boolean) (s/def ::assets-path ::us/string) @@ -114,6 +116,9 @@ (s/def ::database-uri ::us/string) (s/def ::database-username (s/nilable ::us/string)) (s/def ::database-readonly ::us/boolean) +(s/def ::database-min-pool-size ::us/integer) +(s/def ::database-max-pool-size ::us/integer) + (s/def ::default-blob-version ::us/integer) (s/def ::error-report-webhook ::us/string) (s/def ::user-feedback-destination ::us/string) @@ -136,6 +141,8 @@ (s/def ::host ::us/string) (s/def ::http-server-port ::us/integer) (s/def ::http-server-host ::us/string) +(s/def ::http-server-min-threads ::us/integer) +(s/def ::http-server-max-threads ::us/integer) (s/def ::http-session-idle-max-age ::dt/duration) (s/def ::http-session-updater-batch-max-age ::dt/duration) (s/def ::http-session-updater-batch-max-size ::us/integer) @@ -207,8 +214,13 @@ ::database-uri ::database-username ::database-readonly + ::database-min-pool-size + ::database-max-pool-size ::default-blob-version ::error-report-webhook + ::default-executor-parallelism + ::blocking-executor-parallelism + ::worker-executor-parallelism ::file-change-snapshot-every ::file-change-snapshot-timeout ::user-feedback-destination @@ -231,6 +243,8 @@ ::host ::http-server-host ::http-server-port + ::http-server-max-threads + ::http-server-min-threads ::http-session-idle-max-age ::http-session-updater-batch-max-age ::http-session-updater-batch-max-size diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index 479a8fdbc..a45fcd90f 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -47,13 +47,12 @@ ;; Initialization ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(declare instrument-jdbc!) (declare apply-migrations!) (s/def ::connection-timeout ::us/integer) -(s/def ::max-pool-size ::us/integer) +(s/def ::max-size ::us/integer) +(s/def ::min-size ::us/integer) (s/def ::migrations map?) -(s/def ::min-pool-size ::us/integer) (s/def ::name keyword?) (s/def ::password ::us/string) (s/def ::read-only ::us/boolean) @@ -62,19 +61,39 @@ (s/def ::validation-timeout ::us/integer) (defmethod ig/pre-init-spec ::pool [_] - (s/keys :req-un [::uri ::name ::username ::password] - :opt-un [::min-pool-size - ::max-pool-size + (s/keys :req-un [::uri ::name + ::min-size + ::max-size ::connection-timeout - ::validation-timeout - ::migrations + ::validation-timeout] + :opt-un [::migrations + ::username + ::password ::mtx/metrics ::read-only])) +(defmethod ig/prep-key ::pool + [_ cfg] + (merge {:name :main + :min-size 0 + :max-size 30 + :connection-timeout 10000 + :validation-timeout 10000 + :idle-timeout 120000 ; 2min + :max-lifetime 1800000 ; 30m + :read-only false} + (d/without-nils cfg))) + (defmethod ig/init-key ::pool - [_ {:keys [migrations metrics name read-only] :as cfg}] - (l/info :action "initialize connection pool" :name (d/name name) :uri (:uri cfg)) - (some-> metrics :registry instrument-jdbc!) + [_ {:keys [migrations name read-only] :as cfg}] + (l/info :hint "initialize connection pool" + :name (d/name name) + :uri (:uri cfg) + :read-only read-only + :with-credentials (and (contains? cfg :username) + (contains? cfg :password)) + :min-size (:min-size cfg) + :max-size (:max-size cfg)) (let [pool (create-pool cfg)] (when-not read-only @@ -85,16 +104,6 @@ [_ pool] (.close ^HikariDataSource pool)) -(defn- instrument-jdbc! - [registry] - (mtx/instrument-vars! - [#'next.jdbc/execute-one! - #'next.jdbc/execute!] - {:registry registry - :type :counter - :name "database_query_total" - :help "An absolute counter of database queries."})) - (defn- apply-migrations! [pool migrations] (with-open [conn ^AutoCloseable (open pool)] @@ -111,22 +120,19 @@ "SET idle_in_transaction_session_timeout = 300000;")) (defn- create-datasource-config - [{:keys [metrics read-only] :or {read-only false} :as cfg}] - (let [dburi (:uri cfg) - username (:username cfg) - password (:password cfg) - config (HikariConfig.)] + [{:keys [metrics uri] :as cfg}] + (let [config (HikariConfig.)] (doto config - (.setJdbcUrl (str "jdbc:" dburi)) - (.setPoolName (d/name (:name cfg))) + (.setJdbcUrl (str "jdbc:" uri)) + (.setPoolName (d/name (:name cfg))) (.setAutoCommit true) - (.setReadOnly read-only) - (.setConnectionTimeout (:connection-timeout cfg 10000)) ;; 10seg - (.setValidationTimeout (:validation-timeout cfg 10000)) ;; 10seg - (.setIdleTimeout 120000) ;; 2min - (.setMaxLifetime 1800000) ;; 30min - (.setMinimumIdle (:min-pool-size cfg 0)) - (.setMaximumPoolSize (:max-pool-size cfg 50)) + (.setReadOnly (:read-only cfg)) + (.setConnectionTimeout (:connection-timeout cfg)) + (.setValidationTimeout (:validation-timeout cfg)) + (.setIdleTimeout (:idle-timeout cfg)) + (.setMaxLifetime (:max-lifetime cfg)) + (.setMinimumIdle (:min-size cfg)) + (.setMaximumPoolSize (:max-size cfg)) (.setConnectionInitSql initsql) (.setInitializationFailTimeout -1)) @@ -136,8 +142,8 @@ (PrometheusMetricsTrackerFactory.) (.setMetricsTrackerFactory config))) - (when username (.setUsername config username)) - (when password (.setPassword config password)) + (some->> ^String (:username cfg) (.setUsername config)) + (some->> ^String (:password cfg) (.setPassword config)) config)) diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 594916a06..b49d8bf15 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -10,6 +10,7 @@ [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] + [app.config :as cf] [app.http.doc :as doc] [app.http.errors :as errors] [app.http.middleware :as middleware] @@ -24,19 +25,30 @@ (declare wrap-router) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; HTTP SERVER +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + (s/def ::handler fn?) (s/def ::router some?) (s/def ::port ::us/integer) (s/def ::host ::us/string) (s/def ::name ::us/string) - -(defmethod ig/pre-init-spec ::server [_] - (s/keys :req-un [::port] - :opt-un [::name ::mtx/metrics ::router ::handler ::host])) +(s/def ::max-threads ::cf/http-server-max-threads) +(s/def ::min-threads ::cf/http-server-min-threads) (defmethod ig/prep-key ::server [_ cfg] - (merge {:name "http"} (d/without-nils cfg))) + (merge {:name "http" + :min-threads 4 + :max-threads 60 + :port 6060 + :host "0.0.0.0"} + (d/without-nils cfg))) + +(defmethod ig/pre-init-spec ::server [_] + (s/keys :req-un [::port ::host ::name ::min-threads ::max-threads] + :opt-un [::mtx/metrics ::router ::handler])) (defn- instrument-metrics [^Server server metrics] @@ -48,15 +60,22 @@ (defmethod ig/init-key ::server [_ {:keys [handler router port name metrics host] :as opts}] - (l/info :msg "starting http server" :port port :host host :name name) - (let [options {:http/port port :http/host host} + (l/info :hint "starting http server" + :port port :host host :name name + :min-threads (:min-threads opts) + :max-threads (:max-threads opts)) + (let [options {:http/port port + :http/host host + :thread-pool/max-threads (:max-threads opts) + :thread-pool/min-threads (:min-threads opts) + :ring/async true} handler (cond (fn? handler) handler (some? router) (wrap-router router) :else (ex/raise :type :internal :code :invalid-argument :hint "Missing `handler` or `router` option.")) - server (-> (yt/server handler options) + server (-> (yt/server handler (d/without-nils options)) (cond-> metrics (instrument-metrics metrics)))] (assoc opts :server (yt/start! server)))) @@ -70,20 +89,20 @@ (let [default (rr/routes (rr/create-resource-handler {:path "/"}) (rr/create-default-handler)) - options {:middleware [middleware/server-timing]} + options {:middleware [middleware/wrap-server-timing] + :inject-match? false + :inject-router? false} handler (rr/ring-handler router default options)] - (fn [request] - (try - (handler request) - (catch Throwable e - (l/error :hint "unexpected error processing request" - ::l/context (errors/get-error-context request e) - :query-string (:query-string request) - :cause e) - {:status 500 :body "internal server error"}))))) + (fn [request respond _] + (handler request respond (fn [cause] + (l/error :hint "unexpected error processing request" + ::l/context (errors/get-error-context request cause) + :query-string (:query-string request) + :cause cause) + (respond {:status 500 :body "internal server error"})))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Http Router +;; HTTP ROUTER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (s/def ::rpc map?) @@ -145,7 +164,6 @@ [middleware/multipart-params] [middleware/keyword-params] [middleware/format-response-body] - [middleware/etag] [middleware/parse-request-body] [middleware/errors errors/handle] [middleware/cookies]]} diff --git a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index 550bfcdfb..439b9f32e 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -13,9 +13,12 @@ [app.db :as db] [app.metrics :as mtx] [app.storage :as sto] + [app.util.async :as async] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] - [integrant.core :as ig])) + [integrant.core :as ig] + [promesa.core :as p])) (def ^:private cache-max-age (dt/duration {:hours 24})) @@ -69,29 +72,38 @@ :body ""})))) (defn- generic-handler - [{:keys [storage] :as cfg} _request id] - (let [obj (sto/get-object storage id)] - (if obj - (serve-object cfg obj) - {:status 404 :body ""}))) + [{:keys [storage executor] :as cfg} request kf] + (async/with-dispatch executor + (let [id (get-in request [:path-params :id]) + mobj (get-file-media-object storage id) + obj (sto/get-object storage (kf mobj))] + (if obj + (serve-object cfg obj) + {:status 404 :body ""})))) (defn objects-handler - [cfg request] - (let [id (get-in request [:path-params :id])] - (generic-handler cfg request (coerce-id id)))) + [{:keys [storage executor] :as cfg} request respond raise] + (-> (async/with-dispatch executor + (let [id (get-in request [:path-params :id]) + id (coerce-id id) + obj (sto/get-object storage id)] + (if obj + (serve-object cfg obj) + {:status 404 :body ""}))) + (p/then respond) + (p/catch raise))) (defn file-objects-handler - [{:keys [storage] :as cfg} request] - (let [id (get-in request [:path-params :id]) - mobj (get-file-media-object storage id)] - (generic-handler cfg request (:media-id mobj)))) + [cfg request respond raise] + (-> (generic-handler cfg request :media-id) + (p/then respond) + (p/catch raise))) (defn file-thumbnails-handler - [{:keys [storage] :as cfg} request] - (let [id (get-in request [:path-params :id]) - mobj (get-file-media-object storage id)] - (generic-handler cfg request (or (:thumbnail-id mobj) (:media-id mobj))))) - + [cfg request respond raise] + (-> (generic-handler cfg request #(or (:thumbnail-id %) (:media-id %))) + (p/then respond) + (p/catch raise))) ;; --- Initialization @@ -101,10 +113,16 @@ (s/def ::signature-max-age ::dt/duration) (defmethod ig/pre-init-spec ::handlers [_] - (s/keys :req-un [::storage ::mtx/metrics ::assets-path ::cache-max-age ::signature-max-age])) + (s/keys :req-un [::storage + ::wrk/executor + ::mtx/metrics + ::assets-path + ::cache-max-age + ::signature-max-age])) (defmethod ig/init-key ::handlers [_ cfg] - {:objects-handler #(objects-handler cfg %) - :file-objects-handler #(file-objects-handler cfg %) - :file-thumbnails-handler #(file-thumbnails-handler cfg %)}) + {:objects-handler (partial objects-handler cfg) + :file-objects-handler (partial file-objects-handler cfg) + :file-thumbnails-handler (partial file-thumbnails-handler cfg)}) + diff --git a/backend/src/app/http/debug.clj b/backend/src/app/http/debug.clj index e31040618..16190e706 100644 --- a/backend/src/app/http/debug.clj +++ b/backend/src/app/http/debug.clj @@ -14,14 +14,18 @@ [app.db :as db] [app.rpc.mutations.files :as m.files] [app.rpc.queries.profile :as profile] + [app.util.async :as async] [app.util.blob :as blob] [app.util.template :as tmpl] [app.util.time :as dt] + [app.worker :as wrk] [clojure.java.io :as io] + [clojure.spec.alpha :as s] [cuerdas.core :as str] [datoteka.core :as fs] [fipp.edn :as fpp] - [integrant.core :as ig])) + [integrant.core :as ig] + [promesa.core :as p])) ;; (selmer.parser/cache-off!) @@ -201,12 +205,23 @@ (db/exec-one! conn ["select count(*) as count from server_prop;"]) {:status 200 :body "Ok"})) +(defn- wrap-async + [{:keys [executor] :as cfg} f] + (fn [request respond raise] + (-> (async/with-dispatch executor + (f cfg request)) + (p/then respond) + (p/catch raise)))) + +(defmethod ig/pre-init-spec ::handlers [_] + (s/keys :req-un [::db/pool ::wrk/executor])) + (defmethod ig/init-key ::handlers [_ cfg] - {:index (partial index cfg) - :health-check (partial health-check cfg) - :retrieve-file-data (partial retrieve-file-data cfg) - :retrieve-file-changes (partial retrieve-file-changes cfg) - :retrieve-error (partial retrieve-error cfg) - :retrieve-error-list (partial retrieve-error-list cfg) - :upload-file-data (partial upload-file-data cfg)}) + {:index (wrap-async cfg index) + :health-check (wrap-async cfg health-check) + :retrieve-file-data (wrap-async cfg retrieve-file-data) + :retrieve-file-changes (wrap-async cfg retrieve-file-changes) + :retrieve-error (wrap-async cfg retrieve-error) + :retrieve-error-list (wrap-async cfg retrieve-error-list) + :upload-file-data (wrap-async cfg upload-file-data)}) diff --git a/backend/src/app/http/doc.clj b/backend/src/app/http/doc.clj index 29796a117..07f63bb04 100644 --- a/backend/src/app/http/doc.clj +++ b/backend/src/app/http/doc.clj @@ -46,8 +46,9 @@ [rpc] (let [context (prepare-context rpc)] (if (contains? cf/flags :backend-api-doc) - (fn [_] - {:status 200 - :body (-> (io/resource "api-doc.tmpl") - (tmpl/render context))}) - (constantly {:status 404 :body ""})))) + (fn [_ respond _] + (respond {:status 200 + :body (-> (io/resource "api-doc.tmpl") + (tmpl/render context))})) + (fn [_ respond _] + (respond {:status 404 :body ""}))))) diff --git a/backend/src/app/http/middleware.clj b/backend/src/app/http/middleware.clj index 153ddc727..c84413e30 100644 --- a/backend/src/app/http/middleware.clj +++ b/backend/src/app/http/middleware.clj @@ -10,8 +10,6 @@ [app.common.transit :as t] [app.config :as cf] [app.util.json :as json] - [buddy.core.codecs :as bc] - [buddy.core.hash :as bh] [ring.core.protocols :as rp] [ring.middleware.cookies :refer [wrap-cookies]] [ring.middleware.keyword-params :refer [wrap-keyword-params]] @@ -21,13 +19,15 @@ (defn wrap-server-timing [handler] - (let [seconds-from #(float (/ (- (System/nanoTime) %) 1000000000))] - (fn [request] - (let [start (System/nanoTime) - response (handler request)] - (update response :headers - (fn [headers] - (assoc headers "Server-Timing" (str "total;dur=" (seconds-from start))))))))) + (letfn [(get-age [start] + (float (/ (- (System/nanoTime) start) 1000000000))) + + (update-headers [headers start] + (assoc headers "Server-Timing" (str "total;dur=" (get-age start))))] + + (fn [request respond raise] + (let [start (System/nanoTime)] + (handler request #(respond (update % :headers update-headers start)) raise))))) (defn wrap-parse-request-body [handler] @@ -36,32 +36,40 @@ (t/read! reader))) (parse-json [body] - (json/read body))] - (fn [{:keys [headers body] :as request}] + (json/read body)) + + (handle-request [{:keys [headers body] :as request}] + (let [ctype (get headers "content-type")] + (case ctype + "application/transit+json" + (let [params (parse-transit body)] + (-> request + (assoc :body-params params) + (update :params merge params))) + + "application/json" + (let [params (parse-json body)] + (-> request + (assoc :body-params params) + (update :params merge params))) + + request))) + + (handle-exception [cause] + (let [data {:type :validation + :code :unable-to-parse-request-body + :hint "malformed params"}] + (l/error :hint (ex-message cause) :cause cause) + {:status 400 + :headers {"content-type" "application/transit+json"} + :body (t/encode-str data {:type :json-verbose})}))] + + (fn [request respond raise] (try - (let [ctype (get headers "content-type")] - (handler (case ctype - "application/transit+json" - (let [params (parse-transit body)] - (-> request - (assoc :body-params params) - (update :params merge params))) - - "application/json" - (let [params (parse-json body)] - (-> request - (assoc :body-params params) - (update :params merge params))) - - request))) - (catch Exception e - (let [data {:type :validation - :code :unable-to-parse-request-body - :hint "malformed params"}] - (l/error :hint (ex-message e) :cause e) - {:status 400 - :headers {"content-type" "application/transit+json"} - :body (t/encode-str data {:type :json-verbose})})))))) + (let [request (handle-request request)] + (handler request respond raise)) + (catch Exception cause + (respond (handle-exception cause))))))) (def parse-request-body {:name ::parse-request-body @@ -81,48 +89,50 @@ (def ^:const buffer-size (:http/output-buffer-size yt/base-defaults)) -(defn- transit-streamable-body - [data opts] - (reify rp/StreamableResponseBody - (write-body-to-stream [_ _ output-stream] - ;; Use the same buffer as jetty output buffer size - (try - (with-open [bos (buffered-output-stream output-stream buffer-size)] - (let [tw (t/writer bos opts)] - (t/write! tw data))) - (catch org.eclipse.jetty.io.EofException _cause - ;; Do nothing, EOF means client closes connection abruptly - nil) - (catch Throwable cause - (l/warn :hint "unexpected error on encoding response" - :cause cause)))))) - -(defn- impl-format-response-body - [response {:keys [query-params] :as request}] - (let [body (:body response) - opts {:type (if (contains? query-params "transit_verbose") :json-verbose :json)}] - - (cond - (:ws response) - response - - (coll? body) - (-> response - (update :headers assoc "content-type" "application/transit+json") - (assoc :body (transit-streamable-body body opts))) - - (nil? body) - (assoc response :status 204 :body "") - - :else - response))) - -(defn- wrap-format-response-body +(defn wrap-format-response-body [handler] - (fn [request] - (let [response (handler request)] - (cond-> response - (map? response) (impl-format-response-body request))))) + (letfn [(transit-streamable-body [data opts] + (reify rp/StreamableResponseBody + (write-body-to-stream [_ _ output-stream] + ;; Use the same buffer as jetty output buffer size + (try + (with-open [bos (buffered-output-stream output-stream buffer-size)] + (let [tw (t/writer bos opts)] + (t/write! tw data))) + (catch org.eclipse.jetty.io.EofException _cause + ;; Do nothing, EOF means client closes connection abruptly + nil) + (catch Throwable cause + (l/warn :hint "unexpected error on encoding response" + :cause cause)))))) + + (impl-format-response-body [response {:keys [query-params] :as request}] + (let [body (:body response) + opts {:type (if (contains? query-params "transit_verbose") :json-verbose :json)}] + (cond + (:ws response) + response + + (coll? body) + (-> response + (update :headers assoc "content-type" "application/transit+json") + (assoc :body (transit-streamable-body body opts))) + + (nil? body) + (assoc response :status 204 :body "") + + :else + response))) + + (handle-response [response request] + (cond-> response + (map? response) (impl-format-response-body request)))] + + (fn [request respond raise] + (handler request + (fn [response] + (respond (handle-response response request))) + raise)))) (def format-response-body {:name ::format-response-body @@ -130,11 +140,9 @@ (defn wrap-errors [handler on-error] - (fn [request] - (try - (handler request) - (catch Throwable e - (on-error e request))))) + (fn [request respond _] + (handler request respond (fn [cause] + (-> cause (on-error request) respond))))) (def errors {:name ::errors @@ -160,41 +168,7 @@ {:name ::server-timing :compile (constantly wrap-server-timing)}) -(defn wrap-etag - [handler] - (letfn [(encode [data] - (when (string? data) - (str "W/\"" (-> data bh/blake2b-128 bc/bytes->hex) "\"")))] - (fn [{method :request-method headers :headers :as request}] - (cond-> (handler request) - (= :get method) - (as-> $ (if-let [etag (-> $ :body meta :etag encode)] - (cond-> (update $ :headers assoc "etag" etag) - (= etag (get headers "if-none-match")) - (-> (assoc :body "") - (assoc :status 304))) - $)))))) - -(def etag - {:name ::etag - :compile (constantly wrap-etag)}) - -(defn activity-logger - [handler] - (let [logger "penpot.profile-activity"] - (fn [{:keys [headers] :as request}] - (let [ip-addr (get headers "x-forwarded-for") - profile-id (:profile-id request) - qstring (:query-string request)] - (l/info ::l/async true - ::l/logger logger - :ip-addr ip-addr - :profile-id profile-id - :uri (str (:uri request) (when qstring (str "?" qstring))) - :method (name (:request-method request))) - (handler request))))) - -(defn- wrap-cors +(defn wrap-cors [handler] (if-not (contains? cf/flags :cors) handler @@ -209,12 +183,15 @@ (assoc "access-control-allow-credentials" "true") (assoc "access-control-expose-headers" "x-requested-with, content-type, cookie") (assoc "access-control-allow-headers" "x-frontend-version, content-type, accept, x-requested-width"))))))] - (fn [request] + (fn [request respond raise] (if (= (:request-method request) :options) (-> {:status 200 :body ""} - (add-cors-headers request)) - (let [response (handler request)] - (add-cors-headers response request))))))) + (add-cors-headers request) + (respond)) + (handler request + (fn [response] + (respond (add-cors-headers response request))) + raise)))))) (def cors {:name ::cors diff --git a/backend/src/app/http/oauth.clj b/backend/src/app/http/oauth.clj index c116836a6..21347f688 100644 --- a/backend/src/app/http/oauth.clj +++ b/backend/src/app/http/oauth.clj @@ -21,7 +21,8 @@ [clojure.set :as set] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [integrant.core :as ig])) + [integrant.core :as ig] + [promesa.exec :as px])) (defn- build-redirect-uri [{:keys [provider] :as cfg}] @@ -213,28 +214,35 @@ (redirect-response uri)))) (defn- auth-handler - [{:keys [tokens] :as cfg} {:keys [params] :as request}] - (let [invitation (:invitation-token params) - props (extract-utm-props params) - state (tokens :generate - {:iss :oauth - :invitation-token invitation - :props props - :exp (dt/in-future "15m")}) - uri (build-auth-uri cfg state)] - {:status 200 - :body {:redirect-uri uri}})) + [{:keys [tokens executors] :as cfg} {:keys [params] :as request} respond _] + (px/run! + (:default executors) + (fn [] + (let [invitation (:invitation-token params) + props (extract-utm-props params) + state (tokens :generate + {:iss :oauth + :invitation-token invitation + :props props + :exp (dt/in-future "15m")}) + uri (build-auth-uri cfg state)] + + (respond + {:status 200 + :body {:redirect-uri uri}}))))) (defn- callback-handler - [cfg request] - (try - (let [info (retrieve-info cfg request) - profile (retrieve-profile cfg info)] - (generate-redirect cfg request info profile)) - (catch Exception e - (l/warn :hint "error on oauth process" - :cause e) - (generate-error-redirect cfg e)))) + [{:keys [executors] :as cfg} request respond _] + (px/run! + (:default executors) + (fn [] + (try + (let [info (retrieve-info cfg request) + profile (retrieve-profile cfg info)] + (respond (generate-redirect cfg request info profile))) + (catch Exception cause + (l/warn :hint "error on oauth process" :cause cause) + (respond (generate-error-redirect cfg cause))))))) ;; --- INIT @@ -250,15 +258,19 @@ (defn wrap-handler [cfg handler] - (fn [request] + (fn [request respond raise] (let [provider (get-in request [:path-params :provider]) provider (get-in @cfg [:providers provider])] - (when-not provider - (ex/raise :type :not-found - :context {:provider provider} - :hint "provider not configured")) - (-> (assoc @cfg :provider provider) - (handler request))))) + (if provider + (handler (assoc @cfg :provider provider) + request + respond + raise) + (raise + (ex/error + :type :not-found + :provider provider + :hint "provider not configured")))))) (defmethod ig/init-key ::handler [_ cfg] diff --git a/backend/src/app/http/session.clj b/backend/src/app/http/session.clj index 002b1ab36..7a98abff1 100644 --- a/backend/src/app/http/session.clj +++ b/backend/src/app/http/session.clj @@ -134,13 +134,13 @@ (defn- middleware [events-ch store handler] - (fn [request] + (fn [request respond raise] (if-let [{:keys [id profile-id] :as session} (retrieve-from-request store request)] (do (a/>!! events-ch id) (l/set-context! {:profile-id profile-id}) - (handler (assoc request :profile-id profile-id :session-id id))) - (handler request)))) + (handler (assoc request :profile-id profile-id :session-id id) respond raise)) + (handler request respond raise)))) ;; --- STATE INIT: SESSION @@ -150,7 +150,8 @@ (defmethod ig/prep-key ::session [_ cfg] - (d/merge {:buffer-size 128} (d/without-nils cfg))) + (d/merge {:buffer-size 128} + (d/without-nils cfg))) (defmethod ig/init-key ::session [_ {:keys [pool tokens] :as cfg}] @@ -164,7 +165,7 @@ (-> cfg (assoc ::events-ch events-ch) - (assoc :middleware #(middleware events-ch store %)) + (assoc :middleware (partial middleware events-ch store)) (assoc :create (fn [profile-id] (fn [request response] (let [token (create-session store request profile-id)] @@ -207,16 +208,11 @@ :max-batch-size (str (:max-batch-size cfg))) (let [input (aa/batch (::events-ch session) {:max-batch-size (:max-batch-size cfg) - :max-batch-age (inst-ms (:max-batch-age cfg))}) - mcnt (mtx/create - {:name "http_session_update_total" - :help "A counter of session update batch events." - :registry (:registry metrics) - :type :counter})] + :max-batch-age (inst-ms (:max-batch-age cfg))})] (a/go-loop [] (when-let [[reason batch] (a/ (merge cfg params) - (assoc :profile-id profile-id) - (assoc :team-id (:team-id file)) - (assoc ::ws/metrics metrics))] + [_ {:keys [pool] :as cfg}] + (fn [{:keys [profile-id params] :as req} respond raise] + (let [params (us/conform ::handler-params params) + file (retrieve-file pool (:file-id params)) + cfg (-> (merge cfg params) + (assoc :profile-id profile-id) + (assoc :team-id (:team-id file)))] - (when-not profile-id - (ex/raise :type :authentication - :hint "Authentication required.")) + (cond + (not profile-id) + (raise (ex/error :type :authentication + :hint "Authentication required.")) - (when-not file - (ex/raise :type :not-found - :code :object-not-found)) + (not file) + (raise (ex/error :type :not-found + :code :object-not-found)) - (when-not (yws/upgrade-request? req) - (ex/raise :type :validation - :code :websocket-request-expected - :hint "this endpoint only accepts websocket connections")) + (not (yws/upgrade-request? req)) + (raise (ex/error :type :validation + :code :websocket-request-expected + :hint "this endpoint only accepts websocket connections")) + + :else (->> (ws/handler handle-message cfg) - (yws/upgrade req)))))) + (yws/upgrade req) + (respond)))))) (def ^:private sql:retrieve-file diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index fabf61c4d..9eb424b6c 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -21,8 +21,32 @@ :metrics (ig/ref :app.metrics/metrics) :migrations (ig/ref :app.migrations/all) :name :main - :min-pool-size 0 - :max-pool-size 60} + :min-size (cf/get :database-min-pool-size 0) + :max-size (cf/get :database-max-pool-size 30)} + + ;; Default thread pool for IO operations + [::default :app.worker/executor] + {:parallelism (cf/get :default-executor-parallelism 120) + :prefix :default} + + ;; Constrained thread pool. Should only be used from high demand + ;; RPC methods. + [::blocking :app.worker/executor] + {:parallelism (cf/get :blocking-executor-parallelism 20) + :prefix :blocking} + + ;; Dedicated thread pool for backround tasks execution. + [::worker :app.worker/executor] + {:parallelism (cf/get :worker-executor-parallelism 10) + :prefix :worker} + + :app.worker/executors-monitor + {:executors + {:default (ig/ref [::default :app.worker/executor]) + :blocking (ig/ref [::blocking :app.worker/executor]) + :worker (ig/ref [::worker :app.worker/executor])} + + :metrics (ig/ref :app.metrics/metrics)} :app.migrations/migrations {} @@ -50,8 +74,8 @@ {:pool (ig/ref :app.db/pool)} :app.http.session/session - {:pool (ig/ref :app.db/pool) - :tokens (ig/ref :app.tokens/tokens)} + {:pool (ig/ref :app.db/pool) + :tokens (ig/ref :app.tokens/tokens)} :app.http.session/gc-task {:pool (ig/ref :app.db/pool) @@ -60,7 +84,7 @@ :app.http.session/updater {:pool (ig/ref :app.db/pool) :metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref :app.worker/executor) + :executor (ig/ref [::worker :app.worker/executor]) :session (ig/ref :app.http.session/session) :max-batch-age (cf/get :http-session-updater-batch-max-age) :max-batch-size (cf/get :http-session-updater-batch-max-size)} @@ -70,10 +94,13 @@ :pool (ig/ref :app.db/pool)} :app.http/server - {:port (cf/get :http-server-port) - :host (cf/get :http-server-host) - :router (ig/ref :app.http/router) - :metrics (ig/ref :app.metrics/metrics)} + {:port (cf/get :http-server-port) + :host (cf/get :http-server-host) + :router (ig/ref :app.http/router) + :metrics (ig/ref :app.metrics/metrics) + + :max-threads (cf/get :http-server-max-threads) + :min-threads (cf/get :http-server-min-threads)} :app.http/router {:assets (ig/ref :app.http.assets/handlers) @@ -91,11 +118,11 @@ :rpc (ig/ref :app.rpc/rpc)} :app.http.debug/handlers - {:pool (ig/ref :app.db/pool)} + {:pool (ig/ref :app.db/pool) + :executor (ig/ref [::default :app.worker/executor])} :app.http.websocket/handler {:pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor) :metrics (ig/ref :app.metrics/metrics) :msgbus (ig/ref :app.msgbus/msgbus)} @@ -103,6 +130,7 @@ {:metrics (ig/ref :app.metrics/metrics) :assets-path (cf/get :assets-path) :storage (ig/ref :app.storage/storage) + :executor (ig/ref [::default :app.worker/executor]) :cache-max-age (dt/duration {:hours 24}) :signature-max-age (dt/duration {:hours 24 :minutes 5})} @@ -125,22 +153,19 @@ :storage (ig/ref :app.storage/storage) :msgbus (ig/ref :app.msgbus/msgbus) :public-uri (cf/get :public-uri) - :audit (ig/ref :app.loggers.audit/collector)} - - :app.worker/executor - {:min-threads 0 - :max-threads 256 - :idle-timeout 60000 - :name :worker} + :audit (ig/ref :app.loggers.audit/collector) + :executors + {:default (ig/ref [::default :app.worker/executor]) + :blocking (ig/ref [::blocking :app.worker/executor])}} :app.worker/worker - {:executor (ig/ref :app.worker/executor) + {:executor (ig/ref [::worker :app.worker/executor]) :tasks (ig/ref :app.worker/registry) :metrics (ig/ref :app.metrics/metrics) :pool (ig/ref :app.db/pool)} :app.worker/scheduler - {:executor (ig/ref :app.worker/executor) + {:executor (ig/ref [::worker :app.worker/executor]) :tasks (ig/ref :app.worker/registry) :pool (ig/ref :app.db/pool) :schedule @@ -254,11 +279,11 @@ :app.loggers.audit/http-handler {:pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor)} + :executor (ig/ref [::default :app.worker/executor])} :app.loggers.audit/collector {:pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor)} + :executor (ig/ref [::worker :app.worker/executor])} :app.loggers.audit/archive-task {:uri (cf/get :audit-log-archive-uri) @@ -272,27 +297,18 @@ :app.loggers.loki/reporter {:uri (cf/get :loggers-loki-uri) :receiver (ig/ref :app.loggers.zmq/receiver) - :executor (ig/ref :app.worker/executor)} + :executor (ig/ref [::worker :app.worker/executor])} :app.loggers.mattermost/reporter {:uri (cf/get :error-report-webhook) :receiver (ig/ref :app.loggers.zmq/receiver) :pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor)} + :executor (ig/ref [::worker :app.worker/executor])} :app.loggers.database/reporter {:receiver (ig/ref :app.loggers.zmq/receiver) :pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor)} - - :app.loggers.sentry/reporter - {:dsn (cf/get :sentry-dsn) - :trace-sample-rate (cf/get :sentry-trace-sample-rate 1.0) - :attach-stack-trace (cf/get :sentry-attach-stack-trace false) - :debug (cf/get :sentry-debug false) - :receiver (ig/ref :app.loggers.zmq/receiver) - :pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor)} + :executor (ig/ref [::worker :app.worker/executor])} :app.storage/storage {:pool (ig/ref :app.db/pool) diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj index 57e1ba531..7dd65079f 100644 --- a/backend/src/app/metrics.clj +++ b/backend/src/app/metrics.clj @@ -5,46 +5,40 @@ ;; Copyright (c) UXBOX Labs SL (ns app.metrics + (:refer-clojure :exclude [run!]) (:require - [app.common.exceptions :as ex] [app.common.logging :as l] [clojure.spec.alpha :as s] [integrant.core :as ig]) (:import io.prometheus.client.CollectorRegistry io.prometheus.client.Counter + io.prometheus.client.Counter$Child io.prometheus.client.Gauge + io.prometheus.client.Gauge$Child io.prometheus.client.Summary + io.prometheus.client.Summary$Child + io.prometheus.client.Summary$Builder io.prometheus.client.Histogram + io.prometheus.client.Histogram$Child io.prometheus.client.exporter.common.TextFormat io.prometheus.client.hotspot.DefaultExports io.prometheus.client.jetty.JettyStatisticsCollector org.eclipse.jetty.server.handler.StatisticsHandler java.io.StringWriter)) -(declare instrument-vars!) -(declare instrument) +(set! *warn-on-reflection* true) + (declare create-registry) (declare create) (declare handler) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Defaults +;; METRICS SERVICE PROVIDER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (def default-metrics - {:profile-register - {:name "actions_profile_register_count" - :help "A global counter of user registrations." - :type :counter} - - :profile-activation - {:name "actions_profile_activation_count" - :help "A global counter of profile activations" - :type :counter} - - :update-file-changes + {:update-file-changes {:name "rpc_update_file_changes_total" :help "A total number of changes submitted to update-file." :type :counter} @@ -54,6 +48,18 @@ :help "A total number of bytes processed by update-file." :type :counter} + :rpc-mutation-timing + {:name "rpc_mutation_timing" + :help "RPC mutation method call timming." + :labels ["name"] + :type :histogram} + + :rpc-query-timing + {:name "rpc_query_timing" + :help "RPC query method call timing." + :labels ["name"] + :type :histogram} + :websocket-active-connections {:name "websocket_active_connections" :help "Active websocket connections gauge" @@ -68,12 +74,54 @@ :websocket-session-timing {:name "websocket_session_timing" :help "Websocket session timing (seconds)." - :quantiles [] - :type :summary}}) + :type :summary} -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Entry Point -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + :session-update-total + {:name "http_session_update_total" + :help "A counter of session update batch events." + :type :counter} + + :tasks-timing + {:name "penpot_tasks_timing" + :help "Background tasks timing (milliseconds)." + :labels ["name"] + :type :summary} + + :rlimit-queued-submissions + {:name "penpot_rlimit_queued_submissions" + :help "Current number of queued submissions on RLIMIT." + :labels ["name"] + :type :gauge} + + :rlimit-used-permits + {:name "penpot_rlimit_used_permits" + :help "Current number of used permits on RLIMIT." + :labels ["name"] + :type :gauge} + + :rlimit-acquires-total + {:name "penpot_rlimit_acquires_total" + :help "Total number of acquire operations on RLIMIT." + :labels ["name"] + :type :counter} + + :executors-active-threads + {:name "penpot_executors_active_threads" + :help "Current number of threads available in the executor service." + :labels ["name"] + :type :gauge} + + :executors-running-threads + {:name "penpot_executors_running_threads" + :help "Current number of threads with state RUNNING." + :labels ["name"] + :type :gauge} + + :executors-queued-submissions + {:name "penpot_executors_queued_submissions" + :help "Current number of queued submissions." + :labels ["name"] + :type :gauge}}) (defmethod ig/init-key ::metrics [_ _] @@ -95,31 +143,44 @@ (s/keys :req-un [::registry ::handler])) (defn- handler - [registry _request] + [registry _ respond _] (let [samples (.metricFamilySamples ^CollectorRegistry registry) writer (StringWriter.)] (TextFormat/write004 writer samples) - {:headers {"content-type" TextFormat/CONTENT_TYPE_004} - :body (.toString writer)})) + (respond {:headers {"content-type" TextFormat/CONTENT_TYPE_004} + :body (.toString writer)}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Implementation ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(def default-empty-labels (into-array String [])) + +(def default-quantiles + [[0.5 0.01] + [0.90 0.01] + [0.99 0.001]]) + +(def default-histogram-buckets + [1 5 10 25 50 75 100 250 500 750 1000 2500 5000 7500]) + +(defn run! + [{:keys [definitions]} {:keys [id] :as params}] + (when-let [mobj (get definitions id)] + ((::fn mobj) params) + true)) + (defn create-registry [] (let [registry (CollectorRegistry.)] (DefaultExports/register registry) registry)) -(defmacro with-measure - [& {:keys [expr cb]}] - `(let [start# (System/nanoTime) - tdown# ~cb] - (try - ~expr - (finally - (tdown# (/ (- (System/nanoTime) start#) 1000000)))))) +(defn- is-array? + [o] + (let [oc (class o)] + (and (.isArray ^Class oc) + (= (.getComponentType oc) String)))) (defn make-counter [{:keys [name help registry reg labels] :as props}] @@ -132,12 +193,9 @@ instance (.register instance registry)] {::instance instance - ::fn (fn [{:keys [by labels] :or {by 1}}] - (if labels - (.. ^Counter instance - (labels (into-array String labels)) - (inc by)) - (.inc ^Counter instance by)))})) + ::fn (fn [{:keys [inc labels] :or {inc 1 labels default-empty-labels}}] + (let [instance (.labels instance (if (is-array? labels) labels (into-array String labels)))] + (.inc ^Counter$Child instance (double inc))))})) (defn make-gauge [{:keys [name help registry reg labels] :as props}] @@ -148,48 +206,33 @@ _ (when (seq labels) (.labelNames instance (into-array String labels))) instance (.register instance registry)] - {::instance instance - ::fn (fn [{:keys [cmd by labels] :or {by 1}}] - (if labels - (let [labels (into-array String [labels])] - (case cmd - :inc (.. ^Gauge instance (labels labels) (inc by)) - :dec (.. ^Gauge instance (labels labels) (dec by)))) - (case cmd - :inc (.inc ^Gauge instance by) - :dec (.dec ^Gauge instance by))))})) - -(def default-quantiles - [[0.75 0.02] - [0.99 0.001]]) + ::fn (fn [{:keys [inc dec labels val] :or {labels default-empty-labels}}] + (let [instance (.labels ^Gauge instance (if (is-array? labels) labels (into-array String labels)))] + (cond (number? inc) (.inc ^Gauge$Child instance (double inc)) + (number? dec) (.dec ^Gauge$Child instance (double dec)) + (number? val) (.set ^Gauge$Child instance (double val)))))})) (defn make-summary [{:keys [name help registry reg labels max-age quantiles buckets] - :or {max-age 3600 buckets 6 quantiles default-quantiles} :as props}] + :or {max-age 3600 buckets 12 quantiles default-quantiles} :as props}] (let [registry (or registry reg) - instance (doto (Summary/build) + builder (doto (Summary/build) (.name name) (.help help)) _ (when (seq quantiles) - (.maxAgeSeconds ^Summary instance max-age) - (.ageBuckets ^Summary instance buckets)) + (.maxAgeSeconds ^Summary$Builder builder ^long max-age) + (.ageBuckets ^Summary$Builder builder buckets)) _ (doseq [[q e] quantiles] - (.quantile ^Summary instance q e)) + (.quantile ^Summary$Builder builder q e)) _ (when (seq labels) - (.labelNames instance (into-array String labels))) - instance (.register instance registry)] + (.labelNames ^Summary$Builder builder (into-array String labels))) + instance (.register ^Summary$Builder builder registry)] {::instance instance - ::fn (fn [{:keys [val labels]}] - (if labels - (.. ^Summary instance - (labels (into-array String labels)) - (observe val)) - (.observe ^Summary instance val)))})) - -(def default-histogram-buckets - [1 5 10 25 50 75 100 250 500 750 1000 2500 5000 7500]) + ::fn (fn [{:keys [val labels] :or {labels default-empty-labels}}] + (let [instance (.labels ^Summary instance (if (is-array? labels) labels (into-array String labels)))] + (.observe ^Summary$Child instance val)))})) (defn make-histogram [{:keys [name help registry reg labels buckets] @@ -204,12 +247,9 @@ instance (.register instance registry)] {::instance instance - ::fn (fn [{:keys [val labels]}] - (if labels - (.. ^Histogram instance - (labels (into-array String labels)) - (observe val)) - (.observe ^Histogram instance val)))})) + ::fn (fn [{:keys [val labels] :or {labels default-empty-labels}}] + (let [instance (.labels ^Histogram instance (if (is-array? labels) labels (into-array String labels)))] + (.observe ^Histogram$Child instance val)))})) (defn create [{:keys [type] :as props}] @@ -219,114 +259,6 @@ :summary (make-summary props) :histogram (make-histogram props))) -(defn wrap-counter - ([rootf mobj] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn - ([a] - ((::fn mobj) nil) - (origf a)) - ([a b] - ((::fn mobj) nil) - (origf a b)) - ([a b c] - ((::fn mobj) nil) - (origf a b c)) - ([a b c d] - ((::fn mobj) nil) - (origf a b c d)) - ([a b c d & more] - ((::fn mobj) nil) - (apply origf a b c d more))) - (assoc mdata ::original origf)))) - ([rootf mobj labels] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn - ([a] - ((::fn mobj) {:labels labels}) - (origf a)) - ([a b] - ((::fn mobj) {:labels labels}) - (origf a b)) - ([a b & more] - ((::fn mobj) {:labels labels}) - (apply origf a b more))) - (assoc mdata ::original origf))))) - -(defn wrap-summary - ([rootf mobj] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn - ([a] - (with-measure - :expr (origf a) - :cb #((::fn mobj) {:val %}))) - ([a b] - (with-measure - :expr (origf a b) - :cb #((::fn mobj) {:val %}))) - ([a b & more] - (with-measure - :expr (apply origf a b more) - :cb #((::fn mobj) {:val %})))) - (assoc mdata ::original origf)))) - - ([rootf mobj labels] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn - ([a] - (with-measure - :expr (origf a) - :cb #((::fn mobj) {:val % :labels labels}))) - ([a b] - (with-measure - :expr (origf a b) - :cb #((::fn mobj) {:val % :labels labels}))) - ([a b & more] - (with-measure - :expr (apply origf a b more) - :cb #((::fn mobj) {:val % :labels labels})))) - (assoc mdata ::original origf))))) - -(defn instrument-vars! - [vars {:keys [wrap] :as props}] - (let [obj (create props)] - (cond - (instance? Counter (::instance obj)) - (doseq [var vars] - (alter-var-root var (or wrap wrap-counter) obj)) - - (instance? Summary (::instance obj)) - (doseq [var vars] - (alter-var-root var (or wrap wrap-summary) obj)) - - :else - (ex/raise :type :not-implemented)))) - -(defn instrument - [f {:keys [wrap] :as props}] - (let [obj (create props)] - (cond - (instance? Counter (::instance obj)) - ((or wrap wrap-counter) f obj) - - (instance? Summary (::instance obj)) - ((or wrap wrap-summary) f obj) - - (instance? Histogram (::instance obj)) - ((or wrap wrap-summary) f obj) - - :else - (ex/raise :type :not-implemented)))) - (defn instrument-jetty! [^CollectorRegistry registry ^StatisticsHandler handler] (doto (JettyStatisticsCollector. handler) diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 285f185c7..7d621a352 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -18,7 +18,6 @@ [integrant.core :as ig] [promesa.core :as p]) (:import - java.time.Duration io.lettuce.core.RedisClient io.lettuce.core.RedisURI io.lettuce.core.api.StatefulConnection @@ -29,7 +28,10 @@ io.lettuce.core.codec.StringCodec io.lettuce.core.pubsub.RedisPubSubListener io.lettuce.core.pubsub.StatefulRedisPubSubConnection - io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands)) + io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands + io.lettuce.core.resource.ClientResources + io.lettuce.core.resource.DefaultClientResources + java.time.Duration)) (def ^:private prefix (cfg/get :tenant)) @@ -136,27 +138,35 @@ (declare impl-redis-sub) (declare impl-redis-unsub) + (defmethod init-backend :redis [{:keys [redis-uri] :as cfg}] (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) - uri (RedisURI/create redis-uri) - rclient (RedisClient/create ^RedisURI uri) + resources (.. (DefaultClientResources/builder) + (ioThreadPoolSize 4) + (computationThreadPoolSize 4) + (build)) - pub-conn (.connect ^RedisClient rclient ^RedisCodec codec) - sub-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)] + uri (RedisURI/create redis-uri) + rclient (RedisClient/create ^ClientResources resources ^RedisURI uri) + + pub-conn (.connect ^RedisClient rclient ^RedisCodec codec) + sub-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)] (.setTimeout ^StatefulRedisConnection pub-conn ^Duration (dt/duration {:seconds 10})) (.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10})) (-> cfg + (assoc ::resources resources) (assoc ::pub-conn pub-conn) (assoc ::sub-conn sub-conn)))) (defmethod stop-backend :redis - [{:keys [::pub-conn ::sub-conn] :as cfg}] + [{:keys [::pub-conn ::sub-conn ::resources] :as cfg}] (.close ^StatefulRedisConnection pub-conn) - (.close ^StatefulRedisPubSubConnection sub-conn)) + (.close ^StatefulRedisPubSubConnection sub-conn) + (.shutdown ^ClientResources resources)) (defmethod init-pub-loop :redis [{:keys [::pub-conn ::pub-ch]}] diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index ff346aece..312fa657a 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -13,79 +13,164 @@ [app.db :as db] [app.loggers.audit :as audit] [app.metrics :as mtx] - [app.util.retry :as retry] - [app.util.rlimit :as rlimit] + [app.rpc.retry :as retry] + [app.rpc.rlimit :as rlimit] + [app.util.async :as async] [app.util.services :as sv] + [app.worker :as wrk] [clojure.spec.alpha :as s] - [integrant.core :as ig])) + [integrant.core :as ig] + [promesa.core :as p] + [promesa.exec :as px])) (defn- default-handler [_] - (ex/raise :type :not-found)) + (p/rejected (ex/error :type :not-found))) -(defn- run-hook - [hook-fn response] - (ex/ignoring (hook-fn)) +(defn- handle-response-transformation + [response request mdata] + (if-let [transform-fn (:transform-response mdata)] + (transform-fn request response) + response)) + +(defn- handle-before-comple-hook + [response mdata] + (when-let [hook-fn (:before-complete mdata)] + (ex/ignoring (hook-fn))) response) (defn- rpc-query-handler - [methods {:keys [profile-id session-id] :as request}] - (let [type (keyword (get-in request [:path-params :type])) + "Ring handler that dispatches query requests and convert between + internal async flow into ring async flow." + [methods {:keys [profile-id session-id] :as request} respond raise] + (letfn [(handle-response [result] + (let [mdata (meta result)] + (-> {:status 200 :body result} + (handle-response-transformation request mdata))))] - data (merge (:params request) - (:body-params request) - (:uploads request) - {::request request}) + (let [type (keyword (get-in request [:path-params :type])) + data (merge (:params request) + (:body-params request) + (:uploads request) + {::request request}) - data (if profile-id - (assoc data :profile-id profile-id ::session-id session-id) - (dissoc data :profile-id)) + data (if profile-id + (assoc data :profile-id profile-id ::session-id session-id) + (dissoc data :profile-id)) - result ((get methods type default-handler) data) - mdata (meta result)] + ;; Get the method from methods registry and if method does + ;; not exists asigns it to the default handler. + method (get methods type default-handler)] - (cond->> {:status 200 :body result} - (fn? (:transform-response mdata)) - ((:transform-response mdata) request)))) + (-> (method data) + (p/then #(respond (handle-response %))) + (p/catch raise))))) (defn- rpc-mutation-handler - [methods {:keys [profile-id session-id] :as request}] - (let [type (keyword (get-in request [:path-params :type])) - data (merge (:params request) - (:body-params request) - (:uploads request) - {::request request}) + "Ring handler that dispatches mutation requests and convert between + internal async flow into ring async flow." + [methods {:keys [profile-id session-id] :as request} respond raise] + (letfn [(handle-response [result] + (let [mdata (meta result)] + (-> {:status 200 :body result} + (handle-response-transformation request mdata) + (handle-before-comple-hook mdata))))] - data (if profile-id - (assoc data :profile-id profile-id ::session-id session-id) - (dissoc data :profile-id)) + (let [type (keyword (get-in request [:path-params :type])) + data (merge (:params request) + (:body-params request) + (:uploads request) + {::request request}) - result ((get methods type default-handler) data) - mdata (meta result)] - (cond->> {:status 200 :body result} - (fn? (:transform-response mdata)) - ((:transform-response mdata) request) + data (if profile-id + (assoc data :profile-id profile-id ::session-id session-id) + (dissoc data :profile-id)) - (fn? (:before-complete mdata)) - (run-hook (:before-complete mdata))))) + method (get methods type default-handler)] -(defn- wrap-with-metrics - [cfg f mdata] - (mtx/wrap-summary f (::mobj cfg) [(::sv/name mdata)])) + (-> (method data) + (p/then #(respond (handle-response %))) + (p/catch raise))))) -(defn- wrap-impl +(defn- wrap-metrics + "Wrap service method with metrics measurement." + [{:keys [metrics ::metrics-id]} f mdata] + (let [labels (into-array String [(::sv/name mdata)])] + (fn [cfg params] + (let [start (System/nanoTime)] + (p/finally + (f cfg params) + (fn [_ _] + (mtx/run! metrics + {:id metrics-id + :val (/ (- (System/nanoTime) start) 1000000) + :labels labels}))))))) + +(defn- wrap-dispatch + "Wraps service method into async flow, with the ability to dispatching + it to a preconfigured executor service." + [{:keys [executors] :as cfg} f mdata] + (let [dname (::async/dispatch mdata :none)] + (if (= :none dname) + (with-meta + (fn [cfg params] + (try + (p/wrap (f cfg params)) + (catch Throwable cause + (p/rejected cause)))) + mdata) + + (let [executor (get executors dname)] + (when-not executor + (ex/raise :type :internal + :code :executor-not-configured + :hint (format "executor %s not configured" dname))) + (with-meta + (fn [cfg params] + (-> (px/submit! executor #(f cfg params)) + (p/bind p/wrap))) + mdata))))) + +(defn- wrap-audit [{:keys [audit] :as cfg} f mdata] + (if audit + (with-meta + (fn [cfg {:keys [::request] :as params}] + (p/finally (f cfg params) + (fn [result _] + (when result + (let [resultm (meta result) + profile-id (or (:profile-id params) + (:profile-id result) + (::audit/profile-id resultm)) + props (d/merge params (::audit/props resultm))] + (audit :cmd :submit + :type (or (::audit/type resultm) + (::type cfg)) + :name (or (::audit/name resultm) + (::sv/name mdata)) + :profile-id profile-id + :ip-addr (audit/parse-client-ip request) + :props (dissoc props ::request))))))) + mdata) + f)) + +(defn- wrap + [cfg f mdata] (let [f (as-> f $ + (wrap-dispatch cfg $ mdata) (rlimit/wrap-rlimit cfg $ mdata) (retry/wrap-retry cfg $ mdata) - (wrap-with-metrics cfg $ mdata)) + (wrap-audit cfg $ mdata) + (wrap-metrics cfg $ mdata) + ) spec (or (::sv/spec mdata) (s/spec any?)) auth? (:auth mdata true)] (l/trace :action "register" :name (::sv/name mdata)) (with-meta - (fn [params] + (fn [{:keys [::request] :as params}] ;; Raise authentication error when rpc method requires auth but ;; no profile-id is found in the request. (when (and auth? (not (uuid? (:profile-id params)))) @@ -93,44 +178,19 @@ :code :authentication-required :hint "authentication required for this endpoint")) - (let [params' (dissoc params ::request) - params' (us/conform spec params') - result (f cfg params')] - - ;; When audit log is enabled (default false). - (when (fn? audit) - (let [resultm (meta result) - request (::request params) - profile-id (or (:profile-id params') - (:profile-id result) - (::audit/profile-id resultm)) - props (d/merge params' (::audit/props resultm))] - (audit :cmd :submit - :type (or (::audit/type resultm) - (::type cfg)) - :name (or (::audit/name resultm) - (::sv/name mdata)) - :profile-id profile-id - :ip-addr (audit/parse-client-ip request) - :props props))) - result)) + (let [params (us/conform spec (dissoc params ::request))] + (f cfg (assoc params ::request request)))) mdata))) (defn- process-method [cfg vfn] (let [mdata (meta vfn)] [(keyword (::sv/name mdata)) - (wrap-impl cfg (deref vfn) mdata)])) + (wrap cfg (deref vfn) mdata)])) (defn- resolve-query-methods [cfg] - (let [mobj (mtx/create - {:name "rpc_query_timing" - :labels ["name"] - :registry (get-in cfg [:metrics :registry]) - :type :histogram - :help "Timing of query services."}) - cfg (assoc cfg ::mobj mobj ::type "query")] + (let [cfg (assoc cfg ::type "query" ::metrics-id :rpc-query-timing)] (->> (sv/scan-ns 'app.rpc.queries.projects 'app.rpc.queries.files 'app.rpc.queries.teams @@ -143,13 +203,7 @@ (defn- resolve-mutation-methods [cfg] - (let [mobj (mtx/create - {:name "rpc_mutation_timing" - :labels ["name"] - :registry (get-in cfg [:metrics :registry]) - :type :histogram - :help "Timing of mutation services."}) - cfg (assoc cfg ::mobj mobj ::type "mutation")] + (let [cfg (assoc cfg ::type "mutation" ::metrics-id :rpc-mutation-timing)] (->> (sv/scan-ns 'app.rpc.mutations.demo 'app.rpc.mutations.media 'app.rpc.mutations.profile @@ -169,15 +223,16 @@ (s/def ::session map?) (s/def ::tokens fn?) (s/def ::audit (s/nilable fn?)) +(s/def ::executors (s/map-of keyword? ::wrk/executor)) (defmethod ig/pre-init-spec ::rpc [_] (s/keys :req-un [::storage ::session ::tokens ::audit - ::mtx/metrics ::db/pool])) + ::executors ::mtx/metrics ::db/pool])) (defmethod ig/init-key ::rpc [_ cfg] (let [mq (resolve-query-methods cfg) mm (resolve-mutation-methods cfg)] {:methods {:query mq :mutation mm} - :query-handler #(rpc-query-handler mq %) - :mutation-handler #(rpc-mutation-handler mm %)})) + :query-handler (partial rpc-query-handler mq) + :mutation-handler (partial rpc-mutation-handler mm)})) diff --git a/backend/src/app/rpc/mutations/comments.clj b/backend/src/app/rpc/mutations/comments.clj index 4c4a6eccc..438cfdebd 100644 --- a/backend/src/app/rpc/mutations/comments.clj +++ b/backend/src/app/rpc/mutations/comments.clj @@ -12,8 +12,8 @@ [app.db :as db] [app.rpc.queries.comments :as comments] [app.rpc.queries.files :as files] + [app.rpc.retry :as retry] [app.util.blob :as blob] - [app.util.retry :as retry] [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s])) @@ -34,8 +34,7 @@ (s/keys :req-un [::profile-id ::file-id ::position ::content ::page-id])) (sv/defmethod ::create-comment-thread - {::retry/enabled true - ::retry/max-retries 3 + {::retry/max-retries 3 ::retry/matches retry/conflict-db-insert?} [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] (db/with-atomic [conn pool] diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index ce8d98ef9..a27ef5512 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -18,6 +18,7 @@ [app.rpc.queries.files :as files] [app.rpc.queries.projects :as proj] [app.storage.impl :as simpl] + [app.util.async :as async] [app.util.blob :as blob] [app.util.services :as sv] [app.util.time :as dt] @@ -272,6 +273,7 @@ (contains? o :changes-with-metadata))))) (sv/defmethod ::update-file + {::async/dispatch :blocking} [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] (db/with-atomic [conn pool] (db/xact-lock! conn id) @@ -307,24 +309,21 @@ :context {:incoming-revn (:revn params) :stored-revn (:revn file)})) - (let [mtx1 (get-in metrics [:definitions :update-file-changes]) - mtx2 (get-in metrics [:definitions :update-file-bytes-processed]) - - changes (if changes-with-metadata + (let [changes (if changes-with-metadata (mapcat :changes changes-with-metadata) changes) changes (vec changes) ;; Trace the number of changes processed - _ ((::mtx/fn mtx1) {:by (count changes)}) + _ (mtx/run! metrics {:id :update-file-changes :inc (count changes)}) ts (dt/now) file (-> (files/retrieve-data cfg file) (update :revn inc) (update :data (fn [data] ;; Trace the length of bytes of processed data - ((::mtx/fn mtx2) {:by (alength data)}) + (mtx/run! metrics {:id :update-file-bytes-processed :inc (alength data)}) (-> data (blob/decode) (assoc :id (:id file)) diff --git a/backend/src/app/rpc/mutations/ldap.clj b/backend/src/app/rpc/mutations/ldap.clj index 0f6675f24..b4cc37afb 100644 --- a/backend/src/app/rpc/mutations/ldap.clj +++ b/backend/src/app/rpc/mutations/ldap.clj @@ -56,7 +56,7 @@ (s/keys :req-un [::email ::password] :opt-un [::invitation-token])) -(sv/defmethod ::login-with-ldap {:auth false :rlimit :password} +(sv/defmethod ::login-with-ldap {:auth false} [{:keys [pool session tokens] :as cfg} params] (db/with-atomic [conn pool] (let [info (authenticate params) diff --git a/backend/src/app/rpc/mutations/media.clj b/backend/src/app/rpc/mutations/media.clj index 68b5d98ea..8cb0bda1d 100644 --- a/backend/src/app/rpc/mutations/media.clj +++ b/backend/src/app/rpc/mutations/media.clj @@ -10,10 +10,13 @@ [app.common.media :as cm] [app.common.spec :as us] [app.common.uuid :as uuid] + [app.config :as cf] [app.db :as db] [app.media :as media] [app.rpc.queries.teams :as teams] + [app.rpc.rlimit :as rlimit] [app.storage :as sto] + [app.util.async :as async] [app.util.http :as http] [app.util.services :as sv] [app.util.time :as dt] @@ -47,6 +50,8 @@ :opt-un [::id])) (sv/defmethod ::upload-file-media-object + {::rlimit/permits (cf/get :rlimit-image) + ::async/dispatch :default} [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] (let [file (select-file pool file-id)] (teams/check-edition-permissions! pool profile-id (:team-id file)) @@ -167,6 +172,8 @@ :opt-un [::id ::name])) (sv/defmethod ::create-file-media-object-from-url + {::rlimit/permits (cf/get :rlimit-image) + ::async/dispatch :default} [{:keys [pool storage] :as cfg} {:keys [profile-id file-id url name] :as params}] (let [file (select-file pool file-id)] (teams/check-edition-permissions! pool profile-id (:team-id file)) diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj index 94732858f..944df2fde 100644 --- a/backend/src/app/rpc/mutations/profile.clj +++ b/backend/src/app/rpc/mutations/profile.clj @@ -15,11 +15,11 @@ [app.http.oauth :refer [extract-utm-props]] [app.loggers.audit :as audit] [app.media :as media] - [app.metrics :as mtx] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] + [app.rpc.rlimit :as rlimit] [app.storage :as sto] - [app.util.rlimit :as rlimit] + [app.util.async :as async] [app.util.services :as sv] [app.util.time :as dt] [buddy.hashers :as hashers] @@ -38,7 +38,6 @@ (s/def ::theme ::us/string) (s/def ::invitation-token ::us/not-empty-string) -(declare annotate-profile-register) (declare check-profile-existence!) (declare create-profile) (declare create-profile-relations) @@ -102,6 +101,7 @@ (when-not (contains? cf/flags :registration) (ex/raise :type :restriction :code :registration-disabled)) + (when-let [domains (cf/get :registration-domain-whitelist)] (when-not (email-domain-in-whitelist? domains (:email params)) (ex/raise :type :validation @@ -122,10 +122,11 @@ :code :email-as-password :hint "you can't use your email as password")) - (let [params (assoc params - :backend "penpot" - :iss :prepared-register - :exp (dt/in-future "48h")) + (let [params {:email (:email params) + :invitation-token (:invitation-token params) + :backend "penpot" + :iss :prepared-register + :exp (dt/in-future "48h")} token (tokens :generate params)] {:token token})) @@ -142,16 +143,8 @@ (-> (assoc cfg :conn conn) (register-profile params)))) -(defn- annotate-profile-register - "A helper for properly increase the profile-register metric once the - transaction is completed." - [metrics] - (fn [] - (let [mobj (get-in metrics [:definitions :profile-register])] - ((::mtx/fn mobj) {:by 1})))) - (defn register-profile - [{:keys [conn tokens session metrics] :as cfg} {:keys [token] :as params}] + [{:keys [conn tokens session] :as cfg} {:keys [token] :as params}] (let [claims (tokens :verify {:token token :iss :prepared-register}) params (merge params claims)] @@ -177,7 +170,6 @@ resp {:invitation-token token}] (with-meta resp {:transform-response ((:create session) (:id profile)) - :before-complete (annotate-profile-register metrics) ::audit/props (audit/profile->props profile) ::audit/profile-id (:id profile)})) @@ -187,7 +179,6 @@ (not= "penpot" (:auth-backend profile)) (with-meta (profile/strip-private-attrs profile) {:transform-response ((:create session) (:id profile)) - :before-complete (annotate-profile-register metrics) ::audit/props (audit/profile->props profile) ::audit/profile-id (:id profile)}) @@ -196,7 +187,6 @@ (true? is-active) (with-meta (profile/strip-private-attrs profile) {:transform-response ((:create session) (:id profile)) - :before-complete (annotate-profile-register metrics) ::audit/props (audit/profile->props profile) ::audit/profile-id (:id profile)}) @@ -219,8 +209,7 @@ :extra-data ptoken}) (with-meta profile - {:before-complete (annotate-profile-register metrics) - ::audit/props (audit/profile->props profile) + {::audit/props (audit/profile->props profile) ::audit/profile-id (:id profile)})))))) (defn create-profile @@ -359,6 +348,7 @@ :opt-un [::lang ::theme])) (sv/defmethod ::update-profile + {::async/dispatch :default} [{:keys [pool] :as cfg} params] (db/with-atomic [conn pool] (let [profile (update-profile conn params)] diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj index c54ae7e61..a104f3604 100644 --- a/backend/src/app/rpc/mutations/teams.clj +++ b/backend/src/app/rpc/mutations/teams.clj @@ -18,8 +18,8 @@ [app.rpc.permissions :as perms] [app.rpc.queries.profile :as profile] [app.rpc.queries.teams :as teams] + [app.rpc.rlimit :as rlimit] [app.storage :as sto] - [app.util.rlimit :as rlimit] [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s] diff --git a/backend/src/app/rpc/mutations/verify_token.clj b/backend/src/app/rpc/mutations/verify_token.clj index 0b1eb87d6..1b79b74f1 100644 --- a/backend/src/app/rpc/mutations/verify_token.clj +++ b/backend/src/app/rpc/mutations/verify_token.clj @@ -10,7 +10,6 @@ [app.common.spec :as us] [app.db :as db] [app.loggers.audit :as audit] - [app.metrics :as mtx] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] [app.util.services :as sv] @@ -44,16 +43,8 @@ ::audit/props {:email email} ::audit/profile-id profile-id})) -(defn- annotate-profile-activation - "A helper for properly increase the profile-activation metric once the - transaction is completed." - [metrics] - (fn [] - (let [mobj (get-in metrics [:definitions :profile-activation])] - ((::mtx/fn mobj) {:by 1})))) - (defmethod process-token :verify-email - [{:keys [conn session metrics] :as cfg} _ {:keys [profile-id] :as claims}] + [{:keys [conn session] :as cfg} _ {:keys [profile-id] :as claims}] (let [profile (profile/retrieve-profile conn profile-id) claims (assoc claims :profile profile)] @@ -69,7 +60,6 @@ (with-meta claims {:transform-response ((:create session) profile-id) - :before-complete (annotate-profile-activation metrics) ::audit/name "verify-profile-email" ::audit/props (audit/profile->props profile) ::audit/profile-id (:id profile)}))) diff --git a/backend/src/app/rpc/queries/profile.clj b/backend/src/app/rpc/queries/profile.clj index 5c4c33876..c1ed70244 100644 --- a/backend/src/app/rpc/queries/profile.clj +++ b/backend/src/app/rpc/queries/profile.clj @@ -35,7 +35,8 @@ (s/def ::profile (s/keys :opt-un [::profile-id])) -(sv/defmethod ::profile {:auth false} +(sv/defmethod ::profile + {:auth false} [{:keys [pool] :as cfg} {:keys [profile-id] :as params}] ;; We need to return the anonymous profile object in two cases, when ;; no profile-id is in session, and when db call raises not found. In all other diff --git a/backend/src/app/rpc/retry.clj b/backend/src/app/rpc/retry.clj new file mode 100644 index 000000000..f63cb90d9 --- /dev/null +++ b/backend/src/app/rpc/retry.clj @@ -0,0 +1,52 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.rpc.retry + "A fault tolerance helpers. Allow retry some operations that we know + we can retry." + (:require + [app.common.logging :as l] + [app.util.services :as sv] + [promesa.core :as p])) + +(defn conflict-db-insert? + "Check if exception matches a insertion conflict on postgresql." + [e] + (and (instance? org.postgresql.util.PSQLException e) + (= "23505" (.getSQLState e)))) + +(defn wrap-retry + [_ f {:keys [::matches ::sv/name] + :or {matches (constantly false)} + :as mdata}] + + (when (::enabled mdata) + (l/debug :hint "wrapping retry" :name name)) + + (if-let [max-retries (::max-retries mdata)] + (fn [cfg params] + (letfn [(run [retry] + (prn "wrap-retry" "run" retry) + (try + (-> (f cfg params) + (p/catch (partial handle-error retry))) + (catch Throwable cause + (prn cause) + (throw cause)))) + + + (handle-error [retry cause] + (prn "FOOFOFOF" retry (matches cause)) + (if (matches cause) + (let [current-retry (inc retry)] + (l/trace :hint "running retry algorithm" :retry current-retry) + (if (<= current-retry max-retries) + (run current-retry) + (throw cause))) + (throw cause)))] + (run 0))) + f)) + diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj new file mode 100644 index 000000000..1b70b2da6 --- /dev/null +++ b/backend/src/app/rpc/rlimit.clj @@ -0,0 +1,67 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.rpc.rlimit + "Resource usage limits (in other words: semaphores)." + (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.metrics :as mtx] + [app.util.services :as sv] + [promesa.core :as p])) + +(defprotocol IAsyncSemaphore + (acquire! [_]) + (release! [_])) + +(defn semaphore + [{:keys [permits metrics name]}] + (let [name (d/name name) + used (volatile! 0) + queue (volatile! (d/queue)) + labels (into-array String [name])] + (reify IAsyncSemaphore + (acquire! [this] + (let [d (p/deferred)] + (locking this + (if (< @used permits) + (do + (vswap! used inc) + (p/resolve! d)) + (vswap! queue conj d))) + + (mtx/run! metrics {:id :rlimit-used-permits :val @used :labels labels }) + (mtx/run! metrics {:id :rlimit-queued-submissions :val (count @queue) :labels labels}) + (mtx/run! metrics {:id :rlimit-acquires-total :inc 1 :labels labels}) + d)) + + (release! [this] + (locking this + (if-let [item (peek @queue)] + (do + (vswap! queue pop) + (p/resolve! item)) + (when (pos? @used) + (vswap! used dec)))) + + (mtx/run! metrics {:id :rlimit-used-permits :val @used :labels labels}) + (mtx/run! metrics {:id :rlimit-queued-submissions :val (count @queue) :labels labels}) + )))) + +(defn wrap-rlimit + [{:keys [metrics] :as cfg} f mdata] + (if-let [permits (::permits mdata)] + (let [sem (semaphore {:permits permits + :metrics metrics + :name (::sv/name mdata)})] + (l/debug :hint "wrapping rlimit" :handler (::sv/name mdata) :permits permits) + (fn [cfg params] + (-> (acquire! sem) + (p/then (fn [_] (f cfg params))) + (p/finally (fn [_ _] (release! sem)))))) + f)) + + diff --git a/backend/src/app/util/async.clj b/backend/src/app/util/async.clj index 6193fbe2f..c04fa891f 100644 --- a/backend/src/app/util/async.clj +++ b/backend/src/app/util/async.clj @@ -7,7 +7,8 @@ (ns app.util.async (:require [clojure.core.async :as a] - [clojure.spec.alpha :as s]) + [clojure.spec.alpha :as s] + [promesa.exec :as px]) (:import java.util.concurrent.Executor)) @@ -54,13 +55,16 @@ (a/close! c) c)))) - (defmacro with-thread [executor & body] (if (= executor ::default) `(a/thread-call (^:once fn* [] (try ~@body (catch Exception e# e#)))) `(thread-call ~executor (^:once fn* [] ~@body)))) +(defmacro with-dispatch + [executor & body] + `(px/submit! ~executor (^:once fn* [] ~@body))) + (defn batch [in {:keys [max-batch-size max-batch-age diff --git a/backend/src/app/util/retry.clj b/backend/src/app/util/retry.clj deleted file mode 100644 index d0bed166d..000000000 --- a/backend/src/app/util/retry.clj +++ /dev/null @@ -1,43 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) UXBOX Labs SL - -(ns app.util.retry - "A fault tolerance helpers. Allow retry some operations that we know - we can retry." - (:require - [app.common.exceptions :as ex] - [app.common.logging :as l] - [app.util.async :as aa] - [app.util.services :as sv])) - -(defn conflict-db-insert? - "Check if exception matches a insertion conflict on postgresql." - [e] - (and (instance? org.postgresql.util.PSQLException e) - (= "23505" (.getSQLState e)))) - -(defn wrap-retry - [_ f {:keys [::max-retries ::matches ::sv/name] - :or {max-retries 3 - matches (constantly false)} - :as mdata}] - (when (::enabled mdata) - (l/debug :hint "wrapping retry" :name name)) - (if (::enabled mdata) - (fn [cfg params] - (loop [retry 1] - (when (> retry 1) - (l/debug :hint "retrying controlled function" :retry retry :name name)) - (let [res (ex/try (f cfg params))] - (if (ex/exception? res) - (if (and (matches res) (< retry max-retries)) - (do - (aa/thread-sleep (* 100 retry)) - (recur (inc retry))) - (throw res)) - res)))) - f)) - diff --git a/backend/src/app/util/rlimit.clj b/backend/src/app/util/rlimit.clj deleted file mode 100644 index 8398237c1..000000000 --- a/backend/src/app/util/rlimit.clj +++ /dev/null @@ -1,36 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) UXBOX Labs SL - -(ns app.util.rlimit - "Resource usage limits (in other words: semaphores)." - (:require - [app.common.logging :as l] - [app.util.services :as sv]) - (:import - java.util.concurrent.Semaphore)) - -(defn acquire! - [sem] - (.acquire ^Semaphore sem)) - -(defn release! - [sem] - (.release ^Semaphore sem)) - -(defn wrap-rlimit - [_cfg f mdata] - (if-let [permits (::permits mdata)] - (let [sem (Semaphore. permits)] - (l/debug :hint "wrapping rlimit" :handler (::sv/name mdata) :permits permits) - (fn [cfg params] - (try - (acquire! sem) - (f cfg params) - (finally - (release! sem))))) - f)) - - diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index b82783e3c..6acbd9e36 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -27,11 +27,6 @@ (declare ws-ping!) (declare ws-send!) -(defmacro call-mtx - [definitions name & args] - `(when-let [mtx-fn# (some-> ~definitions ~name ::mtx/fn)] - (mtx-fn# ~@args))) - (def noop (constantly nil)) (defn handler @@ -49,7 +44,7 @@ ([handle-message {:keys [::input-buff-size ::output-buff-size ::idle-timeout - ::metrics] + metrics] :or {input-buff-size 64 output-buff-size 64 idle-timeout 30000} @@ -71,8 +66,8 @@ on-terminate (fn [& _args] (when (compare-and-set! terminated false true) - (call-mtx metrics :connections {:cmd :dec :by 1}) - (call-mtx metrics :sessions {:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}) + (mtx/run! metrics {:id :websocket-active-connections :dec 1}) + (mtx/run! metrics {:id :websocket-session-timing :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}) (a/close! close-ch) (a/close! pong-ch) @@ -88,7 +83,7 @@ on-connect (fn [conn] - (call-mtx metrics :connections {:cmd :inc :by 1}) + (mtx/run! metrics {:id :websocket-active-connections :inc 1}) (let [wsp (atom (assoc options ::conn conn))] ;; Handle heartbeat @@ -102,7 +97,7 @@ ;; connection (a/go-loop [] (when-let [val (a/string - [error] - (with-out-str - (.printStackTrace ^Throwable error (java.io.PrintWriter. *out*)))) - (defn- execute-scheduled-task [{:keys [executor pool] :as cfg} {:keys [id] :as task}] (letfn [(run-task [conn] @@ -460,59 +500,27 @@ ;; --- INSTRUMENTATION -(defn instrument! - [registry] - (mtx/instrument-vars! - [#'submit!] - {:registry registry - :type :counter - :labels ["name"] - :name "tasks_submit_total" - :help "A counter of task submissions." - :wrap (fn [rootf mobj] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn [conn params] - (let [tname (:name params)] - (mobj :inc [tname]) - (origf conn params))) - {::original origf})))}) - - (mtx/instrument-vars! - [#'app.worker/run-task] - {:registry registry - :type :summary - :quantiles [] - :name "tasks_checkout_timing" - :help "Latency measured between scheduled_at and execution time." - :wrap (fn [rootf mobj] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn [tasks item] - (let [now (inst-ms (dt/now)) - sat (inst-ms (:scheduled-at item))] - (mobj :observe (- now sat)) - (origf tasks item))) - {::original origf})))})) - +(defn- wrap-task-handler + [metrics tname f] + (let [labels (into-array String [tname])] + (fn [params] + (let [start (System/nanoTime)] + (try + (f params) + (finally + (mtx/run! metrics + {:id :tasks-timing + :val (/ (- (System/nanoTime) start) 1000000) + :labels labels}))))))) (defmethod ig/pre-init-spec ::registry [_] (s/keys :req-un [::mtx/metrics ::tasks])) (defmethod ig/init-key ::registry [_ {:keys [metrics tasks]}] - (let [mobj (mtx/create - {:registry (:registry metrics) - :type :summary - :labels ["name"] - :quantiles [] - :name "tasks_timing" - :help "Background task execution timing."})] - (reduce-kv (fn [res k v] - (let [tname (name k)] - (l/debug :action "register task" :name tname) - (assoc res k (mtx/wrap-summary v mobj [tname])))) - {} - tasks))) + (reduce-kv (fn [res k v] + (let [tname (name k)] + (l/debug :hint "register task" :name tname) + (assoc res k (wrap-task-handler metrics tname v)))) + {} + tasks)) diff --git a/backend/test/app/test_helpers.clj b/backend/test/app/test_helpers.clj index 2807e6f3b..9f380551a 100644 --- a/backend/test/app/test_helpers.clj +++ b/backend/test/app/test_helpers.clj @@ -248,7 +248,7 @@ [expr] `(try {:error nil - :result ~expr} + :result (deref ~expr)} (catch Exception e# {:error (handle-error e#) :result nil}))) diff --git a/common/deps.edn b/common/deps.edn index 25c6e584a..082926005 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -21,7 +21,7 @@ com.cognitect/transit-cljs {:mvn/version "0.8.269"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - funcool/promesa {:mvn/version "6.1.431"} + funcool/promesa {:mvn/version "7.0.444"} funcool/cuerdas {:mvn/version "2022.01.14-391"} lambdaisland/uri {:mvn/version "1.13.95" diff --git a/common/src/app/common/data.cljc b/common/src/app/common/data.cljc index dcd0cf368..9dfba00ad 100644 --- a/common/src/app/common/data.cljc +++ b/common/src/app/common/data.cljc @@ -37,6 +37,22 @@ #?(:cljs (instance? lks/LinkedSet o) :clj (instance? LinkedSet o))) +#?(:clj + (defmethod print-method clojure.lang.PersistentQueue [q, w] + ;; Overload the printer for queues so they look like fish + (print-method '<- w) + (print-method (seq q) w) + (print-method '-< w))) + +(defn queue + ([] #?(:clj clojure.lang.PersistentQueue/EMPTY :cljs #queue [])) + ([a] (into (queue) [a])) + ([a & more] (into (queue) (cons a more)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Data Structures Manipulation +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + (defn deep-merge ([a b] (if (map? a) @@ -45,10 +61,6 @@ ([a b & rest] (reduce deep-merge a (cons b rest)))) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Data Structures Manipulation -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (defn dissoc-in [m [k & ks]] (if ks @@ -151,7 +163,11 @@ "Given a map, return a map removing key-value pairs when value is `nil`." [data] - (into {} (remove (comp nil? second) data))) + (into {} (remove (comp nil? second)) data)) + +(defn without-qualified + [data] + (into {} (remove (comp qualified-keyword? first)) data)) (defn without-keys "Return a map without the keys provided @@ -676,3 +692,4 @@ (recur acc (step k)) acc))) acc)))))) +