0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-04-04 02:51:20 -05:00

♻️ Add API consistency fixes for task calling

Also adds a helper for calling tasks synchronously
This commit is contained in:
Andrey Antukh 2024-06-25 11:56:23 +02:00
parent aa1cf3e03a
commit ec4260830c
19 changed files with 145 additions and 152 deletions

View file

@ -262,13 +262,12 @@
(let [email (if factory
(factory context)
(dissoc context ::conn))]
(wrk/submit! (merge
{::wrk/task :sendmail
::wrk/delay 0
::wrk/max-retries 4
::wrk/priority 200
::wrk/conn conn}
email))))
(wrk/submit! {::wrk/task :sendmail
::wrk/delay 0
::wrk/max-retries 4
::wrk/priority 200
::db/conn conn
::wrk/params email})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SENDMAIL FN / TASK HANDLER

View file

@ -241,18 +241,16 @@
:else label)
dedupe? (boolean (and batch-key batch-timeout))]
(wrk/submit! ::wrk/conn (::db/conn cfg)
::wrk/task :process-webhook-event
::wrk/queue :webhooks
::wrk/max-retries 0
::wrk/delay (or batch-timeout 0)
::wrk/dedupe dedupe?
::wrk/label label
::webhooks/event
(-> params
(dissoc :ip-addr)
(dissoc :type)))))
(wrk/submit! (-> cfg
(assoc ::wrk/task :process-webhook-event)
(assoc ::wrk/queue :webhooks)
(assoc ::wrk/max-retries 0)
(assoc ::wrk/delay (or batch-timeout 0))
(assoc ::wrk/dedupe dedupe?)
(assoc ::wrk/label label)
(assoc ::wrk/params (-> params
(dissoc :ip-addr)
(dissoc :type)))))))
params))
(defn submit!

View file

@ -64,22 +64,22 @@
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::process-event-handler
[_ {:keys [::db/pool] :as cfg}]
[_ cfg]
(fn [{:keys [props] :as task}]
(let [event (::event props)]
(let [event (:event props)]
(l/dbg :hint "process webhook event" :name (:name event))
(when-let [items (lookup-webhooks cfg event)]
(l/trc :hint "webhooks found for event" :total (count items))
(db/with-atomic [conn pool]
(doseq [item items]
(wrk/submit! ::wrk/conn conn
::wrk/task :run-webhook
::wrk/queue :webhooks
::wrk/max-retries 3
::event event
::config item)))))))
(db/tx-run! cfg (fn [cfg]
(doseq [item items]
(wrk/submit! (-> cfg
(assoc ::wrk/task :run-webhook)
(assoc ::wrk/queue :webhooks)
(assoc ::wrk/max-retries 3)
(assoc ::wrk/params {:event event
:config item}))))))))))
;; --- RUN
@ -128,8 +128,8 @@
:rsp-data (db/tjson rsp)}))]
(fn [{:keys [props] :as task}]
(let [event (::event props)
whook (::config props)
(let [event (:event props)
whook (:config props)
body (case (:mtype whook)
"application/json" (json/write-str event json-write-opts)

View file

@ -927,11 +927,11 @@
{:id file-id}
{::db/return-keys [:id :name :is-shared :deleted-at
:project-id :created-at :modified-at]})]
(wrk/submit! {::wrk/task :delete-object
::wrk/conn conn
:object :file
:deleted-at (:deleted-at file)
:id file-id})
(wrk/submit! {::db/conn conn
::wrk/task :delete-object
::wrk/params {:object :file
:deleted-at (:deleted-at file)
:id file-id}})
file))
(def ^:private

View file

@ -258,11 +258,11 @@
:code :non-deletable-project
:hint "impossible to delete default project"))
(wrk/submit! {::wrk/task :delete-object
::wrk/conn conn
:object :project
:deleted-at (:deleted-at project)
:id project-id})
(wrk/submit! {::db/conn conn
::wrk/task :delete-object
::wrk/params {:object :project
:deleted-at (:deleted-at project)
:id project-id}})
project))

View file

@ -527,11 +527,11 @@
:code :non-deletable-team
:hint "impossible to delete default team"))
(wrk/submit! {::wrk/task :delete-object
::wrk/conn conn
:object :team
:deleted-at deleted-at
:id team-id})
(wrk/submit! {::db/conn conn
::wrk/task :delete-object
::wrk/params {:object :team
:deleted-at deleted-at
:id team-id}})
team))
(def ^:private schema:delete-team

View file

@ -83,17 +83,17 @@
"- Quote ID: '~(::target params)'\n"
"- Max: ~(::quote params)\n"
"- Total: ~(::total params) (INCR ~(::incr params 1))\n")]
(wrk/submit! {::wrk/task :sendmail
(wrk/submit! {::db/conn conn
::wrk/task :sendmail
::wrk/delay (dt/duration "30s")
::wrk/max-retries 4
::wrk/priority 200
::wrk/conn conn
::wrk/dedupe true
::wrk/label "quotes-notification"
:to (vec admins)
:subject subject
:body [{:type "text/plain"
:content content}]}))))
::wrk/params {:to (vec admins)
:subject subject
:body [{:type "text/plain"
:content content}]}}))))
(defn- generic-check!
[{:keys [::db/conn ::incr ::quote-sql ::count-sql ::default ::target] :or {incr 1} :as params}]

View file

@ -59,32 +59,27 @@
([tname]
(run-task! tname {}))
([tname params]
(let [tasks (:app.worker/registry main/system)
tname (if (keyword? tname) (name tname) name)]
(if-let [task-fn (get tasks tname)]
(task-fn params)
(println (format "no task '%s' found" tname))))))
(wrk/invoke! (-> main/system
(assoc ::wrk/task tname)
(assoc ::wrk/params params)))))
(defn schedule-task!
([name]
(schedule-task! name {}))
([name props]
(let [pool (:app.db/pool main/system)]
(wrk/submit!
::wrk/conn pool
::wrk/task name
::wrk/props props))))
([name params]
(wrk/submit! (-> main/system
(assoc ::wrk/task name)
(assoc ::wrk/params params)))))
(defn send-test-email!
[destination]
(us/verify!
:expr (string? destination)
:hint "destination should be provided")
(let [handler (:app.email/sendmail main/system)]
(handler {:body "test email"
:subject "test email"
:to [destination]})))
(assert (string? destination) "destination should be provided")
(-> main/system
(assoc ::wrk/task :sendmail)
(assoc ::wrk/params {:body "test email"
:subject "test email"
:to [destination]})
(wrk/invoke!)))
(defn resend-email-verification-email!
[email]
@ -562,22 +557,30 @@
"Mark a team for deletion"
[team-id]
(let [team-id (h/parse-uuid team-id)]
(db/tx-run! main/system (fn [{:keys [::db/conn]}]
(#'teams/delete-team conn team-id)))))
(wrk/invoke! (-> main/system
(assoc ::wrk/task :delete-object)
(assoc ::wrk/params {:object :team
:deleted-at (dt/now)
:id team-id})))))
(defn delete-project!
"Mark a project for deletion"
[project-id]
(let [project-id (h/parse-uuid project-id)]
(db/tx-run! main/system (fn [{:keys [::db/conn]}]
(#'projects/delete-project conn project-id)))))
(wrk/invoke! (-> main/system
(assoc ::wrk/task :delete-object)
(assoc ::wrk/params {:object :project
:deleted-at (dt/now)
:id project-id})))))
(defn delete-file!
"Mark a project for deletion"
[file-id]
(let [file-id (h/parse-uuid file-id)]
(db/tx-run! main/system (fn [{:keys [::db/conn]}]
(#'files/mark-file-deleted conn file-id)))))
(wrk/invoke! (-> main/system
(assoc ::wrk/task :delete-object)
(assoc ::wrk/params {:object :file
:deleted-at (dt/now)
:id file-id})))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; MISC

View file

@ -110,8 +110,8 @@
(defmethod ig/init-key ::handler
[_ {:keys [::min-age] :as cfg}]
(fn [params]
(let [min-age (dt/duration (or (:min-age params) min-age))]
(fn [{:keys [props] :as task}]
(let [min-age (dt/duration (or (:min-age props) min-age))]
(db/tx-run! cfg (fn [cfg]
(let [cfg (assoc cfg ::min-age min-age)
total (clean-deleted! cfg)]

View file

@ -23,9 +23,9 @@
(when-let [file (db/get* conn :file {:id id} {::db/remove-deleted false})]
(l/trc :hint "marking for deletion" :rel "file" :id (str id))
(db/update! conn :file
{:deleted-at deleted-at}
{:id id}
{::db/return-keys false})
{:deleted-at deleted-at}
{:id id}
{::db/return-keys false})
(when (and (:is-shared file)
(not *team-deletion*))
@ -97,5 +97,5 @@
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [props] :as params}]
(fn [{:keys [props] :as task}]
(db/tx-run! cfg delete-object props)))

View file

@ -299,13 +299,13 @@
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [file-id] :as params}]
(fn [{:keys [props] :as task}]
(db/tx-run! cfg
(fn [{:keys [::db/conn] :as cfg}]
(let [min-age (dt/duration (or (:min-age params) (::min-age cfg)))
(let [min-age (dt/duration (or (:min-age props) (::min-age cfg)))
cfg (-> cfg
(update ::sto/storage media/configure-assets-storage conn)
(assoc ::file-id file-id)
(assoc ::file-id (:file-id props))
(assoc ::min-age min-age))
total (reduce (fn [total file]
@ -319,7 +319,7 @@
:processed total)
;; Allow optional rollback passed by params
(when (:rollback? params)
(when (:rollback? props)
(db/rollback! conn))
{:processed total})))))

View file

@ -29,8 +29,8 @@
(defmethod ig/init-key ::handler
[_ {:keys [::db/pool] :as cfg}]
(fn [params]
(let [min-age (or (:min-age params) (::min-age cfg))]
(fn [{:keys [props] :as task}]
(let [min-age (or (:min-age props) (::min-age cfg))]
(db/with-atomic [conn pool]
(let [interval (db/interval min-age)
result (db/exec-one! conn [sql:delete-files-xlog interval])
@ -38,7 +38,7 @@
(l/info :hint "task finished" :min-age (dt/format-duration min-age) :total result)
(when (:rollback? params)
(when (:rollback? props)
(db/rollback! conn))
result)))))

View file

@ -302,8 +302,8 @@
(defmethod ig/init-key ::handler
[_ cfg]
(fn [params]
(let [min-age (dt/duration (or (:min-age params) (::min-age cfg)))
(fn [{:keys [props] :as task}]
(let [min-age (dt/duration (or (:min-age props) (::min-age cfg)))
cfg (-> cfg
(assoc ::min-age (db/interval min-age))
(update ::sto/storage media/configure-assets-storage))]

View file

@ -39,12 +39,11 @@
{:deleted-at deleted-at}
{:id team-id})
(wrk/submit! {::wrk/task :delete-object
::wrk/conn conn
:object :team
:deleted-at deleted-at
:id team-id})
(wrk/submit! (-> cfg
(assoc ::wrk/task :delete-object)
(assoc ::wrk/params {:object :team
:deleted-at deleted-at
:id team-id})))
(inc total))
0))))
@ -53,15 +52,15 @@
(defmethod ig/init-key ::handler
[_ cfg]
(fn [params]
(fn [{:keys [props] :as task}]
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(l/inf :hint "gc started" :rollback? (boolean (:rollback? params)))
(l/inf :hint "gc started" :rollback? (boolean (:rollback? props)))
(let [total (delete-orphan-teams cfg)]
(l/inf :hint "task finished"
:teams total
:rollback? (boolean (:rollback? params)))
:rollback? (boolean (:rollback? props)))
(when (:rollback? params)
(when (:rollback? props)
(db/rollback! conn))
{:processed total})))))

View file

@ -27,8 +27,8 @@
(defmethod ig/init-key ::handler
[_ {:keys [::db/pool ::min-age] :as cfg}]
(fn [params]
(let [min-age (or (:min-age params) min-age)]
(fn [{:keys [props] :as task}]
(let [min-age (or (:min-age props) min-age)]
(db/with-atomic [conn pool]
(let [interval (db/interval min-age)
result (db/exec-one! conn [sql:delete-completed-tasks interval])
@ -36,7 +36,7 @@
(l/debug :hint "task finished" :total result)
(when (:rollback? params)
(when (:rollback? props)
(db/rollback! conn))
result)))))

View file

@ -206,14 +206,16 @@
(defmethod ig/init-key ::handler
[_ {:keys [::db/pool ::setup/props] :as cfg}]
(fn [{:keys [send? enabled?] :or {send? true enabled? false}}]
(let [subs {:newsletter-updates (get-subscriptions-newsletter-updates pool)
:newsletter-news (get-subscriptions-newsletter-news pool)}
enabled? (or enabled?
(fn [task]
(let [params (:props task)
send? (get params :send? true)
enabled? (or (get params :enabled? false)
(contains? cf/flags :telemetry)
(cf/get :telemetry-enabled))
subs {:newsletter-updates (get-subscriptions-newsletter-updates pool)
:newsletter-news (get-subscriptions-newsletter-news pool)}
data {:subscriptions subs
:version (:full cf/version)
:instance-id (:instance-id props)}]

View file

@ -8,6 +8,7 @@
"Async tasks abstraction (impl)."
(:require
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.logging :as l]
[app.common.spec :as us]
[app.common.uuid :as uuid]
@ -58,17 +59,6 @@
;; SUBMIT API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- extract-props
[options]
(let [cns (namespace ::sample)]
(persistent!
(reduce-kv (fn [res k v]
(cond-> res
(not= (namespace k) cns)
(assoc! k v)))
(transient {})
options))))
(def ^:private sql:insert-new-task
"insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at)
values (?, ?, ?, ?, ?, ?, ?, now() + ?)
@ -87,14 +77,13 @@
(s/def ::task (s/or :kw keyword? :str string?))
(s/def ::queue (s/or :kw keyword? :str string?))
(s/def ::delay (s/or :int integer? :duration dt/duration?))
(s/def ::conn (s/or :pool ::db/pool :connection some?))
(s/def ::priority integer?)
(s/def ::max-retries integer?)
(s/def ::dedupe boolean?)
(s/def ::submit-options
(s/and
(s/keys :req [::task ::conn]
(s/keys :req [::task]
:opt [::label ::delay ::queue ::priority ::max-retries ::dedupe])
(fn [{:keys [::dedupe ::label] :or {label ""}}]
(if dedupe
@ -102,21 +91,23 @@
true))))
(defn submit!
[& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label]
[& {:keys [::params ::task ::delay ::queue ::priority ::max-retries ::dedupe ::label]
:or {delay 0 queue :default priority 100 max-retries 3 label ""}
:as options}]
(us/verify! ::submit-options options)
(let [duration (dt/duration delay)
interval (db/interval duration)
props (-> options extract-props db/tjson)
props (db/tjson params)
id (uuid/next)
tenant (cf/get :tenant)
task (d/name task)
queue (str/ffmt "%:%" tenant (d/name queue))
conn (db/get-connectable options)
deleted (when dedupe
(-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label])
:next.jdbc/update-count))]
(l/trc :hint "submit task"
:name task
:task-id (str id)
@ -126,7 +117,13 @@
:delay (dt/format-duration duration)
:replace (or deleted 0))
(db/exec-one! conn [sql:insert-new-task id task props queue
label priority max-retries interval])
id))
(defn invoke!
[{:keys [::task ::params] :as cfg}]
(assert (contains? cfg :app.worker/registry)
"missing worker registry on `cfg`")
(let [task-fn (dm/get-in cfg [:app.worker/registry (name task)])]
(task-fn {:props params})))

View file

@ -34,6 +34,7 @@
[app.util.blob :as blob]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as wrk]
[app.worker.runner]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
@ -377,9 +378,9 @@
([name]
(run-task! name {}))
([name params]
(let [tasks (:app.worker/registry *system*)]
(let [task-fn (get tasks (d/name name))]
(task-fn params)))))
(wrk/invoke! (-> *system*
(assoc ::wrk/task name)
(assoc ::wrk/params params)))))
(def sql:pending-tasks
"select t.* from task as t

View file

@ -21,11 +21,10 @@
(with-mocks [submit-mock {:target 'app.worker/submit! :return nil}]
(let [prof (th/create-profile* 1 {:is-active true})
res (th/run-task! :process-webhook-event
{:props
{:app.loggers.webhooks/event
{:type "command"
:name "create-project"
:props {:team-id (:default-team-id prof)}}}})]
{:event
{:type "command"
:name "create-project"
:props {:team-id (:default-team-id prof)}}})]
(t/is (= 0 (:call-count @submit-mock)))
(t/is (nil? res)))))
@ -35,11 +34,10 @@
(let [prof (th/create-profile* 1 {:is-active true})
whk (th/create-webhook* {:team-id (:default-team-id prof)})
res (th/run-task! :process-webhook-event
{:props
{:app.loggers.webhooks/event
{:type "command"
:name "create-project"
:props {:team-id (:default-team-id prof)}}}})]
{:event
{:type "command"
:name "create-project"
:props {:team-id (:default-team-id prof)}}})]
(t/is (= 1 (:call-count @submit-mock)))
(t/is (nil? res)))))
@ -52,9 +50,8 @@
:name "create-project"
:props {:team-id (:default-team-id prof)}}
res (th/run-task! :run-webhook
{:props
{:app.loggers.webhooks/event evt
:app.loggers.webhooks/config whk}})]
{:event evt
:config whk})]
(t/is (= 1 (:call-count @http-mock)))
@ -75,9 +72,8 @@
:name "create-project"
:props {:team-id (:default-team-id prof)}}
res (th/run-task! :run-webhook
{:props
{:app.loggers.webhooks/event evt
:app.loggers.webhooks/config whk}})]
{:event evt
:config whk})]
(t/is (= 1 (:call-count @http-mock)))
@ -94,14 +90,12 @@
;; RUN 2 times more
(th/run-task! :run-webhook
{:props
{:app.loggers.webhooks/event evt
:app.loggers.webhooks/config whk}})
{:event evt
:config whk})
(th/run-task! :run-webhook
{:props
{:app.loggers.webhooks/event evt
:app.loggers.webhooks/config whk}})
{:event evt
:config whk})
(let [rows (th/db-query :webhook-delivery {:webhook-id (:id whk)})]