mirror of
https://github.com/penpot/penpot.git
synced 2025-04-06 12:01:19 -05:00
♻️ Improve tasks implementation.
This commit is contained in:
parent
27181bf863
commit
4bfa70853c
14 changed files with 258 additions and 251 deletions
|
@ -6,6 +6,7 @@
|
|||
|
||||
(ns uxbox.db
|
||||
(:require
|
||||
[clojure.data.json :as json]
|
||||
[clojure.string :as str]
|
||||
[clojure.tools.logging :as log]
|
||||
[lambdaisland.uri :refer [uri]]
|
||||
|
@ -20,6 +21,7 @@
|
|||
[uxbox.config :as cfg]
|
||||
[uxbox.util.data :as data])
|
||||
(:import
|
||||
org.postgresql.util.PGobject
|
||||
com.zaxxer.hikari.HikariConfig
|
||||
com.zaxxer.hikari.HikariDataSource))
|
||||
|
||||
|
@ -112,3 +114,16 @@
|
|||
(get-by-params ds table {:id id} nil))
|
||||
([ds table id opts]
|
||||
(get-by-params ds table {:id id} opts)))
|
||||
|
||||
(defn pgobject?
|
||||
[v]
|
||||
(instance? PGobject v))
|
||||
|
||||
(defn decode-pgobject
|
||||
[^PGobject obj]
|
||||
(let [typ (.getType obj)
|
||||
val (.getValue obj)]
|
||||
(if (or (= typ "json")
|
||||
(= typ "jsonb"))
|
||||
(json/read-str val)
|
||||
val)))
|
||||
|
|
|
@ -41,9 +41,9 @@
|
|||
:reply-to (:sendmail-reply-to cfg/config)}
|
||||
data (merge defaults context)
|
||||
email (email-factory data)]
|
||||
(tasks/schedule! conn {:name "sendmail"
|
||||
:delay 0
|
||||
:props email}))))
|
||||
(tasks/submit! conn {:name "sendmail"
|
||||
:delay 0
|
||||
:props email}))))
|
||||
|
||||
;; --- Emails
|
||||
|
||||
|
|
|
@ -104,9 +104,9 @@
|
|||
(teams/check-edition-permissions! conn profile-id (:team-id lib))
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :color-library}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :color-library}})
|
||||
|
||||
(db/update! conn :color-library
|
||||
{:deleted-at (dt/now)}
|
||||
|
@ -188,9 +188,9 @@
|
|||
(teams/check-edition-permissions! conn profile-id (:team-id clr))
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :color}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :color}})
|
||||
|
||||
(db/update! conn :color
|
||||
{:deleted-at (dt/now)}
|
||||
|
|
|
@ -38,8 +38,8 @@
|
|||
:password password})
|
||||
|
||||
;; Schedule deletion of the demo profile
|
||||
(tasks/schedule! conn {:name "delete-profile"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:profile-id id}})
|
||||
(tasks/submit! conn {:name "delete-profile"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:profile-id id}})
|
||||
{:email email
|
||||
:password password})))
|
||||
|
|
|
@ -113,9 +113,9 @@
|
|||
(files/check-edition-permissions! conn profile-id id)
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :file}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :file}})
|
||||
|
||||
(mark-file-deleted conn params)))
|
||||
|
||||
|
|
|
@ -111,9 +111,9 @@
|
|||
(teams/check-edition-permissions! conn profile-id (:team-id lib))
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :icon-library}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :icon-library}})
|
||||
|
||||
(db/update! conn :icon-library
|
||||
{:deleted-at (dt/now)}
|
||||
|
@ -196,9 +196,9 @@
|
|||
(teams/check-edition-permissions! conn profile-id (:team-id icn))
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :icon}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :icon}})
|
||||
|
||||
(db/update! conn :icon
|
||||
{:deleted-at (dt/now)}
|
||||
|
|
|
@ -96,9 +96,9 @@
|
|||
(teams/check-edition-permissions! conn profile-id (:team-id lib))
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :image-library}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :image-library}})
|
||||
|
||||
(db/update! conn :image-library
|
||||
{:deleted-at (dt/now)}
|
||||
|
@ -226,9 +226,9 @@
|
|||
(teams/check-edition-permissions! conn profile-id (:team-id img))
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :image}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :image}})
|
||||
|
||||
(db/update! conn :image
|
||||
{:deleted-at (dt/now)}
|
||||
|
|
|
@ -242,9 +242,9 @@
|
|||
(files/check-edition-permissions! conn profile-id (:file-id page))
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :page}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :page}})
|
||||
|
||||
(db/update! conn :page
|
||||
{:deleted-at (dt/now)}
|
||||
|
|
|
@ -155,8 +155,8 @@
|
|||
;; Schedule deletion of old photo
|
||||
(when (and (string? (:photo profile))
|
||||
(not (str/blank? (:photo profile))))
|
||||
(tasks/schedule! conn {:name "remove-media"
|
||||
:props {:path (:photo profile)}}))
|
||||
(tasks/submit! conn {:name "remove-media"
|
||||
:props {:path (:photo profile)}}))
|
||||
;; Save new photo
|
||||
(update-profile-photo conn profile-id photo))))
|
||||
|
||||
|
@ -363,9 +363,9 @@
|
|||
(check-teams-ownership! conn profile-id)
|
||||
|
||||
;; Schedule a complete deletion of profile
|
||||
(tasks/schedule! conn {:name "delete-profile"
|
||||
:delay (dt/duration {:hours 48})
|
||||
:props {:profile-id profile-id}})
|
||||
(tasks/submit! conn {:name "delete-profile"
|
||||
:delay (dt/duration {:hours 48})
|
||||
:props {:profile-id profile-id}})
|
||||
|
||||
(db/update! conn :profile
|
||||
{:deleted-at (dt/now)}
|
||||
|
|
|
@ -124,9 +124,9 @@
|
|||
(check-edition-permissions! conn profile-id id)
|
||||
|
||||
;; Schedule object deletion
|
||||
(tasks/schedule! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :project}})
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
:delay cfg/default-deletion-delay
|
||||
:props {:id id :type :project}})
|
||||
|
||||
(mark-project-deleted conn params)))
|
||||
|
||||
|
|
|
@ -17,11 +17,21 @@
|
|||
[uxbox.config :as cfg]
|
||||
[uxbox.db :as db]
|
||||
[uxbox.tasks.sendmail]
|
||||
[uxbox.tasks.gc]
|
||||
[uxbox.tasks.remove-media]
|
||||
[uxbox.tasks.delete-profile]
|
||||
[uxbox.tasks.delete-object]
|
||||
[uxbox.tasks.impl :as impl]
|
||||
[uxbox.util.time :as dt]))
|
||||
[uxbox.util.time :as dt])
|
||||
(:import
|
||||
java.util.concurrent.ScheduledExecutorService
|
||||
java.util.concurrent.Executors))
|
||||
|
||||
;; --- Scheduler Executor Initialization
|
||||
|
||||
(defstate scheduler
|
||||
:start (Executors/newScheduledThreadPool (int 1))
|
||||
:stop (.shutdownNow ^ScheduledExecutorService scheduler))
|
||||
|
||||
;; --- State initialization
|
||||
|
||||
|
@ -36,33 +46,25 @@
|
|||
"remove-media" #'uxbox.tasks.remove-media/handler
|
||||
"sendmail" #'uxbox.tasks.sendmail/handler})
|
||||
|
||||
(def ^:private schedule
|
||||
[{:id "remove-deleted-media"
|
||||
:cron (dt/cron "1 1 */1 * * ? *")
|
||||
:fn #'uxbox.tasks.gc/remove-media}])
|
||||
|
||||
(defstate worker
|
||||
:start (impl/start-worker! {:tasks tasks})
|
||||
:start (impl/start-worker! {:tasks tasks
|
||||
:xtor scheduler})
|
||||
:stop (impl/stop! worker))
|
||||
|
||||
(defstate scheduler-worker
|
||||
:start (impl/start-scheduler-worker! {:schedule schedule
|
||||
:xtor scheduler})
|
||||
:stop (impl/stop! worker))
|
||||
|
||||
;; --- Public API
|
||||
|
||||
(defn schedule!
|
||||
([opts] (schedule! db/pool opts))
|
||||
(defn submit!
|
||||
([opts] (submit! db/pool opts))
|
||||
([conn opts]
|
||||
(s/assert ::impl/task-options opts)
|
||||
(impl/schedule! conn opts)))
|
||||
|
||||
;; (defstate scheduler
|
||||
;; :start (impl/start-scheduler! tasks)
|
||||
;; :stop (impl/stop! tasks-worker))
|
||||
|
||||
;; :start (as-> (impl/worker-verticle {:tasks tasks}) $$
|
||||
;; (vc/deploy! system $$ {:instances 1})
|
||||
;; (deref $$)))
|
||||
|
||||
;; (def ^:private schedule
|
||||
;; [{:id "every 1 hour"
|
||||
;; :cron (dt/cron "1 1 */1 * * ? *")
|
||||
;; :fn #'uxbox.tasks.gc/handler
|
||||
;; :props {:foo 1}}])
|
||||
|
||||
;; (defstate scheduler
|
||||
;; :start (as-> (impl/scheduler-verticle {:schedule schedule}) $$
|
||||
;; (vc/deploy! system $$ {:instances 1 :worker true})
|
||||
;; (deref $$)))
|
||||
(impl/submit! conn opts)))
|
||||
|
|
|
@ -9,8 +9,8 @@
|
|||
|
||||
(ns uxbox.tasks.gc
|
||||
(:require
|
||||
[clojure.tools.logging :as log]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[cuerdas.core :as str]
|
||||
[postal.core :as postal]
|
||||
[promesa.core :as p]
|
||||
|
@ -18,36 +18,47 @@
|
|||
[uxbox.common.spec :as us]
|
||||
[uxbox.config :as cfg]
|
||||
[uxbox.db :as db]
|
||||
[uxbox.util.blob :as blob]))
|
||||
[uxbox.media :as media]
|
||||
[uxbox.util.blob :as blob]
|
||||
[uxbox.util.storage :as ust]))
|
||||
|
||||
;; TODO: delete media referenced in pendint_to_delete table
|
||||
(def ^:private sql:delete-items
|
||||
"with items_part as (
|
||||
select i.id
|
||||
from pending_to_delete as i
|
||||
order by i.created_at
|
||||
limit ?
|
||||
for update skip locked
|
||||
)
|
||||
delete from pending_to_delete
|
||||
where id in (select id from items_part)
|
||||
returning *")
|
||||
|
||||
;; (def ^:private sql:delete-item
|
||||
;; "with items_part as (
|
||||
;; select i.id
|
||||
;; from pending_to_delete as i
|
||||
;; order by i.created_at
|
||||
;; limit 1
|
||||
;; for update skip locked
|
||||
;; )
|
||||
;; delete from pending_to_delete
|
||||
;; where id in (select id from items_part)
|
||||
;; returning *")
|
||||
(defn- impl-remove-media
|
||||
[result]
|
||||
(run! (fn [item]
|
||||
(let [path1 (get item "path")
|
||||
path2 (get item "thumb_path")]
|
||||
(ust/delete! media/media-storage path1)
|
||||
(ust/delete! media/media-storage path2)))
|
||||
result))
|
||||
|
||||
;; (defn- remove-items
|
||||
;; []
|
||||
;; (vu/loop []
|
||||
;; (db/with-atomic [conn db/pool]
|
||||
;; (-> (db/query-one conn sql:delete-item)
|
||||
;; (p/then decode-row)
|
||||
;; (p/then (vu/wrap-blocking remove-media))
|
||||
;; (p/then (fn [item]
|
||||
;; (when (not (empty? items))
|
||||
;; (p/recur))))))))
|
||||
(defn- decode-row
|
||||
[{:keys [data] :as row}]
|
||||
(cond-> row
|
||||
(db/pgobject? data) (assoc :data (db/decode-pgobject data))))
|
||||
|
||||
(defn- get-items
|
||||
[conn]
|
||||
(->> (db/exec! conn [sql:delete-items 10])
|
||||
(map decode-row)
|
||||
(map :data)))
|
||||
|
||||
(defn remove-media
|
||||
[{:keys [props] :as task}]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(loop [result (get-items conn)]
|
||||
(when-not (empty? result)
|
||||
(impl-remove-media result)
|
||||
(recur (get-items conn))))))
|
||||
|
||||
;; (defn- remove-media
|
||||
;; [{:keys
|
||||
;; (doseq [item files]
|
||||
;; (ust/delete! media/media-storage (:path item))
|
||||
;; (ust/delete! media/media-storage (:thumb-path item)))
|
||||
;; files)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
[clojure.core.async :as a]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[promesa.exec :as px]
|
||||
[uxbox.common.spec :as us]
|
||||
[uxbox.common.uuid :as uuid]
|
||||
[uxbox.config :as cfg]
|
||||
|
@ -20,14 +21,12 @@
|
|||
[uxbox.util.blob :as blob]
|
||||
[uxbox.util.time :as dt])
|
||||
(:import
|
||||
java.util.concurrent.ScheduledExecutorService
|
||||
java.util.concurrent.Executors
|
||||
java.time.Duration
|
||||
java.time.Instant
|
||||
java.util.Date))
|
||||
|
||||
(defrecord Worker [stop]
|
||||
java.lang.AutoCloseable
|
||||
(close [_] (a/close! stop)))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Tasks
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
@ -37,7 +36,8 @@
|
|||
(with-out-str
|
||||
(.printStackTrace err (java.io.PrintWriter. *out*))))
|
||||
|
||||
(def ^:private sql:mark-as-retry
|
||||
(def ^:private
|
||||
sql:mark-as-retry
|
||||
"update task
|
||||
set scheduled_at = clock_timestamp() + '5 seconds'::interval,
|
||||
error = ?,
|
||||
|
@ -45,48 +45,32 @@
|
|||
retry_num = retry_num + 1
|
||||
where id = ?")
|
||||
|
||||
(defn- reschedule
|
||||
(defn- mark-as-retry
|
||||
[conn task error]
|
||||
(let [explain (ex-message error)
|
||||
sqlv [sql:mark-as-retry explain (:id task)]]
|
||||
(db/exec-one! conn sqlv)
|
||||
nil))
|
||||
|
||||
(def ^:private sql:mark-as-failed
|
||||
"update task
|
||||
set scheduled_at = clock_timestamp() + '5 seconds'::interval,
|
||||
error = ?,
|
||||
status = 'failed'
|
||||
where id = ?;")
|
||||
|
||||
(defn- mark-as-failed
|
||||
[conn task error]
|
||||
(let [explain (ex-message error)
|
||||
sqlv [sql:mark-as-failed explain (:id task)]]
|
||||
(db/exec-one! conn sqlv)
|
||||
(let [explain (ex-message error)]
|
||||
(db/update! conn :task
|
||||
{:error explain
|
||||
:status "failed"}
|
||||
{:id (:id task)})
|
||||
nil))
|
||||
|
||||
(def ^:private sql:mark-as-completed
|
||||
"update task
|
||||
set completed_at = clock_timestamp(),
|
||||
status = 'completed'
|
||||
where id = ?")
|
||||
|
||||
(defn- mark-as-completed
|
||||
[conn task]
|
||||
(db/exec-one! conn [sql:mark-as-completed (:id task)])
|
||||
(db/update! conn :task
|
||||
{:completed-at (dt/now)
|
||||
:status "completed"}
|
||||
{:id (:id task)})
|
||||
nil)
|
||||
|
||||
(defn- handle-task
|
||||
[tasks {:keys [name] :as item}]
|
||||
(let [task-fn (get tasks name)]
|
||||
(if task-fn
|
||||
(task-fn item)
|
||||
(do
|
||||
(log/warn "no task handler found for" (pr-str name))
|
||||
nil))))
|
||||
|
||||
(def ^:private sql:select-next-task
|
||||
(def ^:private
|
||||
sql:select-next-task
|
||||
"select * from task as t
|
||||
where t.scheduled_at <= now()
|
||||
and t.queue = ?
|
||||
|
@ -108,6 +92,15 @@
|
|||
(with-out-str
|
||||
(.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
|
||||
|
||||
(defn- handle-task
|
||||
[tasks {:keys [name] :as item}]
|
||||
(let [task-fn (get tasks name)]
|
||||
(if task-fn
|
||||
(task-fn item)
|
||||
(do
|
||||
(log/warn "no task handler found for" (pr-str name))
|
||||
nil))))
|
||||
|
||||
(defn- event-loop-fn
|
||||
[{:keys [tasks] :as options}]
|
||||
(let [queue (:queue options "default")
|
||||
|
@ -125,156 +118,140 @@
|
|||
(log-task-error item e)
|
||||
(if (>= (:retry-num item) max-retries)
|
||||
(mark-as-failed conn item e)
|
||||
(reschedule conn item e)))))))))
|
||||
(mark-as-retry conn item e)))))))))
|
||||
|
||||
(defn- start-worker-eventloop!
|
||||
[options]
|
||||
(let [stop (::stop options)
|
||||
mbs (:max-batch-size options 10)]
|
||||
(a/go-loop []
|
||||
(let [timeout (a/timeout 5000)
|
||||
[val port] (a/alts! [stop timeout])]
|
||||
(when (= port timeout)
|
||||
(a/<! (a/thread
|
||||
;; Tasks batching in one event loop execution.
|
||||
(loop [cnt 1
|
||||
res (event-loop-fn options)]
|
||||
(when (and (= res ::handled)
|
||||
(> mbs cnt))
|
||||
(recur (inc 1)
|
||||
(event-loop-fn options))))))
|
||||
(recur))))))
|
||||
|
||||
(defn- duration->pginterval
|
||||
[^Duration d]
|
||||
(->> (/ (.toMillis d) 1000.0)
|
||||
(format "%s seconds")))
|
||||
|
||||
(defn start-worker!
|
||||
[options]
|
||||
(let [stop (a/chan)]
|
||||
(start-worker-eventloop! (assoc options ::stop stop))
|
||||
(->Worker stop)))
|
||||
|
||||
(defn stop!
|
||||
[worker]
|
||||
(.close ^java.lang.AutoCloseable worker))
|
||||
(defn- execute-worker-task
|
||||
[{:keys [::stop ::xtor poll-interval]
|
||||
:or {poll-interval 5000}
|
||||
:as opts}]
|
||||
(try
|
||||
(when-not @stop
|
||||
(let [res (event-loop-fn opts)]
|
||||
(if (= res ::handled)
|
||||
(px/schedule! xtor 0 (partial execute-worker-task opts))
|
||||
(px/schedule! xtor poll-interval (partial execute-worker-task opts)))))
|
||||
(catch Throwable e
|
||||
(log/error "unexpected exception:" e)
|
||||
(px/schedule! xtor poll-interval (partial execute-worker-task opts)))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Scheduled Tasks
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; (def ^:privatr sql:upsert-scheduled-task
|
||||
;; "insert into scheduled_task (id, cron_expr)
|
||||
;; values ($1, $2)
|
||||
;; on conflict (id)
|
||||
;; do update set cron_expr=$2")
|
||||
(def ^:private
|
||||
sql:upsert-scheduled-task
|
||||
"insert into scheduled_task (id, cron_expr)
|
||||
values (?, ?)
|
||||
on conflict (id)
|
||||
do update set cron_expr=?")
|
||||
|
||||
;; (defn- synchronize-schedule-item
|
||||
;; [conn {:keys [id cron]}]
|
||||
;; (-> (db/query-one conn [sql:upsert-scheduled-task id (str cron)])
|
||||
;; (p/then' (constantly nil))))
|
||||
(defn- synchronize-schedule-item
|
||||
[conn {:keys [id cron] :as item}]
|
||||
(let [cron (str cron)]
|
||||
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
|
||||
|
||||
;; (defn- synchronize-schedule
|
||||
;; [schedule]
|
||||
;; (db/with-atomic [conn db/pool]
|
||||
;; (p/run! (partial synchronize-schedule-item conn) schedule)))
|
||||
(defn- synchronize-schedule!
|
||||
[schedule]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(run! (partial synchronize-schedule-item conn) schedule)))
|
||||
|
||||
;; (def ^:private sql:lock-scheduled-task
|
||||
;; "select id from scheduled_task where id=$1 for update skip locked")
|
||||
(def ^:private sql:lock-scheduled-task
|
||||
"select id from scheduled_task where id=? for update skip locked")
|
||||
|
||||
;; (declare schedule-task)
|
||||
(declare schedule-task!)
|
||||
|
||||
;; (defn- log-scheduled-task-error
|
||||
;; [item err]
|
||||
;; (log/error "Unhandled exception on scheduled task '" (:id item) "' \n"
|
||||
;; (with-out-str
|
||||
;; (.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
|
||||
(defn- log-scheduled-task-error
|
||||
[item err]
|
||||
(log/error "Unhandled exception on scheduled task '" (:id item) "' \n"
|
||||
(with-out-str
|
||||
(.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
|
||||
|
||||
;; (defn- execute-scheduled-task
|
||||
;; [{:keys [id cron] :as stask}]
|
||||
;; (db/with-atomic [conn db/pool]
|
||||
;; ;; First we try to lock the task in the database, if locking us
|
||||
;; ;; successful, then we execute the scheduled task; if locking is
|
||||
;; ;; not possible (because other instance is already locked id) we
|
||||
;; ;; just skip it and schedule to be executed in the next slot.
|
||||
;; (-> (db/query-one conn [sql:lock-scheduled-task id])
|
||||
;; (p/then (fn [result]
|
||||
;; (when result
|
||||
;; (-> (p/do! ((:fn stask) stask))
|
||||
;; (p/catch (fn [e]
|
||||
;; (log-scheduled-task-error stask e)
|
||||
;; nil))))))
|
||||
;; (p/finally (fn [v e]
|
||||
;; (-> (vu/current-context)
|
||||
;; (schedule-task stask)))))))
|
||||
;; (defn ms-until-valid
|
||||
;; [cron]
|
||||
;; (s/assert dt/cron? cron)
|
||||
;; (let [^Instant now (dt/now)
|
||||
;; ^Instant next (dt/next-valid-instant-from cron now)
|
||||
;; ^Duration duration (Duration/between now next)]
|
||||
;; (.toMillis duration)))
|
||||
(defn- execute-scheduled-task
|
||||
[{:keys [id cron ::xtor] :as task}]
|
||||
(try
|
||||
(db/with-atomic [conn db/pool]
|
||||
;; First we try to lock the task in the database, if locking is
|
||||
;; successful, then we execute the scheduled task; if locking is
|
||||
;; not possible (because other instance is already locked id) we
|
||||
;; just skip it and schedule to be executed in the next slot.
|
||||
(when (db/exec-one! conn [sql:lock-scheduled-task id])
|
||||
(log/info "Executing scheduled task" id)
|
||||
((:fn task) task)))
|
||||
|
||||
;; (defn- schedule-task
|
||||
;; [ctx {:keys [cron] :as stask}]
|
||||
;; (let [ms (ms-until-valid cron)]
|
||||
;; (vt/schedule! ctx (assoc stask
|
||||
;; :ctx ctx
|
||||
;; ::vt/once true
|
||||
;; ::vt/delay ms
|
||||
;; ::vt/fn execute-scheduled-task))))
|
||||
(catch Throwable e
|
||||
(log-scheduled-task-error task e))
|
||||
(finally
|
||||
(schedule-task! xtor task))))
|
||||
|
||||
;; (defn- on-scheduler-start
|
||||
;; [ctx {:keys [schedule] :as options}]
|
||||
;; (-> (synchronize-schedule schedule)
|
||||
;; (p/then' (fn [_]
|
||||
;; (run! #(schedule-task ctx %) schedule)))))
|
||||
(defn ms-until-valid
|
||||
[cron]
|
||||
(s/assert dt/cron? cron)
|
||||
(let [^Instant now (dt/now)
|
||||
^Instant next (dt/next-valid-instant-from cron now)]
|
||||
(inst-ms (dt/duration-between now next))))
|
||||
|
||||
(defn- schedule-task!
|
||||
[xtor {:keys [cron] :as task}]
|
||||
(let [ms (ms-until-valid cron)
|
||||
task (assoc task ::xtor xtor)]
|
||||
(px/schedule! xtor ms (partial execute-scheduled-task task))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Public API
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; --- Worker Verticle
|
||||
(s/def ::id string?)
|
||||
(s/def ::name string?)
|
||||
(s/def ::cron dt/cron?)
|
||||
(s/def ::fn (s/or :var var? :fn fn?))
|
||||
(s/def ::props (s/nilable map?))
|
||||
(s/def ::xtor #(instance? ScheduledExecutorService %))
|
||||
|
||||
;; (s/def ::callable (s/or :fn fn? :var var?))
|
||||
;; (s/def ::max-batch-size ::us/integer)
|
||||
;; (s/def ::max-retries ::us/integer)
|
||||
;; (s/def ::tasks (s/map-of string? ::callable))
|
||||
(s/def ::scheduled-task
|
||||
(s/keys :req-un [::id ::cron ::fn]
|
||||
:opt-un [::props]))
|
||||
|
||||
;; (s/def ::worker-verticle-options
|
||||
;; (s/keys :req-un [::tasks]
|
||||
;; :opt-un [::queue ::max-batch-size]))
|
||||
(s/def ::tasks (s/map-of string? ::fn))
|
||||
(s/def ::schedule (s/coll-of ::scheduled-task))
|
||||
|
||||
;; (defn worker-verticle
|
||||
;; [options]
|
||||
;; (s/assert ::worker-verticle-options options)
|
||||
;; (let [on-start #(on-worker-start % options)]
|
||||
;; (vc/verticle {:on-start on-start})))
|
||||
(defn start-scheduler-worker!
|
||||
[{:keys [schedule xtor] :as opts}]
|
||||
(us/assert ::xtor xtor)
|
||||
(us/assert ::schedule schedule)
|
||||
(let [stop (atom false)]
|
||||
(synchronize-schedule! schedule)
|
||||
(run! (partial schedule-task! xtor) schedule)
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(reset! stop true)))))
|
||||
|
||||
;; --- Scheduler Verticle
|
||||
(defn start-worker!
|
||||
[{:keys [tasks xtor poll-interval]
|
||||
:or {poll-interval 5000}
|
||||
:as opts}]
|
||||
(us/assert ::tasks tasks)
|
||||
(us/assert ::xtor xtor)
|
||||
(us/assert number? poll-interval)
|
||||
(let [stop (atom false)
|
||||
opts (assoc opts
|
||||
::xtor xtor
|
||||
::stop stop)]
|
||||
(px/schedule! xtor poll-interval (partial execute-worker-task opts))
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(reset! stop true)))))
|
||||
|
||||
;; (s/def ::id string?)
|
||||
;; (s/def ::cron dt/cron?)
|
||||
;; (s/def ::fn ::callable)
|
||||
;; (s/def ::props (s/nilable map?))
|
||||
(defn stop!
|
||||
[worker]
|
||||
(.close ^java.lang.AutoCloseable worker))
|
||||
|
||||
;; (s/def ::scheduled-task
|
||||
;; (s/keys :req-un [::id ::cron ::fn]
|
||||
;; :opt-un [::props]))
|
||||
|
||||
;; (s/def ::schedule (s/coll-of ::scheduled-task))
|
||||
|
||||
;; (s/def ::scheduler-verticle-options
|
||||
;; (s/keys :opt-un [::schedule]))
|
||||
|
||||
;; (defn scheduler-verticle
|
||||
;; [options]
|
||||
;; (s/assert ::scheduler-verticle-options options)
|
||||
;; (let [on-start #(on-scheduler-start % options)]
|
||||
;; (vc/verticle {:on-start on-start})))
|
||||
|
||||
;; --- Schedule API
|
||||
;; --- Submit API
|
||||
|
||||
(s/def ::name ::us/string)
|
||||
(s/def ::delay
|
||||
|
@ -290,7 +267,12 @@
|
|||
values (?, ?, ?, ?, clock_timestamp()+cast(?::text as interval))
|
||||
returning id")
|
||||
|
||||
(defn schedule!
|
||||
(defn- duration->pginterval
|
||||
[^Duration d]
|
||||
(->> (/ (.toMillis d) 1000.0)
|
||||
(format "%s seconds")))
|
||||
|
||||
(defn submit!
|
||||
[conn {:keys [name delay props queue key]
|
||||
:or {delay 0 props {} queue "default"}
|
||||
:as options}]
|
||||
|
@ -299,9 +281,7 @@
|
|||
pginterval (duration->pginterval duration)
|
||||
props (blob/encode props)
|
||||
id (uuid/next)]
|
||||
(log/info "Schedule task" name
|
||||
;; "with props" (pr-str props)
|
||||
"to be executed in" (str duration))
|
||||
(log/info "Submit task" name "to be executed in" (str duration))
|
||||
(db/exec-one! conn [sql:insert-new-task
|
||||
id name props queue pginterval])
|
||||
id))
|
||||
|
|
|
@ -38,7 +38,6 @@ services:
|
|||
- 9090:9090
|
||||
|
||||
environment:
|
||||
- CLOJURE_OPTS=-J-XX:-OmitStackTraceInFastThrow
|
||||
- UXBOX_DATABASE_URI=postgresql://postgres/uxbox
|
||||
- UXBOX_DATABASE_USERNAME=uxbox
|
||||
- UXBOX_DATABASE_PASSWORD=uxbox
|
||||
|
|
Loading…
Add table
Reference in a new issue