From 7948f565e33a906f8b53c6a9d7f33b7d85be9be0 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 15 Apr 2024 10:05:16 +0200 Subject: [PATCH] :sparkles: Make cron task schedule sync more lock resilent --- backend/src/app/worker/cron.clj | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/backend/src/app/worker/cron.clj b/backend/src/app/worker/cron.clj index 689fcba90..72a66a22b 100644 --- a/backend/src/app/worker/cron.clj +++ b/backend/src/app/worker/cron.clj @@ -27,14 +27,15 @@ "insert into scheduled_task (id, cron_expr) values (?, ?) on conflict (id) - do update set cron_expr=?") + do nothing") (defn- synchronize-cron-entries! - [{:keys [::db/pool ::entries]}] - (db/with-atomic [conn pool] - (doseq [{:keys [id cron]} entries] - (l/trc :hint "register cron task" :id id :cron (str cron)) - (db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)])))) + [{:keys [::db/conn ::entries]}] + (doseq [{:keys [id cron]} entries] + (let [result (db/exec-one! conn [sql:upsert-cron-task id (str cron)]) + updated? (pos? (db/get-update-count result))] + (l/dbg :hint "register task" :id id :cron (str cron) + :status (if updated? "created" "exists"))))) (defn- lock-scheduled-task! [conn id] @@ -45,7 +46,7 @@ (declare ^:private schedule-cron-task) (defn- execute-cron-task - [cfg {:keys [id] :as task}] + [cfg {:keys [id cron] :as task}] (px/thread {:name (str "penpot/cron-task/" id)} (let [tpoint (dt/tpoint)] @@ -54,20 +55,25 @@ (db/exec-one! conn ["SET LOCAL statement_timeout=0;"]) (db/exec-one! conn ["SET LOCAL idle_in_transaction_session_timeout=0;"]) (when (lock-scheduled-task! conn id) - (l/dbg :hint "start task" :task-id id) + (db/update! conn :scheduled-task + {:cron-expr (str cron) + :modified-at (dt/now)} + {:id id} + {::db/return-keys false}) + (l/dbg :hint "start" :id id) ((:fn task) task) (let [elapsed (dt/format-duration (tpoint))] - (l/dbg :hint "end task" :task-id id :elapsed elapsed))))) + (l/dbg :hint "end" :id id :elapsed elapsed))))) (catch InterruptedException _ (let [elapsed (dt/format-duration (tpoint))] - (l/debug :hint "task interrupted" :task-id id :elapsed elapsed))) + (l/debug :hint "task interrupted" :id id :elapsed elapsed))) (catch Throwable cause (let [elapsed (dt/format-duration (tpoint))] (binding [l/*context* (get-error-context cause task)] (l/err :hint "unhandled exception on running task" - :task-id id + :id id :elapsed elapsed :cause cause)))) (finally @@ -86,7 +92,7 @@ (let [ts (ms-until-valid cron) ft (px/schedule! ts (partial execute-cron-task cfg task))] - (l/dbg :hint "schedule task" :task-id id + (l/dbg :hint "schedule task" :id id :ts (dt/format-duration ts) :at (dt/format-instant (dt/in-future ts))) @@ -135,7 +141,8 @@ cfg (assoc cfg ::entries entries ::running running)] (l/inf :hint "started" :tasks (count entries)) - (synchronize-cron-entries! cfg) + + (db/tx-run! cfg synchronize-cron-entries!) (->> (filter some? entries) (run! (partial schedule-cron-task cfg)))