0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-09 08:20:45 -05:00

🎉 Add task deduplication by label

This commit is contained in:
Andrey Antukh 2022-12-13 23:13:11 +01:00
parent ae79ee435e
commit d7459db292
3 changed files with 47 additions and 17 deletions

View file

@ -268,6 +268,9 @@
{:name "0086-add-webhook-delivery-table"
:fn (mg/resource "app/migrations/sql/0086-add-webhook-delivery-table.sql")}
{:name "0087-mod-task-table"
:fn (mg/resource "app/migrations/sql/0087-mod-task-table.sql")}
])

View file

@ -0,0 +1,9 @@
ALTER TABLE task
ADD COLUMN label text NULL;
ALTER TABLE task
ALTER COLUMN label SET STORAGE external;
CREATE INDEX task__label__idx
ON task (label, name, queue)
WHERE status = 'new';

View file

@ -97,7 +97,7 @@
(l/info :hint "registry initialized" :tasks (count tasks))
(reduce-kv (fn [registry k v]
(let [tname (name k)]
(l/debug :hint "register task" :name tname)
(l/trace :hint "register task" :name tname)
(assoc registry tname (wrap-task-handler metrics tname v))))
{}
tasks))
@ -214,10 +214,10 @@
(db/create-array conn "uuid" ids)]]
(db/exec-one! conn sql)
(l/debug :hist "dispatcher: push tasks to redis"
(l/debug :hist "dispatcher: queue tasks"
:queue queue
:tasks (count ids)
:queued res)))
:total-queued res)))
(run-batch! [rconn]
(db/with-atomic [conn pool]
@ -445,7 +445,7 @@
:else
(try
(l/trace :hint "worker: executing task"
(l/debug :hint "worker: executing task"
:worker-id worker-id
:task-id (:id task)
:task-name (:name task)
@ -649,39 +649,57 @@
options))))
(def ^:private sql:insert-new-task
"insert into task (id, name, props, queue, priority, max_retries, scheduled_at)
values (?, ?, ?, ?, ?, ?, now() + ?)
"insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at)
values (?, ?, ?, ?, ?, ?, ?, now() + ?)
returning id")
(def ^:private
sql:remove-not-started-tasks
"delete from task
where name=? and queue=? and label=? and status = 'new' and scheduled_at > now()")
(s/def ::label string?)
(s/def ::task (s/or :kw keyword? :str string?))
(s/def ::queue (s/or :kw keyword? :str string?))
(s/def ::delay (s/or :int ::us/integer :duration dt/duration?))
(s/def ::delay (s/or :int integer? :duration dt/duration?))
(s/def ::conn (s/or :pool ::db/pool :connection some?))
(s/def ::priority ::us/integer)
(s/def ::max-retries ::us/integer)
(s/def ::priority integer?)
(s/def ::max-retries integer?)
(s/def ::dedupe boolean?)
(s/def ::submit-options
(s/keys :req [::task ::conn]
:opt [::delay ::queue ::priority ::max-retries]))
(s/and
(s/keys :req [::task ::conn]
:opt [::label ::delay ::queue ::priority ::max-retries ::dedupe])
(fn [{:keys [::dedupe ::label] :or {label ""}}]
(if dedupe
(not= label "")
true))))
(defn submit!
[& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn]
:or {delay 0 queue :default priority 100 max-retries 3}
[& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label]
:or {delay 0 queue :default priority 100 max-retries 3 label ""}
:as options}]
(us/verify ::submit-options options)
(us/verify! ::submit-options options)
(let [duration (dt/duration delay)
interval (db/interval duration)
props (-> options extract-props db/tjson)
id (uuid/next)
tenant (cf/get :tenant)
task (d/name task)
queue (str/ffmt "%:%" tenant (d/name queue))]
queue (str/ffmt "%:%" tenant (d/name queue))
deleted (when dedupe
(-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label])
:next.jdbc/update-count))]
(l/debug :hint "submit task"
:name task
:queue queue
:label label
:dedupe (boolean dedupe)
:deleted (or deleted 0)
:in (dt/format-duration duration))
(db/exec-one! conn [sql:insert-new-task id task props
queue priority max-retries interval])
(db/exec-one! conn [sql:insert-new-task id task props queue
label priority max-retries interval])
id))