mirror of
https://github.com/penpot/penpot.git
synced 2025-02-03 04:49:03 -05:00
✨ Minor changes on worker code.
This commit is contained in:
parent
5f338921cf
commit
51e7ffb959
5 changed files with 358 additions and 391 deletions
|
@ -14,13 +14,13 @@
|
||||||
org.apache.logging.log4j/log4j-web {:mvn/version "2.13.3"}
|
org.apache.logging.log4j/log4j-web {:mvn/version "2.13.3"}
|
||||||
org.apache.logging.log4j/log4j-jul {:mvn/version "2.13.3"}
|
org.apache.logging.log4j/log4j-jul {:mvn/version "2.13.3"}
|
||||||
org.apache.logging.log4j/log4j-slf4j-impl {:mvn/version "2.13.3"}
|
org.apache.logging.log4j/log4j-slf4j-impl {:mvn/version "2.13.3"}
|
||||||
|
org.slf4j/slf4j-api {:mvn/version "1.7.30"}
|
||||||
|
|
||||||
io.prometheus/simpleclient {:mvn/version "0.9.0"}
|
io.prometheus/simpleclient {:mvn/version "0.9.0"}
|
||||||
io.prometheus/simpleclient_hotspot {:mvn/version "0.9.0"}
|
io.prometheus/simpleclient_hotspot {:mvn/version "0.9.0"}
|
||||||
io.prometheus/simpleclient_httpserver {:mvn/version "0.9.0"}
|
io.prometheus/simpleclient_httpserver {:mvn/version "0.9.0"}
|
||||||
|
|
||||||
selmer/selmer {:mvn/version "1.12.28"}
|
selmer/selmer {:mvn/version "1.12.28"}
|
||||||
|
|
||||||
expound/expound {:mvn/version "0.8.5"}
|
expound/expound {:mvn/version "0.8.5"}
|
||||||
com.cognitect/transit-clj {:mvn/version "1.0.324"}
|
com.cognitect/transit-clj {:mvn/version "1.0.324"}
|
||||||
|
|
||||||
|
@ -28,9 +28,9 @@
|
||||||
java-http-clj/java-http-clj {:mvn/version "0.4.1"}
|
java-http-clj/java-http-clj {:mvn/version "0.4.1"}
|
||||||
|
|
||||||
info.sunng/ring-jetty9-adapter {:mvn/version "0.14.0"}
|
info.sunng/ring-jetty9-adapter {:mvn/version "0.14.0"}
|
||||||
seancorfield/next.jdbc {:mvn/version "1.1.581"}
|
seancorfield/next.jdbc {:mvn/version "1.1.582"}
|
||||||
metosin/reitit-ring {:mvn/version "0.5.5"}
|
metosin/reitit-ring {:mvn/version "0.5.5"}
|
||||||
org.postgresql/postgresql {:mvn/version "42.2.14"}
|
org.postgresql/postgresql {:mvn/version "42.2.15"}
|
||||||
com.zaxxer/HikariCP {:mvn/version "3.4.5"}
|
com.zaxxer/HikariCP {:mvn/version "3.4.5"}
|
||||||
|
|
||||||
funcool/datoteka {:mvn/version "1.2.0"}
|
funcool/datoteka {:mvn/version "1.2.0"}
|
||||||
|
|
|
@ -5,42 +5,31 @@
|
||||||
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] [%t] %level{length=1} %logger{36} - %msg%n"/>
|
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] [%t] %level{length=1} %logger{36} - %msg%n"/>
|
||||||
</Console>
|
</Console>
|
||||||
|
|
||||||
<RollingFile name="file-debug" fileName="logs/app-debug.log" filePattern="logs/app-debug-%i.log">
|
<RollingFile name="main" fileName="logs/main.log" filePattern="logs/main-%i.log">
|
||||||
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] [%t] %level{length=1} %logger{36} - %msg%n"/>
|
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] [%t] %level{length=1} %logger{36} - %msg%n"/>
|
||||||
<Policies>
|
<Policies>
|
||||||
<SizeBasedTriggeringPolicy size="50M"/>
|
<SizeBasedTriggeringPolicy size="50M"/>
|
||||||
</Policies>
|
</Policies>
|
||||||
<DefaultRolloverStrategy max="9"/>
|
<DefaultRolloverStrategy max="9"/>
|
||||||
</RollingFile>
|
</RollingFile>
|
||||||
|
|
||||||
<RollingFile name="file" fileName="logs/app.log" filePattern="logs/app-%i.log">
|
|
||||||
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] [%t] %level{length=1} %logger{36} - %msg%n"/>
|
|
||||||
<Policies>
|
|
||||||
<SizeBasedTriggeringPolicy size="20M"/>
|
|
||||||
</Policies>
|
|
||||||
<DefaultRolloverStrategy max="9"/>
|
|
||||||
</RollingFile>
|
|
||||||
</Appenders>
|
</Appenders>
|
||||||
|
|
||||||
<Loggers>
|
<Loggers>
|
||||||
<Logger name="com.zaxxer.hikari" level="error" additivity="false">
|
<Logger name="com.zaxxer.hikari" level="error" additivity="false" />
|
||||||
<AppenderRef ref="console"/>
|
<Logger name="org.eclipse.jetty" level="error" additivity="false" />
|
||||||
</Logger>
|
|
||||||
|
|
||||||
<Logger name="io.lettuce" level="error" additivity="false" />
|
<Logger name="io.lettuce" level="error" additivity="false" />
|
||||||
|
|
||||||
|
|
||||||
<Logger name="app.cli" level="debug" additivity="false">
|
<Logger name="app.cli" level="debug" additivity="false">
|
||||||
<AppenderRef ref="console"/>
|
<AppenderRef ref="console"/>
|
||||||
</Logger>
|
</Logger>
|
||||||
|
|
||||||
<Logger name="app" level="debug" additivity="false">
|
<Logger name="app" level="debug" additivity="false">
|
||||||
<AppenderRef ref="file-debug" level="debug" />
|
<AppenderRef ref="main" level="debug" />
|
||||||
<AppenderRef ref="file" level="info" />
|
|
||||||
</Logger>
|
</Logger>
|
||||||
|
|
||||||
<Root level="info">
|
<Root level="info">
|
||||||
<AppenderRef ref="console"/>
|
<AppenderRef ref="main" />
|
||||||
|
<!-- <AppenderRef ref="console" /> -->
|
||||||
</Root>
|
</Root>
|
||||||
</Loggers>
|
</Loggers>
|
||||||
</Configuration>
|
</Configuration>
|
||||||
|
|
|
@ -9,23 +9,41 @@
|
||||||
|
|
||||||
(ns app.worker
|
(ns app.worker
|
||||||
(:require
|
(:require
|
||||||
[clojure.spec.alpha :as s]
|
[app.common.exceptions :as ex]
|
||||||
[clojure.tools.logging :as log]
|
|
||||||
[mount.core :as mount :refer [defstate]]
|
|
||||||
[app.common.spec :as us]
|
[app.common.spec :as us]
|
||||||
[app.config :as cfg]
|
[app.common.uuid :as uuid]
|
||||||
[app.db :as db]
|
[app.db :as db]
|
||||||
[app.metrics :as mtx]
|
|
||||||
[app.tasks.delete-object]
|
[app.tasks.delete-object]
|
||||||
[app.tasks.delete-profile]
|
[app.tasks.delete-profile]
|
||||||
[app.tasks.gc]
|
[app.tasks.gc]
|
||||||
[app.tasks.remove-media]
|
[app.tasks.remove-media]
|
||||||
[app.tasks.sendmail]
|
[app.tasks.sendmail]
|
||||||
[app.tasks.trim-file]
|
[app.tasks.trim-file]
|
||||||
|
[app.util.async :as aa]
|
||||||
|
[app.util.blob :as blob]
|
||||||
[app.util.time :as dt]
|
[app.util.time :as dt]
|
||||||
[app.worker-impl :as impl]))
|
[clojure.core.async :as a]
|
||||||
|
[clojure.spec.alpha :as s]
|
||||||
|
[clojure.tools.logging :as log]
|
||||||
|
[mount.core :as mount :refer [defstate]]
|
||||||
|
[promesa.exec :as px])
|
||||||
|
(:import
|
||||||
|
org.eclipse.jetty.util.thread.QueuedThreadPool
|
||||||
|
java.util.concurrent.ExecutorService
|
||||||
|
java.util.concurrent.Executors
|
||||||
|
java.util.concurrent.Executor
|
||||||
|
java.time.Duration
|
||||||
|
java.time.Instant
|
||||||
|
java.util.Date))
|
||||||
|
|
||||||
;; --- State initialization
|
(declare start-scheduler-worker!)
|
||||||
|
(declare start-worker!)
|
||||||
|
(declare thread-pool)
|
||||||
|
(declare stop!)
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Entry Point (state initialization)
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
(def ^:private tasks
|
(def ^:private tasks
|
||||||
{"delete-profile" #'app.tasks.delete-profile/handler
|
{"delete-profile" #'app.tasks.delete-profile/handler
|
||||||
|
@ -44,20 +62,337 @@
|
||||||
|
|
||||||
|
|
||||||
(defstate executor
|
(defstate executor
|
||||||
:start (impl/thread-pool {:idle-timeout 10000
|
:start (thread-pool {:idle-timeout 10000
|
||||||
:min-threads 0
|
:min-threads 0
|
||||||
:max-threads 256})
|
:max-threads 256})
|
||||||
:stop (impl/stop! executor))
|
:stop (stop! executor))
|
||||||
|
|
||||||
(defstate worker
|
(defstate worker
|
||||||
:start (impl/start-worker!
|
:start (start-worker!
|
||||||
{:tasks tasks
|
{:tasks tasks
|
||||||
:name "worker1"
|
:name "worker1"
|
||||||
:batch-size 1
|
:batch-size 1
|
||||||
:executor executor})
|
:executor executor})
|
||||||
:stop (impl/stop! worker))
|
:stop (stop! worker))
|
||||||
|
|
||||||
(defstate scheduler-worker
|
(defstate scheduler-worker
|
||||||
:start (impl/start-scheduler-worker! {:schedule schedule
|
:start (start-scheduler-worker! {:schedule schedule
|
||||||
:executor executor})
|
:executor executor})
|
||||||
:stop (impl/stop! scheduler-worker))
|
:stop (stop! scheduler-worker))
|
||||||
|
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Worker Impl
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
(def ^:private
|
||||||
|
sql:mark-as-retry
|
||||||
|
"update task
|
||||||
|
set scheduled_at = clock_timestamp() + '10 seconds'::interval,
|
||||||
|
modified_at = clock_timestamp(),
|
||||||
|
error = ?,
|
||||||
|
status = 'retry',
|
||||||
|
retry_num = retry_num + ?
|
||||||
|
where id = ?")
|
||||||
|
|
||||||
|
(defn- mark-as-retry
|
||||||
|
[conn {:keys [task error inc-by]
|
||||||
|
:or {inc-by 1}}]
|
||||||
|
(let [explain (ex-message error)
|
||||||
|
sqlv [sql:mark-as-retry explain inc-by (:id task)]]
|
||||||
|
(db/exec-one! conn sqlv)
|
||||||
|
nil))
|
||||||
|
|
||||||
|
(defn- mark-as-failed
|
||||||
|
[conn {:keys [task error]}]
|
||||||
|
(let [explain (ex-message error)]
|
||||||
|
(db/update! conn :task
|
||||||
|
{:error explain
|
||||||
|
:modified-at (dt/now)
|
||||||
|
:status "failed"}
|
||||||
|
{:id (:id task)})
|
||||||
|
nil))
|
||||||
|
|
||||||
|
(defn- mark-as-completed
|
||||||
|
[conn {:keys [task] :as opts}]
|
||||||
|
(let [now (dt/now)]
|
||||||
|
(db/update! conn :task
|
||||||
|
{:completed-at now
|
||||||
|
:modified-at now
|
||||||
|
:status "completed"}
|
||||||
|
{:id (:id task)})
|
||||||
|
nil))
|
||||||
|
|
||||||
|
(defn- decode-task-row
|
||||||
|
[{:keys [props] :as row}]
|
||||||
|
(when row
|
||||||
|
(cond-> row
|
||||||
|
(db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))))
|
||||||
|
|
||||||
|
(defn- handle-task
|
||||||
|
[tasks {:keys [name] :as item}]
|
||||||
|
(let [task-fn (get tasks name)]
|
||||||
|
(if task-fn
|
||||||
|
(task-fn item)
|
||||||
|
(do
|
||||||
|
(log/warn "no task handler found for" (pr-str name))
|
||||||
|
nil))))
|
||||||
|
|
||||||
|
(defn- run-task
|
||||||
|
[{:keys [tasks conn]} item]
|
||||||
|
(try
|
||||||
|
(log/debugf "Started task '%s/%s/%s'." (:name item) (:id item) (:retry-num item))
|
||||||
|
(handle-task tasks item)
|
||||||
|
{:status :completed :task item}
|
||||||
|
(catch Exception e
|
||||||
|
(let [data (ex-data e)]
|
||||||
|
(cond
|
||||||
|
(and (= ::retry (:type data))
|
||||||
|
(= ::noop (:strategy data)))
|
||||||
|
{:status :retry :task item :error e :inc-by 0}
|
||||||
|
|
||||||
|
(and (< (:retry-num item)
|
||||||
|
(:max-retries item))
|
||||||
|
(= ::retry (:type data)))
|
||||||
|
{:status :retry :task item :error e}
|
||||||
|
|
||||||
|
:else
|
||||||
|
(do
|
||||||
|
(log/errorf e "Unhandled exception on task '%s' (retry: %s)\nProps: %s"
|
||||||
|
(:name item) (:retry-num item) (pr-str (:props item)))
|
||||||
|
(if (>= (:retry-num item) (:max-retries item))
|
||||||
|
{:status :failed :task item :error e}
|
||||||
|
{:status :retry :task item :error e})))))
|
||||||
|
(finally
|
||||||
|
(log/debugf "Finished task '%s/%s/%s'." (:name item) (:id item) (:retry-num item)))))
|
||||||
|
|
||||||
|
(def ^:private
|
||||||
|
sql:select-next-tasks
|
||||||
|
"select * from task as t
|
||||||
|
where t.scheduled_at <= now()
|
||||||
|
and t.queue = ?
|
||||||
|
and (t.status = 'new' or t.status = 'retry')
|
||||||
|
order by t.priority desc, t.scheduled_at
|
||||||
|
limit ?
|
||||||
|
for update skip locked")
|
||||||
|
|
||||||
|
(defn- event-loop-fn*
|
||||||
|
[{:keys [tasks executor batch-size] :as opts}]
|
||||||
|
(db/with-atomic [conn db/pool]
|
||||||
|
(let [queue (:queue opts "default")
|
||||||
|
items (->> (db/exec! conn [sql:select-next-tasks queue batch-size])
|
||||||
|
(map decode-task-row)
|
||||||
|
(seq))
|
||||||
|
opts (assoc opts :conn conn)]
|
||||||
|
|
||||||
|
(if (nil? items)
|
||||||
|
::empty
|
||||||
|
(let [results (->> items
|
||||||
|
(map #(partial run-task opts %))
|
||||||
|
(map #(px/submit! executor %)))]
|
||||||
|
(doseq [res results]
|
||||||
|
(let [res (deref res)]
|
||||||
|
(case (:status res)
|
||||||
|
:retry (mark-as-retry conn res)
|
||||||
|
:failed (mark-as-failed conn res)
|
||||||
|
:completed (mark-as-completed conn res))))
|
||||||
|
::handled)))))
|
||||||
|
|
||||||
|
(defn- event-loop-fn
|
||||||
|
[{:keys [executor] :as opts}]
|
||||||
|
(aa/thread-call executor #(event-loop-fn* opts)))
|
||||||
|
|
||||||
|
(s/def ::batch-size ::us/integer)
|
||||||
|
(s/def ::poll-interval ::us/integer)
|
||||||
|
(s/def ::fn (s/or :var var? :fn fn?))
|
||||||
|
(s/def ::tasks (s/map-of string? ::fn))
|
||||||
|
|
||||||
|
(s/def ::start-worker-params
|
||||||
|
(s/keys :req-un [::tasks ::aa/executor ::batch-size]
|
||||||
|
:opt-un [::poll-interval]))
|
||||||
|
|
||||||
|
(defn start-worker!
|
||||||
|
[{:keys [poll-interval executor]
|
||||||
|
:or {poll-interval 5000}
|
||||||
|
:as opts}]
|
||||||
|
(us/assert ::start-worker-params opts)
|
||||||
|
(log/infof "Starting worker '%s' on queue '%s'."
|
||||||
|
(:name opts "anonymous")
|
||||||
|
(:queue opts "default"))
|
||||||
|
(let [cch (a/chan 1)]
|
||||||
|
(a/go-loop []
|
||||||
|
(let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)]
|
||||||
|
(cond
|
||||||
|
;; Terminate the loop if close channel is closed or
|
||||||
|
;; event-loop-fn returns nil.
|
||||||
|
(or (= port cch) (nil? val))
|
||||||
|
(log/infof "Stop condition found. Shutdown worker: '%s'"
|
||||||
|
(:name opts "anonymous"))
|
||||||
|
|
||||||
|
(db/pool-closed? db/pool)
|
||||||
|
(do
|
||||||
|
(log/info "Worker eventloop is aborted because pool is closed.")
|
||||||
|
(a/close! cch))
|
||||||
|
|
||||||
|
(and (instance? java.sql.SQLException val)
|
||||||
|
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val)))
|
||||||
|
(do
|
||||||
|
(log/error "Connection error, trying resume in some instants.")
|
||||||
|
(a/<! (a/timeout poll-interval))
|
||||||
|
(recur))
|
||||||
|
|
||||||
|
(and (instance? java.sql.SQLException val)
|
||||||
|
(= "40001" (.getSQLState ^java.sql.SQLException val)))
|
||||||
|
(do
|
||||||
|
(log/debug "Serialization failure (retrying in some instants).")
|
||||||
|
(a/<! (a/timeout 1000))
|
||||||
|
(recur))
|
||||||
|
|
||||||
|
(instance? Exception val)
|
||||||
|
(do
|
||||||
|
(log/errorf val "Unexpected error ocurried on polling the database (will resume operations in some instants). ")
|
||||||
|
(a/<! (a/timeout poll-interval))
|
||||||
|
(recur))
|
||||||
|
|
||||||
|
(= ::handled val)
|
||||||
|
(recur)
|
||||||
|
|
||||||
|
(= ::empty val)
|
||||||
|
(do
|
||||||
|
(a/<! (a/timeout poll-interval))
|
||||||
|
(recur)))))
|
||||||
|
|
||||||
|
(reify
|
||||||
|
java.lang.AutoCloseable
|
||||||
|
(close [_]
|
||||||
|
(a/close! cch)))))
|
||||||
|
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Scheduled Tasks (cron based) IMPL
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
(def ^:private
|
||||||
|
sql:upsert-scheduled-task
|
||||||
|
"insert into scheduled_task (id, cron_expr)
|
||||||
|
values (?, ?)
|
||||||
|
on conflict (id)
|
||||||
|
do update set cron_expr=?")
|
||||||
|
|
||||||
|
(defn- synchronize-schedule-item
|
||||||
|
[conn {:keys [id cron] :as item}]
|
||||||
|
(let [cron (str cron)]
|
||||||
|
(log/debugf "Initialize scheduled task '%s' (cron: '%s')." id cron)
|
||||||
|
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
|
||||||
|
|
||||||
|
(defn- synchronize-schedule!
|
||||||
|
[schedule]
|
||||||
|
(db/with-atomic [conn db/pool]
|
||||||
|
(run! (partial synchronize-schedule-item conn) schedule)))
|
||||||
|
|
||||||
|
(def ^:private sql:lock-scheduled-task
|
||||||
|
"select id from scheduled_task where id=? for update skip locked")
|
||||||
|
|
||||||
|
(declare schedule-task!)
|
||||||
|
|
||||||
|
(defn exception->string
|
||||||
|
[error]
|
||||||
|
(with-out-str
|
||||||
|
(.printStackTrace ^Throwable error (java.io.PrintWriter. *out*))))
|
||||||
|
|
||||||
|
(defn- execute-scheduled-task
|
||||||
|
[{:keys [scheduler executor] :as opts} {:keys [id cron] :as task}]
|
||||||
|
(letfn [(run-task [conn]
|
||||||
|
(try
|
||||||
|
(when (db/exec-one! conn [sql:lock-scheduled-task id])
|
||||||
|
(log/info "Executing scheduled task" id)
|
||||||
|
((:fn task) task))
|
||||||
|
(catch Exception e
|
||||||
|
e)))
|
||||||
|
|
||||||
|
(handle-task* [conn]
|
||||||
|
(let [result (run-task conn)]
|
||||||
|
(if (instance? Throwable result)
|
||||||
|
(do
|
||||||
|
(log/warnf result "Unhandled exception on scheduled task '%s'." id)
|
||||||
|
(db/insert! conn :scheduled-task-history
|
||||||
|
{:id (uuid/next)
|
||||||
|
:task-id id
|
||||||
|
:is-error true
|
||||||
|
:reason (exception->string result)}))
|
||||||
|
(db/insert! conn :scheduled-task-history
|
||||||
|
{:id (uuid/next)
|
||||||
|
:task-id id}))))
|
||||||
|
(handle-task []
|
||||||
|
(db/with-atomic [conn db/pool]
|
||||||
|
(handle-task* conn)))]
|
||||||
|
|
||||||
|
(try
|
||||||
|
(px/run! executor handle-task)
|
||||||
|
(finally
|
||||||
|
(schedule-task! opts task)))))
|
||||||
|
|
||||||
|
(defn ms-until-valid
|
||||||
|
[cron]
|
||||||
|
(s/assert dt/cron? cron)
|
||||||
|
(let [^Instant now (dt/now)
|
||||||
|
^Instant next (dt/next-valid-instant-from cron now)]
|
||||||
|
(inst-ms (dt/duration-between now next))))
|
||||||
|
|
||||||
|
(defn- schedule-task!
|
||||||
|
[{:keys [scheduler] :as opts} {:keys [cron] :as task}]
|
||||||
|
(let [ms (ms-until-valid cron)]
|
||||||
|
(px/schedule! scheduler ms (partial execute-scheduled-task opts task))))
|
||||||
|
|
||||||
|
(s/def ::fn (s/or :var var? :fn fn?))
|
||||||
|
(s/def ::id string?)
|
||||||
|
(s/def ::cron dt/cron?)
|
||||||
|
(s/def ::props (s/nilable map?))
|
||||||
|
(s/def ::scheduled-task
|
||||||
|
(s/keys :req-un [::id ::cron ::fn]
|
||||||
|
:opt-un [::props]))
|
||||||
|
|
||||||
|
(s/def ::schedule (s/coll-of ::scheduled-task))
|
||||||
|
(s/def ::start-scheduler-worker-params
|
||||||
|
(s/keys :req-un [::schedule]))
|
||||||
|
|
||||||
|
(defn start-scheduler-worker!
|
||||||
|
[{:keys [schedule] :as opts}]
|
||||||
|
(us/assert ::start-scheduler-worker-params opts)
|
||||||
|
(let [scheduler (Executors/newScheduledThreadPool (int 1))
|
||||||
|
opts (assoc opts :scheduler scheduler)]
|
||||||
|
(synchronize-schedule! schedule)
|
||||||
|
(run! (partial schedule-task! opts) schedule)
|
||||||
|
(reify
|
||||||
|
java.lang.AutoCloseable
|
||||||
|
(close [_]
|
||||||
|
(.shutdownNow ^ExecutorService scheduler)))))
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Thread Pool
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
(defn thread-pool
|
||||||
|
([] (thread-pool {}))
|
||||||
|
([{:keys [min-threads max-threads idle-timeout name]
|
||||||
|
:or {min-threads 0 max-threads 128 idle-timeout 60000}}]
|
||||||
|
(let [executor (QueuedThreadPool. max-threads min-threads)]
|
||||||
|
(.setName executor (or name "default-tp"))
|
||||||
|
(.start executor)
|
||||||
|
executor)))
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; Helpers
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
(defn stop!
|
||||||
|
[o]
|
||||||
|
(cond
|
||||||
|
(instance? java.lang.AutoCloseable o)
|
||||||
|
(.close ^java.lang.AutoCloseable o)
|
||||||
|
|
||||||
|
(instance? org.eclipse.jetty.util.component.ContainerLifeCycle o)
|
||||||
|
(.stop ^org.eclipse.jetty.util.component.ContainerLifeCycle o)
|
||||||
|
|
||||||
|
:else
|
||||||
|
(ex/raise :type :not-implemented)))
|
||||||
|
|
|
@ -1,357 +0,0 @@
|
||||||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
|
||||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
||||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
||||||
;;
|
|
||||||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
|
||||||
;; defined by the Mozilla Public License, v. 2.0.
|
|
||||||
;;
|
|
||||||
;; Copyright (c) 2020 UXBOX Labs SL
|
|
||||||
|
|
||||||
(ns app.worker-impl
|
|
||||||
(:require
|
|
||||||
[cuerdas.core :as str]
|
|
||||||
[clojure.core.async :as a]
|
|
||||||
[clojure.spec.alpha :as s]
|
|
||||||
[clojure.tools.logging :as log]
|
|
||||||
[promesa.exec :as px]
|
|
||||||
[app.common.exceptions :as ex]
|
|
||||||
[app.common.spec :as us]
|
|
||||||
[app.common.uuid :as uuid]
|
|
||||||
[app.config :as cfg]
|
|
||||||
[app.db :as db]
|
|
||||||
[app.util.async :as aa]
|
|
||||||
[app.util.blob :as blob]
|
|
||||||
[app.util.time :as dt])
|
|
||||||
(:import
|
|
||||||
org.eclipse.jetty.util.thread.QueuedThreadPool
|
|
||||||
java.util.concurrent.ExecutorService
|
|
||||||
java.util.concurrent.Executors
|
|
||||||
java.util.concurrent.Executor
|
|
||||||
java.time.Duration
|
|
||||||
java.time.Instant
|
|
||||||
java.util.Date))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; Tasks
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(def ^:private
|
|
||||||
sql:mark-as-retry
|
|
||||||
"update task
|
|
||||||
set scheduled_at = clock_timestamp() + '10 seconds'::interval,
|
|
||||||
modified_at = clock_timestamp(),
|
|
||||||
error = ?,
|
|
||||||
status = 'retry',
|
|
||||||
retry_num = retry_num + ?
|
|
||||||
where id = ?")
|
|
||||||
|
|
||||||
(defn- mark-as-retry
|
|
||||||
[conn {:keys [task error inc-by]
|
|
||||||
:or {inc-by 1}}]
|
|
||||||
(let [explain (ex-message error)
|
|
||||||
sqlv [sql:mark-as-retry explain inc-by (:id task)]]
|
|
||||||
(db/exec-one! conn sqlv)
|
|
||||||
nil))
|
|
||||||
|
|
||||||
(defn- mark-as-failed
|
|
||||||
[conn {:keys [task error]}]
|
|
||||||
(let [explain (ex-message error)]
|
|
||||||
(db/update! conn :task
|
|
||||||
{:error explain
|
|
||||||
:modified-at (dt/now)
|
|
||||||
:status "failed"}
|
|
||||||
{:id (:id task)})
|
|
||||||
nil))
|
|
||||||
|
|
||||||
(defn- mark-as-completed
|
|
||||||
[conn {:keys [task] :as opts}]
|
|
||||||
(let [now (dt/now)]
|
|
||||||
(db/update! conn :task
|
|
||||||
{:completed-at now
|
|
||||||
:modified-at now
|
|
||||||
:status "completed"}
|
|
||||||
{:id (:id task)})
|
|
||||||
nil))
|
|
||||||
|
|
||||||
(defn- decode-task-row
|
|
||||||
[{:keys [props] :as row}]
|
|
||||||
(when row
|
|
||||||
(cond-> row
|
|
||||||
(db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))))
|
|
||||||
|
|
||||||
|
|
||||||
(defn- log-task-error
|
|
||||||
[item err]
|
|
||||||
(log/error (str/format "Unhandled exception on task '%s' (retry: %s)\n" (:name item) (:retry-num item))
|
|
||||||
(str/format "Props: %s\n" (pr-str (:props item)))
|
|
||||||
(with-out-str
|
|
||||||
(.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
|
|
||||||
|
|
||||||
(defn- handle-task
|
|
||||||
[tasks {:keys [name] :as item}]
|
|
||||||
(let [task-fn (get tasks name)]
|
|
||||||
(if task-fn
|
|
||||||
(task-fn item)
|
|
||||||
(do
|
|
||||||
(log/warn "no task handler found for" (pr-str name))
|
|
||||||
nil))))
|
|
||||||
|
|
||||||
(defn- run-task
|
|
||||||
[{:keys [tasks conn]} item]
|
|
||||||
(try
|
|
||||||
(log/debugf "Started task '%s/%s/%s'." (:name item) (:id item) (:retry-num item))
|
|
||||||
(handle-task tasks item)
|
|
||||||
{:status :completed :task item}
|
|
||||||
(catch Exception e
|
|
||||||
(let [data (ex-data e)]
|
|
||||||
(cond
|
|
||||||
(and (= ::retry (:type data))
|
|
||||||
(= ::noop (:strategy data)))
|
|
||||||
{:status :retry :task item :error e :inc-by 0}
|
|
||||||
|
|
||||||
(and (< (:retry-num item)
|
|
||||||
(:max-retries item))
|
|
||||||
(= ::retry (:type data)))
|
|
||||||
{:status :retry :task item :error e}
|
|
||||||
|
|
||||||
:else
|
|
||||||
(do
|
|
||||||
(log/errorf e "Unhandled exception on task '%s' (retry: %s)\nProps: %s"
|
|
||||||
(:name item) (:retry-num item) (pr-str (:props item)))
|
|
||||||
(if (>= (:retry-num item) (:max-retries item))
|
|
||||||
{:status :failed :task item :error e}
|
|
||||||
{:status :retry :task item :error e})))))
|
|
||||||
(finally
|
|
||||||
(log/debugf "Finished task '%s/%s/%s'." (:name item) (:id item) (:retry-num item)))))
|
|
||||||
|
|
||||||
(def ^:private
|
|
||||||
sql:select-next-tasks
|
|
||||||
"select * from task as t
|
|
||||||
where t.scheduled_at <= now()
|
|
||||||
and t.queue = ?
|
|
||||||
and (t.status = 'new' or t.status = 'retry')
|
|
||||||
order by t.priority desc, t.scheduled_at
|
|
||||||
limit ?
|
|
||||||
for update skip locked")
|
|
||||||
|
|
||||||
(defn- event-loop-fn*
|
|
||||||
[{:keys [tasks executor batch-size] :as opts}]
|
|
||||||
(db/with-atomic [conn db/pool]
|
|
||||||
(let [queue (:queue opts "default")
|
|
||||||
items (->> (db/exec! conn [sql:select-next-tasks queue batch-size])
|
|
||||||
(map decode-task-row)
|
|
||||||
(seq))
|
|
||||||
opts (assoc opts :conn conn)]
|
|
||||||
|
|
||||||
(if (nil? items)
|
|
||||||
::empty
|
|
||||||
(let [results (->> items
|
|
||||||
(map #(partial run-task opts %))
|
|
||||||
(map #(px/submit! executor %)))]
|
|
||||||
(doseq [res results]
|
|
||||||
(let [res (deref res)]
|
|
||||||
(case (:status res)
|
|
||||||
:retry (mark-as-retry conn res)
|
|
||||||
:failed (mark-as-failed conn res)
|
|
||||||
:completed (mark-as-completed conn res))))
|
|
||||||
::handled)))))
|
|
||||||
|
|
||||||
(defn- event-loop-fn
|
|
||||||
[{:keys [executor] :as opts}]
|
|
||||||
(aa/thread-call executor #(event-loop-fn* opts)))
|
|
||||||
|
|
||||||
(s/def ::batch-size ::us/integer)
|
|
||||||
(s/def ::poll-interval ::us/integer)
|
|
||||||
(s/def ::fn (s/or :var var? :fn fn?))
|
|
||||||
(s/def ::tasks (s/map-of string? ::fn))
|
|
||||||
|
|
||||||
(s/def ::start-worker-params
|
|
||||||
(s/keys :req-un [::tasks ::aa/executor ::batch-size]
|
|
||||||
:opt-un [::poll-interval]))
|
|
||||||
|
|
||||||
(defn start-worker!
|
|
||||||
[{:keys [poll-interval executor]
|
|
||||||
:or {poll-interval 5000}
|
|
||||||
:as opts}]
|
|
||||||
(us/assert ::start-worker-params opts)
|
|
||||||
(log/infof "Starting worker '%s' on queue '%s'."
|
|
||||||
(:name opts "anonymous")
|
|
||||||
(:queue opts "default"))
|
|
||||||
(let [cch (a/chan 1)]
|
|
||||||
(a/go-loop []
|
|
||||||
(let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)]
|
|
||||||
(cond
|
|
||||||
;; Terminate the loop if close channel is closed or
|
|
||||||
;; event-loop-fn returns nil.
|
|
||||||
(or (= port cch) (nil? val))
|
|
||||||
(log/infof "Stop condition found. Shutdown worker: '%s'"
|
|
||||||
(:name opts "anonymous"))
|
|
||||||
|
|
||||||
(db/pool-closed? db/pool)
|
|
||||||
(do
|
|
||||||
(log/info "Worker eventloop is aborted because pool is closed.")
|
|
||||||
(a/close! cch))
|
|
||||||
|
|
||||||
(and (instance? java.sql.SQLException val)
|
|
||||||
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val)))
|
|
||||||
(do
|
|
||||||
(log/error "Connection error, trying resume in some instants.")
|
|
||||||
(a/<! (a/timeout poll-interval))
|
|
||||||
(recur))
|
|
||||||
|
|
||||||
(and (instance? java.sql.SQLException val)
|
|
||||||
(= "40001" (.getSQLState ^java.sql.SQLException val)))
|
|
||||||
(do
|
|
||||||
(log/debug "Serialization failure (retrying in some instants).")
|
|
||||||
(a/<! (a/timeout 1000))
|
|
||||||
(recur))
|
|
||||||
|
|
||||||
(instance? Exception val)
|
|
||||||
(do
|
|
||||||
(log/errorf val "Unexpected error ocurried on polling the database (will resume operations in some instants). ")
|
|
||||||
(a/<! (a/timeout poll-interval))
|
|
||||||
(recur))
|
|
||||||
|
|
||||||
(= ::handled val)
|
|
||||||
(recur)
|
|
||||||
|
|
||||||
(= ::empty val)
|
|
||||||
(do
|
|
||||||
(a/<! (a/timeout poll-interval))
|
|
||||||
(recur)))))
|
|
||||||
|
|
||||||
(reify
|
|
||||||
java.lang.AutoCloseable
|
|
||||||
(close [_]
|
|
||||||
(a/close! cch)))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; Scheduled Tasks
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(def ^:private
|
|
||||||
sql:upsert-scheduled-task
|
|
||||||
"insert into scheduled_task (id, cron_expr)
|
|
||||||
values (?, ?)
|
|
||||||
on conflict (id)
|
|
||||||
do update set cron_expr=?")
|
|
||||||
|
|
||||||
(defn- synchronize-schedule-item
|
|
||||||
[conn {:keys [id cron] :as item}]
|
|
||||||
(let [cron (str cron)]
|
|
||||||
(log/debug (str/format "Initialize scheduled task '%s' (cron: '%s')." id cron))
|
|
||||||
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
|
|
||||||
|
|
||||||
(defn- synchronize-schedule!
|
|
||||||
[schedule]
|
|
||||||
(db/with-atomic [conn db/pool]
|
|
||||||
(run! (partial synchronize-schedule-item conn) schedule)))
|
|
||||||
|
|
||||||
(def ^:private sql:lock-scheduled-task
|
|
||||||
"select id from scheduled_task where id=? for update skip locked")
|
|
||||||
|
|
||||||
(declare schedule-task!)
|
|
||||||
|
|
||||||
(defn exception->string
|
|
||||||
[error]
|
|
||||||
(with-out-str
|
|
||||||
(.printStackTrace ^Throwable error (java.io.PrintWriter. *out*))))
|
|
||||||
|
|
||||||
(defn- execute-scheduled-task
|
|
||||||
[{:keys [scheduler executor] :as opts} {:keys [id cron] :as task}]
|
|
||||||
(letfn [(run-task [conn]
|
|
||||||
(try
|
|
||||||
(when (db/exec-one! conn [sql:lock-scheduled-task id])
|
|
||||||
(log/info "Executing scheduled task" id)
|
|
||||||
((:fn task) task))
|
|
||||||
(catch Exception e
|
|
||||||
e)))
|
|
||||||
|
|
||||||
(handle-task* [conn]
|
|
||||||
(let [result (run-task conn)]
|
|
||||||
(if (instance? Throwable result)
|
|
||||||
(do
|
|
||||||
(log/warnf result "Unhandled exception on scheduled task '%s'." id)
|
|
||||||
(db/insert! conn :scheduled-task-history
|
|
||||||
{:id (uuid/next)
|
|
||||||
:task-id id
|
|
||||||
:is-error true
|
|
||||||
:reason (exception->string result)}))
|
|
||||||
(db/insert! conn :scheduled-task-history
|
|
||||||
{:id (uuid/next)
|
|
||||||
:task-id id}))))
|
|
||||||
(handle-task []
|
|
||||||
(db/with-atomic [conn db/pool]
|
|
||||||
(handle-task* conn)))]
|
|
||||||
|
|
||||||
(try
|
|
||||||
(px/run! executor handle-task)
|
|
||||||
(finally
|
|
||||||
(schedule-task! opts task)))))
|
|
||||||
|
|
||||||
(defn ms-until-valid
|
|
||||||
[cron]
|
|
||||||
(s/assert dt/cron? cron)
|
|
||||||
(let [^Instant now (dt/now)
|
|
||||||
^Instant next (dt/next-valid-instant-from cron now)]
|
|
||||||
(inst-ms (dt/duration-between now next))))
|
|
||||||
|
|
||||||
(defn- schedule-task!
|
|
||||||
[{:keys [scheduler] :as opts} {:keys [cron] :as task}]
|
|
||||||
(let [ms (ms-until-valid cron)]
|
|
||||||
(px/schedule! scheduler ms (partial execute-scheduled-task opts task))))
|
|
||||||
|
|
||||||
(s/def ::fn (s/or :var var? :fn fn?))
|
|
||||||
(s/def ::id string?)
|
|
||||||
(s/def ::cron dt/cron?)
|
|
||||||
(s/def ::props (s/nilable map?))
|
|
||||||
(s/def ::scheduled-task
|
|
||||||
(s/keys :req-un [::id ::cron ::fn]
|
|
||||||
:opt-un [::props]))
|
|
||||||
|
|
||||||
(s/def ::schedule (s/coll-of ::scheduled-task))
|
|
||||||
(s/def ::start-scheduler-worker-params
|
|
||||||
(s/keys :req-un [::schedule]))
|
|
||||||
|
|
||||||
(defn start-scheduler-worker!
|
|
||||||
[{:keys [schedule] :as opts}]
|
|
||||||
(us/assert ::start-scheduler-worker-params opts)
|
|
||||||
(let [scheduler (Executors/newScheduledThreadPool (int 1))
|
|
||||||
opts (assoc opts :scheduler scheduler)]
|
|
||||||
(synchronize-schedule! schedule)
|
|
||||||
(run! (partial schedule-task! opts) schedule)
|
|
||||||
(reify
|
|
||||||
java.lang.AutoCloseable
|
|
||||||
(close [_]
|
|
||||||
(.shutdownNow ^ExecutorService scheduler)))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; Thread Pool
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(defn thread-pool
|
|
||||||
([] (thread-pool {}))
|
|
||||||
([{:keys [min-threads max-threads idle-timeout name]
|
|
||||||
:or {min-threads 0 max-threads 128 idle-timeout 60000}}]
|
|
||||||
(let [executor (QueuedThreadPool. max-threads min-threads)]
|
|
||||||
(.setName executor (or name "default-tp"))
|
|
||||||
(.start executor)
|
|
||||||
executor)))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; Helpers
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(defn stop!
|
|
||||||
[o]
|
|
||||||
(cond
|
|
||||||
(instance? java.lang.AutoCloseable o)
|
|
||||||
(.close ^java.lang.AutoCloseable o)
|
|
||||||
|
|
||||||
(instance? org.eclipse.jetty.util.component.ContainerLifeCycle o)
|
|
||||||
(.stop ^org.eclipse.jetty.util.component.ContainerLifeCycle o)
|
|
||||||
|
|
||||||
:else
|
|
||||||
(ex/raise :type :not-implemented)))
|
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@
|
||||||
(let [profile (mf/deref refs/profile)
|
(let [profile (mf/deref refs/profile)
|
||||||
page (get-in route [:data :name])
|
page (get-in route [:data :name])
|
||||||
{:keys [search-term team-id project-id] :as params}
|
{:keys [search-term team-id project-id] :as params}
|
||||||
(parse-params route profile)]
|
(parse-params route profile)]
|
||||||
[:*
|
[:*
|
||||||
[:& global-notifications {:profile profile}]
|
[:& global-notifications {:profile profile}]
|
||||||
[:section.dashboard-layout
|
[:section.dashboard-layout
|
||||||
|
|
Loading…
Add table
Reference in a new issue