From 0600b2abe4ed12fb85c117f2d57a000f65e633b9 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 22 Nov 2022 18:06:24 +0100 Subject: [PATCH] :recycle: Make the worker abstraction more scalable Start using Redis for dispatcher-to-worker communication and add the ability to start multiple worker threads to increase concurrency. --- .clj-kondo/config.edn | 1 + backend/src/app/config.clj | 6 +- backend/src/app/db.clj | 15 + backend/src/app/main.clj | 202 ++-- backend/src/app/msgbus.clj | 4 +- backend/src/app/redis.clj | 176 ++-- backend/src/app/rpc.clj | 2 + backend/src/app/srepl/main.clj | 11 + backend/src/app/util/closeable.clj | 31 - backend/src/app/worker.clj | 951 ++++++++++-------- backend/test/backend_tests/helpers.clj | 2 +- .../backend_tests/tasks_telemetry_test.clj | 8 +- common/deps.edn | 2 +- common/src/app/common/data.cljc | 20 +- common/src/app/common/data/macros.cljc | 17 +- common/src/app/common/exceptions.cljc | 4 + 16 files changed, 827 insertions(+), 625 deletions(-) delete mode 100644 backend/src/app/util/closeable.clj diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn index 7b0b571f2..e655c6e4c 100644 --- a/.clj-kondo/config.edn +++ b/.clj-kondo/config.edn @@ -7,6 +7,7 @@ app.common.data/export clojure.core/def app.db/with-atomic clojure.core/with-open app.common.data.macros/get-in clojure.core/get-in + app.common.data.macros/with-open clojure.core/with-open app.common.data.macros/select-keys clojure.core/select-keys app.common.logging/with-context clojure.core/do} diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 7d8556e0c..e2d30373b 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -106,7 +106,8 @@ (s/def ::file-change-snapshot-timeout ::dt/duration) (s/def ::default-executor-parallelism ::us/integer) -(s/def ::worker-executor-parallelism ::us/integer) +(s/def ::scheduled-executor-parallelism ::us/integer) +(s/def ::worker-parallelism ::us/integer) (s/def
::authenticated-cookie-domain ::us/string) (s/def ::authenticated-cookie-name ::us/string) @@ -218,7 +219,8 @@ ::default-rpc-rlimit ::error-report-webhook ::default-executor-parallelism - ::worker-executor-parallelism + ::scheduled-executor-parallelism + ::worker-parallelism ::file-change-snapshot-every ::file-change-snapshot-timeout ::user-feedback-destination diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index e62b8290b..90b960c4a 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -493,3 +493,18 @@ (let [n (xact-check-param n) row (exec-one! conn ["select pg_try_advisory_xact_lock(?::bigint) as lock" n])] (:lock row))) + +(defn sql-exception? + [cause] + (instance? java.sql.SQLException cause)) + +(defn connection-error? + [cause] + (and (sql-exception? cause) + (contains? #{"08003" "08006" "08001" "08004"} + (.getSQLState ^java.sql.SQLException cause)))) + +(defn serialization-error? + [cause] + (and (sql-exception? cause) + (= "40001" (.getSQLState ^java.sql.SQLException cause)))) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index a5cb51673..4a8119d3e 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -9,8 +9,13 @@ [app.auth.oidc] [app.common.logging :as l] [app.config :as cf] + [app.db :as-alias db] + [app.metrics :as-alias mtx] [app.metrics.definition :as-alias mdef] + [app.redis :as-alias rds] + [app.storage :as-alias sto] [app.util.time :as dt] + [app.worker :as-alias wrk] [cuerdas.core :as str] [integrant.core :as ig]) (:gen-class)) @@ -120,90 +125,84 @@ ::mdef/type :gauge}}) (def system-config - {:app.db/pool + {::db/pool {:uri (cf/get :database-uri) :username (cf/get :database-username) :password (cf/get :database-password) :read-only (cf/get :database-readonly false) - :metrics (ig/ref :app.metrics/metrics) + :metrics (ig/ref ::mtx/metrics) :migrations (ig/ref :app.migrations/all) :name :main :min-size (cf/get :database-min-pool-size 0) :max-size (cf/get :database-max-pool-size 
60)} ;; Default thread pool for IO operations - [::default :app.worker/executor] - {:parallelism (cf/get :default-executor-parallelism 70)} + ::wrk/executor + {::wrk/parallelism (cf/get :default-executor-parallelism 100)} - ;; Dedicated thread pool for background tasks execution. - [::worker :app.worker/executor] - {:parallelism (cf/get :worker-executor-parallelism 20)} + ::wrk/scheduled-executor + {::wrk/parallelism (cf/get :scheduled-executor-parallelism 20)} - :app.worker/scheduled-executor - {:parallelism 1} - - :app.worker/executors - {:default (ig/ref [::default :app.worker/executor]) - :worker (ig/ref [::worker :app.worker/executor])} - - :app.worker/executor-monitor - {:metrics (ig/ref :app.metrics/metrics) - :executors (ig/ref :app.worker/executors)} + ::wrk/monitor + {::mtx/metrics (ig/ref ::mtx/metrics) + ::wrk/name "default" + ::wrk/executor (ig/ref ::wrk/executor)} :app.migrations/migrations {} - :app.metrics/metrics + ::mtx/metrics {:default default-metrics} :app.migrations/all {:main (ig/ref :app.migrations/migrations)} - :app.redis/redis - {:uri (cf/get :redis-uri) - :metrics (ig/ref :app.metrics/metrics)} + ::rds/redis + {::rds/uri (cf/get :redis-uri) + ::mtx/metrics (ig/ref ::mtx/metrics)} :app.msgbus/msgbus {:backend (cf/get :msgbus-backend :redis) - :executor (ig/ref [::default :app.worker/executor]) - :redis (ig/ref :app.redis/redis)} + :executor (ig/ref ::wrk/executor) + :redis (ig/ref ::rds/redis)} + ;; TODO: refactor execution model :app.storage.tmp/cleaner - {:executor (ig/ref [::worker :app.worker/executor]) - :scheduled-executor (ig/ref :app.worker/scheduled-executor)} + {:executor (ig/ref ::wrk/executor) + :scheduled-executor (ig/ref ::wrk/scheduled-executor)} - :app.storage/gc-deleted-task - {:pool (ig/ref :app.db/pool) - :storage (ig/ref :app.storage/storage) - :executor (ig/ref [::worker :app.worker/executor])} + ::sto/gc-deleted-task + {:pool (ig/ref ::db/pool) + :storage (ig/ref ::sto/storage) + :executor (ig/ref ::wrk/executor)} - 
:app.storage/gc-touched-task - {:pool (ig/ref :app.db/pool)} + ::sto/gc-touched-task + {:pool (ig/ref ::db/pool)} :app.http/client - {:executor (ig/ref [::default :app.worker/executor])} + {:executor (ig/ref ::wrk/executor)} :app.http.session/manager - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :sprops (ig/ref :app.setup/props) - :executor (ig/ref [::default :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} :app.http.session/gc-task - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :max-age (cf/get :auth-token-cookie-max-age)} :app.http.awsns/handler {:sprops (ig/ref :app.setup/props) - :pool (ig/ref :app.db/pool) + :pool (ig/ref ::db/pool) :http-client (ig/ref :app.http/client) - :executor (ig/ref [::worker :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} :app.http/server {:port (cf/get :http-server-port) :host (cf/get :http-server-host) :router (ig/ref :app.http/router) - :metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref [::default :app.worker/executor]) + :metrics (ig/ref ::mtx/metrics) + :executor (ig/ref ::wrk/executor) :io-threads (cf/get :http-server-io-threads) :max-body-size (cf/get :http-server-max-body-size) :max-multipart-body-size (cf/get :http-server-max-multipart-body-size)} @@ -263,10 +262,10 @@ :oidc (ig/ref :app.auth.oidc/generic-provider)} :sprops (ig/ref :app.setup/props) :http-client (ig/ref :app.http/client) - :pool (ig/ref :app.db/pool) + :pool (ig/ref ::db/pool) :session (ig/ref :app.http.session/manager) :public-uri (cf/get :public-uri) - :executor (ig/ref [::default :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} ;; TODO: revisit the dependencies of this service, looks they are too much unused of them :app.http/router @@ -277,61 +276,60 @@ :debug-routes (ig/ref :app.http.debug/routes) :oidc-routes (ig/ref :app.auth.oidc/routes) :ws (ig/ref :app.http.websocket/handler) - :metrics (ig/ref :app.metrics/metrics) + :metrics (ig/ref ::mtx/metrics) :public-uri (cf/get 
:public-uri) - :storage (ig/ref :app.storage/storage) + :storage (ig/ref ::sto/storage) :audit-handler (ig/ref :app.loggers.audit/http-handler) :rpc-routes (ig/ref :app.rpc/routes) :doc-routes (ig/ref :app.rpc.doc/routes) - :executor (ig/ref [::default :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} :app.http.debug/routes - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::worker :app.worker/executor]) - :storage (ig/ref :app.storage/storage) + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor) + :storage (ig/ref ::sto/storage) :session (ig/ref :app.http.session/manager)} :app.http.websocket/handler - {:pool (ig/ref :app.db/pool) - :metrics (ig/ref :app.metrics/metrics) + {:pool (ig/ref ::db/pool) + :metrics (ig/ref ::mtx/metrics) :msgbus (ig/ref :app.msgbus/msgbus)} :app.http.assets/handlers - {:metrics (ig/ref :app.metrics/metrics) + {:metrics (ig/ref ::mtx/metrics) :assets-path (cf/get :assets-path) - :storage (ig/ref :app.storage/storage) - :executor (ig/ref [::default :app.worker/executor]) + :storage (ig/ref ::sto/storage) + :executor (ig/ref ::wrk/executor) :cache-max-age (dt/duration {:hours 24}) :signature-max-age (dt/duration {:hours 24 :minutes 5})} :app.http.feedback/handler - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::default :app.worker/executor])} + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor)} :app.rpc/climit - {:metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref [::default :app.worker/executor])} + {:metrics (ig/ref ::mtx/metrics) + :executor (ig/ref ::wrk/executor)} :app.rpc/rlimit - {:executor (ig/ref [::worker :app.worker/executor]) - :scheduled-executor (ig/ref :app.worker/scheduled-executor)} + {:executor (ig/ref ::wrk/executor) + :scheduled-executor (ig/ref ::wrk/scheduled-executor)} :app.rpc/methods - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :session (ig/ref :app.http.session/manager) :sprops (ig/ref :app.setup/props) - :metrics (ig/ref :app.metrics/metrics) - 
:storage (ig/ref :app.storage/storage) + :metrics (ig/ref ::mtx/metrics) + :storage (ig/ref ::sto/storage) :msgbus (ig/ref :app.msgbus/msgbus) :public-uri (cf/get :public-uri) - :redis (ig/ref :app.redis/redis) + :redis (ig/ref ::rds/redis) :audit (ig/ref :app.loggers.audit/collector) :ldap (ig/ref :app.auth.ldap/provider) :http-client (ig/ref :app.http/client) :climit (ig/ref :app.rpc/climit) :rlimit (ig/ref :app.rpc/rlimit) - :executors (ig/ref :app.worker/executors) - :executor (ig/ref [::default :app.worker/executor]) + :executor (ig/ref ::wrk/executor) :templates (ig/ref :app.setup/builtin-templates) } @@ -341,15 +339,15 @@ :app.rpc/routes {:methods (ig/ref :app.rpc/methods)} - :app.worker/registry - {:metrics (ig/ref :app.metrics/metrics) + ::wrk/registry + {:metrics (ig/ref ::mtx/metrics) :tasks {:sendmail (ig/ref :app.emails/handler) :objects-gc (ig/ref :app.tasks.objects-gc/handler) :file-gc (ig/ref :app.tasks.file-gc/handler) :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) - :storage-gc-deleted (ig/ref :app.storage/gc-deleted-task) - :storage-gc-touched (ig/ref :app.storage/gc-touched-task) + :storage-gc-deleted (ig/ref ::sto/gc-deleted-task) + :storage-gc-touched (ig/ref ::sto/gc-touched-task) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) :session-gc (ig/ref :app.http.session/gc-task) @@ -369,24 +367,24 @@ :app.emails/handler {:sendmail (ig/ref :app.emails/sendmail) - :metrics (ig/ref :app.metrics/metrics)} + :metrics (ig/ref ::mtx/metrics)} :app.tasks.tasks-gc/handler - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :max-age cf/deletion-delay} :app.tasks.objects-gc/handler - {:pool (ig/ref :app.db/pool) - :storage (ig/ref :app.storage/storage)} + {:pool (ig/ref ::db/pool) + :storage (ig/ref ::sto/storage)} :app.tasks.file-gc/handler - {:pool (ig/ref :app.db/pool)} + {:pool (ig/ref ::db/pool)} :app.tasks.file-xlog-gc/handler - {:pool (ig/ref :app.db/pool)} + {:pool (ig/ref ::db/pool)} 
:app.tasks.telemetry/handler - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :version (:full cf/version) :uri (cf/get :telemetry-uri) :sprops (ig/ref :app.setup/props) @@ -400,28 +398,28 @@ {:http-client (ig/ref :app.http/client)} :app.setup/props - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :key (cf/get :secret-key)} :app.loggers.zmq/receiver {:endpoint (cf/get :loggers-zmq-uri)} :app.loggers.audit/http-handler - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::default :app.worker/executor])} + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor)} :app.loggers.audit/collector - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::worker :app.worker/executor])} + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor)} :app.loggers.audit/archive-task {:uri (cf/get :audit-log-archive-uri) :sprops (ig/ref :app.setup/props) - :pool (ig/ref :app.db/pool) + :pool (ig/ref ::db/pool) :http-client (ig/ref :app.http/client)} :app.loggers.audit/gc-task - {:pool (ig/ref :app.db/pool)} + {:pool (ig/ref ::db/pool)} :app.loggers.loki/reporter {:uri (cf/get :loggers-loki-uri) @@ -435,12 +433,12 @@ :app.loggers.database/reporter {:receiver (ig/ref :app.loggers.zmq/receiver) - :pool (ig/ref :app.db/pool) - :executor (ig/ref [::worker :app.worker/executor])} + :pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor)} - :app.storage/storage - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::default :app.worker/executor]) + ::sto/storage + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor) :backends {:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) @@ -454,7 +452,7 @@ {:region (cf/get :storage-assets-s3-region) :endpoint (cf/get :storage-assets-s3-endpoint) :bucket (cf/get :storage-assets-s3-bucket) - :executor (ig/ref [::default :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} [::assets :app.storage.fs/backend] {:directory (cf/get :storage-assets-fs-directory)} @@ -462,12 +460,11 @@ (def worker-config - 
{:app.worker/cron - {:executor (ig/ref [::worker :app.worker/executor]) - :scheduled-executor (ig/ref :app.worker/scheduled-executor) - :tasks (ig/ref :app.worker/registry) - :pool (ig/ref :app.db/pool) - :entries + {::wrk/cron + {::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor) + ::wrk/registry (ig/ref ::wrk/registry) + ::db/pool (ig/ref ::db/pool) + ::wrk/entries [{:cron #app/cron "0 0 * * * ?" ;; hourly :task :file-xlog-gc} @@ -500,11 +497,18 @@ {:cron #app/cron "30 */5 * * * ?" ;; every 5m :task :audit-log-gc})]} - :app.worker/worker - {:executor (ig/ref [::worker :app.worker/executor]) - :tasks (ig/ref :app.worker/registry) - :metrics (ig/ref :app.metrics/metrics) - :pool (ig/ref :app.db/pool)}}) + ::wrk/scheduler + {::rds/redis (ig/ref ::rds/redis) + ::mtx/metrics (ig/ref ::mtx/metrics) + ::db/pool (ig/ref ::db/pool)} + + ::wrk/worker + {::wrk/parallelism (cf/get :worker-parallelism 3) + ::wrk/queue "default" + ::rds/redis (ig/ref ::rds/redis) + ::wrk/registry (ig/ref ::wrk/registry) + ::mtx/metrics (ig/ref ::mtx/metrics) + ::db/pool (ig/ref ::db/pool)}}) (def system nil) diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 371af7de0..8dd5b3345 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -123,8 +123,8 @@ (defn- redis-disconnect [{:keys [::pconn ::sconn] :as cfg}] - (redis/close! pconn) - (redis/close! 
sconn)) (defn- conj-subscription "A low level function that is responsible to create on-demand diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj index 3c8c1d399..dcf9b7ea2 100644 --- a/backend/src/app/redis.clj +++ b/backend/src/app/redis.clj @@ -21,13 +21,19 @@ [promesa.core :as p]) (:import clojure.lang.IDeref + clojure.lang.MapEntry + io.lettuce.core.KeyValue io.lettuce.core.RedisClient + io.lettuce.core.RedisCommandInterruptedException + io.lettuce.core.RedisCommandTimeoutException + io.lettuce.core.RedisException io.lettuce.core.RedisURI io.lettuce.core.ScriptOutputType io.lettuce.core.api.StatefulConnection io.lettuce.core.api.StatefulRedisConnection io.lettuce.core.api.async.RedisAsyncCommands io.lettuce.core.api.async.RedisScriptingAsyncCommands + io.lettuce.core.api.sync.RedisCommands io.lettuce.core.codec.ByteArrayCodec io.lettuce.core.codec.RedisCodec io.lettuce.core.codec.StringCodec @@ -45,8 +51,7 @@ (declare initialize-resources) (declare shutdown-resources) -(declare connect) -(declare close!) +(declare connect*) (s/def ::timer #(instance? Timer %)) @@ -82,32 +87,37 @@ (s/def ::connect? ::us/boolean) (s/def ::io-threads ::us/integer) (s/def ::worker-threads ::us/integer) +(s/def ::cache #(instance? clojure.lang.Atom %)) (s/def ::redis - (s/keys :req [::resources ::redis-uri ::timer ::mtx/metrics] - :opt [::connection])) - -(defmethod ig/pre-init-spec ::redis [_] - (s/keys :req-un [::uri ::mtx/metrics] - :opt-un [::timeout - ::connect? 
- ::io-threads - ::worker-threads])) + (s/keys :req [::resources + ::redis-uri + ::timer + ::mtx/metrics] + :opt [::connection + ::cache])) (defmethod ig/prep-key ::redis [_ cfg] (let [runtime (Runtime/getRuntime) cpus (.availableProcessors ^Runtime runtime)] - (merge {:timeout (dt/duration 5000) - :io-threads (max 3 cpus) - :worker-threads (max 3 cpus)} + (merge {::timeout (dt/duration "10s") + ::io-threads (max 3 cpus) + ::worker-threads (max 3 cpus)} (d/without-nils cfg)))) +(defmethod ig/pre-init-spec ::redis [_] + (s/keys :req [::uri ::mtx/metrics] + :opt [::timeout + ::connect? + ::io-threads + ::worker-threads])) + (defmethod ig/init-key ::redis - [_ {:keys [connect?] :as cfg}] - (let [cfg (initialize-resources cfg)] - (cond-> cfg - connect? (assoc ::connection (connect cfg))))) + [_ {:keys [::connect?] :as cfg}] + (let [state (initialize-resources cfg)] + (cond-> state + connect? (assoc ::connection (connect* state {}))))) (defmethod ig/halt-key! ::redis [_ state] @@ -121,7 +131,7 @@ (defn- initialize-resources "Initialize redis connection resources" - [{:keys [uri io-threads worker-threads connect? metrics] :as cfg}] + [{:keys [::uri ::io-threads ::worker-threads ::connect?] :as cfg}] (l/info :hint "initialize redis resources" :uri uri :io-threads io-threads @@ -138,53 +148,60 @@ redis-uri (RedisURI/create ^String uri)] (-> cfg - (assoc ::mtx/metrics metrics) - (assoc ::cache (atom {})) + (assoc ::resources resources) (assoc ::timer timer) - (assoc ::redis-uri redis-uri) - (assoc ::resources resources)))) + (assoc ::cache (atom {})) + (assoc ::redis-uri redis-uri)))) (defn- shutdown-resources [{:keys [::resources ::cache ::timer]}] - (run! close! (vals @cache)) + (run! d/close! 
(vals @cache)) (when resources (.shutdown ^ClientResources resources)) (when timer (.stop ^Timer timer))) -(defn connect +(defn connect* [{:keys [::resources ::redis-uri] :as state} - & {:keys [timeout codec type] - :or {codec default-codec type :default}}] + {:keys [timeout codec type] + :or {codec default-codec type :default}}] (us/assert! ::resources resources) - (let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri) - timeout (or timeout (:timeout state)) + timeout (or timeout (::timeout state)) conn (case type :default (.connect ^RedisClient client ^RedisCodec codec) :pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))] (.setTimeout ^StatefulConnection conn ^Duration timeout) - (assoc state ::connection - (reify - IDeref - (deref [_] conn) + (reify + IDeref + (deref [_] conn) - AutoCloseable - (close [_] - (.close ^StatefulConnection conn) - (.shutdown ^RedisClient client)))))) + AutoCloseable + (close [_] + (.close ^StatefulConnection conn) + (.shutdown ^RedisClient client))))) + +(defn connect + [state & {:as opts}] + (let [connection (connect* state opts)] + (-> state + (assoc ::connection connection) + (dissoc ::cache) + (vary-meta assoc `d/close! (fn [_] (d/close! connection)))))) (defn get-or-connect [{:keys [::cache] :as state} key options] - (assoc state ::connection - (or (get @cache key) - (-> (swap! cache (fn [cache] - (when-let [prev (get cache key)] - (close! prev)) - (assoc cache key (connect state options)))) - (get key))))) + (-> state + (assoc ::connection + (or (get @cache key) + (-> (swap! cache (fn [cache] + (when-let [prev (get cache key)] + (d/close! prev)) + (assoc cache key (connect* state options)))) + (get key)))) + (dissoc ::cache))) (defn add-listener! [{:keys [::connection] :as conn} listener] @@ -210,18 +227,63 @@ [{:keys [::connection] :as conn} & topics] (us/assert! ::connection-holder conn) (us/assert! 
::pubsub-connection connection) - (let [topics (into-array String (map str topics)) - cmd (.sync ^StatefulRedisPubSubConnection @connection)] - (.subscribe ^RedisPubSubCommands cmd topics))) + (try + (let [topics (into-array String (map str topics)) + cmd (.sync ^StatefulRedisPubSubConnection @connection)] + (.subscribe ^RedisPubSubCommands cmd topics)) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) (defn unsubscribe! "Blocking operation, intended to be used on a thread/agent thread." [{:keys [::connection] :as conn} & topics] (us/assert! ::connection-holder conn) (us/assert! ::pubsub-connection connection) - (let [topics (into-array String (map str topics)) - cmd (.sync ^StatefulRedisPubSubConnection @connection)] - (.unsubscribe ^RedisPubSubCommands cmd topics))) + (try + (let [topics (into-array String (map str topics)) + cmd (.sync ^StatefulRedisPubSubConnection @connection)] + (.unsubscribe ^RedisPubSubCommands cmd topics)) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) + +(defn rpush! + [{:keys [::connection] :as conn} key payload] + (us/assert! ::connection-holder conn) + (us/assert! (or (and (vector? payload) + (every? bytes? payload)) + (bytes? payload))) + (try + (let [cmd (.sync ^StatefulRedisConnection @connection) + data (if (vector? payload) payload [payload]) + vals (make-array (. Class (forName "[B")) (count data))] + + (loop [i 0 xs (seq data)] + (when xs + (aset ^"[[B" vals i ^bytes (first xs)) + (recur (inc i) (next xs)))) + + (.rpush ^RedisCommands cmd + ^String key + ^"[[B" vals)) + + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) + +(defn blpop! + [{:keys [::connection] :as conn} timeout & keys] + (us/assert! 
::connection-holder conn) + (try + (let [keys (into-array Object (map str keys)) + cmd (.sync ^StatefulRedisConnection @connection) + timeout (/ (double (inst-ms timeout)) 1000.0)] + (when-let [res (.blpop ^RedisCommands cmd + ^double timeout + ^"[Ljava.lang.String;" keys)] + (MapEntry/create + (.getKey ^KeyValue res) + (.getValue ^KeyValue res)))) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) (defn open? [{:keys [::connection] :as conn}] @@ -256,12 +318,6 @@ (when on-unsubscribe (on-unsubscribe nil topic count))))) -(defn close! - [{:keys [::connection] :as conn}] - (us/assert! ::connection-holder conn) - (us/assert! ::connection connection) - (.close ^AutoCloseable connection)) - (def ^:private scripts-cache (atom {})) (def noop-fn (constantly nil)) @@ -332,3 +388,11 @@ (eval-script sha) (->> (load-script) (p/mapcat eval-script)))))) + +(defn timeout-exception? + [cause] + (instance? RedisCommandTimeoutException cause)) + +(defn exception? + [cause] + (instance? RedisException cause)) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 52cf69b91..adeeceb9e 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -23,6 +23,7 @@ [app.storage :as-alias sto] [app.util.services :as sv] [app.util.time :as ts] + [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] @@ -270,6 +271,7 @@ ::http-client ::rlimit ::climit + ::wrk/executor ::mtx/metrics ::db/pool ::ldap])) diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 5a85ab684..80d5a7035 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -20,6 +20,7 @@ [app.util.objects-map :as omap] [app.util.pointer-map :as pmap] [app.util.time :as dt] + [app.worker :as wrk] [clojure.pprint :refer [pprint]] [cuerdas.core :as str])) @@ -37,6 +38,16 @@ (task-fn params) (println (format "no task '%s' found" name)))))) +(defn schedule-task! 
+ ([system name] + (schedule-task! system name {})) + ([system name props] + (let [pool (:app.db/pool system)] + (wrk/submit! + ::wrk/conn pool + ::wrk/task name + ::wrk/props props)))) + (defn send-test-email! [system destination] (us/verify! diff --git a/backend/src/app/util/closeable.clj b/backend/src/app/util/closeable.clj deleted file mode 100644 index 6d20f765f..000000000 --- a/backend/src/app/util/closeable.clj +++ /dev/null @@ -1,31 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) KALEIDOS INC - -(ns app.util.closeable - "A closeable abstraction. A drop in replacement for - clojure builtin `with-open` syntax abstraction." - (:refer-clojure :exclude [with-open])) - -(defprotocol ICloseable - (-close [_] "Close the resource.")) - -(defmacro with-open - [bindings & body] - {:pre [(vector? bindings) - (even? (count bindings)) - (pos? (count bindings))]} - (reduce (fn [acc bindings] - `(let ~(vec bindings) - (try - ~acc - (finally - (-close ~(first bindings)))))) - `(do ~@body) - (reverse (partition 2 bindings)))) - -(extend-protocol ICloseable - java.lang.AutoCloseable - (-close [this] (.close this))) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index b6f0ed859..f54a3add1 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -8,22 +8,22 @@ "Async tasks abstraction (impl)." 
(:require [app.common.data :as d] + [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] + [app.common.transit :as t] [app.common.uuid :as uuid] [app.db :as db] [app.metrics :as mtx] - [app.util.async :as aa] + [app.redis :as rds] [app.util.time :as dt] - [clojure.core.async :as a] [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] [promesa.exec :as px]) (:import java.util.concurrent.ExecutorService - java.util.concurrent.Executors java.util.concurrent.ForkJoinPool java.util.concurrent.Future java.util.concurrent.ScheduledExecutorService)) @@ -40,10 +40,10 @@ (s/def ::parallelism ::us/integer) (defmethod ig/pre-init-spec ::executor [_] - (s/keys :req-un [::parallelism])) + (s/keys :req [::parallelism])) (defmethod ig/init-key ::executor - [skey {:keys [parallelism]}] + [skey {:keys [::parallelism]}] (let [prefix (if (vector? skey) (-> skey first name keyword) :default) tname (str "penpot/" prefix "/%s") factory (px/forkjoin-thread-factory :name tname)] @@ -54,43 +54,78 @@ (defmethod ig/halt-key! ::executor [_ instance] - (.shutdown ^ExecutorService instance)) + (px/shutdown! instance)) (defmethod ig/pre-init-spec ::scheduled-executor [_] - (s/keys :opt-un [::parallelism])) + (s/keys :req [::parallelism])) (defmethod ig/init-key ::scheduled-executor - [_ {:keys [parallelism] :or {parallelism 1}}] + [_ {:keys [::parallelism]}] (px/scheduled-executor :parallelism parallelism :factory (px/thread-factory :name "penpot/scheduled-executor/%s"))) -(defmethod ig/halt-key! ::scheduler +(defmethod ig/halt-key! ::scheduled-executor [_ instance] - (.shutdown ^ExecutorService instance)) + (px/shutdown! instance)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Executor Monitor +;; TASKS REGISTRY ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::executors - (s/map-of keyword? 
::executor)) +(defn- wrap-task-handler + [metrics tname f] + (let [labels (into-array String [tname])] + (fn [params] + (let [tp (dt/tpoint)] + (try + (f params) + (finally + (mtx/run! metrics + {:id :tasks-timing + :val (inst-ms (tp)) + :labels labels}))))))) -(defmethod ig/pre-init-spec ::executor-monitor [_] - (s/keys :req-un [::executors ::mtx/metrics])) +(s/def ::registry (s/map-of ::us/string fn?)) -(defmethod ig/init-key ::executor-monitor - [_ {:keys [executors metrics interval] :or {interval 3000}}] - (letfn [(monitor! [state skey ^ForkJoinPool executor] - (let [prev-steals (get state skey 0) - running (.getRunningThreadCount executor) - queued (.getQueuedSubmissionCount executor) - active (.getPoolSize executor) - steals (.getStealCount executor) - labels (into-array String [(name skey)]) +(defmethod ig/pre-init-spec ::registry [_] + (s/keys :req-un [::mtx/metrics ::tasks])) - steals-increment (- steals prev-steals) - steals-increment (if (neg? steals-increment) 0 steals-increment)] +(defmethod ig/init-key ::registry + [_ {:keys [metrics tasks]}] + (l/info :hint "registry initialized" :tasks (count tasks)) + (reduce-kv (fn [registry k v] + (let [tname (name k)] + (l/debug :hint "register task" :name tname) + (assoc registry tname (wrap-task-handler metrics tname v)))) + {} + tasks)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; EXECUTOR MONITOR +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(s/def ::name ::us/keyword) + +(defmethod ig/pre-init-spec ::monitor [_] + (s/keys :req [::name ::executor ::mtx/metrics])) + +(defmethod ig/prep-key ::monitor + [_ cfg] + (merge {::interval (dt/duration "2s")} + (d/without-nils cfg))) + +(defmethod ig/init-key ::monitor + [_ {:keys [::executor ::mtx/metrics ::interval ::name]}] + (letfn [(monitor! 
[^ForkJoinPool executor prev-steals] + (let [running (.getRunningThreadCount executor) + queued (.getQueuedSubmissionCount executor) + active (.getPoolSize executor) + steals (.getStealCount executor) + labels (into-array String [(d/name name)]) + + steals-inc (- steals prev-steals) + steals-inc (if (neg? steals-inc) 0 steals-inc)] (mtx/run! metrics :id :executor-active-threads @@ -104,138 +139,495 @@ :labels labels :val queued) (mtx/run! metrics - :id :executors-completed-tasks - :labels labels - :inc steals-increment) + :id :executors-completed-tasks + :labels labels + :inc steals-inc) - (aa/thread-sleep interval) - (if (.isShutdown executor) - (l/debug :hint "stopping monitor; cause: executor is shutdown") - (assoc state skey steals)))) + steals))] - (monitor-fn [] - (try - (loop [items (into (d/queue) executors) - state {}] - (when-let [[skey executor :as item] (peek items)] - (if-let [state (monitor! state skey executor)] - (recur (conj items item) state) - (recur items state)))) - (catch InterruptedException _cause - (l/debug :hint "stopping monitor; interrupted"))))] + (px/thread + {:name "penpot/executors-monitor"} + (l/info :hint "monitor: started" :name name) + (try + (loop [steals 0] + (when-not (px/shutdown? executor) + (px/sleep interval) + (recur (long (monitor! executor steals))))) + (catch InterruptedException _cause + (l/debug :hint "monitor: interrupted" :name name)) + (catch Throwable cause + (l/error :hint "monitor: unexpected error" :name name :cause cause)) + (finally + (l/info :hint "monitor: terminated" :name name)))))) - (let [thread (Thread. monitor-fn)] - (.setDaemon thread true) - (.setName thread "penpot/executor-monitor") - (.start thread) - - thread))) - -(defmethod ig/halt-key! ::executor-monitor +(defmethod ig/halt-key! ::monitor [_ thread] - (.interrupt ^Thread thread)) + (px/interrupt! 
thread)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Worker +;; SCHEDULER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(declare event-loop-fn) -(declare event-loop) +(defn- decode-task-row + [{:keys [props] :as row}] + (cond-> row + (db/pgobject? props) + (assoc :props (db/decode-transit-pgobject props)))) -(s/def ::queue keyword?) -(s/def ::parallelism ::us/integer) -(s/def ::batch-size ::us/integer) -(s/def ::tasks (s/map-of keyword? fn?)) -(s/def ::poll-interval ::dt/duration) +(s/def ::queue ::us/string) +(s/def ::wait-duration ::dt/duration) + +(defmethod ig/pre-init-spec ::scheduler [_] + (s/keys :req [::mtx/metrics + ::db/pool + ::rds/redis] + :opt [::wait-duration + ::batch-size])) + +(defmethod ig/prep-key ::scheduler + [_ cfg] + (merge {::batch-size 1 + ::wait-duration (dt/duration "2s")} + (d/without-nils cfg))) + +(def ^:private sql:select-next-tasks + "select * from task as t + where t.scheduled_at <= now() + and (t.status = 'new' or t.status = 'retry') + order by t.priority desc, t.scheduled_at + limit ? + for update skip locked") + +(defn- format-queue + [queue] + (str/ffmt "penpot-tasks-queue:%" queue)) + +(defmethod ig/init-key ::scheduler + [_ {:keys [::db/pool ::rds/redis ::batch-size] :as cfg}] + (letfn [(get-tasks-batch [conn] + (->> (db/exec! conn [sql:select-next-tasks batch-size]) + (map decode-task-row) + (seq))) + + (queue-task [conn rconn {:keys [id queue] :as task}] + (db/update! conn :task {:status "ready"} {:id id}) + (let [queue (format-queue queue) + payload (t/encode id) + result (rds/rpush! rconn queue payload)] + (l/debug :hint "scheduler: task pushed to redis" + :task-id id + :key queue + :queued result))) + + (run-batch [rconn] + (db/with-atomic [conn pool] + (when-let [tasks (get-tasks-batch conn)] + (run! (partial queue-task conn rconn) tasks) + true))) + ] + + (if (db/read-only? 
pool) + (l/warn :hint "scheduler: not started (db is read-only)") + (px/thread + {:name "penpot/scheduler"} + (l/info :hint "scheduler: started") + (try + (dm/with-open [rconn (rds/connect redis)] + (loop [] + (when (px/interrupted?) + (throw (InterruptedException. "interrumpted"))) + + (try + (when-not (run-batch rconn) + (px/sleep (::wait-duration cfg))) + (catch InterruptedException cause + (throw cause)) + (catch Exception cause + (cond + (rds/exception? cause) + (do + (l/warn :hint "scheduler: redis exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))) + + (db/sql-exception? cause) + (do + (l/warn :hint "scheduler: database exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))) + + :else + (do + (l/error :hint "scheduler: unhandled exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn)))))) + + (recur))) + + (catch InterruptedException _ + (l/debug :hint "scheduler: interrupted")) + (catch Throwable cause + (l/error :hint "scheduler: unexpected exception" :cause cause)) + (finally + (l/info :hint "scheduler: terminated"))))))) + +(defmethod ig/halt-key! ::scheduler + [_ thread] + (some-> thread px/interrupt!)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; WORKER +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(declare ^:private run-worker-loop!) +(declare ^:private start-worker!) 
+(declare ^:private get-error-context) (defmethod ig/pre-init-spec ::worker [_] - (s/keys :req-un [::executor - ::mtx/metrics - ::db/pool - ::batch-size - ::name - ::poll-interval - ::queue - ::tasks])) + (s/keys :req [::parallelism + ::mtx/metrics + ::db/pool + ::rds/redis + ::queue + ::registry])) (defmethod ig/prep-key ::worker [_ cfg] - (d/merge {:batch-size 2 - :name :worker - :poll-interval (dt/duration {:seconds 5}) - :queue :default} - (d/without-nils cfg))) + (merge {::queue "default" + ::parallelism 1} + (d/without-nils cfg))) (defmethod ig/init-key ::worker - [_ {:keys [pool name queue] :as cfg}] - (let [close-ch (a/chan 1) - cfg (assoc cfg :close-ch close-ch)] - - (if (db/read-only? pool) - (l/warn :hint "worker not started, db is read-only" - :name (d/name name) - :queue (d/name queue)) - (do - (l/info :hint "worker initialized" - :name (d/name name) - :queue (d/name queue)) - (event-loop cfg))) - - (reify - java.lang.AutoCloseable - (close [_] - (a/close! close-ch))))) + [_ {:keys [::db/pool ::queue ::parallelism] :as cfg}] + (if (db/read-only? pool) + (l/warn :hint "workers: not started (db is read-only)" :queue queue) + (doall + (->> (range parallelism) + (map #(assoc cfg ::worker-id %)) + (map start-worker!))))) (defmethod ig/halt-key! ::worker + [_ threads] + (run! px/interrupt! threads)) + +(defn- start-worker! + [{:keys [::rds/redis ::worker-id] :as cfg}] + (px/thread + {:name (format "penpot/worker/%s" worker-id)} + (l/info :hint "worker: started" :worker-id worker-id) + (try + (dm/with-open [rconn (rds/connect redis)] + (let [cfg (-> cfg + (update ::queue format-queue) + (assoc ::rds/rconn rconn) + (assoc ::timeout (dt/duration "5s")))] + (loop [] + (when (px/interrupted?) + (throw (InterruptedException. "interrupted"))) + + (run-worker-loop! 
cfg) + (recur)))) + + (catch InterruptedException _ + (l/debug :hint "worker: interrupted" + :worker-id worker-id)) + (catch Throwable cause + (l/error :hint "worker: unexpected exception" + :worker-id worker-id + :cause cause)) + (finally + (l/info :hint "worker: terminated" :worker-id worker-id))))) + +(defn- run-worker-loop! + [{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}] + (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}] + (let [explain (ex-message error) + nretry (+ (:retry-num task) inc-by) + now (dt/now) + delay (->> (iterate #(* % 2) delay) (take nretry) (last))] + (db/update! pool :task + {:error explain + :status "retry" + :modified-at now + :scheduled-at (dt/plus now delay) + :retry-num nretry} + {:id (:id task)}) + nil)) + + (handle-task-failure [{:keys [task error]}] + (let [explain (ex-message error)] + (db/update! pool :task + {:error explain + :modified-at (dt/now) + :status "failed"} + {:id (:id task)}) + nil)) + + (handle-task-completion [{:keys [task]}] + (let [now (dt/now)] + (db/update! pool :task + {:completed-at now + :modified-at now + :status "completed"} + {:id (:id task)}) + nil)) + + (decode-payload [^bytes payload] + (try + (let [task-id (t/decode payload)] + (if (uuid? task-id) + task-id + (l/error :hint "worker: received unexpected payload (uuid expected)" + :payload task-id))) + (catch Throwable cause + (l/error :hint "worker: unable to decode payload" + :payload payload + :length (alength payload) + :cause cause)))) + + (handle-task [{:keys [name] :as task}] + (let [task-fn (get registry name)] + (if task-fn + (task-fn task) + (l/warn :hint "no task handler found" :name name)) + {:status :completed :task task})) + + (handle-task-exception [cause task] + (let [edata (ex-data cause)] + (if (and (< (:retry-num task) + (:max-retries task)) + (= ::retry (:type edata))) + (cond-> {:status :retry :task task :error cause} + (dt/duration? 
(:delay edata)) + (assoc :delay (:delay edata)) + + (= ::noop (:strategy edata)) + (assoc :inc-by 0)) + (do + (l/error :hint "worker: unhandled exception on task" + ::l/context (get-error-context cause task) + :cause cause) + (if (>= (:retry-num task) (:max-retries task)) + {:status :failed :task task :error cause} + {:status :retry :task task :error cause}))))) + + (get-task [task-id] + (ex/try (db/get* pool :task {:id task-id}))) + + (run-task [task-id] + (loop [task (get-task task-id)] + (cond + (ex/exception? task) + (if (or (db/connection-error? task) + (db/serialization-error? task)) + (do + (l/warn :hint "worker: connection error on retrieving task from database (retrying in some instants)" + :worker-id worker-id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task task-id))) + (do + (l/error :hint "worker: unhandled exception on retrieving task from database (retrying in some instants)" + :worker-id worker-id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task task-id)))) + + (nil? task) + (l/warn :hint "worker: no task found on the database" + :worker-id worker-id + :task-id task-id) + + :else + (try + (l/trace :hint "worker: executing task" + :worker-id worker-id + :task-id (:id task) + :task-name (:name task) + :task-retry (:retry-num task)) + (handle-task task) + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (handle-task-exception cause task)))))) + + (process-result [{:keys [status] :as result}] + (ex/try! + (case status + :retry (handle-task-retry result) + :failed (handle-task-failure result) + :completed (handle-task-completion result)))) + + (run-task-loop [task-id] + (loop [result (run-task task-id)] + (when-let [cause (process-result result)] + (if (or (db/connection-error? cause) + (db/serialization-error? 
cause)) + (do + (l/warn :hint "worker: database exeption on processing task result (retrying in some instants)" + :cause cause) + (px/sleep (::rds/timeout rconn)) + (recur result)) + (do + (l/error :hint "worker: unhandled exception on processing task result (retrying in some instants)" + :cause cause) + (px/sleep (::rds/timeout rconn)) + (recur result))))))] + + (try + (let [[_ payload] (rds/blpop! rconn timeout queue)] + (some-> payload + decode-payload + run-task-loop)) + + (catch InterruptedException cause + (throw cause)) + + (catch Exception cause + (if (rds/timeout-exception? cause) + (do + (l/error :hint "worker: redis pop operation timeout, consider increasing redis timeout (will retry in some instants)" + :timeout timeout + :cause cause) + (px/sleep timeout)) + + (l/error :hint "worker: unhandled exception" :cause cause)))))) + +(defn- get-error-context + [error item] + (let [data (ex-data error)] + (merge + {:hint (ex-message error) + :spec-problems (some->> data ::s/problems (take 10) seq vec) + :spec-value (some->> data ::s/value) + :data (some-> data (dissoc ::s/problems ::s/value ::s/spec)) + :params item} + (when (and data (::s/problems data)) + {:spec-explain (us/pretty-explain data)})))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; CRON +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(declare schedule-cron-task) +(declare synchronize-cron-entries!) + +(s/def ::fn (s/or :var var? :fn fn?)) +(s/def ::id keyword?) +(s/def ::cron dt/cron?) +(s/def ::props (s/nilable map?)) +(s/def ::task keyword?) + +(s/def ::cron-task + (s/keys :req-un [::cron ::task] + :opt-un [::props ::id])) + +(s/def ::entries (s/coll-of (s/nilable ::cron-task))) + +(defmethod ig/pre-init-spec ::cron [_] + (s/keys :req [::scheduled-executor ::db/pool ::entries ::registry])) + +(defmethod ig/init-key ::cron + [_ {:keys [::entries ::registry ::db/pool] :as cfg}] + (if (db/read-only? 
pool) + (l/warn :hint "cron: not started (db is read-only)") + (let [running (atom #{}) + entries (->> entries + (filter some?) + ;; If id is not defined, use the task as id. + (map (fn [{:keys [id task] :as item}] + (if (some? id) + (assoc item :id (d/name id)) + (assoc item :id (d/name task))))) + (map (fn [item] + (update item :task d/name))) + (map (fn [{:keys [task] :as item}] + (let [f (get registry task)] + (when-not f + (ex/raise :type :internal + :code :task-not-found + :hint (str/fmt "task %s not configured" task))) + (-> item + (dissoc :task) + (assoc :fn f)))))) + + cfg (assoc cfg ::entries entries ::running running)] + + (l/info :hint "cron: started" :tasks (count entries)) + (synchronize-cron-entries! cfg) + + (->> (filter some? entries) + (run! (partial schedule-cron-task cfg))) + + (reify + clojure.lang.IDeref + (deref [_] @running) + + java.lang.AutoCloseable + (close [_] + (l/info :hint "cron: terminated") + (doseq [item @running] + (when-not (.isDone ^Future item) + (.cancel ^Future item true)))))))) + +(defmethod ig/halt-key! ::cron [_ instance] - (.close ^java.lang.AutoCloseable instance)) + (some-> instance d/close!)) -(defn- event-loop - "Main, worker eventloop" - [{:keys [pool poll-interval close-ch] :as cfg}] - (let [poll-ms (inst-ms poll-interval)] - (a/go-loop [] - (let [[val port] (a/alts! [close-ch (event-loop-fn cfg)] :priority true)] - (cond - ;; Terminate the loop if close channel is closed or - ;; event-loop-fn returns nil. - (or (= port close-ch) (nil? val)) - (l/debug :hint "stop condition found") +(def sql:upsert-cron-task + "insert into scheduled_task (id, cron_expr) + values (?, ?) + on conflict (id) + do update set cron_expr=?") - (db/closed? pool) - (do - (l/debug :hint "eventloop aborted because pool is closed") - (a/close! close-ch)) +(defn- synchronize-cron-entries! 
+ [{:keys [::db/pool ::entries]}] + (db/with-atomic [conn pool] + (doseq [{:keys [id cron]} entries] + (l/trace :hint "register cron task" :id id :cron (str cron)) + (db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)])))) - (and (instance? java.sql.SQLException val) - (contains? #{"08003" "08006" "08001" "08004"} (.getSQLState ^java.sql.SQLException val))) - (do - (l/warn :hint "connection error, trying resume in some instants") - (a/> (iterate #(* % 2) delay) (take nretry) (last)) - sqlv [sql:mark-as-retry (db/interval delay) explain nretry (:id task)]] - (db/exec-one! conn sqlv) - nil)) - -(defn- mark-as-failed - [conn {:keys [task error]}] - (let [explain (ex-message error)] - (db/update! conn :task - {:error explain - :modified-at (dt/now) - :status "failed"} - {:id (:id task)}) - nil)) - -(defn- mark-as-completed - [conn {:keys [task] :as cfg}] - (let [now (dt/now)] - (db/update! conn :task - {:completed-at now - :modified-at now - :status "completed"} - {:id (:id task)}) - nil)) - -(defn- decode-task-row - [{:keys [props name] :as row}] - (when row - (cond-> row - (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)) - (string? 
name) (assoc :name (keyword name))))) - -(defn- handle-task - [tasks {:keys [name] :as item}] - (let [task-fn (get tasks name)] - (if task-fn - (task-fn item) - (l/warn :hint "no task handler found" - :name (d/name name))) - {:status :completed :task item})) - -(defn get-error-context - [error item] - (let [data (ex-data error)] - (merge - {:hint (ex-message error) - :spec-problems (some->> data ::s/problems (take 10) seq vec) - :spec-value (some->> data ::s/value) - :data (some-> data (dissoc ::s/problems ::s/value ::s/spec)) - :params item} - (when (and data (::s/problems data)) - {:spec-explain (us/pretty-explain data)})))) - -(defn- handle-exception - [error item] - (let [edata (ex-data error)] - (if (and (< (:retry-num item) - (:max-retries item)) - (= ::retry (:type edata))) - (cond-> {:status :retry :task item :error error} - (dt/duration? (:delay edata)) - (assoc :delay (:delay edata)) - - (= ::noop (:strategy edata)) - (assoc :inc-by 0)) - (do - (l/error :hint "unhandled exception on task" - ::l/context (get-error-context error item) - :cause error) - (if (>= (:retry-num item) (:max-retries item)) - {:status :failed :task item :error error} - {:status :retry :task item :error error}))))) - -(defn- run-task - [{:keys [tasks]} item] - (let [name (d/name (:name item))] - (try - (l/trace :action "execute task" - :id (:id item) - :name name - :retry (:retry-num item)) - (handle-task tasks item) - (catch Exception e - (handle-exception e item))))) - -(def sql:select-next-tasks - "select * from task as t - where t.scheduled_at <= now() - and t.queue = ? - and (t.status = 'new' or t.status = 'retry') - order by t.priority desc, t.scheduled_at - limit ? - for update skip locked") - -(defn- event-loop-fn* - [{:keys [pool executor batch-size] :as cfg}] - (db/with-atomic [conn pool] - (let [queue (name (:queue cfg)) - items (->> (db/exec! conn [sql:select-next-tasks queue batch-size]) - (map decode-task-row) - (seq)) - cfg (assoc cfg :conn conn)] - - (if (nil? 
items) - ::empty - (let [proc-xf (comp (map #(partial run-task cfg %)) - (map #(px/submit! executor %)))] - (->> (into [] proc-xf items) - (map deref) - (run! (fn [res] - (case (:status res) - :retry (mark-as-retry conn res) - :failed (mark-as-failed conn res) - :completed (mark-as-completed conn res))))) - ::handled))))) - -(defn- event-loop-fn - [{:keys [executor] :as cfg}] - (aa/thread-call executor #(event-loop-fn* cfg))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Scheduler -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(declare schedule-cron-task) -(declare synchronize-cron-entries!) - -(s/def ::fn (s/or :var var? :fn fn?)) -(s/def ::id keyword?) -(s/def ::cron dt/cron?) -(s/def ::props (s/nilable map?)) -(s/def ::task keyword?) - -(s/def ::cron-task - (s/keys :req-un [::cron ::task] - :opt-un [::props ::id])) - -(s/def ::entries (s/coll-of (s/nilable ::cron-task))) - -(defmethod ig/pre-init-spec ::cron [_] - (s/keys :req-un [::executor ::scheduled-executor ::db/pool ::entries ::tasks])) - -(defmethod ig/init-key ::cron - [_ {:keys [entries tasks pool] :as cfg}] - (if (db/read-only? pool) - (l/warn :hint "scheduler not started, db is read-only") - (let [running (atom #{}) - entries (->> entries - (filter some?) - ;; If id is not defined, use the task as id. - (map (fn [{:keys [id task] :as item}] - (if (some? id) - (assoc item :id (d/name id)) - (assoc item :id (d/name task))))) - (map (fn [{:keys [task] :as item}] - (let [f (get tasks task)] - (when-not f - (ex/raise :type :internal - :code :task-not-found - :hint (str/fmt "task %s not configured" task))) - (-> item - (dissoc :task) - (assoc :fn f)))))) - - cfg (assoc cfg :entries entries :running running)] - - (l/info :hint "cron initialized" :tasks (count entries)) - (synchronize-cron-entries! cfg) - - (->> (filter some? entries) - (run! 
(partial schedule-cron-task cfg))) - - (reify - clojure.lang.IDeref - (deref [_] @running) - - java.lang.AutoCloseable - (close [_] - (doseq [item @running] - (when-not (.isDone ^Future item) - (.cancel ^Future item true)))))))) - - -(defmethod ig/halt-key! ::cron - [_ instance] - (when instance - (.close ^java.lang.AutoCloseable instance))) - -(def sql:upsert-cron-task - "insert into scheduled_task (id, cron_expr) - values (?, ?) - on conflict (id) - do update set cron_expr=?") - -(defn- synchronize-cron-entries! - [{:keys [pool entries]}] - (db/with-atomic [conn pool] - (doseq [{:keys [id cron]} entries] - (l/trace :hint "register cron task" :id id :cron (str cron)) - (db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)])))) - -(def sql:lock-cron-task - "select id from scheduled_task where id=? for update skip locked") - -(defn- execute-cron-task - [{:keys [executor pool] :as cfg} {:keys [id] :as task}] - (letfn [(run-task [conn] - (when (db/exec-one! conn [sql:lock-cron-task (d/name id)]) - (l/trace :hint "execute cron task" :id id) - ((:fn task) task))) - - (handle-task [] - (try - (db/with-atomic [conn pool] - (run-task conn)) - (catch Throwable cause - (l/error :hint "unhandled exception on scheduled task" - ::l/context (get-error-context cause task) - :task-id id - :cause cause))))] - - (px/run! executor handle-task) - (px/run! executor #(schedule-cron-task cfg task)) - nil)) - -(defn- ms-until-valid - [cron] - (s/assert dt/cron? cron) - (let [now (dt/now) - next (dt/next-valid-instant-from cron now)] - (inst-ms (dt/diff now next)))) - -(def ^:private - xf-without-done - (remove #(.isDone ^Future %))) - -(defn- schedule-cron-task - [{:keys [scheduled-executor running] :as cfg} {:keys [cron] :as task}] - (let [ft (px/schedule! scheduled-executor - (ms-until-valid cron) - (partial execute-cron-task cfg task))] - (swap! 
running #(into #{ft} xf-without-done %)))) - -;; --- INSTRUMENTATION - -(defn- wrap-task-handler - [metrics tname f] - (let [labels (into-array String [tname])] - (fn [params] - (let [start (System/nanoTime)] - (try - (f params) - (finally - (mtx/run! metrics - {:id :tasks-timing - :val (/ (- (System/nanoTime) start) 1000000) - :labels labels}))))))) - -(defmethod ig/pre-init-spec ::registry [_] - (s/keys :req-un [::mtx/metrics ::tasks])) - -(defmethod ig/init-key ::registry - [_ {:keys [metrics tasks]}] - (l/info :hint "registry initialized" :tasks (count tasks)) - (reduce-kv (fn [res k v] - (let [tname (name k)] - (l/debug :hint "register task" :name tname) - (assoc res k (wrap-task-handler metrics tname v)))) - {} - tasks)) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 3e12312ed..b25fc6bcb 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -324,7 +324,7 @@ (run-task! name {})) ([name params] (let [tasks (:app.worker/registry *system*)] - (let [task-fn (get tasks name)] + (let [task-fn (get tasks (d/name name))] (task-fn params))))) ;; --- UTILS diff --git a/backend/test/backend_tests/tasks_telemetry_test.clj b/backend/test/backend_tests/tasks_telemetry_test.clj index 7ff727b8b..43e8a59eb 100644 --- a/backend/test/backend_tests/tasks_telemetry_test.clj +++ b/backend/test/backend_tests/tasks_telemetry_test.clj @@ -20,12 +20,10 @@ (t/deftest test-base-report-data-structure (with-mocks [mock {:target 'app.tasks.telemetry/send! :return nil}] - (let [task-fn (-> th/*system* :app.worker/registry :telemetry) - prof (th/create-profile* 1 {:is-active true - :props {:newsletter-news true}})] + (let [prof (th/create-profile* 1 {:is-active true + :props {:newsletter-news true}})] - ;; run the task - (task-fn {:send? true :enabled? true}) + (th/run-task! :telemetry {:send? true :enabled? true}) (t/is (:called? 
@mock)) (let [[_ data] (-> @mock :call-args)] diff --git a/common/deps.edn b/common/deps.edn index f9f253fb3..efe44a594 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -23,7 +23,7 @@ com.cognitect/transit-cljs {:mvn/version "0.8.280"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - funcool/promesa {:mvn/version "9.1.540"} + funcool/promesa {:mvn/version "9.2.541"} funcool/cuerdas {:mvn/version "2022.06.16-403"} lambdaisland/uri {:mvn/version "1.13.95" diff --git a/common/src/app/common/data.cljc b/common/src/app/common/data.cljc index 8e4424655..d2461675e 100644 --- a/common/src/app/common/data.cljc +++ b/common/src/app/common/data.cljc @@ -5,7 +5,8 @@ ;; Copyright (c) KALEIDOS INC (ns app.common.data - "Data manipulation and query helper functions." + "A collection of helpers for working with data structures and other + data resources." (:refer-clojure :exclude [read-string hash-map merge name update-vals parse-double group-by iteration concat mapcat]) #?(:cljs @@ -22,7 +23,9 @@ [linked.set :as lks]) #?(:clj - (:import linked.set.LinkedSet))) + (:import + linked.set.LinkedSet + java.lang.AutoCloseable))) (def boolean-or-nil? (some-fn nil? boolean?)) @@ -697,3 +700,16 @@ (map (fn [key] [key (delay (generator-fn key))])) keys)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Util protocols +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defprotocol ICloseable + :extend-via-metadata true + (close! [_] "Close the resource.")) + +#?(:clj + (extend-protocol ICloseable + AutoCloseable + (close! [this] (.close this)))) diff --git a/common/src/app/common/data/macros.cljc b/common/src/app/common/data/macros.cljc index 6c93dab85..0d204e7ef 100644 --- a/common/src/app/common/data/macros.cljc +++ b/common/src/app/common/data/macros.cljc @@ -7,7 +7,7 @@ #_:clj-kondo/ignore (ns app.common.data.macros "Data retrieval & manipulation specific macros." 
- (:refer-clojure :exclude [get-in select-keys str]) + (:refer-clojure :exclude [get-in select-keys str with-open]) #?(:cljs (:require-macros [app.common.data.macros])) (:require #?(:clj [clojure.core :as c] @@ -94,5 +94,16 @@ [s & params] `(str/ffmt ~s ~@params)) - - +(defmacro with-open + [bindings & body] + {:pre [(vector? bindings) + (even? (count bindings)) + (pos? (count bindings))]} + (reduce (fn [acc bindings] + `(let ~(vec bindings) + (try + ~acc + (finally + (d/close! ~(first bindings)))))) + `(do ~@body) + (reverse (partition 2 bindings)))) diff --git a/common/src/app/common/exceptions.cljc b/common/src/app/common/exceptions.cljc index e13628515..11a1d432f 100644 --- a/common/src/app/common/exceptions.cljc +++ b/common/src/app/common/exceptions.cljc @@ -50,6 +50,10 @@ [& exprs] `(try* (^:once fn* [] ~@exprs) identity)) +(defmacro try! + [& exprs] + `(try* (^:once fn* [] ~@exprs) identity)) + (defn with-always "A helper that evaluates an exptession independently if the body raises exception or not."