diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn index 7b0b571f2..e655c6e4c 100644 --- a/.clj-kondo/config.edn +++ b/.clj-kondo/config.edn @@ -7,6 +7,7 @@ app.common.data/export clojure.core/def app.db/with-atomic clojure.core/with-open app.common.data.macros/get-in clojure.core/get-in + app.common.data.macros/with-open clojure.core/with-open app.common.data.macros/select-keys clojure.core/select-keys app.common.logging/with-context clojure.core/do} diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 7d8556e0c..e2d30373b 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -106,7 +106,8 @@ (s/def ::file-change-snapshot-timeout ::dt/duration) (s/def ::default-executor-parallelism ::us/integer) -(s/def ::worker-executor-parallelism ::us/integer) +(s/def ::scheduled-executor-parallelism ::us/integer) +(s/def ::worker-parallelism ::us/integer) (s/def ::authenticated-cookie-domain ::us/string) (s/def ::authenticated-cookie-name ::us/string) @@ -218,7 +219,8 @@ ::default-rpc-rlimit ::error-report-webhook ::default-executor-parallelism - ::worker-executor-parallelism + ::scheduled-executor-parallelism + ::worker-parallelism ::file-change-snapshot-every ::file-change-snapshot-timeout ::user-feedback-destination diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index e62b8290b..90b960c4a 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -493,3 +493,18 @@ (let [n (xact-check-param n) row (exec-one! conn ["select pg_try_advisory_xact_lock(?::bigint) as lock" n])] (:lock row))) + +(defn sql-exception? + [cause] + (instance? java.sql.SQLException cause)) + +(defn connection-error? + [cause] + (and (sql-exception? cause) + (contains? #{"08003" "08006" "08001" "08004"} + (.getSQLState ^java.sql.SQLException cause)))) + +(defn serialization-error? + [cause] + (and (sql-exception? cause) + (= "40001" (.getSQLState ^java.sql.SQLException cause)))) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index ae32a063c..1e909f822 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -9,8 +9,13 @@ [app.auth.oidc] [app.common.logging :as l] [app.config :as cf] + [app.db :as-alias db] + [app.metrics :as-alias mtx] [app.metrics.definition :as-alias mdef] + [app.redis :as-alias rds] + [app.storage :as-alias sto] [app.util.time :as dt] + [app.worker :as-alias wrk] [cuerdas.core :as str] [integrant.core :as ig]) (:gen-class)) @@ -120,91 +125,83 @@ ::mdef/type :gauge}}) (def system-config - {:app.db/pool + {::db/pool {:uri (cf/get :database-uri) :username (cf/get :database-username) :password (cf/get :database-password) :read-only (cf/get :database-readonly false) - :metrics (ig/ref :app.metrics/metrics) + :metrics (ig/ref ::mtx/metrics) :migrations (ig/ref :app.migrations/all) :name :main :min-size (cf/get :database-min-pool-size 0) :max-size (cf/get :database-max-pool-size 60)} ;; Default thread pool for IO operations - [::default :app.worker/executor] - {:parallelism (cf/get :default-executor-parallelism 70)} + ::wrk/executor + {::wrk/parallelism (cf/get :default-executor-parallelism 100)} - ;; Dedicated thread pool for background tasks execution. - [::worker :app.worker/executor] - {:parallelism (cf/get :worker-executor-parallelism 20)} + ::wrk/scheduled-executor + {::wrk/parallelism (cf/get :scheduled-executor-parallelism 20)} - :app.worker/scheduler - {:parallelism 1 - :prefix :scheduler} - - :app.worker/executors - {:default (ig/ref [::default :app.worker/executor]) - :worker (ig/ref [::worker :app.worker/executor])} - - :app.worker/executor-monitor - {:metrics (ig/ref :app.metrics/metrics) - :executors (ig/ref :app.worker/executors)} + ::wrk/monitor + {::mtx/metrics (ig/ref ::mtx/metrics) + ::wrk/name "default" + ::wrk/executor (ig/ref ::wrk/executor)} :app.migrations/migrations {} - :app.metrics/metrics + ::mtx/metrics {:default default-metrics} :app.migrations/all {:main (ig/ref :app.migrations/migrations)} - :app.redis/redis - {:uri (cf/get :redis-uri) - :metrics (ig/ref :app.metrics/metrics)} + ::rds/redis + {::rds/uri (cf/get :redis-uri) + ::mtx/metrics (ig/ref ::mtx/metrics)} :app.msgbus/msgbus {:backend (cf/get :msgbus-backend :redis) - :executor (ig/ref [::default :app.worker/executor]) - :redis (ig/ref :app.redis/redis)} + :executor (ig/ref ::wrk/executor) + :redis (ig/ref ::rds/redis)} :app.storage.tmp/cleaner - {:executor (ig/ref [::worker :app.worker/executor]) - :scheduler (ig/ref :app.worker/scheduler)} + {::wrk/executor (ig/ref ::wrk/executor) + ::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)} - :app.storage/gc-deleted-task - {:pool (ig/ref :app.db/pool) - :storage (ig/ref :app.storage/storage) - :executor (ig/ref [::worker :app.worker/executor])} + ::sto/gc-deleted-task + {:pool (ig/ref ::db/pool) + :storage (ig/ref ::sto/storage) + :executor (ig/ref ::wrk/executor)} - :app.storage/gc-touched-task - {:pool (ig/ref :app.db/pool)} + ::sto/gc-touched-task + {:pool (ig/ref ::db/pool)} :app.http/client - {:executor (ig/ref [::default :app.worker/executor])} + {:executor (ig/ref ::wrk/executor)} :app.http.session/manager - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :sprops (ig/ref :app.setup/props) - :executor (ig/ref [::default :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} :app.http.session/gc-task - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :max-age (cf/get :auth-token-cookie-max-age)} :app.http.awsns/handler {:sprops (ig/ref :app.setup/props) - :pool (ig/ref :app.db/pool) + :pool (ig/ref ::db/pool) :http-client (ig/ref :app.http/client) - :executor (ig/ref [::worker :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} :app.http/server {:port (cf/get :http-server-port) :host (cf/get :http-server-host) :router (ig/ref :app.http/router) - :metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref [::default :app.worker/executor]) + :metrics (ig/ref ::mtx/metrics) + :executor (ig/ref ::wrk/executor) :io-threads (cf/get :http-server-io-threads) :max-body-size (cf/get :http-server-max-body-size) :max-multipart-body-size (cf/get :http-server-max-multipart-body-size)} @@ -264,10 +261,10 @@ :oidc (ig/ref :app.auth.oidc/generic-provider)} :sprops (ig/ref :app.setup/props) :http-client (ig/ref :app.http/client) - :pool (ig/ref :app.db/pool) + :pool (ig/ref ::db/pool) :session (ig/ref :app.http.session/manager) :public-uri (cf/get :public-uri) - :executor (ig/ref [::default :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} ;; TODO: revisit the dependencies of this service, looks they are too much unused of them :app.http/router @@ -278,61 +275,60 @@ :debug-routes (ig/ref :app.http.debug/routes) :oidc-routes (ig/ref :app.auth.oidc/routes) :ws (ig/ref :app.http.websocket/handler) - :metrics (ig/ref :app.metrics/metrics) + :metrics (ig/ref ::mtx/metrics) :public-uri (cf/get :public-uri) - :storage (ig/ref :app.storage/storage) + :storage (ig/ref ::sto/storage) :audit-handler (ig/ref :app.loggers.audit/http-handler) :rpc-routes (ig/ref :app.rpc/routes) :doc-routes (ig/ref :app.rpc.doc/routes) - :executor (ig/ref [::default :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} :app.http.debug/routes - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::worker :app.worker/executor]) - :storage (ig/ref :app.storage/storage) + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor) + :storage (ig/ref ::sto/storage) :session (ig/ref :app.http.session/manager)} :app.http.websocket/handler - {:pool (ig/ref :app.db/pool) - :metrics (ig/ref :app.metrics/metrics) + {:pool (ig/ref ::db/pool) + :metrics (ig/ref ::mtx/metrics) :msgbus (ig/ref :app.msgbus/msgbus)} :app.http.assets/handlers - {:metrics (ig/ref :app.metrics/metrics) + {:metrics (ig/ref ::mtx/metrics) :assets-path (cf/get :assets-path) - :storage (ig/ref :app.storage/storage) - :executor (ig/ref [::default :app.worker/executor]) + :storage (ig/ref ::sto/storage) + :executor (ig/ref ::wrk/executor) :cache-max-age (dt/duration {:hours 24}) :signature-max-age (dt/duration {:hours 24 :minutes 5})} :app.http.feedback/handler - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::default :app.worker/executor])} + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor)} :app.rpc/climit - {:metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref [::default :app.worker/executor])} + {:metrics (ig/ref ::mtx/metrics) + :executor (ig/ref ::wrk/executor)} :app.rpc/rlimit - {:executor (ig/ref [::worker :app.worker/executor]) - :scheduler (ig/ref :app.worker/scheduler)} + {:executor (ig/ref ::wrk/executor) + :scheduled-executor (ig/ref ::wrk/scheduled-executor)} :app.rpc/methods - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :session (ig/ref :app.http.session/manager) :sprops (ig/ref :app.setup/props) - :metrics (ig/ref :app.metrics/metrics) - :storage (ig/ref :app.storage/storage) + :metrics (ig/ref ::mtx/metrics) + :storage (ig/ref ::sto/storage) :msgbus (ig/ref :app.msgbus/msgbus) :public-uri (cf/get :public-uri) - :redis (ig/ref :app.redis/redis) + :redis (ig/ref ::rds/redis) :audit (ig/ref :app.loggers.audit/collector) :ldap (ig/ref :app.auth.ldap/provider) :http-client (ig/ref :app.http/client) :climit (ig/ref :app.rpc/climit) :rlimit (ig/ref :app.rpc/rlimit) - :executors (ig/ref :app.worker/executors) - :executor (ig/ref [::default :app.worker/executor]) + :executor (ig/ref ::wrk/executor) :templates (ig/ref :app.setup/builtin-templates) } @@ -342,15 +338,15 @@ :app.rpc/routes {:methods (ig/ref :app.rpc/methods)} - :app.worker/registry - {:metrics (ig/ref :app.metrics/metrics) + ::wrk/registry + {:metrics (ig/ref ::mtx/metrics) :tasks {:sendmail (ig/ref :app.emails/handler) :objects-gc (ig/ref :app.tasks.objects-gc/handler) :file-gc (ig/ref :app.tasks.file-gc/handler) :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) - :storage-gc-deleted (ig/ref :app.storage/gc-deleted-task) - :storage-gc-touched (ig/ref :app.storage/gc-touched-task) + :storage-gc-deleted (ig/ref ::sto/gc-deleted-task) + :storage-gc-touched (ig/ref ::sto/gc-touched-task) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) :session-gc (ig/ref :app.http.session/gc-task) @@ -370,24 +366,24 @@ :app.emails/handler {:sendmail (ig/ref :app.emails/sendmail) - :metrics (ig/ref :app.metrics/metrics)} + :metrics (ig/ref ::mtx/metrics)} :app.tasks.tasks-gc/handler - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :max-age cf/deletion-delay} :app.tasks.objects-gc/handler - {:pool (ig/ref :app.db/pool) - :storage (ig/ref :app.storage/storage)} + {:pool (ig/ref ::db/pool) + :storage (ig/ref ::sto/storage)} :app.tasks.file-gc/handler - {:pool (ig/ref :app.db/pool)} + {:pool (ig/ref ::db/pool)} :app.tasks.file-xlog-gc/handler - {:pool (ig/ref :app.db/pool)} + {:pool (ig/ref ::db/pool)} :app.tasks.telemetry/handler - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :version (:full cf/version) :uri (cf/get :telemetry-uri) :sprops (ig/ref :app.setup/props) @@ -401,28 +397,28 @@ {:http-client (ig/ref :app.http/client)} :app.setup/props - {:pool (ig/ref :app.db/pool) + {:pool (ig/ref ::db/pool) :key (cf/get :secret-key)} :app.loggers.zmq/receiver {:endpoint (cf/get :loggers-zmq-uri)} :app.loggers.audit/http-handler - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::default :app.worker/executor])} + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor)} :app.loggers.audit/collector - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::worker :app.worker/executor])} + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor)} :app.loggers.audit/archive-task {:uri (cf/get :audit-log-archive-uri) :sprops (ig/ref :app.setup/props) - :pool (ig/ref :app.db/pool) + :pool (ig/ref ::db/pool) :http-client (ig/ref :app.http/client)} :app.loggers.audit/gc-task - {:pool (ig/ref :app.db/pool)} + {:pool (ig/ref ::db/pool)} :app.loggers.loki/reporter {:uri (cf/get :loggers-loki-uri) @@ -436,12 +432,12 @@ :app.loggers.database/reporter {:receiver (ig/ref :app.loggers.zmq/receiver) - :pool (ig/ref :app.db/pool) - :executor (ig/ref [::worker :app.worker/executor])} + :pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor)} - :app.storage/storage - {:pool (ig/ref :app.db/pool) - :executor (ig/ref [::default :app.worker/executor]) + ::sto/storage + {:pool (ig/ref ::db/pool) + :executor (ig/ref ::wrk/executor) :backends {:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) @@ -455,7 +451,7 @@ {:region (cf/get :storage-assets-s3-region) :endpoint (cf/get :storage-assets-s3-endpoint) :bucket (cf/get :storage-assets-s3-bucket) - :executor (ig/ref [::default :app.worker/executor])} + :executor (ig/ref ::wrk/executor)} [::assets :app.storage.fs/backend] {:directory (cf/get :storage-assets-fs-directory)} @@ -463,12 +459,11 @@ (def worker-config - {:app.worker/cron - {:executor (ig/ref [::worker :app.worker/executor]) - :scheduler (ig/ref :app.worker/scheduler) - :tasks (ig/ref :app.worker/registry) - :pool (ig/ref :app.db/pool) - :entries + {::wrk/cron + {::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor) + ::wrk/registry (ig/ref ::wrk/registry) + ::db/pool (ig/ref ::db/pool) + ::wrk/entries [{:cron #app/cron "0 0 * * * ?" ;; hourly :task :file-xlog-gc} @@ -501,11 +496,18 @@ {:cron #app/cron "30 */5 * * * ?" ;; every 5m :task :audit-log-gc})]} - :app.worker/worker - {:executor (ig/ref [::worker :app.worker/executor]) - :tasks (ig/ref :app.worker/registry) - :metrics (ig/ref :app.metrics/metrics) - :pool (ig/ref :app.db/pool)}}) + ::wrk/scheduler + {::rds/redis (ig/ref ::rds/redis) + ::mtx/metrics (ig/ref ::mtx/metrics) + ::db/pool (ig/ref ::db/pool)} + + ::wrk/worker + {::wrk/parallelism (cf/get ::worker-parallelism 3) + ::wrk/queue "default" + ::rds/redis (ig/ref ::rds/redis) + ::wrk/registry (ig/ref ::wrk/registry) + ::mtx/metrics (ig/ref ::mtx/metrics) + ::db/pool (ig/ref ::db/pool)}}) (def system nil) diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 7d3959931..8dd5b3345 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -20,7 +20,8 @@ [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] - [promesa.core :as p])) + [promesa.core :as p] + [promesa.exec :as px])) (set! *warn-on-reflection* true) @@ -52,8 +53,8 @@ (s/def ::rcv-ch ::aa/channel) (s/def ::pub-ch ::aa/channel) (s/def ::state ::us/agent) -(s/def ::pconn ::redis/connection) -(s/def ::sconn ::redis/connection) +(s/def ::pconn ::redis/connection-holder) +(s/def ::sconn ::redis/connection-holder) (s/def ::msgbus (s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor])) @@ -122,8 +123,8 @@ (defn- redis-disconnect [{:keys [::pconn ::sconn] :as cfg}] - (redis/close! pconn) - (redis/close! sconn)) + (d/close! pconn) + (d/close! sconn)) (defn- conj-subscription "A low level function that is responsible to create on-demand @@ -205,31 +206,33 @@ (when-let [closed (a/> (vals state) + (mapcat identity) + (filter some?) + (run! a/close!)) + nil))) - (a/go-loop [] - (let [[val port] (a/alts! [pub-ch rcv-ch])] - (cond - (nil? val) - (do - (l/trace :hint "stopping io-loop, nil received") - (send-via executor state (fn [state] - (->> (vals state) - (mapcat identity) - (filter some?) - (run! a/close!)) - nil))) + (= port rcv-ch) + (do + (a/ cfg - connect? (assoc ::connection (connect cfg))))) + [_ {:keys [::connect?] :as cfg}] + (let [state (initialize-resources cfg)] + (cond-> state + connect? (assoc ::connection (connect* cfg {}))))) (defmethod ig/halt-key! ::redis [_ state] @@ -114,7 +131,7 @@ (defn- initialize-resources "Initialize redis connection resources" - [{:keys [uri io-threads worker-threads connect? metrics] :as cfg}] + [{:keys [::uri ::io-threads ::worker-threads ::connect?] :as cfg}] (l/info :hint "initialize redis resources" :uri uri :io-threads io-threads @@ -131,34 +148,32 @@ redis-uri (RedisURI/create ^String uri)] (-> cfg - (assoc ::mtx/metrics metrics) - (assoc ::cache (atom {})) + (assoc ::resources resources) (assoc ::timer timer) - (assoc ::redis-uri redis-uri) - (assoc ::resources resources)))) + (assoc ::cache (atom {})) + (assoc ::redis-uri redis-uri)))) (defn- shutdown-resources [{:keys [::resources ::cache ::timer]}] - (run! close! (vals @cache)) + (run! d/close! (vals @cache)) (when resources (.shutdown ^ClientResources resources)) (when timer (.stop ^Timer timer))) -(defn connect - [{:keys [::resources ::redis-uri] :as cfg} - & {:keys [timeout codec type] :or {codec default-codec type :default}}] +(defn connect* + [{:keys [::resources ::redis-uri] :as state} + {:keys [timeout codec type] + :or {codec default-codec type :default}}] (us/assert! ::resources resources) - (let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri) - timeout (or timeout (:timeout cfg)) + timeout (or timeout (::timeout state)) conn (case type :default (.connect ^RedisClient client ^RedisCodec codec) :pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))] (.setTimeout ^StatefulConnection conn ^Duration timeout) - (reify IDeref (deref [_] conn) @@ -168,53 +183,113 @@ (.close ^StatefulConnection conn) (.shutdown ^RedisClient client))))) +(defn connect + [state & {:as opts}] + (let [connection (connect* state opts)] + (-> state + (assoc ::connection connection) + (dissoc ::cache) + (vary-meta assoc `d/close! (fn [_] (d/close! connection)))))) + (defn get-or-connect [{:keys [::cache] :as state} key options] - (assoc state ::connection - (or (get @cache key) - (-> (swap! cache (fn [cache] - (when-let [prev (get cache key)] - (close! prev)) - (assoc cache key (connect state options)))) - (get key))))) + (-> state + (assoc ::connection + (or (get @cache key) + (-> (swap! cache (fn [cache] + (when-let [prev (get cache key)] + (d/close! prev)) + (assoc cache key (connect* state options)))) + (get key)))) + (dissoc ::cache))) (defn add-listener! - [conn listener] - (us/assert! ::pubsub-connection @conn) + [{:keys [::connection] :as conn} listener] + (us/assert! ::connection-holder conn) + (us/assert! ::pubsub-connection connection) (us/assert! ::pubsub-listener listener) - - (.addListener ^StatefulRedisPubSubConnection @conn + (.addListener ^StatefulRedisPubSubConnection @connection ^RedisPubSubListener listener) conn) (defn publish! - [conn topic message] + [{:keys [::connection] :as conn} topic message] (us/assert! ::us/string topic) (us/assert! ::us/bytes message) - (us/assert! ::connection @conn) + (us/assert! ::connection-holder conn) + (us/assert! ::default-connection connection) - (let [pcomm (.async ^StatefulRedisConnection @conn)] + (let [pcomm (.async ^StatefulRedisConnection @connection)] (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message))) (defn subscribe! - "Blocking operation, intended to be used on a worker/agent thread." - [conn & topics] - (us/assert! ::pubsub-connection @conn) - (let [topics (into-array String (map str topics)) - cmd (.sync ^StatefulRedisPubSubConnection @conn)] - (.subscribe ^RedisPubSubCommands cmd topics))) + "Blocking operation, intended to be used on a thread/agent thread." + [{:keys [::connection] :as conn} & topics] + (us/assert! ::connection-holder conn) + (us/assert! ::pubsub-connection connection) + (try + (let [topics (into-array String (map str topics)) + cmd (.sync ^StatefulRedisPubSubConnection @connection)] + (.subscribe ^RedisPubSubCommands cmd topics)) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) (defn unsubscribe! - "Blocking operation, intended to be used on a worker/agent thread." - [conn & topics] - (us/assert! ::pubsub-connection @conn) - (let [topics (into-array String (map str topics)) - cmd (.sync ^StatefulRedisPubSubConnection @conn)] - (.unsubscribe ^RedisPubSubCommands cmd topics))) + "Blocking operation, intended to be used on a thread/agent thread." + [{:keys [::connection] :as conn} & topics] + (us/assert! ::connection-holder conn) + (us/assert! ::pubsub-connection connection) + (try + (let [topics (into-array String (map str topics)) + cmd (.sync ^StatefulRedisPubSubConnection @connection)] + (.unsubscribe ^RedisPubSubCommands cmd topics)) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) + +(defn rpush! + [{:keys [::connection] :as conn} key payload] + (us/assert! ::connection-holder conn) + (us/assert! (or (and (vector? payload) + (every? bytes? payload)) + (bytes? payload))) + (try + (let [cmd (.sync ^StatefulRedisConnection @connection) + data (if (vector? payload) payload [payload]) + vals (make-array (. Class (forName "[B")) (count data))] + + (loop [i 0 xs (seq data)] + (when xs + (aset ^"[[B" vals i ^bytes (first xs)) + (recur (inc i) (next xs)))) + + (.rpush ^RedisCommands cmd + ^String key + ^"[[B" vals)) + + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) + +(defn blpop! + [{:keys [::connection] :as conn} timeout & keys] + (us/assert! ::connection-holder conn) + (try + (let [keys (into-array Object (map str keys)) + cmd (.sync ^StatefulRedisConnection @connection) + timeout (/ (double (inst-ms timeout)) 1000.0)] + (when-let [res (.blpop ^RedisCommands cmd + ^double timeout + ^"[Ljava.lang.String;" keys)] + (MapEntry/create + (.getKey ^KeyValue res) + (.getValue ^KeyValue res)))) + (catch RedisCommandInterruptedException cause + (throw (InterruptedException. (ex-message cause)))))) (defn open? - [conn] - (.isOpen ^StatefulConnection @conn)) + [{:keys [::connection] :as conn}] + (us/assert! ::connection-holder conn) + (us/assert! ::pubsub-connection connection) + (.isOpen ^StatefulConnection @connection)) (defn pubsub-listener [& {:keys [on-message on-subscribe on-unsubscribe]}] @@ -243,10 +318,6 @@ (when on-unsubscribe (on-unsubscribe nil topic count))))) -(defn close! - [o] - (.close ^AutoCloseable o)) - (def ^:private scripts-cache (atom {})) (def noop-fn (constantly nil)) @@ -262,12 +333,12 @@ ::rscript/vals])) (defn eval! - [{:keys [::mtx/metrics] :as state} script] - (us/assert! ::rscript/script script) + [{:keys [::mtx/metrics ::connection] :as state} script] (us/assert! ::redis state) + (us/assert! ::connection-holder state) + (us/assert! ::rscript/script script) - (let [rconn (-> state ::connection deref) - cmd (.async ^StatefulRedisConnection rconn) + (let [cmd (.async ^StatefulRedisConnection @connection) keys (into-array String (map str (::rscript/keys script))) vals (into-array String (map str (::rscript/vals script))) sname (::rscript/name script)] @@ -276,20 +347,20 @@ (if (instance? io.lettuce.core.RedisNoScriptException cause) (do (l/error :hint "no script found" :name sname :cause cause) - (-> (load-script) - (p/then eval-script))) + (->> (load-script) + (p/mapcat eval-script))) (if-let [on-error (::rscript/on-error script)] (on-error cause) (p/rejected cause)))) (eval-script [sha] (let [tpoint (dt/tpoint)] - (-> (.evalsha ^RedisScriptingAsyncCommands cmd - ^String sha - ^ScriptOutputType ScriptOutputType/MULTI - ^"[Ljava.lang.String;" keys - ^"[Ljava.lang.String;" vals) - (p/then (fn [result] + (->> (.evalsha ^RedisScriptingAsyncCommands cmd + ^String sha + ^ScriptOutputType ScriptOutputType/MULTI + ^"[Ljava.lang.String;" keys + ^"[Ljava.lang.String;" vals) + (p/map (fn [result] (let [elapsed (tpoint)] (mtx/run! metrics {:id :redis-eval-timing :labels [(name sname)] @@ -300,20 +371,28 @@ :params (str/join "," (::rscript/vals script)) :elapsed (dt/format-duration elapsed)) result))) - (p/catch on-error)))) + (p/error on-error)))) (read-script [] (-> script ::rscript/path io/resource slurp)) (load-script [] (l/trace :hint "load script" :name sname) - (-> (.scriptLoad ^RedisScriptingAsyncCommands cmd + (->> (.scriptLoad ^RedisScriptingAsyncCommands cmd ^String (read-script)) - (p/then (fn [sha] + (p/map (fn [sha] (swap! scripts-cache assoc sname sha) sha))))] (if-let [sha (get @scripts-cache sname)] (eval-script sha) - (-> (load-script) - (p/then eval-script)))))) + (->> (load-script) + (p/mapcat eval-script)))))) + +(defn timeout-exception? + [cause] + (instance? RedisCommandTimeoutException cause)) + +(defn exception? + [cause] + (instance? RedisException cause)) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 52cf69b91..adeeceb9e 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -23,6 +23,7 @@ [app.storage :as-alias sto] [app.util.services :as sv] [app.util.time :as ts] + [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] @@ -270,6 +271,7 @@ ::http-client ::rlimit ::climit + ::wrk/executor ::mtx/metrics ::db/pool ::ldap])) diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index 76c6f44e7..8b97c4cce 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -63,16 +63,16 @@ (l/trace :hint "enqueued" :key (name bkey) :skey (str skey) - :queue-size (get instance :current-queue-size) - :concurrency (get instance :current-concurrency) + :queue-size (get instance ::pxb/current-queue-size) + :concurrency (get instance ::pxb/current-concurrency)) (mtx/run! metrics :id :rpc-climit-queue-size - :val (get instance :current-queue-size) + :val (get instance ::pxb/current-queue-size) :labels labels) (mtx/run! metrics :id :rpc-climit-concurrency - :val (get instance :current-concurrency) - :labels labels))) + :val (get instance ::pxb/current-concurrency) + :labels labels)) on-run (fn [instance task] (let [elapsed (- (inst-ms (dt/now)) @@ -87,11 +87,11 @@ :labels labels) (mtx/run! metrics :id :rpc-climit-queue-size - :val (get instance :current-queue-size) + :val (get instance ::pxb/current-queue-size) :labels labels) (mtx/run! metrics :id :rpc-climit-concurrency - :val (get instance :current-concurrency) + :val (get instance ::pxb/current-concurrency) :labels labels))) options {:executor executor diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj index 390892e33..7c07d6758 100644 --- a/backend/src/app/rpc/rlimit.clj +++ b/backend/src/app/rpc/rlimit.clj @@ -332,7 +332,7 @@ ::limits limits})))) (defn- refresh-config - [{:keys [state path executor scheduler] :as params}] + [{:keys [state path executor scheduled-executor] :as params}] (letfn [(update-config [{:keys [::updated-at] :as state}] (let [updated-at' (fs/last-modified-time path)] (merge state @@ -347,7 +347,7 @@ state))))) (schedule-next [state] - (px/schedule! scheduler + (px/schedule! scheduled-executor (inst-ms (::refresh state)) (partial refresh-config params)) state)] @@ -371,7 +371,7 @@ (and (fs/exists? path) (fs/regular-file? path) path))) (defmethod ig/pre-init-spec :app.rpc/rlimit [_] - (s/keys :req-un [::wrk/executor ::wrk/scheduler])) + (s/keys :req-un [::wrk/executor ::wrk/scheduled-executor])) (defmethod ig/init-key ::rpc/rlimit [_ {:keys [executor] :as params}] diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 5a85ab684..80d5a7035 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -20,6 +20,7 @@ [app.util.objects-map :as omap] [app.util.pointer-map :as pmap] [app.util.time :as dt] + [app.worker :as wrk] [clojure.pprint :refer [pprint]] [cuerdas.core :as str])) @@ -37,6 +38,16 @@ (task-fn params) (println (format "no task '%s' found" name)))))) +(defn schedule-task! + ([system name] + (schedule-task! system name {})) + ([system name props] + (let [pool (:app.db/pool system)] + (wrk/submit! + ::wrk/conn pool + ::wrk/task name + ::wrk/props props)))) + (defn send-test-email! [system destination] (us/verify! diff --git a/backend/src/app/storage/tmp.clj b/backend/src/app/storage/tmp.clj index 89382a121..3e64e6bfc 100644 --- a/backend/src/app/storage/tmp.clj +++ b/backend/src/app/storage/tmp.clj @@ -12,6 +12,7 @@ (:require [app.common.data :as d] [app.common.logging :as l] + [app.storage :as-alias sto] [app.util.time :as dt] [app.worker :as wrk] [clojure.core.async :as a] @@ -23,43 +24,43 @@ (declare remove-temp-file) (defonce queue (a/chan 128)) -(s/def ::min-age ::dt/duration) - (defmethod ig/pre-init-spec ::cleaner [_] - (s/keys :req-un [::min-age ::wrk/scheduler ::wrk/executor])) + (s/keys :req [::sto/min-age ::wrk/scheduled-executor])) (defmethod ig/prep-key ::cleaner [_ cfg] - (merge {:min-age (dt/duration {:minutes 30})} + (merge {::sto/min-age (dt/duration "30m")} (d/without-nils cfg))) (defmethod ig/init-key ::cleaner - [_ {:keys [scheduler executor min-age] :as cfg}] - (l/info :hint "starting tempfile cleaner service") - (let [cch (a/chan)] - (a/go-loop [] - (let [[path port] (a/alts! [queue cch])] - (when (not= port cch) + [_ {:keys [::sto/min-age ::wrk/scheduled-executor] :as cfg}] + (px/thread + {:name "penpot/storage-tmp-cleaner"} + (try + (l/info :hint "started tmp file cleaner") + (loop [] + (when-let [path (a/ close-ch a/close!)) + [_ thread] + (px/interrupt! thread)) (defn- remove-temp-file "Permanently delete tempfile" - [executor path] - (px/with-dispatch executor - (l/trace :hint "permanently delete tempfile" :path path) - (when (fs/exists? path) - (fs/delete path)))) + [path] + (l/trace :hint "permanently delete tempfile" :path path) + (when (fs/exists? path) + (fs/delete path))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; API diff --git a/backend/src/app/util/closeable.clj b/backend/src/app/util/closeable.clj deleted file mode 100644 index 6d20f765f..000000000 --- a/backend/src/app/util/closeable.clj +++ /dev/null @@ -1,31 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) KALEIDOS INC - -(ns app.util.closeable - "A closeable abstraction. A drop in replacement for - clojure builtin `with-open` syntax abstraction." - (:refer-clojure :exclude [with-open])) - -(defprotocol ICloseable - (-close [_] "Close the resource.")) - -(defmacro with-open - [bindings & body] - {:pre [(vector? bindings) - (even? (count bindings)) - (pos? (count bindings))]} - (reduce (fn [acc bindings] - `(let ~(vec bindings) - (try - ~acc - (finally - (-close ~(first bindings)))))) - `(do ~@body) - (reverse (partition 2 bindings)))) - -(extend-protocol ICloseable - java.lang.AutoCloseable - (-close [this] (.close this))) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index ea9df91af..f54a3add1 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -8,113 +8,124 @@ "Async tasks abstraction (impl)." (:require [app.common.data :as d] + [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] + [app.common.transit :as t] [app.common.uuid :as uuid] [app.db :as db] [app.metrics :as mtx] - [app.util.async :as aa] + [app.redis :as rds] [app.util.time :as dt] - [clojure.core.async :as a] [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] [promesa.exec :as px]) (:import java.util.concurrent.ExecutorService - java.util.concurrent.Executors java.util.concurrent.ForkJoinPool java.util.concurrent.Future - java.util.concurrent.ForkJoinPool$ForkJoinWorkerThreadFactory - java.util.concurrent.ForkJoinWorkerThread - java.util.concurrent.ScheduledExecutorService - java.util.concurrent.ThreadFactory - java.util.concurrent.atomic.AtomicLong)) + java.util.concurrent.ScheduledExecutorService)) (set! *warn-on-reflection* true) (s/def ::executor #(instance? ExecutorService %)) -(s/def ::scheduler #(instance? ScheduledExecutorService %)) +(s/def ::scheduled-executor #(instance? ScheduledExecutorService %)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(declare ^:private get-fj-thread-factory) -(declare ^:private get-thread-factory) - (s/def ::parallelism ::us/integer) (defmethod ig/pre-init-spec ::executor [_] - (s/keys :opt-un [::parallelism])) + (s/keys :req [::parallelism])) (defmethod ig/init-key ::executor - [skey {:keys [parallelism]}] - (let [prefix (if (vector? skey) (-> skey first name keyword) :default)] - (if parallelism - (ForkJoinPool. (int parallelism) (get-fj-thread-factory prefix) nil false) - (Executors/newCachedThreadPool (get-thread-factory prefix))))) + [skey {:keys [::parallelism]}] + (let [prefix (if (vector? skey) (-> skey first name keyword) :default) + tname (str "penpot/" prefix "/%s") + factory (px/forkjoin-thread-factory :name tname)] + (px/forkjoin-executor + :factory factory + :parallelism parallelism + :async? true))) (defmethod ig/halt-key! ::executor [_ instance] - (.shutdown ^ExecutorService instance)) + (px/shutdown! instance)) -(defmethod ig/pre-init-spec ::scheduler [_] - (s/keys :req-un [::prefix] - :opt-un [::parallelism])) +(defmethod ig/pre-init-spec ::scheduled-executor [_] + (s/keys :req [::parallelism])) -(defmethod ig/init-key ::scheduler - [_ {:keys [parallelism prefix] :or {parallelism 1}}] - (px/scheduled-pool parallelism (get-thread-factory prefix))) +(defmethod ig/init-key ::scheduled-executor + [_ {:keys [::parallelism]}] + (px/scheduled-executor + :parallelism parallelism + :factory (px/thread-factory :name "penpot/scheduled-executor/%s"))) -(defmethod ig/halt-key! ::scheduler +(defmethod ig/halt-key! ::scheduled-executor [_ instance] - (.shutdown ^ExecutorService instance)) - -(defn- get-fj-thread-factory - ^ForkJoinPool$ForkJoinWorkerThreadFactory - [prefix] - (let [^AtomicLong counter (AtomicLong. 0)] - (reify ForkJoinPool$ForkJoinWorkerThreadFactory - (newThread [_ pool] - (let [thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool) - tname (str "penpot/" (name prefix) "-" (.getAndIncrement counter))] - (.setName ^ForkJoinWorkerThread thread ^String tname) - thread))))) - -(defn- get-thread-factory - ^ThreadFactory - [prefix] - (let [^AtomicLong counter (AtomicLong. 0)] - (reify ThreadFactory - (newThread [_ runnable] - (doto (Thread. runnable) - (.setDaemon true) - (.setName (str "penpot/" (name prefix) "-" (.getAndIncrement counter)))))))) + (px/shutdown! instance)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Executor Monitor +;; TASKS REGISTRY ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::executors - (s/map-of keyword? ::executor)) +(defn- wrap-task-handler + [metrics tname f] + (let [labels (into-array String [tname])] + (fn [params] + (let [tp (dt/tpoint)] + (try + (f params) + (finally + (mtx/run! metrics + {:id :tasks-timing + :val (inst-ms (tp)) + :labels labels}))))))) -(defmethod ig/pre-init-spec ::executor-monitor [_] - (s/keys :req-un [::executors ::mtx/metrics])) +(s/def ::registry (s/map-of ::us/string fn?)) -(defmethod ig/init-key ::executor-monitor - [_ {:keys [executors metrics interval] :or {interval 3000}}] - (letfn [(monitor! [state skey ^ForkJoinPool executor] - (let [prev-steals (get state skey 0) - running (.getRunningThreadCount executor) - queued (.getQueuedSubmissionCount executor) - active (.getPoolSize executor) - steals (.getStealCount executor) - labels (into-array String [(name skey)]) +(defmethod ig/pre-init-spec ::registry [_] + (s/keys :req-un [::mtx/metrics ::tasks])) - steals-increment (- steals prev-steals) - steals-increment (if (neg? steals-increment) 0 steals-increment)] +(defmethod ig/init-key ::registry + [_ {:keys [metrics tasks]}] + (l/info :hint "registry initialized" :tasks (count tasks)) + (reduce-kv (fn [registry k v] + (let [tname (name k)] + (l/debug :hint "register task" :name tname) + (assoc registry tname (wrap-task-handler metrics tname v)))) + {} + tasks)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; EXECUTOR MONITOR +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(s/def ::name ::us/keyword) + +(defmethod ig/pre-init-spec ::monitor [_] + (s/keys :req [::name ::executor ::mtx/metrics])) + +(defmethod ig/prep-key ::monitor + [_ cfg] + (merge {::interval (dt/duration "2s")} + (d/without-nils cfg))) + +(defmethod ig/init-key ::monitor + [_ {:keys [::executor ::mtx/metrics ::interval ::name]}] + (letfn [(monitor! [^ForkJoinPool executor prev-steals] + (let [running (.getRunningThreadCount executor) + queued (.getQueuedSubmissionCount executor) + active (.getPoolSize executor) + steals (.getStealCount executor) + labels (into-array String [(d/name name)]) + + steals-inc (- steals prev-steals) + steals-inc (if (neg? steals-inc) 0 steals-inc)] (mtx/run! metrics :id :executor-active-threads @@ -128,137 +139,495 @@ :labels labels :val queued) (mtx/run! metrics - :id :executors-completed-tasks - :labels labels - :inc steals-increment) + :id :executors-completed-tasks + :labels labels + :inc steals-inc) - (aa/thread-sleep interval) - (if (.isShutdown executor) - (l/debug :hint "stopping monitor; cause: executor is shutdown") - (assoc state skey steals)))) + steals))] - (monitor-fn [] - (try - (loop [items (into (d/queue) executors) - state {}] - (when-let [[skey executor :as item] (peek items)] - (if-let [state (monitor! state skey executor)] - (recur (conj items item) state) - (recur items state)))) - (catch InterruptedException _cause - (l/debug :hint "stopping monitor; interrupted"))))] + (px/thread + {:name "penpot/executors-monitor"} + (l/info :hint "monitor: started" :name name) + (try + (loop [steals 0] + (when-not (px/shutdown? executor) + (px/sleep interval) + (recur (long (monitor! executor steals))))) + (catch InterruptedException _cause + (l/debug :hint "monitor: interrupted" :name name)) + (catch Throwable cause + (l/error :hint "monitor: unexpected error" :name name :cause cause)) + (finally + (l/info :hint "monitor: terminated" :name name)))))) - (let [thread (Thread. monitor-fn)] - (.setDaemon thread true) - (.setName thread "penpot/executor-monitor") - (.start thread) - - thread))) - -(defmethod ig/halt-key! ::executor-monitor +(defmethod ig/halt-key! ::monitor [_ thread] - (.interrupt ^Thread thread)) + (px/interrupt! thread)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Worker +;; SCHEDULER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(declare event-loop-fn) -(declare event-loop) +(defn- decode-task-row + [{:keys [props] :as row}] + (cond-> row + (db/pgobject? props) + (assoc :props (db/decode-transit-pgobject props)))) -(s/def ::queue keyword?) -(s/def ::parallelism ::us/integer) -(s/def ::batch-size ::us/integer) -(s/def ::tasks (s/map-of keyword? fn?)) -(s/def ::poll-interval ::dt/duration) +(s/def ::queue ::us/string) +(s/def ::wait-duration ::dt/duration) + +(defmethod ig/pre-init-spec ::scheduler [_] + (s/keys :req [::mtx/metrics + ::db/pool + ::rds/redis] + :opt [::wait-duration + ::batch-size])) + +(defmethod ig/prep-key ::scheduler + [_ cfg] + (merge {::batch-size 1 + ::wait-duration (dt/duration "2s")} + (d/without-nils cfg))) + +(def ^:private sql:select-next-tasks + "select * from task as t + where t.scheduled_at <= now() + and (t.status = 'new' or t.status = 'retry') + order by t.priority desc, t.scheduled_at + limit ? + for update skip locked") + +(defn- format-queue + [queue] + (str/ffmt "penpot-tasks-queue:%" queue)) + +(defmethod ig/init-key ::scheduler + [_ {:keys [::db/pool ::rds/redis ::batch-size] :as cfg}] + (letfn [(get-tasks-batch [conn] + (->> (db/exec! conn [sql:select-next-tasks batch-size]) + (map decode-task-row) + (seq))) + + (queue-task [conn rconn {:keys [id queue] :as task}] + (db/update! conn :task {:status "ready"} {:id id}) + (let [queue (format-queue queue) + payload (t/encode id) + result (rds/rpush! rconn queue payload)] + (l/debug :hist "scheduler: task pushed to redis" + :task-id id + :key queue + :queued result))) + + (run-batch [rconn] + (db/with-atomic [conn pool] + (when-let [tasks (get-tasks-batch conn)] + (run! (partial queue-task conn rconn) tasks) + true))) + ] + + (if (db/read-only? pool) + (l/warn :hint "scheduler: not started (db is read-only)") + (px/thread + {:name "penpot/scheduler"} + (l/info :hint "scheduler: started") + (try + (dm/with-open [rconn (rds/connect redis)] + (loop [] + (when (px/interrupted?) + (throw (InterruptedException. "interrumpted"))) + + (try + (when-not (run-batch rconn) + (px/sleep (::wait-duration cfg))) + (catch InterruptedException cause + (throw cause)) + (catch Exception cause + (cond + (rds/exception? cause) + (do + (l/warn :hint "scheduler: redis exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))) + + (db/sql-exception? cause) + (do + (l/warn :hint "scheduler: database exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn))) + + :else + (do + (l/error :hint "scheduler: unhandled exception (will retry in an instant)" :cause cause) + (px/sleep (::rds/timeout rconn)))))) + + (recur))) + + (catch InterruptedException _ + (l/debug :hint "scheduler: interrupted")) + (catch Throwable cause + (l/error :hint "scheduler: unexpected exception" :cause cause)) + (finally + (l/info :hint "scheduler: terminated"))))))) + +(defmethod ig/halt-key! ::scheduler + [_ thread] + (some-> thread px/interrupt!)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; WORKER +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(declare ^:private run-worker-loop!) +(declare ^:private start-worker!) +(declare ^:private get-error-context) (defmethod ig/pre-init-spec ::worker [_] - (s/keys :req-un [::executor - ::mtx/metrics - ::db/pool - ::batch-size - ::name - ::poll-interval - ::queue - ::tasks])) + (s/keys :req [::parallelism + ::mtx/metrics + ::db/pool + ::rds/redis + ::queue + ::registry])) (defmethod ig/prep-key ::worker [_ cfg] - (d/merge {:batch-size 2 - :name :worker - :poll-interval (dt/duration {:seconds 5}) - :queue :default} - (d/without-nils cfg))) - -(defn- event-loop - "Main, worker eventloop" - [{:keys [pool poll-interval close-ch] :as cfg}] - (let [poll-ms (inst-ms poll-interval)] - (a/go-loop [] - (let [[val port] (a/alts! [close-ch (event-loop-fn cfg)] :priority true)] - (cond - ;; Terminate the loop if close channel is closed or - ;; event-loop-fn returns nil. - (or (= port close-ch) (nil? val)) - (l/debug :hint "stop condition found") - - (db/closed? pool) - (do - (l/debug :hint "eventloop aborted because pool is closed") - (a/close! close-ch)) - - (and (instance? java.sql.SQLException val) - (contains? #{"08003" "08006" "08001" "08004"} (.getSQLState ^java.sql.SQLException val))) - (do - (l/warn :hint "connection error, trying resume in some instants") - (a/> (range parallelism) + (map #(assoc cfg ::worker-id %)) + (map start-worker!))))) (defmethod ig/halt-key! ::worker - [_ instance] - (.close ^java.lang.AutoCloseable instance)) + [_ threads] + (run! px/interrupt! threads)) -;; --- SUBMIT +(defn- start-worker! + [{:keys [::rds/redis ::worker-id] :as cfg}] + (px/thread + {:name (format "penpot/worker/%s" worker-id)} + (l/info :hint "worker: started" :worker-id worker-id) + (try + (dm/with-open [rconn (rds/connect redis)] + (let [cfg (-> cfg + (update ::queue format-queue) + (assoc ::rds/rconn rconn) + (assoc ::timeout (dt/duration "5s")))] + (loop [] + (when (px/interrupted?) + (throw (InterruptedException. "interrupted"))) + + (run-worker-loop! cfg) + (recur)))) + + (catch InterruptedException _ + (l/debug :hint "worker: interrupted" + :worker-id worker-id)) + (catch Throwable cause + (l/error :hint "worker: unexpected exception" + :worker-id worker-id + :cause cause)) + (finally + (l/info :hint "worker: terminated" :worker-id worker-id))))) + +(defn- run-worker-loop! + [{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}] + (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}] + (let [explain (ex-message error) + nretry (+ (:retry-num task) inc-by) + now (dt/now) + delay (->> (iterate #(* % 2) delay) (take nretry) (last))] + (db/update! pool :task + {:error explain + :status "retry" + :modified-at now + :scheduled-at (dt/plus now delay) + :retry-num nretry} + {:id (:id task)}) + nil)) + + (handle-task-failure [{:keys [task error]}] + (let [explain (ex-message error)] + (db/update! pool :task + {:error explain + :modified-at (dt/now) + :status "failed"} + {:id (:id task)}) + nil)) + + (handle-task-completion [{:keys [task]}] + (let [now (dt/now)] + (db/update! pool :task + {:completed-at now + :modified-at now + :status "completed"} + {:id (:id task)}) + nil)) + + (decode-payload [^bytes payload] + (try + (let [task-id (t/decode payload)] + (if (uuid? task-id) + task-id + (l/error :hint "worker: received unexpected payload (uuid expected)" + :payload task-id))) + (catch Throwable cause + (l/error :hint "worker: unable to decode payload" + :payload payload + :length (alength payload) + :cause cause)))) + + (handle-task [{:keys [name] :as task}] + (let [task-fn (get registry name)] + (if task-fn + (task-fn task) + (l/warn :hint "no task handler found" :name name)) + {:status :completed :task task})) + + (handle-task-exception [cause task] + (let [edata (ex-data cause)] + (if (and (< (:retry-num task) + (:max-retries task)) + (= ::retry (:type edata))) + (cond-> {:status :retry :task task :error cause} + (dt/duration? (:delay edata)) + (assoc :delay (:delay edata)) + + (= ::noop (:strategy edata)) + (assoc :inc-by 0)) + (do + (l/error :hint "worker: unhandled exception on task" + ::l/context (get-error-context cause task) + :cause cause) + (if (>= (:retry-num task) (:max-retries task)) + {:status :failed :task task :error cause} + {:status :retry :task task :error cause}))))) + + (get-task [task-id] + (ex/try (db/get* pool :task {:id task-id}))) + + (run-task [task-id] + (loop [task (get-task task-id)] + (cond + (ex/exception? task) + (if (or (db/connection-error? task) + (db/serialization-error? task)) + (do + (l/warn :hint "worker: connection error on retrieving task from database (retrying in some instants)" + :worker-id worker-id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task task-id))) + (do + (l/error :hint "worker: unhandled exception on retrieving task from database (retrying in some instants)" + :worker-id worker-id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task task-id)))) + + (nil? task) + (l/warn :hint "worker: no task found on the database" + :worker-id worker-id + :task-id task-id) + + :else + (try + (l/trace :hint "worker: executing task" + :worker-id worker-id + :task-id (:id task) + :task-name (:name task) + :task-retry (:retry-num task)) + (handle-task task) + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (handle-task-exception cause task)))))) + + (process-result [{:keys [status] :as result}] + (ex/try! + (case status + :retry (handle-task-retry result) + :failed (handle-task-failure result) + :completed (handle-task-completion result)))) + + (run-task-loop [task-id] + (loop [result (run-task task-id)] + (when-let [cause (process-result result)] + (if (or (db/connection-error? cause) + (db/serialization-error? cause)) + (do + (l/warn :hint "worker: database exeption on processing task result (retrying in some instants)" + :cause cause) + (px/sleep (::rds/timeout rconn)) + (recur result)) + (do + (l/error :hint "worker: unhandled exception on processing task result (retrying in some instants)" + :cause cause) + (px/sleep (::rds/timeout rconn)) + (recur result))))))] + + (try + (let [[_ payload] (rds/blpop! rconn timeout queue)] + (some-> payload + decode-payload + run-task-loop)) + + (catch InterruptedException cause + (throw cause)) + + (catch Exception cause + (if (rds/timeout-exception? cause) + (do + (l/error :hint "worker: redis pop operation timeout, consider increasing redis timeout (will retry in some instants)" + :timeout timeout + :cause cause) + (px/sleep timeout)) + + (l/error :hint "worker: unhandled exception" :cause cause)))))) + +(defn- get-error-context + [error item] + (let [data (ex-data error)] + (merge + {:hint (ex-message error) + :spec-problems (some->> data ::s/problems (take 10) seq vec) + :spec-value (some->> data ::s/value) + :data (some-> data (dissoc ::s/problems ::s/value ::s/spec)) + :params item} + (when (and data (::s/problems data)) + {:spec-explain (us/pretty-explain data)})))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; CRON +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(declare schedule-cron-task) +(declare synchronize-cron-entries!) + +(s/def ::fn (s/or :var var? :fn fn?)) +(s/def ::id keyword?) +(s/def ::cron dt/cron?) +(s/def ::props (s/nilable map?)) +(s/def ::task keyword?) + +(s/def ::cron-task + (s/keys :req-un [::cron ::task] + :opt-un [::props ::id])) + +(s/def ::entries (s/coll-of (s/nilable ::cron-task))) + +(defmethod ig/pre-init-spec ::cron [_] + (s/keys :req [::scheduled-executor ::db/pool ::entries ::registry])) + +(defmethod ig/init-key ::cron + [_ {:keys [::entries ::registry ::db/pool] :as cfg}] + (if (db/read-only? pool) + (l/warn :hint "cron: not started (db is read-only)") + (let [running (atom #{}) + entries (->> entries + (filter some?) + ;; If id is not defined, use the task as id. + (map (fn [{:keys [id task] :as item}] + (if (some? id) + (assoc item :id (d/name id)) + (assoc item :id (d/name task))))) + (map (fn [item] + (update item :task d/name))) + (map (fn [{:keys [task] :as item}] + (let [f (get registry task)] + (when-not f + (ex/raise :type :internal + :code :task-not-found + :hint (str/fmt "task %s not configured" task))) + (-> item + (dissoc :task) + (assoc :fn f)))))) + + cfg (assoc cfg ::entries entries ::running running)] + + (l/info :hint "cron: started" :tasks (count entries)) + (synchronize-cron-entries! cfg) + + (->> (filter some? entries) + (run! (partial schedule-cron-task cfg))) + + (reify + clojure.lang.IDeref + (deref [_] @running) + + java.lang.AutoCloseable + (close [_] + (l/info :hint "cron: terminated") + (doseq [item @running] + (when-not (.isDone ^Future item) + (.cancel ^Future item true)))))))) + +(defmethod ig/halt-key! ::cron + [_ instance] + (some-> instance d/close!)) + +(def sql:upsert-cron-task + "insert into scheduled_task (id, cron_expr) + values (?, ?) + on conflict (id) + do update set cron_expr=?") + +(defn- synchronize-cron-entries! + [{:keys [::db/pool ::entries]}] + (db/with-atomic [conn pool] + (doseq [{:keys [id cron]} entries] + (l/trace :hint "register cron task" :id id :cron (str cron)) + (db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)])))) + +(def sql:lock-cron-task + "select id from scheduled_task where id=? for update skip locked") + +(defn- execute-cron-task + [{:keys [::db/pool] :as cfg} {:keys [id] :as task}] + (try + (db/with-atomic [conn pool] + (when (db/exec-one! conn [sql:lock-cron-task (d/name id)]) + (l/trace :hint "cron: execute task" :task-id id) + ((:fn task) task))) + (catch InterruptedException _ + (px/interrupt! (px/current-thread)) + (l/debug :hint "cron: task interrupted" :task-id id)) + (catch Throwable cause + (l/error :hint "cron: unhandled exception on running task" + ::l/context (get-error-context cause task) + :task-id id + :cause cause)) + (finally + (when-not (px/interrupted? :current) + (schedule-cron-task cfg task))))) + +(defn- ms-until-valid + [cron] + (s/assert dt/cron? cron) + (let [now (dt/now) + next (dt/next-valid-instant-from cron now)] + (inst-ms (dt/diff now next)))) + +(def ^:private + xf-without-done + (remove #(.isDone ^Future %))) + +(defn- schedule-cron-task + [{:keys [::scheduled-executor ::running] :as cfg} {:keys [cron] :as task}] + (let [ft (px/schedule! scheduled-executor + (ms-until-valid cron) + (partial execute-cron-task cfg task))] + (swap! running #(into #{ft} xf-without-done %)))) + + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; SUBMIT API +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (s/def ::task keyword?) (s/def ::delay (s/or :int ::us/integer :duration dt/duration?)) @@ -286,9 +655,9 @@ returning id") (defn submit! - [{:keys [::task ::delay ::queue ::priority ::max-retries ::conn] - :or {delay 0 queue :default priority 100 max-retries 3} - :as options}] + [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn] + :or {delay 0 queue "default" priority 100 max-retries 3} + :as options}] (us/verify ::submit-options options) (let [duration (dt/duration delay) interval (db/interval duration) @@ -297,296 +666,9 @@ (l/debug :action "submit task" :name (d/name task) + :queue queue :in duration) (db/exec-one! conn [sql:insert-new-task id (d/name task) props - (d/name queue) priority max-retries interval]) + queue priority max-retries interval]) id)) - -;; --- RUNNER - -(def ^:private - sql:mark-as-retry - "update task - set scheduled_at = now() + ?::interval, - modified_at = now(), - error = ?, - status = 'retry', - retry_num = ? - where id = ?") - -(defn- mark-as-retry - [conn {:keys [task error inc-by delay] - :or {inc-by 1 delay 1000}}] - (let [explain (ex-message error) - nretry (+ (:retry-num task) inc-by) - delay (->> (iterate #(* % 2) delay) (take nretry) (last)) - sqlv [sql:mark-as-retry (db/interval delay) explain nretry (:id task)]] - (db/exec-one! conn sqlv) - nil)) - -(defn- mark-as-failed - [conn {:keys [task error]}] - (let [explain (ex-message error)] - (db/update! conn :task - {:error explain - :modified-at (dt/now) - :status "failed"} - {:id (:id task)}) - nil)) - -(defn- mark-as-completed - [conn {:keys [task] :as cfg}] - (let [now (dt/now)] - (db/update! conn :task - {:completed-at now - :modified-at now - :status "completed"} - {:id (:id task)}) - nil)) - -(defn- decode-task-row - [{:keys [props name] :as row}] - (when row - (cond-> row - (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)) - (string? name) (assoc :name (keyword name))))) - -(defn- handle-task - [tasks {:keys [name] :as item}] - (let [task-fn (get tasks name)] - (if task-fn - (task-fn item) - (l/warn :hint "no task handler found" - :name (d/name name))) - {:status :completed :task item})) - -(defn get-error-context - [error item] - (let [data (ex-data error)] - (merge - {:hint (ex-message error) - :spec-problems (some->> data ::s/problems (take 10) seq vec) - :spec-value (some->> data ::s/value) - :data (some-> data (dissoc ::s/problems ::s/value ::s/spec)) - :params item} - (when (and data (::s/problems data)) - {:spec-explain (us/pretty-explain data)})))) - -(defn- handle-exception - [error item] - (let [edata (ex-data error)] - (if (and (< (:retry-num item) - (:max-retries item)) - (= ::retry (:type edata))) - (cond-> {:status :retry :task item :error error} - (dt/duration? (:delay edata)) - (assoc :delay (:delay edata)) - - (= ::noop (:strategy edata)) - (assoc :inc-by 0)) - (do - (l/error :hint "unhandled exception on task" - ::l/context (get-error-context error item) - :cause error) - (if (>= (:retry-num item) (:max-retries item)) - {:status :failed :task item :error error} - {:status :retry :task item :error error}))))) - -(defn- run-task - [{:keys [tasks]} item] - (let [name (d/name (:name item))] - (try - (l/trace :action "execute task" - :id (:id item) - :name name - :retry (:retry-num item)) - (handle-task tasks item) - (catch Exception e - (handle-exception e item))))) - -(def sql:select-next-tasks - "select * from task as t - where t.scheduled_at <= now() - and t.queue = ? - and (t.status = 'new' or t.status = 'retry') - order by t.priority desc, t.scheduled_at - limit ? - for update skip locked") - -(defn- event-loop-fn* - [{:keys [pool executor batch-size] :as cfg}] - (db/with-atomic [conn pool] - (let [queue (name (:queue cfg)) - items (->> (db/exec! conn [sql:select-next-tasks queue batch-size]) - (map decode-task-row) - (seq)) - cfg (assoc cfg :conn conn)] - - (if (nil? items) - ::empty - (let [proc-xf (comp (map #(partial run-task cfg %)) - (map #(px/submit! executor %)))] - (->> (into [] proc-xf items) - (map deref) - (run! (fn [res] - (case (:status res) - :retry (mark-as-retry conn res) - :failed (mark-as-failed conn res) - :completed (mark-as-completed conn res))))) - ::handled))))) - -(defn- event-loop-fn - [{:keys [executor] :as cfg}] - (aa/thread-call executor #(event-loop-fn* cfg))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Scheduler -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(declare schedule-cron-task) -(declare synchronize-cron-entries!) - -(s/def ::fn (s/or :var var? :fn fn?)) -(s/def ::id keyword?) -(s/def ::cron dt/cron?) -(s/def ::props (s/nilable map?)) -(s/def ::task keyword?) - -(s/def ::cron-task - (s/keys :req-un [::cron ::task] - :opt-un [::props ::id])) - -(s/def ::entries (s/coll-of (s/nilable ::cron-task))) - -(defmethod ig/pre-init-spec ::cron [_] - (s/keys :req-un [::executor ::scheduler ::db/pool ::entries ::tasks])) - -(defmethod ig/init-key ::cron - [_ {:keys [entries tasks pool] :as cfg}] - (if (db/read-only? pool) - (l/warn :hint "scheduler not started, db is read-only") - (let [running (atom #{}) - entries (->> entries - (filter some?) - ;; If id is not defined, use the task as id. - (map (fn [{:keys [id task] :as item}] - (if (some? id) - (assoc item :id (d/name id)) - (assoc item :id (d/name task))))) - (map (fn [{:keys [task] :as item}] - (let [f (get tasks task)] - (when-not f - (ex/raise :type :internal - :code :task-not-found - :hint (str/fmt "task %s not configured" task))) - (-> item - (dissoc :task) - (assoc :fn f)))))) - - cfg (assoc cfg :entries entries :running running)] - - (l/info :hint "cron initialized" :tasks (count entries)) - (synchronize-cron-entries! cfg) - - (->> (filter some? entries) - (run! (partial schedule-cron-task cfg))) - - (reify - clojure.lang.IDeref - (deref [_] @running) - - java.lang.AutoCloseable - (close [_] - (doseq [item @running] - (when-not (.isDone ^Future item) - (.cancel ^Future item true)))))))) - - -(defmethod ig/halt-key! ::cron - [_ instance] - (when instance - (.close ^java.lang.AutoCloseable instance))) - -(def sql:upsert-cron-task - "insert into scheduled_task (id, cron_expr) - values (?, ?) - on conflict (id) - do update set cron_expr=?") - -(defn- synchronize-cron-entries! - [{:keys [pool entries]}] - (db/with-atomic [conn pool] - (doseq [{:keys [id cron]} entries] - (l/trace :hint "register cron task" :id id :cron (str cron)) - (db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)])))) - -(def sql:lock-cron-task - "select id from scheduled_task where id=? for update skip locked") - -(defn- execute-cron-task - [{:keys [executor pool] :as cfg} {:keys [id] :as task}] - (letfn [(run-task [conn] - (when (db/exec-one! conn [sql:lock-cron-task (d/name id)]) - (l/trace :hint "execute cron task" :id id) - ((:fn task) task))) - - (handle-task [] - (try - (db/with-atomic [conn pool] - (run-task conn)) - (catch Throwable cause - (l/error :hint "unhandled exception on scheduled task" - ::l/context (get-error-context cause task) - :task-id id - :cause cause))))] - - (px/run! executor handle-task) - (px/run! executor #(schedule-cron-task cfg task)) - nil)) - -(defn- ms-until-valid - [cron] - (s/assert dt/cron? cron) - (let [now (dt/now) - next (dt/next-valid-instant-from cron now)] - (inst-ms (dt/diff now next)))) - -(def ^:private - xf-without-done - (remove #(.isDone ^Future %))) - -(defn- schedule-cron-task - [{:keys [scheduler running] :as cfg} {:keys [cron] :as task}] - (let [ft (px/schedule! scheduler - (ms-until-valid cron) - (partial execute-cron-task cfg task))] - (swap! running #(into #{ft} xf-without-done %)))) - -;; --- INSTRUMENTATION - -(defn- wrap-task-handler - [metrics tname f] - (let [labels (into-array String [tname])] - (fn [params] - (let [start (System/nanoTime)] - (try - (f params) - (finally - (mtx/run! metrics - {:id :tasks-timing - :val (/ (- (System/nanoTime) start) 1000000) - :labels labels}))))))) - -(defmethod ig/pre-init-spec ::registry [_] - (s/keys :req-un [::mtx/metrics ::tasks])) - -(defmethod ig/init-key ::registry - [_ {:keys [metrics tasks]}] - (l/info :hint "registry initialized" :tasks (count tasks)) - (reduce-kv (fn [res k v] - (let [tname (name k)] - (l/debug :hint "register task" :name tname) - (assoc res k (wrap-task-handler metrics tname v)))) - {} - tasks)) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 3e12312ed..e5ff49846 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -68,7 +68,7 @@ :thumbnail-uri "test" :path (-> "backend_tests/test_files/template.penpot" io/resource fs/path)}] system (-> (merge main/system-config main/worker-config) - (assoc-in [:app.redis/redis :uri] (:redis-uri config)) + (assoc-in [:app.redis/redis :app.redis/uri] (:redis-uri config)) (assoc-in [:app.db/pool :uri] (:database-uri config)) (assoc-in [:app.db/pool :username] (:database-username config)) (assoc-in [:app.db/pool :password] (:database-password config)) @@ -324,7 +324,7 @@ (run-task! name {})) ([name params] (let [tasks (:app.worker/registry *system*)] - (let [task-fn (get tasks name)] + (let [task-fn (get tasks (d/name name))] (task-fn params))))) ;; --- UTILS diff --git a/backend/test/backend_tests/tasks_telemetry_test.clj b/backend/test/backend_tests/tasks_telemetry_test.clj index 7ff727b8b..43e8a59eb 100644 --- a/backend/test/backend_tests/tasks_telemetry_test.clj +++ b/backend/test/backend_tests/tasks_telemetry_test.clj @@ -20,12 +20,10 @@ (t/deftest test-base-report-data-structure (with-mocks [mock {:target 'app.tasks.telemetry/send! :return nil}] - (let [task-fn (-> th/*system* :app.worker/registry :telemetry) - prof (th/create-profile* 1 {:is-active true - :props {:newsletter-news true}})] + (let [prof (th/create-profile* 1 {:is-active true + :props {:newsletter-news true}})] - ;; run the task - (task-fn {:send? true :enabled? true}) + (th/run-task! :telemetry {:send? true :enabled? true}) (t/is (:called? @mock)) (let [[_ data] (-> @mock :call-args)] diff --git a/common/deps.edn b/common/deps.edn index 2ffa93000..435fffa59 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -23,7 +23,7 @@ com.cognitect/transit-cljs {:mvn/version "0.8.280"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - funcool/promesa {:mvn/version "9.0.507"} + funcool/promesa {:mvn/version "9.2.542"} funcool/cuerdas {:mvn/version "2022.06.16-403"} lambdaisland/uri {:mvn/version "1.13.95" diff --git a/common/src/app/common/data.cljc b/common/src/app/common/data.cljc index 8e4424655..d2461675e 100644 --- a/common/src/app/common/data.cljc +++ b/common/src/app/common/data.cljc @@ -5,7 +5,8 @@ ;; Copyright (c) KALEIDOS INC (ns app.common.data - "Data manipulation and query helper functions." + "A collection if helpers for working with data structures and other + data resources." (:refer-clojure :exclude [read-string hash-map merge name update-vals parse-double group-by iteration concat mapcat]) #?(:cljs @@ -22,7 +23,9 @@ [linked.set :as lks]) #?(:clj - (:import linked.set.LinkedSet))) + (:import + linked.set.LinkedSet + java.lang.AutoCloseable))) (def boolean-or-nil? (some-fn nil? boolean?)) @@ -697,3 +700,16 @@ (map (fn [key] [key (delay (generator-fn key))])) keys)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Util protocols +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defprotocol ICloseable + :extend-via-metadata true + (close! [_] "Close the resource.")) + +#?(:clj + (extend-protocol ICloseable + AutoCloseable + (close! [this] (.close this)))) diff --git a/common/src/app/common/data/macros.cljc b/common/src/app/common/data/macros.cljc index 6c93dab85..0d204e7ef 100644 --- a/common/src/app/common/data/macros.cljc +++ b/common/src/app/common/data/macros.cljc @@ -7,7 +7,7 @@ #_:clj-kondo/ignore (ns app.common.data.macros "Data retrieval & manipulation specific macros." - (:refer-clojure :exclude [get-in select-keys str]) + (:refer-clojure :exclude [get-in select-keys str with-open]) #?(:cljs (:require-macros [app.common.data.macros])) (:require #?(:clj [clojure.core :as c] @@ -94,5 +94,16 @@ [s & params] `(str/ffmt ~s ~@params)) - - +(defmacro with-open + [bindings & body] + {:pre [(vector? bindings) + (even? (count bindings)) + (pos? (count bindings))]} + (reduce (fn [acc bindings] + `(let ~(vec bindings) + (try + ~acc + (finally + (d/close! ~(first bindings)))))) + `(do ~@body) + (reverse (partition 2 bindings)))) diff --git a/common/src/app/common/exceptions.cljc b/common/src/app/common/exceptions.cljc index e13628515..11a1d432f 100644 --- a/common/src/app/common/exceptions.cljc +++ b/common/src/app/common/exceptions.cljc @@ -50,6 +50,10 @@ [& exprs] `(try* (^:once fn* [] ~@exprs) identity)) +(defmacro try! + [& exprs] + `(try* (^:once fn* [] ~@exprs) identity)) + (defn with-always "A helper that evaluates an exptession independently if the body raises exception or not."