From 4ccea6b2cf30c556f7064030b9365ccc1630cb02 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 2 Apr 2024 14:36:49 +0200 Subject: [PATCH] :sparkles: Move worker dispatcher code to a separated ns --- backend/src/app/worker.clj | 99 +---------------------- backend/src/app/worker/dispatcher.clj | 110 ++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 97 deletions(-) create mode 100644 backend/src/app/worker/dispatcher.clj diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index fbb445353..9614a3fa4 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -44,6 +44,7 @@ :labels labels}))))))) (s/def ::registry (s/map-of ::us/string fn?)) +(s/def ::tasks (s/map-of keyword? fn?)) (defmethod ig/pre-init-spec ::registry [_] (s/keys :req [::mtx/metrics ::tasks])) @@ -59,7 +60,7 @@ tasks)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; DISPATCHER +;; WORKER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn- decode-task-row @@ -68,102 +69,6 @@ (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))) -(s/def ::wait-duration ::dt/duration) - -(defmethod ig/pre-init-spec ::dispatcher [_] - (s/keys :req [::mtx/metrics - ::db/pool - ::rds/redis] - :opt [::wait-duration - ::batch-size])) - -(defmethod ig/prep-key ::dispatcher - [_ cfg] - (merge {::batch-size 100 - ::wait-duration (dt/duration "5s")} - (d/without-nils cfg))) - -(def ^:private sql:select-next-tasks - "select id, queue from task as t - where t.scheduled_at <= now() - and (t.status = 'new' or t.status = 'retry') - and queue ~~* ?::text - order by t.priority desc, t.scheduled_at - limit ? - for update skip locked") - -(defmethod ig/init-key ::dispatcher - [_ {:keys [::db/pool ::rds/redis ::batch-size] :as cfg}] - (letfn [(get-tasks [conn] - (let [prefix (str (cf/get :tenant) ":%")] - (seq (db/exec! conn [sql:select-next-tasks prefix batch-size])))) - - (push-tasks! [conn rconn [queue tasks]] - (let [ids (mapv :id tasks) - key (str/ffmt "taskq:%" queue) - res (rds/rpush! rconn key (mapv t/encode ids)) - sql [(str "update task set status = 'scheduled'" - " where id = ANY(?)") - (db/create-array conn "uuid" ids)]] - - (db/exec-one! conn sql) - (l/trc :hist "dispatcher: queue tasks" - :queue queue - :tasks (count ids) - :queued res))) - - (run-batch! [rconn] - (try - (db/with-atomic [conn pool] - (if-let [tasks (get-tasks conn)] - (->> (group-by :queue tasks) - (run! (partial push-tasks! conn rconn))) - (px/sleep (::wait-duration cfg)))) - (catch InterruptedException cause - (throw cause)) - (catch Exception cause - (cond - (rds/exception? cause) - (do - (l/wrn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause) - (px/sleep (::rds/timeout rconn))) - - (db/sql-exception? cause) - (do - (l/wrn :hint "dispatcher: database exception (will retry in an instant)" :cause cause) - (px/sleep (::rds/timeout rconn))) - - :else - (do - (l/err :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause) - (px/sleep (::rds/timeout rconn))))))) - - (dispatcher [] - (l/inf :hint "dispatcher: started") - (try - (dm/with-open [rconn (rds/connect redis)] - (loop [] - (run-batch! rconn) - (recur))) - (catch InterruptedException _ - (l/trc :hint "dispatcher: interrupted")) - (catch Throwable cause - (l/err :hint "dispatcher: unexpected exception" :cause cause)) - (finally - (l/inf :hint "dispatcher: terminated"))))] - - (if (db/read-only? pool) - (l/wrn :hint "dispatcher: not started (db is read-only)") - (px/fn->thread dispatcher :name "penpot/worker/dispatcher" :virtual false)))) - -(defmethod ig/halt-key! ::dispatcher - [_ thread] - (some-> thread px/interrupt!)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; WORKER -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (declare ^:private run-worker-loop!) (declare ^:private start-worker!) (declare ^:private get-error-context) diff --git a/backend/src/app/worker/dispatcher.clj b/backend/src/app/worker/dispatcher.clj new file mode 100644 index 000000000..dbdb06042 --- /dev/null +++ b/backend/src/app/worker/dispatcher.clj @@ -0,0 +1,110 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.worker.dispatcher + (:require + [app.common.data :as d] + [app.common.data.macros :as dm] + [app.common.logging :as l] + [app.common.transit :as t] + [app.config :as cf] + [app.db :as db] + [app.metrics :as mtx] + [app.redis :as rds] + [app.util.time :as dt] + [app.worker :as-alias wrk] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] + [integrant.core :as ig] + [promesa.exec :as px])) + +(set! *warn-on-reflection* true) + +(defmethod ig/pre-init-spec ::wrk/dispatcher [_] + (s/keys :req [::mtx/metrics ::db/pool ::rds/redis])) + +(defmethod ig/prep-key ::wrk/dispatcher + [_ cfg] + (merge {::batch-size 100 + ::wait-duration (dt/duration "5s")} + (d/without-nils cfg))) + +(def ^:private sql:select-next-tasks + "select id, queue from task as t + where t.scheduled_at <= now() + and (t.status = 'new' or t.status = 'retry') + and queue ~~* ?::text + order by t.priority desc, t.scheduled_at + limit ? + for update skip locked") + +(defmethod ig/init-key ::wrk/dispatcher + [_ {:keys [::db/pool ::rds/redis ::batch-size] :as cfg}] + (letfn [(get-tasks [conn] + (let [prefix (str (cf/get :tenant) ":%")] + (seq (db/exec! conn [sql:select-next-tasks prefix batch-size])))) + + (push-tasks! [conn rconn [queue tasks]] + (let [ids (mapv :id tasks) + key (str/ffmt "taskq:%" queue) + res (rds/rpush! rconn key (mapv t/encode ids)) + sql [(str "update task set status = 'scheduled'" + " where id = ANY(?)") + (db/create-array conn "uuid" ids)]] + + (db/exec-one! conn sql) + (l/trc :hist "queue tasks" + :queue queue + :tasks (count ids) + :queued res))) + + (run-batch! [rconn] + (try + (db/with-atomic [conn pool] + (if-let [tasks (get-tasks conn)] + (->> (group-by :queue tasks) + (run! (partial push-tasks! conn rconn))) + (px/sleep (::wait-duration cfg)))) + (catch InterruptedException cause + (throw cause)) + (catch Exception cause + (cond + (rds/exception? cause) + (do + (l/wrn :hint "redis exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))) + + (db/sql-exception? cause) + (do + (l/wrn :hint "database exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))) + + :else + (do + (l/err :hint "unhandled exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))))))) + + (dispatcher [] + (l/inf :hint "started") + (try + (dm/with-open [rconn (rds/connect redis)] + (loop [] + (run-batch! rconn) + (recur))) + (catch InterruptedException _ + (l/trc :hint "interrupted")) + (catch Throwable cause + (l/err :hint " unexpected exception" :cause cause)) + (finally + (l/inf :hint "terminated"))))] + + (if (db/read-only? pool) + (l/wrn :hint "not started (db is read-only)") + (px/fn->thread dispatcher :name "penpot/worker/dispatcher" :virtual false)))) + +(defmethod ig/halt-key! ::wrk/dispatcher + [_ thread] + (some-> thread px/interrupt!))