0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-13 16:21:57 -05:00

Merge pull request #809 from penpot/improve-trazability

Improve trazability
This commit is contained in:
Andrey Antukh 2021-04-09 15:28:55 +02:00 committed by GitHub
commit fa2d0f5ed7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 397 additions and 263 deletions

View file

@ -6,7 +6,7 @@
</Console>
<RollingFile name="main" fileName="logs/main.log" filePattern="logs/main-%i.log">
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] [%t] %level{length=1} %logger{36} - %msg%n"/>
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] %level{length=1} %logger{36} - %msg%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="50M"/>
</Policies>
@ -32,6 +32,10 @@
<AppenderRef ref="main" level="trace" />
</Logger>
<Logger name="penpot" level="fatal" additivity="false">
<AppenderRef ref="main" level="fatal" />
</Logger>
<Logger name="user" level="trace" additivity="false">
<AppenderRef ref="main" level="trace" />
</Logger>

View file

@ -14,6 +14,10 @@
<AppenderRef ref="console" />
</Logger>
<Logger name="penpot" level="fatal" additivity="false">
<AppenderRef ref="console" />
</Logger>
<Root level="info">
<AppenderRef ref="console" />
</Root>

View file

@ -2,7 +2,7 @@
export PENPOT_ASSERTS_ENABLED=true
export OPTIONS="-A:jmx-remote:dev -J-Dclojure.tools.logging.factory=clojure.tools.logging.impl/log4j2-factory -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -J-Xms512m -J-Xmx512m -J-Dlog4j2.configurationFile=log4j2-devenv.xml"
export OPTIONS="-A:jmx-remote:dev -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -J-Xms512m -J-Xmx512m -J-Dlog4j2.configurationFile=log4j2-devenv.xml"
export OPTIONS_EVAL="nil"
# export OPTIONS_EVAL="(set! *warn-on-reflection* true)"

View file

@ -16,8 +16,8 @@
[app.main :as main]
[app.rpc.mutations.profile :as profile]
[app.util.blob :as blob]
[app.util.logging :as l]
[buddy.hashers :as hashers]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(defn- mk-uuid
@ -74,7 +74,9 @@
(let [rng (java.util.Random. 1)]
(letfn [(create-profile [conn index]
(let [id (mk-uuid "profile" index)
_ (log/info "create profile" index id)
_ (l/info :action "create profile"
:index index
:id id)
prof (register-profile conn
{:id id
@ -90,20 +92,22 @@
prof))
(create-profiles [conn]
(log/info "create profiles")
(l/info :action "create profiles")
(collect (partial create-profile conn)
(range (:num-profiles opts))))
(create-team [conn index]
(let [id (mk-uuid "team" index)
name (str "Team" index)]
(log/info "create team" index id)
(l/info :action "create team"
:index index
:id id)
(db/insert! conn :team {:id id
:name name})
id))
(create-teams [conn]
(log/info "create teams")
(l/info :action "create teams")
(collect (partial create-team conn)
(range (:num-teams opts))))
@ -111,7 +115,9 @@
(let [id (mk-uuid "file" project-id index)
name (str "file" index)
data (cp/make-file-data id)]
(log/info "create file" index id)
(l/info :action "create file"
:index index
:id id)
(db/insert! conn :file
{:id id
:data (blob/encode data)
@ -126,7 +132,7 @@
id))
(create-files [conn owner-id project-id]
(log/info "create files")
(l/info :action "create files")
(run! (partial create-file conn owner-id project-id)
(range (:num-files-per-project opts))))
@ -138,7 +144,9 @@
(str "project " index)
"Drafts")
is-default (nil? index)]
(log/info "create project" index id)
(l/info :action "create project"
:index index
:id id)
(db/insert! conn :project
{:id id
:team-id team-id
@ -153,7 +161,7 @@
id))
(create-projects [conn team-id profile-ids]
(log/info "create projects")
(l/info :action "create projects")
(let [owner-id (rng-nth rng profile-ids)
project-ids (conj
(collect (partial create-project conn team-id owner-id)
@ -170,14 +178,16 @@
:can-edit true}))
(setup-team [conn team-id profile-ids]
(log/info "setup team" team-id profile-ids)
(l/info :action "setup team"
:team-id team-id
:profile-ids (pr-str profile-ids))
(assign-profile-to-team conn team-id true (first profile-ids))
(run! (partial assign-profile-to-team conn team-id false)
(rest profile-ids))
(create-projects conn team-id profile-ids))
(assign-teams-and-profiles [conn teams profiles]
(log/info "assign teams and profiles")
(l/info :action "assign teams and profiles")
(loop [team-id (first teams)
teams (rest teams)]
(when-not (nil? team-id)
@ -194,7 +204,9 @@
project-id (:default-project-id owner)
data (cp/make-file-data id)]
(log/info "create draft file" index id)
(l/info :action "create draft file"
:index index
:id id)
(db/insert! conn :file
{:id id
:data (blob/encode data)
@ -244,6 +256,6 @@
(try
(run-in-system system preset)
(catch Exception e
(log/errorf e "unhandled exception"))
(l/error :hint "unhandled exception" :cause e))
(finally
(ig/halt! system)))))

View file

@ -14,9 +14,9 @@
[app.main :as main]
[app.rpc.mutations.profile :as profile]
[app.rpc.queries.profile :refer [retrieve-profile-data-by-email]]
[app.util.logging :as l]
[clojure.string :as str]
[clojure.tools.cli :refer [parse-opts]]
[clojure.tools.logging :as log]
[integrant.core :as ig])
(:import
java.io.Console))
@ -34,7 +34,7 @@
[{:keys [label type] :or {type :text}}]
(let [^Console console (System/console)]
(when-not console
(log/error "no console found, can proceed")
(l/error :hint "no console found, can proceed")
(System/exit 1))
(binding [*out* (.writer console)]

View file

@ -14,7 +14,7 @@
[app.db :as db]
[app.main :as main]
[app.storage :as sto]
[clojure.tools.logging :as log]
[app.util.logging :as l]
[cuerdas.core :as str]
[datoteka.core :as fs]
[integrant.core :as ig]))
@ -49,7 +49,7 @@
(run-in-system)
(ig/halt!))
(catch Exception e
(log/errorf e "Unhandled exception.")))))
(l/error :hint "unhandled exception" :cause e)))))
;; --- IMPL

View file

@ -5,7 +5,7 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 UXBOX Labs SL
;; Copyright (c) UXBOX Labs SL
(ns app.db
(:require
@ -16,12 +16,12 @@
[app.db.sql :as sql]
[app.metrics :as mtx]
[app.util.json :as json]
[app.util.logging :as l]
[app.util.migrations :as mg]
[app.util.time :as dt]
[app.util.transit :as t]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[next.jdbc :as jdbc]
[next.jdbc.date-time :as jdbc-dt])
@ -60,7 +60,9 @@
(defmethod ig/init-key ::pool
[_ {:keys [migrations metrics] :as cfg}]
(log/infof "initialize connection pool '%s' with uri '%s'" (name (:name cfg)) (:uri cfg))
(l/info :action "initialize connection pool"
:name (d/name (:name cfg))
:uri (:uri cfg))
(instrument-jdbc! (:registry metrics))
(let [pool (create-pool cfg)]
(when (seq migrations)

View file

@ -15,13 +15,13 @@
[app.db :as db]
[app.db.sql :as sql]
[app.util.emails :as emails]
[app.util.logging :as l]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
;; --- PUBLIC API
(defn render
[email-factory context]
(email-factory context))
@ -181,5 +181,5 @@
(println "******** start email" (:id email) "**********")
(println (.toString baos))
(println "******** end email "(:id email) "**********"))]
(log/info out))))
(l/info :email out))))

View file

@ -15,9 +15,8 @@
[app.http.errors :as errors]
[app.http.middleware :as middleware]
[app.metrics :as mtx]
[app.util.log4j :refer [update-thread-context!]]
[app.util.logging :as l]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[reitit.ring :as rr]
[ring.adapter.jetty9 :as jetty])
@ -44,7 +43,7 @@
(defmethod ig/init-key ::server
[_ {:keys [handler router ws port name metrics] :as opts}]
(log/infof "starting '%s' server on port %s." name port)
(l/info :msg "starting http server" :port port :name name)
(let [pre-start (fn [^Server server]
(let [handler (doto (ErrorHandler.)
(.setShowStacks true)
@ -77,7 +76,9 @@
(defmethod ig/halt-key! ::server
[_ {:keys [server name port] :as opts}]
(log/infof "stoping '%s' server on port %s." name port)
(l/info :msg "stoping http server"
:name name
:port port)
(jetty/stop-server server))
(defn- router-handler
@ -93,11 +94,16 @@
(catch Throwable e
(try
(let [cdata (errors/get-error-context request e)]
(update-thread-context! cdata)
(log/errorf e "unhandled exception: %s (id: %s)" (ex-message e) (str (:id cdata)))
{:status 500 :body "internal server error"})
(l/update-thread-context! cdata)
(l/error :hint "unhandled exception"
:message (ex-message e)
:error-id (str (:id cdata))
:cause e))
{:status 500 :body "internal server error"}
(catch Throwable e
(log/errorf e "unhandled exception: %s" (ex-message e))
(l/error :hint "unhandled exception"
:message (ex-message e)
:cause e)
{:status 500 :body "internal server error"})))))))
@ -120,7 +126,10 @@
(rr/router
[["/metrics" {:get (:handler metrics)}]
["/assets" {:middleware [[middleware/format-response-body]
[middleware/errors errors/handle]]}
[middleware/errors errors/handle]
[middleware/cookies]
(:middleware session)
middleware/activity-logger]}
["/by-id/:id" {:get (:objects-handler assets)}]
["/by-file-media-id/:id" {:get (:file-objects-handler assets)}]
["/by-file-media-id/:id/thumbnail" {:get (:file-thumbnails-handler assets)}]]
@ -154,6 +163,8 @@
["/github" {:post (get-in oauth [:github :handler])}]
["/github/callback" {:get (get-in oauth [:github :callback-handler])}]]
["/rpc" {:middleware [(:middleware session)]}
["/rpc" {:middleware [(:middleware session)
middleware/activity-logger]}
["/query/:type" {:get (:query-handler rpc)}]
["/mutation/:type" {:post (:mutation-handler rpc)}]]]]))

View file

@ -14,9 +14,8 @@
[app.db :as db]
[app.db.sql :as sql]
[app.util.http :as http]
[clojure.pprint :refer [pprint]]
[app.util.logging :as l]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig]
[jsonista.core :as j]))
@ -25,11 +24,6 @@
(declare parse-notification)
(declare process-report)
(defn- pprint-report
[message]
(binding [clojure.pprint/*print-right-margin* 120]
(with-out-str (pprint message))))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool]))
@ -42,19 +36,17 @@
(= mtype "SubscriptionConfirmation")
(let [surl (get body "SubscribeURL")
stopic (get body "TopicArn")]
(log/infof "subscription received (topic=%s, url=%s)" stopic surl)
(l/info :action "subscription received" :topic stopic :url surl)
(http/send! {:uri surl :method :post :timeout 10000}))
(= mtype "Notification")
(when-let [message (parse-json (get body "Message"))]
;; (log/infof "Received: %s" (pr-str message))
(let [notification (parse-notification cfg message)]
(process-report cfg notification)))
:else
(log/warn (str "unexpected data received\n"
(pprint-report body))))
(l/warn :hint "unexpected data received"
:report (pr-str body)))
{:status 200 :body ""})))
(defn- parse-bounce
@ -184,15 +176,15 @@
(defn- process-report
[cfg {:keys [type profile-id] :as report}]
(log/trace (str "procesing report:\n" (pprint-report report)))
(l/trace :action "procesing report" :report (pr-str report))
(cond
;; In this case we receive a bounce/complaint notification without
;; confirmed identity, we just emit a warning but do nothing about
;; it because this is not a normal case. All notifications should
;; come with profile identity.
(nil? profile-id)
(log/warn (str "a notification without identity recevied from AWS\n"
(pprint-report report)))
(l/warn :msg "a notification without identity recevied from AWS"
:report (pr-str report))
(= "bounce" type)
(register-bounce-for-profile cfg report)
@ -201,7 +193,7 @@
(register-complaint-for-profile cfg report)
:else
(log/warn (str "unrecognized report received from AWS\n"
(pprint-report report)))))
(l/warn :msg "unrecognized report received from AWS"
:report (pr-str report))))

View file

@ -12,8 +12,7 @@
(:require
[app.common.exceptions :as ex]
[app.common.uuid :as uuid]
[app.util.log4j :refer [update-thread-context!]]
[clojure.tools.logging :as log]
[app.util.logging :as l]
[cuerdas.core :as str]
[expound.alpha :as expound]))
@ -73,8 +72,11 @@
[error request]
(let [edata (ex-data error)
cdata (get-error-context request error)]
(update-thread-context! cdata)
(log/errorf error "internal error: assertion (id: %s)" (str (:id cdata)))
(l/update-thread-context! cdata)
(l/error :hint "internal error: assertion"
:error-id (str (:id cdata))
:cause error)
{:status 500
:body {:type :server-error
:data (-> edata
@ -97,10 +99,11 @@
(ex/exception? (:handling edata)))
(handle-exception (:handling edata) request)
(let [cdata (get-error-context request error)]
(update-thread-context! cdata)
(log/errorf error "internal error: %s (id: %s)"
(ex-message error)
(str (:id cdata)))
(l/update-thread-context! cdata)
(l/error :hint "internal error"
:error-message (ex-message error)
:error-id (str (:id cdata))
:cause error)
{:status 500
:body {:type :server-error
:hint (ex-message error)
@ -111,11 +114,11 @@
(let [cdata (get-error-context request error)
state (.getSQLState ^java.sql.SQLException error)]
(update-thread-context! cdata)
(log/errorf error "PSQL Exception: %s (id: %s, state: %s)"
(ex-message error)
(str (:id cdata))
state)
(l/update-thread-context! cdata)
(l/error :hint "psql exception"
:error-message (ex-message error)
:error-id (str (:id cdata))
:sql-state state)
(cond
(= state "57014")

View file

@ -11,6 +11,7 @@
(:require
[app.metrics :as mtx]
[app.util.json :as json]
[app.util.logging :as l]
[app.util.transit :as t]
[buddy.core.codecs :as bc]
[buddy.core.hash :as bh]
@ -165,3 +166,18 @@
(def etag
{:name ::etag
:compile (constantly wrap-etag)})
(defn activity-logger
[handler]
(let [logger "penpot.profile-activity"]
(fn [{:keys [headers] :as request}]
(let [ip-addr (get headers "x-forwarded-for")
profile-id (:profile-id request)
qstring (:query-string request)]
(l/info ::l/async true
::l/logger logger
:ip-addr ip-addr
:profile-id profile-id
:uri (str (:uri request) (when qstring (str "?" qstring)))
:method (name (:request-method request)))
(handler request)))))

View file

@ -13,10 +13,10 @@
[app.common.spec :as us]
[app.http.oauth.google :as gg]
[app.util.http :as http]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.data.json :as json]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[lambdaisland.uri :as u]))
@ -63,7 +63,8 @@
(get "access_token"))))
(catch Exception e
(log/error e "unexpected error on get-access-token")
(l/error :hint "unexpected error on get-access-token"
:cause e)
nil)))
(defn- get-user-info
@ -80,7 +81,8 @@
:backend "github"
:fullname (get data "name")})))
(catch Exception e
(log/error e "unexpected exception on get-user-info")
(l/error :hint "unexpected exception on get-user-info"
:cause e)
nil)))
(defn- retrieve-info

View file

@ -14,10 +14,10 @@
[app.common.spec :as us]
[app.http.oauth.google :as gg]
[app.util.http :as http]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.data.json :as json]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[lambdaisland.uri :as u]))
@ -62,7 +62,8 @@
(get "access_token"))))
(catch Exception e
(log/error e "unexpected error on get-access-token")
(l/error :hint "unexpected error on get-access-token"
:cause e)
nil)))
(defn- get-user-info
@ -81,7 +82,8 @@
:fullname (get data "name")})))
(catch Exception e
(log/error e "unexpected exception on get-user-info")
(l/error :hint "unexpected exception on get-user-info"
:cause e)
nil)))

View file

@ -12,10 +12,10 @@
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.util.http :as http]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.data.json :as json]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[lambdaisland.uri :as u]))
@ -51,7 +51,8 @@
(-> (json/read-str (:body res))
(get "access_token"))))
(catch Exception e
(log/error e "unexpected error on get-access-token")
(l/error :hint "unexpected error on get-access-token"
:cause e)
nil)))
(defn- get-user-info
@ -69,7 +70,8 @@
:backend "google"
:fullname (get data "name")})))
(catch Exception e
(log/error e "unexpected exception on get-user-info")
(l/error :hint "unexpected exception on get-user-info"
:cause e)
nil)))
(defn- retrieve-info

View file

@ -15,14 +15,13 @@
[app.db :as db]
[app.metrics :as mtx]
[app.util.async :as aa]
[app.util.log4j :refer [update-thread-context!]]
[app.util.logging :as l]
[app.util.time :as dt]
[app.worker :as wrk]
[buddy.core.codecs :as bc]
[buddy.core.nonce :as bn]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
;; --- IMPL
@ -66,9 +65,9 @@
[cfg handler]
(fn [request]
(if-let [{:keys [id profile-id] :as session} (retrieve-from-request cfg request)]
(let [ech (::events-ch cfg)]
(a/>!! ech id)
(update-thread-context! {:profile-id profile-id})
(let [events-ch (::events-ch cfg)]
(a/>!! events-ch id)
(l/update-thread-context! {:profile-id profile-id})
(handler (assoc request :profile-id profile-id)))
(handler request))))
@ -110,6 +109,7 @@
[_ data]
(a/close! (::events-ch data)))
;; --- STATE INIT: SESSION UPDATER
(declare batch-events)
@ -132,9 +132,9 @@
(defmethod ig/init-key ::updater
[_ {:keys [session metrics] :as cfg}]
(log/infof "initialize session updater (max-batch-age=%s, max-batch-size=%s)"
(str (:max-batch-age cfg))
(str (:max-batch-size cfg)))
(l/info :action "initialize session updater"
:max-batch-age (str (:max-batch-age cfg))
:max-batch-size (str (:max-batch-size cfg)))
(let [input (batch-events cfg (::events-ch session))
mcnt (mtx/create
{:name "http_session_update_total"
@ -146,8 +146,13 @@
(let [result (a/<! (update-sessions cfg batch))]
(mcnt :inc)
(if (ex/exception? result)
(log/error result "updater: unexpected error on update sessions")
(log/debugf "updater: updated %s sessions (reason: %s)." result (name reason)))
(l/error :task "updater"
:hint "unexpected error on update sessions"
:cause result)
(l/debug :task "updater"
:action "update sessions"
:reason (name reason)
:count result))
(recur))))))
(defn- timeout-chan
@ -209,7 +214,9 @@
(let [interval (db/interval max-age)
result (db/exec-one! conn [sql:delete-expired interval])
result (:next.jdbc/update-count result)]
(log/debugf "gc-task: removed %s rows from http-session table" result)
(l/debug :task "gc"
:action "clean http sessions"
:count result)
result))))
(def ^:private

View file

@ -15,10 +15,10 @@
[app.util.async :as aa]
[app.util.http :as http]
[app.util.json :as json]
[app.util.logging :as l]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare handle-event)
@ -33,13 +33,13 @@
(defmethod ig/init-key ::reporter
[_ {:keys [receiver uri] :as cfg}]
(when uri
(log/info "intializing loki reporter")
(l/info :msg "intializing loki reporter" :uri uri)
(let [output (a/chan (a/sliding-buffer 1024))]
(receiver :sub output)
(a/go-loop []
(let [msg (a/<! output)]
(if (nil? msg)
(log/info "stoping error reporting loop")
(l/info :msg "stoping error reporting loop")
(do
(a/<! (handle-event cfg msg))
(recur)))))
@ -75,10 +75,14 @@
(if (= (:status response) 204)
true
(do
(log/errorf "error on sending log to loki (try %s)\n%s" i (pr-str response))
(l/error :hint "error on sending log to loki"
:try i
:rsp (pr-str response))
false)))
(catch Exception e
(log/errorf e "error on sending message to loki (try %s)" i)
(l/error :hint "error on sending message to loki"
:cause e
:try i)
false)))
(defn- handle-event

View file

@ -18,12 +18,12 @@
[app.util.async :as aa]
[app.util.http :as http]
[app.util.json :as json]
[app.util.logging :as l]
[app.util.template :as tmpl]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig]))
@ -43,14 +43,14 @@
(defmethod ig/init-key ::reporter
[_ {:keys [receiver] :as cfg}]
(log/info "intializing mattermost error reporter")
(l/info :msg "intializing mattermost error reporter")
(let [output (a/chan (a/sliding-buffer 128)
(filter #(= (:level %) "error")))]
(receiver :sub output)
(a/go-loop []
(let [msg (a/<! output)]
(if (nil? msg)
(log/info "stoping error reporting loop")
(l/info :msg "stoping error reporting loop")
(do
(a/<! (handle-event cfg msg))
(recur)))))
@ -75,10 +75,12 @@
:headers {"content-type" "application/json"}
:body (json/encode-str {:text text})})]
(when (not= (:status rsp) 200)
(log/errorf "error on sending data to mattermost\n%s" (pr-str rsp))))
(l/error :hint "error on sending data to mattermost"
:response (pr-str rsp))))
(catch Exception e
(log/error e "unexpected exception on error reporter"))))
(l/error :hint "unexpected exception on error reporter"
:cause e))))
(defn- persist-on-database!
[{:keys [pool] :as cfg} {:keys [id] :as cdata}]
@ -116,7 +118,8 @@
(send-mattermost-notification! cfg cdata))
(persist-on-database! cfg cdata))
(catch Exception e
(log/error e "unexpected exception on error reporter")))))
(l/error :hint "unexpected exception on error reporter"
:cause e)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Http Handler

View file

@ -13,10 +13,10 @@
[app.common.data :as d]
[app.common.spec :as us]
[app.util.json :as json]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig])
(:import
@ -34,7 +34,7 @@
(defmethod ig/init-key ::receiver
[_ {:keys [endpoint] :as cfg}]
(log/infof "intializing ZMQ receiver on '%s'" endpoint)
(l/info :msg "intializing ZMQ receiver" :bind endpoint)
(let [buffer (a/chan 1)
output (a/chan 1 (comp (filter map?)
(map prepare)))

View file

@ -11,8 +11,8 @@
(:require
[app.common.data :as d]
[app.config :as cf]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(def system-config
@ -348,8 +348,8 @@
(-> system-config
(ig/prep)
(ig/init))))
(log/infof "welcome to penpot (version: '%s')"
(:full cf/version)))
(l/info :msg "welcome to penpot"
:version (:full cf/version)))
(defn stop
[]

View file

@ -5,13 +5,13 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020-2021 UXBOX Labs SL
;; Copyright (c) UXBOX Labs SL
(ns app.metrics
(:require
[app.common.exceptions :as ex]
[app.util.logging :as l]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig])
(:import
io.prometheus.client.CollectorRegistry
@ -50,7 +50,7 @@
(defmethod ig/init-key ::metrics
[_ {:keys [definitions] :as cfg}]
(log/infof "Initializing prometheus registry and instrumentation.")
(l/info :action "initialize metrics")
(let [registry (create-registry)
definitions (reduce-kv (fn [res k v]
(->> (assoc v :registry registry)

View file

@ -14,10 +14,10 @@
[app.common.spec :as us]
[app.config :as cfg]
[app.util.blob :as blob]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[promesa.core :as p])
(:import
@ -60,7 +60,8 @@
(defmethod ig/init-key ::msgbus
[_ {:keys [backend buffer-size] :as cfg}]
(log/debugf "initializing msgbus (backend=%s)" (name backend))
(l/debug :action "initialize msgbus"
:backend (name backend))
(let [cfg (init-backend cfg)
;; Channel used for receive publications from the application.
@ -165,13 +166,14 @@
(when-let [val (a/<! pub-ch)]
(let [result (a/<! (impl-redis-pub rac val))]
(when (ex/exception? result)
(log/error result "unexpected error on publish message to redis")))
(l/error :cause result
:hint "unexpected error on publish message to redis")))
(recur)))))
(defmethod init-sub-loop :redis
[{:keys [::sub-conn ::sub-ch buffer-size]}]
(let [rcv-ch (a/chan (a/dropping-buffer buffer-size))
chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
chans (agent {} :error-handler #(l/error :cause % :hint "unexpected error on agent"))
rac (.async ^StatefulRedisPubSubConnection sub-conn)]
;; Add a unique listener to connection
@ -184,7 +186,7 @@
;; more messages that we can process.
(let [val {:topic topic :message (blob/decode message)}]
(when-not (a/offer! rcv-ch val)
(log/warn "dropping message on subscription loop"))))
(l/warn :msg "dropping message on subscription loop"))))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it topic count])
@ -194,9 +196,12 @@
(let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))]
(when (= 1 (count nsubs))
(let [result (a/<!! (impl-redis-sub rac topic))]
(log/tracef "opening subscription to %s" topic)
(l/trace :action "open subscription"
:topic topic)
(when (ex/exception? result)
(log/errorf result "unexpected exception on subscribing to '%s'" topic))))
(l/error :cause result
:hint "unexpected exception on subscribing"
:topic topic))))
nsubs))
(subscribe-to-topics [state topics chan]
@ -210,9 +215,12 @@
(let [nsubs (disj nsubs chan)]
(when (empty? nsubs)
(let [result (a/<!! (impl-redis-unsub rac topic))]
(log/tracef "closing subscription to %s" topic)
(l/trace :action "close subscription"
:topic topic)
(when (ex/exception? result)
(log/errorf result "unexpected exception on unsubscribing from '%s'" topic))))
(l/error :cause result
:hint "unexpected exception on unsubscribing"
:topic topic))))
nsubs))
(unsubscribe-channels [state pending]
@ -246,7 +254,6 @@
(recur (rest chans) pending)
(recur (rest chans) (conj pending ch)))
pending))]
;; (log/tracef "received message => pending: %s" (pr-str pending))
(some->> (seq pending)
(send-off chans unsubscribe-channels))

View file

@ -14,12 +14,12 @@
[app.db :as db]
[app.metrics :as mtx]
[app.util.async :as aa]
[app.util.logging :as l]
[app.util.time :as dt]
[app.util.transit :as t]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[ring.adapter.jetty9 :as jetty]
[ring.middleware.cookies :refer [wrap-cookies]]
@ -149,7 +149,7 @@
:out-ch out-ch
:sub-ch sub-ch)]
(log/tracef "on-connect %s" (:session-id cfg))
(l/trace :event "connect" :session (:session-id cfg))
;; Forward all messages from out-ch to the websocket
;; connection
@ -171,20 +171,22 @@
;; close subscription
(a/close! sub-ch))))
(on-error [_conn e]
(log/tracef "on-error %s (%s)" (:session-id cfg) (ex-message e))
(on-error [_conn _e]
(l/trace :event "error" :session (:session-id cfg))
(a/close! out-ch)
(a/close! rcv-ch))
(on-close [_conn _status _reason]
(log/tracef "on-close %s" (:session-id cfg))
(l/trace :event "close" :session (:session-id cfg))
(a/close! out-ch)
(a/close! rcv-ch))
(on-message [_ws message]
(let [message (t/decode-str message)]
(when-not (a/offer! rcv-ch message)
(log/warn "droping ws input message, channe full"))))]
(l/warn :msg "drop messages"))))]
{:on-connect on-connect
:on-error on-error
@ -254,12 +256,10 @@
(defmethod handle-message :connect
[cfg _]
;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id)
(send-presence cfg :connect))
(defmethod handle-message :disconnect
[cfg _]
;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id)
(send-presence cfg :disconnect))
(defmethod handle-message :keepalive
@ -277,5 +277,7 @@
(defmethod handle-message :default
[_ws message]
(a/go
(log/warnf "received unexpected message: %s" message)))
(l/log :level :warn
:msg "received unexpected message"
:message message)))

View file

@ -5,7 +5,7 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020-2021 UXBOX Labs SL
;; Copyright (c) UXBOX Labs SL
(ns app.rpc
(:require
@ -15,9 +15,9 @@
[app.db :as db]
[app.metrics :as mtx]
[app.rlimits :as rlm]
[app.util.logging :as l]
[app.util.services :as sv]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig]))
@ -76,7 +76,8 @@
(ex/raise :type :internal
:code :rlimit-not-configured
:hint (str/fmt "%s rlimit not configured" key)))
(log/tracef "adding rlimit to '%s' rpc handler" (::sv/name mdata))
(l/trace :action "add rlimit"
:handler (::sv/name mdata))
(fn [cfg params]
(rlm/execute rlinst (f cfg params))))
f))
@ -86,7 +87,8 @@
(let [f (wrap-with-rlimits cfg f mdata)
f (wrap-with-metrics cfg f mdata)
spec (or (::sv/spec mdata) (s/spec any?))]
(log/tracef "registering '%s' command to rpc service" (::sv/name mdata))
(l/trace :action "register"
:name (::sv/name mdata))
(fn [params]
(when (and (:auth mdata true) (not (uuid? (:profile-id params))))
(ex/raise :type :authentication

View file

@ -19,10 +19,10 @@
[app.storage.fs :as sfs]
[app.storage.impl :as impl]
[app.storage.s3 :as ss3]
[app.util.logging :as l]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[datoteka.core :as fs]
[integrant.core :as ig]
@ -310,7 +310,9 @@
(run! (partial delete-in-bulk conn) groups)
(recur (+ n ^long total)))
(do
(log/infof "gc-deleted: processed %s items" n)
(l/info :task "gc-deleted"
:action "permanently delete items"
:count n)
{:deleted n})))))))
(def sql:retrieve-deleted-objects
@ -382,7 +384,12 @@
(recur (+ cntf (count to-freeze))
(+ cntd (count to-delete))))
(do
(log/infof "gc-touched: %s objects marked as freeze and %s marked to be deleted" cntf cntd)
(l/info :task "gc-touched"
:action "mark freeze"
:count cntf)
(l/info :task "gc-touched"
:action "mark for deletion"
:count cntd)
{:freeze cntf :delete cntd})))))))
(def sql:retrieve-touched-objects
@ -459,7 +466,10 @@
(recur (+ n (count all))
(+ d (count to-delete))))
(do
(log/infof "recheck: processed %s items, %s deleted" n d)
(l/info :task "recheck"
:action "recheck items"
:processed n
:deleted n)
{:processed n :deleted d})))))))
(def sql:retrieve-pending-to-recheck

View file

@ -11,8 +11,8 @@
(:require
[app.common.exceptions :as ex]
[app.metrics :as mtx]
[app.util.logging :as l]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[clojure.xml :as xml]
[integrant.core :as ig])
(:import
@ -60,7 +60,8 @@
(with-open [istream (IOUtils/toInputStream data "UTF-8")]
(xml/parse istream secure-factory))
(catch Exception e
(log/warnf "error on processing svg: %s" (ex-message e))
(l/warn :hint "error on processing svg"
:message (ex-message e))
(ex/raise :type :validation
:code :invalid-svg-file
:cause e))))

View file

@ -5,15 +5,16 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 UXBOX Labs SL
;; Copyright (c) UXBOX Labs SL
(ns app.tasks.delete-object
"Generic task for permanent deletion of objects."
(:require
[app.common.data :as d]
[app.common.spec :as us]
[app.db :as db]
[app.util.logging :as l]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare handle-deletion)
@ -37,7 +38,8 @@
(defmethod handle-deletion :default
[_conn {:keys [type]}]
(log/warnf "no handler found for '%s'" type))
(l/warn :hint "no handler found"
:type (d/name type)))
(defmethod handle-deletion :file
[conn {:keys [id] :as props}]

View file

@ -13,8 +13,8 @@
[app.common.spec :as us]
[app.db :as db]
[app.db.sql :as sql]
[app.util.logging :as l]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare delete-profile-data)
@ -47,7 +47,8 @@
(if (or (:is-demo profile)
(:deleted-at profile))
(delete-profile-data conn id)
(log/warnf "profile '%s' does not match constraints for deletion" id))))))
(l/warn :hint "profile does not match constraints for deletion"
:profile-id id))))))
;; --- IMPL
@ -70,7 +71,8 @@
(defn- delete-profile-data
[conn profile-id]
(log/debugf "proceding to delete all data related to profile '%s'" profile-id)
(l/debug :action "delete profile"
:profile-id profile-id)
(delete-teams conn profile-id)
(delete-profile conn profile-id)
true)

View file

@ -15,9 +15,9 @@
[app.common.pages.migrations :as pmg]
[app.db :as db]
[app.util.blob :as blob]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare process-file)
@ -40,7 +40,7 @@
(run! (partial process-file cfg) files)
(recur (+ n (count files))))
(do
(log/debugf "finalized with total of %s processed files" n)
(l/debug :msg "finished processing files" :processed n)
{:processed n}))))))))
(def ^:private
@ -88,7 +88,10 @@
unused (->> (db/query conn :file-media-object {:file-id id})
(remove #(contains? used (:id %))))]
(log/debugf "processing file: id='%s' age='%s' to-delete=%s" id age (count unused))
(l/debug :action "processing file"
:id id
:age age
:to-delete (count unused))
;; Mark file as trimmed
(db/update! conn :file
@ -96,8 +99,10 @@
{:id id})
(doseq [mobj unused]
(log/debugf "deleting media object: id='%s' media-id='%s' thumb-id='%s'"
(:id mobj) (:media-id mobj) (:thumbnail-id mobj))
(l/debug :action "deleting media object"
:id (:id mobj)
:media-id (:media-id mobj)
:thumbnail-id (:thumbnail-id mobj))
;; NOTE: deleting the file-media-object in the database
;; automatically marks as toched the referenced storage objects.
(db/delete! conn :file-media-object {:id (:id mobj)}))

View file

@ -5,16 +5,16 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020-2021 UXBOX Labs SL
;; Copyright (c) UXBOX Labs SL
(ns app.tasks.file-xlog-gc
"A maintenance task that performs a garbage collection of the file
change (transaction) log."
(:require
[app.db :as db]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare sql:delete-files-xlog)
@ -31,7 +31,7 @@
(let [interval (db/interval max-age)
result (db/exec-one! conn [sql:delete-files-xlog interval])
result (:next.jdbc/update-count result)]
(log/debugf "removed %s rows from file-change table" result)
(l/debug :action "trim file-change table" :removed result)
result))))
(def ^:private

View file

@ -1,58 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.tasks.sendmail
(:require
[app.config :as cfg]
[app.util.emails :as emails]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare send-console!)
(s/def ::username ::cfg/smtp-username)
(s/def ::password ::cfg/smtp-password)
(s/def ::tls ::cfg/smtp-tls)
(s/def ::ssl ::cfg/smtp-ssl)
(s/def ::host ::cfg/smtp-host)
(s/def ::port ::cfg/smtp-port)
(s/def ::default-reply-to ::cfg/smtp-default-reply-to)
(s/def ::default-from ::cfg/smtp-default-from)
(s/def ::enabled ::cfg/smtp-enabled)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::enabled]
:opt-un [::username
::password
::tls
::ssl
::host
::port
::default-from
::default-reply-to]))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [props] :as task}]
(if (:enabled cfg)
(emails/send! cfg props)
(send-console! cfg props))))
(defn- send-console!
[cfg email]
(let [baos (java.io.ByteArrayOutputStream.)
mesg (emails/smtp-message cfg email)]
(.writeTo mesg baos)
(let [out (with-out-str
(println "email console dump:")
(println "******** start email" (:id email) "**********")
(println (.toString baos))
(println "******** end email "(:id email) "**********"))]
(log/info out))))

View file

@ -5,16 +5,16 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020-2021 UXBOX Labs SL
;; Copyright (c) UXBOX Labs SL
(ns app.tasks.tasks-gc
"A maintenance task that performs a cleanup of already executed tasks
from the database table."
(:require
[app.db :as db]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare sql:delete-completed-tasks)
@ -31,7 +31,7 @@
(let [interval (db/interval max-age)
result (db/exec-one! conn [sql:delete-completed-tasks interval])
result (:next.jdbc/update-count result)]
(log/debugf "removed %s rows from tasks-completed table" result)
(l/debug :action "trim completed tasks table" :removed result)
result))))
(def ^:private

View file

@ -1,27 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2021 UXBOX Labs SL
(ns app.util.log4j
(:require
[clojure.pprint :refer [pprint]])
(:import
org.apache.logging.log4j.ThreadContext))
(defn update-thread-context!
[data]
(run! (fn [[key val]]
(ThreadContext/put
(name key)
(cond
(coll? val)
(binding [clojure.pprint/*print-right-margin* 120]
(with-out-str (pprint val)))
(instance? clojure.lang.Named val) (name val)
:else (str val))))
data))

View file

@ -0,0 +1,109 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.util.logging
(:require
[clojure.pprint :refer [pprint]])
(:import
org.apache.logging.log4j.Level
org.apache.logging.log4j.LogManager
org.apache.logging.log4j.Logger
org.apache.logging.log4j.ThreadContext
org.apache.logging.log4j.message.MapMessage
org.apache.logging.log4j.spi.LoggerContext))
(defn build-map-message
[m]
(let [message (MapMessage. (count m))]
(reduce-kv #(.with ^MapMessage %1 (name %2) %3) message m)))
(defprotocol ILogger
(-enabled? [logger level])
(-write! [logger level throwable message]))
(def logger-context
(LogManager/getContext false))
(def logging-agent
(agent nil :error-mode :continue))
(defn get-logger
[lname]
(.getLogger ^LoggerContext logger-context ^String lname))
(defn get-level
[level]
(case level
:trace Level/TRACE
:debug Level/DEBUG
:info Level/INFO
:warn Level/WARN
:error Level/ERROR
:fatal Level/FATAL))
(defn enabled?
[logger level]
(.isEnabled ^Logger logger ^Level level))
(defn write-log!
[logger level e msg]
(if e
(.log ^Logger logger
^Level level
^Object msg
^Throwable e)
(.log ^Logger logger
^Level level
^Object msg)))
(defmacro log
[& {:keys [level cause ::logger ::async] :as props}]
(let [props (dissoc props :level :cause ::logger ::async)
logger (or logger (str *ns*))
logger-sym (gensym "log")
level-sym (gensym "log")]
`(let [~logger-sym (get-logger ~logger)
~level-sym (get-level ~level)]
(if (enabled? ~logger-sym ~level-sym)
~(if async
`(send-off logging-agent (fn [_#] (write-log! ~logger-sym ~level-sym ~cause (build-map-message ~props))))
`(write-log! ~logger-sym ~level-sym ~cause (build-map-message ~props)))))))
(defmacro info
[& params]
`(log :level :info ~@params))
(defmacro error
[& params]
`(log :level :error ~@params))
(defmacro warn
[& params]
`(log :level :warn ~@params))
(defmacro debug
[& params]
`(log :level :debug ~@params))
(defmacro trace
[& params]
`(log :level :trace ~@params))
(defn update-thread-context!
[data]
(run! (fn [[key val]]
(ThreadContext/put
(name key)
(cond
(coll? val)
(binding [clojure.pprint/*print-right-margin* 120]
(with-out-str (pprint val)))
(instance? clojure.lang.Named val) (name val)
:else (str val))))
data))

View file

@ -5,14 +5,13 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 UXBOX Labs SL
;; Copyright (c) UXBOX Labs SL
(ns app.util.migrations
(:require
[app.util.logging :as l]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[next.jdbc :as jdbc]))
(s/def ::name string?)
@ -40,7 +39,7 @@
(defn- impl-migrate-single
[pool modname {:keys [name] :as migration}]
(when-not (registered? pool modname (:name migration))
(log/info (str/format "applying migration %s/%s" modname name))
(l/info :action "apply migration" :module modname :name name)
(register! pool modname name)
((:fn migration) pool)))

View file

@ -17,11 +17,10 @@
[app.db :as db]
[app.metrics :as mtx]
[app.util.async :as aa]
[app.util.log4j :refer [update-thread-context!]]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig]
[promesa.exec :as px])
@ -91,7 +90,9 @@
(defmethod ig/init-key ::worker
[_ {:keys [pool poll-interval name queue] :as cfg}]
(log/infof "starting worker '%s' on queue '%s'" (d/name name) (d/name queue))
(l/info :action "start worker"
:name (d/name name)
:queue (d/name queue))
(let [close-ch (a/chan 1)
poll-ms (inst-ms poll-interval)]
(a/go-loop []
@ -100,30 +101,31 @@
;; Terminate the loop if close channel is closed or
;; event-loop-fn returns nil.
(or (= port close-ch) (nil? val))
(log/infof "stop condition found; shutdown worker: '%s'" (d/name name))
(l/debug :msg "stop condition found")
(db/pool-closed? pool)
(do
(log/info "worker eventloop is aborted because pool is closed")
(l/debug :msg "eventloop aborted because pool is closed")
(a/close! close-ch))
(and (instance? java.sql.SQLException val)
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState ^java.sql.SQLException val)))
(do
(log/error "connection error, trying resume in some instants")
(l/error :hint "connection error, trying resume in some instants")
(a/<! (a/timeout poll-interval))
(recur))
(and (instance? java.sql.SQLException val)
(= "40001" (.getSQLState ^java.sql.SQLException val)))
(do
(log/debug "serialization failure (retrying in some instants)")
(l/debug :msg "serialization failure (retrying in some instants)")
(a/<! (a/timeout poll-ms))
(recur))
(instance? Exception val)
(do
(log/errorf val "unexpected error ocurried on polling the database (will resume in some instants)")
(l/error :cause val
:hint "unexpected error ocurried on polling the database (will resume in some instants)")
(a/<! (a/timeout poll-ms))
(recur))
@ -181,12 +183,14 @@
interval (db/interval duration)
props (-> options extract-props db/tjson)
id (uuid/next)]
(log/debugf "submit task '%s' to be executed in '%s'" (d/name task) (str duration))
(l/debug :action "submit task"
:name (d/name task)
:in duration)
(db/exec-one! conn [sql:insert-new-task id (d/name task) props (d/name queue) priority max-retries interval])
id))
;; --- RUNNER
;; --- RUNNER
(def ^:private
sql:mark-as-retry
@ -242,7 +246,8 @@
(let [task-fn (get tasks name)]
(if task-fn
(task-fn item)
(log/warnf "no task handler found for '%s'" (d/name name)))
(l/warn :msg "no task handler found"
:name (d/name name)))
{:status :completed :task item}))
(defn get-error-context
@ -266,8 +271,11 @@
(assoc :inc-by 0))
(let [cdata (get-error-context error item)]
(update-thread-context! cdata)
(log/errorf error "unhandled exception on task (id: '%s')" (:id cdata))
(l/update-thread-context! cdata)
(l/error :cause error
:hint "unhandled exception on task"
:id (:id cdata))
(if (>= (:retry-num item) (:max-retries item))
{:status :failed :task item :error error}
{:status :retry :task item :error error})))))
@ -276,12 +284,19 @@
[{:keys [tasks]} item]
(let [name (d/name (:name item))]
(try
(log/debugf "started task '%s/%s/%s'" name (:id item) (:retry-num item))
(l/debug :action "start task"
:name name
:id (:id item)
:retry (:retry-num item))
(handle-task tasks item)
(catch Exception e
(handle-exception e item))
(finally
(log/debugf "finished task '%s/%s/%s'" name (:id item) (:retry-num item))))))
(l/debug :action "end task"
:name name
:id (:id item)
:retry (:retry-num item))))))
(def sql:select-next-tasks
"select * from task as t
@ -349,8 +364,8 @@
;; If id is not defined, use the task as id.
(map (fn [{:keys [id task] :as item}]
(if (some? id)
item
(assoc item :id task))))
(assoc item :id (d/name id))
(assoc item :id (d/name task)))))
(map (fn [{:keys [task] :as item}]
(let [f (get tasks task)]
(when-not f
@ -385,9 +400,8 @@
(defn- synchronize-schedule-item
[conn {:keys [id cron]}]
(let [cron (str cron)
id (name id)]
(log/infof "initialize scheduled task '%s' (cron: '%s')" id cron)
(let [cron (str cron)]
(l/debug :action "initialize scheduled task" :id id :cron cron)
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
(defn- synchronize-schedule
@ -407,8 +421,8 @@
[{:keys [executor pool] :as cfg} {:keys [id] :as task}]
(letfn [(run-task [conn]
(try
(when (db/exec-one! conn [sql:lock-scheduled-task id])
(log/debugf "executing scheduled task '%s'" id)
(when (db/exec-one! conn [sql:lock-scheduled-task (d/name id)])
(l/debug :action "execute scheduled task" :id id)
((:fn task) task))
(catch Throwable e
e)))
@ -417,7 +431,9 @@
(db/with-atomic [conn pool]
(let [result (run-task conn)]
(when (ex/exception? result)
(log/errorf result "unhandled exception on scheduled task '%s'" id)))))]
(l/error :cause result
:hint "unhandled exception on scheduled task"
:id id)))))]
(try
(px/run! executor handle-task)
@ -490,7 +506,7 @@
:help "Background task execution timing."})]
(reduce-kv (fn [res k v]
(let [tname (name k)]
(log/debugf "registring task '%s'" tname)
(l/debug :action "register task" :name tname)
(assoc res k (mtx/wrap-summary v mobj [tname]))))
{}
tasks)))