Minor usability improvements on async tast subsystem.

Andrey Antukh 2020-01-24 11:58:47 +01:00
Andrey Antukh 2020-01-24 11:58:47 +01:00
parent 7355b91796
commit bd5fd97fb7
4 changed files with 106 additions and 70 deletions

@ -1,21 +1,22 @@
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
queue text NOT NULL,
created_at timestamptz NOT NULL DEFAULT clock_timestamp(),
modified_at timestamptz NOT NULL DEFAULT clock_timestamp(),
completed_at timestamptz NULL DEFAULT NULL,
scheduled_at timestamptz NOT NULL,
queue text NOT NULL,
name text NOT NULL,
props bytea NOT NULL,
error_text text NULL DEFAULT NULL,
result bytea NULL DEFAULT NULL,
retry_num smallint NOT NULL DEFAULT 0,
status text NOT NULL DEFAULT 'new'
CREATE INDEX tasks__scheduled_at__idx
CREATE INDEX tasks__scheduled_at__queue__idx
ON tasks (scheduled_at, queue);

@ -26,16 +26,9 @@
;; --- Public API
(s/def ::name ::us/string)
(s/def ::delay ::us/integer)
(s/def ::props map?)
(s/def ::task-spec
(s/keys :req-un [::name ::delay] :opt-un [::props]))
(defn schedule!
([task] (schedule! db/pool task))
([conn task]
(us/assert ::task-spec task)
(impl/schedule! conn task)))
;; --- State initialization
@ -50,6 +43,6 @@
(defstate small-tasks
:start (as-> (impl/verticle {:tasks tasks :queue "default"}) $$
:start (as-> (impl/verticle tasks {:queue "default"}) $$
(vc/deploy! system $$ {:instances 1})
(deref $$)))

@ -10,12 +10,12 @@
(ns uxbox.tasks.demo-gc
"Demo accounts garbage collector."
[clojure.tools.logging :as log]))
[clojure.tools.logging :as log]
[uxbox.common.exceptions :as ex]))
(defn handler
{:uxbox.tasks/name "demo-gc"}
[{:keys [props] :as task}]
(prn "debug" props)
(ex/raise :type :foobar
:code :foobaz
:hint "Foo bar"))

@ -29,22 +29,37 @@
(.printStackTrace err (java.io.PrintWriter. *out*))))
(def ^:private sql:update-failed-task
(def ^:private sql:mark-as-retry
"update tasks
set scheduled_at = clock_timestamp() + '5 seconds'::interval,
error_text = $1
status = 'error'
error = $1,
status = 'retry',
retry_num = retry_num + 1
where id = $2;")
(defn- reschedule
[conn task error]
(let [error (string-strack-trace error)
sqlv [sql:update-failed-task error (:id task)]]
(let [explain (string-strack-trace error)
sqlv [sql:mark-as-retry explain (:id task)]]
(-> (db/query-one conn sqlv)
(p/then' (constantly nil)))))
(def ^:private sql:update-completed-task
(def ^:private sql:mark-as-failed
"update tasks
set scheduled_at = clock_timestamp() + '5 seconds'::interval,
error = $1,
status = 'failed'
where id = $2;")
(defn- mark-as-failed
[conn task error]
(let [error (string-strack-trace error)
sqlv [sql:mark-as-failed error (:id task)]]
(-> (db/query-one conn sqlv)
(p/then' (constantly nil)))))
(def ^:private sql:mark-as-completed
"update tasks
set completed_at = clock_timestamp(),
status = 'completed'
@ -52,7 +67,7 @@
(defn- mark-as-completed
[conn task]
(-> (db/query-one conn [sql:update-completed-task (:id task)])
(-> (db/query-one conn [sql:mark-as-completed (:id task)])
(p/then' (constantly nil))))
(defn- handle-task
@ -68,7 +83,7 @@
"select * from tasks as t
where t.scheduled_at <= now()
and t.queue = $1
and (t.status = 'new' or (t.status = 'error' and t.retry_num < 3))
and (t.status = 'new' or (t.status = 'retry' and t.retry_num <= $2))
order by t.scheduled_at
limit 1
for update skip locked")
@ -80,70 +95,97 @@
props (assoc :props (blob/decode props)))))
(defn- event-loop
[{:keys [handlers queue] :as opts}]
(db/with-atomic [conn db/pool]
(-> (db/query-one conn [sql:select-next-task queue])
(p/then decode-task-row)
(p/then (fn [item]
(when item
(-> (p/do! (handle-task handlers item))
(p/handle (fn [v e]
(if e
(reschedule conn item e)
(mark-as-completed conn item))))
(p/then' (constantly ::handled)))))))))
[{:keys [handlers] :as options}]
(let [queue (:queue options "default")
max-retries (:max-retries options 3)]
(db/with-atomic [conn db/pool]
(-> (db/query-one conn [sql:select-next-task queue max-retries])
(p/then decode-task-row)
(p/then (fn [item]
(when item
(-> (p/do! (handle-task handlers item))
(p/handle (fn [v e]
(if e
(if (>= (:retry-num item) max-retries)
(mark-as-failed conn item e)
(reschedule conn item e))
(mark-as-completed conn item))))
(p/then' (constantly ::handled))))))))))
(defn- event-loop-handler
[{:keys [::counter max-barch-size]
:or {counter 1 max-barch-size 10}
:as opts}]
(-> (event-loop opts)
(p/then (fn [result]
(when (and (= result ::handled)
(> max-barch-size counter))
(event-loop-handler (assoc opts ::counter (inc counter))))))))
(let [counter (::counter options 1)
mbs (:max-batch-size options 10)]
(-> (event-loop options)
(p/then (fn [result]
(when (and (= result ::handled)
(> mbs counter))
(event-loop-handler (assoc options ::counter (inc counter)))))))))
(def ^:private sql:insert-new-task
"insert into tasks (name, props, queue, scheduled_at)
values ($1, $2, $3, clock_timestamp()+cast($4::text as interval))
returning id")
(s/def ::name ::us/string)
(s/def ::delay ::us/integer)
(s/def ::props map?)
(s/def ::queue ::us/string)
(s/def ::task-options
(s/keys :req-un [::name ::delay]
:opt-un [::props ::queue]))
(defn- duration->pginterval
[^Duration d]
(->> (/ (.toMillis d) 1000.0)
(format "%s seconds")))
(defn schedule!
[conn {:keys [name delay props queue] :as task}]
[conn {:keys [name delay props queue key] :as options}]
(us/assert ::task-options options)
(let [queue (if (string? queue) queue "default")
delay (tm/duration delay)
duration (->> (/ (.toMillis ^Duration delay) 1000.0)
(format "%s seconds"))
duration (-> (tm/duration delay)
props (blob/encode props)]
(-> (db/query-one conn [sql:insert-new-task name props queue duration])
(p/then' (fn [task] (:id task))))))
(defn- on-start
[ctx queue handlers]
(vt/schedule! ctx {::vt/fn #'event-loop-handler
::vt/delay 1000
::vt/repeat true
:max-batch-size 10
:queue queue
:handlers handlers}))
[ctx handlers options]
(vt/schedule! ctx (assoc options
::vt/fn #'event-loop-handler
::vt/delay 3000
::vt/repeat true
:handlers handlers)))
(defn- resolve-handlers
(s/assert (s/coll-of ::callable) tasks)
(reduce (fn [acc f]
(let [task-name (:uxbox.tasks/name (meta f))]
(if task-name
(assoc acc task-name f)
(log/warn "skiping task, no name provided in metadata" (pr-str f))
(s/def ::callable (s/or :fn fn? :var var?))
(s/def ::max-batch-size ::us/integer)
(s/def ::max-retries ::us/integer)
(s/def ::verticle-tasks
(s/coll-of ::callable))
(s/def ::tasks (s/coll-of (s/or :fn fn? :var var?)))
(s/def ::queue ::us/string)
(s/def ::verticle-options
(s/keys :req-un [::tasks ::queue]))
(s/keys :opt-un [::queue ::max-batch-size]))
(defn verticle
[{:keys [tasks queue] :as options}]
[tasks options]
(s/assert ::verticle-tasks tasks)
(s/assert ::verticle-options options)
(let [handlers (reduce (fn [acc f]
(let [task-name (:uxbox.tasks/name (meta f))]
(if task-name
(assoc acc task-name f)
(log/warn "skiping task, no name provided in metadata" (pr-str f))
on-start #(on-start % queue handlers)]
(let [handlers (resolve-handlers tasks)
on-start #(on-start % handlers options)]
(vc/verticle {:on-start on-start})))