From fde03e21b020480f156fc63a7ce72769703cb51e Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 7 Nov 2022 16:56:02 +0100 Subject: [PATCH] :tada: Add conditional reading to RPC --- backend/src/app/rpc.clj | 50 ++++++++------ backend/src/app/rpc/commands/auth.clj | 9 +-- backend/src/app/rpc/commands/binfile.clj | 11 +-- backend/src/app/rpc/commands/files.clj | 28 ++++++-- backend/src/app/rpc/commands/files/update.clj | 8 +-- backend/src/app/rpc/cond.clj | 67 +++++++++++++++++++ backend/src/app/rpc/helpers.clj | 54 ++++++++++++++- backend/src/app/rpc/mutations/files.clj | 8 +-- backend/src/app/rpc/mutations/teams.clj | 25 ++++--- backend/src/app/util/services.clj | 27 -------- backend/test/backend_tests/helpers.clj | 3 +- .../rpc_cond_middleware_test.clj | 42 ++++++++++++ 12 files changed, 242 insertions(+), 90 deletions(-) create mode 100644 backend/src/app/rpc/cond.clj create mode 100644 backend/test/backend_tests/rpc_cond_middleware_test.clj diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 69a63db8b..b8a7417d2 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -16,6 +16,8 @@ [app.metrics :as mtx] [app.msgbus :as-alias mbus] [app.rpc.climit :as climit] + [app.rpc.cond :as cond] + [app.rpc.helpers :as rph] [app.rpc.retry :as retry] [app.rpc.rlimit :as rlimit] [app.storage :as-alias sto] @@ -25,6 +27,7 @@ [integrant.core :as ig] [promesa.core :as p] [promesa.exec :as px] + [yetti.request :as yrq] [yetti.response :as yrs])) (defn- default-handler @@ -33,23 +36,29 @@ (defn- handle-response-transformation [response request mdata] - (if-let [transform-fn (::transform-response mdata)] - (p/do (transform-fn request response)) - (p/resolved response))) + (let [transform-fn (reduce (fn [res-fn transform-fn] + (fn [request response] + (p/then (res-fn request response) #(transform-fn request %)))) + (constantly response) + (::response-transform-fns mdata))] + (transform-fn request response))) (defn- handle-before-comple-hook [response mdata] - (when-let [hook-fn (::before-complete mdata)] + (doseq [hook-fn (::before-complete-fns mdata)] (ex/ignoring (hook-fn))) response) (defn- handle-response [request result] - (let [mdata (meta result) - result (if (sv/wrapped? result) @result result)] - (p/-> (yrs/response 200 result (::http/headers mdata {})) - (handle-response-transformation request mdata) - (handle-before-comple-hook mdata)))) + (if (fn? result) + (p/wrap (result request)) + (let [mdata (meta result)] + (p/-> (yrs/response {:status (::http/status mdata 200) + :headers (::http/headers mdata {}) + :body (rph/unwrap result)}) + (handle-response-transformation request mdata) + (handle-before-comple-hook mdata))))) (defn- rpc-query-handler "Ring handler that dispatches query requests and convert between @@ -92,18 +101,20 @@ internal async flow into ring async flow." [methods {:keys [profile-id session-id params] :as request} respond raise] (let [cmd (keyword (:command params)) - data (into {::request request} params) + etag (yrq/get-header request "if-none-match") + data (into {::request request ::cond/key etag} params) data (if profile-id (assoc data :profile-id profile-id ::session-id session-id) (dissoc data :profile-id)) method (get methods cmd default-handler)] - (-> (method data) - (p/then (partial handle-response request)) - (p/then respond) - (p/catch (fn [cause] - (let [context {:profile-id profile-id}] - (raise (ex/wrap-with-context cause context)))))))) + (binding [cond/*enabled* true] + (-> (method data) + (p/then (partial handle-response request)) + (p/then respond) + (p/catch (fn [cause] + (let [context {:profile-id profile-id}] + (raise (ex/wrap-with-context cause context))))))))) (defn- wrap-metrics "Wrap service method with metrics measurement." @@ -125,9 +136,9 @@ [{:keys [executor] :as cfg} f mdata] (with-meta (fn [cfg params] - (-> (px/submit! executor #(f cfg params)) - (p/bind p/wrap) - (p/then' sv/wrap))) + (->> (px/submit! executor (px/wrap-bindings #(f cfg params))) + (p/mapcat p/wrap) + (p/map rph/wrap))) mdata)) (defn- wrap-audit @@ -161,6 +172,7 @@ [cfg f mdata] (let [f (as-> f $ (wrap-dispatch cfg $ mdata) + (cond/wrap cfg $ mdata) (retry/wrap-retry cfg $ mdata) (wrap-metrics cfg $ mdata) (climit/wrap cfg $ mdata) diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index f41f6bf92..46f2c5d72 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -18,6 +18,7 @@ [app.rpc :as-alias rpc] [app.rpc.climit :as climit] [app.rpc.doc :as-alias doc] + [app.rpc.helpers :as rph] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] [app.tokens :as tokens] @@ -135,10 +136,10 @@ {:invitation-token (:invitation-token params)} profile)] - (with-meta response - {::rpc/transform-response (session/create-fn session (:id profile)) - ::audit/props (audit/profile->props profile) - ::audit/profile-id (:id profile)}))))) + (-> response + (rph/with-transform (session/create-fn session (:id profile))) + (vary-meta merge {::audit/props (audit/profile->props profile) + ::audit/profile-id (:id profile)})))))) (s/def ::login-with-password (s/keys :req-un [::email ::password] diff --git a/backend/src/app/rpc/commands/binfile.clj b/backend/src/app/rpc/commands/binfile.clj index 301366d03..0daeb8190 100644 --- a/backend/src/app/rpc/commands/binfile.clj +++ b/backend/src/app/rpc/commands/binfile.clj @@ -16,7 +16,6 @@ [app.config :as cf] [app.db :as db] [app.media :as media] - [app.rpc :as-alias rpc] [app.rpc.commands.files :as files] [app.rpc.doc :as-alias doc] [app.rpc.queries.projects :as projects] @@ -874,7 +873,7 @@ {::doc/added "1.15"} [{:keys [pool] :as cfg} {:keys [profile-id file-id include-libraries? embed-assets?] :as params}] (files/check-read-permissions! pool profile-id file-id) - (let [resp (reify yrs/StreamableResponseBody + (let [body (reify yrs/StreamableResponseBody (-write-body-to-stream [_ _ output-stream] (-> cfg (assoc ::file-ids [file-id]) @@ -882,12 +881,8 @@ (assoc ::include-libraries? include-libraries?) (export! output-stream))))] - (with-meta (sv/wrap nil) - {::rpc/transform-response - (fn [_ response] - (-> response - (assoc :body resp) - (assoc :headers {"content-type" "application/octet-stream"})))}))) + (fn [_] + (yrs/response 200 body {"content-type" "application/octet-stream"})))) (s/def ::file ::media/upload) (s/def ::import-binfile diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index 775221c83..2d354f63d 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -19,8 +19,9 @@ [app.db.sql :as sql] [app.rpc :as-alias rpc] [app.rpc.commands.files.thumbnails :as-alias thumbs] + [app.rpc.cond :as-alias cond] [app.rpc.doc :as-alias doc] - [app.rpc.helpers :as rpch] + [app.rpc.helpers :as rph] [app.rpc.permissions :as perms] [app.rpc.queries.projects :as projects] [app.rpc.queries.share-link :refer [retrieve-share-link]] @@ -237,19 +238,30 @@ file))) +(defn- get-minimal-file + [{:keys [pool] :as cfg} id] + (db/get pool :file {:id id} {:columns [:id :modified-at :revn]})) + +(defn- get-file-etag + [{:keys [modified-at revn]}] + (str (dt/format-instant modified-at :iso) "-" revn)) + (s/def ::get-file (s/keys :req-un [::profile-id ::id] :opt-un [::features])) (sv/defmethod ::get-file "Retrieve a file by its ID. Only authenticated users." - {::doc/added "1.17"} + {::doc/added "1.17" + ::cond/get-object #(get-minimal-file %1 (:id %2)) + ::cond/key-fn get-file-etag} [{:keys [pool] :as cfg} {:keys [profile-id id features] :as params}] (with-open [conn (db/open pool)] (let [perms (get-permissions conn profile-id id)] (check-read-permissions! perms) - (-> (get-file conn id features) - (assoc :permissions perms))))) + (let [file (-> (get-file conn id features) + (assoc :permissions perms))] + (vary-meta file assoc ::cond/key (get-file-etag file)))))) ;; --- COMMAND QUERY: get-file-object-thumbnails @@ -277,7 +289,10 @@ (sv/defmethod ::get-file-object-thumbnails "Retrieve a file object thumbnails." - {::doc/added "1.17"} + {::doc/added "1.17" + ::cond/get-object #(get-minimal-file %1 (:file-id %2)) + ::cond/reuse-key? true + ::cond/key-fn get-file-etag} [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] (with-open [conn (db/open pool)] (check-read-permissions! conn profile-id file-id) @@ -592,8 +607,7 @@ (with-open [conn (db/open pool)] (check-read-permissions! conn profile-id file-id) (-> (get-file-thumbnail conn file-id revn) - (with-meta {::rpc/transform-response (rpch/http-cache {:max-age (* 1000 60 60)})})))) - + (with-meta {::rpc/transform-response (rph/http-cache {:max-age (* 1000 60 60)})})))) ;; --- COMMAND QUERY: get-file-data-for-thumbnail diff --git a/backend/src/app/rpc/commands/files/update.clj b/backend/src/app/rpc/commands/files/update.clj index 4752efb1a..a590d63e8 100644 --- a/backend/src/app/rpc/commands/files/update.clj +++ b/backend/src/app/rpc/commands/files/update.clj @@ -19,10 +19,10 @@ [app.loggers.audit :as audit] [app.metrics :as mtx] [app.msgbus :as mbus] - [app.rpc :as-alias rpc] [app.rpc.climit :as-alias climit] [app.rpc.commands.files :as files] [app.rpc.doc :as-alias doc] + [app.rpc.helpers :as rph] [app.util.blob :as blob] [app.util.objects-map :as omap] [app.util.pointer-map :as pmap] @@ -135,10 +135,8 @@ (let [cfg (assoc cfg :conn conn) tpoint (dt/tpoint)] (-> (update-file cfg params) - (vary-meta assoc ::rpc/before-complete - (fn [] - (let [elapsed (tpoint)] - (l/trace :hint "update-file" :time (dt/format-duration elapsed))))))))) + (rph/with-defer #(let [elapsed (tpoint)] + (l/trace :hint "update-file" :time (dt/format-duration elapsed)))))))) (defn update-file [{:keys [conn metrics] :as cfg} {:keys [id profile-id changes changes-with-metadata] :as params}] diff --git a/backend/src/app/rpc/cond.clj b/backend/src/app/rpc/cond.clj new file mode 100644 index 000000000..58440ad4b --- /dev/null +++ b/backend/src/app/rpc/cond.clj @@ -0,0 +1,67 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.rpc.cond + "Conditional loading middleware. + + A middleware consists mainly on wrapping a RPC method with + conditional logic. It expects to to have some metadata set on the RPC + method that will enable this middleware to retrieve the necessary data + for process the conditional logic: + + - `::get-object` => should be a function that retrieves the minimum version + of the object that will be used for calculate the KEY (etags in terms of + the HTTP protocol). + - `::key-fn` a function used to generate a string representation + of the object. This function can be applied to the object returned by the + `get-object` but also to the RPC return value (in case you don't provide + the return value calculated key under `::key` metadata prop. + - `::reuse-key?` enables reusing the key calculated on first time; usefull + when the target object is not retrieved on the RPC (typical on retrieving + dependent objects). + " + (:require + [app.common.logging :as l] + [app.rpc.helpers :as rph] + [app.util.services :as-alias sv] + [promesa.core :as p] + [promesa.exec :as px] + [yetti.response :as yrs])) + +(def + ^{:dynamic true + :doc "Runtime flag for enable/disable conditional processing of RPC methods."} + *enabled* false) + +(defn- fmt-key + [s] + (when s + (str "W/\"" s "\""))) + +(defn wrap + [{:keys [executor]} f {:keys [::get-object ::key-fn ::reuse-key?] :as mdata}] + (if (and (ifn? get-object) (ifn? key-fn)) + (do + (l/debug :hint "instrumenting method" :service (::sv/name mdata)) + (fn [cfg {:keys [::key] :as params}] + (if *enabled* + (->> (if (or key reuse-key?) + (->> (px/submit! executor (partial get-object cfg params)) + (p/map key-fn) + (p/map fmt-key)) + (p/resolved nil)) + (p/mapcat (fn [key'] + (if (and (some? key) + (= key key')) + (p/resolved (fn [_] (yrs/response 304))) + (->> (f cfg params) + (p/map (fn [result] + (->> (or (and reuse-key? key') + (-> result meta ::key fmt-key) + (-> result key-fn fmt-key)) + (rph/with-header result "etag"))))))))) + (f cfg params)))) + f)) diff --git a/backend/src/app/rpc/helpers.clj b/backend/src/app/rpc/helpers.clj index 326f482d8..2dda8203b 100644 --- a/backend/src/app/rpc/helpers.clj +++ b/backend/src/app/rpc/helpers.clj @@ -6,7 +6,43 @@ (ns app.rpc.helpers "General purpose RPC helpers." - (:require [app.common.data.macros :as dm])) + (:require + [app.common.data.macros :as dm] + [app.http :as-alias http] + [app.rpc :as-alias rpc])) + +;; A utilty wrapper object for wrap service responses that does not +;; implements the IObj interface that make possible attach metadata to +;; it. + +(deftype MetadataWrapper [obj ^:unsynchronized-mutable metadata] + clojure.lang.IDeref + (deref [_] obj) + + clojure.lang.IObj + (withMeta [_ meta] + (MetadataWrapper. obj meta)) + + (meta [_] metadata)) + +(defn wrap + "Conditionally wrap a value into MetadataWrapper instance. If the + object already implements IObj interface it will be returned as is." + ([] (wrap nil)) + ([o] + (if (instance? clojure.lang.IObj o) + o + (MetadataWrapper. o {}))) + ([o m] + (MetadataWrapper. o m))) + +(defn wrapped? + [o] + (instance? MetadataWrapper o)) + +(defn unwrap + [o] + (if (wrapped? o) @o o)) (defn http-cache [{:keys [max-age]}] @@ -14,3 +50,19 @@ (let [exp (if (integer? max-age) max-age (inst-ms max-age)) val (dm/fmt "max-age=%" (int (/ exp 1000.0)))] (update response :headers assoc "cache-control" val)))) + +(defn with-header + "Add a http header to the RPC result." + [mdw key val] + (vary-meta mdw update ::http/headers assoc key val)) + +(defn with-transform + "Adds a http response transform to the RPC result." + [mdw transform-fn] + (vary-meta mdw update ::rpc/response-transform-fns conj transform-fn)) + +(defn with-defer + "Defer execution of the function until request is finished." + [mdw hook-fn] + (vary-meta mdw update ::rpc/before-complete-fns conj hook-fn)) + diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index f419b6375..48dfb39ef 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -11,13 +11,13 @@ [app.common.spec :as us] [app.db :as db] [app.loggers.audit :as audit] - [app.rpc :as-alias rpc] [app.rpc.climit :as-alias climit] [app.rpc.commands.files :as cmd.files] [app.rpc.commands.files.create :as cmd.files.create] [app.rpc.commands.files.temp :as cmd.files.temp] [app.rpc.commands.files.update :as cmd.files.update] [app.rpc.doc :as-alias doc] + [app.rpc.helpers :as rph] [app.rpc.queries.projects :as proj] [app.util.services :as sv] [app.util.time :as dt] @@ -166,10 +166,8 @@ cfg (assoc cfg :conn conn)] (-> (cmd.files.update/update-file cfg params) - (vary-meta assoc ::rpc/before-complete - (fn [] - (let [elapsed (tpoint)] - (l/trace :hint "update-file" :time (dt/format-duration elapsed))))))))) + (rph/with-defer #(let [elapsed (tpoint)] + (l/trace :hint "update-file" :time (dt/format-duration elapsed)))))))) ;; --- Mutation: upsert object thumbnail diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj index 288cfdf76..83c68c1e8 100644 --- a/backend/src/app/rpc/mutations/teams.clj +++ b/backend/src/app/rpc/mutations/teams.clj @@ -16,8 +16,8 @@ [app.emails :as eml] [app.loggers.audit :as audit] [app.media :as media] - [app.rpc :as-alias rpc] [app.rpc.climit :as climit] + [app.rpc.helpers :as rph] [app.rpc.mutations.projects :as projects] [app.rpc.permissions :as perms] [app.rpc.queries.profile :as profile] @@ -487,18 +487,17 @@ :email email :role role))) - (with-meta team - {::audit/props {:invitations (count emails)} - - ::rpc/before-complete - #(audit-fn :cmd :submit - :type "mutation" - :name "invite-team-member" - :profile-id profile-id - :props {:emails emails - :role role - :profile-id profile-id - :invitations (count emails)})})))) + (-> team + (vary-meta assoc ::audit/props {:invitations (count emails)}) + (rph/with-defer + #(audit-fn :cmd :submit + :type "mutation" + :name "invite-team-member" + :profile-id profile-id + :props {:emails emails + :role role + :profile-id profile-id + :invitations (count emails)})))))) ;; --- Mutation: Update invitation role diff --git a/backend/src/app/util/services.clj b/backend/src/app/util/services.clj index 59a048e1e..66f9fc8db 100644 --- a/backend/src/app/util/services.clj +++ b/backend/src/app/util/services.clj @@ -11,33 +11,6 @@ [app.common.data :as d] [cuerdas.core :as str])) -;; A utilty wrapper object for wrap service responses that does not -;; implements the IObj interface that make possible attach metadata to -;; it. - -(deftype MetadataWrapper [obj ^:unsynchronized-mutable metadata] - clojure.lang.IDeref - (deref [_] obj) - - clojure.lang.IObj - (withMeta [_ meta] - (MetadataWrapper. obj meta)) - - (meta [_] metadata)) - -(defn wrap - "Conditionally wrap a value into MetadataWrapper instance. If the - object already implements IObj interface it will be returned as is." - ([] (wrap nil)) - ([o] - (if (instance? clojure.lang.IObj o) - o - (MetadataWrapper. o {})))) - -(defn wrapped? - [o] - (instance? MetadataWrapper o)) - (defmacro defmethod [sname & body] (let [[docs body] (if (string? (first body)) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 75b3a3eb8..3e12312ed 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -17,6 +17,7 @@ [app.main :as main] [app.media] [app.migrations] + [app.rpc.helpers :as rph] [app.rpc.commands.auth :as cmd.auth] [app.rpc.commands.files :as files] [app.rpc.commands.files.create :as files.create] @@ -295,7 +296,7 @@ [expr] `(try (let [result# (deref ~expr) - result# (cond-> result# (sv/wrapped? result#) deref)] + result# (cond-> result# (rph/wrapped? result#) deref)] {:error nil :result result#}) (catch Exception e# diff --git a/backend/test/backend_tests/rpc_cond_middleware_test.clj b/backend/test/backend_tests/rpc_cond_middleware_test.clj new file mode 100644 index 000000000..74f95e196 --- /dev/null +++ b/backend/test/backend_tests/rpc_cond_middleware_test.clj @@ -0,0 +1,42 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns backend-tests.rpc-cond-middleware-test + (:require + [backend-tests.storage-test :refer [configure-storage-backend]] + [backend-tests.helpers :as th] + [app.common.uuid :as uuid] + [app.db :as db] + [app.http :as http] + [app.rpc.cond :as cond] + [clojure.test :as t] + [datoteka.core :as fs])) + +(t/use-fixtures :once th/state-init) +(t/use-fixtures :each th/database-reset) + +(t/deftest conditional-requests + (let [profile (th/create-profile* 1 {:is-active true}) + project (th/create-project* 1 {:team-id (:default-team-id profile) + :profile-id (:id profile)}) + file1 (th/create-file* 1 {:profile-id (:id profile) + :project-id (:id project)}) + params {::th/type :get-file :id (:id file1) :profile-id (:id profile)}] + + (binding [cond/*enabled* true] + (let [{:keys [error result]} (th/command! params)] + (t/is (nil? error)) + (t/is (map? result)) + (t/is (contains? (meta result) :app.http/headers)) + (t/is (contains? (meta result) :app.rpc.cond/key)) + + (let [etag (-> result meta :app.http/headers (get "etag")) + {:keys [error result]} (th/command! (assoc params ::cond/key etag))] + (t/is (nil? error)) + (t/is (fn? result)) + (t/is (= 304 (-> (result nil) :status)))) + )))) +