diff --git a/backend/deps.edn b/backend/deps.edn index fffaacf9f..d0c322f80 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -14,13 +14,13 @@ org.apache.logging.log4j/log4j-web {:mvn/version "2.13.3"} org.apache.logging.log4j/log4j-jul {:mvn/version "2.13.3"} org.apache.logging.log4j/log4j-slf4j-impl {:mvn/version "2.13.3"} + org.slf4j/slf4j-api {:mvn/version "1.7.30"} io.prometheus/simpleclient {:mvn/version "0.9.0"} io.prometheus/simpleclient_hotspot {:mvn/version "0.9.0"} io.prometheus/simpleclient_httpserver {:mvn/version "0.9.0"} selmer/selmer {:mvn/version "1.12.28"} - expound/expound {:mvn/version "0.8.5"} com.cognitect/transit-clj {:mvn/version "1.0.324"} @@ -28,9 +28,9 @@ java-http-clj/java-http-clj {:mvn/version "0.4.1"} info.sunng/ring-jetty9-adapter {:mvn/version "0.14.0"} - seancorfield/next.jdbc {:mvn/version "1.1.581"} + seancorfield/next.jdbc {:mvn/version "1.1.582"} metosin/reitit-ring {:mvn/version "0.5.5"} - org.postgresql/postgresql {:mvn/version "42.2.14"} + org.postgresql/postgresql {:mvn/version "42.2.15"} com.zaxxer/HikariCP {:mvn/version "3.4.5"} funcool/datoteka {:mvn/version "1.2.0"} diff --git a/backend/resources/log4j2.xml b/backend/resources/log4j2.xml index ffda464ec..df7bc897e 100644 --- a/backend/resources/log4j2.xml +++ b/backend/resources/log4j2.xml @@ -5,42 +5,31 @@ - + - - - - - - - - - - - - + + - - - + - + + diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 42a732b5b..f0d90af36 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -9,23 +9,41 @@ (ns app.worker (:require - [clojure.spec.alpha :as s] - [clojure.tools.logging :as log] - [mount.core :as mount :refer [defstate]] + [app.common.exceptions :as ex] [app.common.spec :as us] - [app.config :as cfg] + [app.common.uuid :as uuid] [app.db :as db] - [app.metrics :as mtx] [app.tasks.delete-object] [app.tasks.delete-profile] [app.tasks.gc] [app.tasks.remove-media] [app.tasks.sendmail] [app.tasks.trim-file] + [app.util.async :as aa] + [app.util.blob :as blob] [app.util.time :as dt] - [app.worker-impl :as impl])) + [clojure.core.async :as a] + [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] + [mount.core :as mount :refer [defstate]] + [promesa.exec :as px]) + (:import + org.eclipse.jetty.util.thread.QueuedThreadPool + java.util.concurrent.ExecutorService + java.util.concurrent.Executors + java.util.concurrent.Executor + java.time.Duration + java.time.Instant + java.util.Date)) -;; --- State initialization +(declare start-scheduler-worker!) +(declare start-worker!) +(declare thread-pool) +(declare stop!) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Entry Point (state initialization) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (def ^:private tasks {"delete-profile" #'app.tasks.delete-profile/handler @@ -44,20 +62,337 @@ (defstate executor - :start (impl/thread-pool {:idle-timeout 10000 + :start (thread-pool {:idle-timeout 10000 :min-threads 0 :max-threads 256}) - :stop (impl/stop! executor)) + :stop (stop! executor)) (defstate worker - :start (impl/start-worker! + :start (start-worker! {:tasks tasks :name "worker1" :batch-size 1 :executor executor}) - :stop (impl/stop! worker)) + :stop (stop! worker)) (defstate scheduler-worker - :start (impl/start-scheduler-worker! {:schedule schedule + :start (start-scheduler-worker! {:schedule schedule :executor executor}) - :stop (impl/stop! scheduler-worker)) + :stop (stop! scheduler-worker)) + + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Worker Impl +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(def ^:private + sql:mark-as-retry + "update task + set scheduled_at = clock_timestamp() + '10 seconds'::interval, + modified_at = clock_timestamp(), + error = ?, + status = 'retry', + retry_num = retry_num + ? + where id = ?") + +(defn- mark-as-retry + [conn {:keys [task error inc-by] + :or {inc-by 1}}] + (let [explain (ex-message error) + sqlv [sql:mark-as-retry explain inc-by (:id task)]] + (db/exec-one! conn sqlv) + nil)) + +(defn- mark-as-failed + [conn {:keys [task error]}] + (let [explain (ex-message error)] + (db/update! conn :task + {:error explain + :modified-at (dt/now) + :status "failed"} + {:id (:id task)}) + nil)) + +(defn- mark-as-completed + [conn {:keys [task] :as opts}] + (let [now (dt/now)] + (db/update! conn :task + {:completed-at now + :modified-at now + :status "completed"} + {:id (:id task)}) + nil)) + +(defn- decode-task-row + [{:keys [props] :as row}] + (when row + (cond-> row + (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props))))) + +(defn- handle-task + [tasks {:keys [name] :as item}] + (let [task-fn (get tasks name)] + (if task-fn + (task-fn item) + (do + (log/warn "no task handler found for" (pr-str name)) + nil)))) + +(defn- run-task + [{:keys [tasks conn]} item] + (try + (log/debugf "Started task '%s/%s/%s'." (:name item) (:id item) (:retry-num item)) + (handle-task tasks item) + {:status :completed :task item} + (catch Exception e + (let [data (ex-data e)] + (cond + (and (= ::retry (:type data)) + (= ::noop (:strategy data))) + {:status :retry :task item :error e :inc-by 0} + + (and (< (:retry-num item) + (:max-retries item)) + (= ::retry (:type data))) + {:status :retry :task item :error e} + + :else + (do + (log/errorf e "Unhandled exception on task '%s' (retry: %s)\nProps: %s" + (:name item) (:retry-num item) (pr-str (:props item))) + (if (>= (:retry-num item) (:max-retries item)) + {:status :failed :task item :error e} + {:status :retry :task item :error e}))))) + (finally + (log/debugf "Finished task '%s/%s/%s'." (:name item) (:id item) (:retry-num item))))) + +(def ^:private + sql:select-next-tasks + "select * from task as t + where t.scheduled_at <= now() + and t.queue = ? + and (t.status = 'new' or t.status = 'retry') + order by t.priority desc, t.scheduled_at + limit ? + for update skip locked") + +(defn- event-loop-fn* + [{:keys [tasks executor batch-size] :as opts}] + (db/with-atomic [conn db/pool] + (let [queue (:queue opts "default") + items (->> (db/exec! conn [sql:select-next-tasks queue batch-size]) + (map decode-task-row) + (seq)) + opts (assoc opts :conn conn)] + + (if (nil? items) + ::empty + (let [results (->> items + (map #(partial run-task opts %)) + (map #(px/submit! executor %)))] + (doseq [res results] + (let [res (deref res)] + (case (:status res) + :retry (mark-as-retry conn res) + :failed (mark-as-failed conn res) + :completed (mark-as-completed conn res)))) + ::handled))))) + +(defn- event-loop-fn + [{:keys [executor] :as opts}] + (aa/thread-call executor #(event-loop-fn* opts))) + +(s/def ::batch-size ::us/integer) +(s/def ::poll-interval ::us/integer) +(s/def ::fn (s/or :var var? :fn fn?)) +(s/def ::tasks (s/map-of string? ::fn)) + +(s/def ::start-worker-params + (s/keys :req-un [::tasks ::aa/executor ::batch-size] + :opt-un [::poll-interval])) + +(defn start-worker! + [{:keys [poll-interval executor] + :or {poll-interval 5000} + :as opts}] + (us/assert ::start-worker-params opts) + (log/infof "Starting worker '%s' on queue '%s'." + (:name opts "anonymous") + (:queue opts "default")) + (let [cch (a/chan 1)] + (a/go-loop [] + (let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)] + (cond + ;; Terminate the loop if close channel is closed or + ;; event-loop-fn returns nil. + (or (= port cch) (nil? val)) + (log/infof "Stop condition found. Shutdown worker: '%s'" + (:name opts "anonymous")) + + (db/pool-closed? db/pool) + (do + (log/info "Worker eventloop is aborted because pool is closed.") + (a/close! cch)) + + (and (instance? java.sql.SQLException val) + (contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val))) + (do + (log/error "Connection error, trying resume in some instants.") + (a/string + [error] + (with-out-str + (.printStackTrace ^Throwable error (java.io.PrintWriter. *out*)))) + +(defn- execute-scheduled-task + [{:keys [scheduler executor] :as opts} {:keys [id cron] :as task}] + (letfn [(run-task [conn] + (try + (when (db/exec-one! conn [sql:lock-scheduled-task id]) + (log/info "Executing scheduled task" id) + ((:fn task) task)) + (catch Exception e + e))) + + (handle-task* [conn] + (let [result (run-task conn)] + (if (instance? Throwable result) + (do + (log/warnf result "Unhandled exception on scheduled task '%s'." id) + (db/insert! conn :scheduled-task-history + {:id (uuid/next) + :task-id id + :is-error true + :reason (exception->string result)})) + (db/insert! conn :scheduled-task-history + {:id (uuid/next) + :task-id id})))) + (handle-task [] + (db/with-atomic [conn db/pool] + (handle-task* conn)))] + + (try + (px/run! executor handle-task) + (finally + (schedule-task! opts task))))) + +(defn ms-until-valid + [cron] + (s/assert dt/cron? cron) + (let [^Instant now (dt/now) + ^Instant next (dt/next-valid-instant-from cron now)] + (inst-ms (dt/duration-between now next)))) + +(defn- schedule-task! + [{:keys [scheduler] :as opts} {:keys [cron] :as task}] + (let [ms (ms-until-valid cron)] + (px/schedule! scheduler ms (partial execute-scheduled-task opts task)))) + +(s/def ::fn (s/or :var var? :fn fn?)) +(s/def ::id string?) +(s/def ::cron dt/cron?) +(s/def ::props (s/nilable map?)) +(s/def ::scheduled-task + (s/keys :req-un [::id ::cron ::fn] + :opt-un [::props])) + +(s/def ::schedule (s/coll-of ::scheduled-task)) +(s/def ::start-scheduler-worker-params + (s/keys :req-un [::schedule])) + +(defn start-scheduler-worker! + [{:keys [schedule] :as opts}] + (us/assert ::start-scheduler-worker-params opts) + (let [scheduler (Executors/newScheduledThreadPool (int 1)) + opts (assoc opts :scheduler scheduler)] + (synchronize-schedule! schedule) + (run! (partial schedule-task! opts) schedule) + (reify + java.lang.AutoCloseable + (close [_] + (.shutdownNow ^ExecutorService scheduler))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Thread Pool +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn thread-pool + ([] (thread-pool {})) + ([{:keys [min-threads max-threads idle-timeout name] + :or {min-threads 0 max-threads 128 idle-timeout 60000}}] + (let [executor (QueuedThreadPool. max-threads min-threads)] + (.setName executor (or name "default-tp")) + (.start executor) + executor))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Helpers +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn stop! + [o] + (cond + (instance? java.lang.AutoCloseable o) + (.close ^java.lang.AutoCloseable o) + + (instance? org.eclipse.jetty.util.component.ContainerLifeCycle o) + (.stop ^org.eclipse.jetty.util.component.ContainerLifeCycle o) + + :else + (ex/raise :type :not-implemented))) diff --git a/backend/src/app/worker_impl.clj b/backend/src/app/worker_impl.clj deleted file mode 100644 index 86c20de2e..000000000 --- a/backend/src/app/worker_impl.clj +++ /dev/null @@ -1,357 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; This Source Code Form is "Incompatible With Secondary Licenses", as -;; defined by the Mozilla Public License, v. 2.0. -;; -;; Copyright (c) 2020 UXBOX Labs SL - -(ns app.worker-impl - (:require - [cuerdas.core :as str] - [clojure.core.async :as a] - [clojure.spec.alpha :as s] - [clojure.tools.logging :as log] - [promesa.exec :as px] - [app.common.exceptions :as ex] - [app.common.spec :as us] - [app.common.uuid :as uuid] - [app.config :as cfg] - [app.db :as db] - [app.util.async :as aa] - [app.util.blob :as blob] - [app.util.time :as dt]) - (:import - org.eclipse.jetty.util.thread.QueuedThreadPool - java.util.concurrent.ExecutorService - java.util.concurrent.Executors - java.util.concurrent.Executor - java.time.Duration - java.time.Instant - java.util.Date)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Tasks -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(def ^:private - sql:mark-as-retry - "update task - set scheduled_at = clock_timestamp() + '10 seconds'::interval, - modified_at = clock_timestamp(), - error = ?, - status = 'retry', - retry_num = retry_num + ? - where id = ?") - -(defn- mark-as-retry - [conn {:keys [task error inc-by] - :or {inc-by 1}}] - (let [explain (ex-message error) - sqlv [sql:mark-as-retry explain inc-by (:id task)]] - (db/exec-one! conn sqlv) - nil)) - -(defn- mark-as-failed - [conn {:keys [task error]}] - (let [explain (ex-message error)] - (db/update! conn :task - {:error explain - :modified-at (dt/now) - :status "failed"} - {:id (:id task)}) - nil)) - -(defn- mark-as-completed - [conn {:keys [task] :as opts}] - (let [now (dt/now)] - (db/update! conn :task - {:completed-at now - :modified-at now - :status "completed"} - {:id (:id task)}) - nil)) - -(defn- decode-task-row - [{:keys [props] :as row}] - (when row - (cond-> row - (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props))))) - - -(defn- log-task-error - [item err] - (log/error (str/format "Unhandled exception on task '%s' (retry: %s)\n" (:name item) (:retry-num item)) - (str/format "Props: %s\n" (pr-str (:props item))) - (with-out-str - (.printStackTrace ^Throwable err (java.io.PrintWriter. *out*))))) - -(defn- handle-task - [tasks {:keys [name] :as item}] - (let [task-fn (get tasks name)] - (if task-fn - (task-fn item) - (do - (log/warn "no task handler found for" (pr-str name)) - nil)))) - -(defn- run-task - [{:keys [tasks conn]} item] - (try - (log/debugf "Started task '%s/%s/%s'." (:name item) (:id item) (:retry-num item)) - (handle-task tasks item) - {:status :completed :task item} - (catch Exception e - (let [data (ex-data e)] - (cond - (and (= ::retry (:type data)) - (= ::noop (:strategy data))) - {:status :retry :task item :error e :inc-by 0} - - (and (< (:retry-num item) - (:max-retries item)) - (= ::retry (:type data))) - {:status :retry :task item :error e} - - :else - (do - (log/errorf e "Unhandled exception on task '%s' (retry: %s)\nProps: %s" - (:name item) (:retry-num item) (pr-str (:props item))) - (if (>= (:retry-num item) (:max-retries item)) - {:status :failed :task item :error e} - {:status :retry :task item :error e}))))) - (finally - (log/debugf "Finished task '%s/%s/%s'." (:name item) (:id item) (:retry-num item))))) - -(def ^:private - sql:select-next-tasks - "select * from task as t - where t.scheduled_at <= now() - and t.queue = ? - and (t.status = 'new' or t.status = 'retry') - order by t.priority desc, t.scheduled_at - limit ? - for update skip locked") - -(defn- event-loop-fn* - [{:keys [tasks executor batch-size] :as opts}] - (db/with-atomic [conn db/pool] - (let [queue (:queue opts "default") - items (->> (db/exec! conn [sql:select-next-tasks queue batch-size]) - (map decode-task-row) - (seq)) - opts (assoc opts :conn conn)] - - (if (nil? items) - ::empty - (let [results (->> items - (map #(partial run-task opts %)) - (map #(px/submit! executor %)))] - (doseq [res results] - (let [res (deref res)] - (case (:status res) - :retry (mark-as-retry conn res) - :failed (mark-as-failed conn res) - :completed (mark-as-completed conn res)))) - ::handled))))) - -(defn- event-loop-fn - [{:keys [executor] :as opts}] - (aa/thread-call executor #(event-loop-fn* opts))) - -(s/def ::batch-size ::us/integer) -(s/def ::poll-interval ::us/integer) -(s/def ::fn (s/or :var var? :fn fn?)) -(s/def ::tasks (s/map-of string? ::fn)) - -(s/def ::start-worker-params - (s/keys :req-un [::tasks ::aa/executor ::batch-size] - :opt-un [::poll-interval])) - -(defn start-worker! - [{:keys [poll-interval executor] - :or {poll-interval 5000} - :as opts}] - (us/assert ::start-worker-params opts) - (log/infof "Starting worker '%s' on queue '%s'." - (:name opts "anonymous") - (:queue opts "default")) - (let [cch (a/chan 1)] - (a/go-loop [] - (let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)] - (cond - ;; Terminate the loop if close channel is closed or - ;; event-loop-fn returns nil. - (or (= port cch) (nil? val)) - (log/infof "Stop condition found. Shutdown worker: '%s'" - (:name opts "anonymous")) - - (db/pool-closed? db/pool) - (do - (log/info "Worker eventloop is aborted because pool is closed.") - (a/close! cch)) - - (and (instance? java.sql.SQLException val) - (contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val))) - (do - (log/error "Connection error, trying resume in some instants.") - (a/string - [error] - (with-out-str - (.printStackTrace ^Throwable error (java.io.PrintWriter. *out*)))) - -(defn- execute-scheduled-task - [{:keys [scheduler executor] :as opts} {:keys [id cron] :as task}] - (letfn [(run-task [conn] - (try - (when (db/exec-one! conn [sql:lock-scheduled-task id]) - (log/info "Executing scheduled task" id) - ((:fn task) task)) - (catch Exception e - e))) - - (handle-task* [conn] - (let [result (run-task conn)] - (if (instance? Throwable result) - (do - (log/warnf result "Unhandled exception on scheduled task '%s'." id) - (db/insert! conn :scheduled-task-history - {:id (uuid/next) - :task-id id - :is-error true - :reason (exception->string result)})) - (db/insert! conn :scheduled-task-history - {:id (uuid/next) - :task-id id})))) - (handle-task [] - (db/with-atomic [conn db/pool] - (handle-task* conn)))] - - (try - (px/run! executor handle-task) - (finally - (schedule-task! opts task))))) - -(defn ms-until-valid - [cron] - (s/assert dt/cron? cron) - (let [^Instant now (dt/now) - ^Instant next (dt/next-valid-instant-from cron now)] - (inst-ms (dt/duration-between now next)))) - -(defn- schedule-task! - [{:keys [scheduler] :as opts} {:keys [cron] :as task}] - (let [ms (ms-until-valid cron)] - (px/schedule! scheduler ms (partial execute-scheduled-task opts task)))) - -(s/def ::fn (s/or :var var? :fn fn?)) -(s/def ::id string?) -(s/def ::cron dt/cron?) -(s/def ::props (s/nilable map?)) -(s/def ::scheduled-task - (s/keys :req-un [::id ::cron ::fn] - :opt-un [::props])) - -(s/def ::schedule (s/coll-of ::scheduled-task)) -(s/def ::start-scheduler-worker-params - (s/keys :req-un [::schedule])) - -(defn start-scheduler-worker! - [{:keys [schedule] :as opts}] - (us/assert ::start-scheduler-worker-params opts) - (let [scheduler (Executors/newScheduledThreadPool (int 1)) - opts (assoc opts :scheduler scheduler)] - (synchronize-schedule! schedule) - (run! (partial schedule-task! opts) schedule) - (reify - java.lang.AutoCloseable - (close [_] - (.shutdownNow ^ExecutorService scheduler))))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Thread Pool -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defn thread-pool - ([] (thread-pool {})) - ([{:keys [min-threads max-threads idle-timeout name] - :or {min-threads 0 max-threads 128 idle-timeout 60000}}] - (let [executor (QueuedThreadPool. max-threads min-threads)] - (.setName executor (or name "default-tp")) - (.start executor) - executor))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Helpers -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defn stop! - [o] - (cond - (instance? java.lang.AutoCloseable o) - (.close ^java.lang.AutoCloseable o) - - (instance? org.eclipse.jetty.util.component.ContainerLifeCycle o) - (.stop ^org.eclipse.jetty.util.component.ContainerLifeCycle o) - - :else - (ex/raise :type :not-implemented))) - - diff --git a/frontend/src/app/main/ui/dashboard.cljs b/frontend/src/app/main/ui/dashboard.cljs index 77f185008..3dc6356db 100644 --- a/frontend/src/app/main/ui/dashboard.cljs +++ b/frontend/src/app/main/ui/dashboard.cljs @@ -59,7 +59,7 @@ (let [profile (mf/deref refs/profile) page (get-in route [:data :name]) {:keys [search-term team-id project-id] :as params} - (parse-params route profile)] + (parse-params route profile)] [:* [:& global-notifications {:profile profile}] [:section.dashboard-layout