mirror of
https://github.com/penpot/penpot.git
synced 2025-03-13 00:01:51 -05:00
Merge pull request #2068 from penpot/niwinz-fix-worker-cron-locking-mechanism
🐛 Fix cron scheduler locking mechanism
This commit is contained in:
commit
6bb5fb0361
1 changed files with 13 additions and 17 deletions
|
@ -203,8 +203,7 @@
|
||||||
|
|
||||||
(instance? Exception val)
|
(instance? Exception val)
|
||||||
(do
|
(do
|
||||||
(l/warn :cause val
|
(l/warn :hint "unexpected error ocurried on polling the database (will resume in some instants)" :cause val)
|
||||||
:hint "unexpected error ocurried on polling the database (will resume in some instants)")
|
|
||||||
(a/<! (a/timeout poll-ms))
|
(a/<! (a/timeout poll-ms))
|
||||||
(recur))
|
(recur))
|
||||||
|
|
||||||
|
@ -377,7 +376,7 @@
|
||||||
[{:keys [tasks]} item]
|
[{:keys [tasks]} item]
|
||||||
(let [name (d/name (:name item))]
|
(let [name (d/name (:name item))]
|
||||||
(try
|
(try
|
||||||
(l/debug :action "execute task"
|
(l/trace :action "execute task"
|
||||||
:id (:id item)
|
:id (:id item)
|
||||||
:name name
|
:name name
|
||||||
:retry (:retry-num item))
|
:retry (:retry-num item))
|
||||||
|
@ -425,7 +424,7 @@
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
(declare schedule-cron-task)
|
(declare schedule-cron-task)
|
||||||
(declare synchronize-cron-entries)
|
(declare synchronize-cron-entries!)
|
||||||
|
|
||||||
(s/def ::fn (s/or :var var? :fn fn?))
|
(s/def ::fn (s/or :var var? :fn fn?))
|
||||||
(s/def ::id keyword?)
|
(s/def ::id keyword?)
|
||||||
|
@ -466,8 +465,8 @@
|
||||||
|
|
||||||
cfg (assoc cfg :entries entries :running running)]
|
cfg (assoc cfg :entries entries :running running)]
|
||||||
|
|
||||||
(l/info :hint "cron started" :registred-tasks (count entries))
|
(l/info :hint "cron initialized" :tasks (count entries))
|
||||||
(synchronize-cron-entries cfg)
|
(synchronize-cron-entries! cfg)
|
||||||
|
|
||||||
(->> (filter some? entries)
|
(->> (filter some? entries)
|
||||||
(run! (partial schedule-cron-task cfg)))
|
(run! (partial schedule-cron-task cfg)))
|
||||||
|
@ -494,16 +493,12 @@
|
||||||
on conflict (id)
|
on conflict (id)
|
||||||
do update set cron_expr=?")
|
do update set cron_expr=?")
|
||||||
|
|
||||||
(defn- synchronize-cron-item
|
(defn- synchronize-cron-entries!
|
||||||
[conn {:keys [id cron]}]
|
[{:keys [pool entries]}]
|
||||||
(let [cron (str cron)]
|
|
||||||
(l/debug :action "initialize scheduled task" :id id :cron cron)
|
|
||||||
(db/exec-one! conn [sql:upsert-cron-task id cron cron])))
|
|
||||||
|
|
||||||
(defn- synchronize-cron-entries
|
|
||||||
[{:keys [pool schedule]}]
|
|
||||||
(db/with-atomic [conn pool]
|
(db/with-atomic [conn pool]
|
||||||
(run! (partial synchronize-cron-item conn) schedule)))
|
(doseq [{:keys [id cron]} entries]
|
||||||
|
(l/trace :hint "register cron task" :id id :cron (str cron))
|
||||||
|
(db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)]))))
|
||||||
|
|
||||||
(def sql:lock-cron-task
|
(def sql:lock-cron-task
|
||||||
"select id from scheduled_task where id=? for update skip locked")
|
"select id from scheduled_task where id=? for update skip locked")
|
||||||
|
@ -512,7 +507,7 @@
|
||||||
[{:keys [executor pool] :as cfg} {:keys [id] :as task}]
|
[{:keys [executor pool] :as cfg} {:keys [id] :as task}]
|
||||||
(letfn [(run-task [conn]
|
(letfn [(run-task [conn]
|
||||||
(when (db/exec-one! conn [sql:lock-cron-task (d/name id)])
|
(when (db/exec-one! conn [sql:lock-cron-task (d/name id)])
|
||||||
(l/debug :action "execute scheduled task" :id id)
|
(l/trace :hint "execute cron task" :id id)
|
||||||
((:fn task) task)))
|
((:fn task) task)))
|
||||||
|
|
||||||
(handle-task []
|
(handle-task []
|
||||||
|
@ -567,9 +562,10 @@
|
||||||
|
|
||||||
(defmethod ig/init-key ::registry
|
(defmethod ig/init-key ::registry
|
||||||
[_ {:keys [metrics tasks]}]
|
[_ {:keys [metrics tasks]}]
|
||||||
|
(l/info :hint "registry initialized" :tasks (count tasks))
|
||||||
(reduce-kv (fn [res k v]
|
(reduce-kv (fn [res k v]
|
||||||
(let [tname (name k)]
|
(let [tname (name k)]
|
||||||
(l/debug :hint "register task" :name tname)
|
(l/trace :hint "register task" :name tname)
|
||||||
(assoc res k (wrap-task-handler metrics tname v))))
|
(assoc res k (wrap-task-handler metrics tname v))))
|
||||||
{}
|
{}
|
||||||
tasks))
|
tasks))
|
||||||
|
|
Loading…
Add table
Reference in a new issue