From 7f589b09cafcad8bbc26af5bb6b46131495f596e Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 5 Dec 2022 17:16:16 +0100 Subject: [PATCH] :recycle: Move audit http handler to RPC --- backend/resources/climit.edn | 4 +- backend/src/app/http.clj | 4 - backend/src/app/loggers/audit.clj | 111 +----------------- backend/src/app/main.clj | 6 - backend/src/app/rpc.clj | 7 +- backend/src/app/rpc/commands/audit.clj | 86 ++++++++++++++ backend/test/backend_tests/rpc_audit_test.clj | 92 +++++++++++++++ frontend/src/app/main/data/events.cljs | 13 +- 8 files changed, 199 insertions(+), 124 deletions(-) create mode 100644 backend/src/app/rpc/commands/audit.clj create mode 100644 backend/test/backend_tests/rpc_audit_test.clj diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn index 697d16539..755568713 100644 --- a/backend/resources/climit.edn +++ b/backend/resources/climit.edn @@ -4,4 +4,6 @@ {:update-file {:concurrency 1 :queue-size 3} :auth {:concurrency 128} :process-font {:concurrency 4 :queue-size 32} - :process-image {:concurrency 8 :queue-size 32}} + :process-image {:concurrency 8 :queue-size 32} + :push-audit-events + {:concurrency 1 :queue-size 3}} diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 58df5e81c..976f42d6e 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -116,7 +116,6 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (s/def ::assets map?) -(s/def ::audit-handler fn?) (s/def ::awsns-handler fn?) (s/def ::debug-routes (s/nilable vector?)) (s/def ::doc-routes (s/nilable vector?)) @@ -138,7 +137,6 @@ ::awsns-handler ::debug-routes ::oidc-routes - ::audit-handler ::rpc-routes ::doc-routes])) @@ -173,8 +171,6 @@ ["/api" {:middleware [[mw/cors] [session/middleware-2 session]]} - ["/audit/events" {:handler (:audit-handler cfg) - :allowed-methods #{:post}}] ["/feedback" {:handler feedback :allowed-methods #{:post}}] (:doc-routes cfg) diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index fa4f67721..c8ef0d796 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -28,9 +28,7 @@ [lambdaisland.uri :as u] [promesa.core :as p] [promesa.exec :as px] - [promesa.exec.bulkhead :as pxb] - [yetti.request :as yrq] - [yetti.response :as yrs])) + [yetti.request :as yrq])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HELPERS @@ -56,7 +54,6 @@ (assoc (->> sk str/kebab (keyword "penpot")) v))))] (reduce-kv process-param {} params))) - (def ^:private profile-props [:id @@ -105,111 +102,12 @@ (s/def ::name ::us/string) (s/def ::type ::us/string) (s/def ::props (s/map-of ::us/keyword any?)) -(s/def ::timestamp dt/instant?) -(s/def ::context (s/map-of ::us/keyword any?)) - -(s/def ::frontend-event - (s/keys :req-un [::type ::name ::props ::timestamp ::profile-id] - :opt-un [::context])) - -(s/def ::frontend-events (s/every ::frontend-event)) - (s/def ::ip-addr ::us/string) -(s/def ::backend-event + +(s/def ::event (s/keys :req-un [::type ::name ::profile-id] :opt-un [::ip-addr ::props])) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; HTTP HANDLER -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(s/def ::concurrency ::us/integer) - -(defmethod ig/pre-init-spec ::http-handler [_] - (s/keys :req [::wrk/executor ::db/pool ::mtx/metrics ::concurrency])) - -(defmethod ig/prep-key ::http-handler - [_ cfg] - (merge {::concurrency (cf/get :audit-log-http-handler-concurrency 8)} - (d/without-nils cfg))) - -(defmethod ig/init-key ::http-handler - [_ {:keys [::wrk/executor ::db/pool ::mtx/metrics ::concurrency] :as cfg}] - (if (or (db/read-only? pool) - (not (contains? cf/flags :audit-log))) - (do - (l/warn :hint "audit: http handler disabled or db is read-only") - (fn [_ respond _] - (respond (yrs/response 204)))) - - (letfn [(event->row [event] - [(uuid/next) - (:name event) - (:source event) - (:type event) - (:timestamp event) - (:profile-id event) - (db/inet (:ip-addr event)) - (db/tjson (:props event)) - (db/tjson (d/without-nils (:context event)))]) - - (handle-request [{:keys [profile-id] :as request}] - (let [events (->> (:events (:params request)) - (remove #(not= profile-id (:profile-id %))) - (us/conform ::frontend-events)) - ip-addr (parse-client-ip request) - xform (comp - (map #(assoc % :ip-addr ip-addr)) - (map #(assoc % :source "frontend")) - (map event->row)) - - columns [:id :name :source :type :tracked-at - :profile-id :ip-addr :props :context]] - (when (seq events) - (->> (into [] xform events) - (db/insert-multi! pool :audit-log columns))))) - - (report-error! [cause] - (if-let [xdata (us/validation-error? cause)] - (l/error ::l/raw (str "audit: validation error frontend events request\n" (ex/explain xdata))) - (l/error :hint "audit: unexpected error on processing frontend events" :cause cause))) - - (on-queue [instance] - (l/trace :hint "http-handler: enqueued" - :queue-size (get instance ::pxb/current-queue-size) - :concurrency (get instance ::pxb/current-concurrency)) - (mtx/run! metrics - :id :audit-http-handler-queue-size - :val (get instance ::pxb/current-queue-size)) - (mtx/run! metrics - :id :audit-http-handler-concurrency - :val (get instance ::pxb/current-concurrency))) - - (on-run [instance task] - (let [elapsed (- (inst-ms (dt/now)) - (inst-ms task))] - (l/trace :hint "http-handler: execute" - :elapsed (str elapsed "ms")) - (mtx/run! metrics - :id :audit-http-handler-timing - :val elapsed) - (mtx/run! metrics - :id :audit-http-handler-queue-size - :val (get instance ::pxb/current-queue-size)) - (mtx/run! metrics - :id :audit-http-handler-concurrency - :val (get instance ::pxb/current-concurrency))))] - - (let [limiter (pxb/create :executor executor - :concurrency concurrency - :on-queue on-queue - :on-run on-run)] - (fn [request respond _] - (->> (px/submit! limiter (partial handle-request request)) - (p/fnly (fn [_ cause] - (some-> cause report-error!) - (respond (yrs/response 204)))))))))) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; COLLECTOR ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -239,7 +137,7 @@ (defn- persist-event! [pool event] - (us/verify! ::backend-event event) + (us/verify! ::event event) (db/insert! pool :audit-log {:id (uuid/next) :name (:name event) @@ -335,7 +233,6 @@ {:iss "authentication" :iat (dt/now) :uid uuid/zero}) - ;; FIXME tokens/generate body (t/encode {:events events}) headers {"content-type" "application/transit+json" "origin" (cf/get :public-uri) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 25956efbb..22d9de047 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -281,7 +281,6 @@ :metrics (ig/ref ::mtx/metrics) :public-uri (cf/get :public-uri) :storage (ig/ref ::sto/storage) - :audit-handler (ig/ref ::audit/http-handler) :rpc-routes (ig/ref :app.rpc/routes) :doc-routes (ig/ref :app.rpc.doc/routes) :executor (ig/ref ::wrk/executor)} @@ -408,11 +407,6 @@ ::lzmq/receiver {} - ::audit/http-handler - {::db/pool (ig/ref ::db/pool) - ::wrk/executor (ig/ref ::wrk/executor) - ::mtx/metrics (ig/ref ::mtx/metrics)} - ::audit/collector {::db/pool (ig/ref ::db/pool) ::wrk/executor (ig/ref ::wrk/executor) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 7494bc4c2..51f48ba28 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -167,6 +167,7 @@ :profile-id profile-id :ip-addr (some-> request audit/parse-client-ip) :props (d/without-qualified props)}] + (audit/submit! collector event))) (handle-request [cfg params] @@ -174,8 +175,9 @@ (p/mcat (fn [result] (->> (handle-audit params result) (p/map (constantly result)))))))] - - (with-meta handle-request mdata)) + (if-not (::audit/skip mdata) + (with-meta handle-request mdata) + f)) f)) (defn- wrap @@ -254,6 +256,7 @@ 'app.rpc.commands.ldap 'app.rpc.commands.demo 'app.rpc.commands.webhooks + 'app.rpc.commands.audit 'app.rpc.commands.files 'app.rpc.commands.files.update 'app.rpc.commands.files.create diff --git a/backend/src/app/rpc/commands/audit.clj b/backend/src/app/rpc/commands/audit.clj new file mode 100644 index 000000000..df692d7f4 --- /dev/null +++ b/backend/src/app/rpc/commands/audit.clj @@ -0,0 +1,86 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.rpc.commands.audit + "Audit Log related RPC methods" + (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.common.spec :as us] + [app.common.uuid :as uuid] + [app.config :as cf] + [app.db :as db] + [app.http :as-alias http] + [app.loggers.audit :as audit] + [app.rpc.climit :as-alias climit] + [app.rpc.doc :as-alias doc] + [app.rpc.helpers :as rph] + [app.util.services :as sv] + [app.util.time :as dt] + [app.worker :as wrk] + [clojure.spec.alpha :as s] + [promesa.core :as p] + [promesa.exec :as px])) + +(defn- event->row [event] + [(uuid/next) + (:name event) + (:source event) + (:type event) + (:timestamp event) + (:profile-id event) + (db/inet (:ip-addr event)) + (db/tjson (:props event)) + (db/tjson (d/without-nils (:context event)))]) + +(def ^:private event-columns + [:id :name :source :type :tracked-at + :profile-id :ip-addr :props :context]) + +(defn- handle-events + [{:keys [::db/pool]} {:keys [profile-id events ::http/request] :as params}] + (let [ip-addr (audit/parse-client-ip request) + xform (comp + (map #(assoc % :profile-id profile-id)) + (map #(assoc % :ip-addr ip-addr)) + (map #(assoc % :source "frontend")) + (filter :profile-id) + (map event->row)) + events (sequence xform events)] + (when (seq events) + (db/insert-multi! pool :audit-log event-columns events)))) + +(s/def ::profile-id ::us/uuid) +(s/def ::name ::us/string) +(s/def ::type ::us/string) +(s/def ::props (s/map-of ::us/keyword any?)) +(s/def ::timestamp dt/instant?) +(s/def ::context (s/map-of ::us/keyword any?)) + +(s/def ::event + (s/keys :req-un [::type ::name ::props ::timestamp] + :opt-un [::context])) + +(s/def ::events (s/every ::event)) + +(s/def ::push-audit-events + (s/keys :req-un [::events ::profile-id])) + +(sv/defmethod ::push-audit-events + {::climit/queue :push-audit-events + ::climit/key-fn :profile-id + ::audit/skip true + ::doc/added "1.17"} + [{:keys [::db/pool ::wrk/executor] :as cfg} params] + (if (or (db/read-only? pool) + (not (contains? cf/flags :audit-log))) + (do + (l/warn :hint "audit: http handler disabled or db is read-only") + (rph/wrap nil)) + + (->> (px/submit! executor #(handle-events cfg params)) + (p/fmap (constantly nil))))) + diff --git a/backend/test/backend_tests/rpc_audit_test.clj b/backend/test/backend_tests/rpc_audit_test.clj new file mode 100644 index 000000000..9df795192 --- /dev/null +++ b/backend/test/backend_tests/rpc_audit_test.clj @@ -0,0 +1,92 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns backend-tests.rpc-audit-test + (:require + [app.common.pprint :as pp] + [app.common.uuid :as uuid] + [app.db :as db] + [app.util.time :as dt] + [backend-tests.helpers :as th] + [clojure.test :as t])) + +(t/use-fixtures :once th/state-init) +(t/use-fixtures :each th/database-reset) + +(defn decode-row + [{:keys [props context] :as row}] + (cond-> row + (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)) + (db/pgobject? context) (assoc :context (db/decode-transit-pgobject context)))) + +(def http-request + (reify + yetti.request/Request + (get-header [_ name] + (case name + "x-forwarded-for" "127.0.0.44")))) + +(t/deftest push-events-1 + (with-redefs [app.config/flags #{:audit-log}] + (let [prof (th/create-profile* 1 {:is-active true}) + team-id (:default-team-id prof) + proj-id (:default-project-id prof) + + params {::th/type :push-audit-events + :app.http/request http-request + :profile-id (:id prof) + :events [{:name "navigate" + :props {:project-id proj-id + :team-id team-id + :route "dashboard-files"} + :context {:engine "blink"} + :profile-id (:id prof) + :timestamp (dt/now) + :type "action"}]} + out (th/command! params)] + ;; (th/print-result! out) + (t/is (nil? (:error out))) + (t/is (nil? (:result out))) + + (let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"]) + (mapv decode-row))] + ;; (pp/pprint rows) + (t/is (= 1 (count rows))) + (t/is (= (:id prof) (:profile-id row))) + (t/is (= "navigate" (:name row))) + (t/is (= "frontend" (:source row))))))) + +(t/deftest push-events-2 + (with-redefs [app.config/flags #{:audit-log}] + (let [prof (th/create-profile* 1 {:is-active true}) + team-id (:default-team-id prof) + proj-id (:default-project-id prof) + + params {::th/type :push-audit-events + :app.http/request http-request + :profile-id (:id prof) + :events [{:name "navigate" + :props {:project-id proj-id + :team-id team-id + :route "dashboard-files"} + :context {:engine "blink"} + :profile-id uuid/zero + :timestamp (dt/now) + :type "action"}]} + out (th/command! params)] + ;; (th/print-result! out) + (t/is (nil? (:error out))) + (t/is (nil? (:result out))) + + (let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"]) + (mapv decode-row))] + ;; (pp/pprint rows) + (t/is (= 1 (count rows))) + (t/is (= (:id prof) (:profile-id row))) + (t/is (= "navigate" (:name row))) + (t/is (= "frontend" (:source row))))))) + + diff --git a/frontend/src/app/main/data/events.cljs b/frontend/src/app/main/data/events.cljs index 0f8281866..fea0f0a5f 100644 --- a/frontend/src/app/main/data/events.cljs +++ b/frontend/src/app/main/data/events.cljs @@ -38,7 +38,7 @@ (defn- collect-context [] (let [uagent (UAParser.)] - (d/merge + (merge {:app-version (:full @cf/version) :locale @i18n/locale} (let [browser (.getBrowser uagent)] @@ -215,12 +215,17 @@ (defn- persist-events [events] (if (seq events) - (let [uri (u/join @cf/public-uri "api/audit/events") + (let [uri (u/join @cf/public-uri "api/rpc/command/push-audit-events") params {:uri uri :method :post + :credentials "include" :body (http/transit-data {:events events})}] (->> (http/send! params) - (rx/mapcat rp/handle-response))) + (rx/mapcat rp/handle-response) + (rx/catch (fn [cause] + (l/error :hint "unexpected error on persisting audit events") + (rx/of nil))))) + (rx/of nil))) (defn initialize @@ -274,7 +279,7 @@ (rx/map (fn [event] (let [session* (or @session (dt/now)) context (-> @context - (d/merge (:context event)) + (merge (:context event)) (assoc :session session*))] (reset! session session*) (-> event