From 3a67e51f2fe01bb53a5a63a3a21219176352838d Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 2 Apr 2024 15:22:46 +0200 Subject: [PATCH] :sparkles: Move worker runner to a separated namespace --- backend/resources/log4j2-devenv.xml | 2 +- backend/src/app/main.clj | 4 +- backend/src/app/worker.clj | 246 +--------------------- backend/src/app/worker/cron.clj | 9 +- backend/src/app/worker/runner.clj | 272 +++++++++++++++++++++++++ backend/test/backend_tests/helpers.clj | 4 +- 6 files changed, 284 insertions(+), 253 deletions(-) create mode 100644 backend/src/app/worker/runner.clj diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml index 31e196829..3cf7ab00b 100644 --- a/backend/resources/log4j2-devenv.xml +++ b/backend/resources/log4j2-devenv.xml @@ -40,7 +40,7 @@ - + diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 056c99cc8..3c61e6b35 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -493,7 +493,7 @@ ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool)} - [::default ::wrk/worker] + [::default ::wrk/runner] {::wrk/parallelism (cf/get ::worker-default-parallelism 1) ::wrk/queue :default ::rds/redis (ig/ref ::rds/redis) @@ -501,7 +501,7 @@ ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool)} - [::webhook ::wrk/worker] + [::webhook ::wrk/runner] {::wrk/parallelism (cf/get ::worker-webhook-parallelism 1) ::wrk/queue :webhooks ::rds/redis (ig/ref ::rds/redis) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 9614a3fa4..a648080f3 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -8,21 +8,16 @@ "Async tasks abstraction (impl)." (:require [app.common.data :as d] - [app.common.data.macros :as dm] - [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] - [app.common.transit :as t] [app.common.uuid :as uuid] [app.config :as cf] [app.db :as db] [app.metrics :as mtx] - [app.redis :as rds] [app.util.time :as dt] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [integrant.core :as ig] - [promesa.exec :as px])) + [integrant.core :as ig])) (set! *warn-on-reflection* true) @@ -59,244 +54,6 @@ {} tasks)) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; WORKER -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defn- decode-task-row - [{:keys [props] :as row}] - (cond-> row - (db/pgobject? props) - (assoc :props (db/decode-transit-pgobject props)))) - -(declare ^:private run-worker-loop!) -(declare ^:private start-worker!) -(declare ^:private get-error-context) - -(defmethod ig/pre-init-spec ::worker [_] - (s/keys :req [::parallelism - ::mtx/metrics - ::db/pool - ::rds/redis - ::queue - ::registry])) - -(defmethod ig/prep-key ::worker - [_ cfg] - (merge {::parallelism 1} - (d/without-nils cfg))) - -(defmethod ig/init-key ::worker - [_ {:keys [::db/pool ::queue ::parallelism] :as cfg}] - (let [queue (d/name queue) - cfg (assoc cfg ::queue queue)] - (if (db/read-only? pool) - (l/wrn :hint "worker: not started (db is read-only)" :queue queue :parallelism parallelism) - (doall - (->> (range parallelism) - (map #(assoc cfg ::worker-id %)) - (map start-worker!)))))) - -(defmethod ig/halt-key! ::worker - [_ threads] - (run! px/interrupt! threads)) - -(defn- start-worker! - [{:keys [::rds/redis ::worker-id ::queue] :as cfg}] - (px/thread - {:name (format "penpot/worker/runner:%s" worker-id)} - (l/inf :hint "worker: started" :worker-id worker-id :queue queue) - (try - (dm/with-open [rconn (rds/connect redis)] - (let [tenant (cf/get :tenant "main") - cfg (-> cfg - (assoc ::queue (str/ffmt "taskq:%:%" tenant queue)) - (assoc ::rds/rconn rconn) - (assoc ::timeout (dt/duration "5s")))] - (loop [] - (when (px/interrupted?) - (throw (InterruptedException. "interrupted"))) - - (run-worker-loop! cfg) - (recur)))) - - (catch InterruptedException _ - (l/debug :hint "worker: interrupted" - :worker-id worker-id - :queue queue)) - (catch Throwable cause - (l/err :hint "worker: unexpected exception" - :worker-id worker-id - :queue queue - :cause cause)) - (finally - (l/inf :hint "worker: terminated" - :worker-id worker-id - :queue queue))))) - -(defn- run-worker-loop! - [{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}] - (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}] - (let [explain (ex-message error) - nretry (+ (:retry-num task) inc-by) - now (dt/now) - delay (->> (iterate #(* % 2) delay) (take nretry) (last))] - (db/update! pool :task - {:error explain - :status "retry" - :modified-at now - :scheduled-at (dt/plus now delay) - :retry-num nretry} - {:id (:id task)}) - nil)) - - (handle-task-failure [{:keys [task error]}] - (let [explain (ex-message error)] - (db/update! pool :task - {:error explain - :modified-at (dt/now) - :status "failed"} - {:id (:id task)}) - nil)) - - (handle-task-completion [{:keys [task]}] - (let [now (dt/now)] - (db/update! pool :task - {:completed-at now - :modified-at now - :status "completed"} - {:id (:id task)}) - nil)) - - (decode-payload [^bytes payload] - (try - (let [task-id (t/decode payload)] - (if (uuid? task-id) - task-id - (l/err :hint "worker: received unexpected payload (uuid expected)" - :payload task-id))) - (catch Throwable cause - (l/err :hint "worker: unable to decode payload" - :payload payload - :length (alength payload) - :cause cause)))) - - (handle-task [{:keys [name] :as task}] - (let [task-fn (get registry name)] - (if task-fn - (task-fn task) - (l/wrn :hint "no task handler found" :name name)) - {:status :completed :task task})) - - (handle-task-exception [cause task] - (let [edata (ex-data cause)] - (if (and (< (:retry-num task) - (:max-retries task)) - (= ::retry (:type edata))) - (cond-> {:status :retry :task task :error cause} - (dt/duration? (:delay edata)) - (assoc :delay (:delay edata)) - - (= ::noop (:strategy edata)) - (assoc :inc-by 0)) - (do - (l/err :hint "worker: unhandled exception on task" - ::l/context (get-error-context cause task) - :cause cause) - (if (>= (:retry-num task) (:max-retries task)) - {:status :failed :task task :error cause} - {:status :retry :task task :error cause}))))) - - (get-task [task-id] - (ex/try! - (some-> (db/get* pool :task {:id task-id}) - (decode-task-row)))) - - (run-task [task-id] - (loop [task (get-task task-id)] - (cond - (ex/exception? task) - (if (or (db/connection-error? task) - (db/serialization-error? task)) - (do - (l/wrn :hint "worker: connection error on retrieving task from database (retrying in some instants)" - :worker-id worker-id - :cause task) - (px/sleep (::rds/timeout rconn)) - (recur (get-task task-id))) - (do - (l/err :hint "worker: unhandled exception on retrieving task from database (retrying in some instants)" - :worker-id worker-id - :cause task) - (px/sleep (::rds/timeout rconn)) - (recur (get-task task-id)))) - - (nil? task) - (l/wrn :hint "worker: no task found on the database" - :worker-id worker-id - :task-id task-id) - - :else - (try - (l/trc :hint "executing task" - :name (:name task) - :id (str (:id task)) - :queue queue - :worker-id worker-id - :retry (:retry-num task)) - (handle-task task) - (catch InterruptedException cause - (throw cause)) - (catch Throwable cause - (handle-task-exception cause task)))))) - - (process-result [{:keys [status] :as result}] - (ex/try! - (case status - :retry (handle-task-retry result) - :failed (handle-task-failure result) - :completed (handle-task-completion result) - nil))) - - (run-task-loop [task-id] - (loop [result (run-task task-id)] - (when-let [cause (process-result result)] - (if (or (db/connection-error? cause) - (db/serialization-error? cause)) - (do - (l/wrn :hint "worker: database exeption on processing task result (retrying in some instants)" - :cause cause) - (px/sleep (::rds/timeout rconn)) - (recur result)) - (do - (l/err :hint "worker: unhandled exception on processing task result (retrying in some instants)" - :cause cause) - (px/sleep (::rds/timeout rconn)) - (recur result))))))] - - (try - (let [[_ payload] (rds/blpop! rconn timeout queue)] - (some-> payload - decode-payload - run-task-loop)) - - (catch InterruptedException cause - (throw cause)) - - (catch Exception cause - (if (rds/timeout-exception? cause) - (do - (l/err :hint "worker: redis pop operation timeout, consider increasing redis timeout (will retry in some instants)" - :timeout timeout - :cause cause) - (px/sleep timeout)) - - (l/err :hint "worker: unhandled exception" :cause cause)))))) - -(defn get-error-context - [_ item] - {:params item}) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; SUBMIT API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -348,6 +105,7 @@ [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label] :or {delay 0 queue :default priority 100 max-retries 3 label ""} :as options}] + (us/verify! ::submit-options options) (let [duration (dt/duration delay) interval (db/interval duration) diff --git a/backend/src/app/worker/cron.clj b/backend/src/app/worker/cron.clj index 0f44dcaae..689fcba90 100644 --- a/backend/src/app/worker/cron.clj +++ b/backend/src/app/worker/cron.clj @@ -11,7 +11,8 @@ [app.common.logging :as l] [app.db :as db] [app.util.time :as dt] - [app.worker :as wrk] + [app.worker :as-alias wrk] + [app.worker.runner :refer [get-error-context]] [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] @@ -64,7 +65,7 @@ (catch Throwable cause (let [elapsed (dt/format-duration (tpoint))] - (binding [l/*context* (wrk/get-error-context cause task)] + (binding [l/*context* (get-error-context cause task)] (l/err :hint "unhandled exception on running task" :task-id id :elapsed elapsed @@ -98,11 +99,11 @@ (s/def ::props (s/nilable map?)) (s/def ::task keyword?) -(s/def ::wrk/task +(s/def ::task-item (s/keys :req-un [::cron ::task] :opt-un [::props ::id])) -(s/def ::wrk/entries (s/coll-of (s/nilable ::wrk/task))) +(s/def ::wrk/entries (s/coll-of (s/nilable ::task-item))) (defmethod ig/pre-init-spec ::wrk/cron [_] (s/keys :req [::db/pool ::wrk/entries ::wrk/registry])) diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj new file mode 100644 index 000000000..40332ab23 --- /dev/null +++ b/backend/src/app/worker/runner.clj @@ -0,0 +1,272 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.worker.runner + "Async tasks abstraction (impl)." + (:require + [app.common.data :as d] + [app.common.data.macros :as dm] + [app.common.exceptions :as ex] + [app.common.logging :as l] + [app.common.transit :as t] + [app.config :as cf] + [app.db :as db] + [app.metrics :as mtx] + [app.redis :as rds] + [app.util.time :as dt] + [app.worker :as-alias wrk] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] + [integrant.core :as ig] + [promesa.exec :as px])) + +(set! *warn-on-reflection* true) + +(defn- decode-task-row + [{:keys [props] :as row}] + (cond-> row + (db/pgobject? props) + (assoc :props (db/decode-transit-pgobject props)))) + +(defn get-error-context + [_ item] + {:params item}) + +(defn- run-worker-loop! + [{:keys [::db/pool ::rds/rconn ::wrk/registry ::timeout ::queue ::id]}] + (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}] + (let [explain (ex-message error) + nretry (+ (:retry-num task) inc-by) + now (dt/now) + delay (->> (iterate #(* % 2) delay) (take nretry) (last))] + (db/update! pool :task + {:error explain + :status "retry" + :modified-at now + :scheduled-at (dt/plus now delay) + :retry-num nretry} + {:id (:id task)}) + nil)) + + (handle-task-failure [{:keys [task error]}] + (let [explain (ex-message error)] + (db/update! pool :task + {:error explain + :modified-at (dt/now) + :status "failed"} + {:id (:id task)}) + nil)) + + (handle-task-completion [{:keys [task]}] + (let [now (dt/now)] + (db/update! pool :task + {:completed-at now + :modified-at now + :status "completed"} + {:id (:id task)}) + nil)) + + (decode-payload [^bytes payload] + (try + (let [task-id (t/decode payload)] + (if (uuid? task-id) + task-id + (l/err :hint "received unexpected payload (uuid expected)" + :payload task-id))) + (catch Throwable cause + (l/err :hint "unable to decode payload" + :payload payload + :length (alength payload) + :cause cause)))) + + (handle-task [{:keys [name] :as task}] + (let [task-fn (get registry name)] + (if task-fn + (task-fn task) + (l/wrn :hint "no task handler found" :name name)) + {:status :completed :task task})) + + (handle-task-exception [cause task] + (let [edata (ex-data cause)] + (if (and (< (:retry-num task) + (:max-retries task)) + (= ::retry (:type edata))) + (cond-> {:status :retry :task task :error cause} + (dt/duration? (:delay edata)) + (assoc :delay (:delay edata)) + + (= ::noop (:strategy edata)) + (assoc :inc-by 0)) + (do + (l/err :hint "unhandled exception on task" + ::l/context (get-error-context cause task) + :cause cause) + (if (>= (:retry-num task) (:max-retries task)) + {:status :failed :task task :error cause} + {:status :retry :task task :error cause}))))) + + (get-task [task-id] + (ex/try! + (some-> (db/get* pool :task {:id task-id}) + (decode-task-row)))) + + (run-task [task-id] + (loop [task (get-task task-id)] + (cond + (ex/exception? task) + (if (or (db/connection-error? task) + (db/serialization-error? task)) + (do + (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" + :id id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task task-id))) + (do + (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" + :id id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task task-id)))) + + (nil? task) + (l/wrn :hint "no task found on the database" + :id id + :task-id task-id) + + :else + (try + (l/trc :hint "start task" + :queue queue + :runner-id id + :name (:name task) + :task-id (str task-id) + :retry (:retry-num task)) + (let [tpoint (dt/tpoint) + result (handle-task task) + elapsed (dt/format-duration (tpoint))] + + (l/trc :hint "end task" + :queue queue + :runner-id id + :name (:name task) + :task-id (str task-id) + :retry (:retry-num task) + :elapsed elapsed) + + result) + + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (handle-task-exception cause task)))))) + + (process-result [{:keys [status] :as result}] + (ex/try! + (case status + :retry (handle-task-retry result) + :failed (handle-task-failure result) + :completed (handle-task-completion result) + nil))) + + (run-task-loop [task-id] + (loop [result (run-task task-id)] + (when-let [cause (process-result result)] + (if (or (db/connection-error? cause) + (db/serialization-error? cause)) + (do + (l/wrn :hint "database exeption on processing task result (retrying in some instants)" + :cause cause) + (px/sleep (::rds/timeout rconn)) + (recur result)) + (do + (l/err :hint "unhandled exception on processing task result (retrying in some instants)" + :cause cause) + (px/sleep (::rds/timeout rconn)) + (recur result))))))] + + (try + (let [queue (str/ffmt "taskq:%" queue) + [_ payload] (rds/blpop! rconn timeout queue)] + (some-> payload + decode-payload + run-task-loop)) + + (catch InterruptedException cause + (throw cause)) + + (catch Exception cause + (if (rds/timeout-exception? cause) + (do + (l/err :hint "redis pop operation timeout, consider increasing redis timeout (will retry in some instants)" + :timeout timeout + :cause cause) + (px/sleep timeout)) + + (l/err :hint "unhandled exception" :cause cause)))))) + +(defn- start-thread! + [{:keys [::rds/redis ::id ::queue] :as cfg}] + (px/thread + {:name (format "penpot/worker/runner:%s" id)} + (l/inf :hint "started" :id id :queue queue) + (try + (dm/with-open [rconn (rds/connect redis)] + (let [tenant (cf/get :tenant "main") + cfg (-> cfg + (assoc ::queue (str/ffmt "%:%" tenant queue)) + (assoc ::rds/rconn rconn) + (assoc ::timeout (dt/duration "5s")))] + (loop [] + (when (px/interrupted?) + (throw (InterruptedException. "interrupted"))) + + (run-worker-loop! cfg) + (recur)))) + + (catch InterruptedException _ + (l/debug :hint "interrupted" + :id id + :queue queue)) + (catch Throwable cause + (l/err :hint "unexpected exception" + :id id + :queue queue + :cause cause)) + (finally + (l/inf :hint "terminated" + :id id + :queue queue))))) + +(s/def ::wrk/queue keyword?) + +(defmethod ig/pre-init-spec ::runner [_] + (s/keys :req [::wrk/parallelism + ::mtx/metrics + ::db/pool + ::rds/redis + ::wrk/queue + ::wrk/registry])) + +(defmethod ig/prep-key ::wrk/runner + [_ cfg] + (merge {::wrk/parallelism 1} + (d/without-nils cfg))) + +(defmethod ig/init-key ::wrk/runner + [_ {:keys [::db/pool ::wrk/queue ::wrk/parallelism] :as cfg}] + (let [queue (d/name queue) + cfg (assoc cfg ::queue queue)] + (if (db/read-only? pool) + (l/wrn :hint "not started (db is read-only)" :queue queue :parallelism parallelism) + (doall + (->> (range parallelism) + (map #(assoc cfg ::id %)) + (map start-thread!)))))) + +(defmethod ig/halt-key! ::wrk/runner + [_ threads] + (run! px/interrupt! threads)) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 27544c4fa..61b5f42bf 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -156,8 +156,8 @@ :app.loggers.database/reporter :app.worker/cron :app.worker/dispatcher - [:app.main/default :app.worker/worker] - [:app.main/webhook :app.worker/worker])) + [:app.main/default :app.worker/runner] + [:app.main/webhook :app.worker/runner])) _ (ig/load-namespaces system) system (-> (ig/prep system) (ig/init))]