mirror of
synced 2025-03-12 07:41:43 -05:00
✨ Improve worker queue management
and add specific worker instance for webhooks
This commit is contained in:
3 changed files with 106 additions and 84 deletions
@ -108,7 +108,9 @@
(s/def ::default-executor-parallelism ::us/integer)
(s/def ::scheduled-executor-parallelism ::us/integer)
(s/def ::worker-parallelism ::us/integer)
(s/def ::worker-default-parallelism ::us/integer)
(s/def ::worker-webhook-parallelism ::us/integer)
(s/def ::authenticated-cookie-domain ::us/string)
(s/def ::authenticated-cookie-name ::us/string)
@ -222,7 +224,8 @@
@ -494,20 +494,28 @@
{:cron #app/cron "30 */5 * * * ?" ;; every 5m
:task :audit-log-gc})]}
{::rds/redis (ig/ref ::rds/redis)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}
{::wrk/parallelism (cf/get ::worker-parallelism 1)
;; FIXME: read queues from configuration
::wrk/queue "default"
[::default ::wrk/worker]
{::wrk/parallelism (cf/get ::worker-default-parallelism 1)
::wrk/queue :default
::rds/redis (ig/ref ::rds/redis)
::wrk/registry (ig/ref ::wrk/registry)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}
[::webhook ::wrk/worker]
{::wrk/parallelism (cf/get ::worker-webhook-parallelism 1)
::wrk/queue :webhooks
::rds/redis (ig/ref ::rds/redis)
::wrk/registry (ig/ref ::wrk/registry)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}})
(def system nil)
(defn start
@ -14,6 +14,7 @@
[app.common.spec :as us]
[app.common.transit :as t]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.metrics :as mtx]
[app.redis :as rds]
@ -174,63 +175,62 @@
(db/pgobject? props)
(assoc :props (db/decode-transit-pgobject props))))
(s/def ::queue ::us/string)
(s/def ::wait-duration ::dt/duration)
(defmethod ig/pre-init-spec ::scheduler [_]
(defmethod ig/pre-init-spec ::dispatcher [_]
(s/keys :req [::mtx/metrics
:opt [::wait-duration
(defmethod ig/prep-key ::scheduler
(defmethod ig/prep-key ::dispatcher
[_ cfg]
(merge {::batch-size 1
::wait-duration (dt/duration "2s")}
(merge {::batch-size 100
::wait-duration (dt/duration "5s")}
(d/without-nils cfg)))
(def ^:private sql:select-next-tasks
"select * from task as t
"select id, queue from task as t
where t.scheduled_at <= now()
and (t.status = 'new' or t.status = 'retry')
and queue ~~* ?::text
order by t.priority desc, t.scheduled_at
limit ?
for update skip locked")
(defn- format-queue
(str/ffmt "penpot-tasks-queue:%" queue))
(defmethod ig/init-key ::scheduler
(defmethod ig/init-key ::dispatcher
[_ {:keys [::db/pool ::rds/redis ::batch-size] :as cfg}]
(letfn [(get-tasks-batch [conn]
(->> (db/exec! conn [sql:select-next-tasks batch-size])
(map decode-task-row)
(letfn [(get-tasks [conn]
(let [prefix (str (cf/get :tenant) ":%")]
(seq (db/exec! conn [sql:select-next-tasks prefix batch-size]))))
(queue-task [conn rconn {:keys [id queue] :as task}]
(db/update! conn :task {:status "ready"} {:id id})
(let [queue (format-queue queue)
payload (t/encode id)
result (rds/rpush! rconn queue payload)]
(l/debug :hist "scheduler: task pushed to redis"
:task-id id
:key queue
:queued result)))
(push-tasks! [conn rconn [queue tasks]]
(let [ids (mapv :id tasks)
key (str/ffmt "taskq:%" queue)
res (rds/rpush! rconn key (mapv t/encode ids))
sql [(str "update task set status = 'scheduled'"
" where id = ANY(?)")
(db/create-array conn "uuid" ids)]]
(run-batch [rconn]
(db/exec-one! conn sql)
(l/debug :hist "dispatcher: push tasks to redis"
:queue queue
:tasks (count ids)
:queued res)))
(run-batch! [rconn]
(db/with-atomic [conn pool]
(when-let [tasks (get-tasks-batch conn)]
(run! (partial queue-task conn rconn) tasks)
(when-let [tasks (get-tasks conn)]
(->> (group-by :queue tasks)
(run! (partial push-tasks! conn rconn)))
(if (db/read-only? pool)
(l/warn :hint "scheduler: not started (db is read-only)")
(l/warn :hint "dispatcher: not started (db is read-only)")
{:name "penpot/scheduler"}
(l/info :hint "scheduler: started")
{:name "penpot/worker-dispatcher"}
(l/info :hint "dispatcher: started")
(dm/with-open [rconn (rds/connect redis)]
(loop []
@ -238,7 +238,7 @@
(throw (InterruptedException. "interrumpted")))
(when-not (run-batch rconn)
(when-not (run-batch! rconn)
(px/sleep (::wait-duration cfg)))
(catch InterruptedException cause
(throw cause))
@ -246,29 +246,29 @@
(rds/exception? cause)
(l/warn :hint "scheduler: redis exception (will retry in an instant)" :cause cause)
(l/warn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))
(db/sql-exception? cause)
(l/warn :hint "scheduler: database exception (will retry in an instant)" :cause cause)
(l/warn :hint "dispatcher: database exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))
(l/error :hint "scheduler: unhandled exception (will retry in an instant)" :cause cause)
(l/error :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn))))))
(catch InterruptedException _
(l/debug :hint "scheduler: interrupted"))
(l/debug :hint "dispatcher: interrupted"))
(catch Throwable cause
(l/error :hint "scheduler: unexpected exception" :cause cause))
(l/error :hint "dispatcher: unexpected exception" :cause cause))
(l/info :hint "scheduler: terminated")))))))
(l/info :hint "dispatcher: terminated")))))))
(defmethod ig/halt-key! ::scheduler
(defmethod ig/halt-key! ::dispatcher
[_ thread]
(some-> thread px/interrupt!))
@ -288,36 +288,38 @@
;; FIXME: define queue as set
(defmethod ig/prep-key ::worker
[_ cfg]
(merge {::queue "default" ::parallelism 1}
(merge {::parallelism 1}
(d/without-nils cfg)))
(defmethod ig/init-key ::worker
[_ {:keys [::db/pool ::queue ::parallelism] :as cfg}]
(if (db/read-only? pool)
(l/warn :hint "workers: not started (db is read-only)" :queue queue)
(->> (range parallelism)
(map #(assoc cfg ::worker-id %))
(map start-worker!)))))
(let [queue (d/name queue)
cfg (assoc cfg ::queue queue)]
(if (db/read-only? pool)
(l/warn :hint "worker: not started (db is read-only)" :queue queue :parallelism parallelism)
(->> (range parallelism)
(map #(assoc cfg ::worker-id %))
(map start-worker!))))))
(defmethod ig/halt-key! ::worker
[_ threads]
(run! px/interrupt! threads))
(defn- start-worker!
[{:keys [::rds/redis ::worker-id] :as cfg}]
[{:keys [::rds/redis ::worker-id ::queue] :as cfg}]
{:name (format "penpot/worker/%s" worker-id)}
(l/info :hint "worker: started" :worker-id worker-id)
(l/info :hint "worker: started" :worker-id worker-id :queue queue)
(dm/with-open [rconn (rds/connect redis)]
(let [cfg (-> cfg
(update ::queue format-queue)
(assoc ::rds/rconn rconn)
(assoc ::timeout (dt/duration "5s")))]
(let [tenant (cf/get :tenant "main")
cfg (-> cfg
(assoc ::queue (str/ffmt "taskq:%:%" tenant queue))
(assoc ::rds/rconn rconn)
(assoc ::timeout (dt/duration "5s")))]
(loop []
(when (px/interrupted?)
(throw (InterruptedException. "interrupted")))
@ -327,13 +329,17 @@
(catch InterruptedException _
(l/debug :hint "worker: interrupted"
:worker-id worker-id))
:worker-id worker-id
:queue queue))
(catch Throwable cause
(l/error :hint "worker: unexpected exception"
:worker-id worker-id
:queue queue
:cause cause))
(l/info :hint "worker: terminated" :worker-id worker-id)))))
(l/info :hint "worker: terminated"
:worker-id worker-id
:queue queue)))))
(defn- run-worker-loop!
[{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}]
@ -631,9 +637,26 @@
(s/def ::task keyword?)
(defn- extract-props
(let [cns (namespace ::sample)]
(reduce-kv (fn [res k v]
(cond-> res
(not= (namespace k) cns)
(assoc! k v)))
(transient {})
(def ^:private sql:insert-new-task
"insert into task (id, name, props, queue, priority, max_retries, scheduled_at)
values (?, ?, ?, ?, ?, ?, now() + ?)
returning id")
(s/def ::task (s/or :kw keyword? :str string?))
(s/def ::queue (s/or :kw keyword? :str string?))
(s/def ::delay (s/or :int ::us/integer :duration dt/duration?))
(s/def ::conn some?)
(s/def ::conn (s/or :pool ::db/pool :connection some?))
(s/def ::priority ::us/integer)
(s/def ::max-retries ::us/integer)
@ -641,36 +664,24 @@
(s/keys :req [::task ::conn]
:opt [::delay ::queue ::priority ::max-retries]))
(defn- extract-props
(reduce-kv (fn [res k v]
(cond-> res
(not (qualified-keyword? k))
(assoc! k v)))
(transient {})
(def ^:private sql:insert-new-task
"insert into task (id, name, props, queue, priority, max_retries, scheduled_at)
values (?, ?, ?, ?, ?, ?, now() + ?)
returning id")
(defn submit!
[& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn]
:or {delay 0 queue "default" priority 100 max-retries 3}
:or {delay 0 queue :default priority 100 max-retries 3}
:as options}]
(us/verify ::submit-options options)
(let [duration (dt/duration delay)
interval (db/interval duration)
props (-> options extract-props db/tjson)
id (uuid/next)]
id (uuid/next)
tenant (cf/get :tenant)
task (d/name task)
queue (str/ffmt "%:%" tenant (d/name queue))]
(l/debug :hint "submit task"
:name (d/name task)
:name task
:queue queue
:in (dt/format-duration duration))
(db/exec-one! conn [sql:insert-new-task id (d/name task) props
(db/exec-one! conn [sql:insert-new-task id task props
queue priority max-retries interval])
Add table
Reference in a new issue