0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-09 16:30:37 -05:00

Move audit tasks to separated namespace files

This commit is contained in:
Andrey Antukh 2024-03-25 10:46:15 +01:00
parent 8585e73c0f
commit fd24831c71
4 changed files with 175 additions and 146 deletions

View file

@ -9,31 +9,24 @@
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.data.macros :as dm] [app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.transit :as t]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.http :as-alias http] [app.http :as-alias http]
[app.http.access-token :as-alias actoken] [app.http.access-token :as-alias actoken]
[app.http.client :as http.client]
[app.loggers.audit.tasks :as-alias tasks] [app.loggers.audit.tasks :as-alias tasks]
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as-alias webhooks]
[app.main :as-alias main]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[app.rpc.retry :as rtry] [app.rpc.retry :as rtry]
[app.setup :as-alias setup] [app.setup :as-alias setup]
[app.tokens :as tokens]
[app.util.services :as-alias sv] [app.util.services :as-alias sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk] [app.worker :as wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[integrant.core :as ig] [integrant.core :as ig]
[lambdaisland.uri :as u]
[promesa.exec :as px]
[ring.request :as rreq])) [ring.request :as rreq]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -249,137 +242,3 @@
(rtry/invoke! cfg db/tx-run! handle-event! event)) (rtry/invoke! cfg db/tx-run! handle-event! event))
(catch Throwable cause (catch Throwable cause
(l/error :hint "unexpected error processing event" :cause cause)))) (l/error :hint "unexpected error processing event" :cause cause))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; TASK: ARCHIVE
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; This is a task responsible to send the accumulated events to
;; external service for archival.
(declare archive-events)
(s/def ::tasks/uri ::us/string)
(defmethod ig/pre-init-spec ::tasks/archive-task [_]
(s/keys :req [::db/pool ::setup/props ::http.client/client]))
(defmethod ig/init-key ::tasks/archive
[_ cfg]
(fn [params]
;; NOTE: this let allows overwrite default configured values from
;; the repl, when manually invoking the task.
(let [enabled (or (contains? cf/flags :audit-log-archive)
(:enabled params false))
uri (cf/get :audit-log-archive-uri)
uri (or uri (:uri params))
cfg (assoc cfg ::uri uri)]
(when (and enabled (not uri))
(ex/raise :type :internal
:code :task-not-configured
:hint "archive task not configured, missing uri"))
(when enabled
(loop [total 0]
(let [n (archive-events cfg)]
(if n
(do
(px/sleep 100)
(recur (+ total ^long n)))
(when (pos? total)
(l/dbg :hint "events archived" :total total)))))))))
(def ^:private sql:retrieve-batch-of-audit-log
"select *
from audit_log
where archived_at is null
order by created_at asc
limit 128
for update skip locked;")
(defn archive-events
[{:keys [::db/pool ::uri] :as cfg}]
(letfn [(decode-row [{:keys [props ip-addr context] :as row}]
(cond-> row
(db/pgobject? props)
(assoc :props (db/decode-transit-pgobject props))
(db/pgobject? context)
(assoc :context (db/decode-transit-pgobject context))
(db/pgobject? ip-addr "inet")
(assoc :ip-addr (db/decode-inet ip-addr))))
(row->event [row]
(select-keys row [:type
:name
:source
:created-at
:tracked-at
:profile-id
:ip-addr
:props
:context]))
(send [events]
(let [token (tokens/generate (::setup/props cfg)
{:iss "authentication"
:iat (dt/now)
:uid uuid/zero})
body (t/encode {:events events})
headers {"content-type" "application/transit+json"
"origin" (cf/get :public-uri)
"cookie" (u/map->query-string {:auth-token token})}
params {:uri uri
:timeout 12000
:method :post
:headers headers
:body body}
resp (http.client/req! cfg params)]
(if (= (:status resp) 204)
true
(do
(l/error :hint "unable to archive events"
:resp-status (:status resp)
:resp-body (:body resp))
false))))
(mark-as-archived [conn rows]
(db/exec-one! conn ["update audit_log set archived_at=now() where id = ANY(?)"
(->> (map :id rows)
(db/create-array conn "uuid"))]))]
(db/with-atomic [conn pool]
(let [rows (db/exec! conn [sql:retrieve-batch-of-audit-log])
xform (comp (map decode-row)
(map row->event))
events (into [] xform rows)]
(when-not (empty? events)
(l/trc :hint "archive events chunk" :uri uri :events (count events))
(when (send events)
(mark-as-archived conn rows)
(count events)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; GC Task
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:private sql:clean-archived
"delete from audit_log
where archived_at is not null")
(defn- clean-archived
[{:keys [::db/pool]}]
(let [result (db/exec-one! pool [sql:clean-archived])
result (:next.jdbc/update-count result)]
(l/debug :hint "delete archived audit log entries" :deleted result)
result))
(defmethod ig/pre-init-spec ::tasks/gc [_]
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::tasks/gc
[_ cfg]
(fn [_]
(clean-archived cfg)))

View file

@ -0,0 +1,140 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.loggers.audit.archive-task
(:require
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.transit :as t]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.http.client :as http]
[app.setup :as-alias setup]
[app.tokens :as tokens]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[lambdaisland.uri :as u]
[promesa.exec :as px]))
;; This is a task responsible to send the accumulated events to
;; external service for archival.
(defn- decode-row
[{:keys [props ip-addr context] :as row}]
(cond-> row
(db/pgobject? props)
(assoc :props (db/decode-transit-pgobject props))
(db/pgobject? context)
(assoc :context (db/decode-transit-pgobject context))
(db/pgobject? ip-addr "inet")
(assoc :ip-addr (db/decode-inet ip-addr))))
(def ^:private event-keys
[:type
:name
:source
:created-at
:tracked-at
:profile-id
:ip-addr
:props
:context])
(defn- row->event
[row]
(select-keys row event-keys))
(defn- send!
[{:keys [::uri] :as cfg} events]
(let [token (tokens/generate (::setup/props cfg)
{:iss "authentication"
:iat (dt/now)
:uid uuid/zero})
body (t/encode {:events events})
headers {"content-type" "application/transit+json"
"origin" (cf/get :public-uri)
"cookie" (u/map->query-string {:auth-token token})}
params {:uri uri
:timeout 12000
:method :post
:headers headers
:body body}
resp (http/req! cfg params)]
(if (= (:status resp) 204)
true
(do
(l/error :hint "unable to archive events"
:resp-status (:status resp)
:resp-body (:body resp))
false))))
(defn- mark-archived!
[{:keys [::db/conn]} rows]
(let [ids (db/create-array conn "uuid" (map :id rows))]
(db/exec-one! conn ["update audit_log set archived_at=now() where id = ANY(?)" ids])))
(def ^:private xf:create-event
(comp (map decode-row)
(map row->event)))
(def ^:private sql:get-audit-log-chunk
"SELECT *
FROM audit_log
WHERE archived_at is null
ORDER BY created_at ASC
LIMIT 128
FOR UPDATE
SKIP LOCKED")
(defn- get-event-rows
[{:keys [::db/conn] :as cfg}]
(->> (db/exec! conn [sql:get-audit-log-chunk])
(not-empty)))
(defn- archive-events!
[{:keys [::uri] :as cfg}]
(db/tx-run! cfg (fn [cfg]
(when-let [rows (get-event-rows cfg)]
(let [events (into [] xf:create-event rows)]
(l/trc :hint "archive events chunk" :uri uri :events (count events))
(when (send! cfg events)
(mark-archived! cfg rows)
(count events)))))))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool ::setup/props ::http/client]))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [params]
;; NOTE: this let allows overwrite default configured values from
;; the repl, when manually invoking the task.
(let [enabled (or (contains? cf/flags :audit-log-archive)
(:enabled params false))
uri (cf/get :audit-log-archive-uri)
uri (or uri (:uri params))
cfg (assoc cfg ::uri uri)]
(when (and enabled (not uri))
(ex/raise :type :internal
:code :task-not-configured
:hint "archive task not configured, missing uri"))
(when enabled
(loop [total 0]
(if-let [n (archive-events! cfg)]
(do
(px/sleep 100)
(recur (+ total ^long n)))
(when (pos? total)
(l/dbg :hint "events archived" :total total))))))))

View file

@ -0,0 +1,31 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.loggers.audit.gc-task
(:require
[app.common.logging :as l]
[app.db :as db]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(def ^:private sql:clean-archived
"DELETE FROM audit_log
WHERE archived_at IS NOT NULL")
(defn- clean-archived!
[{:keys [::db/pool]}]
(let [result (db/exec-one! pool [sql:clean-archived])
result (db/get-update-count result)]
(l/debug :hint "delete archived audit log entries" :deleted result)
result))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [_]
(clean-archived! cfg)))

View file

@ -21,7 +21,6 @@
[app.http.session :as-alias session] [app.http.session :as-alias session]
[app.http.session.tasks :as-alias session.tasks] [app.http.session.tasks :as-alias session.tasks]
[app.http.websocket :as http.ws] [app.http.websocket :as http.ws]
[app.loggers.audit.tasks :as-alias audit.tasks]
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as-alias webhooks]
[app.metrics :as-alias mtx] [app.metrics :as-alias mtx]
[app.metrics.definition :as-alias mdef] [app.metrics.definition :as-alias mdef]
@ -346,8 +345,8 @@
:storage-gc-deleted (ig/ref ::sto.gc-deleted/handler) :storage-gc-deleted (ig/ref ::sto.gc-deleted/handler)
:storage-gc-touched (ig/ref ::sto.gc-touched/handler) :storage-gc-touched (ig/ref ::sto.gc-touched/handler)
:session-gc (ig/ref ::session.tasks/gc) :session-gc (ig/ref ::session.tasks/gc)
:audit-log-archive (ig/ref ::audit.tasks/archive) :audit-log-archive (ig/ref :app.loggers.audit.archive-task/handler)
:audit-log-gc (ig/ref ::audit.tasks/gc) :audit-log-gc (ig/ref :app.loggers.audit.gc-task/handler)
:process-webhook-event :process-webhook-event
(ig/ref ::webhooks/process-event-handler) (ig/ref ::webhooks/process-event-handler)
@ -411,12 +410,12 @@
::svgo/optimizer ::svgo/optimizer
{} {}
::audit.tasks/archive :app.loggers.audit.archive-task/handler
{::setup/props (ig/ref ::setup/props) {::setup/props (ig/ref ::setup/props)
::db/pool (ig/ref ::db/pool) ::db/pool (ig/ref ::db/pool)
::http.client/client (ig/ref ::http.client/client)} ::http.client/client (ig/ref ::http.client/client)}
::audit.tasks/gc :app.loggers.audit.gc-task/handler
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}
::webhooks/process-event-handler ::webhooks/process-event-handler