From 584a0fdba17011da5854dc2baa1e606bb6e4777e Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 20 May 2024 16:53:36 +0200 Subject: [PATCH 1/4] :sparkles: Reduce locking on objects-gc task The main issue was the long running gc operation that affects storage objects with deduplication. The long running transacion ends locking some storage object rows which collaterally made operations like import-binfile become blocked indefinitelly because of the same rows (because of deduplication). The solution used in this commit is split operations on small chunks so we no longer use long running transactions that holds too many locks. With this approach we will make a window to work concurrently all operarate the distinct operations that requires locks on the same rows. --- backend/src/app/main.clj | 5 + backend/src/app/rpc/commands/files.clj | 11 +- backend/src/app/rpc/commands/projects.clj | 11 +- backend/src/app/rpc/commands/teams.clj | 14 +- backend/src/app/srepl/main.clj | 78 +++++++- backend/src/app/tasks/delete_object.clj | 69 ++++++++ backend/src/app/tasks/objects_gc.clj | 177 ++++++++----------- backend/src/app/worker/runner.clj | 170 +++++++++--------- backend/test/backend_tests/helpers.clj | 15 +- backend/test/backend_tests/rpc_file_test.clj | 3 +- backend/test/backend_tests/rpc_team_test.clj | 4 + 11 files changed, 364 insertions(+), 193 deletions(-) create mode 100644 backend/src/app/tasks/delete_object.clj diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 97171825c..80f0651bf 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -351,6 +351,8 @@ :object-update (ig/ref :app.tasks.object-update/handler) + :delete-object + (ig/ref :app.tasks.delete-object/handler) :process-webhook-event (ig/ref ::webhooks/process-event-handler) :run-webhook @@ -383,6 +385,9 @@ :app.tasks.object-update/handler {::db/pool (ig/ref ::db/pool)} + :app.tasks.delete-object/handler + {::db/pool (ig/ref ::db/pool)} + :app.tasks.file-gc/handler {::db/pool (ig/ref ::db/pool) ::sto/storage (ig/ref ::sto/storage)} diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index 601907e10..e165173f2 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -35,6 +35,7 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str])) @@ -916,7 +917,8 @@ (db/update! conn :file {:deleted-at (dt/now)} {:id file-id} - {::db/return-keys [:id :name :is-shared :project-id :created-at :modified-at]})) + {::db/return-keys [:id :name :is-shared :deleted-at + :project-id :created-at :modified-at]})) (def ^:private schema:delete-file @@ -929,6 +931,13 @@ (check-edition-permissions! conn profile-id id) (let [file (mark-file-deleted! conn id)] + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :file + :deleted-at (:deleted-at file) + :id id}) + ;; NOTE: when a file is a shared library, then we proceed to load ;; the whole file, proceed with feature checking and properly execute ;; the absorb-library procedure diff --git a/backend/src/app/rpc/commands/projects.clj b/backend/src/app/rpc/commands/projects.clj index caa3fe7a0..29cbeaf51 100644 --- a/backend/src/app/rpc/commands/projects.clj +++ b/backend/src/app/rpc/commands/projects.clj @@ -20,6 +20,7 @@ [app.rpc.quotes :as quotes] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s])) (s/def ::id ::us/uuid) @@ -262,10 +263,16 @@ {:deleted-at (dt/now)} {:id id :is-default false} {::db/return-keys true})] + + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :project + :deleted-at (:deleted-at project) + :id id}) + (rph/with-meta (rph/wrap) {::audit/props {:team-id (:team-id project) :name (:name project) :created-at (:created-at project) :modified-at (:modified-at project)}})))) - - diff --git a/backend/src/app/rpc/commands/teams.clj b/backend/src/app/rpc/commands/teams.clj index 7a21a1241..dabf6c848 100644 --- a/backend/src/app/rpc/commands/teams.clj +++ b/backend/src/app/rpc/commands/teams.clj @@ -31,6 +31,7 @@ [app.tokens :as tokens] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str])) @@ -528,14 +529,23 @@ {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}] (db/with-atomic [conn pool] - (let [perms (get-permissions conn profile-id id)] + (let [perms (get-permissions conn profile-id id) + deleted-at (dt/now)] + (when-not (:is-owner perms) (ex/raise :type :validation :code :only-owner-can-delete-team)) (db/update! conn :team - {:deleted-at (dt/now)} + {:deleted-at deleted-at} {:id id :is-default false}) + + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :team + :deleted-at deleted-at + :id id}) nil))) diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 48a15d811..622dc840e 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -192,7 +192,6 @@ ;; NOTIFICATIONS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (defn notify! [{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level] :or {code :generic level :info} @@ -474,6 +473,83 @@ :rollback rollback? :elapsed elapsed)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; RESTORE DELETED OBJECTS +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn restore-deleted-team! + "Mark a team and all related objects as not deleted" + [team-id] + (let [team-id (h/parse-uuid team-id)] + (db/tx-run! main/system + (fn [{:keys [::db/conn]}] + (db/update! conn :team-font-variant + {:deleted-at nil} + {:team-id team-id}) + + (doseq [project (db/update! conn :project + {:deleted-at nil} + {:team-id team-id} + {::db/return-keys [:id] + ::db/many true})] + + (doseq [file (db/update! conn :file + {:deleted-at nil + :has-media-trimmed false} + {:project-id (:id project)} + {::db/return-keys [:id] + ::db/many true})] + + ;; Fragments are not handled here because they + ;; use the database cascade operation and they + ;; are not marked for deletion with objects-gc + ;; task + + (db/update! conn :file-media-object + {:deleted-at nil} + {:file-id (:id file)}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at nil} + {:file-id (:id file)}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at nil} + {:file-id (:id file)}))))))) + + +(defn restore-deleted-project! + "Mark a project and all related objects as not deleted" + [project-id] + (let [project-id (h/parse-uuid project-id)] + (db/tx-run! main/system + (fn [{:keys [::db/conn]}] + (doseq [file (db/update! conn :file + {:deleted-at nil + :has-media-trimmed false} + {:project-id project-id} + {::db/return-keys [:id] + ::db/many true})] + + ;; Fragments are not handled here because they use + ;; the database cascade operation and they are not + ;; marked for deletion with objects-gc task + + (db/update! conn :file-media-object + {:deleted-at nil} + {:file-id (:id file)}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at nil} + {:file-id (:id file)}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at nil} + {:file-id (:id file)})))))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MISC ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/app/tasks/delete_object.clj b/backend/src/app/tasks/delete_object.clj new file mode 100644 index 000000000..f0a60d30a --- /dev/null +++ b/backend/src/app/tasks/delete_object.clj @@ -0,0 +1,69 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.tasks.delete-object + "A generic task for object deletion cascade handling" + (:require + [app.common.logging :as l] + [app.db :as db] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(defmulti delete-object + (fn [_ props] (:object props))) + +(defmethod delete-object :file + [{:keys [::db/conn]} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "file" :id id) + ;; Mark file media objects to be deleted + (db/update! conn :file-media-object + {:deleted-at deleted-at} + {:file-id id}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at deleted-at} + {:file-id id}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at deleted-at} + {:file-id id})) + +(defmethod delete-object :project + [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "project" :id id) + (doseq [file (db/update! conn :file + {:deleted-at deleted-at} + {:project-id id} + {::db/return-keys [:id :deleted-at] + ::db/many true})] + (delete-object cfg (assoc file :object :file)))) + +(defmethod delete-object :team + [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "team" :id id) + (db/update! conn :team-font-variant + {:deleted-at deleted-at} + {:team-id id}) + + (doseq [project (db/update! conn :project + {:deleted-at deleted-at} + {:team-id id} + {::db/return-keys [:id :deleted-at] + ::db/many true})] + (delete-object cfg (assoc project :object :project)))) + +(defmethod delete-object :default + [_cfg props] + (l/wrn :hint "not implementation found" :rel (:object props))) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [{:keys [props] :as params}] + (db/tx-run! cfg delete-object props))) diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index 3caed3271..da9e1232f 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -17,67 +17,18 @@ [clojure.spec.alpha :as s] [integrant.core :as ig])) -(declare ^:private delete-file-data-fragments!) -(declare ^:private delete-file-media-objects!) -(declare ^:private delete-file-object-thumbnails!) -(declare ^:private delete-file-thumbnails!) -(declare ^:private delete-files!) -(declare ^:private delete-fonts!) -(declare ^:private delete-profiles!) -(declare ^:private delete-projects!) -(declare ^:private delete-teams!) - -(defmethod ig/pre-init-spec ::handler [_] - (s/keys :req [::db/pool ::sto/storage])) - -(defmethod ig/prep-key ::handler - [_ cfg] - (assoc cfg ::min-age cf/deletion-delay)) - -(defmethod ig/init-key ::handler - [_ cfg] - (fn [params] - (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] - ;; Disable deletion protection for the current transaction - (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) - (db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"]) - - (let [min-age (dt/duration (or (:min-age params) (::min-age cfg))) - cfg (-> cfg - (assoc ::min-age (db/interval min-age)) - (update ::sto/storage media/configure-assets-storage conn)) - - total (reduce + 0 - [(delete-profiles! cfg) - (delete-teams! cfg) - (delete-fonts! cfg) - (delete-projects! cfg) - (delete-files! cfg) - (delete-file-thumbnails! cfg) - (delete-file-object-thumbnails! cfg) - (delete-file-data-fragments! cfg) - (delete-file-media-objects! cfg)])] - - (l/info :hint "task finished" - :deleted total - :rollback? (boolean (:rollback? params))) - - (when (:rollback? params) - (db/rollback! conn)) - - {:processed total}))))) - (def ^:private sql:get-profiles "SELECT id, photo_id FROM profile WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-profiles! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-profiles min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-profiles min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id photo-id]}] (l/trc :hint "permanently delete" :rel "profile" :id (str id)) @@ -99,13 +50,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-teams! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - - (->> (db/cursor conn [sql:get-teams min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-teams min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id photo-id deleted-at]}] (l/trc :hint "permanently delete" :rel "team" @@ -118,15 +69,6 @@ ;; And finally, permanently delete the team. (db/delete! conn :team {:id id}) - ;; Mark for deletion in cascade - (db/update! conn :team-font-variant - {:deleted-at deleted-at} - {:team-id id}) - - (db/update! conn :project - {:deleted-at deleted-at} - {:team-id id}) - (inc total)) 0))) @@ -136,12 +78,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-fonts! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-fonts min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-fonts min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id team-id deleted-at] :as font}] (l/trc :hint "permanently delete" :rel "team-font-variant" @@ -167,12 +110,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-projects! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-projects min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-projects min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id team-id deleted-at]}] (l/trc :hint "permanently delete" :rel "project" @@ -183,11 +127,6 @@ ;; And finally, permanently delete the project. (db/delete! conn :project {:id id}) - ;; Mark files to be deleted - (db/update! conn :file - {:deleted-at deleted-at} - {:project-id id}) - (inc total)) 0))) @@ -197,12 +136,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-files! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-files min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id deleted-at project-id]}] (l/trc :hint "permanently delete" :rel "file" @@ -210,26 +150,9 @@ :project-id (str project-id) :deleted-at (dt/format-instant deleted-at)) - ;; NOTE: fragments not handled here because they have - ;; cascade. - ;; And finally, permanently delete the file. (db/delete! conn :file {:id id}) - ;; Mark file media objects to be deleted - (db/update! conn :file-media-object - {:deleted-at deleted-at} - {:file-id id}) - - ;; Mark thumbnails to be deleted - (db/update! conn :file-thumbnail - {:deleted-at deleted-at} - {:file-id id}) - - (db/update! conn :file-tagged-object-thumbnail - {:deleted-at deleted-at} - {:file-id id}) - (inc total)) 0))) @@ -239,12 +162,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn delete-file-thumbnails! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-thumbnails min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-thumbnails min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id revn media-id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-thumbnail" @@ -267,12 +191,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn delete-file-object-thumbnails! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-object-thumbnails min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-object-thumbnails min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id object-id media-id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-tagged-object-thumbnail" @@ -295,12 +220,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-file-data-fragments! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-file-data-fragments min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-data-fragment" @@ -319,12 +245,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-file-media-objects! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-media-objects min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-media-objects min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id file-id deleted-at] :as fmo}] (l/trc :hint "permanently delete" :rel "file-media-object" @@ -340,3 +267,53 @@ (inc total)) 0))) + +(def ^:private deletion-proc-vars + [#'delete-file-media-objects! + #'delete-file-data-fragments! + #'delete-file-object-thumbnails! + #'delete-file-thumbnails! + #'delete-files! + #'delete-projects! + #'delete-fonts! + #'delete-teams! + #'delete-profiles!]) + +(defn- execute-proc! + "A generic function that executes the specified proc iterativelly + until 0 results is returned" + [cfg proc-fn] + (loop [total 0] + (let [result (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] + (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) + (proc-fn cfg)))] + (if (pos? result) + (recur (+ total result)) + total)))) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool ::sto/storage])) + +(defmethod ig/prep-key ::handler + [_ cfg] + (assoc cfg + ::min-age cf/deletion-delay + ::chunk-size 10)) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [params] + (let [min-age (dt/duration (or (:min-age params) (::min-age cfg))) + cfg (-> cfg + (assoc ::min-age (db/interval min-age)) + (update ::sto/storage media/configure-assets-storage))] + + (loop [procs (map deref deletion-proc-vars) + total 0] + (if-let [proc-fn (first procs)] + (let [result (execute-proc! cfg proc-fn)] + (recur (rest procs) + (+ total result))) + (do + (l/inf :hint "task finished" :deleted total) + {:processed total})))))) diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj index be3663365..4082c4a3a 100644 --- a/backend/src/app/worker/runner.clj +++ b/backend/src/app/worker/runner.clj @@ -35,8 +35,92 @@ [_ item] {:params item}) +(defn- get-task + [{:keys [::db/pool]} task-id] + (ex/try! + (some-> (db/get* pool :task {:id task-id}) + (decode-task-row)))) + +(defn- run-task + [{:keys [::wrk/registry ::id ::queue] :as cfg} task] + (try + (l/dbg :hint "start" + :name (:name task) + :task-id (str (:id task)) + :queue queue + :runner-id id + :retry (:retry-num task)) + (let [tpoint (dt/tpoint) + task-fn (get registry (:name task)) + result (if task-fn + (task-fn task) + {:status :completed :task task}) + elapsed (dt/format-duration (tpoint))] + + (when-not task-fn + (l/wrn :hint "no task handler found" :name (:name task))) + + (l/dbg :hint "end" + :name (:name task) + :task-id (str (:id task)) + :queue queue + :runner-id id + :retry (:retry-num task) + :elapsed elapsed) + + result) + + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (let [edata (ex-data cause)] + (if (and (< (:retry-num task) + (:max-retries task)) + (= ::retry (:type edata))) + (cond-> {:status :retry :task task :error cause} + (dt/duration? (:delay edata)) + (assoc :delay (:delay edata)) + + (= ::noop (:strategy edata)) + (assoc :inc-by 0)) + (do + (l/err :hint "unhandled exception on task" + ::l/context (get-error-context cause task) + :cause cause) + (if (>= (:retry-num task) (:max-retries task)) + {:status :failed :task task :error cause} + {:status :retry :task task :error cause}))))))) + +(defn- run-task! + [{:keys [::rds/rconn ::id] :as cfg} task-id] + (loop [task (get-task cfg task-id)] + (cond + (ex/exception? task) + (if (or (db/connection-error? task) + (db/serialization-error? task)) + (do + (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" + :id id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task cfg task-id))) + (do + (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" + :id id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task cfg task-id)))) + + (nil? task) + (l/wrn :hint "no task found on the database" + :id id + :task-id task-id) + + :else + (run-task cfg task)))) + (defn- run-worker-loop! - [{:keys [::db/pool ::rds/rconn ::wrk/registry ::timeout ::queue ::id]}] + [{:keys [::db/pool ::rds/rconn ::timeout ::queue] :as cfg}] (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}] (let [explain (ex-message error) nretry (+ (:retry-num task) inc-by) @@ -82,88 +166,6 @@ :length (alength payload) :cause cause)))) - (handle-task [{:keys [name] :as task}] - (let [task-fn (get registry name)] - (if task-fn - (task-fn task) - (l/wrn :hint "no task handler found" :name name)) - {:status :completed :task task})) - - (handle-task-exception [cause task] - (let [edata (ex-data cause)] - (if (and (< (:retry-num task) - (:max-retries task)) - (= ::retry (:type edata))) - (cond-> {:status :retry :task task :error cause} - (dt/duration? (:delay edata)) - (assoc :delay (:delay edata)) - - (= ::noop (:strategy edata)) - (assoc :inc-by 0)) - (do - (l/err :hint "unhandled exception on task" - ::l/context (get-error-context cause task) - :cause cause) - (if (>= (:retry-num task) (:max-retries task)) - {:status :failed :task task :error cause} - {:status :retry :task task :error cause}))))) - - (get-task [task-id] - (ex/try! - (some-> (db/get* pool :task {:id task-id}) - (decode-task-row)))) - - (run-task [task-id] - (loop [task (get-task task-id)] - (cond - (ex/exception? task) - (if (or (db/connection-error? task) - (db/serialization-error? task)) - (do - (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" - :id id - :cause task) - (px/sleep (::rds/timeout rconn)) - (recur (get-task task-id))) - (do - (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" - :id id - :cause task) - (px/sleep (::rds/timeout rconn)) - (recur (get-task task-id)))) - - (nil? task) - (l/wrn :hint "no task found on the database" - :id id - :task-id task-id) - - :else - (try - (l/dbg :hint "start" - :name (:name task) - :task-id (str task-id) - :queue queue - :runner-id id - :retry (:retry-num task)) - (let [tpoint (dt/tpoint) - result (handle-task task) - elapsed (dt/format-duration (tpoint))] - - (l/dbg :hint "end" - :name (:name task) - :task-id (str task-id) - :queue queue - :runner-id id - :retry (:retry-num task) - :elapsed elapsed) - - result) - - (catch InterruptedException cause - (throw cause)) - (catch Throwable cause - (handle-task-exception cause task)))))) - (process-result [{:keys [status] :as result}] (ex/try! (case status @@ -173,7 +175,7 @@ nil))) (run-task-loop [task-id] - (loop [result (run-task task-id)] + (loop [result (run-task! cfg task-id)] (when-let [cause (process-result result)] (if (or (db/connection-error? cause) (db/serialization-error? cause)) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 61b5f42bf..a83cec1b6 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -20,8 +20,8 @@ [app.config :as cf] [app.db :as db] [app.main :as main] - [app.media] [app.media :as-alias mtx] + [app.media] [app.migrations] [app.msgbus :as-alias mbus] [app.rpc :as-alias rpc] @@ -34,6 +34,7 @@ [app.util.blob :as blob] [app.util.services :as sv] [app.util.time :as dt] + [app.worker.runner] [clojure.java.io :as io] [clojure.spec.alpha :as s] [clojure.test :as t] @@ -425,6 +426,18 @@ (let [task-fn (get tasks (d/name name))] (task-fn params))))) +(def sql:pending-tasks + "select t.* from task as t + where t.status = 'new' + order by t.priority desc, t.scheduled_at") + +(defn run-pending-tasks! + [] + (db/tx-run! *system* (fn [{:keys [::db/conn] :as cfg}] + (let [tasks (->> (db/exec! conn [sql:pending-tasks]) + (map #'app.worker.runner/decode-task-row))] + (run! (partial #'app.worker.runner/run-task cfg) tasks))))) + ;; --- UTILS (defn print-error! diff --git a/backend/test/backend_tests/rpc_file_test.clj b/backend/test/backend_tests/rpc_file_test.clj index c50c58252..35d76231f 100644 --- a/backend/test/backend_tests/rpc_file_test.clj +++ b/backend/test/backend_tests/rpc_file_test.clj @@ -1189,6 +1189,7 @@ (t/is (nil? error)) (t/is (map? result))) + ;; insert another thumbnail with different revn (let [data {::th/type :create-file-thumbnail ::rpc/profile-id (:id prof) :file-id (:id file) @@ -1207,8 +1208,6 @@ (t/is (= 2 (count rows))))) (t/testing "gc task" - ;; make the file eligible for GC waiting 300ms (configured - ;; timeout for testing) (let [res (th/run-task! :file-gc {:min-age 0})] (t/is (= 1 (:processed res)))) diff --git a/backend/test/backend_tests/rpc_team_test.clj b/backend/test/backend_tests/rpc_team_test.clj index 65acef49d..3bd6ac3b9 100644 --- a/backend/test/backend_tests/rpc_team_test.clj +++ b/backend/test/backend_tests/rpc_team_test.clj @@ -391,6 +391,8 @@ (t/is (= 1 (count result))) (t/is (= (:default-team-id profile1) (get-in result [0 :id]))))) + (th/run-pending-tasks!) + ;; run permanent deletion (should be noop) (let [result (th/run-task! :objects-gc {:min-age (dt/duration {:minutes 1})})] (t/is (= 0 (:processed result)))) @@ -457,6 +459,8 @@ #_(th/print-result! out) (t/is (nil? (:error out)))) + (th/run-pending-tasks!) + (let [rows (th/db-exec! ["select * from team where id = ?" (:id team)])] (t/is (= 1 (count rows))) (t/is (dt/instant? (:deleted-at (first rows))))) From d241f4525311e2e4050a95051ba3b535a00e487b Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 23 May 2024 15:20:48 +0200 Subject: [PATCH 2/4] :fire: Remove unnecessary async touch operation --- .../src/app/rpc/commands/files_thumbnails.clj | 2 +- backend/src/app/storage.clj | 25 +++++-------------- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/backend/src/app/rpc/commands/files_thumbnails.clj b/backend/src/app/rpc/commands/files_thumbnails.clj index bd982ce17..d766acd3c 100644 --- a/backend/src/app/rpc/commands/files_thumbnails.clj +++ b/backend/src/app/rpc/commands/files_thumbnails.clj @@ -271,7 +271,7 @@ (when (and (some? th1) (not= (:media-id th1) (:media-id th2))) - (sto/touch-object! storage (:media-id th1) :async true)) + (sto/touch-object! storage (:media-id th1))) th2)) diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index 070c53f3f..c818b03fa 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -16,7 +16,6 @@ [app.storage.impl :as impl] [app.storage.s3 :as ss3] [app.util.time :as dt] - [app.worker :as wrk] [clojure.spec.alpha :as s] [datoteka.fs :as fs] [integrant.core :as ig] @@ -171,28 +170,16 @@ (impl/put-object object content)) object))) -(def ^:private default-touch-delay - "A default delay for the asynchronous touch operation" - (dt/duration "5m")) - (defn touch-object! "Mark object as touched." - [{:keys [::db/pool-or-conn] :as storage} object-or-id & {:keys [async]}] + [{:keys [::db/pool-or-conn] :as storage} object-or-id] (us/assert! ::storage storage) (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)] - (if async - (wrk/submit! ::wrk/conn pool-or-conn - ::wrk/task :object-update - ::wrk/delay default-touch-delay - :object :storage-object - :id id - :key :touched-at - :val (dt/now)) - (-> (db/update! pool-or-conn :storage-object - {:touched-at (dt/now)} - {:id id}) - (db/get-update-count) - (pos?))))) + (-> (db/update! pool-or-conn :storage-object + {:touched-at (dt/now)} + {:id id}) + (db/get-update-count) + (pos?)))) (defn get-object-data "Return an input stream instance of the object content." From f3346786ea9fc0fc79bd22a3489406a7e7a02afb Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 23 May 2024 15:30:13 +0200 Subject: [PATCH 3/4] :fire: Remove unused object-update task --- backend/src/app/main.clj | 5 --- backend/src/app/tasks/object_update.clj | 32 ------------------- .../rpc_file_thumbnails_test.clj | 10 +----- 3 files changed, 1 insertion(+), 46 deletions(-) delete mode 100644 backend/src/app/tasks/object_update.clj diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 80f0651bf..023783828 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -349,8 +349,6 @@ :audit-log-archive (ig/ref :app.loggers.audit.archive-task/handler) :audit-log-gc (ig/ref :app.loggers.audit.gc-task/handler) - :object-update - (ig/ref :app.tasks.object-update/handler) :delete-object (ig/ref :app.tasks.delete-object/handler) :process-webhook-event @@ -382,9 +380,6 @@ :app.tasks.orphan-teams-gc/handler {::db/pool (ig/ref ::db/pool)} - :app.tasks.object-update/handler - {::db/pool (ig/ref ::db/pool)} - :app.tasks.delete-object/handler {::db/pool (ig/ref ::db/pool)} diff --git a/backend/src/app/tasks/object_update.clj b/backend/src/app/tasks/object_update.clj deleted file mode 100644 index cfe5fda44..000000000 --- a/backend/src/app/tasks/object_update.clj +++ /dev/null @@ -1,32 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) KALEIDOS INC - -(ns app.tasks.object-update - "A task used for perform simple object properties update - in an asynchronous flow." - (:require - [app.common.data :as d] - [app.common.logging :as l] - [app.db :as db] - [clojure.spec.alpha :as s] - [integrant.core :as ig])) - -(defn- update-object - [{:keys [::db/conn] :as cfg} {:keys [id object key val] :as props}] - (l/trc :hint "update object prop" - :id (str id) - :object (d/name object) - :key (d/name key) - :val val) - (db/update! conn object {key val} {:id id} {::db/return-keys false})) - -(defmethod ig/pre-init-spec ::handler [_] - (s/keys :req [::db/pool])) - -(defmethod ig/init-key ::handler - [_ cfg] - (fn [{:keys [props] :as params}] - (db/tx-run! cfg update-object props))) diff --git a/backend/test/backend_tests/rpc_file_thumbnails_test.clj b/backend/test/backend_tests/rpc_file_thumbnails_test.clj index 11ed4f352..e5cd918b1 100644 --- a/backend/test/backend_tests/rpc_file_thumbnails_test.clj +++ b/backend/test/backend_tests/rpc_file_thumbnails_test.clj @@ -346,13 +346,5 @@ (assoc :size 312043)))) out (th/command! data)] (t/is (nil? (:error out))) - (t/is (map? (:result out)))) + (t/is (map? (:result out)))))) - (let [[row1 :as rows] - (->> (th/db-query :task {:name "object-update"}) - (map #(update % :props db/decode-transit-pgobject)))] - - ;; (app.common.pprint/pprint rows) - (t/is (= 1 (count rows))) - (t/is (> (inst-ms (dt/diff (:created-at row1) (:scheduled-at row1))) - (inst-ms (dt/duration "4m"))))))) From 72facff282314ceb3b93cb4180c3b806dc7319c3 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 23 May 2024 16:34:37 +0200 Subject: [PATCH 4/4] :fire: Remove unnecessary code from test helpers --- backend/test/backend_tests/helpers.clj | 49 ++------------------------ 1 file changed, 2 insertions(+), 47 deletions(-) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index a83cec1b6..12d76785e 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -20,8 +20,8 @@ [app.config :as cf] [app.db :as db] [app.main :as main] - [app.media :as-alias mtx] [app.media] + [app.media :as-alias mtx] [app.migrations] [app.msgbus :as-alias mbus] [app.rpc :as-alias rpc] @@ -78,47 +78,6 @@ :enable-feature-components-v2 :disable-file-validation]) -(def test-init-sql - ["alter table project_profile_rel set unlogged;\n" - "alter table file_profile_rel set unlogged;\n" - "alter table presence set unlogged;\n" - "alter table presence set unlogged;\n" - "alter table http_session set unlogged;\n" - "alter table team_profile_rel set unlogged;\n" - "alter table team_project_profile_rel set unlogged;\n" - "alter table comment_thread_status set unlogged;\n" - "alter table comment set unlogged;\n" - "alter table comment_thread set unlogged;\n" - "alter table profile_complaint_report set unlogged;\n" - "alter table file_change set unlogged;\n" - "alter table team_font_variant set unlogged;\n" - "alter table share_link set unlogged;\n" - "alter table usage_quote set unlogged;\n" - "alter table access_token set unlogged;\n" - "alter table profile set unlogged;\n" - "alter table file_library_rel set unlogged;\n" - "alter table file_thumbnail set unlogged;\n" - "alter table file_object_thumbnail set unlogged;\n" - "alter table file_tagged_object_thumbnail set unlogged;\n" - "alter table file_media_object set unlogged;\n" - "alter table file_data_fragment set unlogged;\n" - "alter table file set unlogged;\n" - "alter table project set unlogged;\n" - "alter table team_invitation set unlogged;\n" - "alter table webhook_delivery set unlogged;\n" - "alter table webhook set unlogged;\n" - "alter table team set unlogged;\n" - ;; For some reason, modifying the task realted tables is very very - ;; slow (5s); so we just don't alter them - ;; "alter table task set unlogged;\n" - ;; "alter table task_default set unlogged;\n" - ;; "alter table task_completed set unlogged;\n" - "alter table audit_log set unlogged ;\n" - "alter table storage_object set unlogged;\n" - "alter table server_error_report set unlogged;\n" - "alter table server_prop set unlogged;\n" - "alter table global_complaint_report set unlogged;\n"]) - (defn state-init [next] (with-redefs [app.config/flags (flags/parse flags/default default-flags) @@ -165,9 +124,6 @@ (try (binding [*system* system *pool* (:app.db/pool system)] - (db/with-atomic [conn *pool*] - (doseq [sql test-init-sql] - (db/exec! conn [sql]))) (next)) (finally (ig/halt! system)))))) @@ -182,8 +138,7 @@ (db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"]) (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) (let [result (->> (db/exec! conn [sql]) - (map :table-name) - (remove #(= "task" %)))] + (map :table-name))] (doseq [table result] (db/exec! conn [(str "delete from " table ";")]))))