From 7f589b09cafcad8bbc26af5bb6b46131495f596e Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 5 Dec 2022 17:16:16 +0100 Subject: [PATCH 01/14] :recycle: Move audit http handler to RPC --- backend/resources/climit.edn | 4 +- backend/src/app/http.clj | 4 - backend/src/app/loggers/audit.clj | 111 +----------------- backend/src/app/main.clj | 6 - backend/src/app/rpc.clj | 7 +- backend/src/app/rpc/commands/audit.clj | 86 ++++++++++++++ backend/test/backend_tests/rpc_audit_test.clj | 92 +++++++++++++++ frontend/src/app/main/data/events.cljs | 13 +- 8 files changed, 199 insertions(+), 124 deletions(-) create mode 100644 backend/src/app/rpc/commands/audit.clj create mode 100644 backend/test/backend_tests/rpc_audit_test.clj diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn index 697d16539..755568713 100644 --- a/backend/resources/climit.edn +++ b/backend/resources/climit.edn @@ -4,4 +4,6 @@ {:update-file {:concurrency 1 :queue-size 3} :auth {:concurrency 128} :process-font {:concurrency 4 :queue-size 32} - :process-image {:concurrency 8 :queue-size 32}} + :process-image {:concurrency 8 :queue-size 32} + :push-audit-events + {:concurrency 1 :queue-size 3}} diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 58df5e81c..976f42d6e 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -116,7 +116,6 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (s/def ::assets map?) -(s/def ::audit-handler fn?) (s/def ::awsns-handler fn?) (s/def ::debug-routes (s/nilable vector?)) (s/def ::doc-routes (s/nilable vector?)) @@ -138,7 +137,6 @@ ::awsns-handler ::debug-routes ::oidc-routes - ::audit-handler ::rpc-routes ::doc-routes])) @@ -173,8 +171,6 @@ ["/api" {:middleware [[mw/cors] [session/middleware-2 session]]} - ["/audit/events" {:handler (:audit-handler cfg) - :allowed-methods #{:post}}] ["/feedback" {:handler feedback :allowed-methods #{:post}}] (:doc-routes cfg) diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index fa4f67721..c8ef0d796 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -28,9 +28,7 @@ [lambdaisland.uri :as u] [promesa.core :as p] [promesa.exec :as px] - [promesa.exec.bulkhead :as pxb] - [yetti.request :as yrq] - [yetti.response :as yrs])) + [yetti.request :as yrq])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HELPERS @@ -56,7 +54,6 @@ (assoc (->> sk str/kebab (keyword "penpot")) v))))] (reduce-kv process-param {} params))) - (def ^:private profile-props [:id @@ -105,111 +102,12 @@ (s/def ::name ::us/string) (s/def ::type ::us/string) (s/def ::props (s/map-of ::us/keyword any?)) -(s/def ::timestamp dt/instant?) -(s/def ::context (s/map-of ::us/keyword any?)) - -(s/def ::frontend-event - (s/keys :req-un [::type ::name ::props ::timestamp ::profile-id] - :opt-un [::context])) - -(s/def ::frontend-events (s/every ::frontend-event)) - (s/def ::ip-addr ::us/string) -(s/def ::backend-event + +(s/def ::event (s/keys :req-un [::type ::name ::profile-id] :opt-un [::ip-addr ::props])) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; HTTP HANDLER -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(s/def ::concurrency ::us/integer) - -(defmethod ig/pre-init-spec ::http-handler [_] - (s/keys :req [::wrk/executor ::db/pool ::mtx/metrics ::concurrency])) - -(defmethod ig/prep-key ::http-handler - [_ cfg] - (merge {::concurrency (cf/get :audit-log-http-handler-concurrency 8)} - (d/without-nils cfg))) - -(defmethod ig/init-key ::http-handler - [_ {:keys [::wrk/executor ::db/pool ::mtx/metrics ::concurrency] :as cfg}] - (if (or (db/read-only? pool) - (not (contains? cf/flags :audit-log))) - (do - (l/warn :hint "audit: http handler disabled or db is read-only") - (fn [_ respond _] - (respond (yrs/response 204)))) - - (letfn [(event->row [event] - [(uuid/next) - (:name event) - (:source event) - (:type event) - (:timestamp event) - (:profile-id event) - (db/inet (:ip-addr event)) - (db/tjson (:props event)) - (db/tjson (d/without-nils (:context event)))]) - - (handle-request [{:keys [profile-id] :as request}] - (let [events (->> (:events (:params request)) - (remove #(not= profile-id (:profile-id %))) - (us/conform ::frontend-events)) - ip-addr (parse-client-ip request) - xform (comp - (map #(assoc % :ip-addr ip-addr)) - (map #(assoc % :source "frontend")) - (map event->row)) - - columns [:id :name :source :type :tracked-at - :profile-id :ip-addr :props :context]] - (when (seq events) - (->> (into [] xform events) - (db/insert-multi! pool :audit-log columns))))) - - (report-error! [cause] - (if-let [xdata (us/validation-error? cause)] - (l/error ::l/raw (str "audit: validation error frontend events request\n" (ex/explain xdata))) - (l/error :hint "audit: unexpected error on processing frontend events" :cause cause))) - - (on-queue [instance] - (l/trace :hint "http-handler: enqueued" - :queue-size (get instance ::pxb/current-queue-size) - :concurrency (get instance ::pxb/current-concurrency)) - (mtx/run! metrics - :id :audit-http-handler-queue-size - :val (get instance ::pxb/current-queue-size)) - (mtx/run! metrics - :id :audit-http-handler-concurrency - :val (get instance ::pxb/current-concurrency))) - - (on-run [instance task] - (let [elapsed (- (inst-ms (dt/now)) - (inst-ms task))] - (l/trace :hint "http-handler: execute" - :elapsed (str elapsed "ms")) - (mtx/run! metrics - :id :audit-http-handler-timing - :val elapsed) - (mtx/run! metrics - :id :audit-http-handler-queue-size - :val (get instance ::pxb/current-queue-size)) - (mtx/run! metrics - :id :audit-http-handler-concurrency - :val (get instance ::pxb/current-concurrency))))] - - (let [limiter (pxb/create :executor executor - :concurrency concurrency - :on-queue on-queue - :on-run on-run)] - (fn [request respond _] - (->> (px/submit! limiter (partial handle-request request)) - (p/fnly (fn [_ cause] - (some-> cause report-error!) - (respond (yrs/response 204)))))))))) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; COLLECTOR ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -239,7 +137,7 @@ (defn- persist-event! [pool event] - (us/verify! ::backend-event event) + (us/verify! ::event event) (db/insert! pool :audit-log {:id (uuid/next) :name (:name event) @@ -335,7 +233,6 @@ {:iss "authentication" :iat (dt/now) :uid uuid/zero}) - ;; FIXME tokens/generate body (t/encode {:events events}) headers {"content-type" "application/transit+json" "origin" (cf/get :public-uri) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 25956efbb..22d9de047 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -281,7 +281,6 @@ :metrics (ig/ref ::mtx/metrics) :public-uri (cf/get :public-uri) :storage (ig/ref ::sto/storage) - :audit-handler (ig/ref ::audit/http-handler) :rpc-routes (ig/ref :app.rpc/routes) :doc-routes (ig/ref :app.rpc.doc/routes) :executor (ig/ref ::wrk/executor)} @@ -408,11 +407,6 @@ ::lzmq/receiver {} - ::audit/http-handler - {::db/pool (ig/ref ::db/pool) - ::wrk/executor (ig/ref ::wrk/executor) - ::mtx/metrics (ig/ref ::mtx/metrics)} - ::audit/collector {::db/pool (ig/ref ::db/pool) ::wrk/executor (ig/ref ::wrk/executor) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 7494bc4c2..51f48ba28 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -167,6 +167,7 @@ :profile-id profile-id :ip-addr (some-> request audit/parse-client-ip) :props (d/without-qualified props)}] + (audit/submit! collector event))) (handle-request [cfg params] @@ -174,8 +175,9 @@ (p/mcat (fn [result] (->> (handle-audit params result) (p/map (constantly result)))))))] - - (with-meta handle-request mdata)) + (if-not (::audit/skip mdata) + (with-meta handle-request mdata) + f)) f)) (defn- wrap @@ -254,6 +256,7 @@ 'app.rpc.commands.ldap 'app.rpc.commands.demo 'app.rpc.commands.webhooks + 'app.rpc.commands.audit 'app.rpc.commands.files 'app.rpc.commands.files.update 'app.rpc.commands.files.create diff --git a/backend/src/app/rpc/commands/audit.clj b/backend/src/app/rpc/commands/audit.clj new file mode 100644 index 000000000..df692d7f4 --- /dev/null +++ b/backend/src/app/rpc/commands/audit.clj @@ -0,0 +1,86 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.rpc.commands.audit + "Audit Log related RPC methods" + (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.common.spec :as us] + [app.common.uuid :as uuid] + [app.config :as cf] + [app.db :as db] + [app.http :as-alias http] + [app.loggers.audit :as audit] + [app.rpc.climit :as-alias climit] + [app.rpc.doc :as-alias doc] + [app.rpc.helpers :as rph] + [app.util.services :as sv] + [app.util.time :as dt] + [app.worker :as wrk] + [clojure.spec.alpha :as s] + [promesa.core :as p] + [promesa.exec :as px])) + +(defn- event->row [event] + [(uuid/next) + (:name event) + (:source event) + (:type event) + (:timestamp event) + (:profile-id event) + (db/inet (:ip-addr event)) + (db/tjson (:props event)) + (db/tjson (d/without-nils (:context event)))]) + +(def ^:private event-columns + [:id :name :source :type :tracked-at + :profile-id :ip-addr :props :context]) + +(defn- handle-events + [{:keys [::db/pool]} {:keys [profile-id events ::http/request] :as params}] + (let [ip-addr (audit/parse-client-ip request) + xform (comp + (map #(assoc % :profile-id profile-id)) + (map #(assoc % :ip-addr ip-addr)) + (map #(assoc % :source "frontend")) + (filter :profile-id) + (map event->row)) + events (sequence xform events)] + (when (seq events) + (db/insert-multi! pool :audit-log event-columns events)))) + +(s/def ::profile-id ::us/uuid) +(s/def ::name ::us/string) +(s/def ::type ::us/string) +(s/def ::props (s/map-of ::us/keyword any?)) +(s/def ::timestamp dt/instant?) +(s/def ::context (s/map-of ::us/keyword any?)) + +(s/def ::event + (s/keys :req-un [::type ::name ::props ::timestamp] + :opt-un [::context])) + +(s/def ::events (s/every ::event)) + +(s/def ::push-audit-events + (s/keys :req-un [::events ::profile-id])) + +(sv/defmethod ::push-audit-events + {::climit/queue :push-audit-events + ::climit/key-fn :profile-id + ::audit/skip true + ::doc/added "1.17"} + [{:keys [::db/pool ::wrk/executor] :as cfg} params] + (if (or (db/read-only? pool) + (not (contains? cf/flags :audit-log))) + (do + (l/warn :hint "audit: http handler disabled or db is read-only") + (rph/wrap nil)) + + (->> (px/submit! executor #(handle-events cfg params)) + (p/fmap (constantly nil))))) + diff --git a/backend/test/backend_tests/rpc_audit_test.clj b/backend/test/backend_tests/rpc_audit_test.clj new file mode 100644 index 000000000..9df795192 --- /dev/null +++ b/backend/test/backend_tests/rpc_audit_test.clj @@ -0,0 +1,92 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns backend-tests.rpc-audit-test + (:require + [app.common.pprint :as pp] + [app.common.uuid :as uuid] + [app.db :as db] + [app.util.time :as dt] + [backend-tests.helpers :as th] + [clojure.test :as t])) + +(t/use-fixtures :once th/state-init) +(t/use-fixtures :each th/database-reset) + +(defn decode-row + [{:keys [props context] :as row}] + (cond-> row + (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)) + (db/pgobject? context) (assoc :context (db/decode-transit-pgobject context)))) + +(def http-request + (reify + yetti.request/Request + (get-header [_ name] + (case name + "x-forwarded-for" "127.0.0.44")))) + +(t/deftest push-events-1 + (with-redefs [app.config/flags #{:audit-log}] + (let [prof (th/create-profile* 1 {:is-active true}) + team-id (:default-team-id prof) + proj-id (:default-project-id prof) + + params {::th/type :push-audit-events + :app.http/request http-request + :profile-id (:id prof) + :events [{:name "navigate" + :props {:project-id proj-id + :team-id team-id + :route "dashboard-files"} + :context {:engine "blink"} + :profile-id (:id prof) + :timestamp (dt/now) + :type "action"}]} + out (th/command! params)] + ;; (th/print-result! out) + (t/is (nil? (:error out))) + (t/is (nil? (:result out))) + + (let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"]) + (mapv decode-row))] + ;; (pp/pprint rows) + (t/is (= 1 (count rows))) + (t/is (= (:id prof) (:profile-id row))) + (t/is (= "navigate" (:name row))) + (t/is (= "frontend" (:source row))))))) + +(t/deftest push-events-2 + (with-redefs [app.config/flags #{:audit-log}] + (let [prof (th/create-profile* 1 {:is-active true}) + team-id (:default-team-id prof) + proj-id (:default-project-id prof) + + params {::th/type :push-audit-events + :app.http/request http-request + :profile-id (:id prof) + :events [{:name "navigate" + :props {:project-id proj-id + :team-id team-id + :route "dashboard-files"} + :context {:engine "blink"} + :profile-id uuid/zero + :timestamp (dt/now) + :type "action"}]} + out (th/command! params)] + ;; (th/print-result! out) + (t/is (nil? (:error out))) + (t/is (nil? (:result out))) + + (let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"]) + (mapv decode-row))] + ;; (pp/pprint rows) + (t/is (= 1 (count rows))) + (t/is (= (:id prof) (:profile-id row))) + (t/is (= "navigate" (:name row))) + (t/is (= "frontend" (:source row))))))) + + diff --git a/frontend/src/app/main/data/events.cljs b/frontend/src/app/main/data/events.cljs index 0f8281866..fea0f0a5f 100644 --- a/frontend/src/app/main/data/events.cljs +++ b/frontend/src/app/main/data/events.cljs @@ -38,7 +38,7 @@ (defn- collect-context [] (let [uagent (UAParser.)] - (d/merge + (merge {:app-version (:full @cf/version) :locale @i18n/locale} (let [browser (.getBrowser uagent)] @@ -215,12 +215,17 @@ (defn- persist-events [events] (if (seq events) - (let [uri (u/join @cf/public-uri "api/audit/events") + (let [uri (u/join @cf/public-uri "api/rpc/command/push-audit-events") params {:uri uri :method :post + :credentials "include" :body (http/transit-data {:events events})}] (->> (http/send! params) - (rx/mapcat rp/handle-response))) + (rx/mapcat rp/handle-response) + (rx/catch (fn [cause] + (l/error :hint "unexpected error on persisting audit events") + (rx/of nil))))) + (rx/of nil))) (defn initialize @@ -274,7 +279,7 @@ (rx/map (fn [event] (let [session* (or @session (dt/now)) context (-> @context - (d/merge (:context event)) + (merge (:context event)) (assoc :session session*))] (reset! session session*) (-> event From c0a4b7dc76ae52d03ba05aaeafee50d6578f7d69 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 5 Dec 2022 23:01:53 +0100 Subject: [PATCH 02/14] :sparkles: Improve worker queue management and add specific worker instance for webhooks --- backend/src/app/config.clj | 7 +- backend/src/app/main.clj | 18 ++-- backend/src/app/worker.clj | 165 ++++++++++++++++++++----------------- 3 files changed, 106 insertions(+), 84 deletions(-) diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 32b01ca86..95a221b3a 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -108,7 +108,9 @@ (s/def ::default-executor-parallelism ::us/integer) (s/def ::scheduled-executor-parallelism ::us/integer) -(s/def ::worker-parallelism ::us/integer) + +(s/def ::worker-default-parallelism ::us/integer) +(s/def ::worker-webhook-parallelism ::us/integer) (s/def ::authenticated-cookie-domain ::us/string) (s/def ::authenticated-cookie-name ::us/string) @@ -222,7 +224,8 @@ ::error-report-webhook ::default-executor-parallelism ::scheduled-executor-parallelism - ::worker-parallelism + ::worker-default-parallelism + ::worker-webhook-parallelism ::file-change-snapshot-every ::file-change-snapshot-timeout ::user-feedback-destination diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 22d9de047..1c36c9d03 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -494,20 +494,28 @@ {:cron #app/cron "30 */5 * * * ?" ;; every 5m :task :audit-log-gc})]} - ::wrk/scheduler + ::wrk/dispatcher {::rds/redis (ig/ref ::rds/redis) ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool)} - ::wrk/worker - {::wrk/parallelism (cf/get ::worker-parallelism 1) - ;; FIXME: read queues from configuration - ::wrk/queue "default" + [::default ::wrk/worker] + {::wrk/parallelism (cf/get ::worker-default-parallelism 1) + ::wrk/queue :default + ::rds/redis (ig/ref ::rds/redis) + ::wrk/registry (ig/ref ::wrk/registry) + ::mtx/metrics (ig/ref ::mtx/metrics) + ::db/pool (ig/ref ::db/pool)} + + [::webhook ::wrk/worker] + {::wrk/parallelism (cf/get ::worker-webhook-parallelism 1) + ::wrk/queue :webhooks ::rds/redis (ig/ref ::rds/redis) ::wrk/registry (ig/ref ::wrk/registry) ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool)}}) + (def system nil) (defn start diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 607ba1809..b4305a3b9 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -14,6 +14,7 @@ [app.common.spec :as us] [app.common.transit :as t] [app.common.uuid :as uuid] + [app.config :as cf] [app.db :as db] [app.metrics :as mtx] [app.redis :as rds] @@ -174,63 +175,62 @@ (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))) -(s/def ::queue ::us/string) (s/def ::wait-duration ::dt/duration) -(defmethod ig/pre-init-spec ::scheduler [_] +(defmethod ig/pre-init-spec ::dispatcher [_] (s/keys :req [::mtx/metrics ::db/pool ::rds/redis] :opt [::wait-duration ::batch-size])) -(defmethod ig/prep-key ::scheduler +(defmethod ig/prep-key ::dispatcher [_ cfg] - (merge {::batch-size 1 - ::wait-duration (dt/duration "2s")} + (merge {::batch-size 100 + ::wait-duration (dt/duration "5s")} (d/without-nils cfg))) (def ^:private sql:select-next-tasks - "select * from task as t + "select id, queue from task as t where t.scheduled_at <= now() and (t.status = 'new' or t.status = 'retry') + and queue ~~* ?::text order by t.priority desc, t.scheduled_at limit ? for update skip locked") -(defn- format-queue - [queue] - (str/ffmt "penpot-tasks-queue:%" queue)) - -(defmethod ig/init-key ::scheduler +(defmethod ig/init-key ::dispatcher [_ {:keys [::db/pool ::rds/redis ::batch-size] :as cfg}] - (letfn [(get-tasks-batch [conn] - (->> (db/exec! conn [sql:select-next-tasks batch-size]) - (map decode-task-row) - (seq))) + (letfn [(get-tasks [conn] + (let [prefix (str (cf/get :tenant) ":%")] + (seq (db/exec! conn [sql:select-next-tasks prefix batch-size])))) - (queue-task [conn rconn {:keys [id queue] :as task}] - (db/update! conn :task {:status "ready"} {:id id}) - (let [queue (format-queue queue) - payload (t/encode id) - result (rds/rpush! rconn queue payload)] - (l/debug :hist "scheduler: task pushed to redis" - :task-id id - :key queue - :queued result))) + (push-tasks! [conn rconn [queue tasks]] + (let [ids (mapv :id tasks) + key (str/ffmt "taskq:%" queue) + res (rds/rpush! rconn key (mapv t/encode ids)) + sql [(str "update task set status = 'scheduled'" + " where id = ANY(?)") + (db/create-array conn "uuid" ids)]] - (run-batch [rconn] + (db/exec-one! conn sql) + (l/debug :hist "dispatcher: push tasks to redis" + :queue queue + :tasks (count ids) + :queued res))) + + (run-batch! [rconn] (db/with-atomic [conn pool] - (when-let [tasks (get-tasks-batch conn)] - (run! (partial queue-task conn rconn) tasks) - true))) - ] + (when-let [tasks (get-tasks conn)] + (->> (group-by :queue tasks) + (run! (partial push-tasks! conn rconn))) + true)))] (if (db/read-only? pool) - (l/warn :hint "scheduler: not started (db is read-only)") + (l/warn :hint "dispatcher: not started (db is read-only)") (px/thread - {:name "penpot/scheduler"} - (l/info :hint "scheduler: started") + {:name "penpot/worker-dispatcher"} + (l/info :hint "dispatcher: started") (try (dm/with-open [rconn (rds/connect redis)] (loop [] @@ -238,7 +238,7 @@ (throw (InterruptedException. "interrumpted"))) (try - (when-not (run-batch rconn) + (when-not (run-batch! rconn) (px/sleep (::wait-duration cfg))) (catch InterruptedException cause (throw cause)) @@ -246,29 +246,29 @@ (cond (rds/exception? cause) (do - (l/warn :hint "scheduler: redis exception (will retry in an instant)" :cause cause) + (l/warn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause) (px/sleep (::rds/timeout rconn))) (db/sql-exception? cause) (do - (l/warn :hint "scheduler: database exception (will retry in an instant)" :cause cause) + (l/warn :hint "dispatcher: database exception (will retry in an instant)" :cause cause) (px/sleep (::rds/timeout rconn))) :else (do - (l/error :hint "scheduler: unhandled exception (will retry in an instant)" :cause cause) + (l/error :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause) (px/sleep (::rds/timeout rconn)))))) (recur))) (catch InterruptedException _ - (l/debug :hint "scheduler: interrupted")) + (l/debug :hint "dispatcher: interrupted")) (catch Throwable cause - (l/error :hint "scheduler: unexpected exception" :cause cause)) + (l/error :hint "dispatcher: unexpected exception" :cause cause)) (finally - (l/info :hint "scheduler: terminated"))))))) + (l/info :hint "dispatcher: terminated"))))))) -(defmethod ig/halt-key! ::scheduler +(defmethod ig/halt-key! ::dispatcher [_ thread] (some-> thread px/interrupt!)) @@ -288,36 +288,38 @@ ::queue ::registry])) -;; FIXME: define queue as set (defmethod ig/prep-key ::worker [_ cfg] - (merge {::queue "default" ::parallelism 1} + (merge {::parallelism 1} (d/without-nils cfg))) (defmethod ig/init-key ::worker [_ {:keys [::db/pool ::queue ::parallelism] :as cfg}] - (if (db/read-only? pool) - (l/warn :hint "workers: not started (db is read-only)" :queue queue) - (doall - (->> (range parallelism) - (map #(assoc cfg ::worker-id %)) - (map start-worker!))))) + (let [queue (d/name queue) + cfg (assoc cfg ::queue queue)] + (if (db/read-only? pool) + (l/warn :hint "worker: not started (db is read-only)" :queue queue :parallelism parallelism) + (doall + (->> (range parallelism) + (map #(assoc cfg ::worker-id %)) + (map start-worker!)))))) (defmethod ig/halt-key! ::worker [_ threads] (run! px/interrupt! threads)) (defn- start-worker! - [{:keys [::rds/redis ::worker-id] :as cfg}] + [{:keys [::rds/redis ::worker-id ::queue] :as cfg}] (px/thread {:name (format "penpot/worker/%s" worker-id)} - (l/info :hint "worker: started" :worker-id worker-id) + (l/info :hint "worker: started" :worker-id worker-id :queue queue) (try (dm/with-open [rconn (rds/connect redis)] - (let [cfg (-> cfg - (update ::queue format-queue) - (assoc ::rds/rconn rconn) - (assoc ::timeout (dt/duration "5s")))] + (let [tenant (cf/get :tenant "main") + cfg (-> cfg + (assoc ::queue (str/ffmt "taskq:%:%" tenant queue)) + (assoc ::rds/rconn rconn) + (assoc ::timeout (dt/duration "5s")))] (loop [] (when (px/interrupted?) (throw (InterruptedException. "interrupted"))) @@ -327,13 +329,17 @@ (catch InterruptedException _ (l/debug :hint "worker: interrupted" - :worker-id worker-id)) + :worker-id worker-id + :queue queue)) (catch Throwable cause (l/error :hint "worker: unexpected exception" :worker-id worker-id + :queue queue :cause cause)) (finally - (l/info :hint "worker: terminated" :worker-id worker-id))))) + (l/info :hint "worker: terminated" + :worker-id worker-id + :queue queue))))) (defn- run-worker-loop! [{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}] @@ -631,9 +637,26 @@ ;; SUBMIT API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::task keyword?) +(defn- extract-props + [options] + (let [cns (namespace ::sample)] + (persistent! + (reduce-kv (fn [res k v] + (cond-> res + (not= (namespace k) cns) + (assoc! k v))) + (transient {}) + options)))) + +(def ^:private sql:insert-new-task + "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) + values (?, ?, ?, ?, ?, ?, now() + ?) + returning id") + +(s/def ::task (s/or :kw keyword? :str string?)) +(s/def ::queue (s/or :kw keyword? :str string?)) (s/def ::delay (s/or :int ::us/integer :duration dt/duration?)) -(s/def ::conn some?) +(s/def ::conn (s/or :pool ::db/pool :connection some?)) (s/def ::priority ::us/integer) (s/def ::max-retries ::us/integer) @@ -641,36 +664,24 @@ (s/keys :req [::task ::conn] :opt [::delay ::queue ::priority ::max-retries])) -(defn- extract-props - [options] - (persistent! - (reduce-kv (fn [res k v] - (cond-> res - (not (qualified-keyword? k)) - (assoc! k v))) - (transient {}) - options))) - -(def ^:private sql:insert-new-task - "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) - values (?, ?, ?, ?, ?, ?, now() + ?) - returning id") - (defn submit! [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn] - :or {delay 0 queue "default" priority 100 max-retries 3} + :or {delay 0 queue :default priority 100 max-retries 3} :as options}] (us/verify ::submit-options options) (let [duration (dt/duration delay) interval (db/interval duration) props (-> options extract-props db/tjson) - id (uuid/next)] + id (uuid/next) + tenant (cf/get :tenant) + task (d/name task) + queue (str/ffmt "%:%" tenant (d/name queue))] (l/debug :hint "submit task" - :name (d/name task) + :name task :queue queue :in (dt/format-duration duration)) - (db/exec-one! conn [sql:insert-new-task id (d/name task) props + (db/exec-one! conn [sql:insert-new-task id task props queue priority max-retries interval]) id)) From 9debfa3b276b33f8d3b7c638c782648dcd8c1489 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 5 Dec 2022 23:03:30 +0100 Subject: [PATCH 03/14] :paperclip: Minor cange on exception formating --- common/src/app/common/exceptions.cljc | 2 -- 1 file changed, 2 deletions(-) diff --git a/common/src/app/common/exceptions.cljc b/common/src/app/common/exceptions.cljc index 7be8efb34..333af7472 100644 --- a/common/src/app/common/exceptions.cljc +++ b/common/src/app/common/exceptions.cljc @@ -169,9 +169,7 @@ (print-all [cause] (print-summary cause) - (newline) (println "DETAIL:") - (when trace? (print-trace cause)) From d584ae5a0f36367286116da3f1474074017c27a5 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 7 Dec 2022 14:43:07 +0100 Subject: [PATCH 04/14] :sparkles: Improve json encode/decode api --- backend/src/app/auth/oidc.clj | 8 ++++---- backend/src/app/db.clj | 9 +++++---- backend/src/app/http/middleware.clj | 2 +- backend/src/app/loggers/loki.clj | 2 +- backend/src/app/loggers/mattermost.clj | 2 +- backend/src/app/loggers/zmq.clj | 2 +- backend/src/app/tasks/telemetry.clj | 2 +- backend/src/app/util/json.clj | 27 +++++++++++++------------- 8 files changed, 27 insertions(+), 27 deletions(-) diff --git a/backend/src/app/auth/oidc.clj b/backend/src/app/auth/oidc.clj index 6463c7075..8440b650e 100644 --- a/backend/src/app/auth/oidc.clj +++ b/backend/src/app/auth/oidc.clj @@ -64,7 +64,7 @@ nil) (= 200 (:status response)) - (let [data (json/read (:body response))] + (let [data (json/decode (:body response))] {:token-uri (get data :token_endpoint) :auth-uri (get data :authorization_endpoint) :user-uri (get data :userinfo_endpoint)}) @@ -172,7 +172,7 @@ :hint "unable to retrieve github emails" :http-status status :http-body body)) - (->> response :body json/read (filter :primary) first :email)))))) + (->> response :body json/decode (filter :primary) first :email)))))) (defmethod ig/pre-init-spec ::providers/github [_] (s/keys :req [::http/client])) @@ -278,7 +278,7 @@ (->> (http/req! cfg req) (p/map (fn [{:keys [status body] :as res}] (if (= status 200) - (let [data (json/read body)] + (let [data (json/decode body)] {:token (get data :access_token) :type (get data :token_type)}) (ex/raise :type :internal @@ -316,7 +316,7 @@ (get info attr-kw))) (process-response [response] - (p/let [info (-> response :body json/read) + (p/let [info (-> response :body json/decode) email (get-email info)] {:backend (:name provider) :email email diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index e897535da..9848a943a 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -427,7 +427,7 @@ val (.getValue o)] (if (or (= typ "json") (= typ "jsonb")) - (json/read val) + (json/decode val) val)))) (defn decode-transit-pgobject @@ -462,9 +462,10 @@ (defn json "Encode as plain json." [data] - (doto (org.postgresql.util.PGobject.) - (.setType "jsonb") - (.setValue (json/write-str data)))) + (when data + (doto (org.postgresql.util.PGobject.) + (.setType "jsonb") + (.setValue (json/encode-str data))))) ;; --- Locks diff --git a/backend/src/app/http/middleware.clj b/backend/src/app/http/middleware.clj index ce0471aff..5f687be12 100644 --- a/backend/src/app/http/middleware.clj +++ b/backend/src/app/http/middleware.clj @@ -46,7 +46,7 @@ (str/starts-with? header "application/json") (with-open [is (yrq/body request)] - (let [params (json/read is)] + (let [params (json/decode is)] (-> request (assoc :body-params params) (update :params merge params)))) diff --git a/backend/src/app/loggers/loki.clj b/backend/src/app/loggers/loki.clj index 3f95d3ad6..68f5ee74a 100644 --- a/backend/src/app/loggers/loki.clj +++ b/backend/src/app/loggers/loki.clj @@ -73,7 +73,7 @@ :timeout 3000 :method :post :headers {"content-type" "application/json"} - :body (json/write payload)} + :body (json/encode payload)} {:sync? true})) (defn- handle-event diff --git a/backend/src/app/loggers/mattermost.clj b/backend/src/app/loggers/mattermost.clj index 15c51d044..f7a1efb49 100644 --- a/backend/src/app/loggers/mattermost.clj +++ b/backend/src/app/loggers/mattermost.clj @@ -29,7 +29,7 @@ {:uri (cf/get :error-report-webhook) :method :post :headers {"content-type" "application/json"} - :body (json/write-str {:text text})} + :body (json/encode-str {:text text})} {:sync? true})] (when (not= 200 (:status resp)) diff --git a/backend/src/app/loggers/zmq.clj b/backend/src/app/loggers/zmq.clj index 19a7e9800..77b7de549 100644 --- a/backend/src/app/loggers/zmq.clj +++ b/backend/src/app/loggers/zmq.clj @@ -92,7 +92,7 @@ (.. socket (setReceiveTimeOut 5000)) (loop [] (let [msg (.recv ^ZMQ$Socket socket) - msg (ex/ignoring (json/read msg json-mapper)) + msg (ex/ignoring (json/decode msg json-mapper)) msg (if (nil? msg) :empty msg)] (when (a/>!! output msg) (recur)))) diff --git a/backend/src/app/tasks/telemetry.clj b/backend/src/app/tasks/telemetry.clj index e6323ebdb..7754f699e 100644 --- a/backend/src/app/tasks/telemetry.clj +++ b/backend/src/app/tasks/telemetry.clj @@ -81,7 +81,7 @@ {:method :post :uri (cf/get :telemetry-uri) :headers {"content-type" "application/json"} - :body (json/write-str data)} + :body (json/encode-str data)} {:sync? true})] (when (> (:status response) 206) (ex/raise :type :internal diff --git a/backend/src/app/util/json.clj b/backend/src/app/util/json.clj index 3547bfebd..abcb4154b 100644 --- a/backend/src/app/util/json.clj +++ b/backend/src/app/util/json.clj @@ -5,7 +5,6 @@ ;; Copyright (c) KALEIDOS INC (ns app.util.json - (:refer-clojure :exclude [read]) (:require [jsonista.core :as j])) @@ -13,23 +12,23 @@ [params] (j/object-mapper params)) -(defn write +(defn read! + ([from] (j/read-value from j/keyword-keys-object-mapper)) + ([from mapper] (j/read-value from mapper))) + +(defn write! + ([to v] (j/write-value to v j/keyword-keys-object-mapper)) + ([to v mapper] (j/write-value to v mapper))) + +(defn encode ([v] (j/write-value-as-bytes v j/keyword-keys-object-mapper)) ([v mapper] (j/write-value-as-bytes v mapper))) -(defn write-str - ([v] (j/write-value-as-string v j/keyword-keys-object-mapper)) - ([v mapper] (j/write-value-as-string v mapper))) - -(defn read +(defn decode ([v] (j/read-value v j/keyword-keys-object-mapper)) ([v mapper] (j/read-value v mapper))) -(defn encode - [v] - (j/write-value-as-bytes v j/keyword-keys-object-mapper)) - -(defn decode - [v] - (j/read-value v j/keyword-keys-object-mapper)) +(defn encode-str + ([v] (j/write-value-as-string v j/keyword-keys-object-mapper)) + ([v mapper] (j/write-value-as-string v mapper))) From d768711caae6bc18e828b92885bc9b3e65bcdb46 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 7 Dec 2022 14:43:35 +0100 Subject: [PATCH 05/14] :sparkles: Improve null handling on more db helpers --- backend/src/app/db.clj | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index 9848a943a..6e4d12061 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -442,22 +442,25 @@ (defn inet [ip-addr] - (doto (org.postgresql.util.PGobject.) - (.setType "inet") - (.setValue (str ip-addr)))) + (when ip-addr + (doto (org.postgresql.util.PGobject.) + (.setType "inet") + (.setValue (str ip-addr))))) (defn decode-inet [^PGobject o] - (if (= "inet" (.getType o)) - (.getValue o) - nil)) + (when o + (if (= "inet" (.getType o)) + (.getValue o) + nil))) (defn tjson "Encode as transit json." [data] - (doto (org.postgresql.util.PGobject.) - (.setType "jsonb") - (.setValue (t/encode-str data {:type :json-verbose})))) + (when data + (doto (org.postgresql.util.PGobject.) + (.setType "jsonb") + (.setValue (t/encode-str data {:type :json-verbose}))))) (defn json "Encode as plain json." From 5b9f0ed0b19cc3a072494f622d1dfc1be2ac2b88 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 7 Dec 2022 14:47:32 +0100 Subject: [PATCH 06/14] :tada: Add webhook processing worker --- backend/src/app/loggers/audit.clj | 48 +++-- backend/src/app/loggers/webhooks.clj | 171 ++++++++++++++++++ backend/src/app/main.clj | 16 +- backend/src/app/migrations.clj | 3 + .../sql/0086-add-webhook-delivery-table.sql | 16 ++ backend/src/app/rpc.clj | 9 +- backend/src/app/rpc/commands/webhooks.clj | 87 +++++---- backend/src/app/rpc/helpers.clj | 4 + backend/src/app/rpc/mutations/projects.clj | 18 +- 9 files changed, 308 insertions(+), 64 deletions(-) create mode 100644 backend/src/app/loggers/webhooks.clj create mode 100644 backend/src/app/migrations/sql/0086-add-webhook-delivery-table.sql diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index c8ef0d796..d004fe5d4 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -17,6 +17,7 @@ [app.db :as db] [app.http.client :as http] [app.loggers.audit.tasks :as-alias tasks] + [app.loggers.webhooks :as-alias webhooks] [app.main :as-alias main] [app.metrics :as mtx] [app.tokens :as tokens] @@ -103,10 +104,11 @@ (s/def ::type ::us/string) (s/def ::props (s/map-of ::us/keyword any?)) (s/def ::ip-addr ::us/string) +(s/def ::webhooks/event? ::us/boolean) (s/def ::event (s/keys :req-un [::type ::name ::profile-id] - :opt-un [::ip-addr ::props])) + :opt-un [::ip-addr ::props ::webhooks/event?])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; COLLECTOR @@ -117,8 +119,7 @@ ;; an external storage and data cleared. (s/def ::collector - (s/nilable - (s/keys :req [::wrk/executor ::db/pool]))) + (s/keys :req [::wrk/executor ::db/pool])) (defmethod ig/pre-init-spec ::collector [_] (s/keys :req [::db/pool ::wrk/executor ::mtx/metrics])) @@ -126,11 +127,8 @@ (defmethod ig/init-key ::collector [_ {:keys [::db/pool] :as cfg}] (cond - (not (contains? cf/flags :audit-log)) - (l/info :hint "audit: log collection disabled") - (db/read-only? pool) - (l/warn :hint "audit: log collection disabled (db is read-only)") + (l/warn :hint "audit: disabled (db is read-only)") :else cfg)) @@ -138,19 +136,35 @@ (defn- persist-event! [pool event] (us/verify! ::event event) - (db/insert! pool :audit-log - {:id (uuid/next) - :name (:name event) - :type (:type event) - :profile-id (:profile-id event) - :tracked-at (dt/now) - :ip-addr (some-> (:ip-addr event) db/inet) - :props (db/tjson (:props event)) - :source "backend"})) + (let [params {:id (uuid/next) + :name (:name event) + :type (:type event) + :profile-id (:profile-id event) + :tracked-at (dt/now) + :ip-addr (:ip-addr event) + :props (:props event)}] + + (when (contains? cf/flags :audit-log) + (db/insert! pool :audit-log + (-> params + (update :props db/tjson) + (update :ip-addr db/inet) + (assoc :source "backend")))) + + (when (and (contains? cf/flags :webhooks) + (::webhooks/event? event)) + (wrk/submit! ::wrk/conn pool + ::wrk/task :process-webhook-event + ::wrk/queue :webhooks + ::wrk/max-retries 0 + ::webhooks/event (-> params + (dissoc :ip-addr) + (dissoc :type)))))) (defn submit! "Submit audit event to the collector." - [{:keys [::wrk/executor ::db/pool]} params] + [{:keys [::wrk/executor ::db/pool] :as collector} params] + (us/assert! ::collector collector) (->> (px/submit! executor (partial persist-event! pool (d/without-nils params))) (p/merr (fn [cause] (l/error :hint "audit: unexpected error processing event" :cause cause) diff --git a/backend/src/app/loggers/webhooks.clj b/backend/src/app/loggers/webhooks.clj new file mode 100644 index 000000000..28331540f --- /dev/null +++ b/backend/src/app/loggers/webhooks.clj @@ -0,0 +1,171 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.loggers.webhooks + "A mattermost integration for error reporting." + (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.common.transit :as t] + [app.common.uri :as uri] + [app.db :as db] + [app.http.client :as http] + [app.util.json :as json] + [app.util.time :as dt] + [app.worker :as wrk] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] + [integrant.core :as ig])) + +;; --- PROC + +(defn lookup-webhooks-by-team + [pool team-id] + (db/exec! pool ["select * from webhook where team_id=? and is_active=true" team-id])) + +(defn lookup-webhooks-by-project + [pool project-id] + (let [sql [(str "select * from webhook as w" + " join project as p on (p.team_id = w.team_id)" + " where p.id = ? and w.is_active = true") + project-id]] + (db/exec! pool sql))) + +(defn lookup-webhooks-by-file + [pool file-id] + (let [sql [(str "select * from webhook as w" + " join project as p on (p.team_id = w.team_id)" + " join file as f on (f.project_id = p.id)" + " where f.id = ? and w.is_active = true") + file-id]] + (db/exec! pool sql))) + +(defn lookup-webhooks + [{:keys [::db/pool]} {:keys [props] :as event}] + (or (some->> (:team-id props) (lookup-webhooks-by-team pool)) + (some->> (:project-id props) (lookup-webhooks-by-project pool)) + (some->> (:file-id props) (lookup-webhooks-by-file pool)))) + +(defmethod ig/pre-init-spec ::process-event-handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/init-key ::process-event-handler + [_ {:keys [::db/pool] :as cfg}] + (fn [{:keys [props] :as task}] + (let [event (::event props)] + (l/debug :hint "process webhook event" + :name (:name event)) + + (when-let [items (lookup-webhooks cfg event)] + ;; (app.common.pprint/pprint items) + (l/trace :hint "webhooks found for event" :total (count items)) + + (db/with-atomic [conn pool] + (doseq [item items] + (wrk/submit! ::wrk/conn conn + ::wrk/task :run-webhook + ::wrk/queue :webhooks + ::wrk/max-retries 3 + ::event event + ::config item))))))) + +;; --- RUN + +(declare interpret-exception) +(declare interpret-response) + +(def ^:private mapper + (json/mapper + {:encode-key-fn str/camel + :decode-key-fn (comp keyword str/kebab) + :pretty true})) + +(defmethod ig/pre-init-spec ::run-webhook-handler [_] + (s/keys :req [::http/client ::db/pool])) + +(defmethod ig/prep-key ::run-webhook-handler + [_ cfg] + (merge {::max-errors 3} (d/without-nils cfg))) + +(defmethod ig/init-key ::run-webhook-handler + [_ {:keys [::db/pool ::max-errors] :as cfg}] + (letfn [(update-webhook! [whook err] + (if err + (let [sql [(str "update webhook " + " set error_code=?, " + " error_count=error_count+1 " + " where id=?") + err + (:id whook)] + res (db/exec-one! pool sql {:return-keys true})] + (when (>= (:error-count res) max-errors) + (db/update! pool :webhook {:is-active false} {:id (:id whook)}))) + + (db/update! pool :webhook + {:updated-at (dt/now) + :error-code nil + :error-count 0} + {:id (:id whook)}))) + + (report-delivery! [whook req rsp err] + (db/insert! pool :webhook-delivery + {:webhook-id (:id whook) + :created-at (dt/now) + :error-code err + :req-data (db/tjson req) + :rsp-data (db/tjson rsp)}))] + + (fn [{:keys [props] :as task}] + (let [event (::event props) + whook (::config props) + + body (case (:mtype whook) + "application/json" (json/encode-str event mapper) + "application/transit+json" (t/encode-str event) + "application/x-www-form-urlencoded" (uri/map->query-string event))] + + (l/debug :hint "run webhook" + :event-name (:name event) + :webhook-id (:id whook) + :webhook-uri (:uri whook) + :webhook-mtype (:mtype whook)) + + (let [req {:uri (:uri whook) + :headers {"content-type" (:mtype whook)} + :timeout (dt/duration "4s") + :method :post + :body body}] + (try + (let [rsp (http/req! cfg req {:response-type :input-stream :sync? true}) + err (interpret-response rsp)] + (report-delivery! whook req rsp err) + (update-webhook! whook err)) + (catch Throwable cause + (let [err (interpret-exception cause)] + (report-delivery! whook req nil err) + (update-webhook! whook err) + (when (= err "unknown") + (l/error :hint "unknown error on webhook request" + :cause cause)))))))))) + +(defn interpret-response + [{:keys [status] :as response}] + (when-not (or (= 200 status) + (= 204 status)) + (str/ffmt "unexpected-status:%" status))) + +(defn interpret-exception + [cause] + (cond + (instance? javax.net.ssl.SSLHandshakeException cause) + "ssl-validation-error" + + (instance? java.net.ConnectException cause) + "connection-error" + + (instance? java.net.http.HttpConnectTimeoutException cause) + "timeout" + )) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 1c36c9d03..b2145aeb4 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -15,6 +15,7 @@ [app.http.session :as-alias http.session] [app.loggers.audit :as-alias audit] [app.loggers.audit.tasks :as-alias audit.tasks] + [app.loggers.webhooks :as-alias webhooks] [app.loggers.zmq :as-alias lzmq] [app.metrics :as-alias mtx] [app.metrics.definition :as-alias mdef] @@ -357,7 +358,12 @@ :telemetry (ig/ref :app.tasks.telemetry/handler) :session-gc (ig/ref :app.http.session/gc-task) :audit-log-archive (ig/ref ::audit.tasks/archive) - :audit-log-gc (ig/ref ::audit.tasks/gc)}} + :audit-log-gc (ig/ref ::audit.tasks/gc) + + :process-webhook-event + (ig/ref ::webhooks/process-event-handler) + :run-webhook + (ig/ref ::webhooks/run-webhook-handler)}} :app.emails/sendmail @@ -420,6 +426,14 @@ ::audit.tasks/gc {::db/pool (ig/ref ::db/pool)} + ::webhooks/process-event-handler + {::db/pool (ig/ref ::db/pool) + ::http.client/client (ig/ref ::http.client/client)} + + ::webhooks/run-webhook-handler + {::db/pool (ig/ref ::db/pool) + ::http.client/client (ig/ref ::http.client/client)} + :app.loggers.loki/reporter {::lzmq/receiver (ig/ref ::lzmq/receiver) ::http.client/client (ig/ref ::http.client/client)} diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index c0694e103..dda506b11 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -265,6 +265,9 @@ {:name "0085-add-webhook-table" :fn (mg/resource "app/migrations/sql/0085-add-webhook-table.sql")} + + {:name "0086-add-webhook-delivery-table" + :fn (mg/resource "app/migrations/sql/0086-add-webhook-delivery-table.sql")} ]) diff --git a/backend/src/app/migrations/sql/0086-add-webhook-delivery-table.sql b/backend/src/app/migrations/sql/0086-add-webhook-delivery-table.sql new file mode 100644 index 000000000..3fead5b51 --- /dev/null +++ b/backend/src/app/migrations/sql/0086-add-webhook-delivery-table.sql @@ -0,0 +1,16 @@ +CREATE TABLE webhook_delivery ( + webhook_id uuid NOT NULL REFERENCES webhook(id) ON DELETE CASCADE DEFERRABLE, + created_at timestamptz NOT NULL DEFAULT now(), + + error_code text NULL, + + req_data jsonb NULL, + rsp_data jsonb NULL, + + PRIMARY KEY (webhook_id, created_at) +); + +ALTER TABLE webhook_delivery + ALTER COLUMN error_code SET STORAGE external, + ALTER COLUMN req_data SET STORAGE external, + ALTER COLUMN rsp_data SET STORAGE external; diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 51f48ba28..df4fd4ea9 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -16,6 +16,7 @@ [app.http.client :as-alias http.client] [app.http.session :as-alias http.session] [app.loggers.audit :as audit] + [app.loggers.webhooks :as-alias webhooks] [app.metrics :as mtx] [app.msgbus :as-alias mbus] [app.rpc.climit :as climit] @@ -155,18 +156,24 @@ (:profile-id result) (:profile-id params) uuid/zero) + props (or (::audit/replace-props resultm) (-> params + (d/without-qualified) (merge (::audit/props resultm)) (dissoc :profile-id) (dissoc :type))) + event {:type (or (::audit/type resultm) (::type cfg)) :name (or (::audit/name resultm) (::sv/name mdata)) :profile-id profile-id :ip-addr (some-> request audit/parse-client-ip) - :props (d/without-qualified props)}] + :props props + ::webhooks/event? (or (::webhooks/event? mdata) + (::webhooks/event? resultm) + false)}] (audit/submit! collector event))) diff --git a/backend/src/app/rpc/commands/webhooks.clj b/backend/src/app/rpc/commands/webhooks.clj index 66ae14965..f1af96175 100644 --- a/backend/src/app/rpc/commands/webhooks.clj +++ b/backend/src/app/rpc/commands/webhooks.clj @@ -11,6 +11,7 @@ [app.common.uuid :as uuid] [app.db :as db] [app.http.client :as http] + [app.loggers.webhooks :as webhooks] [app.rpc.doc :as-alias doc] [app.rpc.queries.teams :refer [check-edition-permissions! check-read-permissions!]] [app.util.services :as sv] @@ -35,77 +36,83 @@ (s/keys :req-un [::profile-id ::team-id ::uri ::mtype] :opt-un [::is-active])) -;; FIXME: validate -;; FIXME: default ratelimit -;; FIXME: quotes +;; NOTE: for now the quote is hardcoded but this need to be solved in +;; a more universal way for handling properly object quotes +(def max-hooks-for-team 8) (defn- validate-webhook! [cfg whook params] (letfn [(handle-exception [exception] - (cond - (instance? java.util.concurrent.CompletionException exception) - (handle-exception (ex/cause exception)) - - (instance? javax.net.ssl.SSLHandshakeException exception) + (if-let [hint (webhooks/interpret-exception exception)] (ex/raise :type :validation :code :webhook-validation - :hint "ssl-validation") - - :else - (ex/raise :type :validation + :hint hint) + (ex/raise :type :internal :code :webhook-validation - :hint "unknown" :cause exception))) - (handle-response [{:keys [status] :as response}] - (when (not= status 200) + (handle-response [response] + (when-let [hint (webhooks/interpret-response response)] (ex/raise :type :validation :code :webhook-validation - :hint (str/ffmt "unexpected-status-%" (:status response)))))] + :hint hint)))] (if (not= (:uri whook) (:uri params)) (->> (http/req! cfg {:method :head :uri (:uri params) - :timeout (dt/duration "2s")}) + :timeout (dt/duration "3s")}) (p/hmap (fn [response exception] (if exception (handle-exception exception) (handle-response response))))) (p/resolved nil)))) +(defn- validate-quotes! + [{:keys [::db/pool]} {:keys [team-id]}] + (let [sql ["select count(*) as total from webhook where team_id = ?" team-id] + total (:total (db/exec-one! pool sql))] + (when (>= total max-hooks-for-team) + (ex/raise :type :restriction + :code :webhooks-quote-reached + :hint (str/ffmt "can't create more than % webhooks per team" max-hooks-for-team))))) + +(defn- insert-webhook! + [{:keys [::db/pool]} {:keys [team-id uri mtype is-active] :as params}] + (db/insert! pool :webhook + {:id (uuid/next) + :team-id team-id + :uri uri + :is-active is-active + :mtype mtype})) + +(defn- update-webhook! + [{:keys [::db/pool] :as cfg} {:keys [id] :as wook} {:keys [uri mtype is-active] :as params}] + (db/update! pool :webhook + {:uri uri + :is-active is-active + :mtype mtype + :error-code nil + :error-count 0} + {:id id})) + (sv/defmethod ::create-webhook {::doc/added "1.17"} - [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id uri mtype is-active] :as params}] + [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id] :as params}] (check-edition-permissions! pool profile-id team-id) - (letfn [(insert-webhook [_] - (db/insert! pool :webhook - {:id (uuid/next) - :team-id team-id - :uri uri - :is-active is-active - :mtype mtype}))] - (->> (validate-webhook! cfg nil params) - (p/fmap executor insert-webhook)))) + (->> (validate-webhook! cfg nil params) + (p/fmap executor (fn [_] (validate-quotes! cfg params))) + (p/fmap executor (fn [_] (insert-webhook! cfg params))))) (s/def ::update-webhook (s/keys :req-un [::id ::uri ::mtype ::is-active])) (sv/defmethod ::update-webhook {::doc/added "1.17"} - [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id id uri mtype is-active] :as params}] - (let [whook (db/get pool :webhook {:id id}) - update-fn (fn [_] - (db/update! pool :webhook - {:uri uri - :is-active is-active - :mtype mtype - :error-code nil - :error-count 0} - {:id id}))] + [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [id profile-id] :as params}] + (let [whook (db/get pool :webhook {:id id})] (check-edition-permissions! pool profile-id (:team-id whook)) - (->> (validate-webhook! cfg whook params) - (p/fmap executor update-fn)))) + (p/fmap executor (fn [_] (update-webhook! cfg whook params)))))) (s/def ::delete-webhook (s/keys :req-un [::profile-id ::id])) @@ -133,4 +140,4 @@ [{:keys [pool] :as cfg} {:keys [profile-id team-id]}] (with-open [conn (db/open pool)] (check-read-permissions! conn profile-id team-id) - (db/exec! conn [sql:get-webhooks team-id]))) \ No newline at end of file + (db/exec! conn [sql:get-webhooks team-id]))) diff --git a/backend/src/app/rpc/helpers.clj b/backend/src/app/rpc/helpers.clj index aef80d754..1f4d7bbf9 100644 --- a/backend/src/app/rpc/helpers.clj +++ b/backend/src/app/rpc/helpers.clj @@ -64,6 +64,10 @@ [mdw mdata] (vary-meta mdw merge mdata)) +(defn assoc-meta + [mdw k v] + (vary-meta mdw assoc k v)) + (defn with-http-cache [mdw max-age] (vary-meta mdw update ::rpc/response-transform-fns conj diff --git a/backend/src/app/rpc/mutations/projects.clj b/backend/src/app/rpc/mutations/projects.clj index 35e598ae4..95c36d957 100644 --- a/backend/src/app/rpc/mutations/projects.clj +++ b/backend/src/app/rpc/mutations/projects.clj @@ -9,6 +9,10 @@ [app.common.spec :as us] [app.common.uuid :as uuid] [app.db :as db] + [app.loggers.audit :as-alias audit] + [app.loggers.webhooks :as-alias webhooks] + [app.rpc.doc :as-alias doc] + [app.rpc.helpers :as rph] [app.rpc.permissions :as perms] [app.rpc.queries.projects :as proj] [app.rpc.queries.teams :as teams] @@ -22,7 +26,6 @@ (s/def ::name ::us/string) (s/def ::profile-id ::us/uuid) - ;; --- Mutation: Create Project (declare create-project) @@ -35,6 +38,8 @@ :opt-un [::id])) (sv/defmethod ::create-project + {::doc/added "1.0" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [profile-id team-id] :as params}] (db/with-atomic [conn pool] (teams/check-edition-permissions! conn profile-id team-id) @@ -122,10 +127,13 @@ ;; this is not allowed. (sv/defmethod ::delete-project + {::doc/added "1.0" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] (db/with-atomic [conn pool] (proj/check-edition-permissions! conn profile-id id) - (db/update! conn :project - {:deleted-at (dt/now)} - {:id id :is-default false}) - nil)) + (let [project (db/update! conn :project + {:deleted-at (dt/now)} + {:id id :is-default false})] + (rph/with-meta (rph/wrap) + {::audit/props {:team-id (:team-id project)}})))) From edaa62b05bbef8b1b0ad171096e36c33163ca19d Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 7 Dec 2022 14:52:07 +0100 Subject: [PATCH 07/14] :lipstick: Replace us/assert with us/assert! on dashboard data ns --- frontend/src/app/main/data/dashboard.cljs | 86 ++++++++++++----------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/frontend/src/app/main/data/dashboard.cljs b/frontend/src/app/main/data/dashboard.cljs index dd5e27f12..16feb7014 100644 --- a/frontend/src/app/main/data/dashboard.cljs +++ b/frontend/src/app/main/data/dashboard.cljs @@ -64,7 +64,7 @@ (defn initialize [{:keys [id] :as params}] - (us/assert ::us/uuid id) + (us/assert! ::us/uuid id) (ptk/reify ::initialize ptk/UpdateEvent (update [_ state] @@ -201,7 +201,7 @@ (defn search [params] - (us/assert ::search params) + (us/assert! ::search params) (ptk/reify ::search ptk/UpdateEvent (update [_ state] @@ -236,7 +236,7 @@ (defn fetch-files [{:keys [project-id] :as params}] - (us/assert ::us/uuid project-id) + (us/assert! ::us/uuid project-id) (ptk/reify ::fetch-files ptk/WatchEvent (watch [_ _ _] @@ -347,7 +347,7 @@ (defn toggle-file-select [{:keys [id project-id] :as file}] - (us/assert ::file file) + (us/assert! ::file file) (ptk/reify ::toggle-file-select ptk/UpdateEvent (update [_ state] @@ -377,7 +377,7 @@ (defn create-team [{:keys [name] :as params}] - (us/assert string? name) + (us/assert! ::us/string name) (ptk/reify ::create-team ptk/WatchEvent (watch [_ _ _] @@ -394,7 +394,7 @@ (defn create-team-with-invitations [{:keys [name emails role] :as params}] - (us/assert string? name) + (us/assert! ::us/string name) (ptk/reify ::create-team-with-invitations ptk/WatchEvent (watch [_ _ _] @@ -413,7 +413,7 @@ (defn update-team [{:keys [id name] :as params}] - (us/assert ::team params) + (us/assert! ::team params) (ptk/reify ::update-team ptk/UpdateEvent (update [_ state] @@ -426,7 +426,7 @@ (defn update-team-photo [{:keys [file] :as params}] - (us/assert ::di/file file) + (us/assert! ::di/file file) (ptk/reify ::update-team-photo ptk/WatchEvent (watch [_ state _] @@ -447,8 +447,8 @@ (defn update-team-member-role [{:keys [role member-id] :as params}] - (us/assert ::us/uuid member-id) - (us/assert ::us/keyword role) + (us/assert! ::us/uuid member-id) + (us/assert! ::us/keyword role) (ptk/reify ::update-team-member-role ptk/WatchEvent (watch [_ state _] @@ -461,7 +461,7 @@ (defn delete-team-member [{:keys [member-id] :as params}] - (us/assert ::us/uuid member-id) + (us/assert! ::us/uuid member-id) (ptk/reify ::delete-team-member ptk/WatchEvent (watch [_ state _] @@ -474,7 +474,9 @@ (defn leave-team [{:keys [reassign-to] :as params}] - (us/assert (s/nilable ::us/uuid) reassign-to) + (us/assert! + :spec (s/nilable ::us/uuid) + :val reassign-to) (ptk/reify ::leave-team ptk/WatchEvent (watch [_ state _] @@ -510,9 +512,9 @@ (defn update-team-invitation-role [{:keys [email team-id role] :as params}] - (us/assert ::us/email email) - (us/assert ::us/uuid team-id) - (us/assert ::us/keyword role) + (us/assert! ::us/email email) + (us/assert! ::us/uuid team-id) + (us/assert! ::us/keyword role) (ptk/reify ::update-team-invitation-role IDeref (-deref [_] {:role role}) @@ -528,8 +530,8 @@ (defn delete-team-invitation [{:keys [email team-id] :as params}] - (us/assert ::us/email email) - (us/assert ::us/uuid team-id) + (us/assert! ::us/email email) + (us/assert! ::us/uuid team-id) (ptk/reify ::delete-team-invitation ptk/WatchEvent (watch [_ _ _] @@ -542,7 +544,7 @@ (defn delete-team-webhook [{:keys [id] :as params}] - (us/assert ::us/uuid id) + (us/assert! ::us/uuid id) (ptk/reify ::delete-team-webhook ptk/WatchEvent (watch [_ state _] @@ -562,10 +564,10 @@ (defn update-team-webhook [{:keys [id uri mtype is-active] :as params}] - (us/assert ::us/uuid id) - (us/assert ::us/uri uri) - (us/assert ::mtype mtype) - (us/assert ::us/boolean is-active) + (us/assert! ::us/uuid id) + (us/assert! ::us/uri uri) + (us/assert! ::mtype mtype) + (us/assert! ::us/boolean is-active) (ptk/reify ::update-team-webhook ptk/WatchEvent (watch [_ state _] @@ -580,9 +582,9 @@ (defn create-team-webhook [{:keys [uri mtype is-active] :as params}] - (us/assert ::us/uri uri) - (us/assert ::mtype mtype) - (us/assert ::us/boolean is-active) + (us/assert! ::us/uri uri) + (us/assert! ::mtype mtype) + (us/assert! ::us/boolean is-active) (ptk/reify ::create-team-webhook ptk/WatchEvent (watch [_ state _] @@ -599,7 +601,7 @@ (defn delete-team [{:keys [id] :as params}] - (us/assert ::team params) + (us/assert! ::team params) (ptk/reify ::delete-team ptk/WatchEvent (watch [_ _ _] @@ -652,7 +654,7 @@ (defn duplicate-project [{:keys [id name] :as params}] - (us/assert ::us/uuid id) + (us/assert! ::us/uuid id) (ptk/reify ::duplicate-project ptk/WatchEvent (watch [_ _ _] @@ -669,8 +671,8 @@ (defn move-project [{:keys [id team-id] :as params}] - (us/assert ::us/uuid id) - (us/assert ::us/uuid team-id) + (us/assert! ::us/uuid id) + (us/assert! ::us/uuid team-id) (ptk/reify ::move-project IDeref (-deref [_] @@ -688,7 +690,7 @@ (defn toggle-project-pin [{:keys [id is-pinned] :as project}] - (us/assert ::project project) + (us/assert! ::project project) (ptk/reify ::toggle-project-pin ptk/UpdateEvent (update [_ state] @@ -705,7 +707,7 @@ (defn rename-project [{:keys [id name] :as params}] - (us/assert ::project params) + (us/assert! ::project params) (ptk/reify ::rename-project ptk/UpdateEvent (update [_ state] @@ -723,7 +725,7 @@ (defn delete-project [{:keys [id] :as params}] - (us/assert ::project params) + (us/assert! ::project params) (ptk/reify ::delete-project ptk/UpdateEvent (update [_ state] @@ -745,7 +747,7 @@ (defn delete-file [{:keys [id project-id] :as params}] - (us/assert ::file params) + (us/assert! ::file params) (ptk/reify ::delete-file ptk/UpdateEvent (update [_ state] @@ -764,7 +766,7 @@ (defn rename-file [{:keys [id name] :as params}] - (us/assert ::file params) + (us/assert! ::file params) (ptk/reify ::rename-file IDeref (-deref [_] @@ -787,7 +789,7 @@ (defn set-file-shared [{:keys [id is-shared] :as params}] - (us/assert ::file params) + (us/assert! ::file params) (ptk/reify ::set-file-shared IDeref (-deref [_] @@ -828,7 +830,7 @@ (defn create-file [{:keys [project-id] :as params}] - (us/assert ::us/uuid project-id) + (us/assert! ::us/uuid project-id) (ptk/reify ::create-file IDeref @@ -857,8 +859,8 @@ (defn duplicate-file [{:keys [id name] :as params}] - (us/assert ::us/uuid id) - (us/assert ::name name) + (us/assert! ::us/uuid id) + (us/assert! ::name name) (ptk/reify ::duplicate-file ptk/WatchEvent (watch [_ _ _] @@ -877,8 +879,8 @@ (defn move-files [{:keys [ids project-id] :as params}] - (us/assert ::us/set-of-uuid ids) - (us/assert ::us/uuid project-id) + (us/assert! ::us/set-of-uuid ids) + (us/assert! ::us/uuid project-id) (ptk/reify ::move-files IDeref (-deref [_] @@ -898,7 +900,7 @@ ;; --- EVENT: clone-template (defn clone-template [{:keys [template-id project-id] :as params}] - (us/assert ::us/uuid project-id) + (us/assert! ::us/uuid project-id) (ptk/reify ::clone-template IDeref (-deref [_] @@ -920,7 +922,7 @@ (defn go-to-workspace [{:keys [id project-id] :as file}] - (us/assert ::file file) + (us/assert! ::file file) (ptk/reify ::go-to-workspace ptk/WatchEvent (watch [_ _ _] From 21abd98b95c2378e95d2325c71840236ddad50fd Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 7 Dec 2022 15:51:53 +0100 Subject: [PATCH 08/14] :sparkles: Integrate error handling for webhooks UI --- frontend/src/app/main/data/events.cljs | 2 +- frontend/src/app/main/ui/dashboard/team.cljs | 223 ++++++++++--------- frontend/translations/en.po | 27 ++- frontend/translations/es.po | 27 ++- 4 files changed, 153 insertions(+), 126 deletions(-) diff --git a/frontend/src/app/main/data/events.cljs b/frontend/src/app/main/data/events.cljs index fea0f0a5f..e4644d875 100644 --- a/frontend/src/app/main/data/events.cljs +++ b/frontend/src/app/main/data/events.cljs @@ -222,7 +222,7 @@ :body (http/transit-data {:events events})}] (->> (http/send! params) (rx/mapcat rp/handle-response) - (rx/catch (fn [cause] + (rx/catch (fn [_] (l/error :hint "unexpected error on persisting audit events") (rx/of nil))))) diff --git a/frontend/src/app/main/ui/dashboard/team.cljs b/frontend/src/app/main/ui/dashboard/team.cljs index 9e1c50c92..0a333b1f6 100644 --- a/frontend/src/app/main/ui/dashboard/team.cljs +++ b/frontend/src/app/main/ui/dashboard/team.cljs @@ -587,53 +587,78 @@ (s/def ::webhook-form (s/keys :req-un [::uri ::mtype])) -(mf/defc webhook-modal {::mf/register modal/components - ::mf/register-as :webhook} +(def valid-webhook-mtypes + [{:label "application/json" :value "application/json"} + {:label "application/x-www-form-urlencoded" :value "application/x-www-form-urlencoded"} + {:label "application/transit+json" :value "application/transit+json"}]) + +(defn- extract-status + [error-code] + (-> error-code (str/split #":") second)) + +(mf/defc webhook-modal + {::mf/register modal/components + ::mf/register-as :webhook} [{:keys [webhook] :as props}] (let [initial (mf/use-memo (fn [] (or webhook {:is-active false :mtype "application/json"}))) form (fm/use-form :spec ::webhook-form :initial initial) - mtypes [{:label "application/json" :value "application/json"} - {:label "application/x-www-form-urlencoded" :value "application/x-www-form-urlencoded"} - {:label "application/transit+json" :value "application/transit+json"}] - on-success - (fn [message] - (st/emit! (dd/fetch-team-webhooks) - (msg/success message) - (modal/hide))) + (mf/use-fn + (fn [_] + (let [message (tr "dashboard.webhooks.create.success")] + (st/emit! (dd/fetch-team-webhooks) + (msg/success message) + (modal/hide))))) on-error - (fn [message {:keys [type code hint] :as error}] - (let [message (if (and (= type :validation) (= code :webhook-validation)) - (str message " " - (case hint - "ssl-validation" (tr "errors.webhooks.ssl-validation") - "")) ;; TODO Add more error codes when back defines them - message)] - (rx/of (msg/error message)))) + (mf/use-fn + (fn [form {:keys [type code hint] :as error}] + (if (and (= type :validation) + (= code :webhook-validation)) + (let [message (cond + (= hint "unknown") + (tr "errors.webhooks.unexpected") + (= hint "ssl-validation-error") + (tr "errors.webhooks.ssl-validation") + (= hint "timeout") + (tr "errors.webhooks.timeout") + (= hint "connection-error") + (tr "errors.webhooks.connection") + (str/starts-with? hint "unexpected-status") + (tr "errors.webhooks.unexpected-status" (extract-status hint)))] + (swap! form assoc-in [:errors :uri] {:message message})) + (rx/throw error)))) on-create-submit - (fn [] - (let [mdata {:on-success #(on-success (tr "dashboard.webhooks.create.success")) - :on-error (partial on-error (tr "dashboard.webhooks.create.error"))} - webhook {:uri (get-in @form [:clean-data :uri]) - :mtype (get-in @form [:clean-data :mtype]) - :is-active (get-in @form [:clean-data :is-active])}] - (st/emit! (dd/create-team-webhook (with-meta webhook mdata))))) + (mf/use-fn + (fn [form] + (let [cdata (:clean-data @form) + mdata {:on-success (partial on-success form) + :on-error (partial on-error form)} + params {:uri (:uri cdata) + :mtype (:mtype cdata) + :is-active (:is-active cdata)}] + (st/emit! (dd/create-team-webhook + (with-meta params mdata)))))) on-update-submit - (fn [] - (let [mdata {:on-success #(on-success (tr "dashboard.webhooks.update.success")) - :on-error (partial on-error (tr "dashboard.webhooks.update.error"))} - webhook (get @form :clean-data)] - (st/emit! (dd/update-team-webhook (with-meta webhook mdata))))) + (mf/use-fn + (fn [form] + (let [params (:clean-data @form) + mdata {:on-success (partial on-success form) + :on-error (partial on-error form)}] + (st/emit! (dd/update-team-webhook + (with-meta params mdata)))))) on-submit - #(let [data (:clean-data @form)] - (if (:id data) - (on-update-submit) - (on-create-submit)))] + (mf/use-fn + (fn [form] + (prn @form) + (let [data (:clean-data @form)] + (if (:id data) + (on-update-submit form) + (on-create-submit form)))))] [:div.modal-overlay [:div.modal-container.webhooks-modal @@ -659,7 +684,7 @@ :placeholder (tr "modals.create-webhook.url.placeholder")}]] [:div.fields-row - [:& fm/select {:options mtypes + [:& fm/select {:options valid-webhook-mtypes :label (tr "dashboard.webhooks.content-type") :default "application/json" :name :mtype}]]] @@ -704,79 +729,75 @@ {:on-click #(st/emit! (modal/show :webhook {}))} [:span (tr "dashboard.webhooks.create")]]]]) +(mf/defc webhook-actions + [{:keys [on-edit on-delete] :as props}] + (let [show? (mf/use-state false)] + [:* + [:span.icon {:on-click #(reset! show? true)} [i/actions]] + [:& dropdown {:show @show? + :on-close #(reset! show? false)} + [:ul.dropdown.actions-dropdown + [:li {:on-click on-edit} (tr "labels.edit")] + [:li {:on-click on-delete} (tr "labels.delete")]]]])) - (mf/defc webhook-actions - [{:keys [on-edit on-delete] :as props}] - (let [show? (mf/use-state false)] - [:* - [:span.icon {:on-click #(reset! show? true)} [i/actions]] - [:& dropdown {:show @show? - :on-close #(reset! show? false)} - [:ul.dropdown.actions-dropdown - [:li {:on-click on-edit} (tr "labels.edit")] - [:li {:on-click on-delete} (tr "labels.delete")]]]])) +(mf/defc last-delivery-icon + [{:keys [success? text] :as props}] + [:div.last-delivery-icon + [:div.tooltip + [:div.label text] + [:div.arrow-down]] + (if success? + [:span.icon.success i/msg-success] + [:span.icon.failure i/msg-warning])]) - (mf/defc last-delivery-icon - [{:keys [success? text] :as props}] - [:div.last-delivery-icon - [:div.tooltip - [:div.label text] - [:div.arrow-down]] - (if success? - [:span.icon.success i/msg-success] - [:span.icon.failure i/msg-warning])]) +(mf/defc webhook-item + {::mf/wrap [mf/memo]} + [{:keys [webhook] :as props}] + (let [on-edit #(st/emit! (modal/show :webhook {:webhook webhook})) + error-code (:error-code webhook) - (mf/defc webhook-item - {::mf/wrap [mf/memo]} - [{:keys [webhook] :as props}] - (let [on-edit #(st/emit! (modal/show :webhook {:webhook webhook})) - error-code (:error-code webhook) - extract-status - (fn [error-code] - (let [status (-> error-code - (str/split "-") - last - parse-long)] - (if (nil? status) - "" - status))) - delete-fn - (fn [] - (let [params {:id (:id webhook)} - mdata {:on-success #(st/emit! (dd/fetch-team-webhooks))}] - (st/emit! (dd/delete-team-webhook (with-meta params mdata))))) - on-delete #(st/emit! (modal/show - {:type :confirm - :title (tr "modals.delete-webhook.title") - :message (tr "modals.delete-webhook.message") - :accept-label (tr "modals.delete-webhook.accept") - :on-accept delete-fn})) - last-delivery-text (cond - (nil? error-code) - (tr "webhooks.last-delivery.success") + delete-fn + (fn [] + (let [params {:id (:id webhook)} + mdata {:on-success #(st/emit! (dd/fetch-team-webhooks))}] + (st/emit! (dd/delete-team-webhook (with-meta params mdata))))) - (= error-code "ssl-validation") - (str (tr "errors.webhooks.last-delivery") " " (tr "errors.webhooks.ssl-validation")) + on-delete + (fn [] + (st/emit! (modal/show + {:type :confirm + :title (tr "modals.delete-webhook.title") + :message (tr "modals.delete-webhook.message") + :accept-label (tr "modals.delete-webhook.accept") + :on-accept delete-fn}))) - (str/starts-with? error-code "unexpected-status") - (str (tr "errors.webhooks.last-delivery") - " " - (tr "errors.webhooks.unexpected-status" (extract-status error-code))) + last-delivery-text + (if (nil? error-code) + (tr "webhooks.last-delivery.success") + (str (tr "errors.webhooks.last-delivery") + (cond + (= error-code "ssl-validation-error") + (dm/str " " (tr "errors.webhooks.ssl-validation")) - :else - (tr "errors.webhooks.last-delivery"))] - [:div.table-row - [:div.table-field.last-delivery - [:div.icon-container - [:& last-delivery-icon {:success? (nil? error-code) :text last-delivery-text}]]] - [:div.table-field.uri - [:div (:uri webhook)]] - [:div.table-field.active - [:div (if (:is-active webhook) (tr "labels.active") (tr "labels.inactive"))]] - [:div.table-field.actions - [:& webhook-actions {:on-edit on-edit - :on-delete on-delete}]]])) + (str/starts-with? error-code "unexpected-status") + (dm/str " " (tr "errors.webhooks.unexpected-status" (extract-status error-code))))))] + [:div.table-row + [:div.table-field.last-delivery + [:div.icon-container + [:& last-delivery-icon + {:success? (nil? error-code) + :text last-delivery-text}]]] + [:div.table-field.uri + [:div (:uri webhook)]] + [:div.table-field.active + [:div (if (:is-active webhook) + (tr "labels.active") + (tr "labels.inactive"))]] + [:div.table-field.actions + [:& webhook-actions + {:on-edit on-edit + :on-delete on-delete}]]])) (mf/defc webhooks-list [{:keys [webhooks] :as props}] diff --git a/frontend/translations/en.po b/frontend/translations/en.po index c5085a092..de91460a8 100644 --- a/frontend/translations/en.po +++ b/frontend/translations/en.po @@ -690,6 +690,21 @@ msgstr "Is active" msgid "dashboard.webhooks.active.explain" msgstr "When this hook is triggered event details will be delivered" +msgid "dashboard.webhooks.update.success" +msgstr "Webhook updated successfully." + +msgid "dashboard.webhooks.create.success" +msgstr "Webhook created successfully." + +msgid "errors.webhooks.unexpected" +msgstr "Unexpected error on validating" + +msgid "errors.webhooks.timeout" +msgstr "Timeout" + +msgid "errors.webhooks.connection" +msgstr "Connection error, url not reacheable" + msgid "webhooks.last-delivery.success" msgstr "Last delivery was successfull." @@ -702,18 +717,6 @@ msgstr "Error on SSL validation." msgid "errors.webhooks.unexpected-status" msgstr "Unexpected status %s" -msgid "dashboard.webhooks.update.error" -msgstr "Error on updating webhook." - -msgid "dashboard.webhooks.update.success" -msgstr "Webhook updated successfully." - -msgid "dashboard.webhooks.create.error" -msgstr "Error on creating webhook." - -msgid "dashboard.webhooks.create.success" -msgstr "Webhook created successfully." - #: src/app/main/ui/alert.cljs msgid "ds.alert-ok" msgstr "Ok" diff --git a/frontend/translations/es.po b/frontend/translations/es.po index b66865f71..f06309bb0 100644 --- a/frontend/translations/es.po +++ b/frontend/translations/es.po @@ -737,6 +737,21 @@ msgstr "Cuando se active este webhook se enviarán detalles del evento" msgid "webhooks.last-delivery.success" msgstr "El último envío fue correcto." +msgid "dashboard.webhooks.update.success" +msgstr "Webhook modificado con éxito" + +msgid "dashboard.webhooks.create.success" +msgstr "Webhook creado con éxito" + +msgid "errors.webhooks.timeout" +msgstr "Timeout" + +msgid "errors.webhooks.unexpected" +msgstr "Error inesperado al validar" + +msgid "errors.webhooks.connection" +msgstr "Error de conexion, la url no es alcanzable" + msgid "errors.webhooks.last-delivery" msgstr "Hubo un problema en el último envío." @@ -746,18 +761,6 @@ msgstr "Error en la validación SSL." msgid "errors.webhooks.unexpected-status" msgstr "Estado inesperado %s" -msgid "dashboard.webhooks.update.error" -msgstr "Error modificando el webhook" - -msgid "dashboard.webhooks.update.success" -msgstr "Webhook modificado con éxito" - -msgid "dashboard.webhooks.create.error" -msgstr "Error creando con éxito" - -msgid "dashboard.webhooks.create.success" -msgstr "Webhook creado con éxito" - #: src/app/main/ui/alert.cljs msgid "ds.alert-ok" msgstr "Ok" From f2b60261f8b209ae5357a459b340cfc8606e36d2 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 12 Dec 2022 08:31:48 +0100 Subject: [PATCH 09/14] :tada: Add tests for webhooks rpc and logger --- backend/src/app/loggers/webhooks.clj | 12 +- backend/src/app/rpc/commands/webhooks.clj | 4 +- backend/test/backend_tests/helpers.clj | 17 +++ .../backend_tests/loggers_webhooks_test.clj | 120 ++++++++++++++++++ .../test/backend_tests/rpc_webhooks_test.clj | 41 +++++- 5 files changed, 183 insertions(+), 11 deletions(-) create mode 100644 backend/test/backend_tests/loggers_webhooks_test.clj diff --git a/backend/src/app/loggers/webhooks.clj b/backend/src/app/loggers/webhooks.clj index 28331540f..a5849c6da 100644 --- a/backend/src/app/loggers/webhooks.clj +++ b/backend/src/app/loggers/webhooks.clj @@ -22,11 +22,11 @@ ;; --- PROC -(defn lookup-webhooks-by-team +(defn- lookup-webhooks-by-team [pool team-id] (db/exec! pool ["select * from webhook where team_id=? and is_active=true" team-id])) -(defn lookup-webhooks-by-project +(defn- lookup-webhooks-by-project [pool project-id] (let [sql [(str "select * from webhook as w" " join project as p on (p.team_id = w.team_id)" @@ -34,7 +34,7 @@ project-id]] (db/exec! pool sql))) -(defn lookup-webhooks-by-file +(defn- lookup-webhooks-by-file [pool file-id] (let [sql [(str "select * from webhook as w" " join project as p on (p.team_id = w.team_id)" @@ -43,7 +43,7 @@ file-id]] (db/exec! pool sql))) -(defn lookup-webhooks +(defn- lookup-webhooks [{:keys [::db/pool]} {:keys [props] :as event}] (or (some->> (:team-id props) (lookup-webhooks-by-team pool)) (some->> (:project-id props) (lookup-webhooks-by-project pool)) @@ -77,7 +77,7 @@ (declare interpret-exception) (declare interpret-response) -(def ^:private mapper +(def ^:private json-mapper (json/mapper {:encode-key-fn str/camel :decode-key-fn (comp keyword str/kebab) @@ -123,7 +123,7 @@ whook (::config props) body (case (:mtype whook) - "application/json" (json/encode-str event mapper) + "application/json" (json/encode-str event json-mapper) "application/transit+json" (t/encode-str event) "application/x-www-form-urlencoded" (uri/map->query-string event))] diff --git a/backend/src/app/rpc/commands/webhooks.clj b/backend/src/app/rpc/commands/webhooks.clj index f1af96175..fdbc30851 100644 --- a/backend/src/app/rpc/commands/webhooks.clj +++ b/backend/src/app/rpc/commands/webhooks.clj @@ -99,8 +99,8 @@ {::doc/added "1.17"} [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id] :as params}] (check-edition-permissions! pool profile-id team-id) - (->> (validate-webhook! cfg nil params) - (p/fmap executor (fn [_] (validate-quotes! cfg params))) + (->> (validate-quotes! cfg params) + (p/fmap executor (fn [_] (validate-webhook! cfg nil params))) (p/fmap executor (fn [_] (insert-webhook! cfg params))))) (s/def ::update-webhook diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 1ab57e523..41cf3e1cf 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -284,6 +284,19 @@ :session-id session-id :profile-id profile-id}))))) +(defn create-webhook* + ([params] (create-webhook* *pool* params)) + ([pool {:keys [team-id id uri mtype is-active] + :or {is-active true + mtype "application/json" + uri "http://example.com/webhook"}}] + (db/insert! pool :webhook + {:id (or id (uuid/next)) + :team-id team-id + :uri uri + :is-active is-active + :mtype mtype}))) + ;; --- RPC HELPERS (defn handle-error @@ -417,6 +430,10 @@ [& params] (apply db/query *pool* params)) +(defn db-get + [& params] + (apply db/get* *pool* params)) + (defn sleep [ms-or-duration] (Thread/sleep (inst-ms (dt/duration ms-or-duration)))) diff --git a/backend/test/backend_tests/loggers_webhooks_test.clj b/backend/test/backend_tests/loggers_webhooks_test.clj new file mode 100644 index 000000000..8af5bd780 --- /dev/null +++ b/backend/test/backend_tests/loggers_webhooks_test.clj @@ -0,0 +1,120 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns backend-tests.loggers-webhooks-test + (:require + [app.common.uuid :as uuid] + [app.db :as db] + [app.http :as http] + [app.storage :as sto] + [backend-tests.helpers :as th] + [clojure.test :as t] + [mockery.core :refer [with-mocks]])) + +(t/use-fixtures :once th/state-init) +(t/use-fixtures :each th/database-reset) + +(t/deftest process-event-handler-with-no-webhooks + (with-mocks [submit-mock {:target 'app.worker/submit! :return nil}] + (let [prof (th/create-profile* 1 {:is-active true}) + res (th/run-task! :process-webhook-event + {:props + {:app.loggers.webhooks/event + {:type "mutation" + :name "create-project" + :props {:team-id (:default-team-id prof)}}}})] + + (t/is (= 0 (:call-count @submit-mock))) + (t/is (nil? res))))) + +(t/deftest process-event-handler + (with-mocks [submit-mock {:target 'app.worker/submit! :return nil}] + (let [prof (th/create-profile* 1 {:is-active true}) + whk (th/create-webhook* {:team-id (:default-team-id prof)}) + res (th/run-task! :process-webhook-event + {:props + {:app.loggers.webhooks/event + {:type "mutation" + :name "create-project" + :props {:team-id (:default-team-id prof)}}}})] + + (t/is (= 1 (:call-count @submit-mock))) + (t/is (nil? res))))) + +(t/deftest run-webhook-handler-1 + (with-mocks [http-mock {:target 'app.http.client/req! :return {:status 200}}] + (let [prof (th/create-profile* 1 {:is-active true}) + whk (th/create-webhook* {:team-id (:default-team-id prof)}) + evt {:type "mutation" + :name "create-project" + :props {:team-id (:default-team-id prof)}} + res (th/run-task! :run-webhook + {:props + {:app.loggers.webhooks/event evt + :app.loggers.webhooks/config whk}})] + + (t/is (= 1 (:call-count @http-mock))) + + (let [rows (th/db-exec! ["select * from webhook_delivery where webhook_id=?" + (:id whk)])] + (t/is (= 1 (count rows))) + (t/is (nil? (-> rows first :error-code)))) + + ;; Refresh webhook + (let [whk' (th/db-get :webhook {:id (:id whk)})] + (t/is (nil? (:error-code whk'))) + (prn whk')) + + ))) + +(t/deftest run-webhook-handler-2 + (with-mocks [http-mock {:target 'app.http.client/req! :return {:status 400}}] + (let [prof (th/create-profile* 1 {:is-active true}) + whk (th/create-webhook* {:team-id (:default-team-id prof)}) + evt {:type "mutation" + :name "create-project" + :props {:team-id (:default-team-id prof)}} + res (th/run-task! :run-webhook + {:props + {:app.loggers.webhooks/event evt + :app.loggers.webhooks/config whk}})] + + (t/is (= 1 (:call-count @http-mock))) + + (let [rows (th/db-query :webhook-delivery {:webhook-id (:id whk)})] + (t/is (= 1 (count rows))) + (t/is (= "unexpected-status:400" (-> rows first :error-code)))) + + ;; Refresh webhook + (let [whk' (th/db-get :webhook {:id (:id whk)})] + (t/is (= "unexpected-status:400" (:error-code whk'))) + (t/is (= 1 (:error-count whk')))) + + + ;; RUN 2 times more + + (th/run-task! :run-webhook + {:props + {:app.loggers.webhooks/event evt + :app.loggers.webhooks/config whk}}) + + (th/run-task! :run-webhook + {:props + {:app.loggers.webhooks/event evt + :app.loggers.webhooks/config whk}}) + + + (let [rows (th/db-query :webhook-delivery {:webhook-id (:id whk)})] + (t/is (= 3 (count rows))) + (t/is (= "unexpected-status:400" (-> rows first :error-code)))) + + ;; Refresh webhook + (let [whk' (th/db-get :webhook {:id (:id whk)})] + (t/is (= "unexpected-status:400" (:error-code whk'))) + (t/is (= 3 (:error-count whk'))) + (t/is (false? (:is-active whk')))) + + ))) diff --git a/backend/test/backend_tests/rpc_webhooks_test.clj b/backend/test/backend_tests/rpc_webhooks_test.clj index 37c1c83a0..6d2f97a17 100644 --- a/backend/test/backend_tests/rpc_webhooks_test.clj +++ b/backend/test/backend_tests/rpc_webhooks_test.clj @@ -12,8 +12,6 @@ [app.storage :as sto] [backend-tests.helpers :as th] [clojure.test :as t] - [datoteka.fs :as fs] - [datoteka.io :as io] [mockery.core :refer [with-mocks]])) (t/use-fixtures :once th/state-init) @@ -52,7 +50,6 @@ (t/is (= (:mtype params) (:mtype result))) (vreset! whook result)))) - (th/reset-mock! http-mock) (t/testing "update webhook 1 (success)" @@ -144,3 +141,41 @@ (t/is (= (:code error-data) :object-not-found))))) ))) + +(t/deftest webhooks-quotes + (with-mocks [http-mock {:target 'app.http.client/req! + :return {:status 200}}] + + (let [prof (th/create-profile* 1 {:is-active true}) + team-id (:default-team-id prof) + params {::th/type :create-webhook + :profile-id (:id prof) + :team-id team-id + :uri "http://example.com" + :mtype "application/json"} + out1 (th/command! params) + out2 (th/command! params) + out3 (th/command! params) + out4 (th/command! params) + out5 (th/command! params) + out6 (th/command! params) + out7 (th/command! params) + out8 (th/command! params) + out9 (th/command! params)] + + (t/is (= 8 (:call-count @http-mock))) + + (t/is (nil? (:error out1))) + (t/is (nil? (:error out2))) + (t/is (nil? (:error out3))) + (t/is (nil? (:error out4))) + (t/is (nil? (:error out5))) + (t/is (nil? (:error out6))) + (t/is (nil? (:error out7))) + (t/is (nil? (:error out8))) + + (let [error (:error out9) + error-data (ex-data error)] + (t/is (th/ex-info? error)) + (t/is (= (:type error-data) :restriction)) + (t/is (= (:code error-data) :webhooks-quote-reached)))))) From 240e480b2edca0b141db678f5c53fb777bc1fde3 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 12 Dec 2022 11:10:22 +0100 Subject: [PATCH 10/14] :tada: Allow application/json on Accept header --- backend/src/app/http/middleware.clj | 49 +++++++++++++++++++++++++++-- frontend/src/app/main/repo.cljs | 3 ++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/backend/src/app/http/middleware.clj b/backend/src/app/http/middleware.clj index 5f687be12..23e229652 100644 --- a/backend/src/app/http/middleware.clj +++ b/backend/src/app/http/middleware.clj @@ -32,6 +32,12 @@ {:name ::params :compile (constantly ymw/wrap-params)}) +(def ^:private json-mapper + (json/mapper + {:encode-key-fn str/camel + :decode-key-fn (comp keyword str/kebab) + :pretty true})) + (defn wrap-parse-request [handler] (letfn [(process-request [request] @@ -46,7 +52,7 @@ (str/starts-with? header "application/json") (with-open [is (yrq/body request)] - (let [params (json/decode is)] + (let [params (json/decode is json-mapper)] (-> request (assoc :body-params params) (update :params merge params)))) @@ -117,7 +123,32 @@ (finally (.close ^OutputStream output-stream)))))) - (format-response [response request] + (json-streamable-body [data] + (reify yrs/StreamableResponseBody + (-write-body-to-stream [_ _ output-stream] + (try + + (with-open [bos (buffered-output-stream output-stream buffer-size)] + (json/write! bos data json-mapper)) + + (catch java.io.IOException _cause + ;; Do nothing, EOF means client closes connection abruptly + nil) + (catch Throwable cause + (l/warn :hint "unexpected error on encoding response" + :cause cause)) + (finally + (.close ^OutputStream output-stream)))))) + + (format-response-with-json [response _] + (let [body (yrs/body response)] + (if (or (boolean? body) (coll? body)) + (-> response + (update :headers assoc "content-type" "application/json") + (assoc :body (json-streamable-body body))) + response))) + + (format-response-with-transit [response request] (let [body (yrs/body response)] (if (or (boolean? body) (coll? body)) (let [qs (yrq/query request) @@ -130,6 +161,20 @@ (assoc :body (transit-streamable-body body opts)))) response))) + (format-response [response request] + (let [accept (yrq/get-header request "accept")] + (cond + (or (= accept "application/transit+json") + (str/includes? accept "application/transit+json")) + (format-response-with-transit response request) + + (or (= accept "application/json") + (str/includes? accept "application/json")) + (format-response-with-json response request) + + :else + (format-response-with-transit response request)))) + (process-response [response request] (cond-> response (map? response) (format-response request)))] diff --git a/frontend/src/app/main/repo.cljs b/frontend/src/app/main/repo.cljs index 67993a419..aea3ed169 100644 --- a/frontend/src/app/main/repo.cljs +++ b/frontend/src/app/main/repo.cljs @@ -60,6 +60,7 @@ http/conditional-decode-transit)] (->> (http/send! {:method :get :uri (u/join @cf/public-uri "api/rpc/query/" (name id)) + :headers {"accept" "application/transit+json"} :credentials "include" :query params}) (rx/map decode-transit) @@ -71,6 +72,7 @@ [id params] (->> (http/send! {:method :post :uri (u/join @cf/public-uri "api/rpc/mutation/" (name id)) + :headers {"accept" "application/transit+json"} :credentials "include" :body (http/transit-data params)}) (rx/map http/conditional-decode-transit) @@ -88,6 +90,7 @@ (->> (http/send! {:method method :uri (u/join @cf/public-uri "api/rpc/command/" (name id)) :credentials "include" + :headers {"accept" "application/transit+json"} :body (when (= method :post) (if form-data? (http/form-data params) From ae79ee435e62559213d9823e653d1084861208ad Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 12 Dec 2022 13:26:08 +0100 Subject: [PATCH 11/14] :tada: Add many rpc calls to webhooks registry --- backend/src/app/rpc/commands/files.clj | 18 ++++++++------ backend/src/app/rpc/commands/files/create.clj | 6 +++-- backend/src/app/rpc/mutations/fonts.clj | 24 ++++++++++--------- 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index 9b3a6bbc9..27df59f4e 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -17,6 +17,7 @@ [app.common.types.shape-tree :as ctt] [app.db :as db] [app.db.sql :as sql] + [app.loggers.webhooks :as-alias webhooks] [app.rpc.commands.files.thumbnails :as-alias thumbs] [app.rpc.cond :as-alias cond] [app.rpc.doc :as-alias doc] @@ -762,7 +763,8 @@ (s/keys :req-un [::profile-id ::name ::id])) (sv/defmethod ::rename-file - {::doc/added "1.17"} + {::doc/added "1.17" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] (db/with-atomic [conn pool] (check-edition-permissions! conn profile-id id) @@ -806,7 +808,8 @@ (s/keys :req-un [::profile-id ::id ::is-shared])) (sv/defmethod ::set-file-shared - {::doc/added "1.17"} + {::doc/added "1.17" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [id profile-id is-shared] :as params}] (db/with-atomic [conn pool] (check-edition-permissions! conn profile-id id) @@ -829,14 +832,14 @@ (s/keys :req-un [::id ::profile-id])) (sv/defmethod ::delete-file - {::doc/added "1.17"} + {::doc/added "1.17" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] (db/with-atomic [conn pool] (check-edition-permissions! conn profile-id id) (absorb-library conn params) (mark-file-deleted conn params))) - ;; --- MUTATION COMMAND: link-file-to-library (def sql:link-file-to-library @@ -852,7 +855,8 @@ (s/keys :req-un [::profile-id ::file-id ::library-id])) (sv/defmethod ::link-file-to-library - {::doc/added "1.17"} + {::doc/added "1.17" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [profile-id file-id library-id] :as params}] (when (= file-id library-id) (ex/raise :type :validation @@ -863,7 +867,6 @@ (check-edition-permissions! conn profile-id library-id) (link-file-to-library conn params))) - ;; --- MUTATION COMMAND: unlink-file-from-library (defn unlink-file-from-library @@ -876,7 +879,8 @@ (s/keys :req-un [::profile-id ::file-id ::library-id])) (sv/defmethod ::unlink-file-from-library - {::doc/added "1.17"} + {::doc/added "1.17" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] (db/with-atomic [conn pool] (check-edition-permissions! conn profile-id file-id) diff --git a/backend/src/app/rpc/commands/files/create.clj b/backend/src/app/rpc/commands/files/create.clj index eb5ebe675..d0283abca 100644 --- a/backend/src/app/rpc/commands/files/create.clj +++ b/backend/src/app/rpc/commands/files/create.clj @@ -11,7 +11,8 @@ [app.common.types.file :as ctf] [app.common.uuid :as uuid] [app.db :as db] - [app.loggers.audit :as audit] + [app.loggers.audit :as-alias audit] + [app.loggers.webhooks :as-alias webhooks] [app.rpc.commands.files :as files] [app.rpc.doc :as-alias doc] [app.rpc.permissions :as perms] @@ -75,7 +76,8 @@ ::files/features])) (sv/defmethod ::create-file - {::doc/added "1.17"} + {::doc/added "1.17" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [profile-id project-id] :as params}] (db/with-atomic [conn pool] (proj/check-edition-permissions! conn profile-id project-id) diff --git a/backend/src/app/rpc/mutations/fonts.clj b/backend/src/app/rpc/mutations/fonts.clj index 674254df3..b92a3fc86 100644 --- a/backend/src/app/rpc/mutations/fonts.clj +++ b/backend/src/app/rpc/mutations/fonts.clj @@ -12,6 +12,7 @@ [app.common.uuid :as uuid] [app.db :as db] [app.loggers.audit :as-alias audit] + [app.loggers.webhooks :as-alias webhooks] [app.media :as media] [app.rpc.climit :as-alias climit] [app.rpc.doc :as-alias doc] @@ -43,6 +44,8 @@ ::font-id ::font-family ::font-weight ::font-style])) (sv/defmethod ::create-font-variant + {::doc/added "1.3" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}] (let [cfg (update cfg :storage media/configure-assets-storage)] (teams/check-edition-permissions! pool profile-id team-id) @@ -119,19 +122,16 @@ (s/def ::update-font (s/keys :req-un [::profile-id ::team-id ::id ::name])) -(def sql:update-font - "update team_font_variant - set font_family = ? - where team_id = ? - and font_id = ?") - (sv/defmethod ::update-font - {::climit/queue :process-font} + {::doc/added "1.3" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [team-id profile-id id name] :as params}] (db/with-atomic [conn pool] (teams/check-edition-permissions! conn profile-id team-id) - (db/exec-one! conn [sql:update-font name team-id id]) - nil)) + (db/update! conn :team-font-variant + {:font-family name} + {:font-id id + :team-id team-id}))) ;; --- DELETE FONT @@ -139,10 +139,11 @@ (s/keys :req-un [::profile-id ::team-id ::id])) (sv/defmethod ::delete-font + {::doc/added "1.3" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [id team-id profile-id] :as params}] (db/with-atomic [conn pool] (teams/check-edition-permissions! conn profile-id team-id) - (db/update! conn :team-font-variant {:deleted-at (dt/now)} {:font-id id :team-id team-id}) @@ -154,7 +155,8 @@ (s/keys :req-un [::profile-id ::team-id ::id])) (sv/defmethod ::delete-font-variant - {::doc/added "1.3"} + {::doc/added "1.3" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [id team-id profile-id] :as params}] (db/with-atomic [conn pool] (teams/check-edition-permissions! conn profile-id team-id) From d7459db292ea4e3d3fe829a22beed350035abef7 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 13 Dec 2022 23:13:11 +0100 Subject: [PATCH 12/14] :tada: Add task deduplication by label --- backend/src/app/migrations.clj | 3 ++ .../migrations/sql/0087-mod-task-table.sql | 9 ++++ backend/src/app/worker.clj | 52 +++++++++++++------ 3 files changed, 47 insertions(+), 17 deletions(-) create mode 100644 backend/src/app/migrations/sql/0087-mod-task-table.sql diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index dda506b11..f659d93ee 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -268,6 +268,9 @@ {:name "0086-add-webhook-delivery-table" :fn (mg/resource "app/migrations/sql/0086-add-webhook-delivery-table.sql")} + + {:name "0087-mod-task-table" + :fn (mg/resource "app/migrations/sql/0087-mod-task-table.sql")} ]) diff --git a/backend/src/app/migrations/sql/0087-mod-task-table.sql b/backend/src/app/migrations/sql/0087-mod-task-table.sql new file mode 100644 index 000000000..75379eca6 --- /dev/null +++ b/backend/src/app/migrations/sql/0087-mod-task-table.sql @@ -0,0 +1,9 @@ +ALTER TABLE task + ADD COLUMN label text NULL; + +ALTER TABLE task + ALTER COLUMN label SET STORAGE external; + +CREATE INDEX task__label__idx + ON task (label, name, queue) + WHERE status = 'new'; diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index b4305a3b9..9adaa7ee1 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -97,7 +97,7 @@ (l/info :hint "registry initialized" :tasks (count tasks)) (reduce-kv (fn [registry k v] (let [tname (name k)] - (l/debug :hint "register task" :name tname) + (l/trace :hint "register task" :name tname) (assoc registry tname (wrap-task-handler metrics tname v)))) {} tasks)) @@ -214,10 +214,10 @@ (db/create-array conn "uuid" ids)]] (db/exec-one! conn sql) - (l/debug :hist "dispatcher: push tasks to redis" + (l/debug :hist "dispatcher: queue tasks" :queue queue :tasks (count ids) - :queued res))) + :total-queued res))) (run-batch! [rconn] (db/with-atomic [conn pool] @@ -445,7 +445,7 @@ :else (try - (l/trace :hint "worker: executing task" + (l/debug :hint "worker: executing task" :worker-id worker-id :task-id (:id task) :task-name (:name task) @@ -649,39 +649,57 @@ options)))) (def ^:private sql:insert-new-task - "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) - values (?, ?, ?, ?, ?, ?, now() + ?) + "insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at) + values (?, ?, ?, ?, ?, ?, ?, now() + ?) returning id") +(def ^:private + sql:remove-not-started-tasks + "delete from task + where name=? and queue=? and label=? and status = 'new' and scheduled_at > now()") + +(s/def ::label string?) (s/def ::task (s/or :kw keyword? :str string?)) (s/def ::queue (s/or :kw keyword? :str string?)) -(s/def ::delay (s/or :int ::us/integer :duration dt/duration?)) +(s/def ::delay (s/or :int integer? :duration dt/duration?)) (s/def ::conn (s/or :pool ::db/pool :connection some?)) -(s/def ::priority ::us/integer) -(s/def ::max-retries ::us/integer) +(s/def ::priority integer?) +(s/def ::max-retries integer?) +(s/def ::dedupe boolean?) (s/def ::submit-options - (s/keys :req [::task ::conn] - :opt [::delay ::queue ::priority ::max-retries])) + (s/and + (s/keys :req [::task ::conn] + :opt [::label ::delay ::queue ::priority ::max-retries ::dedupe]) + (fn [{:keys [::dedupe ::label] :or {label ""}}] + (if dedupe + (not= label "") + true)))) (defn submit! - [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn] - :or {delay 0 queue :default priority 100 max-retries 3} + [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label] + :or {delay 0 queue :default priority 100 max-retries 3 label ""} :as options}] - (us/verify ::submit-options options) + (us/verify! ::submit-options options) (let [duration (dt/duration delay) interval (db/interval duration) props (-> options extract-props db/tjson) id (uuid/next) tenant (cf/get :tenant) task (d/name task) - queue (str/ffmt "%:%" tenant (d/name queue))] + queue (str/ffmt "%:%" tenant (d/name queue)) + deleted (when dedupe + (-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label]) + :next.jdbc/update-count))] (l/debug :hint "submit task" :name task :queue queue + :label label + :dedupe (boolean dedupe) + :deleted (or deleted 0) :in (dt/format-duration duration)) - (db/exec-one! conn [sql:insert-new-task id task props - queue priority max-retries interval]) + (db/exec-one! conn [sql:insert-new-task id task props queue + label priority max-retries interval]) id)) From 782f2ed57d1000b1de434d94b557865f2267d65d Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 13 Dec 2022 23:13:48 +0100 Subject: [PATCH 13/14] :tada: Enable comments events on webhooks --- backend/src/app/rpc/commands/comments.clj | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/backend/src/app/rpc/commands/comments.clj b/backend/src/app/rpc/commands/comments.clj index 0214e3111..f2aad072d 100644 --- a/backend/src/app/rpc/commands/comments.clj +++ b/backend/src/app/rpc/commands/comments.clj @@ -10,6 +10,7 @@ [app.common.geom.point :as gpt] [app.common.spec :as us] [app.db :as db] + [app.loggers.webhooks :as-alias webhooks] [app.rpc.commands.files :as files] [app.rpc.doc :as-alias doc] [app.rpc.queries.teams :as teams] @@ -43,6 +44,7 @@ #(or (:file-id %) (:team-id %)))) (sv/defmethod ::get-comment-threads + {::doc/added "1.15"} [{:keys [pool] :as cfg} params] (with-open [conn (db/open pool)] (retrieve-comment-threads conn params))) @@ -245,7 +247,8 @@ (sv/defmethod ::create-comment-thread {::retry/max-retries 3 ::retry/matches retry/conflict-db-insert? - ::doc/added "1.15"} + ::doc/added "1.15" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [profile-id file-id share-id] :as params}] (db/with-atomic [conn pool] (files/check-comment-permissions! conn profile-id file-id share-id) @@ -364,7 +367,8 @@ :opt-un [::share-id])) (sv/defmethod ::create-comment - {::doc/added "1.15"} + {::doc/added "1.15" + ::webhooks/event? true} [{:keys [pool] :as cfg} params] (db/with-atomic [conn pool] (create-comment conn params))) @@ -483,7 +487,8 @@ (s/keys :req-un [::profile-id ::id])) (sv/defmethod ::delete-comment - {::doc/added "1.15"} + {::doc/added "1.15" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [profile-id id] :as params}] (db/with-atomic [conn pool] (let [comment (db/get-by-id conn :comment id {:for-update true})] @@ -529,4 +534,3 @@ :frame-id frame-id} {:id (:id thread)}) nil))) - From d56082307bb0756976cc2dd871195858f9d55416 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 13 Dec 2022 23:14:55 +0100 Subject: [PATCH 14/14] :tada: Add update-file (batched) to webhooks --- backend/src/app/loggers/audit.clj | 32 ++++++++++++++----- backend/src/app/loggers/webhooks.clj | 5 ++- backend/src/app/rpc.clj | 15 +++++++-- backend/src/app/rpc/commands/files/update.clj | 17 ++++++++-- 4 files changed, 54 insertions(+), 15 deletions(-) diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index d004fe5d4..55c3339fd 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -104,11 +104,18 @@ (s/def ::type ::us/string) (s/def ::props (s/map-of ::us/keyword any?)) (s/def ::ip-addr ::us/string) + (s/def ::webhooks/event? ::us/boolean) +(s/def ::webhooks/batch-timeout ::dt/duration) +(s/def ::webhooks/batch-key + (s/or :fn fn? :str string? :kw keyword?)) (s/def ::event (s/keys :req-un [::type ::name ::profile-id] - :opt-un [::ip-addr ::props ::webhooks/event?])) + :opt-un [::ip-addr ::props] + :opt [::webhooks/event? + ::webhooks/batch-timeout + ::webhooks/batch-key])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; COLLECTOR @@ -153,13 +160,22 @@ (when (and (contains? cf/flags :webhooks) (::webhooks/event? event)) - (wrk/submit! ::wrk/conn pool - ::wrk/task :process-webhook-event - ::wrk/queue :webhooks - ::wrk/max-retries 0 - ::webhooks/event (-> params - (dissoc :ip-addr) - (dissoc :type)))))) + (let [batch-key (::webhooks/batch-key event) + batch-timeout (::webhooks/batch-timeout event)] + (wrk/submit! ::wrk/conn pool + ::wrk/task :process-webhook-event + ::wrk/queue :webhooks + ::wrk/max-retries 0 + ::wrk/delay (or batch-timeout 0) + ::wrk/label (cond + (fn? batch-key) (batch-key (:props event)) + (keyword? batch-key) (name batch-key) + (string? batch-key) batch-key + :else "default") + ::wrk/dedupe true + ::webhooks/event (-> params + (dissoc :ip-addr) + (dissoc :type))))))) (defn submit! "Submit audit event to the collector." diff --git a/backend/src/app/loggers/webhooks.clj b/backend/src/app/loggers/webhooks.clj index a5849c6da..b05b81558 100644 --- a/backend/src/app/loggers/webhooks.clj +++ b/backend/src/app/loggers/webhooks.clj @@ -11,6 +11,7 @@ [app.common.logging :as l] [app.common.transit :as t] [app.common.uri :as uri] + [app.config :as cf] [app.db :as db] [app.http.client :as http] [app.util.json :as json] @@ -56,6 +57,7 @@ [_ {:keys [::db/pool] :as cfg}] (fn [{:keys [props] :as task}] (let [event (::event props)] + (l/debug :hint "process webhook event" :name (:name event)) @@ -134,7 +136,8 @@ :webhook-mtype (:mtype whook)) (let [req {:uri (:uri whook) - :headers {"content-type" (:mtype whook)} + :headers {"content-type" (:mtype whook) + "user-agent" (str/ffmt "penpot/%" (:main cf/version))} :timeout (dt/duration "4s") :method :post :body body}] diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index df4fd4ea9..849f8370c 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -171,9 +171,18 @@ :profile-id profile-id :ip-addr (some-> request audit/parse-client-ip) :props props - ::webhooks/event? (or (::webhooks/event? mdata) - (::webhooks/event? resultm) - false)}] + ::webhooks/batch-key + (or (::webhooks/batch-key mdata) + (::webhooks/batch-key resultm)) + + ::webhooks/batch-timeout + (or (::webhooks/batch-timeout mdata) + (::webhooks/batch-timeout resultm)) + + ::webhooks/event? + (or (::webhooks/event? mdata) + (::webhooks/event? resultm) + false)}] (audit/submit! collector event))) diff --git a/backend/src/app/rpc/commands/files/update.clj b/backend/src/app/rpc/commands/files/update.clj index a590d63e8..520df2897 100644 --- a/backend/src/app/rpc/commands/files/update.clj +++ b/backend/src/app/rpc/commands/files/update.clj @@ -17,6 +17,7 @@ [app.config :as cf] [app.db :as db] [app.loggers.audit :as audit] + [app.loggers.webhooks :as-alias webhooks] [app.metrics :as mtx] [app.msgbus :as mbus] [app.rpc.climit :as-alias climit] @@ -122,12 +123,18 @@ ;; set is different than the persisted one, update it on the ;; database. +(defn webhook-batch-keyfn + [props] + (str "rpc:update-file:" (:id props))) + (sv/defmethod ::update-file {::climit/queue :update-file ::climit/key-fn :id + ::webhooks/event? true + ::webhooks/batch-timeout (dt/duration "2s") + ::webhooks/batch-key webhook-batch-keyfn ::doc/added "1.17"} [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] - (db/with-atomic [conn pool] (files/check-edition-permissions! conn profile-id id) (db/xact-lock! conn id) @@ -173,8 +180,12 @@ {:id id}))) (-> (update-fn cfg params) - (vary-meta assoc ::audit/props {:project-id (:project-id file) - :team-id (:team-id file)})))))) + (vary-meta assoc ::audit/replace-props + {:id (:id file) + :name (:name file) + :features (:features file) + :project-id (:project-id file) + :team-id (:team-id file)})))))) (defn- update-file* [{:keys [conn] :as cfg} {:keys [file changes session-id profile-id] :as params}]