From 1b444a42f2b7a4df2205898f78c635d4078a9104 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 4 Mar 2022 18:00:16 +0100 Subject: [PATCH] :recycle: Refactor http server layer Make it fully asynchronous. --- backend/deps.edn | 6 +- backend/src/app/http.clj | 182 +++++++++--------- backend/src/app/http/assets.clj | 33 ++-- backend/src/app/http/awsns.clj | 5 +- backend/src/app/http/debug.clj | 64 ++++--- backend/src/app/http/doc.clj | 10 +- backend/src/app/http/errors.clj | 70 ++++--- backend/src/app/http/feedback.clj | 12 +- backend/src/app/http/middleware.clj | 202 +++++++++----------- backend/src/app/http/oauth.clj | 9 +- backend/src/app/http/session.clj | 212 ++++++++++++--------- backend/src/app/loggers/audit.clj | 21 +- backend/src/app/main.clj | 9 +- backend/src/app/media.clj | 59 +++--- backend/src/app/metrics.clj | 12 +- backend/src/app/rpc.clj | 47 ++--- backend/src/app/rpc/mutations/fonts.clj | 1 - backend/src/app/rpc/mutations/media.clj | 47 +++-- backend/src/app/rpc/mutations/profile.clj | 39 ++-- backend/src/app/rpc/mutations/teams.clj | 52 +++-- backend/src/app/storage.clj | 12 +- backend/src/app/tasks/objects_gc.clj | 1 - backend/src/app/util/websocket.clj | 93 +++++---- backend/test/app/services_files_test.clj | 4 +- backend/test/app/services_media_test.clj | 8 +- backend/test/app/services_profile_test.clj | 4 +- backend/test/app/storage_test.clj | 12 +- backend/test/app/test_helpers.clj | 19 +- 28 files changed, 615 insertions(+), 630 deletions(-) diff --git a/backend/deps.edn b/backend/deps.edn index 81803c92e..438f53356 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -19,12 +19,12 @@ io.lettuce/lettuce-core {:mvn/version "6.1.6.RELEASE"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - funcool/yetti {:git/tag "v5.0" :git/sha "f7d61e2" - :git/url "https://github.com/funcool/yetti" + funcool/yetti {:git/tag "v6.0" :git/sha "4c8690e" + :git/url "https://github.com/funcool/yetti.git" :exclusions [org.slf4j/slf4j-api]} com.github.seancorfield/next.jdbc {:mvn/version "1.2.772"} - metosin/reitit-ring {:mvn/version "0.5.16"} + metosin/reitit-core {:mvn/version "0.5.16"} org.postgresql/postgresql {:mvn/version "42.3.3"} com.zaxxer/HikariCP {:mvn/version "5.0.1"} funcool/datoteka {:mvn/version "2.0.0"} diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 2f16b78b5..fa8643879 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -10,18 +10,18 @@ [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] - [app.config :as cf] [app.http.doc :as doc] [app.http.errors :as errors] [app.http.middleware :as middleware] [app.metrics :as mtx] + [app.worker :as wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] - [reitit.ring :as rr] - [yetti.adapter :as yt]) - (:import - org.eclipse.jetty.server.Server - org.eclipse.jetty.server.handler.StatisticsHandler)) + [reitit.core :as r] + [reitit.middleware :as rr] + [yetti.adapter :as yt] + [yetti.request :as yrq] + [yetti.response :as yrs])) (declare wrap-router) @@ -29,55 +29,43 @@ ;; HTTP SERVER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::session map?) (s/def ::handler fn?) (s/def ::router some?) (s/def ::port ::us/integer) (s/def ::host ::us/string) (s/def ::name ::us/string) -(s/def ::max-threads ::cf/http-server-max-threads) -(s/def ::min-threads ::cf/http-server-min-threads) +(s/def ::executors (s/map-of keyword? ::wrk/executor)) + +;; (s/def ::max-threads ::cf/http-server-max-threads) +;; (s/def ::min-threads ::cf/http-server-min-threads) (defmethod ig/prep-key ::server [_ cfg] (merge {:name "http" - :min-threads 4 - :max-threads 60 :port 6060 :host "0.0.0.0"} (d/without-nils cfg))) (defmethod ig/pre-init-spec ::server [_] - (s/keys :req-un [::port ::host ::name ::min-threads ::max-threads ::session] - :opt-un [::mtx/metrics ::router ::handler])) - -(defn- instrument-metrics - [^Server server metrics] - (let [stats (doto (StatisticsHandler.) - (.setHandler (.getHandler server)))] - (.setHandler server stats) - (mtx/instrument-jetty! (:registry metrics) stats) - server)) + (s/keys :req-un [::port ::host ::name ::executors] + :opt-un [::router ::handler])) (defmethod ig/init-key ::server - [_ {:keys [handler router port name metrics host] :as cfg}] + [_ {:keys [handler router port name host executors] :as cfg}] (l/info :hint "starting http server" - :port port :host host :name name - :min-threads (:min-threads cfg) - :max-threads (:max-threads cfg)) + :port port :host host :name name) + (let [options {:http/port port :http/host host - :thread-pool/max-threads (:max-threads cfg) - :thread-pool/min-threads (:min-threads cfg) - :ring/async true} + :ring/async true + :xnio/dispatch (:default executors)} handler (cond (fn? handler) handler (some? router) (wrap-router cfg router) :else (ex/raise :type :internal :code :invalid-argument :hint "Missing `handler` or `router` option.")) - server (-> (yt/server handler (d/without-nils options)) - (cond-> metrics (instrument-metrics metrics)))] + server (yt/server handler (d/without-nils options))] (assoc cfg :server (yt/start! server)))) (defmethod ig/halt-key! ::server @@ -85,24 +73,34 @@ (l/info :msg "stoping http server" :name name :port port) (yt/stop! server)) +(defn- not-found-handler + [_ respond _] + (respond (yrs/response 404))) + +(defn- ring-handler + [router] + (fn [request respond raise] + (if-let [match (r/match-by-path router (yrq/path request))] + (let [params (:path-params match) + result (:result match) + handler (or (:handler result) not-found-handler) + request (-> request + (assoc :path-params params) + (update :params merge params))] + (handler request respond raise)) + (not-found-handler request respond raise)))) + (defn- wrap-router - [{:keys [session] :as cfg} router] - (let [default (rr/routes - (rr/create-resource-handler {:path "/"}) - (rr/create-default-handler)) - options {:middleware [[middleware/wrap-server-timing] - [middleware/cookies] - [(:middleware session)]] - :inject-match? false - :inject-router? false} - handler (rr/ring-handler router default options)] + [_ router] + (let [handler (ring-handler router)] (fn [request respond _] - (handler request respond (fn [cause] - (l/error :hint "unexpected error processing request" - ::l/context (errors/get-error-context request cause) - :query-string (:query-string request) - :cause cause) - (respond {:status 500 :body "internal server error"})))))) + (handler request respond + (fn [cause] + (l/error :hint "unexpected error processing request" + ::l/context (errors/get-error-context request cause) + :query-string (yrq/query request) + :cause cause) + (respond (yrs/response 500 "internal server error"))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HTTP ROUTER @@ -117,62 +115,64 @@ (s/def ::audit-handler fn?) (s/def ::debug map?) (s/def ::awsns-handler fn?) +(s/def ::session map?) (defmethod ig/pre-init-spec ::router [_] (s/keys :req-un [::rpc ::mtx/metrics ::ws ::oauth ::storage ::assets - ::feedback ::awsns-handler ::debug ::audit-handler])) + ::session ::feedback ::awsns-handler ::debug ::audit-handler])) (defmethod ig/init-key ::router [_ {:keys [ws session rpc oauth metrics assets feedback debug] :as cfg}] (rr/router - [["/metrics" {:get (:handler metrics)}] - ["/assets" {:middleware [[middleware/format-response-body] - [middleware/errors errors/handle]]} - ["/by-id/:id" {:get (:objects-handler assets)}] - ["/by-file-media-id/:id" {:get (:file-objects-handler assets)}] - ["/by-file-media-id/:id/thumbnail" {:get (:file-thumbnails-handler assets)}]] + [["" {:middleware [[middleware/server-timing] + [middleware/format-response] + [middleware/errors errors/handle] + [middleware/restrict-methods]]} + ["/metrics" {:handler (:handler metrics)}] + ["/assets" {:middleware [(:middleware session)]} + ["/by-id/:id" {:handler (:objects-handler assets)}] + ["/by-file-media-id/:id" {:handler (:file-objects-handler assets)}] + ["/by-file-media-id/:id/thumbnail" {:handler (:file-thumbnails-handler assets)}]] - ["/dbg" {:middleware [[middleware/multipart-params] - [middleware/params] - [middleware/keyword-params] - [middleware/format-response-body] - [middleware/errors errors/handle]]} - ["" {:get (:index debug)}] - ["/error-by-id/:id" {:get (:retrieve-error debug)}] - ["/error/:id" {:get (:retrieve-error debug)}] - ["/error" {:get (:retrieve-error-list debug)}] - ["/file/data" {:get (:retrieve-file-data debug) - :post (:upload-file-data debug)}] - ["/file/changes" {:get (:retrieve-file-changes debug)}]] + ["/dbg" {:middleware [[middleware/params] + [middleware/parse-request] + (:middleware session)]} + ["" {:handler (:index debug)}] + ["/error-by-id/:id" {:handler (:retrieve-error debug)}] + ["/error/:id" {:handler (:retrieve-error debug)}] + ["/error" {:handler (:retrieve-error-list debug)}] + ["/file/data" {:handler (:file-data debug)}] + ["/file/changes" {:handler (:retrieve-file-changes debug)}]] - ["/webhooks" - ["/sns" {:post (:awsns-handler cfg)}]] + ["/webhooks" + ["/sns" {:handler (:awsns-handler cfg) + :allowed-methods #{:post}}]] - ["/ws/notifications" - {:middleware [[middleware/params] - [middleware/keyword-params] - [middleware/format-response-body] - [middleware/errors errors/handle]] - :get ws}] + ["/ws/notifications" {:middleware [[middleware/params] + [middleware/parse-request] + (:middleware session)] + :handler ws + :allowed-methods #{:get}}] - ["/api" {:middleware [[middleware/cors] - [middleware/params] - [middleware/multipart-params] - [middleware/keyword-params] - [middleware/format-response-body] - [middleware/parse-request-body] - [middleware/errors errors/handle]]} + ["/api" {:middleware [[middleware/cors] + [middleware/params] + [middleware/parse-request] + (:middleware session)]} + ["/health" {:handler (:health-check debug)}] + ["/_doc" {:handler (doc/handler rpc) + :allowed-methods #{:get}}] + ["/feedback" {:handler feedback + :allowed-methods #{:post}}] - ["/health" {:get (:health-check debug)}] - ["/_doc" {:get (doc/handler rpc)}] - ["/feedback" {:middleware [(:middleware session)] - :post feedback}] - ["/auth/oauth/:provider" {:post (:handler oauth)}] - ["/auth/oauth/:provider/callback" {:get (:callback-handler oauth)}] + ["/auth/oauth/:provider" {:handler (:handler oauth) + :allowed-methods #{:post}}] + ["/auth/oauth/:provider/callback" {:handler (:callback-handler oauth) + :allowed-methods #{:get}}] - ["/audit/events" {:post (:audit-handler cfg)}] + ["/audit/events" {:handler (:audit-handler cfg) + :allowed-methods #{:post}}] - ["/rpc" - ["/query/:type" {:get (:query-handler rpc) - :post (:query-handler rpc)}] - ["/mutation/:type" {:post (:mutation-handler rpc)}]]]])) + ["/rpc" + ["/query/:type" {:handler (:query-handler rpc)}] + ["/mutation/:type" {:handler (:mutation-handler rpc) + :allowed-methods #{:post}}]]]]])) diff --git a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index bb3e9a48e..9061c436d 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -18,7 +18,8 @@ [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] - [promesa.exec :as px])) + [promesa.exec :as px] + [yetti.response :as yrs])) (def ^:private cache-max-age (dt/duration {:hours 24})) @@ -53,27 +54,25 @@ (case (:type backend) :db (p/let [body (sto/get-object-bytes storage obj)] - {:status 200 - :headers {"content-type" (:content-type mdata) - "cache-control" (str "max-age=" (inst-ms cache-max-age))} - :body body}) + (yrs/response :status 200 + :body body + :headers {"content-type" (:content-type mdata) + "cache-control" (str "max-age=" (inst-ms cache-max-age))})) :s3 (p/let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})] - {:status 307 - :headers {"location" (str url) - "x-host" (cond-> host port (str ":" port)) - "cache-control" (str "max-age=" (inst-ms cache-max-age))} - :body ""}) + (yrs/response :status 307 + :headers {"location" (str url) + "x-host" (cond-> host port (str ":" port)) + "cache-control" (str "max-age=" (inst-ms cache-max-age))})) :fs (p/let [purl (u/uri (:assets-path cfg)) purl (u/join purl (sto/object->relative-path obj))] - {:status 204 - :headers {"x-accel-redirect" (:path purl) - "content-type" (:content-type mdata) - "cache-control" (str "max-age=" (inst-ms cache-max-age))} - :body ""})))) + (yrs/response :status 204 + :headers {"x-accel-redirect" (:path purl) + "content-type" (:content-type mdata) + "cache-control" (str "max-age=" (inst-ms cache-max-age))}))))) (defn objects-handler "Handler that servers storage objects by id." @@ -84,7 +83,7 @@ obj (sto/get-object storage id)] (if obj (serve-object cfg obj) - {:status 404 :body ""}))) + (yrs/response 404)))) (p/bind p/wrap) (p/then' respond) @@ -98,7 +97,7 @@ obj (sto/get-object storage (kf mobj))] (if obj (serve-object cfg obj) - {:status 404 :body ""}))) + (yrs/response 404)))) (defn file-objects-handler "Handler that serves storage objects by file media id." diff --git a/backend/src/app/http/awsns.clj b/backend/src/app/http/awsns.clj index 5bea7ede7..2844c2d53 100644 --- a/backend/src/app/http/awsns.clj +++ b/backend/src/app/http/awsns.clj @@ -15,7 +15,8 @@ [cuerdas.core :as str] [integrant.core :as ig] [jsonista.core :as j] - [promesa.exec :as px])) + [promesa.exec :as px] + [yetti.response :as yrs])) (declare parse-json) (declare handle-request) @@ -32,7 +33,7 @@ (fn [request respond _] (let [data (slurp (:body request))] (px/run! executor #(handle-request cfg data)) - (respond {:status 200 :body ""})))) + (respond (yrs/response 200))))) (defn handle-request [{:keys [http-client] :as cfg} data] diff --git a/backend/src/app/http/debug.clj b/backend/src/app/http/debug.clj index f458eb757..53f70a584 100644 --- a/backend/src/app/http/debug.clj +++ b/backend/src/app/http/debug.clj @@ -25,7 +25,9 @@ [fipp.edn :as fpp] [integrant.core :as ig] [promesa.core :as p] - [promesa.exec :as px])) + [promesa.exec :as px] + [yetti.request :as yrq] + [yetti.response :as yrs])) ;; (selmer.parser/cache-off!) @@ -41,11 +43,10 @@ (when-not (authorized? pool request) (ex/raise :type :authentication :code :only-admins-allowed)) - - {:status 200 - :headers {"content-type" "text/html"} - :body (-> (io/resource "templates/debug.tmpl") - (tmpl/render {}))}) + (yrs/response :status 200 + :headers {"content-type" "text/html"} + :body (-> (io/resource "templates/debug.tmpl") + (tmpl/render {})))) (def sql:retrieve-range-of-changes @@ -61,13 +62,14 @@ :code :enpty-data :hint "empty response")) - (cond-> {:status 200 - :headers {"content-type" "application/transit+json"} - :body body} + (cond-> (yrs/response :status 200 + :body body + :headers {"content-type" "application/transit+json"}) (contains? params :download) (update :headers assoc "content-disposition" "attachment"))) -(defn retrieve-file-data + +(defn- retrieve-file-data [{:keys [pool]} {:keys [params] :as request}] (when-not (authorized? pool request) (ex/raise :type :authentication @@ -87,7 +89,7 @@ (update :headers assoc "content-type" "application/octet-stream")) (prepare-response request (some-> data blob/decode)))))) -(defn upload-file-data +(defn- upload-file-data [{:keys [pool]} {:keys [profile-id params] :as request}] (let [project-id (some-> (profile/retrieve-additional-data pool profile-id) :default-project-id) data (some-> params :file :tempfile fs/slurp-bytes blob/decode)] @@ -99,10 +101,16 @@ :project-id project-id :profile-id profile-id :data data}) - {:status 200 - :body "OK"}) - {:status 500 - :body "error"}))) + (yrs/response 200 "OK")) + (yrs/response 500 "ERROR")))) + +(defn file-data + [cfg request] + (case (yrq/method request) + :get (retrieve-file-data cfg request) + :post (upload-file-data cfg request) + (ex/raise :type :http + :code :method-not-found))) (defn retrieve-file-changes [{:keys [pool]} request] @@ -175,12 +183,11 @@ (retrieve-report) (render-template))] (if result - {:status 200 - :headers {"content-type" "text/html; charset=utf-8" - "x-robots-tag" "noindex"} - :body result} - {:status 404 - :body "not found"})))) + (yrs/response :status 200 + :body result + :headers {"content-type" "text/html; charset=utf-8" + "x-robots-tag" "noindex"}) + (yrs/response 404 "not found"))))) (def sql:error-reports "select id, created_at from server_error_report order by created_at desc limit 100") @@ -192,18 +199,18 @@ :code :only-admins-allowed)) (let [items (db/exec! pool [sql:error-reports]) items (map #(update % :created-at dt/format-instant :rfc1123) items)] - {:status 200 - :headers {"content-type" "text/html; charset=utf-8" - "x-robots-tag" "noindex"} - :body (-> (io/resource "templates/error-list.tmpl") - (tmpl/render {:items items}))})) + (yrs/response :status 200 + :body (-> (io/resource "templates/error-list.tmpl") + (tmpl/render {:items items})) + :headers {"content-type" "text/html; charset=utf-8" + "x-robots-tag" "noindex"}))) (defn health-check "Mainly a task that performs a health check." [{:keys [pool]} _] (db/with-atomic [conn pool] (db/exec-one! conn ["select count(*) as count from server_prop;"]) - {:status 200 :body "Ok"})) + (yrs/response 200 "OK"))) (defn- wrap-async [{:keys [executor] :as cfg} f] @@ -219,8 +226,7 @@ [_ cfg] {:index (wrap-async cfg index) :health-check (wrap-async cfg health-check) - :retrieve-file-data (wrap-async cfg retrieve-file-data) :retrieve-file-changes (wrap-async cfg retrieve-file-changes) :retrieve-error (wrap-async cfg retrieve-error) :retrieve-error-list (wrap-async cfg retrieve-error-list) - :upload-file-data (wrap-async cfg upload-file-data)}) + :file-data (wrap-async cfg file-data)}) diff --git a/backend/src/app/http/doc.clj b/backend/src/app/http/doc.clj index 07f63bb04..a6e88458b 100644 --- a/backend/src/app/http/doc.clj +++ b/backend/src/app/http/doc.clj @@ -13,7 +13,8 @@ [app.util.template :as tmpl] [clojure.java.io :as io] [clojure.spec.alpha :as s] - [pretty-spec.core :as ps])) + [pretty-spec.core :as ps] + [yetti.response :as yrs])) (defn get-spec-str [k] @@ -47,8 +48,7 @@ (let [context (prepare-context rpc)] (if (contains? cf/flags :backend-api-doc) (fn [_ respond _] - (respond {:status 200 - :body (-> (io/resource "api-doc.tmpl") - (tmpl/render context))})) + (respond (yrs/response 200 (-> (io/resource "api-doc.tmpl") + (tmpl/render context))))) (fn [_ respond _] - (respond {:status 404 :body ""}))))) + (respond (yrs/response 404)))))) diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj index c7f2a9ba6..40eaf9fc3 100644 --- a/backend/src/app/http/errors.clj +++ b/backend/src/app/http/errors.clj @@ -11,13 +11,15 @@ [app.common.logging :as l] [app.common.spec :as us] [clojure.spec.alpha :as s] - [cuerdas.core :as str])) + [cuerdas.core :as str] + [yetti.request :as yrq] + [yetti.response :as yrs])) (defn- parse-client-ip - [{:keys [headers] :as request}] - (or (some-> (get headers "x-forwarded-for") (str/split ",") first) - (get headers "x-real-ip") - (get request :remote-addr))) + [request] + (or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) + (yrq/get-header request "x-real-ip") + (yrq/remote-addr request))) (defn get-error-context [request error] @@ -49,20 +51,19 @@ (defmethod handle-exception :authentication [err _] - {:status 401 :body (ex-data err)}) + (yrs/response 401 (ex-data err))) (defmethod handle-exception :restriction [err _] - {:status 400 :body (ex-data err)}) + (yrs/response 400 (ex-data err))) (defmethod handle-exception :validation [err _] (let [data (ex-data err) explain (us/pretty-explain data)] - {:status 400 - :body (-> data - (dissoc ::s/problems ::s/value) - (cond-> explain (assoc :explain explain)))})) + (yrs/response 400 (-> data + (dissoc ::s/problems ::s/value) + (cond-> explain (assoc :explain explain)))))) (defmethod handle-exception :assertion [error request] @@ -71,17 +72,16 @@ (l/error ::l/raw (ex-message error) ::l/context (get-error-context request error) :cause error) - - {:status 500 - :body {:type :server-error - :code :assertion - :data (-> edata - (dissoc ::s/problems ::s/value ::s/spec) - (cond-> explain (assoc :explain explain)))}})) + (yrs/response :status 500 + :body {:type :server-error + :code :assertion + :data (-> edata + (dissoc ::s/problems ::s/value ::s/spec) + (cond-> explain (assoc :explain explain)))}))) (defmethod handle-exception :not-found [err _] - {:status 404 :body (ex-data err)}) + (yrs/response 404 (ex-data err))) (defmethod handle-exception :default [error request] @@ -98,11 +98,10 @@ (l/error ::l/raw (ex-message error) ::l/context (get-error-context request error) :cause error) - {:status 500 - :body {:type :server-error - :code :unexpected - :hint (ex-message error) - :data edata}})))) + (yrs/response 500 {:type :server-error + :code :unexpected + :hint (ex-message error) + :data edata}))))) (defmethod handle-exception org.postgresql.util.PSQLException [error request] @@ -112,23 +111,20 @@ :cause error) (cond (= state "57014") - {:status 504 - :body {:type :server-timeout - :code :statement-timeout - :hint (ex-message error)}} + (yrs/response 504 {:type :server-timeout + :code :statement-timeout + :hint (ex-message error)}) (= state "25P03") - {:status 504 - :body {:type :server-timeout - :code :idle-in-transaction-timeout - :hint (ex-message error)}} + (yrs/response 504 {:type :server-timeout + :code :idle-in-transaction-timeout + :hint (ex-message error)}) :else - {:status 500 - :body {:type :server-error - :code :psql-exception - :hint (ex-message error) - :state state}}))) + (yrs/response 500 {:type :server-error + :code :psql-exception + :hint (ex-message error) + :state state})))) (defn handle [error req] diff --git a/backend/src/app/http/feedback.clj b/backend/src/app/http/feedback.clj index 1f7e92a03..d5b6423e7 100644 --- a/backend/src/app/http/feedback.clj +++ b/backend/src/app/http/feedback.clj @@ -7,7 +7,6 @@ (ns app.http.feedback "A general purpose feedback module." (:require - [app.common.data :as d] [app.common.exceptions :as ex] [app.common.spec :as us] [app.config :as cf] @@ -18,7 +17,9 @@ [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] - [promesa.exec :as px])) + [promesa.exec :as px] + [yetti.request :as yrq] + [yetti.response :as yrs])) (declare ^:private send-feedback) (declare ^:private handler) @@ -42,9 +43,8 @@ (defn- handler [{:keys [pool] :as cfg} {:keys [profile-id] :as request}] (let [ftoken (cf/get :feedback-token ::no-token) - token (get-in request [:headers "x-feedback-token"]) - params (d/merge (:params request) - (:body-params request))] + token (yrq/get-header request "x-feedback-token") + params (::yrq/params request)] (cond (uuid? profile-id) (let [profile (profile/retrieve-profile-data pool profile-id) @@ -54,7 +54,7 @@ (= token ftoken) (send-feedback cfg nil params)) - {:status 204 :body ""})) + (yrs/response 204))) (s/def ::content ::us/string) (s/def ::from ::us/email) diff --git a/backend/src/app/http/middleware.clj b/backend/src/app/http/middleware.clj index c84413e30..df3d31c76 100644 --- a/backend/src/app/http/middleware.clj +++ b/backend/src/app/http/middleware.clj @@ -10,49 +10,41 @@ [app.common.transit :as t] [app.config :as cf] [app.util.json :as json] - [ring.core.protocols :as rp] - [ring.middleware.cookies :refer [wrap-cookies]] - [ring.middleware.keyword-params :refer [wrap-keyword-params]] - [ring.middleware.multipart-params :refer [wrap-multipart-params]] - [ring.middleware.params :refer [wrap-params]] - [yetti.adapter :as yt])) + [cuerdas.core :as str] + [yetti.adapter :as yt] + [yetti.middleware :as ymw] + [yetti.request :as yrq] + [yetti.response :as yrs]) + (:import java.io.OutputStream)) -(defn wrap-server-timing +(def server-timing + {:name ::server-timing + :compile (constantly ymw/wrap-server-timing)}) + +(def params + {:name ::params + :compile (constantly ymw/wrap-params)}) + +(defn wrap-parse-request [handler] - (letfn [(get-age [start] - (float (/ (- (System/nanoTime) start) 1000000000))) + (letfn [(process-request [request] + (let [header (yrq/get-header request "content-type")] + (cond + (str/starts-with? header "application/transit+json") + (with-open [is (-> request yrq/body yrq/body-stream)] + (let [params (t/read! (t/reader is))] + (-> request + (assoc :body-params params) + (update :params merge params)))) - (update-headers [headers start] - (assoc headers "Server-Timing" (str "total;dur=" (get-age start))))] - - (fn [request respond raise] - (let [start (System/nanoTime)] - (handler request #(respond (update % :headers update-headers start)) raise))))) - -(defn wrap-parse-request-body - [handler] - (letfn [(parse-transit [body] - (let [reader (t/reader body)] - (t/read! reader))) - - (parse-json [body] - (json/read body)) - - (handle-request [{:keys [headers body] :as request}] - (let [ctype (get headers "content-type")] - (case ctype - "application/transit+json" - (let [params (parse-transit body)] - (-> request - (assoc :body-params params) - (update :params merge params))) - - "application/json" - (let [params (parse-json body)] - (-> request - (assoc :body-params params) - (update :params merge params))) + (str/starts-with? header "application/json") + (with-open [is (-> request yrq/body yrq/body-stream)] + (let [params (json/read is)] + (-> request + (assoc :body-params params) + (update :params merge params)))) + :else request))) (handle-exception [cause] @@ -60,20 +52,20 @@ :code :unable-to-parse-request-body :hint "malformed params"}] (l/error :hint (ex-message cause) :cause cause) - {:status 400 - :headers {"content-type" "application/transit+json"} - :body (t/encode-str data {:type :json-verbose})}))] + (yrs/response :status 400 + :headers {"content-type" "application/transit+json"} + :body (t/encode-str data {:type :json-verbose}))))] (fn [request respond raise] (try - (let [request (handle-request request)] + (let [request (process-request request)] (handler request respond raise)) (catch Exception cause (respond (handle-exception cause))))))) -(def parse-request-body - {:name ::parse-request-body - :compile (constantly wrap-parse-request-body)}) +(def parse-request + {:name ::parse-request + :compile (constantly wrap-parse-request)}) (defn buffered-output-stream "Returns a buffered output stream that ignores flush calls. This is @@ -87,56 +79,51 @@ (proxy-super flush) (proxy-super close)))) -(def ^:const buffer-size (:http/output-buffer-size yt/base-defaults)) +(def ^:const buffer-size (:xnio/buffer-size yt/defaults)) -(defn wrap-format-response-body +(defn wrap-format-response [handler] (letfn [(transit-streamable-body [data opts] - (reify rp/StreamableResponseBody - (write-body-to-stream [_ _ output-stream] - ;; Use the same buffer as jetty output buffer size + (reify yrs/StreamableResponseBody + (-write-body-to-stream [_ _ output-stream] (try (with-open [bos (buffered-output-stream output-stream buffer-size)] (let [tw (t/writer bos opts)] (t/write! tw data))) - (catch org.eclipse.jetty.io.EofException _cause + + (catch java.io.IOException _cause ;; Do nothing, EOF means client closes connection abruptly nil) (catch Throwable cause (l/warn :hint "unexpected error on encoding response" - :cause cause)))))) + :cause cause)) + (finally + (.close ^OutputStream output-stream)))))) - (impl-format-response-body [response {:keys [query-params] :as request}] - (let [body (:body response) - opts {:type (if (contains? query-params "transit_verbose") :json-verbose :json)}] - (cond - (:ws response) - response - - (coll? body) - (-> response - (update :headers assoc "content-type" "application/transit+json") - (assoc :body (transit-streamable-body body opts))) - - (nil? body) - (assoc response :status 204 :body "") - - :else + (format-response [response request] + (let [body (yrs/body response)] + (if (coll? body) + (let [qs (yrq/query request) + opts {:type (if (str/includes? qs "verbose") :json-verbose :json)}] + (-> response + (update :headers assoc "content-type" "application/transit+json") + (assoc :body (transit-streamable-body body opts)))) response))) - (handle-response [response request] + (process-response [response request] (cond-> response - (map? response) (impl-format-response-body request)))] + (map? response) (format-response request)))] (fn [request respond raise] (handler request (fn [response] - (respond (handle-response response request))) + (let [response (process-response response request)] + (respond response))) raise)))) -(def format-response-body - {:name ::format-response-body - :compile (constantly wrap-format-response-body)}) +(def format-response + {:name ::format-response + :compile (constantly wrap-format-response)}) (defn wrap-errors [handler on-error] @@ -148,51 +135,46 @@ {:name ::errors :compile (constantly wrap-errors)}) -(def cookies - {:name ::cookies - :compile (constantly wrap-cookies)}) - -(def params - {:name ::params - :compile (constantly wrap-params)}) - -(def multipart-params - {:name ::multipart-params - :compile (constantly wrap-multipart-params)}) - -(def keyword-params - {:name ::keyword-params - :compile (constantly wrap-keyword-params)}) - -(def server-timing - {:name ::server-timing - :compile (constantly wrap-server-timing)}) - (defn wrap-cors [handler] (if-not (contains? cf/flags :cors) handler - (letfn [(add-cors-headers [response request] - (-> response - (update - :headers - (fn [headers] - (-> headers - (assoc "access-control-allow-origin" (get-in request [:headers "origin"])) - (assoc "access-control-allow-methods" "GET,POST,DELETE,OPTIONS,PUT,HEAD,PATCH") - (assoc "access-control-allow-credentials" "true") - (assoc "access-control-expose-headers" "x-requested-with, content-type, cookie") - (assoc "access-control-allow-headers" "x-frontend-version, content-type, accept, x-requested-width"))))))] + (letfn [(add-headers [headers request] + (let [origin (yrq/get-header request "origin")] + (-> headers + (assoc "access-control-allow-origin" origin) + (assoc "access-control-allow-methods" "GET,POST,DELETE,OPTIONS,PUT,HEAD,PATCH") + (assoc "access-control-allow-credentials" "true") + (assoc "access-control-expose-headers" "x-requested-with, content-type, cookie") + (assoc "access-control-allow-headers" "x-frontend-version, content-type, accept, x-requested-width")))) + + (update-response [response request] + (update response :headers add-headers request))] + (fn [request respond raise] - (if (= (:request-method request) :options) - (-> {:status 200 :body ""} - (add-cors-headers request) + (if (= (yrq/method request) :options) + (-> (yrs/response 200) + (update-response request) (respond)) (handler request (fn [response] - (respond (add-cors-headers response request))) + (respond (update-response response request))) raise)))))) (def cors {:name ::cors :compile (constantly wrap-cors)}) + +(defn compile-restrict-methods + [data _] + (when-let [allowed (:allowed-methods data)] + (fn [handler] + (fn [request respond raise] + (let [method (yrq/method request)] + (if (contains? allowed method) + (handler request respond raise) + (respond (yrs/response 405)))))))) + +(def restrict-methods + {:name ::restrict-methods + :compile compile-restrict-methods}) diff --git a/backend/src/app/http/oauth.clj b/backend/src/app/http/oauth.clj index cf936764f..fffdcfbd1 100644 --- a/backend/src/app/http/oauth.clj +++ b/backend/src/app/http/oauth.clj @@ -22,7 +22,8 @@ [cuerdas.core :as str] [integrant.core :as ig] [promesa.core :as p] - [promesa.exec :as px])) + [promesa.exec :as px] + [yetti.response :as yrs])) (defn- build-redirect-uri [{:keys [provider] :as cfg}] @@ -175,9 +176,7 @@ (defn- redirect-response [uri] - {:status 302 - :headers {"location" (str uri)} - :body ""}) + (yrs/response :status 302 :headers {"location" (str uri)})) (defn- generate-error-redirect [cfg error] @@ -233,7 +232,7 @@ :props props :exp (dt/in-future "15m")}) uri (build-auth-uri cfg state)] - (respond {:status 200 :body {:redirect-uri uri}}))) + (respond (yrs/response 200 {:redirect-uri uri})))) (defn- callback-handler [cfg request respond _] diff --git a/backend/src/app/http/session.clj b/backend/src/app/http/session.clj index 9a544aa01..b68092940 100644 --- a/backend/src/app/http/session.clj +++ b/backend/src/app/http/session.clj @@ -19,7 +19,9 @@ [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] - [ring.middleware.session.store :as rss])) + [promesa.core :as p] + [promesa.exec :as px] + [yetti.request :as yrq])) ;; A default cookie name for storing the session. We don't allow to configure it. (def token-cookie-name "auth-token") @@ -29,65 +31,100 @@ ;; prevents using it if some one wants to. (def authenticated-cookie-name "authenticated") -(deftype DatabaseStore [pool tokens] - rss/SessionStore - (read-session [_ token] - (db/exec-one! pool (sql/select :http-session {:id token}))) +(defprotocol ISessionStore + (read-session [store key]) + (write-session [store key data]) + (delete-session [store key])) - (write-session [_ _ data] - (let [profile-id (:profile-id data) - user-agent (:user-agent data) - token (tokens :generate {:iss "authentication" - :iat (dt/now) - :uid profile-id}) +(defn- make-database-store + [{:keys [pool tokens executor]}] + (reify ISessionStore + (read-session [_ token] + (px/with-dispatch executor + (db/exec-one! pool (sql/select :http-session {:id token})))) - now (dt/now) - params {:user-agent user-agent - :profile-id profile-id - :created-at now - :updated-at now - :id token}] - (db/insert! pool :http-session params) - token)) + (write-session [_ _ data] + (px/with-dispatch executor + (let [profile-id (:profile-id data) + user-agent (:user-agent data) + token (tokens :generate {:iss "authentication" + :iat (dt/now) + :uid profile-id}) - (delete-session [_ token] - (db/delete! pool :http-session {:id token}) - nil)) + now (dt/now) + params {:user-agent user-agent + :profile-id profile-id + :created-at now + :updated-at now + :id token}] + (db/insert! pool :http-session params) + token))) -(deftype MemoryStore [cache tokens] - rss/SessionStore - (read-session [_ token] - (get @cache token)) + (delete-session [_ token] + (px/with-dispatch executor + (db/delete! pool :http-session {:id token}) + nil)))) - (write-session [_ _ data] - (let [profile-id (:profile-id data) - user-agent (:user-agent data) - token (tokens :generate {:iss "authentication" - :iat (dt/now) - :uid profile-id}) - params {:user-agent user-agent - :profile-id profile-id - :id token}] +(defn make-inmemory-store + [{:keys [tokens]}] + (let [cache (atom {})] + (reify ISessionStore + (read-session [_ token] + (p/do (get @cache token))) - (swap! cache assoc token params) - token)) + (write-session [_ _ data] + (p/do + (let [profile-id (:profile-id data) + user-agent (:user-agent data) + token (tokens :generate {:iss "authentication" + :iat (dt/now) + :uid profile-id}) + params {:user-agent user-agent + :profile-id profile-id + :id token}] - (delete-session [_ token] - (swap! cache dissoc token) - nil)) + (swap! cache assoc token params) + token))) + + (delete-session [_ token] + (p/do + (swap! cache dissoc token) + nil))))) + +(s/def ::tokens fn?) +(defmethod ig/pre-init-spec ::store [_] + (s/keys :req-un [::db/pool ::wrk/executor ::tokens])) + +(defmethod ig/init-key ::store + [_ {:keys [pool] :as cfg}] + (if (db/read-only? pool) + (make-inmemory-store cfg) + (make-database-store cfg))) + +(defmethod ig/halt-key! ::store + [_ _]) ;; --- IMPL -(defn- create-session +(defn- create-session! [store request profile-id] - (let [params {:user-agent (get-in request [:headers "user-agent"]) + (let [params {:user-agent (yrq/get-header request "user-agent") :profile-id profile-id}] - (rss/write-session store nil params))) + (write-session store nil params))) -(defn- delete-session +(defn- delete-session! [store {:keys [cookies] :as request}] (when-let [token (get-in cookies [token-cookie-name :value])] - (rss/delete-session store token))) + (delete-session store token))) + +(defn- retrieve-session + [store request] + (when-let [cookie (yrq/get-cookie request token-cookie-name)] + (-> (read-session store (:value cookie)) + (p/then (fn [session] + (when session + {:session-id (:id session) + :profile-id (:profile-id session)})))))) (defn- add-cookies [response token] @@ -114,43 +151,40 @@ (defn- clear-cookies [response] (let [authenticated-cookie-domain (cfg/get :authenticated-cookie-domain)] - (assoc response :cookies {token-cookie-name {:path "/" - :value "" - :max-age -1} - authenticated-cookie-name {:domain authenticated-cookie-domain - :path "/" - :value "" - :max-age -1}}))) + (assoc response :cookies + {token-cookie-name {:path "/" + :value "" + :max-age -1} + authenticated-cookie-name {:domain authenticated-cookie-domain + :path "/" + :value "" + :max-age -1}}))) -;; NOTE: for now the session middleware is synchronous and is -;; processed on jetty threads. This is because of probably a bug on -;; jetty that causes NPE on upgrading connection to websocket from -;; thread not managed by jetty. We probably can fix it running -;; websocket server in different port as standalone service. +(defn- make-middleware + [{:keys [::events-ch store] :as cfg}] + {:name :session-middleware + :wrap (fn [handler] + (fn [request respond raise] + (try + (-> (retrieve-session store request) + (p/then' #(merge request %)) + (p/finally (fn [request cause] + (if cause + (raise cause) + (do + (when-let [session-id (:session-id request)] + (a/offer! events-ch session-id)) + (handler request respond raise)))))) + (catch Throwable cause + (raise cause)))))}) -(defn- middleware - [{:keys [::events-ch ::store] :as cfg} handler] - (letfn [(get-session [{:keys [cookies] :as request}] - (if-let [token (get-in cookies [token-cookie-name :value])] - (if-let [{:keys [id profile-id]} (rss/read-session store token)] - (assoc request :session-id id :profile-id profile-id) - request) - request))] - - (fn [request respond raise] - (try - (let [{:keys [session-id profile-id] :as request} (get-session request)] - (when (and session-id profile-id) - (a/offer! events-ch session-id)) - (handler request respond raise)) - (catch Throwable cause - (raise cause)))))) ;; --- STATE INIT: SESSION -(s/def ::tokens fn?) +(s/def ::store #(satisfies? ISessionStore %)) + (defmethod ig/pre-init-spec :app.http/session [_] - (s/keys :req-un [::db/pool ::tokens ::wrk/executor])) + (s/keys :req-un [::store])) (defmethod ig/prep-key :app.http/session [_ cfg] @@ -158,29 +192,23 @@ (d/without-nils cfg))) (defmethod ig/init-key :app.http/session - [_ {:keys [pool tokens] :as cfg}] + [_ {:keys [store] :as cfg}] (let [events-ch (a/chan (a/dropping-buffer (:buffer-size cfg))) - store (if (db/read-only? pool) - (->MemoryStore (atom {}) tokens) - (->DatabaseStore pool tokens)) - - cfg (assoc cfg ::store store ::events-ch events-ch)] - - (when (db/read-only? pool) - (l/warn :hint "sessions module initialized with in-memory store")) + cfg (assoc cfg ::events-ch events-ch)] (-> cfg - (assoc :middleware (partial middleware cfg)) + (assoc :middleware (make-middleware cfg)) (assoc :create (fn [profile-id] (fn [request response] - (let [token (create-session store request profile-id)] + (p/let [token (create-session! store request profile-id)] (add-cookies response token))))) (assoc :delete (fn [request response] - (delete-session store request) - (-> response - (assoc :status 204) - (assoc :body "") - (clear-cookies))))))) + (p/do + (delete-session! store request) + (-> response + (assoc :status 204) + (assoc :body nil) + (clear-cookies)))))))) (defmethod ig/halt-key! :app.http/session [_ data] diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index 5d02a56f4..ae80ffaec 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -24,13 +24,15 @@ [integrant.core :as ig] [lambdaisland.uri :as u] [promesa.core :as p] - [promesa.exec :as px])) + [promesa.exec :as px] + [yetti.request :as yrq] + [yetti.response :as yrs])) (defn parse-client-ip - [{:keys [headers] :as request}] - (or (some-> (get headers "x-forwarded-for") (str/split ",") first) - (get headers "x-real-ip") - (get request :remote-addr))) + [request] + (or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) + (yrq/get-header request "x-real-ip") + (yrq/remote-addr request))) (defn profile->props [profile] @@ -87,11 +89,10 @@ (do (l/warn :hint "audit log http handler disabled or db is read-only") (fn [_ respond _] - (respond {:status 204 :body ""}))) + (respond (yrs/response 204)))) - - (letfn [(handler [{:keys [params profile-id] :as request}] - (let [events (->> (:events params) + (letfn [(handler [{:keys [profile-id] :as request}] + (let [events (->> (:events (:params request)) (remove #(not= profile-id (:profile-id %))) (us/conform ::frontend-events)) @@ -113,7 +114,7 @@ (-> (px/submit! executor #(handler request)) (p/catch handle-error)) - (respond {:status 204 :body ""}))))) + (respond (yrs/response 204)))))) (defn- persist-http-events [{:keys [pool events ip-addr source] :as cfg}] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 0908aa126..85c13749c 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -83,6 +83,9 @@ {:executor (ig/ref [::default :app.worker/executor])} :app.http/session + {:store (ig/ref :app.http.session/store)} + + :app.http.session/store {:pool (ig/ref :app.db/pool) :tokens (ig/ref :app.tokens/tokens) :executor (ig/ref [::default :app.worker/executor])} @@ -110,14 +113,12 @@ :host (cf/get :http-server-host) :router (ig/ref :app.http/router) :metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref [::default :app.worker/executor]) - :session (ig/ref :app.http/session) - :max-threads (cf/get :http-server-max-threads) - :min-threads (cf/get :http-server-min-threads)} + :executors (ig/ref :app.worker/executors)} :app.http/router {:assets (ig/ref :app.http.assets/handlers) :feedback (ig/ref :app.http.feedback/handler) + :session (ig/ref :app.http/session) :awsns-handler (ig/ref :app.http.awsns/handler) :oauth (ig/ref :app.http.oauth/handler) :debug (ig/ref :app.http.debug/handlers) diff --git a/backend/src/app/media.clj b/backend/src/app/media.clj index 95a0fde00..3814d9473 100644 --- a/backend/src/app/media.clj +++ b/backend/src/app/media.clj @@ -28,28 +28,30 @@ org.im4java.core.IMOperation org.im4java.core.Info)) -(s/def ::image-content-type cm/valid-image-types) -(s/def ::font-content-type cm/valid-font-types) - -(s/def :internal.http.upload/filename ::us/string) -(s/def :internal.http.upload/size ::us/integer) -(s/def :internal.http.upload/content-type ::us/string) -(s/def :internal.http.upload/tempfile any?) +(s/def ::path fs/path?) +(s/def ::filename string?) +(s/def ::size integer?) +(s/def ::headers (s/map-of string? string?)) +(s/def ::mtype string?) (s/def ::upload - (s/keys :req-un [:internal.http.upload/filename - :internal.http.upload/size - :internal.http.upload/tempfile - :internal.http.upload/content-type])) + (s/keys :req-un [::filename ::size ::path] + :opt-un [::mtype ::headers])) + +;; A subset of fields from the ::upload spec +(s/def ::input + (s/keys :req-un [::path] + :opt-un [::mtype])) (defn validate-media-type! - ([mtype] (validate-media-type! mtype cm/valid-image-types)) - ([mtype allowed] - (when-not (contains? allowed mtype) + ([upload] (validate-media-type! upload cm/valid-image-types)) + ([upload allowed] + (when-not (contains? allowed (:mtype upload)) (ex/raise :type :validation :code :media-type-not-allowed :hint "Seems like you are uploading an invalid media object")) - mtype)) + + upload)) (defmulti process :cmd) (defmulti process-error class) @@ -72,26 +74,16 @@ (process-error e)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; --- Thumbnails Generation +;; IMAGE THUMBNAILS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::cmd keyword?) - -(s/def ::path (s/or :path fs/path? - :string string? - :file fs/file?)) - -(s/def ::input - (s/keys :req-un [::path] - :opt-un [::cm/mtype])) - (s/def ::width integer?) (s/def ::height integer?) (s/def ::format #{:jpeg :webp :png}) (s/def ::quality #(< 0 % 101)) (s/def ::thumbnail-params - (s/keys :req-un [::cmd ::input ::format ::width ::height])) + (s/keys :req-un [::input ::format ::width ::height])) ;; Related info on how thumbnails generation ;; http://www.imagemagick.org/Usage/thumbnails/ @@ -178,7 +170,7 @@ (ex/raise :type :validation :code :invalid-svg-file :hint "uploaded svg does not provides dimensions")) - (assoc info :mtype mtype)) + (merge input info)) (let [instance (Info. (str path)) mtype' (.getProperty instance "Mime type")] @@ -191,9 +183,9 @@ ;; For an animated GIF, getImageWidth/Height returns the delta size of one frame (if no frame given ;; it returns size of the last one), whereas getPageWidth/Height always return the full size of ;; any frame. - {:width (.getPageWidth instance) - :height (.getPageHeight instance) - :mtype mtype})))) + (assoc input + :width (.getPageWidth instance) + :height (.getPageHeight instance)))))) (defmethod process-error org.im4java.core.InfoException [error] @@ -203,7 +195,7 @@ :cause error)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Fonts Generation +;; FONTS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defmethod process :generate-fonts @@ -326,11 +318,10 @@ (defn configure-assets-storage "Given storage map, returns a storage configured with the appropriate - backend for assets." + backend for assets and optional connection attached." ([storage] (assoc storage :backend (cf/get :assets-storage-backend :assets-fs))) ([storage conn] (-> storage (assoc :conn conn) (assoc :backend (cf/get :assets-storage-backend :assets-fs))))) - diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj index f21d58889..254c9d56b 100644 --- a/backend/src/app/metrics.clj +++ b/backend/src/app/metrics.clj @@ -23,8 +23,6 @@ io.prometheus.client.Histogram$Child io.prometheus.client.exporter.common.TextFormat io.prometheus.client.hotspot.DefaultExports - io.prometheus.client.jetty.JettyStatisticsCollector - org.eclipse.jetty.server.handler.StatisticsHandler java.io.StringWriter)) (set! *warn-on-reflection* true) @@ -265,9 +263,9 @@ :summary (make-summary props) :histogram (make-histogram props))) -(defn instrument-jetty! - [^CollectorRegistry registry ^StatisticsHandler handler] - (doto (JettyStatisticsCollector. handler) - (.register registry)) - nil) +;; (defn instrument-jetty! +;; [^CollectorRegistry registry ^StatisticsHandler handler] +;; (doto (JettyStatisticsCollector. handler) +;; (.register registry)) +;; nil) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 49c1749a5..3cdef422c 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -21,7 +21,8 @@ [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] - [promesa.exec :as px])) + [promesa.exec :as px] + [yetti.response :as yrs])) (defn- default-handler [_] @@ -30,8 +31,8 @@ (defn- handle-response-transformation [response request mdata] (if-let [transform-fn (:transform-response mdata)] - (transform-fn request response) - response)) + (p/do (transform-fn request response)) + (p/resolved response))) (defn- handle-before-comple-hook [response mdata] @@ -42,54 +43,44 @@ (defn- rpc-query-handler "Ring handler that dispatches query requests and convert between internal async flow into ring async flow." - [methods {:keys [profile-id session-id] :as request} respond raise] + [methods {:keys [profile-id session-id params] :as request} respond raise] (letfn [(handle-response [result] (let [mdata (meta result)] - (-> {:status 200 :body result} + (-> (yrs/response 200 result) (handle-response-transformation request mdata))))] - (let [type (keyword (get-in request [:path-params :type])) - data (merge (:params request) - (:body-params request) - (:uploads request) - {::request request}) - + (let [type (keyword (:type params)) + data (into {::request request} params) data (if profile-id (assoc data :profile-id profile-id ::session-id session-id) (dissoc data :profile-id)) - - ;; Get the method from methods registry and if method does - ;; not exists asigns it to the default handler. method (get methods type default-handler)] (-> (method data) - (p/then #(respond (handle-response %))) + (p/then handle-response) + (p/then respond) (p/catch raise))))) (defn- rpc-mutation-handler "Ring handler that dispatches mutation requests and convert between internal async flow into ring async flow." - [methods {:keys [profile-id session-id] :as request} respond raise] + [methods {:keys [profile-id session-id params] :as request} respond raise] (letfn [(handle-response [result] (let [mdata (meta result)] - (-> {:status 200 :body result} - (handle-response-transformation request mdata) - (handle-before-comple-hook mdata))))] - - (let [type (keyword (get-in request [:path-params :type])) - data (merge (:params request) - (:body-params request) - (:uploads request) - {::request request}) + (p/-> (yrs/response 200 result) + (handle-response-transformation request mdata) + (handle-before-comple-hook mdata))))] + (let [type (keyword (:type params)) + data (into {::request request} params) data (if profile-id (assoc data :profile-id profile-id ::session-id session-id) (dissoc data :profile-id)) method (get methods type default-handler)] - (-> (method data) - (p/then #(respond (handle-response %))) + (p/then handle-response) + (p/then respond) (p/catch raise))))) (defn- wrap-metrics @@ -147,7 +138,7 @@ :name (or (::audit/name resultm) (::sv/name mdata)) :profile-id profile-id - :ip-addr (audit/parse-client-ip request) + :ip-addr (some-> request audit/parse-client-ip) :props (dissoc props ::request))))))) mdata) f)) diff --git a/backend/src/app/rpc/mutations/fonts.clj b/backend/src/app/rpc/mutations/fonts.clj index d3e0486a7..46a6557d2 100644 --- a/backend/src/app/rpc/mutations/fonts.clj +++ b/backend/src/app/rpc/mutations/fonts.clj @@ -32,7 +32,6 @@ (s/def ::weight valid-weight) (s/def ::style valid-style) (s/def ::font-id ::us/uuid) -(s/def ::content-type ::media/font-content-type) (s/def ::data (s/map-of ::us/string any?)) (s/def ::create-font-variant diff --git a/backend/src/app/rpc/mutations/media.clj b/backend/src/app/rpc/mutations/media.clj index 69e2fcc47..70c8c20ff 100644 --- a/backend/src/app/rpc/mutations/media.clj +++ b/backend/src/app/rpc/mutations/media.clj @@ -20,7 +20,6 @@ [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s] - [datoteka.core :as fs] [promesa.core :as p] [promesa.exec :as px])) @@ -41,9 +40,7 @@ (declare create-file-media-object) (declare select-file) -(s/def ::content-type ::media/image-content-type) -(s/def ::content (s/and ::media/upload (s/keys :req-un [::content-type]))) - +(s/def ::content ::media/upload) (s/def ::is-local ::us/boolean) (s/def ::upload-file-media-object @@ -95,14 +92,14 @@ (defn create-file-media-object [{:keys [storage pool executors] :as cfg} {:keys [id file-id is-local name content] :as params}] - (media/validate-media-type! (:content-type content)) + (media/validate-media-type! content) (letfn [;; Function responsible to retrieve the file information, as ;; it is synchronous operation it should be wrapped into ;; with-dispatch macro. - (get-info [path mtype] + (get-info [content] (px/with-dispatch (:blocking executors) - (media/run {:cmd :info :input {:path path :mtype mtype}}))) + (media/run {:cmd :info :input content}))) ;; Function responsible of calculating cryptographyc hash of ;; the provided data. Even though it uses the hight @@ -114,16 +111,16 @@ ;; Function responsible of generating thumnail. As it is synchronous ;; opetation, it should be wrapped into with-dispatch macro - (generate-thumbnail [info path] + (generate-thumbnail [info] (px/with-dispatch (:blocking executors) (media/run (assoc thumbnail-options :cmd :generic-thumbnail - :input {:mtype (:mtype info) :path path})))) + :input info)))) - (create-thumbnail [info path] + (create-thumbnail [info] (when (and (not (svg-image? info)) (big-enough-for-thumbnail? info)) - (p/let [thumb (generate-thumbnail info path) + (p/let [thumb (generate-thumbnail info) hash (calculate-hash (:data thumb)) content (-> (sto/content (:data thumb) (:size thumb)) (sto/wrap-with-hash hash))] @@ -134,8 +131,8 @@ :content-type (:mtype thumb) :bucket "file-media-object"})))) - (create-image [info path] - (p/let [data (cond-> path (= (:mtype info) "image/svg+xml") slurp) + (create-image [info] + (p/let [data (cond-> (:path info) (= (:mtype info) "image/svg+xml") slurp) hash (calculate-hash data) content (-> (sto/content data) (sto/wrap-with-hash hash))] @@ -157,11 +154,9 @@ (:height info) (:mtype info)])))] - (p/let [path (fs/path (:tempfile content)) - info (get-info path (:content-type content)) - thumb (create-thumbnail info path) - image (create-image info path)] - + (p/let [info (get-info content) + thumb (create-thumbnail info) + image (create-image info)] (insert-into-database info image thumb)))) ;; --- Create File Media Object (from URL) @@ -208,6 +203,14 @@ :mtype mtype :format format})) + (get-upload-object [sobj] + (p/let [path (sto/get-object-path storage sobj) + mdata (meta sobj)] + {:filename "tempfile" + :size (:size sobj) + :path path + :mtype (:content-type mdata)})) + (download-media [uri] (p/let [{:keys [body headers]} (http-client {:method :get :uri uri} {:response-type :input-stream}) {:keys [size mtype]} (parse-and-validate-size headers)] @@ -217,12 +220,7 @@ ::sto/expired-at (dt/in-future {:minutes 30}) :content-type mtype :bucket "file-media-object"}) - (p/then (fn [sobj] - (p/let [path (sto/get-object-path storage sobj)] - {:filename "tempfile" - :size (:size sobj) - :tempfile path - :content-type (:content-type (meta sobj))}))))))] + (p/then get-upload-object))))] (p/let [content (download-media url)] (->> (merge params {:content content :name (or name (:filename content))}) @@ -240,7 +238,6 @@ (db/with-atomic [conn pool] (let [file (select-file conn file-id)] (teams/check-edition-permissions! conn profile-id (:team-id file)) - (-> (assoc cfg :conn conn) (clone-file-media-object params))))) diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj index b4ffa80cb..540e5e22b 100644 --- a/backend/src/app/rpc/mutations/profile.clj +++ b/backend/src/app/rpc/mutations/profile.clj @@ -407,43 +407,32 @@ (declare update-profile-photo) -(s/def ::content-type ::media/image-content-type) -(s/def ::file (s/and ::media/upload (s/keys :req-un [::content-type]))) - +(s/def ::file ::media/upload) (s/def ::update-profile-photo (s/keys :req-un [::profile-id ::file])) -;; TODO: properly handle resource usage, transactions and storage - (sv/defmethod ::update-profile-photo [cfg {:keys [file] :as params}] ;; Validate incoming mime type - (media/validate-media-type! (:content-type file) #{"image/jpeg" "image/png" "image/webp"}) + (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) (let [cfg (update cfg :storage media/configure-assets-storage)] (update-profile-photo cfg params))) (defn update-profile-photo - [{:keys [pool storage executors] :as cfg} {:keys [profile-id file] :as params}] - (p/do - ;; Perform file validation, this operation executes some - ;; comandline helpers for true check of the image file. And it - ;; raises an exception if somethig is wrong with the file. - (px/with-dispatch (:blocking executors) - (media/run {:cmd :info :input {:path (:tempfile file) :mtype (:content-type file)}})) + [{:keys [pool storage executors] :as cfg} {:keys [profile-id] :as params}] + (p/let [profile (px/with-dispatch (:default executors) + (db/get-by-id pool :profile profile-id)) + photo (teams/upload-photo cfg params)] - (p/let [profile (px/with-dispatch (:default executors) - (db/get-by-id pool :profile profile-id)) - photo (teams/upload-photo cfg params)] + ;; Schedule deletion of old photo + (when-let [id (:photo-id profile)] + (sto/touch-object! storage id)) - ;; Schedule deletion of old photo - (when-let [id (:photo-id profile)] - (sto/touch-object! storage id)) - - ;; Save new photo - (db/update! pool :profile - {:photo-id (:id photo)} - {:id profile-id}) - nil))) + ;; Save new photo + (db/update! pool :profile + {:photo-id (:id photo)} + {:id profile-id}) + nil)) ;; --- MUTATION: Request Email Change diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj index 268983250..31bf7502b 100644 --- a/backend/src/app/rpc/mutations/teams.clj +++ b/backend/src/app/rpc/mutations/teams.clj @@ -22,7 +22,6 @@ [app.util.time :as dt] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [datoteka.core :as fs] [promesa.core :as p] [promesa.exec :as px])) @@ -281,54 +280,49 @@ (declare ^:private upload-photo) (declare ^:private update-team-photo) -(s/def ::content-type ::media/image-content-type) -(s/def ::file (s/and ::media/upload (s/keys :req-un [::content-type]))) - +(s/def ::file ::media/upload) (s/def ::update-team-photo (s/keys :req-un [::profile-id ::team-id ::file])) (sv/defmethod ::update-team-photo [cfg {:keys [file] :as params}] ;; Validate incoming mime type - (media/validate-media-type! (:content-type file) #{"image/jpeg" "image/png" "image/webp"}) + (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) (let [cfg (update cfg :storage media/configure-assets-storage)] (update-team-photo cfg params))) (defn update-team-photo - [{:keys [pool storage executors] :as cfg} {:keys [profile-id file team-id] :as params}] - (p/do - ;; Perform file validation, this operation executes some - ;; comandline helpers for true check of the image file. And it - ;; raises an exception if somethig is wrong with the file. - (px/with-dispatch (:blocking executors) - (media/run {:cmd :info :input {:path (:tempfile file) :mtype (:content-type file)}})) + [{:keys [pool storage executors] :as cfg} {:keys [profile-id team-id] :as params}] + (p/let [team (px/with-dispatch (:default executors) + (teams/retrieve-team pool profile-id team-id)) + photo (upload-photo cfg params)] - (p/let [team (px/with-dispatch (:default executors) - (teams/retrieve-team pool profile-id team-id)) - photo (upload-photo cfg params)] + ;; Mark object as touched for make it ellegible for tentative + ;; garbage collection. + (when-let [id (:photo-id team)] + (sto/touch-object! storage id)) - ;; Mark object as touched for make it ellegible for tentative - ;; garbage collection. - (when-let [id (:photo-id team)] - (sto/touch-object! storage id)) + ;; Save new photo + (db/update! pool :team + {:photo-id (:id photo)} + {:id team-id}) - ;; Save new photo - (db/update! pool :team - {:photo-id (:id photo)} - {:id team-id}) - - (assoc team :photo-id (:id photo))))) + (assoc team :photo-id (:id photo)))) (defn upload-photo [{:keys [storage executors] :as cfg} {:keys [file]}] - (letfn [(generate-thumbnail [path mtype] + (letfn [(get-info [content] + (px/with-dispatch (:blocking executors) + (media/run {:cmd :info :input content}))) + + (generate-thumbnail [info] (px/with-dispatch (:blocking executors) (media/run {:cmd :profile-thumbnail :format :jpeg :quality 85 :width 256 :height 256 - :input {:path path :mtype mtype}}))) + :input info}))) ;; Function responsible of calculating cryptographyc hash of ;; the provided data. Even though it uses the hight @@ -338,8 +332,8 @@ (px/with-dispatch (:blocking executors) (sto/calculate-hash data)))] - (p/let [thumb (generate-thumbnail (fs/path (:tempfile file)) - (:content-type file)) + (p/let [info (get-info file) + thumb (generate-thumbnail info) hash (calculate-hash (:data thumb)) content (-> (sto/content (:data thumb) (:size thumb)) (sto/wrap-with-hash hash))] diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index 0e709b572..cff6c3363 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -257,7 +257,9 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; A task responsible to permanently delete already marked as deleted -;; storage files. +;; storage files. The storage objects are practically never marked to +;; be deleted directly by the api call. The touched-gc is responsible +;; collect the usage of the object and mark it as deleted. (declare sql:retrieve-deleted-objects-chunk) @@ -308,7 +310,7 @@ and s.deleted_at < (now() - ?::interval) and s.created_at < ? order by s.created_at desc - limit 100 + limit 25 ) delete from storage_object where id in (select id from items_part) @@ -318,9 +320,9 @@ ;; Garbage Collection: Analyze touched objects ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; This task is part of the garbage collection of storage objects and -;; is responsible on analyzing the touched objects and mark them for -;; deletion if corresponds. +;; This task is part of the garbage collection process of storage +;; objects and is responsible on analyzing the touched objects and +;; mark them for deletion if corresponds. ;; ;; For example: when file_media_object is deleted, the depending ;; storage_object are marked as touched. This means that some files diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index 78d1bc623..4574d71a5 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -51,7 +51,6 @@ (count result))) - ;; --- IMPL: file deletion (defmethod delete-objects "file" diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index a1285f8b2..045cd6a7e 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -13,10 +13,10 @@ [app.metrics :as mtx] [app.util.time :as dt] [clojure.core.async :as a] + [yetti.util :as yu] [yetti.websocket :as yws]) (:import - java.nio.ByteBuffer - org.eclipse.jetty.io.EofException)) + java.nio.ByteBuffer)) (declare decode-beat) (declare encode-beat) @@ -48,15 +48,17 @@ output-buff-size 64 idle-timeout 30000} :as options}] - (fn [_] + (fn [{:keys [::yws/channel] :as request}] (let [input-ch (a/chan input-buff-size) output-ch (a/chan output-buff-size) pong-ch (a/chan (a/sliding-buffer 6)) close-ch (a/chan) + options (-> options (assoc ::input-ch input-ch) (assoc ::output-ch output-ch) (assoc ::close-ch close-ch) + (assoc ::channel channel) (dissoc ::metrics)) terminated (atom false) @@ -76,33 +78,10 @@ on-error (fn [_ error] (on-terminate) - (when-not (or (instance? org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException error) - (instance? java.nio.channels.ClosedChannelException error)) + ;; TODO: properly log timeout exceptions + (when-not (instance? java.nio.channels.ClosedChannelException error) (l/error :hint (ex-message error) :cause error))) - on-connect - (fn [conn] - (mtx/run! metrics {:id :websocket-active-connections :inc 1}) - - (let [wsp (atom (assoc options ::conn conn))] - ;; Handle heartbeat - (yws/idle-timeout! conn (dt/duration idle-timeout)) - (-> @wsp - (assoc ::pong-ch pong-ch) - (assoc ::on-close on-terminate) - (process-heartbeat)) - - ;; Forward all messages from output-ch to the websocket - ;; connection - (a/go-loop [] - (when-let [val (a/!! pong-ch buffer))] + (fn [_ buffers] + (a/>!! pong-ch (yu/copy-many buffers)))] - {:on-connect on-connect - :on-error on-error - :on-close on-terminate - :on-text on-message - :on-pong on-pong})))) + (mtx/run! metrics {:id :websocket-active-connections :inc 1}) + + (let [wsp (atom options)] + ;; Handle heartbeat + (yws/idle-timeout! channel (dt/duration idle-timeout)) + (-> @wsp + (assoc ::pong-ch pong-ch) + (assoc ::on-close on-terminate) + (process-heartbeat)) + + ;; Forward all messages from output-ch to the websocket + ;; connection + (a/go-loop [] + (when-let [val (a/= (count issued) max-missed-heartbeats) - (on-close conn -1 "heartbeat-timeout") + (on-close channel -1 "heartbeat-timeout") (recur (inc i))))))) (a/go-loop [] diff --git a/backend/test/app/services_files_test.clj b/backend/test/app/services_files_test.clj index b606f6223..64c69feee 100644 --- a/backend/test/app/services_files_test.clj +++ b/backend/test/app/services_files_test.clj @@ -120,8 +120,8 @@ (t/deftest file-media-gc-task (letfn [(create-file-media-object [{:keys [profile-id file-id]}] (let [mfile {:filename "sample.jpg" - :tempfile (th/tempfile "app/test_files/sample.jpg") - :content-type "image/jpeg" + :path (th/tempfile "app/test_files/sample.jpg") + :mtype "image/jpeg" :size 312043} params {::th/type :upload-file-media-object :profile-id profile-id diff --git a/backend/test/app/services_media_test.clj b/backend/test/app/services_media_test.clj index e89ed76b1..d0ce566b0 100644 --- a/backend/test/app/services_media_test.clj +++ b/backend/test/app/services_media_test.clj @@ -57,8 +57,8 @@ :project-id (:default-project-id prof) :is-shared false}) mfile {:filename "sample.jpg" - :tempfile (th/tempfile "app/test_files/sample.jpg") - :content-type "image/jpeg" + :path (th/tempfile "app/test_files/sample.jpg") + :mtype "image/jpeg" :size 312043} params {::th/type :upload-file-media-object @@ -96,8 +96,8 @@ :project-id (:default-project-id prof) :is-shared false}) mfile {:filename "sample.jpg" - :tempfile (th/tempfile "app/test_files/sample.jpg") - :content-type "image/jpeg" + :path (th/tempfile "app/test_files/sample.jpg") + :mtype "image/jpeg" :size 312043} params {::th/type :upload-file-media-object diff --git a/backend/test/app/services_profile_test.clj b/backend/test/app/services_profile_test.clj index 78c423872..fb0f4980e 100644 --- a/backend/test/app/services_profile_test.clj +++ b/backend/test/app/services_profile_test.clj @@ -110,8 +110,8 @@ :profile-id (:id profile) :file {:filename "sample.jpg" :size 123123 - :tempfile (th/tempfile "app/test_files/sample.jpg") - :content-type "image/jpeg"}} + :path (th/tempfile "app/test_files/sample.jpg") + :mtype "image/jpeg"}} out (th/mutation! data)] ;; (th/print-result! out) diff --git a/backend/test/app/storage_test.clj b/backend/test/app/storage_test.clj index 8b8e556a3..cab9e01d8 100644 --- a/backend/test/app/storage_test.clj +++ b/backend/test/app/storage_test.clj @@ -126,8 +126,8 @@ :is-shared false}) mfile {:filename "sample.jpg" - :tempfile (th/tempfile "app/test_files/sample.jpg") - :content-type "image/jpeg" + :path (th/tempfile "app/test_files/sample.jpg") + :mtype "image/jpeg" :size 312043} params {::th/type :upload-file-media-object @@ -200,8 +200,8 @@ (fs/slurp-bytes)) mfile {:filename "sample.jpg" - :tempfile (th/tempfile "app/test_files/sample.jpg") - :content-type "image/jpeg" + :path (th/tempfile "app/test_files/sample.jpg") + :mtype "image/jpeg" :size 312043} params1 {::th/type :upload-file-media-object @@ -266,8 +266,8 @@ :project-id (:default-project-id prof) :is-shared false}) mfile {:filename "sample.jpg" - :tempfile (th/tempfile "app/test_files/sample.jpg") - :content-type "image/jpeg" + :path (th/tempfile "app/test_files/sample.jpg") + :mtype "image/jpeg" :size 312043} params {::th/type :upload-file-media-object diff --git a/backend/test/app/test_helpers.clj b/backend/test/app/test_helpers.clj index e7b50117f..f51bccc86 100644 --- a/backend/test/app/test_helpers.clj +++ b/backend/test/app/test_helpers.clj @@ -30,6 +30,7 @@ [expound.alpha :as expound] [integrant.core :as ig] [mockery.core :as mk] + [yetti.request :as yrq] [promesa.core :as p]) (:import org.postgresql.ds.PGSimpleDataSource)) @@ -55,12 +56,20 @@ (dissoc :app.srepl/server :app.http/server :app.http/router - :app.notifications/handler - :app.loggers.sentry/reporter + :app.http.awsns/handler + :app.http.session/updater :app.http.oauth/google :app.http.oauth/gitlab :app.http.oauth/github :app.http.oauth/all + :app.worker/executors-monitor + :app.http.oauth/handler + :app.notifications/handler + :app.loggers.sentry/reporter + :app.loggers.mattermost/reporter + :app.loggers.loki/reporter + :app.loggers.database/reporter + :app.loggers.zmq/receiver :app.worker/cron :app.worker/worker) (d/deep-merge @@ -71,7 +80,11 @@ (try (binding [*system* system *pool* (:app.db/pool system)] - (next)) + (mk/with-mocks [mock1 {:target 'app.rpc.mutations.profile/derive-password + :return identity} + mock2 {:target 'app.rpc.mutations.profile/verify-password + :return (fn [a b] {:valid (= a b)})}] + (next))) (finally (ig/halt! system)))))