From e2ddb3e31e27aad97b060a2278025f00c071bf29 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 2 Apr 2024 14:13:37 +0200 Subject: [PATCH] :sparkles: Move worker cron related code to a separated namespace --- backend/src/app/worker.clj | 140 +--------------------------- backend/src/app/worker/cron.clj | 156 ++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 139 deletions(-) create mode 100644 backend/src/app/worker/cron.clj diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index be00edd10..7e8f1a67c 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -22,11 +22,9 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] - [promesa.core :as p] [promesa.exec :as px]) (:import java.util.concurrent.Executor - java.util.concurrent.Future java.util.concurrent.ThreadPoolExecutor)) (set! *warn-on-reflection* true) @@ -486,146 +484,10 @@ (l/err :hint "worker: unhandled exception" :cause cause)))))) -(defn- get-error-context +(defn get-error-context [_ item] {:params item}) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; CRON -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(declare schedule-cron-task) -(declare synchronize-cron-entries!) - -(s/def ::fn (s/or :var var? :fn fn?)) -(s/def ::id keyword?) -(s/def ::cron dt/cron?) -(s/def ::props (s/nilable map?)) -(s/def ::task keyword?) - -(s/def ::cron-task - (s/keys :req-un [::cron ::task] - :opt-un [::props ::id])) - -(s/def ::entries (s/coll-of (s/nilable ::cron-task))) - -(defmethod ig/pre-init-spec ::cron [_] - (s/keys :req [::db/pool ::entries ::registry])) - -(defmethod ig/init-key ::cron - [_ {:keys [::entries ::registry ::db/pool] :as cfg}] - (if (db/read-only? pool) - (l/wrn :hint "cron: not started (db is read-only)") - (let [running (atom #{}) - entries (->> entries - (filter some?) - ;; If id is not defined, use the task as id. - (map (fn [{:keys [id task] :as item}] - (if (some? id) - (assoc item :id (d/name id)) - (assoc item :id (d/name task))))) - (map (fn [item] - (update item :task d/name))) - (map (fn [{:keys [task] :as item}] - (let [f (get registry task)] - (when-not f - (ex/raise :type :internal - :code :task-not-found - :hint (str/fmt "task %s not configured" task))) - (-> item - (dissoc :task) - (assoc :fn f)))))) - - cfg (assoc cfg ::entries entries ::running running)] - - (l/inf :hint "cron: started" :tasks (count entries)) - (synchronize-cron-entries! cfg) - - (->> (filter some? entries) - (run! (partial schedule-cron-task cfg))) - - (reify - clojure.lang.IDeref - (deref [_] @running) - - java.lang.AutoCloseable - (close [_] - (l/inf :hint "cron: terminated") - (doseq [item @running] - (when-not (.isDone ^Future item) - (.cancel ^Future item true)))))))) - -(defmethod ig/halt-key! ::cron - [_ instance] - (some-> instance d/close!)) - -(def sql:upsert-cron-task - "insert into scheduled_task (id, cron_expr) - values (?, ?) - on conflict (id) - do update set cron_expr=?") - -(defn- synchronize-cron-entries! - [{:keys [::db/pool ::entries]}] - (db/with-atomic [conn pool] - (doseq [{:keys [id cron]} entries] - (l/trc :hint "register cron task" :id id :cron (str cron)) - (db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)])))) - -(defn- lock-scheduled-task! - [conn id] - (let [sql (str "SELECT id FROM scheduled_task " - " WHERE id=? FOR UPDATE SKIP LOCKED")] - (some? (db/exec-one! conn [sql (d/name id)])))) - -(defn- execute-cron-task - [cfg {:keys [id] :as task}] - (px/thread - {:name (str "penpot/cron-task/" id)} - (let [tpoint (dt/tpoint)] - (try - (db/tx-run! cfg (fn [{:keys [::db/conn]}] - (db/exec-one! conn ["SET LOCAL statement_timeout=0;"]) - (db/exec-one! conn ["SET LOCAL idle_in_transaction_session_timeout=0;"]) - (when (lock-scheduled-task! conn id) - (l/dbg :hint "cron: start task" :task-id id) - ((:fn task) task) - (let [elapsed (dt/format-duration (tpoint))] - (l/dbg :hint "cron: end task" :task-id id :elapsed elapsed))))) - - (catch InterruptedException _ - (let [elapsed (dt/format-duration (tpoint))] - (l/debug :hint "cron: task interrupted" :task-id id :elapsed elapsed))) - - (catch Throwable cause - (let [elapsed (dt/format-duration (tpoint))] - (binding [l/*context* (get-error-context cause task)] - (l/err :hint "cron: unhandled exception on running task" - :task-id id - :elapsed elapsed - :cause cause)))) - (finally - (when-not (px/interrupted? :current) - (schedule-cron-task cfg task))))))) - -(defn- ms-until-valid - [cron] - (s/assert dt/cron? cron) - (let [now (dt/now) - next (dt/next-valid-instant-from cron now)] - (dt/diff now next))) - -(defn- schedule-cron-task - [{:keys [::running] :as cfg} {:keys [cron id] :as task}] - (let [ts (ms-until-valid cron) - ft (px/schedule! ts (partial execute-cron-task cfg task))] - - (l/dbg :hint "cron: schedule task" :task-id id - :ts (dt/format-duration ts) - :at (dt/format-instant (dt/in-future ts))) - - (swap! running #(into #{ft} (filter p/pending?) %)))) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; SUBMIT API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/app/worker/cron.clj b/backend/src/app/worker/cron.clj new file mode 100644 index 000000000..0f44dcaae --- /dev/null +++ b/backend/src/app/worker/cron.clj @@ -0,0 +1,156 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.worker.cron + (:require + [app.common.data :as d] + [app.common.exceptions :as ex] + [app.common.logging :as l] + [app.db :as db] + [app.util.time :as dt] + [app.worker :as wrk] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] + [integrant.core :as ig] + [promesa.core :as p] + [promesa.exec :as px]) + (:import + java.util.concurrent.Future)) + +(set! *warn-on-reflection* true) + +(def sql:upsert-cron-task + "insert into scheduled_task (id, cron_expr) + values (?, ?) + on conflict (id) + do update set cron_expr=?") + +(defn- synchronize-cron-entries! + [{:keys [::db/pool ::entries]}] + (db/with-atomic [conn pool] + (doseq [{:keys [id cron]} entries] + (l/trc :hint "register cron task" :id id :cron (str cron)) + (db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)])))) + +(defn- lock-scheduled-task! + [conn id] + (let [sql (str "SELECT id FROM scheduled_task " + " WHERE id=? FOR UPDATE SKIP LOCKED")] + (some? (db/exec-one! conn [sql (d/name id)])))) + +(declare ^:private schedule-cron-task) + +(defn- execute-cron-task + [cfg {:keys [id] :as task}] + (px/thread + {:name (str "penpot/cron-task/" id)} + (let [tpoint (dt/tpoint)] + (try + (db/tx-run! cfg (fn [{:keys [::db/conn]}] + (db/exec-one! conn ["SET LOCAL statement_timeout=0;"]) + (db/exec-one! conn ["SET LOCAL idle_in_transaction_session_timeout=0;"]) + (when (lock-scheduled-task! conn id) + (l/dbg :hint "start task" :task-id id) + ((:fn task) task) + (let [elapsed (dt/format-duration (tpoint))] + (l/dbg :hint "end task" :task-id id :elapsed elapsed))))) + + (catch InterruptedException _ + (let [elapsed (dt/format-duration (tpoint))] + (l/debug :hint "task interrupted" :task-id id :elapsed elapsed))) + + (catch Throwable cause + (let [elapsed (dt/format-duration (tpoint))] + (binding [l/*context* (wrk/get-error-context cause task)] + (l/err :hint "unhandled exception on running task" + :task-id id + :elapsed elapsed + :cause cause)))) + (finally + (when-not (px/interrupted? :current) + (schedule-cron-task cfg task))))))) + +(defn- ms-until-valid + [cron] + (s/assert dt/cron? cron) + (let [now (dt/now) + next (dt/next-valid-instant-from cron now)] + (dt/diff now next))) + +(defn- schedule-cron-task + [{:keys [::running] :as cfg} {:keys [cron id] :as task}] + (let [ts (ms-until-valid cron) + ft (px/schedule! ts (partial execute-cron-task cfg task))] + + (l/dbg :hint "schedule task" :task-id id + :ts (dt/format-duration ts) + :at (dt/format-instant (dt/in-future ts))) + + (swap! running #(into #{ft} (filter p/pending?) %)))) + + +(s/def ::fn (s/or :var var? :fn fn?)) +(s/def ::id keyword?) +(s/def ::cron dt/cron?) +(s/def ::props (s/nilable map?)) +(s/def ::task keyword?) + +(s/def ::wrk/task + (s/keys :req-un [::cron ::task] + :opt-un [::props ::id])) + +(s/def ::wrk/entries (s/coll-of (s/nilable ::wrk/task))) + +(defmethod ig/pre-init-spec ::wrk/cron [_] + (s/keys :req [::db/pool ::wrk/entries ::wrk/registry])) + +(defmethod ig/init-key ::wrk/cron + [_ {:keys [::wrk/entries ::wrk/registry ::db/pool] :as cfg}] + (if (db/read-only? pool) + (l/wrn :hint "service not started (db is read-only)") + (let [running (atom #{}) + entries (->> entries + (filter some?) + ;; If id is not defined, use the task as id. + (map (fn [{:keys [id task] :as item}] + (if (some? id) + (assoc item :id (d/name id)) + (assoc item :id (d/name task))))) + (map (fn [item] + (update item :task d/name))) + (map (fn [{:keys [task] :as item}] + (let [f (get registry task)] + (when-not f + (ex/raise :type :internal + :code :task-not-found + :hint (str/fmt "task %s not configured" task))) + (-> item + (dissoc :task) + (assoc :fn f)))))) + + cfg (assoc cfg ::entries entries ::running running)] + + (l/inf :hint "started" :tasks (count entries)) + (synchronize-cron-entries! cfg) + + (->> (filter some? entries) + (run! (partial schedule-cron-task cfg))) + + (reify + clojure.lang.IDeref + (deref [_] @running) + + java.lang.AutoCloseable + (close [_] + (l/inf :hint "terminated") + (doseq [item @running] + (when-not (.isDone ^Future item) + (.cancel ^Future item true)))))))) + +(defmethod ig/halt-key! ::wrk/cron + [_ instance] + (some-> instance d/close!)) +