@ -4,4 +4,6 @@
{:update-file {:concurrency 1 :queue-size 3}
:auth {:concurrency 128}
:process-font {:concurrency 4 :queue-size 32}
:process-image {:concurrency 8 :queue-size 32}}
:process-image {:concurrency 8 :queue-size 32}
{:concurrency 1 :queue-size 3}}
@ -64,7 +64,7 @@
(= 200 (:status response))
(let [data (json/read (:body response))]
(let [data (json/decode (:body response))]
{:token-uri (get data :token_endpoint)
:auth-uri (get data :authorization_endpoint)
:user-uri (get data :userinfo_endpoint)})
@ -172,7 +172,7 @@
:hint "unable to retrieve github emails"
:http-status status
:http-body body))
(->> response :body json/read (filter :primary) first :email))))))
(->> response :body json/decode (filter :primary) first :email))))))
(defmethod ig/pre-init-spec ::providers/github [_]
(s/keys :req [::http/client]))
@ -278,7 +278,7 @@
(->> (http/req! cfg req)
(p/map (fn [{:keys [status body] :as res}]
(if (= status 200)
(let [data (json/read body)]
(let [data (json/decode body)]
{:token (get data :access_token)
:type (get data :token_type)})
(ex/raise :type :internal
@ -316,7 +316,7 @@
(get info attr-kw)))
(process-response [response]
(p/let [info (-> response :body json/read)
(p/let [info (-> response :body json/decode)
email (get-email info)]
{:backend (:name provider)
:email email
@ -108,7 +108,9 @@
(s/def ::default-executor-parallelism ::us/integer)
(s/def ::scheduled-executor-parallelism ::us/integer)
(s/def ::worker-parallelism ::us/integer)
(s/def ::worker-default-parallelism ::us/integer)
(s/def ::worker-webhook-parallelism ::us/integer)
(s/def ::authenticated-cookie-domain ::us/string)
(s/def ::authenticated-cookie-name ::us/string)
@ -222,7 +224,8 @@
@ -427,7 +427,7 @@
val (.getValue o)]
(if (or (= typ "json")
(= typ "jsonb"))
(json/read val)
(json/decode val)
(defn decode-transit-pgobject
@ -442,29 +442,33 @@
(defn inet
(doto (org.postgresql.util.PGobject.)
(.setType "inet")
(.setValue (str ip-addr))))
(when ip-addr
(doto (org.postgresql.util.PGobject.)
(.setType "inet")
(.setValue (str ip-addr)))))
(defn decode-inet
[^PGobject o]
(if (= "inet" (.getType o))
(.getValue o)
(when o
(if (= "inet" (.getType o))
(.getValue o)
(defn tjson
"Encode as transit json."
(doto (org.postgresql.util.PGobject.)
(.setType "jsonb")
(.setValue (t/encode-str data {:type :json-verbose}))))
(when data
(doto (org.postgresql.util.PGobject.)
(.setType "jsonb")
(.setValue (t/encode-str data {:type :json-verbose})))))
(defn json
"Encode as plain json."
(doto (org.postgresql.util.PGobject.)
(.setType "jsonb")
(.setValue (json/write-str data))))
(when data
(doto (org.postgresql.util.PGobject.)
(.setType "jsonb")
(.setValue (json/encode-str data)))))
;; --- Locks
@ -116,7 +116,6 @@
(s/def ::assets map?)
(s/def ::audit-handler fn?)
(s/def ::awsns-handler fn?)
(s/def ::debug-routes (s/nilable vector?))
(s/def ::doc-routes (s/nilable vector?))
@ -138,7 +137,6 @@
@ -173,8 +171,6 @@
["/api" {:middleware [[mw/cors]
[session/middleware-2 session]]}
["/audit/events" {:handler (:audit-handler cfg)
:allowed-methods #{:post}}]
["/feedback" {:handler feedback
:allowed-methods #{:post}}]
(:doc-routes cfg)
@ -32,6 +32,12 @@
{:name ::params
:compile (constantly ymw/wrap-params)})
(def ^:private json-mapper
{:encode-key-fn str/camel
:decode-key-fn (comp keyword str/kebab)
:pretty true}))
(defn wrap-parse-request
(letfn [(process-request [request]
@ -46,7 +52,7 @@
(str/starts-with? header "application/json")
(with-open [is (yrq/body request)]
(let [params (json/read is)]
(let [params (json/decode is json-mapper)]
(-> request
(assoc :body-params params)
(update :params merge params))))
@ -117,7 +123,32 @@
(.close ^OutputStream output-stream))))))
(format-response [response request]
(json-streamable-body [data]
(reify yrs/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream]
(with-open [bos (buffered-output-stream output-stream buffer-size)]
(json/write! bos data json-mapper))
(catch java.io.IOException _cause
;; Do nothing, EOF means client closes connection abruptly
(catch Throwable cause
(l/warn :hint "unexpected error on encoding response"
:cause cause))
(.close ^OutputStream output-stream))))))
(format-response-with-json [response _]
(let [body (yrs/body response)]
(if (or (boolean? body) (coll? body))
(-> response
(update :headers assoc "content-type" "application/json")
(assoc :body (json-streamable-body body)))
(format-response-with-transit [response request]
(let [body (yrs/body response)]
(if (or (boolean? body) (coll? body))
(let [qs (yrq/query request)
@ -130,6 +161,20 @@
(assoc :body (transit-streamable-body body opts))))
(format-response [response request]
(let [accept (yrq/get-header request "accept")]
(or (= accept "application/transit+json")
(str/includes? accept "application/transit+json"))
(format-response-with-transit response request)
(or (= accept "application/json")
(str/includes? accept "application/json"))
(format-response-with-json response request)
(format-response-with-transit response request))))
(process-response [response request]
(cond-> response
(map? response) (format-response request)))]
@ -17,6 +17,7 @@
[app.db :as db]
[app.http.client :as http]
[app.loggers.audit.tasks :as-alias tasks]
[app.loggers.webhooks :as-alias webhooks]
[app.main :as-alias main]
[app.metrics :as mtx]
[app.tokens :as tokens]
@ -28,9 +29,7 @@
[lambdaisland.uri :as u]
[promesa.core :as p]
[promesa.exec :as px]
[promesa.exec.bulkhead :as pxb]
[yetti.request :as yrq]
[yetti.response :as yrs]))
[yetti.request :as yrq]))
@ -56,7 +55,6 @@
(assoc (->> sk str/kebab (keyword "penpot")) v))))]
(reduce-kv process-param {} params)))
(def ^:private
@ -105,110 +103,19 @@
(s/def ::name ::us/string)
(s/def ::type ::us/string)
(s/def ::props (s/map-of ::us/keyword any?))
(s/def ::timestamp dt/instant?)
(s/def ::context (s/map-of ::us/keyword any?))
(s/def ::frontend-event
(s/keys :req-un [::type ::name ::props ::timestamp ::profile-id]
:opt-un [::context]))
(s/def ::frontend-events (s/every ::frontend-event))
(s/def ::ip-addr ::us/string)
(s/def ::backend-event
(s/def ::webhooks/event? ::us/boolean)
(s/def ::webhooks/batch-timeout ::dt/duration)
(s/def ::webhooks/batch-key
(s/or :fn fn? :str string? :kw keyword?))
(s/def ::event
(s/keys :req-un [::type ::name ::profile-id]
:opt-un [::ip-addr ::props]))
(s/def ::concurrency ::us/integer)
(defmethod ig/pre-init-spec ::http-handler [_]
(s/keys :req [::wrk/executor ::db/pool ::mtx/metrics ::concurrency]))
(defmethod ig/prep-key ::http-handler
[_ cfg]
(merge {::concurrency (cf/get :audit-log-http-handler-concurrency 8)}
(d/without-nils cfg)))
(defmethod ig/init-key ::http-handler
[_ {:keys [::wrk/executor ::db/pool ::mtx/metrics ::concurrency] :as cfg}]
(if (or (db/read-only? pool)
(not (contains? cf/flags :audit-log)))
(l/warn :hint "audit: http handler disabled or db is read-only")
(fn [_ respond _]
(respond (yrs/response 204))))
(letfn [(event->row [event]
(:name event)
(:source event)
(:type event)
(:timestamp event)
(:profile-id event)
(db/inet (:ip-addr event))
(db/tjson (:props event))
(db/tjson (d/without-nils (:context event)))])
(handle-request [{:keys [profile-id] :as request}]
(let [events (->> (:events (:params request))
(remove #(not= profile-id (:profile-id %)))
(us/conform ::frontend-events))
ip-addr (parse-client-ip request)
xform (comp
(map #(assoc % :ip-addr ip-addr))
(map #(assoc % :source "frontend"))
(map event->row))
columns [:id :name :source :type :tracked-at
:profile-id :ip-addr :props :context]]
(when (seq events)
(->> (into [] xform events)
(db/insert-multi! pool :audit-log columns)))))
(report-error! [cause]
(if-let [xdata (us/validation-error? cause)]
(l/error ::l/raw (str "audit: validation error frontend events request\n" (ex/explain xdata)))
(l/error :hint "audit: unexpected error on processing frontend events" :cause cause)))
(on-queue [instance]
(l/trace :hint "http-handler: enqueued"
:queue-size (get instance ::pxb/current-queue-size)
:concurrency (get instance ::pxb/current-concurrency))
(mtx/run! metrics
:id :audit-http-handler-queue-size
:val (get instance ::pxb/current-queue-size))
(mtx/run! metrics
:id :audit-http-handler-concurrency
:val (get instance ::pxb/current-concurrency)))
(on-run [instance task]
(let [elapsed (- (inst-ms (dt/now))
(inst-ms task))]
(l/trace :hint "http-handler: execute"
:elapsed (str elapsed "ms"))
(mtx/run! metrics
:id :audit-http-handler-timing
:val elapsed)
(mtx/run! metrics
:id :audit-http-handler-queue-size
:val (get instance ::pxb/current-queue-size))
(mtx/run! metrics
:id :audit-http-handler-concurrency
:val (get instance ::pxb/current-concurrency))))]
(let [limiter (pxb/create :executor executor
:concurrency concurrency
:on-queue on-queue
:on-run on-run)]
(fn [request respond _]
(->> (px/submit! limiter (partial handle-request request))
(p/fnly (fn [_ cause]
(some-> cause report-error!)
(respond (yrs/response 204))))))))))
:opt-un [::ip-addr ::props]
:opt [::webhooks/event?
@ -219,8 +126,7 @@
;; an external storage and data cleared.
(s/def ::collector
(s/keys :req [::wrk/executor ::db/pool])))
(s/keys :req [::wrk/executor ::db/pool]))
(defmethod ig/pre-init-spec ::collector [_]
(s/keys :req [::db/pool ::wrk/executor ::mtx/metrics]))
@ -228,31 +134,53 @@
(defmethod ig/init-key ::collector
[_ {:keys [::db/pool] :as cfg}]
(not (contains? cf/flags :audit-log))
(l/info :hint "audit: log collection disabled")
(db/read-only? pool)
(l/warn :hint "audit: log collection disabled (db is read-only)")
(l/warn :hint "audit: disabled (db is read-only)")
(defn- persist-event!
[pool event]
(us/verify! ::backend-event event)
(db/insert! pool :audit-log
{:id (uuid/next)
:name (:name event)
:type (:type event)
:profile-id (:profile-id event)
:tracked-at (dt/now)
:ip-addr (some-> (:ip-addr event) db/inet)
:props (db/tjson (:props event))
:source "backend"}))
(us/verify! ::event event)
(let [params {:id (uuid/next)
:name (:name event)
:type (:type event)
:profile-id (:profile-id event)
:tracked-at (dt/now)
:ip-addr (:ip-addr event)
:props (:props event)}]
(when (contains? cf/flags :audit-log)
(db/insert! pool :audit-log
(-> params
(update :props db/tjson)
(update :ip-addr db/inet)
(assoc :source "backend"))))
(when (and (contains? cf/flags :webhooks)
(::webhooks/event? event))
(let [batch-key (::webhooks/batch-key event)
batch-timeout (::webhooks/batch-timeout event)]
(wrk/submit! ::wrk/conn pool
::wrk/task :process-webhook-event
::wrk/queue :webhooks
::wrk/max-retries 0
::wrk/delay (or batch-timeout 0)
::wrk/label (cond
(fn? batch-key) (batch-key (:props event))
(keyword? batch-key) (name batch-key)
(string? batch-key) batch-key
:else "default")
::wrk/dedupe true
::webhooks/event (-> params
(dissoc :ip-addr)
(dissoc :type)))))))
(defn submit!
"Submit audit event to the collector."
[{:keys [::wrk/executor ::db/pool]} params]
[{:keys [::wrk/executor ::db/pool] :as collector} params]
(us/assert! ::collector collector)
(->> (px/submit! executor (partial persist-event! pool (d/without-nils params)))
(p/merr (fn [cause]
(l/error :hint "audit: unexpected error processing event" :cause cause)
@ -335,7 +263,6 @@
{:iss "authentication"
:iat (dt/now)
:uid uuid/zero})
;; FIXME tokens/generate
body (t/encode {:events events})
headers {"content-type" "application/transit+json"
"origin" (cf/get :public-uri)
@ -73,7 +73,7 @@
:timeout 3000
:method :post
:headers {"content-type" "application/json"}
:body (json/write payload)}
:body (json/encode payload)}
{:sync? true}))
(defn- handle-event
@ -29,7 +29,7 @@
{:uri (cf/get :error-report-webhook)
:method :post
:headers {"content-type" "application/json"}
:body (json/write-str {:text text})}
:body (json/encode-str {:text text})}
{:sync? true})]
(when (not= 200 (:status resp))
@ -0,0 +1,174 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; Copyright (c) KALEIDOS INC
(ns app.loggers.webhooks
"A mattermost integration for error reporting."
[app.common.data :as d]
[app.common.logging :as l]
[app.common.transit :as t]
[app.common.uri :as uri]
[app.config :as cf]
[app.db :as db]
[app.http.client :as http]
[app.util.json :as json]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]))
;; --- PROC
(defn- lookup-webhooks-by-team
[pool team-id]
(db/exec! pool ["select * from webhook where team_id=? and is_active=true" team-id]))
(defn- lookup-webhooks-by-project
[pool project-id]
(let [sql [(str "select * from webhook as w"
" join project as p on (p.team_id = w.team_id)"
" where p.id = ? and w.is_active = true")
(db/exec! pool sql)))
(defn- lookup-webhooks-by-file
[pool file-id]
(let [sql [(str "select * from webhook as w"
" join project as p on (p.team_id = w.team_id)"
" join file as f on (f.project_id = p.id)"
" where f.id = ? and w.is_active = true")
(db/exec! pool sql)))
(defn- lookup-webhooks
[{:keys [::db/pool]} {:keys [props] :as event}]
(or (some->> (:team-id props) (lookup-webhooks-by-team pool))
(some->> (:project-id props) (lookup-webhooks-by-project pool))
(some->> (:file-id props) (lookup-webhooks-by-file pool))))
(defmethod ig/pre-init-spec ::process-event-handler [_]
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::process-event-handler
[_ {:keys [::db/pool] :as cfg}]
(fn [{:keys [props] :as task}]
(let [event (::event props)]
(l/debug :hint "process webhook event"
:name (:name event))
(when-let [items (lookup-webhooks cfg event)]
;; (app.common.pprint/pprint items)
(l/trace :hint "webhooks found for event" :total (count items))
(db/with-atomic [conn pool]
(doseq [item items]
(wrk/submit! ::wrk/conn conn
::wrk/task :run-webhook
::wrk/queue :webhooks
::wrk/max-retries 3
::event event
::config item)))))))
;; --- RUN
(declare interpret-exception)
(declare interpret-response)
(def ^:private json-mapper
{:encode-key-fn str/camel
:decode-key-fn (comp keyword str/kebab)
:pretty true}))
(defmethod ig/pre-init-spec ::run-webhook-handler [_]
(s/keys :req [::http/client ::db/pool]))
(defmethod ig/prep-key ::run-webhook-handler
[_ cfg]
(merge {::max-errors 3} (d/without-nils cfg)))
(defmethod ig/init-key ::run-webhook-handler
[_ {:keys [::db/pool ::max-errors] :as cfg}]
(letfn [(update-webhook! [whook err]
(if err
(let [sql [(str "update webhook "
" set error_code=?, "
" error_count=error_count+1 "
" where id=?")
(:id whook)]
res (db/exec-one! pool sql {:return-keys true})]
(when (>= (:error-count res) max-errors)
(db/update! pool :webhook {:is-active false} {:id (:id whook)})))
(db/update! pool :webhook
{:updated-at (dt/now)
:error-code nil
:error-count 0}
{:id (:id whook)})))
(report-delivery! [whook req rsp err]
(db/insert! pool :webhook-delivery
{:webhook-id (:id whook)
:created-at (dt/now)
:error-code err
:req-data (db/tjson req)
:rsp-data (db/tjson rsp)}))]
(fn [{:keys [props] :as task}]
(let [event (::event props)
whook (::config props)
body (case (:mtype whook)
"application/json" (json/encode-str event json-mapper)
"application/transit+json" (t/encode-str event)
"application/x-www-form-urlencoded" (uri/map->query-string event))]
(l/debug :hint "run webhook"
:event-name (:name event)
:webhook-id (:id whook)
:webhook-uri (:uri whook)
:webhook-mtype (:mtype whook))
(let [req {:uri (:uri whook)
:headers {"content-type" (:mtype whook)
"user-agent" (str/ffmt "penpot/%" (:main cf/version))}
:timeout (dt/duration "4s")
:method :post
:body body}]
(let [rsp (http/req! cfg req {:response-type :input-stream :sync? true})
err (interpret-response rsp)]
(report-delivery! whook req rsp err)
(update-webhook! whook err))
(catch Throwable cause
(let [err (interpret-exception cause)]
(report-delivery! whook req nil err)
(update-webhook! whook err)
(when (= err "unknown")
(l/error :hint "unknown error on webhook request"
:cause cause))))))))))
(defn interpret-response
[{:keys [status] :as response}]
(when-not (or (= 200 status)
(= 204 status))
(str/ffmt "unexpected-status:%" status)))
(defn interpret-exception
(instance? javax.net.ssl.SSLHandshakeException cause)
(instance? java.net.ConnectException cause)
(instance? java.net.http.HttpConnectTimeoutException cause)
@ -92,7 +92,7 @@
(.. socket (setReceiveTimeOut 5000))
(loop []
(let [msg (.recv ^ZMQ$Socket socket)
msg (ex/ignoring (json/read msg json-mapper))
msg (ex/ignoring (json/decode msg json-mapper))
msg (if (nil? msg) :empty msg)]
(when (a/>!! output msg)
@ -15,6 +15,7 @@
[app.http.session :as-alias http.session]
[app.loggers.audit :as-alias audit]
[app.loggers.audit.tasks :as-alias audit.tasks]
[app.loggers.webhooks :as-alias webhooks]
[app.loggers.zmq :as-alias lzmq]
[app.metrics :as-alias mtx]
[app.metrics.definition :as-alias mdef]
@ -281,7 +282,6 @@
:metrics (ig/ref ::mtx/metrics)
:public-uri (cf/get :public-uri)
:storage (ig/ref ::sto/storage)
:audit-handler (ig/ref ::audit/http-handler)
:rpc-routes (ig/ref :app.rpc/routes)
:doc-routes (ig/ref :app.rpc.doc/routes)
:executor (ig/ref ::wrk/executor)}
@ -358,7 +358,12 @@
:telemetry (ig/ref :app.tasks.telemetry/handler)
:session-gc (ig/ref :app.http.session/gc-task)
:audit-log-archive (ig/ref ::audit.tasks/archive)
:audit-log-gc (ig/ref ::audit.tasks/gc)}}
:audit-log-gc (ig/ref ::audit.tasks/gc)
(ig/ref ::webhooks/process-event-handler)
(ig/ref ::webhooks/run-webhook-handler)}}
@ -408,11 +413,6 @@
{::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::mtx/metrics (ig/ref ::mtx/metrics)}
{::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
@ -426,6 +426,14 @@
{::db/pool (ig/ref ::db/pool)}
{::db/pool (ig/ref ::db/pool)
::http.client/client (ig/ref ::http.client/client)}
{::db/pool (ig/ref ::db/pool)
::http.client/client (ig/ref ::http.client/client)}
{::lzmq/receiver (ig/ref ::lzmq/receiver)
::http.client/client (ig/ref ::http.client/client)}
@ -500,20 +508,28 @@
{:cron #app/cron "30 */5 * * * ?" ;; every 5m
:task :audit-log-gc})]}
{::rds/redis (ig/ref ::rds/redis)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}
{::wrk/parallelism (cf/get ::worker-parallelism 1)
;; FIXME: read queues from configuration
::wrk/queue "default"
[::default ::wrk/worker]
{::wrk/parallelism (cf/get ::worker-default-parallelism 1)
::wrk/queue :default
::rds/redis (ig/ref ::rds/redis)
::wrk/registry (ig/ref ::wrk/registry)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}
[::webhook ::wrk/worker]
{::wrk/parallelism (cf/get ::worker-webhook-parallelism 1)
::wrk/queue :webhooks
::rds/redis (ig/ref ::rds/redis)
::wrk/registry (ig/ref ::wrk/registry)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}})
(def system nil)
(defn start
@ -265,6 +265,12 @@
{:name "0085-add-webhook-table"
:fn (mg/resource "app/migrations/sql/0085-add-webhook-table.sql")}
{:name "0086-add-webhook-delivery-table"
:fn (mg/resource "app/migrations/sql/0086-add-webhook-delivery-table.sql")}
{:name "0087-mod-task-table"
:fn (mg/resource "app/migrations/sql/0087-mod-task-table.sql")}
@ -0,0 +1,16 @@
CREATE TABLE webhook_delivery (
created_at timestamptz NOT NULL DEFAULT now(),
error_code text NULL,
req_data jsonb NULL,
rsp_data jsonb NULL,
PRIMARY KEY (webhook_id, created_at)
ALTER TABLE webhook_delivery
ALTER COLUMN error_code SET STORAGE external,
ALTER COLUMN req_data SET STORAGE external,
ALTER COLUMN rsp_data SET STORAGE external;
@ -0,0 +1,9 @@
ADD COLUMN label text NULL;
CREATE INDEX task__label__idx
ON task (label, name, queue)
WHERE status = 'new';
@ -16,6 +16,7 @@
[app.http.client :as-alias http.client]
[app.http.session :as-alias http.session]
[app.loggers.audit :as audit]
[app.loggers.webhooks :as-alias webhooks]
[app.metrics :as mtx]
[app.msgbus :as-alias mbus]
[app.rpc.climit :as climit]
@ -155,18 +156,34 @@
(:profile-id result)
(:profile-id params)
props (or (::audit/replace-props resultm)
(-> params
(merge (::audit/props resultm))
(dissoc :profile-id)
(dissoc :type)))
event {:type (or (::audit/type resultm)
(::type cfg))
:name (or (::audit/name resultm)
(::sv/name mdata))
:profile-id profile-id
:ip-addr (some-> request audit/parse-client-ip)
:props (d/without-qualified props)}]
:props props
(or (::webhooks/batch-key mdata)
(::webhooks/batch-key resultm))
(or (::webhooks/batch-timeout mdata)
(::webhooks/batch-timeout resultm))
(or (::webhooks/event? mdata)
(::webhooks/event? resultm)
(audit/submit! collector event)))
(handle-request [cfg params]
@ -174,8 +191,9 @@
(p/mcat (fn [result]
(->> (handle-audit params result)
(p/map (constantly result)))))))]
(with-meta handle-request mdata))
(if-not (::audit/skip mdata)
(with-meta handle-request mdata)
(defn- wrap
@ -254,6 +272,7 @@
@ -0,0 +1,86 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; Copyright (c) KALEIDOS INC
(ns app.rpc.commands.audit
"Audit Log related RPC methods"
[app.common.data :as d]
[app.common.logging :as l]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.http :as-alias http]
[app.loggers.audit :as audit]
[app.rpc.climit :as-alias climit]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[promesa.core :as p]
[promesa.exec :as px]))
(defn- event->row [event]
(:name event)
(:source event)
(:type event)
(:timestamp event)
(:profile-id event)
(db/inet (:ip-addr event))
(db/tjson (:props event))
(db/tjson (d/without-nils (:context event)))])
(def ^:private event-columns
[:id :name :source :type :tracked-at
:profile-id :ip-addr :props :context])
(defn- handle-events
[{:keys [::db/pool]} {:keys [profile-id events ::http/request] :as params}]
(let [ip-addr (audit/parse-client-ip request)
xform (comp
(map #(assoc % :profile-id profile-id))
(map #(assoc % :ip-addr ip-addr))
(map #(assoc % :source "frontend"))
(filter :profile-id)
(map event->row))
events (sequence xform events)]
(when (seq events)
(db/insert-multi! pool :audit-log event-columns events))))
(s/def ::profile-id ::us/uuid)
(s/def ::name ::us/string)
(s/def ::type ::us/string)
(s/def ::props (s/map-of ::us/keyword any?))
(s/def ::timestamp dt/instant?)
(s/def ::context (s/map-of ::us/keyword any?))
(s/def ::event
(s/keys :req-un [::type ::name ::props ::timestamp]
:opt-un [::context]))
(s/def ::events (s/every ::event))
(s/def ::push-audit-events
(s/keys :req-un [::events ::profile-id]))
(sv/defmethod ::push-audit-events
{::climit/queue :push-audit-events
::climit/key-fn :profile-id
::audit/skip true
::doc/added "1.17"}
[{:keys [::db/pool ::wrk/executor] :as cfg} params]
(if (or (db/read-only? pool)
(not (contains? cf/flags :audit-log)))
(l/warn :hint "audit: http handler disabled or db is read-only")
(rph/wrap nil))
(->> (px/submit! executor #(handle-events cfg params))
(p/fmap (constantly nil)))))
@ -10,6 +10,7 @@
[app.common.geom.point :as gpt]
[app.common.spec :as us]
[app.db :as db]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc.commands.files :as files]
[app.rpc.doc :as-alias doc]
[app.rpc.queries.teams :as teams]
@ -43,6 +44,7 @@
#(or (:file-id %) (:team-id %))))
(sv/defmethod ::get-comment-threads
{::doc/added "1.15"}
[{:keys [pool] :as cfg} params]
(with-open [conn (db/open pool)]
(retrieve-comment-threads conn params)))
@ -245,7 +247,8 @@
(sv/defmethod ::create-comment-thread
{::retry/max-retries 3
::retry/matches retry/conflict-db-insert?
::doc/added "1.15"}
::doc/added "1.15"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [profile-id file-id share-id] :as params}]
(db/with-atomic [conn pool]
(files/check-comment-permissions! conn profile-id file-id share-id)
@ -364,7 +367,8 @@
:opt-un [::share-id]))
(sv/defmethod ::create-comment
{::doc/added "1.15"}
{::doc/added "1.15"
::webhooks/event? true}
[{:keys [pool] :as cfg} params]
(db/with-atomic [conn pool]
(create-comment conn params)))
@ -483,7 +487,8 @@
(s/keys :req-un [::profile-id ::id]))
(sv/defmethod ::delete-comment
{::doc/added "1.15"}
{::doc/added "1.15"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [profile-id id] :as params}]
(db/with-atomic [conn pool]
(let [comment (db/get-by-id conn :comment id {:for-update true})]
@ -529,4 +534,3 @@
:frame-id frame-id}
{:id (:id thread)})
@ -17,6 +17,7 @@
[app.common.types.shape-tree :as ctt]
[app.db :as db]
[app.db.sql :as sql]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc.commands.files.thumbnails :as-alias thumbs]
[app.rpc.cond :as-alias cond]
[app.rpc.doc :as-alias doc]
@ -762,7 +763,8 @@
(s/keys :req-un [::profile-id ::name ::id]))
(sv/defmethod ::rename-file
{::doc/added "1.17"}
{::doc/added "1.17"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id id)
@ -806,7 +808,8 @@
(s/keys :req-un [::profile-id ::id ::is-shared]))
(sv/defmethod ::set-file-shared
{::doc/added "1.17"}
{::doc/added "1.17"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [id profile-id is-shared] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id id)
@ -829,14 +832,14 @@
(s/keys :req-un [::id ::profile-id]))
(sv/defmethod ::delete-file
{::doc/added "1.17"}
{::doc/added "1.17"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id id)
(absorb-library conn params)
(mark-file-deleted conn params)))
;; --- MUTATION COMMAND: link-file-to-library
(def sql:link-file-to-library
@ -852,7 +855,8 @@
(s/keys :req-un [::profile-id ::file-id ::library-id]))
(sv/defmethod ::link-file-to-library
{::doc/added "1.17"}
{::doc/added "1.17"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [profile-id file-id library-id] :as params}]
(when (= file-id library-id)
(ex/raise :type :validation
@ -863,7 +867,6 @@
(check-edition-permissions! conn profile-id library-id)
(link-file-to-library conn params)))
;; --- MUTATION COMMAND: unlink-file-from-library
(defn unlink-file-from-library
@ -876,7 +879,8 @@
(s/keys :req-un [::profile-id ::file-id ::library-id]))
(sv/defmethod ::unlink-file-from-library
{::doc/added "1.17"}
{::doc/added "1.17"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id file-id)
@ -11,7 +11,8 @@
[app.common.types.file :as ctf]
[app.common.uuid :as uuid]
[app.db :as db]
[app.loggers.audit :as audit]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc.commands.files :as files]
[app.rpc.doc :as-alias doc]
[app.rpc.permissions :as perms]
@ -75,7 +76,8 @@
(sv/defmethod ::create-file
{::doc/added "1.17"}
{::doc/added "1.17"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [profile-id project-id] :as params}]
(db/with-atomic [conn pool]
(proj/check-edition-permissions! conn profile-id project-id)
@ -17,6 +17,7 @@
[app.config :as cf]
[app.db :as db]
[app.loggers.audit :as audit]
[app.loggers.webhooks :as-alias webhooks]
[app.metrics :as mtx]
[app.msgbus :as mbus]
[app.rpc.climit :as-alias climit]
@ -122,12 +123,18 @@
;; set is different than the persisted one, update it on the
;; database.
(defn webhook-batch-keyfn
(str "rpc:update-file:" (:id props)))
(sv/defmethod ::update-file
{::climit/queue :update-file
::climit/key-fn :id
::webhooks/event? true
::webhooks/batch-timeout (dt/duration "2s")
::webhooks/batch-key webhook-batch-keyfn
::doc/added "1.17"}
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool]
(files/check-edition-permissions! conn profile-id id)
(db/xact-lock! conn id)
@ -173,8 +180,12 @@
{:id id})))
(-> (update-fn cfg params)
(vary-meta assoc ::audit/props {:project-id (:project-id file)
:team-id (:team-id file)}))))))
(vary-meta assoc ::audit/replace-props
{:id (:id file)
:name (:name file)
:features (:features file)
:project-id (:project-id file)
:team-id (:team-id file)}))))))
(defn- update-file*
[{:keys [conn] :as cfg} {:keys [file changes session-id profile-id] :as params}]
@ -11,6 +11,7 @@
[app.common.uuid :as uuid]
[app.db :as db]
[app.http.client :as http]
[app.loggers.webhooks :as webhooks]
[app.rpc.doc :as-alias doc]
[app.rpc.queries.teams :refer [check-edition-permissions! check-read-permissions!]]
[app.util.services :as sv]
@ -35,77 +36,83 @@
(s/keys :req-un [::profile-id ::team-id ::uri ::mtype]
:opt-un [::is-active]))
;; FIXME: validate
;; FIXME: default ratelimit
;; FIXME: quotes
;; NOTE: for now the quote is hardcoded but this need to be solved in
;; a more universal way for handling properly object quotes
(def max-hooks-for-team 8)
(defn- validate-webhook!
[cfg whook params]
(letfn [(handle-exception [exception]
(instance? java.util.concurrent.CompletionException exception)
(handle-exception (ex/cause exception))
(instance? javax.net.ssl.SSLHandshakeException exception)
(if-let [hint (webhooks/interpret-exception exception)]
(ex/raise :type :validation
:code :webhook-validation
:hint "ssl-validation")
(ex/raise :type :validation
:hint hint)
(ex/raise :type :internal
:code :webhook-validation
:hint "unknown"
:cause exception)))
(handle-response [{:keys [status] :as response}]
(when (not= status 200)
(handle-response [response]
(when-let [hint (webhooks/interpret-response response)]
(ex/raise :type :validation
:code :webhook-validation
:hint (str/ffmt "unexpected-status-%" (:status response)))))]
:hint hint)))]
(if (not= (:uri whook) (:uri params))
(->> (http/req! cfg {:method :head
:uri (:uri params)
:timeout (dt/duration "2s")})
:timeout (dt/duration "3s")})
(p/hmap (fn [response exception]
(if exception
(handle-exception exception)
(handle-response response)))))
(p/resolved nil))))
(defn- validate-quotes!
[{:keys [::db/pool]} {:keys [team-id]}]
(let [sql ["select count(*) as total from webhook where team_id = ?" team-id]
total (:total (db/exec-one! pool sql))]
(when (>= total max-hooks-for-team)
(ex/raise :type :restriction
:code :webhooks-quote-reached
:hint (str/ffmt "can't create more than % webhooks per team" max-hooks-for-team)))))
(defn- insert-webhook!
[{:keys [::db/pool]} {:keys [team-id uri mtype is-active] :as params}]
(db/insert! pool :webhook
{:id (uuid/next)
:team-id team-id
:uri uri
:is-active is-active
:mtype mtype}))
(defn- update-webhook!
[{:keys [::db/pool] :as cfg} {:keys [id] :as wook} {:keys [uri mtype is-active] :as params}]
(db/update! pool :webhook
{:uri uri
:is-active is-active
:mtype mtype
:error-code nil
:error-count 0}
{:id id}))
(sv/defmethod ::create-webhook
{::doc/added "1.17"}
[{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id uri mtype is-active] :as params}]
[{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id] :as params}]
(check-edition-permissions! pool profile-id team-id)
(letfn [(insert-webhook [_]
(db/insert! pool :webhook
{:id (uuid/next)
:team-id team-id
:uri uri
:is-active is-active
:mtype mtype}))]
(->> (validate-webhook! cfg nil params)
(p/fmap executor insert-webhook))))
(->> (validate-quotes! cfg params)
(p/fmap executor (fn [_] (validate-webhook! cfg nil params)))
(p/fmap executor (fn [_] (insert-webhook! cfg params)))))
(s/def ::update-webhook
(s/keys :req-un [::id ::uri ::mtype ::is-active]))
(sv/defmethod ::update-webhook
{::doc/added "1.17"}
[{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id id uri mtype is-active] :as params}]
(let [whook (db/get pool :webhook {:id id})
update-fn (fn [_]
(db/update! pool :webhook
{:uri uri
:is-active is-active
:mtype mtype
:error-code nil
:error-count 0}
{:id id}))]
[{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [id profile-id] :as params}]
(let [whook (db/get pool :webhook {:id id})]
(check-edition-permissions! pool profile-id (:team-id whook))
(->> (validate-webhook! cfg whook params)
(p/fmap executor update-fn))))
(p/fmap executor (fn [_] (update-webhook! cfg whook params))))))
(s/def ::delete-webhook
(s/keys :req-un [::profile-id ::id]))
@ -133,4 +140,4 @@
[{:keys [pool] :as cfg} {:keys [profile-id team-id]}]
(with-open [conn (db/open pool)]
(check-read-permissions! conn profile-id team-id)
(db/exec! conn [sql:get-webhooks team-id])))
(db/exec! conn [sql:get-webhooks team-id])))
@ -64,6 +64,10 @@
[mdw mdata]
(vary-meta mdw merge mdata))
(defn assoc-meta
[mdw k v]
(vary-meta mdw assoc k v))
(defn with-http-cache
[mdw max-age]
(vary-meta mdw update ::rpc/response-transform-fns conj
@ -12,6 +12,7 @@
[app.common.uuid :as uuid]
[app.db :as db]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.media :as media]
[app.rpc.climit :as-alias climit]
[app.rpc.doc :as-alias doc]
@ -43,6 +44,8 @@
::font-id ::font-family ::font-weight ::font-style]))
(sv/defmethod ::create-font-variant
{::doc/added "1.3"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)]
(teams/check-edition-permissions! pool profile-id team-id)
@ -119,19 +122,16 @@
(s/def ::update-font
(s/keys :req-un [::profile-id ::team-id ::id ::name]))
(def sql:update-font
"update team_font_variant
set font_family = ?
where team_id = ?
and font_id = ?")
(sv/defmethod ::update-font
{::climit/queue :process-font}
{::doc/added "1.3"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [team-id profile-id id name] :as params}]
(db/with-atomic [conn pool]
(teams/check-edition-permissions! conn profile-id team-id)
(db/exec-one! conn [sql:update-font name team-id id])
(db/update! conn :team-font-variant
{:font-family name}
{:font-id id
:team-id team-id})))
@ -139,10 +139,11 @@
(s/keys :req-un [::profile-id ::team-id ::id]))
(sv/defmethod ::delete-font
{::doc/added "1.3"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [id team-id profile-id] :as params}]
(db/with-atomic [conn pool]
(teams/check-edition-permissions! conn profile-id team-id)
(db/update! conn :team-font-variant
{:deleted-at (dt/now)}
{:font-id id :team-id team-id})
@ -154,7 +155,8 @@
(s/keys :req-un [::profile-id ::team-id ::id]))
(sv/defmethod ::delete-font-variant
{::doc/added "1.3"}
{::doc/added "1.3"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [id team-id profile-id] :as params}]
(db/with-atomic [conn pool]
(teams/check-edition-permissions! conn profile-id team-id)
@ -9,6 +9,10 @@
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.db :as db]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.rpc.permissions :as perms]
[app.rpc.queries.projects :as proj]
[app.rpc.queries.teams :as teams]
@ -22,7 +26,6 @@
(s/def ::name ::us/string)
(s/def ::profile-id ::us/uuid)
;; --- Mutation: Create Project
(declare create-project)
@ -35,6 +38,8 @@
:opt-un [::id]))
(sv/defmethod ::create-project
{::doc/added "1.0"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [profile-id team-id] :as params}]
(db/with-atomic [conn pool]
(teams/check-edition-permissions! conn profile-id team-id)
@ -122,10 +127,13 @@
;; this is not allowed.
(sv/defmethod ::delete-project
{::doc/added "1.0"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool]
(proj/check-edition-permissions! conn profile-id id)
(db/update! conn :project
{:deleted-at (dt/now)}
{:id id :is-default false})
(let [project (db/update! conn :project
{:deleted-at (dt/now)}
{:id id :is-default false})]
(rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project)}}))))
@ -81,7 +81,7 @@
{:method :post
:uri (cf/get :telemetry-uri)
:headers {"content-type" "application/json"}
:body (json/write-str data)}
:body (json/encode-str data)}
{:sync? true})]
(when (> (:status response) 206)
(ex/raise :type :internal
@ -5,7 +5,6 @@
;; Copyright (c) KALEIDOS INC
(ns app.util.json
(:refer-clojure :exclude [read])
[jsonista.core :as j]))
@ -13,23 +12,23 @@
(j/object-mapper params))
(defn write
(defn read!
([from] (j/read-value from j/keyword-keys-object-mapper))
([from mapper] (j/read-value from mapper)))
(defn write!
([to v] (j/write-value to v j/keyword-keys-object-mapper))
([to v mapper] (j/write-value to v mapper)))
(defn encode
([v] (j/write-value-as-bytes v j/keyword-keys-object-mapper))
([v mapper] (j/write-value-as-bytes v mapper)))
(defn write-str
([v] (j/write-value-as-string v j/keyword-keys-object-mapper))
([v mapper] (j/write-value-as-string v mapper)))
(defn read
(defn decode
([v] (j/read-value v j/keyword-keys-object-mapper))
([v mapper] (j/read-value v mapper)))
(defn encode
(j/write-value-as-bytes v j/keyword-keys-object-mapper))
(defn decode
(j/read-value v j/keyword-keys-object-mapper))
(defn encode-str
([v] (j/write-value-as-string v j/keyword-keys-object-mapper))
([v mapper] (j/write-value-as-string v mapper)))
@ -14,6 +14,7 @@
[app.common.spec :as us]
[app.common.transit :as t]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.metrics :as mtx]
[app.redis :as rds]
@ -96,7 +97,7 @@
(l/info :hint "registry initialized" :tasks (count tasks))
(reduce-kv (fn [registry k v]
(let [tname (name k)]
(l/debug :hint "register task" :name tname)
(l/trace :hint "register task" :name tname)
(assoc registry tname (wrap-task-handler metrics tname v))))
@ -174,63 +175,62 @@
(db/pgobject? props)
(assoc :props (db/decode-transit-pgobject props))))
(s/def ::queue ::us/string)
(s/def ::wait-duration ::dt/duration)
(defmethod ig/pre-init-spec ::scheduler [_]
(defmethod ig/pre-init-spec ::dispatcher [_]
(s/keys :req [::mtx/metrics
:opt [::wait-duration
(defmethod ig/prep-key ::scheduler
(defmethod ig/prep-key ::dispatcher
[_ cfg]
(merge {::batch-size 1
::wait-duration (dt/duration "2s")}
(merge {::batch-size 100
::wait-duration (dt/duration "5s")}
(d/without-nils cfg)))
(def ^:private sql:select-next-tasks
"select * from task as t
"select id, queue from task as t
where t.scheduled_at <= now()
and (t.status = 'new' or t.status = 'retry')
and queue ~~* ?::text
order by t.priority desc, t.scheduled_at
limit ?
for update skip locked")
(defn- format-queue
(str/ffmt "penpot-tasks-queue:%" queue))
(defmethod ig/init-key ::scheduler
(defmethod ig/init-key ::dispatcher
[_ {:keys [::db/pool ::rds/redis ::batch-size] :as cfg}]
(letfn [(get-tasks-batch [conn]
(->> (db/exec! conn [sql:select-next-tasks batch-size])
(map decode-task-row)
(letfn [(get-tasks [conn]
(let [prefix (str (cf/get :tenant) ":%")]
(seq (db/exec! conn [sql:select-next-tasks prefix batch-size]))))
(queue-task [conn rconn {:keys [id queue] :as task}]
(db/update! conn :task {:status "ready"} {:id id})
(let [queue (format-queue queue)
payload (t/encode id)
result (rds/rpush! rconn queue payload)]
(l/debug :hist "scheduler: task pushed to redis"
:task-id id
:key queue
:queued result)))
(push-tasks! [conn rconn [queue tasks]]
(let [ids (mapv :id tasks)
key (str/ffmt "taskq:%" queue)
res (rds/rpush! rconn key (mapv t/encode ids))
sql [(str "update task set status = 'scheduled'"
" where id = ANY(?)")
(db/create-array conn "uuid" ids)]]
(run-batch [rconn]
(db/exec-one! conn sql)
(l/debug :hist "dispatcher: queue tasks"
:queue queue
:tasks (count ids)
:total-queued res)))
(run-batch! [rconn]
(db/with-atomic [conn pool]
(when-let [tasks (get-tasks-batch conn)]
(run! (partial queue-task conn rconn) tasks)
(when-let [tasks (get-tasks conn)]
(->> (group-by :queue tasks)
(run! (partial push-tasks! conn rconn)))
(if (db/read-only? pool)
(l/warn :hint "scheduler: not started (db is read-only)")
(l/warn :hint "dispatcher: not started (db is read-only)")
{:name "penpot/scheduler"}
(l/info :hint "scheduler: started")
{:name "penpot/worker-dispatcher"}
(l/info :hint "dispatcher: started")
(dm/with-open [rconn (rds/connect redis)]
(loop []
@ -238,7 +238,7 @@
(throw (InterruptedException. "interrumpted")))
(when-not (run-batch rconn)
(when-not (run-batch! rconn)
(px/sleep (::wait-duration cfg)))
(catch InterruptedException cause
(throw cause))
@ -246,29 +246,29 @@
(rds/exception? cause)
(l/warn :hint "scheduler: redis exception (will retry in an instant)" :cause cause)
(l/warn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))
(db/sql-exception? cause)
(l/warn :hint "scheduler: database exception (will retry in an instant)" :cause cause)
(l/warn :hint "dispatcher: database exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))
(l/error :hint "scheduler: unhandled exception (will retry in an instant)" :cause cause)
(l/error :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn))))))
(catch InterruptedException _
(l/debug :hint "scheduler: interrupted"))
(l/debug :hint "dispatcher: interrupted"))
(catch Throwable cause
(l/error :hint "scheduler: unexpected exception" :cause cause))
(l/error :hint "dispatcher: unexpected exception" :cause cause))
(l/info :hint "scheduler: terminated")))))))
(l/info :hint "dispatcher: terminated")))))))
(defmethod ig/halt-key! ::scheduler
(defmethod ig/halt-key! ::dispatcher
[_ thread]
(some-> thread px/interrupt!))
@ -288,36 +288,38 @@
;; FIXME: define queue as set
(defmethod ig/prep-key ::worker
[_ cfg]
(merge {::queue "default" ::parallelism 1}
(merge {::parallelism 1}
(d/without-nils cfg)))
(defmethod ig/init-key ::worker
[_ {:keys [::db/pool ::queue ::parallelism] :as cfg}]
(if (db/read-only? pool)
(l/warn :hint "workers: not started (db is read-only)" :queue queue)
(->> (range parallelism)
(map #(assoc cfg ::worker-id %))
(map start-worker!)))))
(let [queue (d/name queue)
cfg (assoc cfg ::queue queue)]
(if (db/read-only? pool)
(l/warn :hint "worker: not started (db is read-only)" :queue queue :parallelism parallelism)
(->> (range parallelism)
(map #(assoc cfg ::worker-id %))
(map start-worker!))))))
(defmethod ig/halt-key! ::worker
[_ threads]
(run! px/interrupt! threads))
(defn- start-worker!
[{:keys [::rds/redis ::worker-id] :as cfg}]
[{:keys [::rds/redis ::worker-id ::queue] :as cfg}]
{:name (format "penpot/worker/%s" worker-id)}
(l/info :hint "worker: started" :worker-id worker-id)
(l/info :hint "worker: started" :worker-id worker-id :queue queue)
(dm/with-open [rconn (rds/connect redis)]
(let [cfg (-> cfg
(update ::queue format-queue)
(assoc ::rds/rconn rconn)
(assoc ::timeout (dt/duration "5s")))]
(let [tenant (cf/get :tenant "main")
cfg (-> cfg
(assoc ::queue (str/ffmt "taskq:%:%" tenant queue))
(assoc ::rds/rconn rconn)
(assoc ::timeout (dt/duration "5s")))]
(loop []
(when (px/interrupted?)
(throw (InterruptedException. "interrupted")))
@ -327,13 +329,17 @@
(catch InterruptedException _
(l/debug :hint "worker: interrupted"
:worker-id worker-id))
:worker-id worker-id
:queue queue))
(catch Throwable cause
(l/error :hint "worker: unexpected exception"
:worker-id worker-id
:queue queue
:cause cause))
(l/info :hint "worker: terminated" :worker-id worker-id)))))
(l/info :hint "worker: terminated"
:worker-id worker-id
:queue queue)))))
(defn- run-worker-loop!
[{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}]
@ -439,7 +445,7 @@
(l/trace :hint "worker: executing task"
(l/debug :hint "worker: executing task"
:worker-id worker-id
:task-id (:id task)
:task-name (:name task)
@ -631,46 +637,69 @@
(s/def ::task keyword?)
(s/def ::delay (s/or :int ::us/integer :duration dt/duration?))
(s/def ::conn some?)
(s/def ::priority ::us/integer)
(s/def ::max-retries ::us/integer)
(s/def ::submit-options
(s/keys :req [::task ::conn]
:opt [::delay ::queue ::priority ::max-retries]))
(defn- extract-props
(reduce-kv (fn [res k v]
(cond-> res
(not (qualified-keyword? k))
(assoc! k v)))
(transient {})
(let [cns (namespace ::sample)]
(reduce-kv (fn [res k v]
(cond-> res
(not= (namespace k) cns)
(assoc! k v)))
(transient {})
(def ^:private sql:insert-new-task
"insert into task (id, name, props, queue, priority, max_retries, scheduled_at)
values (?, ?, ?, ?, ?, ?, now() + ?)
"insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at)
values (?, ?, ?, ?, ?, ?, ?, now() + ?)
returning id")
(def ^:private
"delete from task
where name=? and queue=? and label=? and status = 'new' and scheduled_at > now()")
(s/def ::label string?)
(s/def ::task (s/or :kw keyword? :str string?))
(s/def ::queue (s/or :kw keyword? :str string?))
(s/def ::delay (s/or :int integer? :duration dt/duration?))
(s/def ::conn (s/or :pool ::db/pool :connection some?))
(s/def ::priority integer?)
(s/def ::max-retries integer?)
(s/def ::dedupe boolean?)
(s/def ::submit-options
(s/keys :req [::task ::conn]
:opt [::label ::delay ::queue ::priority ::max-retries ::dedupe])
(fn [{:keys [::dedupe ::label] :or {label ""}}]
(if dedupe
(not= label "")
(defn submit!
[& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn]
:or {delay 0 queue "default" priority 100 max-retries 3}
[& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label]
:or {delay 0 queue :default priority 100 max-retries 3 label ""}
:as options}]
(us/verify ::submit-options options)
(us/verify! ::submit-options options)
(let [duration (dt/duration delay)
interval (db/interval duration)
props (-> options extract-props db/tjson)
id (uuid/next)]
id (uuid/next)
tenant (cf/get :tenant)
task (d/name task)
queue (str/ffmt "%:%" tenant (d/name queue))
deleted (when dedupe
(-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label])
(l/debug :hint "submit task"
:name (d/name task)
:name task
:queue queue
:label label
:dedupe (boolean dedupe)
:deleted (or deleted 0)
:in (dt/format-duration duration))
(db/exec-one! conn [sql:insert-new-task id (d/name task) props
queue priority max-retries interval])
(db/exec-one! conn [sql:insert-new-task id task props queue
label priority max-retries interval])
@ -284,6 +284,19 @@
:session-id session-id
:profile-id profile-id})))))
(defn create-webhook*
([params] (create-webhook* *pool* params))
([pool {:keys [team-id id uri mtype is-active]
:or {is-active true
mtype "application/json"
uri "http://example.com/webhook"}}]
(db/insert! pool :webhook
{:id (or id (uuid/next))
:team-id team-id
:uri uri
:is-active is-active
:mtype mtype})))
(defn handle-error
@ -417,6 +430,10 @@
[& params]
(apply db/query *pool* params))
(defn db-get
[& params]
(apply db/get* *pool* params))
(defn sleep
(Thread/sleep (inst-ms (dt/duration ms-or-duration))))
@ -0,0 +1,120 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; Copyright (c) KALEIDOS INC
(ns backend-tests.loggers-webhooks-test
[app.common.uuid :as uuid]
[app.db :as db]
[app.http :as http]
[app.storage :as sto]
[backend-tests.helpers :as th]
[clojure.test :as t]
[mockery.core :refer [with-mocks]]))
(t/use-fixtures :once th/state-init)
(t/use-fixtures :each th/database-reset)
(t/deftest process-event-handler-with-no-webhooks
(with-mocks [submit-mock {:target 'app.worker/submit! :return nil}]
(let [prof (th/create-profile* 1 {:is-active true})
res (th/run-task! :process-webhook-event
{:type "mutation"
:name "create-project"
:props {:team-id (:default-team-id prof)}}}})]
(t/is (= 0 (:call-count @submit-mock)))
(t/is (nil? res)))))
(t/deftest process-event-handler
(with-mocks [submit-mock {:target 'app.worker/submit! :return nil}]
(let [prof (th/create-profile* 1 {:is-active true})
whk (th/create-webhook* {:team-id (:default-team-id prof)})
res (th/run-task! :process-webhook-event
{:type "mutation"
:name "create-project"
:props {:team-id (:default-team-id prof)}}}})]
(t/is (= 1 (:call-count @submit-mock)))
(t/is (nil? res)))))
(t/deftest run-webhook-handler-1
(with-mocks [http-mock {:target 'app.http.client/req! :return {:status 200}}]
(let [prof (th/create-profile* 1 {:is-active true})
whk (th/create-webhook* {:team-id (:default-team-id prof)})
evt {:type "mutation"
:name "create-project"
:props {:team-id (:default-team-id prof)}}
res (th/run-task! :run-webhook
{:app.loggers.webhooks/event evt
:app.loggers.webhooks/config whk}})]
(t/is (= 1 (:call-count @http-mock)))
(let [rows (th/db-exec! ["select * from webhook_delivery where webhook_id=?"
(:id whk)])]
(t/is (= 1 (count rows)))
(t/is (nil? (-> rows first :error-code))))
;; Refresh webhook
(let [whk' (th/db-get :webhook {:id (:id whk)})]
(t/is (nil? (:error-code whk')))
(prn whk'))
(t/deftest run-webhook-handler-2
(with-mocks [http-mock {:target 'app.http.client/req! :return {:status 400}}]
(let [prof (th/create-profile* 1 {:is-active true})
whk (th/create-webhook* {:team-id (:default-team-id prof)})
evt {:type "mutation"
:name "create-project"
:props {:team-id (:default-team-id prof)}}
res (th/run-task! :run-webhook
{:app.loggers.webhooks/event evt
:app.loggers.webhooks/config whk}})]
(t/is (= 1 (:call-count @http-mock)))
(let [rows (th/db-query :webhook-delivery {:webhook-id (:id whk)})]
(t/is (= 1 (count rows)))
(t/is (= "unexpected-status:400" (-> rows first :error-code))))
;; Refresh webhook
(let [whk' (th/db-get :webhook {:id (:id whk)})]
(t/is (= "unexpected-status:400" (:error-code whk')))
(t/is (= 1 (:error-count whk'))))
;; RUN 2 times more
(th/run-task! :run-webhook
{:app.loggers.webhooks/event evt
:app.loggers.webhooks/config whk}})
(th/run-task! :run-webhook
{:app.loggers.webhooks/event evt
:app.loggers.webhooks/config whk}})
(let [rows (th/db-query :webhook-delivery {:webhook-id (:id whk)})]
(t/is (= 3 (count rows)))
(t/is (= "unexpected-status:400" (-> rows first :error-code))))
;; Refresh webhook
(let [whk' (th/db-get :webhook {:id (:id whk)})]
(t/is (= "unexpected-status:400" (:error-code whk')))
(t/is (= 3 (:error-count whk')))
(t/is (false? (:is-active whk'))))
@ -0,0 +1,92 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; Copyright (c) KALEIDOS INC
(ns backend-tests.rpc-audit-test
[app.common.pprint :as pp]
[app.common.uuid :as uuid]
[app.db :as db]
[app.util.time :as dt]
[backend-tests.helpers :as th]
[clojure.test :as t]))
(t/use-fixtures :once th/state-init)
(t/use-fixtures :each th/database-reset)
(defn decode-row
[{:keys [props context] :as row}]
(cond-> row
(db/pgobject? props) (assoc :props (db/decode-transit-pgobject props))
(db/pgobject? context) (assoc :context (db/decode-transit-pgobject context))))
(def http-request
(get-header [_ name]
(case name
"x-forwarded-for" ""))))
(t/deftest push-events-1
(with-redefs [app.config/flags #{:audit-log}]
(let [prof (th/create-profile* 1 {:is-active true})
team-id (:default-team-id prof)
proj-id (:default-project-id prof)
params {::th/type :push-audit-events
:app.http/request http-request
:profile-id (:id prof)
:events [{:name "navigate"
:props {:project-id proj-id
:team-id team-id
:route "dashboard-files"}
:context {:engine "blink"}
:profile-id (:id prof)
:timestamp (dt/now)
:type "action"}]}
out (th/command! params)]
;; (th/print-result! out)
(t/is (nil? (:error out)))
(t/is (nil? (:result out)))
(let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"])
(mapv decode-row))]
;; (pp/pprint rows)
(t/is (= 1 (count rows)))
(t/is (= (:id prof) (:profile-id row)))
(t/is (= "navigate" (:name row)))
(t/is (= "frontend" (:source row)))))))
(t/deftest push-events-2
(with-redefs [app.config/flags #{:audit-log}]
(let [prof (th/create-profile* 1 {:is-active true})
team-id (:default-team-id prof)
proj-id (:default-project-id prof)
params {::th/type :push-audit-events
:app.http/request http-request
:profile-id (:id prof)
:events [{:name "navigate"
:props {:project-id proj-id
:team-id team-id
:route "dashboard-files"}
:context {:engine "blink"}
:profile-id uuid/zero
:timestamp (dt/now)
:type "action"}]}
out (th/command! params)]
;; (th/print-result! out)
(t/is (nil? (:error out)))
(t/is (nil? (:result out)))
(let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"])
(mapv decode-row))]
;; (pp/pprint rows)
(t/is (= 1 (count rows)))
(t/is (= (:id prof) (:profile-id row)))
(t/is (= "navigate" (:name row)))
(t/is (= "frontend" (:source row)))))))
@ -12,8 +12,6 @@
[app.storage :as sto]
[backend-tests.helpers :as th]
[clojure.test :as t]
[datoteka.fs :as fs]
[datoteka.io :as io]
[mockery.core :refer [with-mocks]]))
(t/use-fixtures :once th/state-init)
@ -52,7 +50,6 @@
(t/is (= (:mtype params) (:mtype result)))
(vreset! whook result))))
(th/reset-mock! http-mock)
(t/testing "update webhook 1 (success)"
@ -144,3 +141,41 @@
(t/is (= (:code error-data) :object-not-found)))))
(t/deftest webhooks-quotes
(with-mocks [http-mock {:target 'app.http.client/req!
:return {:status 200}}]
(let [prof (th/create-profile* 1 {:is-active true})
team-id (:default-team-id prof)
params {::th/type :create-webhook
:profile-id (:id prof)
:team-id team-id
:uri "http://example.com"
:mtype "application/json"}
out1 (th/command! params)
out2 (th/command! params)
out3 (th/command! params)
out4 (th/command! params)
out5 (th/command! params)
out6 (th/command! params)
out7 (th/command! params)
out8 (th/command! params)
out9 (th/command! params)]
(t/is (= 8 (:call-count @http-mock)))
(t/is (nil? (:error out1)))
(t/is (nil? (:error out2)))
(t/is (nil? (:error out3)))
(t/is (nil? (:error out4)))
(t/is (nil? (:error out5)))
(t/is (nil? (:error out6)))
(t/is (nil? (:error out7)))
(t/is (nil? (:error out8)))
(let [error (:error out9)
error-data (ex-data error)]
(t/is (th/ex-info? error))
(t/is (= (:type error-data) :restriction))
(t/is (= (:code error-data) :webhooks-quote-reached))))))
@ -169,9 +169,7 @@
(print-all [cause]
(print-summary cause)
(println "DETAIL:")
(when trace?
(print-trace cause))
@ -64,7 +64,7 @@
(defn initialize
[{:keys [id] :as params}]
(us/assert ::us/uuid id)
(us/assert! ::us/uuid id)
(ptk/reify ::initialize
(update [_ state]
@ -201,7 +201,7 @@
(defn search
(us/assert ::search params)
(us/assert! ::search params)
(ptk/reify ::search
(update [_ state]
@ -236,7 +236,7 @@
(defn fetch-files
[{:keys [project-id] :as params}]
(us/assert ::us/uuid project-id)
(us/assert! ::us/uuid project-id)
(ptk/reify ::fetch-files
(watch [_ _ _]
@ -347,7 +347,7 @@
(defn toggle-file-select
[{:keys [id project-id] :as file}]
(us/assert ::file file)
(us/assert! ::file file)
(ptk/reify ::toggle-file-select
(update [_ state]
@ -377,7 +377,7 @@
(defn create-team
[{:keys [name] :as params}]
(us/assert string? name)
(us/assert! ::us/string name)
(ptk/reify ::create-team
(watch [_ _ _]
@ -394,7 +394,7 @@
(defn create-team-with-invitations
[{:keys [name emails role] :as params}]
(us/assert string? name)
(us/assert! ::us/string name)
(ptk/reify ::create-team-with-invitations
(watch [_ _ _]
@ -413,7 +413,7 @@
(defn update-team
[{:keys [id name] :as params}]
(us/assert ::team params)
(us/assert! ::team params)
(ptk/reify ::update-team
(update [_ state]
@ -426,7 +426,7 @@
(defn update-team-photo
[{:keys [file] :as params}]
(us/assert ::di/file file)
(us/assert! ::di/file file)
(ptk/reify ::update-team-photo
(watch [_ state _]
@ -447,8 +447,8 @@
(defn update-team-member-role
[{:keys [role member-id] :as params}]
(us/assert ::us/uuid member-id)
(us/assert ::us/keyword role)
(us/assert! ::us/uuid member-id)
(us/assert! ::us/keyword role)
(ptk/reify ::update-team-member-role
(watch [_ state _]
@ -461,7 +461,7 @@
(defn delete-team-member
[{:keys [member-id] :as params}]
(us/assert ::us/uuid member-id)
(us/assert! ::us/uuid member-id)
(ptk/reify ::delete-team-member
(watch [_ state _]
@ -474,7 +474,9 @@
(defn leave-team
[{:keys [reassign-to] :as params}]
(us/assert (s/nilable ::us/uuid) reassign-to)
:spec (s/nilable ::us/uuid)
:val reassign-to)
(ptk/reify ::leave-team
(watch [_ state _]
@ -510,9 +512,9 @@
(defn update-team-invitation-role
[{:keys [email team-id role] :as params}]
(us/assert ::us/email email)
(us/assert ::us/uuid team-id)
(us/assert ::us/keyword role)
(us/assert! ::us/email email)
(us/assert! ::us/uuid team-id)
(us/assert! ::us/keyword role)
(ptk/reify ::update-team-invitation-role
(-deref [_] {:role role})
@ -528,8 +530,8 @@
(defn delete-team-invitation
[{:keys [email team-id] :as params}]
(us/assert ::us/email email)
(us/assert ::us/uuid team-id)
(us/assert! ::us/email email)
(us/assert! ::us/uuid team-id)
(ptk/reify ::delete-team-invitation
(watch [_ _ _]
@ -542,7 +544,7 @@
(defn delete-team-webhook
[{:keys [id] :as params}]
(us/assert ::us/uuid id)
(us/assert! ::us/uuid id)
(ptk/reify ::delete-team-webhook
(watch [_ state _]
@ -562,10 +564,10 @@
(defn update-team-webhook
[{:keys [id uri mtype is-active] :as params}]
(us/assert ::us/uuid id)
(us/assert ::us/uri uri)
(us/assert ::mtype mtype)
(us/assert ::us/boolean is-active)
(us/assert! ::us/uuid id)
(us/assert! ::us/uri uri)
(us/assert! ::mtype mtype)
(us/assert! ::us/boolean is-active)
(ptk/reify ::update-team-webhook
(watch [_ state _]
@ -580,9 +582,9 @@
(defn create-team-webhook
[{:keys [uri mtype is-active] :as params}]
(us/assert ::us/uri uri)
(us/assert ::mtype mtype)
(us/assert ::us/boolean is-active)
(us/assert! ::us/uri uri)
(us/assert! ::mtype mtype)
(us/assert! ::us/boolean is-active)
(ptk/reify ::create-team-webhook
(watch [_ state _]
@ -599,7 +601,7 @@
(defn delete-team
[{:keys [id] :as params}]
(us/assert ::team params)
(us/assert! ::team params)
(ptk/reify ::delete-team
(watch [_ _ _]
@ -652,7 +654,7 @@
(defn duplicate-project
[{:keys [id name] :as params}]
(us/assert ::us/uuid id)
(us/assert! ::us/uuid id)
(ptk/reify ::duplicate-project
(watch [_ _ _]
@ -669,8 +671,8 @@
(defn move-project
[{:keys [id team-id] :as params}]
(us/assert ::us/uuid id)
(us/assert ::us/uuid team-id)
(us/assert! ::us/uuid id)
(us/assert! ::us/uuid team-id)
(ptk/reify ::move-project
(-deref [_]
@ -688,7 +690,7 @@
(defn toggle-project-pin
[{:keys [id is-pinned] :as project}]
(us/assert ::project project)
(us/assert! ::project project)
(ptk/reify ::toggle-project-pin
(update [_ state]
@ -705,7 +707,7 @@
(defn rename-project
[{:keys [id name] :as params}]
(us/assert ::project params)
(us/assert! ::project params)
(ptk/reify ::rename-project
(update [_ state]
@ -723,7 +725,7 @@
(defn delete-project
[{:keys [id] :as params}]
(us/assert ::project params)
(us/assert! ::project params)
(ptk/reify ::delete-project
(update [_ state]
@ -745,7 +747,7 @@
(defn delete-file
[{:keys [id project-id] :as params}]
(us/assert ::file params)
(us/assert! ::file params)
(ptk/reify ::delete-file
(update [_ state]
@ -764,7 +766,7 @@
(defn rename-file
[{:keys [id name] :as params}]
(us/assert ::file params)
(us/assert! ::file params)
(ptk/reify ::rename-file
(-deref [_]
@ -787,7 +789,7 @@
(defn set-file-shared
[{:keys [id is-shared] :as params}]
(us/assert ::file params)
(us/assert! ::file params)
(ptk/reify ::set-file-shared
(-deref [_]
@ -828,7 +830,7 @@
(defn create-file
[{:keys [project-id] :as params}]
(us/assert ::us/uuid project-id)
(us/assert! ::us/uuid project-id)
(ptk/reify ::create-file
@ -857,8 +859,8 @@
(defn duplicate-file
[{:keys [id name] :as params}]
(us/assert ::us/uuid id)
(us/assert ::name name)
(us/assert! ::us/uuid id)
(us/assert! ::name name)
(ptk/reify ::duplicate-file
(watch [_ _ _]
@ -877,8 +879,8 @@
(defn move-files
[{:keys [ids project-id] :as params}]
(us/assert ::us/set-of-uuid ids)
(us/assert ::us/uuid project-id)
(us/assert! ::us/set-of-uuid ids)
(us/assert! ::us/uuid project-id)
(ptk/reify ::move-files
(-deref [_]
@ -898,7 +900,7 @@
;; --- EVENT: clone-template
(defn clone-template
[{:keys [template-id project-id] :as params}]
(us/assert ::us/uuid project-id)
(us/assert! ::us/uuid project-id)
(ptk/reify ::clone-template
(-deref [_]
@ -920,7 +922,7 @@
(defn go-to-workspace
[{:keys [id project-id] :as file}]
(us/assert ::file file)
(us/assert! ::file file)
(ptk/reify ::go-to-workspace
(watch [_ _ _]
@ -38,7 +38,7 @@
(defn- collect-context
(let [uagent (UAParser.)]
{:app-version (:full @cf/version)
:locale @i18n/locale}
(let [browser (.getBrowser uagent)]
@ -215,12 +215,17 @@
(defn- persist-events
(if (seq events)
(let [uri (u/join @cf/public-uri "api/audit/events")
(let [uri (u/join @cf/public-uri "api/rpc/command/push-audit-events")
params {:uri uri
:method :post
:credentials "include"
:body (http/transit-data {:events events})}]
(->> (http/send! params)
(rx/mapcat rp/handle-response)))
(rx/mapcat rp/handle-response)
(rx/catch (fn [_]
(l/error :hint "unexpected error on persisting audit events")
(rx/of nil)))))
(rx/of nil)))
(defn initialize
@ -274,7 +279,7 @@
(rx/map (fn [event]
(let [session* (or @session (dt/now))
context (-> @context
(d/merge (:context event))
(merge (:context event))
(assoc :session session*))]
(reset! session session*)
(-> event
@ -60,6 +60,7 @@
(->> (http/send! {:method :get
:uri (u/join @cf/public-uri "api/rpc/query/" (name id))
:headers {"accept" "application/transit+json"}
:credentials "include"
:query params})
(rx/map decode-transit)
@ -71,6 +72,7 @@
[id params]
(->> (http/send! {:method :post
:uri (u/join @cf/public-uri "api/rpc/mutation/" (name id))
:headers {"accept" "application/transit+json"}
:credentials "include"
:body (http/transit-data params)})
(rx/map http/conditional-decode-transit)
@ -88,6 +90,7 @@
(->> (http/send! {:method method
:uri (u/join @cf/public-uri "api/rpc/command/" (name id))
:credentials "include"
:headers {"accept" "application/transit+json"}
:body (when (= method :post)
(if form-data?
(http/form-data params)
@ -587,53 +587,78 @@
(s/def ::webhook-form
(s/keys :req-un [::uri ::mtype]))
(mf/defc webhook-modal {::mf/register modal/components
::mf/register-as :webhook}
(def valid-webhook-mtypes
[{:label "application/json" :value "application/json"}
{:label "application/x-www-form-urlencoded" :value "application/x-www-form-urlencoded"}
{:label "application/transit+json" :value "application/transit+json"}])
(defn- extract-status
(-> error-code (str/split #":") second))
(mf/defc webhook-modal
{::mf/register modal/components
::mf/register-as :webhook}
[{:keys [webhook] :as props}]
(let [initial (mf/use-memo (fn [] (or webhook {:is-active false :mtype "application/json"})))
form (fm/use-form :spec ::webhook-form
:initial initial)
mtypes [{:label "application/json" :value "application/json"}
{:label "application/x-www-form-urlencoded" :value "application/x-www-form-urlencoded"}
{:label "application/transit+json" :value "application/transit+json"}]
(fn [message]
(st/emit! (dd/fetch-team-webhooks)
(msg/success message)
(fn [_]
(let [message (tr "dashboard.webhooks.create.success")]
(st/emit! (dd/fetch-team-webhooks)
(msg/success message)
(fn [message {:keys [type code hint] :as error}]
(let [message (if (and (= type :validation) (= code :webhook-validation))
(str message " "
(case hint
"ssl-validation" (tr "errors.webhooks.ssl-validation")
"")) ;; TODO Add more error codes when back defines them
(rx/of (msg/error message))))
(fn [form {:keys [type code hint] :as error}]
(if (and (= type :validation)
(= code :webhook-validation))
(let [message (cond
(= hint "unknown")
(tr "errors.webhooks.unexpected")
(= hint "ssl-validation-error")
(tr "errors.webhooks.ssl-validation")
(= hint "timeout")
(tr "errors.webhooks.timeout")
(= hint "connection-error")
(tr "errors.webhooks.connection")
(str/starts-with? hint "unexpected-status")
(tr "errors.webhooks.unexpected-status" (extract-status hint)))]
(swap! form assoc-in [:errors :uri] {:message message}))
(rx/throw error))))
(fn []
(let [mdata {:on-success #(on-success (tr "dashboard.webhooks.create.success"))
:on-error (partial on-error (tr "dashboard.webhooks.create.error"))}
webhook {:uri (get-in @form [:clean-data :uri])
:mtype (get-in @form [:clean-data :mtype])
:is-active (get-in @form [:clean-data :is-active])}]
(st/emit! (dd/create-team-webhook (with-meta webhook mdata)))))
(fn [form]
(let [cdata (:clean-data @form)
mdata {:on-success (partial on-success form)
:on-error (partial on-error form)}
params {:uri (:uri cdata)
:mtype (:mtype cdata)
:is-active (:is-active cdata)}]
(st/emit! (dd/create-team-webhook
(with-meta params mdata))))))
(fn []
(let [mdata {:on-success #(on-success (tr "dashboard.webhooks.update.success"))
:on-error (partial on-error (tr "dashboard.webhooks.update.error"))}
webhook (get @form :clean-data)]
(st/emit! (dd/update-team-webhook (with-meta webhook mdata)))))
(fn [form]
(let [params (:clean-data @form)
mdata {:on-success (partial on-success form)
:on-error (partial on-error form)}]
(st/emit! (dd/update-team-webhook
(with-meta params mdata))))))
#(let [data (:clean-data @form)]
(if (:id data)
(fn [form]
(prn @form)
(let [data (:clean-data @form)]
(if (:id data)
(on-update-submit form)
(on-create-submit form)))))]
@ -659,7 +684,7 @@
:placeholder (tr "modals.create-webhook.url.placeholder")}]]
[:& fm/select {:options mtypes
[:& fm/select {:options valid-webhook-mtypes
:label (tr "dashboard.webhooks.content-type")
:default "application/json"
:name :mtype}]]]
@ -704,79 +729,75 @@
{:on-click #(st/emit! (modal/show :webhook {}))}
[:span (tr "dashboard.webhooks.create")]]]])
(mf/defc webhook-actions
[{:keys [on-edit on-delete] :as props}]
(let [show? (mf/use-state false)]
[:span.icon {:on-click #(reset! show? true)} [i/actions]]
[:& dropdown {:show @show?
:on-close #(reset! show? false)}
[:li {:on-click on-edit} (tr "labels.edit")]
[:li {:on-click on-delete} (tr "labels.delete")]]]]))
(mf/defc webhook-actions
[{:keys [on-edit on-delete] :as props}]
(let [show? (mf/use-state false)]
[:span.icon {:on-click #(reset! show? true)} [i/actions]]
[:& dropdown {:show @show?
:on-close #(reset! show? false)}
[:li {:on-click on-edit} (tr "labels.edit")]
[:li {:on-click on-delete} (tr "labels.delete")]]]]))
(mf/defc last-delivery-icon
[{:keys [success? text] :as props}]
[:div.label text]
(if success?
[:span.icon.success i/msg-success]
[:span.icon.failure i/msg-warning])])
(mf/defc last-delivery-icon
[{:keys [success? text] :as props}]
[:div.label text]
(if success?
[:span.icon.success i/msg-success]
[:span.icon.failure i/msg-warning])])
(mf/defc webhook-item
{::mf/wrap [mf/memo]}
[{:keys [webhook] :as props}]
(let [on-edit #(st/emit! (modal/show :webhook {:webhook webhook}))
error-code (:error-code webhook)
(mf/defc webhook-item
{::mf/wrap [mf/memo]}
[{:keys [webhook] :as props}]
(let [on-edit #(st/emit! (modal/show :webhook {:webhook webhook}))
error-code (:error-code webhook)
(fn [error-code]
(let [status (-> error-code
(str/split "-")
(if (nil? status)
(fn []
(let [params {:id (:id webhook)}
mdata {:on-success #(st/emit! (dd/fetch-team-webhooks))}]
(st/emit! (dd/delete-team-webhook (with-meta params mdata)))))
on-delete #(st/emit! (modal/show
{:type :confirm
:title (tr "modals.delete-webhook.title")
:message (tr "modals.delete-webhook.message")
:accept-label (tr "modals.delete-webhook.accept")
:on-accept delete-fn}))
last-delivery-text (cond
(nil? error-code)
(tr "webhooks.last-delivery.success")
(fn []
(let [params {:id (:id webhook)}
mdata {:on-success #(st/emit! (dd/fetch-team-webhooks))}]
(st/emit! (dd/delete-team-webhook (with-meta params mdata)))))
(= error-code "ssl-validation")
(str (tr "errors.webhooks.last-delivery") " " (tr "errors.webhooks.ssl-validation"))
(fn []
(st/emit! (modal/show
{:type :confirm
:title (tr "modals.delete-webhook.title")
:message (tr "modals.delete-webhook.message")
:accept-label (tr "modals.delete-webhook.accept")
:on-accept delete-fn})))
(str/starts-with? error-code "unexpected-status")
(str (tr "errors.webhooks.last-delivery")
" "
(tr "errors.webhooks.unexpected-status" (extract-status error-code)))
(if (nil? error-code)
(tr "webhooks.last-delivery.success")
(str (tr "errors.webhooks.last-delivery")
(= error-code "ssl-validation-error")
(dm/str " " (tr "errors.webhooks.ssl-validation"))
(tr "errors.webhooks.last-delivery"))]
[:& last-delivery-icon {:success? (nil? error-code) :text last-delivery-text}]]]
[:div (:uri webhook)]]
[:div (if (:is-active webhook) (tr "labels.active") (tr "labels.inactive"))]]
[:& webhook-actions {:on-edit on-edit
:on-delete on-delete}]]]))
(str/starts-with? error-code "unexpected-status")
(dm/str " " (tr "errors.webhooks.unexpected-status" (extract-status error-code))))))]
[:& last-delivery-icon
{:success? (nil? error-code)
:text last-delivery-text}]]]
[:div (:uri webhook)]]
[:div (if (:is-active webhook)
(tr "labels.active")
(tr "labels.inactive"))]]
[:& webhook-actions
{:on-edit on-edit
:on-delete on-delete}]]]))
(mf/defc webhooks-list
[{:keys [webhooks] :as props}]
@ -690,6 +690,21 @@ msgstr "Is active"
msgid "dashboard.webhooks.active.explain"
msgstr "When this hook is triggered event details will be delivered"
msgid "dashboard.webhooks.update.success"
msgstr "Webhook updated successfully."
msgid "dashboard.webhooks.create.success"
msgstr "Webhook created successfully."
msgid "errors.webhooks.unexpected"
msgstr "Unexpected error on validating"
msgid "errors.webhooks.timeout"
msgstr "Timeout"
msgid "errors.webhooks.connection"
msgstr "Connection error, url not reacheable"
msgid "webhooks.last-delivery.success"
msgstr "Last delivery was successfull."
@ -702,18 +717,6 @@ msgstr "Error on SSL validation."
msgid "errors.webhooks.unexpected-status"
msgstr "Unexpected status %s"
msgid "dashboard.webhooks.update.error"
msgstr "Error on updating webhook."
msgid "dashboard.webhooks.update.success"
msgstr "Webhook updated successfully."
msgid "dashboard.webhooks.create.error"
msgstr "Error on creating webhook."
msgid "dashboard.webhooks.create.success"
msgstr "Webhook created successfully."
#: src/app/main/ui/alert.cljs
msgid "ds.alert-ok"
msgstr "Ok"
@ -737,6 +737,21 @@ msgstr "Cuando se active este webhook se enviarán detalles del evento"
msgid "webhooks.last-delivery.success"
msgstr "El último envío fue correcto."
msgid "dashboard.webhooks.update.success"
msgstr "Webhook modificado con éxito"
msgid "dashboard.webhooks.create.success"
msgstr "Webhook creado con éxito"
msgid "errors.webhooks.timeout"
msgstr "Timeout"
msgid "errors.webhooks.unexpected"
msgstr "Error inesperado al validar"
msgid "errors.webhooks.connection"
msgstr "Error de conexion, la url no es alcanzable"
msgid "errors.webhooks.last-delivery"
msgstr "Hubo un problema en el último envío."
@ -746,18 +761,6 @@ msgstr "Error en la validación SSL."
msgid "errors.webhooks.unexpected-status"
msgstr "Estado inesperado %s"
msgid "dashboard.webhooks.update.error"
msgstr "Error modificando el webhook"
msgid "dashboard.webhooks.update.success"
msgstr "Webhook modificado con éxito"
msgid "dashboard.webhooks.create.error"
msgstr "Error creando con éxito"
msgid "dashboard.webhooks.create.success"
msgstr "Webhook creado con éxito"
#: src/app/main/ui/alert.cljs
msgid "ds.alert-ok"
msgstr "Ok"
Add table
Reference in a new issue