From b1b3ad61a53cc4e6a8d0c090be7a4dbe31a6e63c Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 6 Aug 2020 16:05:14 +0200 Subject: [PATCH] :recycle: Refactor task worker. --- backend/bin/repl | 3 +- backend/resources/log4j2.xml | 6 +- .../migrations/0015-improve-tasks-tables.sql | 29 +++ .../0016-truncate-and-alter-tokens-table.sql | 3 + backend/src/uxbox/db.clj | 90 ++++++- backend/src/uxbox/emails.clj | 1 + backend/src/uxbox/http/session.clj | 2 +- backend/src/uxbox/migrations.clj | 10 +- backend/src/uxbox/services/tokens.clj | 15 +- backend/src/uxbox/tasks.clj | 24 +- backend/src/uxbox/tasks/impl.clj | 229 +++++++++++------- backend/src/uxbox/util/async.clj | 31 +++ backend/src/uxbox/util/transit.clj | 5 + 13 files changed, 317 insertions(+), 131 deletions(-) create mode 100644 backend/resources/migrations/0015-improve-tasks-tables.sql create mode 100644 backend/resources/migrations/0016-truncate-and-alter-tokens-table.sql create mode 100644 backend/src/uxbox/util/async.clj diff --git a/backend/bin/repl b/backend/bin/repl index 697f13c1c..7bf7d1beb 100755 --- a/backend/bin/repl +++ b/backend/bin/repl @@ -1,3 +1,4 @@ #!/usr/bin/env bash set -ex -clojure -Ojmx-remote -A:dev -e "(set! *warn-on-reflection* true)" -m rebel-readline.main +#clojure -Ojmx-remote -A:dev -e "(set! *warn-on-reflection* true)" -m rebel-readline.main +clojure -Ojmx-remote -A:dev -m rebel-readline.main diff --git a/backend/resources/log4j2.xml b/backend/resources/log4j2.xml index 12cbb25f7..24e949c4a 100644 --- a/backend/resources/log4j2.xml +++ b/backend/resources/log4j2.xml @@ -6,8 +6,10 @@ - - + + + + diff --git a/backend/resources/migrations/0015-improve-tasks-tables.sql b/backend/resources/migrations/0015-improve-tasks-tables.sql new file mode 100644 index 000000000..5427757cf --- /dev/null +++ b/backend/resources/migrations/0015-improve-tasks-tables.sql @@ -0,0 +1,29 @@ +DROP TABLE task; + +CREATE TABLE task ( + id uuid DEFAULT uuid_generate_v4(), + created_at timestamptz NOT NULL DEFAULT clock_timestamp(), + modified_at timestamptz NOT NULL DEFAULT clock_timestamp(), + completed_at timestamptz NULL DEFAULT NULL, + scheduled_at timestamptz NOT NULL, + priority smallint DEFAULT 100, + + queue text NOT NULL, + + name text NOT NULL, + props jsonb NOT NULL, + + error text NULL DEFAULT NULL, + retry_num smallint NOT NULL DEFAULT 0, + max_retries smallint NOT NULL DEFAULT 3, + status text NOT NULL DEFAULT 'new', + + PRIMARY KEY (id, status) +) PARTITION BY list(status); + +CREATE TABLE task_completed partition OF task FOR VALUES IN ('completed', 'failed'); +CREATE TABLE task_default partition OF task default; + +CREATE INDEX task__scheduled_at__queue__idx + ON task (scheduled_at, queue) + WHERE status = 'new' or status = 'retry'; diff --git a/backend/resources/migrations/0016-truncate-and-alter-tokens-table.sql b/backend/resources/migrations/0016-truncate-and-alter-tokens-table.sql new file mode 100644 index 000000000..2a4e8a5b6 --- /dev/null +++ b/backend/resources/migrations/0016-truncate-and-alter-tokens-table.sql @@ -0,0 +1,3 @@ +delete from generic_token; +alter table generic_token drop column content; +alter table generic_token add column content jsonb not null; diff --git a/backend/src/uxbox/db.clj b/backend/src/uxbox/db.clj index e77e88cba..f28a7c66b 100644 --- a/backend/src/uxbox/db.clj +++ b/backend/src/uxbox/db.clj @@ -6,6 +6,7 @@ (ns uxbox.db (:require + [clojure.spec.alpha :as s] [clojure.data.json :as json] [clojure.string :as str] [clojure.tools.logging :as log] @@ -17,16 +18,23 @@ [next.jdbc.result-set :as jdbc-rs] [next.jdbc.sql :as jdbc-sql] [next.jdbc.sql.builder :as jdbc-bld] - [uxbox.metrics :as mtx] [uxbox.common.exceptions :as ex] [uxbox.config :as cfg] + [uxbox.metrics :as mtx] + [uxbox.util.time :as dt] + [uxbox.util.transit :as t] [uxbox.util.data :as data]) (:import org.postgresql.util.PGobject + org.postgresql.util.PGInterval com.zaxxer.hikari.metrics.prometheus.PrometheusMetricsTrackerFactory com.zaxxer.hikari.HikariConfig com.zaxxer.hikari.HikariDataSource)) +(def initsql + (str "SET statement_timeout = 10000;\n" + "SET idle_in_transaction_session_timeout = 30000;")) + (defn- create-datasource-config [cfg] (let [dburi (:database-uri cfg) @@ -39,17 +47,28 @@ (.setPoolName "main") (.setAutoCommit true) (.setReadOnly false) - (.setConnectionTimeout 30000) ;; 30seg - (.setValidationTimeout 5000) ;; 5seg - (.setIdleTimeout 900000) ;; 15min + (.setConnectionTimeout 8000) ;; 8seg + (.setValidationTimeout 4000) ;; 4seg + (.setIdleTimeout 300000) ;; 5min (.setMaxLifetime 900000) ;; 15min - (.setMinimumIdle 5) - (.setMaximumPoolSize 10) + (.setMinimumIdle 0) + (.setMaximumPoolSize 15) + (.setConnectionInitSql initsql) (.setMetricsTrackerFactory mfactory)) (when username (.setUsername config username)) (when password (.setPassword config password)) config)) +(defn pool? + [v] + (instance? javax.sql.DataSource v)) + +(s/def ::pool pool?) + +(defn pool-closed? + [pool] + (.isClosed ^com.zaxxer.hikari.HikariDataSource pool)) + (defn- create-pool [cfg] (let [dsc (create-datasource-config cfg)] @@ -135,6 +154,33 @@ [v] (instance? PGobject v)) +(defn pginterval? + [v] + (instance? PGInterval v)) + +(defn pginterval + [data] + (org.postgresql.util.PGInterval. ^String data)) + +(defn interval + [data] + (cond + (integer? data) + (->> (/ data 1000.0) + (format "%s seconds") + (pginterval)) + + (string? data) + (pginterval data) + + (dt/duration? data) + (->> (/ (.toMillis data) 1000.0) + (format "%s seconds") + (pginterval)) + + :else + (ex/raise :type :not-implemented))) + (defn decode-pgobject [^PGobject obj] (let [typ (.getType obj) @@ -144,6 +190,38 @@ (json/read-str val) val))) +(defn decode-json-pgobject + [^PGobject o] + (let [typ (.getType o) + val (.getValue o)] + (if (or (= typ "json") + (= typ "jsonb")) + (json/read-str val :key-fn keyword) + val))) + +(defn decode-transit-pgobject + [^PGobject o] + (let [typ (.getType o) + val (.getValue o)] + (if (or (= typ "json") + (= typ "jsonb")) + (t/decode-str val) + val))) + +(defn tjson + "Encode as transit json." + [data] + (doto (org.postgresql.util.PGobject.) + (.setType "jsonb") + (.setValue (t/encode-verbose-str data)))) + +(defn json + "Encode as plain json." + [data] + (doto (org.postgresql.util.PGobject.) + (.setType "jsonb") + (.setValue (json/write-str data)))) + ;; Instrumentation (mtx/instrument-with-counter! diff --git a/backend/src/uxbox/emails.clj b/backend/src/uxbox/emails.clj index b276ca901..14efd7a5e 100644 --- a/backend/src/uxbox/emails.clj +++ b/backend/src/uxbox/emails.clj @@ -46,6 +46,7 @@ email (email-factory data)] (tasks/submit! conn {:name "sendmail" :delay 0 + :priority 200 :props email})))) ;; --- Emails diff --git a/backend/src/uxbox/http/session.clj b/backend/src/uxbox/http/session.clj index 4765900ca..f548725be 100644 --- a/backend/src/uxbox/http/session.clj +++ b/backend/src/uxbox/http/session.clj @@ -23,7 +23,7 @@ (defn create [profile-id user-agent] - (let [id (tokens/next)] + (let [id (tokens/next-token)] (db/insert! db/pool :http-session {:id id :profile-id profile-id :user-agent user-agent}) diff --git a/backend/src/uxbox/migrations.clj b/backend/src/uxbox/migrations.clj index 4907a1dc0..4e7005753 100644 --- a/backend/src/uxbox/migrations.clj +++ b/backend/src/uxbox/migrations.clj @@ -71,7 +71,15 @@ {:desc "Refactor media storage" :name "0014-refactor-media-storage.sql" - :fn (mg/resource "migrations/0014-refactor-media-storage.sql")}]}) + :fn (mg/resource "migrations/0014-refactor-media-storage.sql")} + + {:desc "Improve and partition task related tables" + :name "0015-improve-tasks-tables" + :fn (mg/resource "migrations/0015-improve-tasks-tables.sql")} + + {:desc "Truncate & alter tokens tables" + :name "0016-truncate-and-alter-tokens-table" + :fn (mg/resource "migrations/0016-truncate-and-alter-tokens-table.sql")}]}) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Entry point diff --git a/backend/src/uxbox/services/tokens.clj b/backend/src/uxbox/services/tokens.clj index 1c48e01e8..b993e04c7 100644 --- a/backend/src/uxbox/services/tokens.clj +++ b/backend/src/uxbox/services/tokens.clj @@ -8,7 +8,6 @@ ;; Copyright (c) 2020 UXBOX Labs SL (ns uxbox.services.tokens - (:refer-clojure :exclude [next]) (:require [clojure.spec.alpha :as s] [cuerdas.core :as str] @@ -16,14 +15,11 @@ [sodi.util] [uxbox.common.exceptions :as ex] [uxbox.common.spec :as us] - [uxbox.common.uuid :as uuid] - [uxbox.config :as cfg] [uxbox.util.time :as dt] - [uxbox.util.blob :as blob] [uxbox.db :as db])) -(defn next - ([] (next 64)) +(defn next-token + ([] (next-token 64)) ([n] (-> (sodi.prng/random-bytes n) (sodi.util/bytes->b64s)))) @@ -35,15 +31,16 @@ [{:keys [content] :as row}] (when row (cond-> row - content (assoc :content (blob/decode content))))) + (db/pgobject? content) + (assoc :content (db/decode-transit-pgobject content))))) (defn create! ([conn payload] (create! conn payload {})) ([conn payload {:keys [valid] :or {valid default-duration}}] - (let [token (next) + (let [token (next-token) until (dt/plus (dt/now) (dt/duration valid))] (db/insert! conn :generic-token - {:content (blob/encode payload) + {:content (db/tjson payload) :token token :valid-until until}) token))) diff --git a/backend/src/uxbox/tasks.clj b/backend/src/uxbox/tasks.clj index c46ac1c0b..132a1fd6a 100644 --- a/backend/src/uxbox/tasks.clj +++ b/backend/src/uxbox/tasks.clj @@ -23,24 +23,12 @@ [uxbox.tasks.delete-profile] [uxbox.tasks.delete-object] [uxbox.tasks.impl :as impl] - [uxbox.util.time :as dt]) - (:import - java.util.concurrent.ScheduledExecutorService - java.util.concurrent.Executors)) + [uxbox.util.time :as dt])) ;; --- Scheduler Executor Initialization -(defstate scheduler - :start (Executors/newScheduledThreadPool (int 1)) - :stop (.shutdownNow ^ScheduledExecutorService scheduler)) - ;; --- State initialization -;; TODO: missing self maintanance task; when the queue table is full -;; of completed/failed task, the performance starts degrading -;; linearly, so after some arbitrary number of tasks is processed, we -;; need to perform a maintenance and delete some old tasks. - (def ^:private tasks {"delete-profile" #'uxbox.tasks.delete-profile/handler "delete-object" #'uxbox.tasks.delete-object/handler @@ -52,14 +40,12 @@ :cron (dt/cron "1 1 */1 * * ? *") :fn #'uxbox.tasks.gc/remove-media}]) -(defstate tasks-worker - :start (impl/start-worker! {:tasks tasks - :xtor scheduler}) - :stop (impl/stop! tasks-worker)) +(defstate worker + :start (impl/start-worker! {:tasks tasks :name "worker1"}) + :stop (impl/stop! worker)) (defstate scheduler-worker - :start (impl/start-scheduler-worker! {:schedule schedule - :xtor scheduler}) + :start (impl/start-scheduler-worker! {:schedule schedule}) :stop (impl/stop! scheduler-worker)) ;; --- Public API diff --git a/backend/src/uxbox/tasks/impl.clj b/backend/src/uxbox/tasks/impl.clj index 121797e6f..90bc68310 100644 --- a/backend/src/uxbox/tasks/impl.clj +++ b/backend/src/uxbox/tasks/impl.clj @@ -10,6 +10,7 @@ (ns uxbox.tasks.impl "Async tasks implementation." (:require + [cuerdas.core :as str] [clojure.core.async :as a] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] @@ -18,6 +19,7 @@ [uxbox.common.uuid :as uuid] [uxbox.config :as cfg] [uxbox.db :as db] + [uxbox.util.async :as aa] [uxbox.util.blob :as blob] [uxbox.util.time :as dt]) (:import @@ -40,6 +42,7 @@ sql:mark-as-retry "update task set scheduled_at = clock_timestamp() + '5 seconds'::interval, + modified_at = clock_timestamp(), error = ?, status = 'retry', retry_num = retry_num + 1 @@ -57,25 +60,29 @@ (let [explain (ex-message error)] (db/update! conn :task {:error explain + :modified-at (dt/now) :status "failed"} {:id (:id task)}) nil)) (defn- mark-as-completed [conn task] - (db/update! conn :task - {:completed-at (dt/now) - :status "completed"} - {:id (:id task)}) - nil) + (let [now (dt/now)] + (db/update! conn :task + {:completed-at now + :modified-at now + :status "completed"} + {:id (:id task)}) + nil)) + (def ^:private sql:select-next-task "select * from task as t where t.scheduled_at <= now() and t.queue = ? - and (t.status = 'new' or (t.status = 'retry' and t.retry_num <= ?)) - order by t.scheduled_at + and (t.status = 'new' or t.status = 'retry') + order by t.priority desc, t.scheduled_at limit 1 for update skip locked") @@ -83,12 +90,13 @@ [{:keys [props] :as row}] (when row (cond-> row - props (assoc :props (blob/decode props))))) + (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props))))) + (defn- log-task-error [item err] - (log/error "Unhandled exception on task '" (:name item) - "' (retry:" (:retry-num item) ") \n" + (log/error (str/format "Unhandled exception on task '%s' (retry: %s)\n" (:name item) (:retry-num item)) + (str/format "Props: %s\n" (pr-str (:props item))) (with-out-str (.printStackTrace ^Throwable err (java.io.PrintWriter. *out*))))) @@ -101,38 +109,107 @@ (log/warn "no task handler found for" (pr-str name)) nil)))) -(defn- event-loop-fn - [{:keys [tasks] :as options}] - (let [queue (:queue options "default") - max-retries (:max-retries options 3)] - (db/with-atomic [conn db/pool] - (let [item (-> (db/exec-one! conn [sql:select-next-task queue max-retries]) - (decode-task-row))] - (when item - (log/info "Execute task" (:name item)) - (try - (handle-task tasks item) - (mark-as-completed conn item) - ::handled - (catch Throwable e - (log-task-error item e) - (if (>= (:retry-num item) max-retries) - (mark-as-failed conn item e) - (mark-as-retry conn item e))))))))) +(defn- run-task + [{:keys [tasks conn]} item] + (try + (log/debug (str/format "Started task '%s/%s'." (:name item) (:id item))) + (handle-task tasks item) + (log/debug (str/format "Finished task '%s/%s'." (:name item) (:id item))) + (mark-as-completed conn item) + (catch Exception e + (log-task-error item e) + (if (>= (:retry-num item) (:max-retries item)) + (mark-as-failed conn item e) + (mark-as-retry conn item e))))) -(defn- execute-worker-task - [{:keys [::stop ::xtor poll-interval] +(defn- event-loop-fn + [{:keys [tasks] :as opts}] + (aa/thread-try + (db/with-atomic [conn db/pool] + (let [queue (:queue opts "default") + item (-> (db/exec-one! conn [sql:select-next-task queue]) + (decode-task-row)) + opts (assoc opts :conn conn)] + + (cond + (nil? item) + ::empty + + (or (= "new" (:status item)) + (= "retry" (:status item))) + (do + (run-task opts item) + ::handled) + + :else + (do + (log/warn "Unexpected condition on worker event loop:" (pr-str item)) + ::handled)))))) + +(s/def ::poll-interval ::us/integer) +(s/def ::fn (s/or :var var? :fn fn?)) +(s/def ::tasks (s/map-of string? ::fn)) +(s/def ::start-worker-params + (s/keys :req-un [::tasks] + :opt-un [::poll-interval])) + +(defn start-worker! + [{:keys [poll-interval] :or {poll-interval 5000} :as opts}] - (try - (when-not @stop - (let [res (event-loop-fn opts)] - (if (= res ::handled) - (px/schedule! xtor 0 (partial execute-worker-task opts)) - (px/schedule! xtor poll-interval (partial execute-worker-task opts))))) - (catch Throwable e - (log/error "unexpected exception:" e) - (px/schedule! xtor poll-interval (partial execute-worker-task opts))))) + (us/assert ::start-worker-params opts) + (log/info (str/format "Starting worker '%s' on queue '%s'." + (:name opts "anonymous") + (:queue opts "default"))) + (let [cch (a/chan 1)] + (a/go-loop [] + (let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)] + (cond + ;; Terminate the loop if close channel is closed or + ;; event-loop-fn returns nil. + (or (= port cch) (nil? val)) + (log/info (str/format "Stop condition found. Shutdown worker: '%s'" + (:name opts "anonymous"))) + + (db/pool-closed? db/pool) + (do + (log/info "Worker eventloop is aborted because pool is closed.") + (a/close! cch)) + + (and (instance? java.sql.SQLException val) + (contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val))) + (do + (log/error "Connection error, trying resume in some instants.") + (a/pginterval - [^Duration d] - (->> (/ (.toMillis d) 1000.0) - (format "%s seconds"))) - (defn submit! - [conn {:keys [name delay props queue key] - :or {delay 0 props {} queue "default"} + [conn {:keys [name delay props queue priority max-retries key] + :or {delay 0 props {} queue "default" priority 100 max-retries 3} :as options}] (us/verify ::task-options options) - (let [duration (dt/duration delay) - pginterval (duration->pginterval duration) - props (blob/encode props) - id (uuid/next)] - (log/info "Submit task" name "to be executed in" (str duration)) - (db/exec-one! conn [sql:insert-new-task - id name props queue pginterval]) + (let [duration (dt/duration delay) + interval (db/interval duration) + props (db/tjson props) + id (uuid/next)] + (log/info (str/format "Submit task '%s' to be executed in '%s'." name (str duration))) + (db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval]) id)) diff --git a/backend/src/uxbox/util/async.clj b/backend/src/uxbox/util/async.clj new file mode 100644 index 000000000..8626db83a --- /dev/null +++ b/backend/src/uxbox/util/async.clj @@ -0,0 +1,31 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) 2020 Andrey Antukh + +(ns uxbox.util.async + (:require [clojure.core.async :as a])) + +(defmacro go-try + [& body] + `(a/go + (try + ~@body + (catch Throwable e# e#)))) + +(defmacro > (encode message) (bytes->str))) +(defn encode-verbose-str + [message] + (->> (encode message {:type :json-verbose}) + (bytes->str))) + ;; --- Helpers (defn str->bytes