diff --git a/backend/bin/repl b/backend/bin/repl
index 697f13c1c..7bf7d1beb 100755
--- a/backend/bin/repl
+++ b/backend/bin/repl
@@ -1,3 +1,4 @@
#!/usr/bin/env bash
set -ex
-clojure -Ojmx-remote -A:dev -e "(set! *warn-on-reflection* true)" -m rebel-readline.main
+#clojure -Ojmx-remote -A:dev -e "(set! *warn-on-reflection* true)" -m rebel-readline.main
+clojure -Ojmx-remote -A:dev -m rebel-readline.main
diff --git a/backend/resources/log4j2.xml b/backend/resources/log4j2.xml
index 12cbb25f7..24e949c4a 100644
--- a/backend/resources/log4j2.xml
+++ b/backend/resources/log4j2.xml
@@ -6,8 +6,10 @@
-
-
+
+
+
+
diff --git a/backend/resources/migrations/0015-improve-tasks-tables.sql b/backend/resources/migrations/0015-improve-tasks-tables.sql
new file mode 100644
index 000000000..5427757cf
--- /dev/null
+++ b/backend/resources/migrations/0015-improve-tasks-tables.sql
@@ -0,0 +1,29 @@
+DROP TABLE task;
+
+CREATE TABLE task (
+ id uuid DEFAULT uuid_generate_v4(),
+ created_at timestamptz NOT NULL DEFAULT clock_timestamp(),
+ modified_at timestamptz NOT NULL DEFAULT clock_timestamp(),
+ completed_at timestamptz NULL DEFAULT NULL,
+ scheduled_at timestamptz NOT NULL,
+ priority smallint DEFAULT 100,
+
+ queue text NOT NULL,
+
+ name text NOT NULL,
+ props jsonb NOT NULL,
+
+ error text NULL DEFAULT NULL,
+ retry_num smallint NOT NULL DEFAULT 0,
+ max_retries smallint NOT NULL DEFAULT 3,
+ status text NOT NULL DEFAULT 'new',
+
+ PRIMARY KEY (id, status)
+) PARTITION BY list(status);
+
+CREATE TABLE task_completed partition OF task FOR VALUES IN ('completed', 'failed');
+CREATE TABLE task_default partition OF task default;
+
+CREATE INDEX task__scheduled_at__queue__idx
+ ON task (scheduled_at, queue)
+ WHERE status = 'new' or status = 'retry';
diff --git a/backend/resources/migrations/0016-truncate-and-alter-tokens-table.sql b/backend/resources/migrations/0016-truncate-and-alter-tokens-table.sql
new file mode 100644
index 000000000..2a4e8a5b6
--- /dev/null
+++ b/backend/resources/migrations/0016-truncate-and-alter-tokens-table.sql
@@ -0,0 +1,3 @@
+delete from generic_token;
+alter table generic_token drop column content;
+alter table generic_token add column content jsonb not null;
diff --git a/backend/src/uxbox/db.clj b/backend/src/uxbox/db.clj
index e77e88cba..f28a7c66b 100644
--- a/backend/src/uxbox/db.clj
+++ b/backend/src/uxbox/db.clj
@@ -6,6 +6,7 @@
(ns uxbox.db
(:require
+ [clojure.spec.alpha :as s]
[clojure.data.json :as json]
[clojure.string :as str]
[clojure.tools.logging :as log]
@@ -17,16 +18,23 @@
[next.jdbc.result-set :as jdbc-rs]
[next.jdbc.sql :as jdbc-sql]
[next.jdbc.sql.builder :as jdbc-bld]
- [uxbox.metrics :as mtx]
[uxbox.common.exceptions :as ex]
[uxbox.config :as cfg]
+ [uxbox.metrics :as mtx]
+ [uxbox.util.time :as dt]
+ [uxbox.util.transit :as t]
[uxbox.util.data :as data])
(:import
org.postgresql.util.PGobject
+ org.postgresql.util.PGInterval
com.zaxxer.hikari.metrics.prometheus.PrometheusMetricsTrackerFactory
com.zaxxer.hikari.HikariConfig
com.zaxxer.hikari.HikariDataSource))
+(def initsql
+ (str "SET statement_timeout = 10000;\n"
+ "SET idle_in_transaction_session_timeout = 30000;"))
+
(defn- create-datasource-config
[cfg]
(let [dburi (:database-uri cfg)
@@ -39,17 +47,28 @@
(.setPoolName "main")
(.setAutoCommit true)
(.setReadOnly false)
- (.setConnectionTimeout 30000) ;; 30seg
- (.setValidationTimeout 5000) ;; 5seg
- (.setIdleTimeout 900000) ;; 15min
+ (.setConnectionTimeout 8000) ;; 8seg
+ (.setValidationTimeout 4000) ;; 4seg
+ (.setIdleTimeout 300000) ;; 5min
(.setMaxLifetime 900000) ;; 15min
- (.setMinimumIdle 5)
- (.setMaximumPoolSize 10)
+ (.setMinimumIdle 0)
+ (.setMaximumPoolSize 15)
+ (.setConnectionInitSql initsql)
(.setMetricsTrackerFactory mfactory))
(when username (.setUsername config username))
(when password (.setPassword config password))
config))
+(defn pool?
+ [v]
+ (instance? javax.sql.DataSource v))
+
+(s/def ::pool pool?)
+
+(defn pool-closed?
+ [pool]
+ (.isClosed ^com.zaxxer.hikari.HikariDataSource pool))
+
(defn- create-pool
[cfg]
(let [dsc (create-datasource-config cfg)]
@@ -135,6 +154,33 @@
[v]
(instance? PGobject v))
+(defn pginterval?
+ [v]
+ (instance? PGInterval v))
+
+(defn pginterval
+ [data]
+ (org.postgresql.util.PGInterval. ^String data))
+
+(defn interval
+ [data]
+ (cond
+ (integer? data)
+ (->> (/ data 1000.0)
+ (format "%s seconds")
+ (pginterval))
+
+ (string? data)
+ (pginterval data)
+
+ (dt/duration? data)
+ (->> (/ (.toMillis data) 1000.0)
+ (format "%s seconds")
+ (pginterval))
+
+ :else
+ (ex/raise :type :not-implemented)))
+
(defn decode-pgobject
[^PGobject obj]
(let [typ (.getType obj)
@@ -144,6 +190,38 @@
(json/read-str val)
val)))
+(defn decode-json-pgobject
+ [^PGobject o]
+ (let [typ (.getType o)
+ val (.getValue o)]
+ (if (or (= typ "json")
+ (= typ "jsonb"))
+ (json/read-str val :key-fn keyword)
+ val)))
+
+(defn decode-transit-pgobject
+ [^PGobject o]
+ (let [typ (.getType o)
+ val (.getValue o)]
+ (if (or (= typ "json")
+ (= typ "jsonb"))
+ (t/decode-str val)
+ val)))
+
+(defn tjson
+ "Encode as transit json."
+ [data]
+ (doto (org.postgresql.util.PGobject.)
+ (.setType "jsonb")
+ (.setValue (t/encode-verbose-str data))))
+
+(defn json
+ "Encode as plain json."
+ [data]
+ (doto (org.postgresql.util.PGobject.)
+ (.setType "jsonb")
+ (.setValue (json/write-str data))))
+
;; Instrumentation
(mtx/instrument-with-counter!
diff --git a/backend/src/uxbox/emails.clj b/backend/src/uxbox/emails.clj
index b276ca901..14efd7a5e 100644
--- a/backend/src/uxbox/emails.clj
+++ b/backend/src/uxbox/emails.clj
@@ -46,6 +46,7 @@
email (email-factory data)]
(tasks/submit! conn {:name "sendmail"
:delay 0
+ :priority 200
:props email}))))
;; --- Emails
diff --git a/backend/src/uxbox/http/session.clj b/backend/src/uxbox/http/session.clj
index 4765900ca..f548725be 100644
--- a/backend/src/uxbox/http/session.clj
+++ b/backend/src/uxbox/http/session.clj
@@ -23,7 +23,7 @@
(defn create
[profile-id user-agent]
- (let [id (tokens/next)]
+ (let [id (tokens/next-token)]
(db/insert! db/pool :http-session {:id id
:profile-id profile-id
:user-agent user-agent})
diff --git a/backend/src/uxbox/migrations.clj b/backend/src/uxbox/migrations.clj
index 4907a1dc0..4e7005753 100644
--- a/backend/src/uxbox/migrations.clj
+++ b/backend/src/uxbox/migrations.clj
@@ -71,7 +71,15 @@
{:desc "Refactor media storage"
:name "0014-refactor-media-storage.sql"
- :fn (mg/resource "migrations/0014-refactor-media-storage.sql")}]})
+ :fn (mg/resource "migrations/0014-refactor-media-storage.sql")}
+
+ {:desc "Improve and partition task related tables"
+ :name "0015-improve-tasks-tables"
+ :fn (mg/resource "migrations/0015-improve-tasks-tables.sql")}
+
+ {:desc "Truncate & alter tokens tables"
+ :name "0016-truncate-and-alter-tokens-table"
+ :fn (mg/resource "migrations/0016-truncate-and-alter-tokens-table.sql")}]})
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Entry point
diff --git a/backend/src/uxbox/services/tokens.clj b/backend/src/uxbox/services/tokens.clj
index 1c48e01e8..b993e04c7 100644
--- a/backend/src/uxbox/services/tokens.clj
+++ b/backend/src/uxbox/services/tokens.clj
@@ -8,7 +8,6 @@
;; Copyright (c) 2020 UXBOX Labs SL
(ns uxbox.services.tokens
- (:refer-clojure :exclude [next])
(:require
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
@@ -16,14 +15,11 @@
[sodi.util]
[uxbox.common.exceptions :as ex]
[uxbox.common.spec :as us]
- [uxbox.common.uuid :as uuid]
- [uxbox.config :as cfg]
[uxbox.util.time :as dt]
- [uxbox.util.blob :as blob]
[uxbox.db :as db]))
-(defn next
- ([] (next 64))
+(defn next-token
+ ([] (next-token 64))
([n]
(-> (sodi.prng/random-bytes n)
(sodi.util/bytes->b64s))))
@@ -35,15 +31,16 @@
[{:keys [content] :as row}]
(when row
(cond-> row
- content (assoc :content (blob/decode content)))))
+ (db/pgobject? content)
+ (assoc :content (db/decode-transit-pgobject content)))))
(defn create!
([conn payload] (create! conn payload {}))
([conn payload {:keys [valid] :or {valid default-duration}}]
- (let [token (next)
+ (let [token (next-token)
until (dt/plus (dt/now) (dt/duration valid))]
(db/insert! conn :generic-token
- {:content (blob/encode payload)
+ {:content (db/tjson payload)
:token token
:valid-until until})
token)))
diff --git a/backend/src/uxbox/tasks.clj b/backend/src/uxbox/tasks.clj
index c46ac1c0b..132a1fd6a 100644
--- a/backend/src/uxbox/tasks.clj
+++ b/backend/src/uxbox/tasks.clj
@@ -23,24 +23,12 @@
[uxbox.tasks.delete-profile]
[uxbox.tasks.delete-object]
[uxbox.tasks.impl :as impl]
- [uxbox.util.time :as dt])
- (:import
- java.util.concurrent.ScheduledExecutorService
- java.util.concurrent.Executors))
+ [uxbox.util.time :as dt]))
;; --- Scheduler Executor Initialization
-(defstate scheduler
- :start (Executors/newScheduledThreadPool (int 1))
- :stop (.shutdownNow ^ScheduledExecutorService scheduler))
-
;; --- State initialization
-;; TODO: missing self maintanance task; when the queue table is full
-;; of completed/failed task, the performance starts degrading
-;; linearly, so after some arbitrary number of tasks is processed, we
-;; need to perform a maintenance and delete some old tasks.
-
(def ^:private tasks
{"delete-profile" #'uxbox.tasks.delete-profile/handler
"delete-object" #'uxbox.tasks.delete-object/handler
@@ -52,14 +40,12 @@
:cron (dt/cron "1 1 */1 * * ? *")
:fn #'uxbox.tasks.gc/remove-media}])
-(defstate tasks-worker
- :start (impl/start-worker! {:tasks tasks
- :xtor scheduler})
- :stop (impl/stop! tasks-worker))
+(defstate worker
+ :start (impl/start-worker! {:tasks tasks :name "worker1"})
+ :stop (impl/stop! worker))
(defstate scheduler-worker
- :start (impl/start-scheduler-worker! {:schedule schedule
- :xtor scheduler})
+ :start (impl/start-scheduler-worker! {:schedule schedule})
:stop (impl/stop! scheduler-worker))
;; --- Public API
diff --git a/backend/src/uxbox/tasks/impl.clj b/backend/src/uxbox/tasks/impl.clj
index 121797e6f..90bc68310 100644
--- a/backend/src/uxbox/tasks/impl.clj
+++ b/backend/src/uxbox/tasks/impl.clj
@@ -10,6 +10,7 @@
(ns uxbox.tasks.impl
"Async tasks implementation."
(:require
+ [cuerdas.core :as str]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
@@ -18,6 +19,7 @@
[uxbox.common.uuid :as uuid]
[uxbox.config :as cfg]
[uxbox.db :as db]
+ [uxbox.util.async :as aa]
[uxbox.util.blob :as blob]
[uxbox.util.time :as dt])
(:import
@@ -40,6 +42,7 @@
sql:mark-as-retry
"update task
set scheduled_at = clock_timestamp() + '5 seconds'::interval,
+ modified_at = clock_timestamp(),
error = ?,
status = 'retry',
retry_num = retry_num + 1
@@ -57,25 +60,29 @@
(let [explain (ex-message error)]
(db/update! conn :task
{:error explain
+ :modified-at (dt/now)
:status "failed"}
{:id (:id task)})
nil))
(defn- mark-as-completed
[conn task]
- (db/update! conn :task
- {:completed-at (dt/now)
- :status "completed"}
- {:id (:id task)})
- nil)
+ (let [now (dt/now)]
+ (db/update! conn :task
+ {:completed-at now
+ :modified-at now
+ :status "completed"}
+ {:id (:id task)})
+ nil))
+
(def ^:private
sql:select-next-task
"select * from task as t
where t.scheduled_at <= now()
and t.queue = ?
- and (t.status = 'new' or (t.status = 'retry' and t.retry_num <= ?))
- order by t.scheduled_at
+ and (t.status = 'new' or t.status = 'retry')
+ order by t.priority desc, t.scheduled_at
limit 1
for update skip locked")
@@ -83,12 +90,13 @@
[{:keys [props] :as row}]
(when row
(cond-> row
- props (assoc :props (blob/decode props)))))
+ (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))))
+
(defn- log-task-error
[item err]
- (log/error "Unhandled exception on task '" (:name item)
- "' (retry:" (:retry-num item) ") \n"
+ (log/error (str/format "Unhandled exception on task '%s' (retry: %s)\n" (:name item) (:retry-num item))
+ (str/format "Props: %s\n" (pr-str (:props item)))
(with-out-str
(.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
@@ -101,38 +109,107 @@
(log/warn "no task handler found for" (pr-str name))
nil))))
-(defn- event-loop-fn
- [{:keys [tasks] :as options}]
- (let [queue (:queue options "default")
- max-retries (:max-retries options 3)]
- (db/with-atomic [conn db/pool]
- (let [item (-> (db/exec-one! conn [sql:select-next-task queue max-retries])
- (decode-task-row))]
- (when item
- (log/info "Execute task" (:name item))
- (try
- (handle-task tasks item)
- (mark-as-completed conn item)
- ::handled
- (catch Throwable e
- (log-task-error item e)
- (if (>= (:retry-num item) max-retries)
- (mark-as-failed conn item e)
- (mark-as-retry conn item e)))))))))
+(defn- run-task
+ [{:keys [tasks conn]} item]
+ (try
+ (log/debug (str/format "Started task '%s/%s'." (:name item) (:id item)))
+ (handle-task tasks item)
+ (log/debug (str/format "Finished task '%s/%s'." (:name item) (:id item)))
+ (mark-as-completed conn item)
+ (catch Exception e
+ (log-task-error item e)
+ (if (>= (:retry-num item) (:max-retries item))
+ (mark-as-failed conn item e)
+ (mark-as-retry conn item e)))))
-(defn- execute-worker-task
- [{:keys [::stop ::xtor poll-interval]
+(defn- event-loop-fn
+ [{:keys [tasks] :as opts}]
+ (aa/thread-try
+ (db/with-atomic [conn db/pool]
+ (let [queue (:queue opts "default")
+ item (-> (db/exec-one! conn [sql:select-next-task queue])
+ (decode-task-row))
+ opts (assoc opts :conn conn)]
+
+ (cond
+ (nil? item)
+ ::empty
+
+ (or (= "new" (:status item))
+ (= "retry" (:status item)))
+ (do
+ (run-task opts item)
+ ::handled)
+
+ :else
+ (do
+ (log/warn "Unexpected condition on worker event loop:" (pr-str item))
+ ::handled))))))
+
+(s/def ::poll-interval ::us/integer)
+(s/def ::fn (s/or :var var? :fn fn?))
+(s/def ::tasks (s/map-of string? ::fn))
+(s/def ::start-worker-params
+ (s/keys :req-un [::tasks]
+ :opt-un [::poll-interval]))
+
+(defn start-worker!
+ [{:keys [poll-interval]
:or {poll-interval 5000}
:as opts}]
- (try
- (when-not @stop
- (let [res (event-loop-fn opts)]
- (if (= res ::handled)
- (px/schedule! xtor 0 (partial execute-worker-task opts))
- (px/schedule! xtor poll-interval (partial execute-worker-task opts)))))
- (catch Throwable e
- (log/error "unexpected exception:" e)
- (px/schedule! xtor poll-interval (partial execute-worker-task opts)))))
+ (us/assert ::start-worker-params opts)
+ (log/info (str/format "Starting worker '%s' on queue '%s'."
+ (:name opts "anonymous")
+ (:queue opts "default")))
+ (let [cch (a/chan 1)]
+ (a/go-loop []
+ (let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)]
+ (cond
+ ;; Terminate the loop if close channel is closed or
+ ;; event-loop-fn returns nil.
+ (or (= port cch) (nil? val))
+ (log/info (str/format "Stop condition found. Shutdown worker: '%s'"
+ (:name opts "anonymous")))
+
+ (db/pool-closed? db/pool)
+ (do
+ (log/info "Worker eventloop is aborted because pool is closed.")
+ (a/close! cch))
+
+ (and (instance? java.sql.SQLException val)
+ (contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val)))
+ (do
+ (log/error "Connection error, trying resume in some instants.")
+ (a/pginterval
- [^Duration d]
- (->> (/ (.toMillis d) 1000.0)
- (format "%s seconds")))
-
(defn submit!
- [conn {:keys [name delay props queue key]
- :or {delay 0 props {} queue "default"}
+ [conn {:keys [name delay props queue priority max-retries key]
+ :or {delay 0 props {} queue "default" priority 100 max-retries 3}
:as options}]
(us/verify ::task-options options)
- (let [duration (dt/duration delay)
- pginterval (duration->pginterval duration)
- props (blob/encode props)
- id (uuid/next)]
- (log/info "Submit task" name "to be executed in" (str duration))
- (db/exec-one! conn [sql:insert-new-task
- id name props queue pginterval])
+ (let [duration (dt/duration delay)
+ interval (db/interval duration)
+ props (db/tjson props)
+ id (uuid/next)]
+ (log/info (str/format "Submit task '%s' to be executed in '%s'." name (str duration)))
+ (db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval])
id))
diff --git a/backend/src/uxbox/util/async.clj b/backend/src/uxbox/util/async.clj
new file mode 100644
index 000000000..8626db83a
--- /dev/null
+++ b/backend/src/uxbox/util/async.clj
@@ -0,0 +1,31 @@
+;; This Source Code Form is subject to the terms of the Mozilla Public
+;; License, v. 2.0. If a copy of the MPL was not distributed with this
+;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
+;;
+;; Copyright (c) 2020 Andrey Antukh
+
+(ns uxbox.util.async
+ (:require [clojure.core.async :as a]))
+
+(defmacro go-try
+ [& body]
+ `(a/go
+ (try
+ ~@body
+ (catch Throwable e# e#))))
+
+(defmacro
+ [ch]
+ `(let [r# (a/> (encode message)
(bytes->str)))
+(defn encode-verbose-str
+ [message]
+ (->> (encode message {:type :json-verbose})
+ (bytes->str)))
+
;; --- Helpers
(defn str->bytes