0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-15 03:28:25 -05:00

♻️ Move audit http handler to RPC

This commit is contained in:
Andrey Antukh 2022-12-05 17:16:16 +01:00
parent 27c4cdb5f9
commit 7f589b09ca
8 changed files with 199 additions and 124 deletions

View file

@ -4,4 +4,6 @@
{:update-file {:concurrency 1 :queue-size 3} {:update-file {:concurrency 1 :queue-size 3}
:auth {:concurrency 128} :auth {:concurrency 128}
:process-font {:concurrency 4 :queue-size 32} :process-font {:concurrency 4 :queue-size 32}
:process-image {:concurrency 8 :queue-size 32}} :process-image {:concurrency 8 :queue-size 32}
:push-audit-events
{:concurrency 1 :queue-size 3}}

View file

@ -116,7 +116,6 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::assets map?) (s/def ::assets map?)
(s/def ::audit-handler fn?)
(s/def ::awsns-handler fn?) (s/def ::awsns-handler fn?)
(s/def ::debug-routes (s/nilable vector?)) (s/def ::debug-routes (s/nilable vector?))
(s/def ::doc-routes (s/nilable vector?)) (s/def ::doc-routes (s/nilable vector?))
@ -138,7 +137,6 @@
::awsns-handler ::awsns-handler
::debug-routes ::debug-routes
::oidc-routes ::oidc-routes
::audit-handler
::rpc-routes ::rpc-routes
::doc-routes])) ::doc-routes]))
@ -173,8 +171,6 @@
["/api" {:middleware [[mw/cors] ["/api" {:middleware [[mw/cors]
[session/middleware-2 session]]} [session/middleware-2 session]]}
["/audit/events" {:handler (:audit-handler cfg)
:allowed-methods #{:post}}]
["/feedback" {:handler feedback ["/feedback" {:handler feedback
:allowed-methods #{:post}}] :allowed-methods #{:post}}]
(:doc-routes cfg) (:doc-routes cfg)

View file

@ -28,9 +28,7 @@
[lambdaisland.uri :as u] [lambdaisland.uri :as u]
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as px] [promesa.exec :as px]
[promesa.exec.bulkhead :as pxb] [yetti.request :as yrq]))
[yetti.request :as yrq]
[yetti.response :as yrs]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HELPERS ;; HELPERS
@ -56,7 +54,6 @@
(assoc (->> sk str/kebab (keyword "penpot")) v))))] (assoc (->> sk str/kebab (keyword "penpot")) v))))]
(reduce-kv process-param {} params))) (reduce-kv process-param {} params)))
(def ^:private (def ^:private
profile-props profile-props
[:id [:id
@ -105,111 +102,12 @@
(s/def ::name ::us/string) (s/def ::name ::us/string)
(s/def ::type ::us/string) (s/def ::type ::us/string)
(s/def ::props (s/map-of ::us/keyword any?)) (s/def ::props (s/map-of ::us/keyword any?))
(s/def ::timestamp dt/instant?)
(s/def ::context (s/map-of ::us/keyword any?))
(s/def ::frontend-event
(s/keys :req-un [::type ::name ::props ::timestamp ::profile-id]
:opt-un [::context]))
(s/def ::frontend-events (s/every ::frontend-event))
(s/def ::ip-addr ::us/string) (s/def ::ip-addr ::us/string)
(s/def ::backend-event
(s/def ::event
(s/keys :req-un [::type ::name ::profile-id] (s/keys :req-un [::type ::name ::profile-id]
:opt-un [::ip-addr ::props])) :opt-un [::ip-addr ::props]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HTTP HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::concurrency ::us/integer)
(defmethod ig/pre-init-spec ::http-handler [_]
(s/keys :req [::wrk/executor ::db/pool ::mtx/metrics ::concurrency]))
(defmethod ig/prep-key ::http-handler
[_ cfg]
(merge {::concurrency (cf/get :audit-log-http-handler-concurrency 8)}
(d/without-nils cfg)))
(defmethod ig/init-key ::http-handler
[_ {:keys [::wrk/executor ::db/pool ::mtx/metrics ::concurrency] :as cfg}]
(if (or (db/read-only? pool)
(not (contains? cf/flags :audit-log)))
(do
(l/warn :hint "audit: http handler disabled or db is read-only")
(fn [_ respond _]
(respond (yrs/response 204))))
(letfn [(event->row [event]
[(uuid/next)
(:name event)
(:source event)
(:type event)
(:timestamp event)
(:profile-id event)
(db/inet (:ip-addr event))
(db/tjson (:props event))
(db/tjson (d/without-nils (:context event)))])
(handle-request [{:keys [profile-id] :as request}]
(let [events (->> (:events (:params request))
(remove #(not= profile-id (:profile-id %)))
(us/conform ::frontend-events))
ip-addr (parse-client-ip request)
xform (comp
(map #(assoc % :ip-addr ip-addr))
(map #(assoc % :source "frontend"))
(map event->row))
columns [:id :name :source :type :tracked-at
:profile-id :ip-addr :props :context]]
(when (seq events)
(->> (into [] xform events)
(db/insert-multi! pool :audit-log columns)))))
(report-error! [cause]
(if-let [xdata (us/validation-error? cause)]
(l/error ::l/raw (str "audit: validation error frontend events request\n" (ex/explain xdata)))
(l/error :hint "audit: unexpected error on processing frontend events" :cause cause)))
(on-queue [instance]
(l/trace :hint "http-handler: enqueued"
:queue-size (get instance ::pxb/current-queue-size)
:concurrency (get instance ::pxb/current-concurrency))
(mtx/run! metrics
:id :audit-http-handler-queue-size
:val (get instance ::pxb/current-queue-size))
(mtx/run! metrics
:id :audit-http-handler-concurrency
:val (get instance ::pxb/current-concurrency)))
(on-run [instance task]
(let [elapsed (- (inst-ms (dt/now))
(inst-ms task))]
(l/trace :hint "http-handler: execute"
:elapsed (str elapsed "ms"))
(mtx/run! metrics
:id :audit-http-handler-timing
:val elapsed)
(mtx/run! metrics
:id :audit-http-handler-queue-size
:val (get instance ::pxb/current-queue-size))
(mtx/run! metrics
:id :audit-http-handler-concurrency
:val (get instance ::pxb/current-concurrency))))]
(let [limiter (pxb/create :executor executor
:concurrency concurrency
:on-queue on-queue
:on-run on-run)]
(fn [request respond _]
(->> (px/submit! limiter (partial handle-request request))
(p/fnly (fn [_ cause]
(some-> cause report-error!)
(respond (yrs/response 204))))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; COLLECTOR ;; COLLECTOR
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -239,7 +137,7 @@
(defn- persist-event! (defn- persist-event!
[pool event] [pool event]
(us/verify! ::backend-event event) (us/verify! ::event event)
(db/insert! pool :audit-log (db/insert! pool :audit-log
{:id (uuid/next) {:id (uuid/next)
:name (:name event) :name (:name event)
@ -335,7 +233,6 @@
{:iss "authentication" {:iss "authentication"
:iat (dt/now) :iat (dt/now)
:uid uuid/zero}) :uid uuid/zero})
;; FIXME tokens/generate
body (t/encode {:events events}) body (t/encode {:events events})
headers {"content-type" "application/transit+json" headers {"content-type" "application/transit+json"
"origin" (cf/get :public-uri) "origin" (cf/get :public-uri)

View file

@ -281,7 +281,6 @@
:metrics (ig/ref ::mtx/metrics) :metrics (ig/ref ::mtx/metrics)
:public-uri (cf/get :public-uri) :public-uri (cf/get :public-uri)
:storage (ig/ref ::sto/storage) :storage (ig/ref ::sto/storage)
:audit-handler (ig/ref ::audit/http-handler)
:rpc-routes (ig/ref :app.rpc/routes) :rpc-routes (ig/ref :app.rpc/routes)
:doc-routes (ig/ref :app.rpc.doc/routes) :doc-routes (ig/ref :app.rpc.doc/routes)
:executor (ig/ref ::wrk/executor)} :executor (ig/ref ::wrk/executor)}
@ -408,11 +407,6 @@
::lzmq/receiver ::lzmq/receiver
{} {}
::audit/http-handler
{::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::mtx/metrics (ig/ref ::mtx/metrics)}
::audit/collector ::audit/collector
{::db/pool (ig/ref ::db/pool) {::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor) ::wrk/executor (ig/ref ::wrk/executor)

View file

@ -167,6 +167,7 @@
:profile-id profile-id :profile-id profile-id
:ip-addr (some-> request audit/parse-client-ip) :ip-addr (some-> request audit/parse-client-ip)
:props (d/without-qualified props)}] :props (d/without-qualified props)}]
(audit/submit! collector event))) (audit/submit! collector event)))
(handle-request [cfg params] (handle-request [cfg params]
@ -174,8 +175,9 @@
(p/mcat (fn [result] (p/mcat (fn [result]
(->> (handle-audit params result) (->> (handle-audit params result)
(p/map (constantly result)))))))] (p/map (constantly result)))))))]
(if-not (::audit/skip mdata)
(with-meta handle-request mdata)) (with-meta handle-request mdata)
f))
f)) f))
(defn- wrap (defn- wrap
@ -254,6 +256,7 @@
'app.rpc.commands.ldap 'app.rpc.commands.ldap
'app.rpc.commands.demo 'app.rpc.commands.demo
'app.rpc.commands.webhooks 'app.rpc.commands.webhooks
'app.rpc.commands.audit
'app.rpc.commands.files 'app.rpc.commands.files
'app.rpc.commands.files.update 'app.rpc.commands.files.update
'app.rpc.commands.files.create 'app.rpc.commands.files.create

View file

@ -0,0 +1,86 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.rpc.commands.audit
"Audit Log related RPC methods"
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.http :as-alias http]
[app.loggers.audit :as audit]
[app.rpc.climit :as-alias climit]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[promesa.core :as p]
[promesa.exec :as px]))
(defn- event->row [event]
[(uuid/next)
(:name event)
(:source event)
(:type event)
(:timestamp event)
(:profile-id event)
(db/inet (:ip-addr event))
(db/tjson (:props event))
(db/tjson (d/without-nils (:context event)))])
(def ^:private event-columns
[:id :name :source :type :tracked-at
:profile-id :ip-addr :props :context])
(defn- handle-events
[{:keys [::db/pool]} {:keys [profile-id events ::http/request] :as params}]
(let [ip-addr (audit/parse-client-ip request)
xform (comp
(map #(assoc % :profile-id profile-id))
(map #(assoc % :ip-addr ip-addr))
(map #(assoc % :source "frontend"))
(filter :profile-id)
(map event->row))
events (sequence xform events)]
(when (seq events)
(db/insert-multi! pool :audit-log event-columns events))))
(s/def ::profile-id ::us/uuid)
(s/def ::name ::us/string)
(s/def ::type ::us/string)
(s/def ::props (s/map-of ::us/keyword any?))
(s/def ::timestamp dt/instant?)
(s/def ::context (s/map-of ::us/keyword any?))
(s/def ::event
(s/keys :req-un [::type ::name ::props ::timestamp]
:opt-un [::context]))
(s/def ::events (s/every ::event))
(s/def ::push-audit-events
(s/keys :req-un [::events ::profile-id]))
(sv/defmethod ::push-audit-events
{::climit/queue :push-audit-events
::climit/key-fn :profile-id
::audit/skip true
::doc/added "1.17"}
[{:keys [::db/pool ::wrk/executor] :as cfg} params]
(if (or (db/read-only? pool)
(not (contains? cf/flags :audit-log)))
(do
(l/warn :hint "audit: http handler disabled or db is read-only")
(rph/wrap nil))
(->> (px/submit! executor #(handle-events cfg params))
(p/fmap (constantly nil)))))

View file

@ -0,0 +1,92 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns backend-tests.rpc-audit-test
(:require
[app.common.pprint :as pp]
[app.common.uuid :as uuid]
[app.db :as db]
[app.util.time :as dt]
[backend-tests.helpers :as th]
[clojure.test :as t]))
(t/use-fixtures :once th/state-init)
(t/use-fixtures :each th/database-reset)
(defn decode-row
[{:keys [props context] :as row}]
(cond-> row
(db/pgobject? props) (assoc :props (db/decode-transit-pgobject props))
(db/pgobject? context) (assoc :context (db/decode-transit-pgobject context))))
(def http-request
(reify
yetti.request/Request
(get-header [_ name]
(case name
"x-forwarded-for" "127.0.0.44"))))
(t/deftest push-events-1
(with-redefs [app.config/flags #{:audit-log}]
(let [prof (th/create-profile* 1 {:is-active true})
team-id (:default-team-id prof)
proj-id (:default-project-id prof)
params {::th/type :push-audit-events
:app.http/request http-request
:profile-id (:id prof)
:events [{:name "navigate"
:props {:project-id proj-id
:team-id team-id
:route "dashboard-files"}
:context {:engine "blink"}
:profile-id (:id prof)
:timestamp (dt/now)
:type "action"}]}
out (th/command! params)]
;; (th/print-result! out)
(t/is (nil? (:error out)))
(t/is (nil? (:result out)))
(let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"])
(mapv decode-row))]
;; (pp/pprint rows)
(t/is (= 1 (count rows)))
(t/is (= (:id prof) (:profile-id row)))
(t/is (= "navigate" (:name row)))
(t/is (= "frontend" (:source row)))))))
(t/deftest push-events-2
(with-redefs [app.config/flags #{:audit-log}]
(let [prof (th/create-profile* 1 {:is-active true})
team-id (:default-team-id prof)
proj-id (:default-project-id prof)
params {::th/type :push-audit-events
:app.http/request http-request
:profile-id (:id prof)
:events [{:name "navigate"
:props {:project-id proj-id
:team-id team-id
:route "dashboard-files"}
:context {:engine "blink"}
:profile-id uuid/zero
:timestamp (dt/now)
:type "action"}]}
out (th/command! params)]
;; (th/print-result! out)
(t/is (nil? (:error out)))
(t/is (nil? (:result out)))
(let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"])
(mapv decode-row))]
;; (pp/pprint rows)
(t/is (= 1 (count rows)))
(t/is (= (:id prof) (:profile-id row)))
(t/is (= "navigate" (:name row)))
(t/is (= "frontend" (:source row)))))))

View file

@ -38,7 +38,7 @@
(defn- collect-context (defn- collect-context
[] []
(let [uagent (UAParser.)] (let [uagent (UAParser.)]
(d/merge (merge
{:app-version (:full @cf/version) {:app-version (:full @cf/version)
:locale @i18n/locale} :locale @i18n/locale}
(let [browser (.getBrowser uagent)] (let [browser (.getBrowser uagent)]
@ -215,12 +215,17 @@
(defn- persist-events (defn- persist-events
[events] [events]
(if (seq events) (if (seq events)
(let [uri (u/join @cf/public-uri "api/audit/events") (let [uri (u/join @cf/public-uri "api/rpc/command/push-audit-events")
params {:uri uri params {:uri uri
:method :post :method :post
:credentials "include"
:body (http/transit-data {:events events})}] :body (http/transit-data {:events events})}]
(->> (http/send! params) (->> (http/send! params)
(rx/mapcat rp/handle-response))) (rx/mapcat rp/handle-response)
(rx/catch (fn [cause]
(l/error :hint "unexpected error on persisting audit events")
(rx/of nil)))))
(rx/of nil))) (rx/of nil)))
(defn initialize (defn initialize
@ -274,7 +279,7 @@
(rx/map (fn [event] (rx/map (fn [event]
(let [session* (or @session (dt/now)) (let [session* (or @session (dt/now))
context (-> @context context (-> @context
(d/merge (:context event)) (merge (:context event))
(assoc :session session*))] (assoc :session session*))]
(reset! session session*) (reset! session session*)
(-> event (-> event