diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index c8ef0d796..d004fe5d4 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -17,6 +17,7 @@ [app.db :as db] [app.http.client :as http] [app.loggers.audit.tasks :as-alias tasks] + [app.loggers.webhooks :as-alias webhooks] [app.main :as-alias main] [app.metrics :as mtx] [app.tokens :as tokens] @@ -103,10 +104,11 @@ (s/def ::type ::us/string) (s/def ::props (s/map-of ::us/keyword any?)) (s/def ::ip-addr ::us/string) +(s/def ::webhooks/event? ::us/boolean) (s/def ::event (s/keys :req-un [::type ::name ::profile-id] - :opt-un [::ip-addr ::props])) + :opt-un [::ip-addr ::props ::webhooks/event?])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; COLLECTOR @@ -117,8 +119,7 @@ ;; an external storage and data cleared. (s/def ::collector - (s/nilable - (s/keys :req [::wrk/executor ::db/pool]))) + (s/keys :req [::wrk/executor ::db/pool])) (defmethod ig/pre-init-spec ::collector [_] (s/keys :req [::db/pool ::wrk/executor ::mtx/metrics])) @@ -126,11 +127,8 @@ (defmethod ig/init-key ::collector [_ {:keys [::db/pool] :as cfg}] (cond - (not (contains? cf/flags :audit-log)) - (l/info :hint "audit: log collection disabled") - (db/read-only? pool) - (l/warn :hint "audit: log collection disabled (db is read-only)") + (l/warn :hint "audit: disabled (db is read-only)") :else cfg)) @@ -138,19 +136,35 @@ (defn- persist-event! [pool event] (us/verify! ::event event) - (db/insert! pool :audit-log - {:id (uuid/next) - :name (:name event) - :type (:type event) - :profile-id (:profile-id event) - :tracked-at (dt/now) - :ip-addr (some-> (:ip-addr event) db/inet) - :props (db/tjson (:props event)) - :source "backend"})) + (let [params {:id (uuid/next) + :name (:name event) + :type (:type event) + :profile-id (:profile-id event) + :tracked-at (dt/now) + :ip-addr (:ip-addr event) + :props (:props event)}] + + (when (contains? cf/flags :audit-log) + (db/insert! pool :audit-log + (-> params + (update :props db/tjson) + (update :ip-addr db/inet) + (assoc :source "backend")))) + + (when (and (contains? cf/flags :webhooks) + (::webhooks/event? event)) + (wrk/submit! ::wrk/conn pool + ::wrk/task :process-webhook-event + ::wrk/queue :webhooks + ::wrk/max-retries 0 + ::webhooks/event (-> params + (dissoc :ip-addr) + (dissoc :type)))))) (defn submit! "Submit audit event to the collector." - [{:keys [::wrk/executor ::db/pool]} params] + [{:keys [::wrk/executor ::db/pool] :as collector} params] + (us/assert! ::collector collector) (->> (px/submit! executor (partial persist-event! pool (d/without-nils params))) (p/merr (fn [cause] (l/error :hint "audit: unexpected error processing event" :cause cause) diff --git a/backend/src/app/loggers/webhooks.clj b/backend/src/app/loggers/webhooks.clj new file mode 100644 index 000000000..28331540f --- /dev/null +++ b/backend/src/app/loggers/webhooks.clj @@ -0,0 +1,171 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.loggers.webhooks + "A mattermost integration for error reporting." + (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.common.transit :as t] + [app.common.uri :as uri] + [app.db :as db] + [app.http.client :as http] + [app.util.json :as json] + [app.util.time :as dt] + [app.worker :as wrk] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] + [integrant.core :as ig])) + +;; --- PROC + +(defn lookup-webhooks-by-team + [pool team-id] + (db/exec! pool ["select * from webhook where team_id=? and is_active=true" team-id])) + +(defn lookup-webhooks-by-project + [pool project-id] + (let [sql [(str "select * from webhook as w" + " join project as p on (p.team_id = w.team_id)" + " where p.id = ? and w.is_active = true") + project-id]] + (db/exec! pool sql))) + +(defn lookup-webhooks-by-file + [pool file-id] + (let [sql [(str "select * from webhook as w" + " join project as p on (p.team_id = w.team_id)" + " join file as f on (f.project_id = p.id)" + " where f.id = ? and w.is_active = true") + file-id]] + (db/exec! pool sql))) + +(defn lookup-webhooks + [{:keys [::db/pool]} {:keys [props] :as event}] + (or (some->> (:team-id props) (lookup-webhooks-by-team pool)) + (some->> (:project-id props) (lookup-webhooks-by-project pool)) + (some->> (:file-id props) (lookup-webhooks-by-file pool)))) + +(defmethod ig/pre-init-spec ::process-event-handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/init-key ::process-event-handler + [_ {:keys [::db/pool] :as cfg}] + (fn [{:keys [props] :as task}] + (let [event (::event props)] + (l/debug :hint "process webhook event" + :name (:name event)) + + (when-let [items (lookup-webhooks cfg event)] + ;; (app.common.pprint/pprint items) + (l/trace :hint "webhooks found for event" :total (count items)) + + (db/with-atomic [conn pool] + (doseq [item items] + (wrk/submit! ::wrk/conn conn + ::wrk/task :run-webhook + ::wrk/queue :webhooks + ::wrk/max-retries 3 + ::event event + ::config item))))))) + +;; --- RUN + +(declare interpret-exception) +(declare interpret-response) + +(def ^:private mapper + (json/mapper + {:encode-key-fn str/camel + :decode-key-fn (comp keyword str/kebab) + :pretty true})) + +(defmethod ig/pre-init-spec ::run-webhook-handler [_] + (s/keys :req [::http/client ::db/pool])) + +(defmethod ig/prep-key ::run-webhook-handler + [_ cfg] + (merge {::max-errors 3} (d/without-nils cfg))) + +(defmethod ig/init-key ::run-webhook-handler + [_ {:keys [::db/pool ::max-errors] :as cfg}] + (letfn [(update-webhook! [whook err] + (if err + (let [sql [(str "update webhook " + " set error_code=?, " + " error_count=error_count+1 " + " where id=?") + err + (:id whook)] + res (db/exec-one! pool sql {:return-keys true})] + (when (>= (:error-count res) max-errors) + (db/update! pool :webhook {:is-active false} {:id (:id whook)}))) + + (db/update! pool :webhook + {:updated-at (dt/now) + :error-code nil + :error-count 0} + {:id (:id whook)}))) + + (report-delivery! [whook req rsp err] + (db/insert! pool :webhook-delivery + {:webhook-id (:id whook) + :created-at (dt/now) + :error-code err + :req-data (db/tjson req) + :rsp-data (db/tjson rsp)}))] + + (fn [{:keys [props] :as task}] + (let [event (::event props) + whook (::config props) + + body (case (:mtype whook) + "application/json" (json/encode-str event mapper) + "application/transit+json" (t/encode-str event) + "application/x-www-form-urlencoded" (uri/map->query-string event))] + + (l/debug :hint "run webhook" + :event-name (:name event) + :webhook-id (:id whook) + :webhook-uri (:uri whook) + :webhook-mtype (:mtype whook)) + + (let [req {:uri (:uri whook) + :headers {"content-type" (:mtype whook)} + :timeout (dt/duration "4s") + :method :post + :body body}] + (try + (let [rsp (http/req! cfg req {:response-type :input-stream :sync? true}) + err (interpret-response rsp)] + (report-delivery! whook req rsp err) + (update-webhook! whook err)) + (catch Throwable cause + (let [err (interpret-exception cause)] + (report-delivery! whook req nil err) + (update-webhook! whook err) + (when (= err "unknown") + (l/error :hint "unknown error on webhook request" + :cause cause)))))))))) + +(defn interpret-response + [{:keys [status] :as response}] + (when-not (or (= 200 status) + (= 204 status)) + (str/ffmt "unexpected-status:%" status))) + +(defn interpret-exception + [cause] + (cond + (instance? javax.net.ssl.SSLHandshakeException cause) + "ssl-validation-error" + + (instance? java.net.ConnectException cause) + "connection-error" + + (instance? java.net.http.HttpConnectTimeoutException cause) + "timeout" + )) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 1c36c9d03..b2145aeb4 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -15,6 +15,7 @@ [app.http.session :as-alias http.session] [app.loggers.audit :as-alias audit] [app.loggers.audit.tasks :as-alias audit.tasks] + [app.loggers.webhooks :as-alias webhooks] [app.loggers.zmq :as-alias lzmq] [app.metrics :as-alias mtx] [app.metrics.definition :as-alias mdef] @@ -357,7 +358,12 @@ :telemetry (ig/ref :app.tasks.telemetry/handler) :session-gc (ig/ref :app.http.session/gc-task) :audit-log-archive (ig/ref ::audit.tasks/archive) - :audit-log-gc (ig/ref ::audit.tasks/gc)}} + :audit-log-gc (ig/ref ::audit.tasks/gc) + + :process-webhook-event + (ig/ref ::webhooks/process-event-handler) + :run-webhook + (ig/ref ::webhooks/run-webhook-handler)}} :app.emails/sendmail @@ -420,6 +426,14 @@ ::audit.tasks/gc {::db/pool (ig/ref ::db/pool)} + ::webhooks/process-event-handler + {::db/pool (ig/ref ::db/pool) + ::http.client/client (ig/ref ::http.client/client)} + + ::webhooks/run-webhook-handler + {::db/pool (ig/ref ::db/pool) + ::http.client/client (ig/ref ::http.client/client)} + :app.loggers.loki/reporter {::lzmq/receiver (ig/ref ::lzmq/receiver) ::http.client/client (ig/ref ::http.client/client)} diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index c0694e103..dda506b11 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -265,6 +265,9 @@ {:name "0085-add-webhook-table" :fn (mg/resource "app/migrations/sql/0085-add-webhook-table.sql")} + + {:name "0086-add-webhook-delivery-table" + :fn (mg/resource "app/migrations/sql/0086-add-webhook-delivery-table.sql")} ]) diff --git a/backend/src/app/migrations/sql/0086-add-webhook-delivery-table.sql b/backend/src/app/migrations/sql/0086-add-webhook-delivery-table.sql new file mode 100644 index 000000000..3fead5b51 --- /dev/null +++ b/backend/src/app/migrations/sql/0086-add-webhook-delivery-table.sql @@ -0,0 +1,16 @@ +CREATE TABLE webhook_delivery ( + webhook_id uuid NOT NULL REFERENCES webhook(id) ON DELETE CASCADE DEFERRABLE, + created_at timestamptz NOT NULL DEFAULT now(), + + error_code text NULL, + + req_data jsonb NULL, + rsp_data jsonb NULL, + + PRIMARY KEY (webhook_id, created_at) +); + +ALTER TABLE webhook_delivery + ALTER COLUMN error_code SET STORAGE external, + ALTER COLUMN req_data SET STORAGE external, + ALTER COLUMN rsp_data SET STORAGE external; diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 51f48ba28..df4fd4ea9 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -16,6 +16,7 @@ [app.http.client :as-alias http.client] [app.http.session :as-alias http.session] [app.loggers.audit :as audit] + [app.loggers.webhooks :as-alias webhooks] [app.metrics :as mtx] [app.msgbus :as-alias mbus] [app.rpc.climit :as climit] @@ -155,18 +156,24 @@ (:profile-id result) (:profile-id params) uuid/zero) + props (or (::audit/replace-props resultm) (-> params + (d/without-qualified) (merge (::audit/props resultm)) (dissoc :profile-id) (dissoc :type))) + event {:type (or (::audit/type resultm) (::type cfg)) :name (or (::audit/name resultm) (::sv/name mdata)) :profile-id profile-id :ip-addr (some-> request audit/parse-client-ip) - :props (d/without-qualified props)}] + :props props + ::webhooks/event? (or (::webhooks/event? mdata) + (::webhooks/event? resultm) + false)}] (audit/submit! collector event))) diff --git a/backend/src/app/rpc/commands/webhooks.clj b/backend/src/app/rpc/commands/webhooks.clj index 66ae14965..f1af96175 100644 --- a/backend/src/app/rpc/commands/webhooks.clj +++ b/backend/src/app/rpc/commands/webhooks.clj @@ -11,6 +11,7 @@ [app.common.uuid :as uuid] [app.db :as db] [app.http.client :as http] + [app.loggers.webhooks :as webhooks] [app.rpc.doc :as-alias doc] [app.rpc.queries.teams :refer [check-edition-permissions! check-read-permissions!]] [app.util.services :as sv] @@ -35,77 +36,83 @@ (s/keys :req-un [::profile-id ::team-id ::uri ::mtype] :opt-un [::is-active])) -;; FIXME: validate -;; FIXME: default ratelimit -;; FIXME: quotes +;; NOTE: for now the quote is hardcoded but this need to be solved in +;; a more universal way for handling properly object quotes +(def max-hooks-for-team 8) (defn- validate-webhook! [cfg whook params] (letfn [(handle-exception [exception] - (cond - (instance? java.util.concurrent.CompletionException exception) - (handle-exception (ex/cause exception)) - - (instance? javax.net.ssl.SSLHandshakeException exception) + (if-let [hint (webhooks/interpret-exception exception)] (ex/raise :type :validation :code :webhook-validation - :hint "ssl-validation") - - :else - (ex/raise :type :validation + :hint hint) + (ex/raise :type :internal :code :webhook-validation - :hint "unknown" :cause exception))) - (handle-response [{:keys [status] :as response}] - (when (not= status 200) + (handle-response [response] + (when-let [hint (webhooks/interpret-response response)] (ex/raise :type :validation :code :webhook-validation - :hint (str/ffmt "unexpected-status-%" (:status response)))))] + :hint hint)))] (if (not= (:uri whook) (:uri params)) (->> (http/req! cfg {:method :head :uri (:uri params) - :timeout (dt/duration "2s")}) + :timeout (dt/duration "3s")}) (p/hmap (fn [response exception] (if exception (handle-exception exception) (handle-response response))))) (p/resolved nil)))) +(defn- validate-quotes! + [{:keys [::db/pool]} {:keys [team-id]}] + (let [sql ["select count(*) as total from webhook where team_id = ?" team-id] + total (:total (db/exec-one! pool sql))] + (when (>= total max-hooks-for-team) + (ex/raise :type :restriction + :code :webhooks-quote-reached + :hint (str/ffmt "can't create more than % webhooks per team" max-hooks-for-team))))) + +(defn- insert-webhook! + [{:keys [::db/pool]} {:keys [team-id uri mtype is-active] :as params}] + (db/insert! pool :webhook + {:id (uuid/next) + :team-id team-id + :uri uri + :is-active is-active + :mtype mtype})) + +(defn- update-webhook! + [{:keys [::db/pool] :as cfg} {:keys [id] :as wook} {:keys [uri mtype is-active] :as params}] + (db/update! pool :webhook + {:uri uri + :is-active is-active + :mtype mtype + :error-code nil + :error-count 0} + {:id id})) + (sv/defmethod ::create-webhook {::doc/added "1.17"} - [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id uri mtype is-active] :as params}] + [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id] :as params}] (check-edition-permissions! pool profile-id team-id) - (letfn [(insert-webhook [_] - (db/insert! pool :webhook - {:id (uuid/next) - :team-id team-id - :uri uri - :is-active is-active - :mtype mtype}))] - (->> (validate-webhook! cfg nil params) - (p/fmap executor insert-webhook)))) + (->> (validate-webhook! cfg nil params) + (p/fmap executor (fn [_] (validate-quotes! cfg params))) + (p/fmap executor (fn [_] (insert-webhook! cfg params))))) (s/def ::update-webhook (s/keys :req-un [::id ::uri ::mtype ::is-active])) (sv/defmethod ::update-webhook {::doc/added "1.17"} - [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id id uri mtype is-active] :as params}] - (let [whook (db/get pool :webhook {:id id}) - update-fn (fn [_] - (db/update! pool :webhook - {:uri uri - :is-active is-active - :mtype mtype - :error-code nil - :error-count 0} - {:id id}))] + [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [id profile-id] :as params}] + (let [whook (db/get pool :webhook {:id id})] (check-edition-permissions! pool profile-id (:team-id whook)) - (->> (validate-webhook! cfg whook params) - (p/fmap executor update-fn)))) + (p/fmap executor (fn [_] (update-webhook! cfg whook params)))))) (s/def ::delete-webhook (s/keys :req-un [::profile-id ::id])) @@ -133,4 +140,4 @@ [{:keys [pool] :as cfg} {:keys [profile-id team-id]}] (with-open [conn (db/open pool)] (check-read-permissions! conn profile-id team-id) - (db/exec! conn [sql:get-webhooks team-id]))) \ No newline at end of file + (db/exec! conn [sql:get-webhooks team-id]))) diff --git a/backend/src/app/rpc/helpers.clj b/backend/src/app/rpc/helpers.clj index aef80d754..1f4d7bbf9 100644 --- a/backend/src/app/rpc/helpers.clj +++ b/backend/src/app/rpc/helpers.clj @@ -64,6 +64,10 @@ [mdw mdata] (vary-meta mdw merge mdata)) +(defn assoc-meta + [mdw k v] + (vary-meta mdw assoc k v)) + (defn with-http-cache [mdw max-age] (vary-meta mdw update ::rpc/response-transform-fns conj diff --git a/backend/src/app/rpc/mutations/projects.clj b/backend/src/app/rpc/mutations/projects.clj index 35e598ae4..95c36d957 100644 --- a/backend/src/app/rpc/mutations/projects.clj +++ b/backend/src/app/rpc/mutations/projects.clj @@ -9,6 +9,10 @@ [app.common.spec :as us] [app.common.uuid :as uuid] [app.db :as db] + [app.loggers.audit :as-alias audit] + [app.loggers.webhooks :as-alias webhooks] + [app.rpc.doc :as-alias doc] + [app.rpc.helpers :as rph] [app.rpc.permissions :as perms] [app.rpc.queries.projects :as proj] [app.rpc.queries.teams :as teams] @@ -22,7 +26,6 @@ (s/def ::name ::us/string) (s/def ::profile-id ::us/uuid) - ;; --- Mutation: Create Project (declare create-project) @@ -35,6 +38,8 @@ :opt-un [::id])) (sv/defmethod ::create-project + {::doc/added "1.0" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [profile-id team-id] :as params}] (db/with-atomic [conn pool] (teams/check-edition-permissions! conn profile-id team-id) @@ -122,10 +127,13 @@ ;; this is not allowed. (sv/defmethod ::delete-project + {::doc/added "1.0" + ::webhooks/event? true} [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] (db/with-atomic [conn pool] (proj/check-edition-permissions! conn profile-id id) - (db/update! conn :project - {:deleted-at (dt/now)} - {:id id :is-default false}) - nil)) + (let [project (db/update! conn :project + {:deleted-at (dt/now)} + {:id id :is-default false})] + (rph/with-meta (rph/wrap) + {::audit/props {:team-id (:team-id project)}}))))