From 04b321caae2a4def59f4b03157803a4fbb9b78f6 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 20 Feb 2023 13:15:11 +0100 Subject: [PATCH] :sparkles: Add several improvements to internal worker impl Mainly for make the cron jobs do not block the scheduled executor and offload all work to a separate threads --- backend/src/app/main.clj | 4 +- backend/src/app/worker.clj | 125 ++++++++++++++++++------------------- 2 files changed, 64 insertions(+), 65 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 18509bbe0..9d5ae4ad0 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -297,10 +297,10 @@ ::wrk/executor (ig/ref ::wrk/executor) ::session/manager (ig/ref ::session/manager)} - :app.http.websocket/routes + ::http.ws/routes {::db/pool (ig/ref ::db/pool) ::mtx/metrics (ig/ref ::mtx/metrics) - ::mbus/msgbus (ig/ref :app.msgbus/msgbus) + ::mbus/msgbus (ig/ref ::mbus/msgbus) ::session/manager (ig/ref ::session/manager)} :app.http.assets/routes diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 61681fb30..337136cea 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -27,8 +27,7 @@ (:import java.util.concurrent.ExecutorService java.util.concurrent.ForkJoinPool - java.util.concurrent.Future - java.util.concurrent.ScheduledExecutorService)) + java.util.concurrent.Future)) (set! *warn-on-reflection* true) @@ -133,7 +132,7 @@ steals))] (px/thread - {:name "penpot/executors-monitor"} + {:name "penpot/executors-monitor" :virtual true} (l/info :hint "monitor: started" :name name) (try (loop [steals 0] @@ -206,53 +205,52 @@ :queued res))) (run-batch! [rconn] - (db/with-atomic [conn pool] - (when-let [tasks (get-tasks conn)] - (->> (group-by :queue tasks) - (run! (partial push-tasks! conn rconn))) - true)))] + (try + (db/with-atomic [conn pool] + (if-let [tasks (get-tasks conn)] + (->> (group-by :queue tasks) + (run! (partial push-tasks! conn rconn))) + (px/sleep (::wait-duration cfg)))) + (catch InterruptedException cause + (throw cause)) + (catch Exception cause + (cond + (rds/exception? cause) + (do + (l/warn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))) + + (db/sql-exception? cause) + (do + (l/warn :hint "dispatcher: database exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))) + + :else + (do + (l/error :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))))))) + + (dispatcher [] + (l/info :hint "dispatcher: started") + (try + (dm/with-open [rconn (rds/connect redis)] + (loop [] + (run-batch! rconn) + (recur))) + (catch InterruptedException _ + (l/trace :hint "dispatcher: interrupted")) + (catch Throwable cause + (l/error :hint "dispatcher: unexpected exception" :cause cause)) + (finally + (l/info :hint "dispatcher: terminated"))))] (if (db/read-only? pool) (l/warn :hint "dispatcher: not started (db is read-only)") - (px/thread - {:name "penpot/worker-dispatcher"} - (l/info :hint "dispatcher: started") - (try - (dm/with-open [rconn (rds/connect redis)] - (loop [] - (when (px/interrupted?) - (throw (InterruptedException. "interrumpted"))) - (try - (when-not (run-batch! rconn) - (px/sleep (::wait-duration cfg))) - (catch InterruptedException cause - (throw cause)) - (catch Exception cause - (cond - (rds/exception? cause) - (do - (l/warn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause) - (px/sleep (::rds/timeout rconn))) - - (db/sql-exception? cause) - (do - (l/warn :hint "dispatcher: database exception (will retry in an instant)" :cause cause) - (px/sleep (::rds/timeout rconn))) - - :else - (do - (l/error :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause) - (px/sleep (::rds/timeout rconn)))))) - - (recur))) - - (catch InterruptedException _ - (l/debug :hint "dispatcher: interrupted")) - (catch Throwable cause - (l/error :hint "dispatcher: unexpected exception" :cause cause)) - (finally - (l/info :hint "dispatcher: terminated"))))))) + ;; FIXME: we don't use virtual threads here until JDBC is uptaded to >= 42.6.0 + ;; bacause it has the necessary fixes fro make the JDBC driver properly compatible + ;; with Virtual Threads. + (px/fn->thread dispatcher :name "penpot/worker/dispatcher" :virtual false)))) (defmethod ig/halt-key! ::dispatcher [_ thread] @@ -297,7 +295,7 @@ (defn- start-worker! [{:keys [::rds/redis ::worker-id ::queue] :as cfg}] (px/thread - {:name (format "penpot/worker/%s" worker-id)} + {:name (format "penpot/worker/runner:%s" worker-id)} (l/info :hint "worker: started" :worker-id worker-id :queue queue) (try (dm/with-open [rconn (rds/connect redis)] @@ -584,22 +582,23 @@ (defn- execute-cron-task [{:keys [::db/pool] :as cfg} {:keys [id] :as task}] - (try - (db/with-atomic [conn pool] - (when (db/exec-one! conn [sql:lock-cron-task (d/name id)]) - (l/trace :hint "cron: execute task" :task-id id) - ((:fn task) task))) - (catch InterruptedException _ - (px/interrupt! (px/current-thread)) - (l/debug :hint "cron: task interrupted" :task-id id)) - (catch Throwable cause - (l/error :hint "cron: unhandled exception on running task" - ::l/context (get-error-context cause task) - :task-id id - :cause cause)) - (finally - (when-not (px/interrupted? :current) - (schedule-cron-task cfg task))))) + (px/thread + {:name (str "penpot/cront-task/" id)} + (try + (db/with-atomic [conn pool] + (when (db/exec-one! conn [sql:lock-cron-task (d/name id)]) + (l/trace :hint "cron: execute task" :task-id id) + ((:fn task) task))) + (catch InterruptedException _ + (l/debug :hint "cron: task interrupted" :task-id id)) + (catch Throwable cause + (l/error :hint "cron: unhandled exception on running task" + ::l/context (get-error-context cause task) + :task-id id + :cause cause)) + (finally + (when-not (px/interrupted? :current) + (schedule-cron-task cfg task)))))) (defn- ms-until-valid [cron]