From aa1cf3e03a6f0f442fd0fa76708afe1cea37be2b Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 25 Jun 2024 11:54:52 +0200 Subject: [PATCH 1/3] :sparkles: Add some redundancy to delete_object task --- backend/src/app/tasks/delete_object.clj | 45 +++++++++++++++++-------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/backend/src/app/tasks/delete_object.clj b/backend/src/app/tasks/delete_object.clj index 557e35b59..87741b6cc 100644 --- a/backend/src/app/tasks/delete_object.clj +++ b/backend/src/app/tasks/delete_object.clj @@ -20,8 +20,13 @@ (defmethod delete-object :file [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] - (l/trc :hint "marking for deletion" :rel "file" :id (str id)) (when-let [file (db/get* conn :file {:id id} {::db/remove-deleted false})] + (l/trc :hint "marking for deletion" :rel "file" :id (str id)) + (db/update! conn :file + {:deleted-at deleted-at} + {:id id} + {::db/return-keys false}) + (when (and (:is-shared file) (not *team-deletion*)) ;; NOTE: we don't prevent file deletion on absorb operation failure @@ -49,27 +54,39 @@ (defmethod delete-object :project [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] (l/trc :hint "marking for deletion" :rel "project" :id (str id)) - (doseq [file (db/update! conn :file - {:deleted-at deleted-at} - {:project-id id} - {::db/return-keys [:id :deleted-at] - ::db/many true})] - (delete-object cfg (assoc file :object :file)))) + (db/update! conn :project + {:deleted-at deleted-at} + {:id id} + {::db/return-keys false}) + + (doseq [file (db/query conn :file + {:project-id id} + {::db/columns [:id :deleted-at]})] + (delete-object cfg (assoc file + :object :file + :deleted-at deleted-at)))) (defmethod delete-object :team [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] (l/trc :hint "marking for deletion" :rel "team" :id (str id)) + (db/update! conn :team + {:deleted-at deleted-at} + {:id id} + {::db/return-keys false}) + (db/update! conn :team-font-variant {:deleted-at deleted-at} - {:team-id id}) + {:team-id id} + {::db/return-keys false}) (binding [*team-deletion* true] - (doseq [project (db/update! conn :project - {:deleted-at deleted-at} - {:team-id id} - {::db/return-keys [:id :deleted-at] - ::db/many true})] - (delete-object cfg (assoc project :object :project))))) + (doseq [project (db/query conn :project + {:team-id id} + {::db/columns [:id :deleted-at]})] + (delete-object cfg (assoc project + :object :project + :deleted-at deleted-at))))) + (defmethod delete-object :default [_cfg props] From ec4260830c8cd38dcc7ed0d74a14f50c6e7494c4 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 25 Jun 2024 11:56:23 +0200 Subject: [PATCH 2/3] :recycle: Add API consistency fixes for task calling Also adds a helper for calling tasks synchronously --- backend/src/app/email.clj | 13 ++--- backend/src/app/loggers/audit.clj | 22 ++++---- backend/src/app/loggers/webhooks.clj | 24 ++++---- backend/src/app/rpc/commands/files.clj | 10 ++-- backend/src/app/rpc/commands/projects.clj | 10 ++-- backend/src/app/rpc/commands/teams.clj | 10 ++-- backend/src/app/rpc/quotes.clj | 12 ++-- backend/src/app/srepl/main.clj | 55 ++++++++++--------- backend/src/app/storage/gc_deleted.clj | 4 +- backend/src/app/tasks/delete_object.clj | 8 +-- backend/src/app/tasks/file_gc.clj | 8 +-- backend/src/app/tasks/file_xlog_gc.clj | 6 +- backend/src/app/tasks/objects_gc.clj | 4 +- backend/src/app/tasks/orphan_teams_gc.clj | 19 +++---- backend/src/app/tasks/tasks_gc.clj | 6 +- backend/src/app/tasks/telemetry.clj | 12 ++-- backend/src/app/worker.clj | 29 +++++----- backend/test/backend_tests/helpers.clj | 7 ++- .../backend_tests/loggers_webhooks_test.clj | 38 ++++++------- 19 files changed, 145 insertions(+), 152 deletions(-) diff --git a/backend/src/app/email.clj b/backend/src/app/email.clj index 2cc47a37a..0f7e356b6 100644 --- a/backend/src/app/email.clj +++ b/backend/src/app/email.clj @@ -262,13 +262,12 @@ (let [email (if factory (factory context) (dissoc context ::conn))] - (wrk/submit! (merge - {::wrk/task :sendmail - ::wrk/delay 0 - ::wrk/max-retries 4 - ::wrk/priority 200 - ::wrk/conn conn} - email)))) + (wrk/submit! {::wrk/task :sendmail + ::wrk/delay 0 + ::wrk/max-retries 4 + ::wrk/priority 200 + ::db/conn conn + ::wrk/params email}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; SENDMAIL FN / TASK HANDLER diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index 08167da87..32cccb770 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -241,18 +241,16 @@ :else label) dedupe? (boolean (and batch-key batch-timeout))] - (wrk/submit! ::wrk/conn (::db/conn cfg) - ::wrk/task :process-webhook-event - ::wrk/queue :webhooks - ::wrk/max-retries 0 - ::wrk/delay (or batch-timeout 0) - ::wrk/dedupe dedupe? - ::wrk/label label - - ::webhooks/event - (-> params - (dissoc :ip-addr) - (dissoc :type))))) + (wrk/submit! (-> cfg + (assoc ::wrk/task :process-webhook-event) + (assoc ::wrk/queue :webhooks) + (assoc ::wrk/max-retries 0) + (assoc ::wrk/delay (or batch-timeout 0)) + (assoc ::wrk/dedupe dedupe?) + (assoc ::wrk/label label) + (assoc ::wrk/params (-> params + (dissoc :ip-addr) + (dissoc :type))))))) params)) (defn submit! diff --git a/backend/src/app/loggers/webhooks.clj b/backend/src/app/loggers/webhooks.clj index 5f13bc55b..cd6385429 100644 --- a/backend/src/app/loggers/webhooks.clj +++ b/backend/src/app/loggers/webhooks.clj @@ -64,22 +64,22 @@ (s/keys :req [::db/pool])) (defmethod ig/init-key ::process-event-handler - [_ {:keys [::db/pool] :as cfg}] + [_ cfg] (fn [{:keys [props] :as task}] - (let [event (::event props)] + (let [event (:event props)] (l/dbg :hint "process webhook event" :name (:name event)) (when-let [items (lookup-webhooks cfg event)] (l/trc :hint "webhooks found for event" :total (count items)) - (db/with-atomic [conn pool] - (doseq [item items] - (wrk/submit! ::wrk/conn conn - ::wrk/task :run-webhook - ::wrk/queue :webhooks - ::wrk/max-retries 3 - ::event event - ::config item))))))) + (db/tx-run! cfg (fn [cfg] + (doseq [item items] + (wrk/submit! (-> cfg + (assoc ::wrk/task :run-webhook) + (assoc ::wrk/queue :webhooks) + (assoc ::wrk/max-retries 3) + (assoc ::wrk/params {:event event + :config item})))))))))) ;; --- RUN @@ -128,8 +128,8 @@ :rsp-data (db/tjson rsp)}))] (fn [{:keys [props] :as task}] - (let [event (::event props) - whook (::config props) + (let [event (:event props) + whook (:config props) body (case (:mtype whook) "application/json" (json/write-str event json-write-opts) diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index dc48abd6e..b0c74feb1 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -927,11 +927,11 @@ {:id file-id} {::db/return-keys [:id :name :is-shared :deleted-at :project-id :created-at :modified-at]})] - (wrk/submit! {::wrk/task :delete-object - ::wrk/conn conn - :object :file - :deleted-at (:deleted-at file) - :id file-id}) + (wrk/submit! {::db/conn conn + ::wrk/task :delete-object + ::wrk/params {:object :file + :deleted-at (:deleted-at file) + :id file-id}}) file)) (def ^:private diff --git a/backend/src/app/rpc/commands/projects.clj b/backend/src/app/rpc/commands/projects.clj index a8236008e..16e892b17 100644 --- a/backend/src/app/rpc/commands/projects.clj +++ b/backend/src/app/rpc/commands/projects.clj @@ -258,11 +258,11 @@ :code :non-deletable-project :hint "impossible to delete default project")) - (wrk/submit! {::wrk/task :delete-object - ::wrk/conn conn - :object :project - :deleted-at (:deleted-at project) - :id project-id}) + (wrk/submit! {::db/conn conn + ::wrk/task :delete-object + ::wrk/params {:object :project + :deleted-at (:deleted-at project) + :id project-id}}) project)) diff --git a/backend/src/app/rpc/commands/teams.clj b/backend/src/app/rpc/commands/teams.clj index ed018fa8e..4730ee06e 100644 --- a/backend/src/app/rpc/commands/teams.clj +++ b/backend/src/app/rpc/commands/teams.clj @@ -527,11 +527,11 @@ :code :non-deletable-team :hint "impossible to delete default team")) - (wrk/submit! {::wrk/task :delete-object - ::wrk/conn conn - :object :team - :deleted-at deleted-at - :id team-id}) + (wrk/submit! {::db/conn conn + ::wrk/task :delete-object + ::wrk/params {:object :team + :deleted-at deleted-at + :id team-id}}) team)) (def ^:private schema:delete-team diff --git a/backend/src/app/rpc/quotes.clj b/backend/src/app/rpc/quotes.clj index 3244bd03f..87f9bf7f7 100644 --- a/backend/src/app/rpc/quotes.clj +++ b/backend/src/app/rpc/quotes.clj @@ -83,17 +83,17 @@ "- Quote ID: '~(::target params)'\n" "- Max: ~(::quote params)\n" "- Total: ~(::total params) (INCR ~(::incr params 1))\n")] - (wrk/submit! {::wrk/task :sendmail + (wrk/submit! {::db/conn conn + ::wrk/task :sendmail ::wrk/delay (dt/duration "30s") ::wrk/max-retries 4 ::wrk/priority 200 - ::wrk/conn conn ::wrk/dedupe true ::wrk/label "quotes-notification" - :to (vec admins) - :subject subject - :body [{:type "text/plain" - :content content}]})))) + ::wrk/params {:to (vec admins) + :subject subject + :body [{:type "text/plain" + :content content}]}})))) (defn- generic-check! [{:keys [::db/conn ::incr ::quote-sql ::count-sql ::default ::target] :or {incr 1} :as params}] diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 193f72d1c..c71c58db0 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -59,32 +59,27 @@ ([tname] (run-task! tname {})) ([tname params] - (let [tasks (:app.worker/registry main/system) - tname (if (keyword? tname) (name tname) name)] - (if-let [task-fn (get tasks tname)] - (task-fn params) - (println (format "no task '%s' found" tname)))))) + (wrk/invoke! (-> main/system + (assoc ::wrk/task tname) + (assoc ::wrk/params params))))) (defn schedule-task! ([name] (schedule-task! name {})) - ([name props] - (let [pool (:app.db/pool main/system)] - (wrk/submit! - ::wrk/conn pool - ::wrk/task name - ::wrk/props props)))) + ([name params] + (wrk/submit! (-> main/system + (assoc ::wrk/task name) + (assoc ::wrk/params params))))) (defn send-test-email! [destination] - (us/verify! - :expr (string? destination) - :hint "destination should be provided") - - (let [handler (:app.email/sendmail main/system)] - (handler {:body "test email" - :subject "test email" - :to [destination]}))) + (assert (string? destination) "destination should be provided") + (-> main/system + (assoc ::wrk/task :sendmail) + (assoc ::wrk/params {:body "test email" + :subject "test email" + :to [destination]}) + (wrk/invoke!))) (defn resend-email-verification-email! [email] @@ -562,22 +557,30 @@ "Mark a team for deletion" [team-id] (let [team-id (h/parse-uuid team-id)] - (db/tx-run! main/system (fn [{:keys [::db/conn]}] - (#'teams/delete-team conn team-id))))) - + (wrk/invoke! (-> main/system + (assoc ::wrk/task :delete-object) + (assoc ::wrk/params {:object :team + :deleted-at (dt/now) + :id team-id}))))) (defn delete-project! "Mark a project for deletion" [project-id] (let [project-id (h/parse-uuid project-id)] - (db/tx-run! main/system (fn [{:keys [::db/conn]}] - (#'projects/delete-project conn project-id))))) + (wrk/invoke! (-> main/system + (assoc ::wrk/task :delete-object) + (assoc ::wrk/params {:object :project + :deleted-at (dt/now) + :id project-id}))))) (defn delete-file! "Mark a project for deletion" [file-id] (let [file-id (h/parse-uuid file-id)] - (db/tx-run! main/system (fn [{:keys [::db/conn]}] - (#'files/mark-file-deleted conn file-id))))) + (wrk/invoke! (-> main/system + (assoc ::wrk/task :delete-object) + (assoc ::wrk/params {:object :file + :deleted-at (dt/now) + :id file-id}))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MISC diff --git a/backend/src/app/storage/gc_deleted.clj b/backend/src/app/storage/gc_deleted.clj index 8d1d0e5ad..52cdce4b1 100644 --- a/backend/src/app/storage/gc_deleted.clj +++ b/backend/src/app/storage/gc_deleted.clj @@ -110,8 +110,8 @@ (defmethod ig/init-key ::handler [_ {:keys [::min-age] :as cfg}] - (fn [params] - (let [min-age (dt/duration (or (:min-age params) min-age))] + (fn [{:keys [props] :as task}] + (let [min-age (dt/duration (or (:min-age props) min-age))] (db/tx-run! cfg (fn [cfg] (let [cfg (assoc cfg ::min-age min-age) total (clean-deleted! cfg)] diff --git a/backend/src/app/tasks/delete_object.clj b/backend/src/app/tasks/delete_object.clj index 87741b6cc..06bbd36f7 100644 --- a/backend/src/app/tasks/delete_object.clj +++ b/backend/src/app/tasks/delete_object.clj @@ -23,9 +23,9 @@ (when-let [file (db/get* conn :file {:id id} {::db/remove-deleted false})] (l/trc :hint "marking for deletion" :rel "file" :id (str id)) (db/update! conn :file - {:deleted-at deleted-at} - {:id id} - {::db/return-keys false}) + {:deleted-at deleted-at} + {:id id} + {::db/return-keys false}) (when (and (:is-shared file) (not *team-deletion*)) @@ -97,5 +97,5 @@ (defmethod ig/init-key ::handler [_ cfg] - (fn [{:keys [props] :as params}] + (fn [{:keys [props] :as task}] (db/tx-run! cfg delete-object props))) diff --git a/backend/src/app/tasks/file_gc.clj b/backend/src/app/tasks/file_gc.clj index 88f1a74b4..8a2db9c57 100644 --- a/backend/src/app/tasks/file_gc.clj +++ b/backend/src/app/tasks/file_gc.clj @@ -299,13 +299,13 @@ (defmethod ig/init-key ::handler [_ cfg] - (fn [{:keys [file-id] :as params}] + (fn [{:keys [props] :as task}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] - (let [min-age (dt/duration (or (:min-age params) (::min-age cfg))) + (let [min-age (dt/duration (or (:min-age props) (::min-age cfg))) cfg (-> cfg (update ::sto/storage media/configure-assets-storage conn) - (assoc ::file-id file-id) + (assoc ::file-id (:file-id props)) (assoc ::min-age min-age)) total (reduce (fn [total file] @@ -319,7 +319,7 @@ :processed total) ;; Allow optional rollback passed by params - (when (:rollback? params) + (when (:rollback? props) (db/rollback! conn)) {:processed total}))))) diff --git a/backend/src/app/tasks/file_xlog_gc.clj b/backend/src/app/tasks/file_xlog_gc.clj index c88f42a84..4e240d7f7 100644 --- a/backend/src/app/tasks/file_xlog_gc.clj +++ b/backend/src/app/tasks/file_xlog_gc.clj @@ -29,8 +29,8 @@ (defmethod ig/init-key ::handler [_ {:keys [::db/pool] :as cfg}] - (fn [params] - (let [min-age (or (:min-age params) (::min-age cfg))] + (fn [{:keys [props] :as task}] + (let [min-age (or (:min-age props) (::min-age cfg))] (db/with-atomic [conn pool] (let [interval (db/interval min-age) result (db/exec-one! conn [sql:delete-files-xlog interval]) @@ -38,7 +38,7 @@ (l/info :hint "task finished" :min-age (dt/format-duration min-age) :total result) - (when (:rollback? params) + (when (:rollback? props) (db/rollback! conn)) result))))) diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index da9e1232f..16f745836 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -302,8 +302,8 @@ (defmethod ig/init-key ::handler [_ cfg] - (fn [params] - (let [min-age (dt/duration (or (:min-age params) (::min-age cfg))) + (fn [{:keys [props] :as task}] + (let [min-age (dt/duration (or (:min-age props) (::min-age cfg))) cfg (-> cfg (assoc ::min-age (db/interval min-age)) (update ::sto/storage media/configure-assets-storage))] diff --git a/backend/src/app/tasks/orphan_teams_gc.clj b/backend/src/app/tasks/orphan_teams_gc.clj index 8869c72cc..5bdb360c0 100644 --- a/backend/src/app/tasks/orphan_teams_gc.clj +++ b/backend/src/app/tasks/orphan_teams_gc.clj @@ -39,12 +39,11 @@ {:deleted-at deleted-at} {:id team-id}) - (wrk/submit! {::wrk/task :delete-object - ::wrk/conn conn - :object :team - :deleted-at deleted-at - :id team-id}) - + (wrk/submit! (-> cfg + (assoc ::wrk/task :delete-object) + (assoc ::wrk/params {:object :team + :deleted-at deleted-at + :id team-id}))) (inc total)) 0)))) @@ -53,15 +52,15 @@ (defmethod ig/init-key ::handler [_ cfg] - (fn [params] + (fn [{:keys [props] :as task}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] - (l/inf :hint "gc started" :rollback? (boolean (:rollback? params))) + (l/inf :hint "gc started" :rollback? (boolean (:rollback? props))) (let [total (delete-orphan-teams cfg)] (l/inf :hint "task finished" :teams total - :rollback? (boolean (:rollback? params))) + :rollback? (boolean (:rollback? props))) - (when (:rollback? params) + (when (:rollback? props) (db/rollback! conn)) {:processed total}))))) diff --git a/backend/src/app/tasks/tasks_gc.clj b/backend/src/app/tasks/tasks_gc.clj index 77f1f92fa..cd6982799 100644 --- a/backend/src/app/tasks/tasks_gc.clj +++ b/backend/src/app/tasks/tasks_gc.clj @@ -27,8 +27,8 @@ (defmethod ig/init-key ::handler [_ {:keys [::db/pool ::min-age] :as cfg}] - (fn [params] - (let [min-age (or (:min-age params) min-age)] + (fn [{:keys [props] :as task}] + (let [min-age (or (:min-age props) min-age)] (db/with-atomic [conn pool] (let [interval (db/interval min-age) result (db/exec-one! conn [sql:delete-completed-tasks interval]) @@ -36,7 +36,7 @@ (l/debug :hint "task finished" :total result) - (when (:rollback? params) + (when (:rollback? props) (db/rollback! conn)) result))))) diff --git a/backend/src/app/tasks/telemetry.clj b/backend/src/app/tasks/telemetry.clj index ec07c67b3..410595f72 100644 --- a/backend/src/app/tasks/telemetry.clj +++ b/backend/src/app/tasks/telemetry.clj @@ -206,14 +206,16 @@ (defmethod ig/init-key ::handler [_ {:keys [::db/pool ::setup/props] :as cfg}] - (fn [{:keys [send? enabled?] :or {send? true enabled? false}}] - (let [subs {:newsletter-updates (get-subscriptions-newsletter-updates pool) - :newsletter-news (get-subscriptions-newsletter-news pool)} - - enabled? (or enabled? + (fn [task] + (let [params (:props task) + send? (get params :send? true) + enabled? (or (get params :enabled? false) (contains? cf/flags :telemetry) (cf/get :telemetry-enabled)) + subs {:newsletter-updates (get-subscriptions-newsletter-updates pool) + :newsletter-news (get-subscriptions-newsletter-news pool)} + data {:subscriptions subs :version (:full cf/version) :instance-id (:instance-id props)}] diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 1da2e8de0..d5a2a8551 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -8,6 +8,7 @@ "Async tasks abstraction (impl)." (:require [app.common.data :as d] + [app.common.data.macros :as dm] [app.common.logging :as l] [app.common.spec :as us] [app.common.uuid :as uuid] @@ -58,17 +59,6 @@ ;; SUBMIT API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn- extract-props - [options] - (let [cns (namespace ::sample)] - (persistent! - (reduce-kv (fn [res k v] - (cond-> res - (not= (namespace k) cns) - (assoc! k v))) - (transient {}) - options)))) - (def ^:private sql:insert-new-task "insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at) values (?, ?, ?, ?, ?, ?, ?, now() + ?) @@ -87,14 +77,13 @@ (s/def ::task (s/or :kw keyword? :str string?)) (s/def ::queue (s/or :kw keyword? :str string?)) (s/def ::delay (s/or :int integer? :duration dt/duration?)) -(s/def ::conn (s/or :pool ::db/pool :connection some?)) (s/def ::priority integer?) (s/def ::max-retries integer?) (s/def ::dedupe boolean?) (s/def ::submit-options (s/and - (s/keys :req [::task ::conn] + (s/keys :req [::task] :opt [::label ::delay ::queue ::priority ::max-retries ::dedupe]) (fn [{:keys [::dedupe ::label] :or {label ""}}] (if dedupe @@ -102,21 +91,23 @@ true)))) (defn submit! - [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label] + [& {:keys [::params ::task ::delay ::queue ::priority ::max-retries ::dedupe ::label] :or {delay 0 queue :default priority 100 max-retries 3 label ""} :as options}] (us/verify! ::submit-options options) (let [duration (dt/duration delay) interval (db/interval duration) - props (-> options extract-props db/tjson) + props (db/tjson params) id (uuid/next) tenant (cf/get :tenant) task (d/name task) queue (str/ffmt "%:%" tenant (d/name queue)) + conn (db/get-connectable options) deleted (when dedupe (-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label]) :next.jdbc/update-count))] + (l/trc :hint "submit task" :name task :task-id (str id) @@ -126,7 +117,13 @@ :delay (dt/format-duration duration) :replace (or deleted 0)) - (db/exec-one! conn [sql:insert-new-task id task props queue label priority max-retries interval]) id)) + +(defn invoke! + [{:keys [::task ::params] :as cfg}] + (assert (contains? cfg :app.worker/registry) + "missing worker registry on `cfg`") + (let [task-fn (dm/get-in cfg [:app.worker/registry (name task)])] + (task-fn {:props params}))) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index cb399e70f..9f754c243 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -34,6 +34,7 @@ [app.util.blob :as blob] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [app.worker.runner] [clojure.java.io :as io] [clojure.spec.alpha :as s] @@ -377,9 +378,9 @@ ([name] (run-task! name {})) ([name params] - (let [tasks (:app.worker/registry *system*)] - (let [task-fn (get tasks (d/name name))] - (task-fn params))))) + (wrk/invoke! (-> *system* + (assoc ::wrk/task name) + (assoc ::wrk/params params))))) (def sql:pending-tasks "select t.* from task as t diff --git a/backend/test/backend_tests/loggers_webhooks_test.clj b/backend/test/backend_tests/loggers_webhooks_test.clj index ab3a4e82e..d0a8e7475 100644 --- a/backend/test/backend_tests/loggers_webhooks_test.clj +++ b/backend/test/backend_tests/loggers_webhooks_test.clj @@ -21,11 +21,10 @@ (with-mocks [submit-mock {:target 'app.worker/submit! :return nil}] (let [prof (th/create-profile* 1 {:is-active true}) res (th/run-task! :process-webhook-event - {:props - {:app.loggers.webhooks/event - {:type "command" - :name "create-project" - :props {:team-id (:default-team-id prof)}}}})] + {:event + {:type "command" + :name "create-project" + :props {:team-id (:default-team-id prof)}}})] (t/is (= 0 (:call-count @submit-mock))) (t/is (nil? res))))) @@ -35,11 +34,10 @@ (let [prof (th/create-profile* 1 {:is-active true}) whk (th/create-webhook* {:team-id (:default-team-id prof)}) res (th/run-task! :process-webhook-event - {:props - {:app.loggers.webhooks/event - {:type "command" - :name "create-project" - :props {:team-id (:default-team-id prof)}}}})] + {:event + {:type "command" + :name "create-project" + :props {:team-id (:default-team-id prof)}}})] (t/is (= 1 (:call-count @submit-mock))) (t/is (nil? res))))) @@ -52,9 +50,8 @@ :name "create-project" :props {:team-id (:default-team-id prof)}} res (th/run-task! :run-webhook - {:props - {:app.loggers.webhooks/event evt - :app.loggers.webhooks/config whk}})] + {:event evt + :config whk})] (t/is (= 1 (:call-count @http-mock))) @@ -75,9 +72,8 @@ :name "create-project" :props {:team-id (:default-team-id prof)}} res (th/run-task! :run-webhook - {:props - {:app.loggers.webhooks/event evt - :app.loggers.webhooks/config whk}})] + {:event evt + :config whk})] (t/is (= 1 (:call-count @http-mock))) @@ -94,14 +90,12 @@ ;; RUN 2 times more (th/run-task! :run-webhook - {:props - {:app.loggers.webhooks/event evt - :app.loggers.webhooks/config whk}}) + {:event evt + :config whk}) (th/run-task! :run-webhook - {:props - {:app.loggers.webhooks/event evt - :app.loggers.webhooks/config whk}}) + {:event evt + :config whk}) (let [rows (th/db-query :webhook-delivery {:webhook-id (:id whk)})] From fc30e81072342fd162f6637e0aeb8171366c1521 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 25 Jun 2024 15:47:24 +0200 Subject: [PATCH 3/3] :bug: Make component changes watcher look on local commits only --- frontend/src/app/main/data/workspace/libraries.cljs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/frontend/src/app/main/data/workspace/libraries.cljs b/frontend/src/app/main/data/workspace/libraries.cljs index b8f700472..81b064c49 100644 --- a/frontend/src/app/main/data/workspace/libraries.cljs +++ b/frontend/src/app/main/data/workspace/libraries.cljs @@ -1164,14 +1164,15 @@ changes-s (->> stream - (rx/filter #(or (dch/commit? %) - (ptk/type? % ::dwn/handle-file-change))) + (rx/filter dch/commit?) + (rx/map deref) + (rx/filter #(= :local (:source %))) (rx/observe-on :async)) check-changes (fn [[event [old-data _mid_data _new-data]]] (when old-data - (let [{:keys [file-id changes save-undo? undo-group]} (deref event) + (let [{:keys [file-id changes save-undo? undo-group]} event changed-components (when (or (nil? file-id) (= file-id (:id old-data))) @@ -1181,7 +1182,7 @@ (if (d/not-empty? changed-components) (if save-undo? - (do (log/info :msg "DETECTED COMPONENTS CHANGED" + (do (log/info :hint "detected component changes" :ids (map str changed-components) :undo-group undo-group) @@ -1190,7 +1191,8 @@ ;; even if save-undo? is false, we need to update the :modified-date of the component ;; (for example, for undos) (->> (rx/from changed-components) - (rx/map #(touch-component %)))) + (rx/map touch-component))) + (rx/empty))))) changes-s