diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 1e6ae5644..071890663 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -484,10 +484,7 @@ {::wrk/registry (ig/ref ::wrk/registry) ::db/pool (ig/ref ::db/pool) ::wrk/entries - [{:cron #app/cron "0 0 * * * ?" ;; hourly - :task :file-xlog-gc} - - {:cron #app/cron "0 0 0 * * ?" ;; daily + [{:cron #app/cron "0 0 0 * * ?" ;; daily :task :session-gc} {:cron #app/cron "0 0 0 * * ?" ;; daily diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index e09d45752..e43cc92f8 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -421,7 +421,10 @@ :fn (mg/resource "app/migrations/sql/0132-mod-file-change-table.sql")} {:name "0133-mod-file-table" - :fn (mg/resource "app/migrations/sql/0133-mod-file-table.sql")}]) + :fn (mg/resource "app/migrations/sql/0133-mod-file-table.sql")} + + {:name "0134-mod-file-change-table" + :fn (mg/resource "app/migrations/sql/0134-mod-file-change-table.sql")}]) (defn apply-migrations! [pool name migrations] diff --git a/backend/src/app/migrations/sql/0134-mod-file-change-table.sql b/backend/src/app/migrations/sql/0134-mod-file-change-table.sql new file mode 100644 index 000000000..f6c925f9d --- /dev/null +++ b/backend/src/app/migrations/sql/0134-mod-file-change-table.sql @@ -0,0 +1,18 @@ +ALTER TABLE file_change + ADD COLUMN updated_at timestamptz DEFAULT now(), + ADD COLUMN deleted_at timestamptz DEFAULT NULL, +ALTER COLUMN created_at SET DEFAULT now(); + +DROP INDEX file_change__created_at__idx; +DROP INDEX file_change__created_at__label__idx; +DROP INDEX file_change__label__idx; + +CREATE INDEX file_change__deleted_at__idx + ON file_change (deleted_at, id) + WHERE deleted_at IS NOT NULL; + +CREATE INDEX file_change__system_snapshots__idx + ON file_change (file_id, created_at) + WHERE data IS NOT NULL + AND created_by = 'system' + AND deleted_at IS NULL; diff --git a/backend/src/app/rpc/commands/files_snapshot.clj b/backend/src/app/rpc/commands/files_snapshot.clj index 83626fcae..1c97d65ae 100644 --- a/backend/src/app/rpc/commands/files_snapshot.clj +++ b/backend/src/app/rpc/commands/files_snapshot.clj @@ -10,6 +10,7 @@ [app.common.logging :as l] [app.common.schema :as sm] [app.common.uuid :as uuid] + [app.config :as cf] [app.db :as db] [app.db.sql :as-alias sql] [app.features.fdata :as feat.fdata] @@ -30,6 +31,7 @@ FROM file_change WHERE file_id = ? AND data IS NOT NULL + AND (deleted_at IS NULL OR deleted_at > now()) ORDER BY created_at DESC LIMIT 20") @@ -85,6 +87,11 @@ "system" "user") + deleted-at + (if (= label :system) + (dt/plus (dt/now) (cf/get-deletion-delay)) + nil) + label (if (= label :system) (str "internal/snapshot/" (:revn file)) @@ -113,6 +120,7 @@ :profile-id profile-id :file-id (:id file) :label label + :deleted-at deleted-at :created-by created-by} {::db/return-keys false}) @@ -235,7 +243,8 @@ [conn snapshot-id label] (-> (db/update! conn :file-change {:label label - :created-by "user"} + :created-by "user" + :deleted-at nil} {:id snapshot-id} {::db/return-keys true}) (dissoc :data :features))) @@ -245,7 +254,7 @@ [conn id] (db/get conn :file-change {:id id} - {::sql/columns [:id :file-id :created-by] + {::sql/columns [:id :file-id :created-by :deleted-at] ::db/for-update true})) (sv/defmethod ::update-file-snapshot @@ -264,7 +273,8 @@ (defn- delete-file-snapshot! [conn snapshot-id] - (db/delete! conn :file-change + (db/update! conn :file-change + {:deleted-at (dt/now)} {:id snapshot-id} {::db/return-keys false}) nil) diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index dd41f84c7..fb17be891 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -34,7 +34,7 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] - [app.worker :as-alias wrk] + [app.worker :as wrk] [clojure.set :as set] [promesa.exec :as px])) @@ -44,7 +44,6 @@ (declare ^:private update-file*) (declare ^:private process-changes-and-validate) (declare ^:private take-snapshot?) -(declare ^:private delete-old-snapshots!) ;; PUBLIC API; intended to be used outside of this module (declare update-file!) @@ -224,23 +223,34 @@ (let [storage (sto/resolve cfg ::db/reuse-conn true)] (some->> (:data-ref-id file) (sto/touch-object! storage)))) - ;; TODO: move this to asynchronous task - (when (::snapshot-data file) - (delete-old-snapshots! cfg file)) + (-> cfg + (assoc ::wrk/task :file-xlog-gc) + (assoc ::wrk/label (str "xlog:" (:id file))) + (assoc ::wrk/params {:file-id (:id file)}) + (assoc ::wrk/delay (dt/duration "5m")) + (assoc ::wrk/dedupe true) + (assoc ::wrk/priority 1) + (wrk/submit!)) (persist-file! cfg file) (let [params (assoc params :file file) response {:revn (:revn file) :lagged (get-lagged-changes conn params)} - features (db/create-array conn "text" (:features file))] + features (db/create-array conn "text" (:features file)) + deleted-at (if (::snapshot-data file) + (dt/plus timestamp (cf/get-deletion-delay)) + (dt/plus timestamp (dt/duration {:hours 1})))] - ;; Insert change (xlog) + ;; Insert change (xlog) with deleted_at in a future data for + ;; make them automatically eleggible for GC once they expires (db/insert! conn :file-change {:id (uuid/next) :session-id session-id :profile-id profile-id :created-at timestamp + :updated-at timestamp + :deleted-at deleted-at :file-id (:id file) :revn (:revn file) :version (:version file) @@ -458,33 +468,6 @@ (> (inst-ms (dt/diff modified-at (dt/now))) (inst-ms timeout)))))) -;; Get the latest available snapshots without exceeding the total -;; snapshot limit. -(def ^:private sql:get-latest-snapshots - "SELECT fch.id, fch.created_at - FROM file_change AS fch - WHERE fch.file_id = ? - AND fch.created_by = 'system' - ORDER BY fch.created_at DESC - LIMIT ?") - -;; Mark all snapshots that are outside the allowed total threshold -;; available for the GC. -(def ^:private sql:delete-snapshots - "UPDATE file_change - SET label = NULL - WHERE file_id = ? - AND created_by LIKE 'system' - AND created_at < ?") - -(defn- delete-old-snapshots! - [{:keys [::db/conn] :as cfg} {:keys [id] :as file}] - (when-let [snapshots (not-empty (db/exec! conn [sql:get-latest-snapshots id - (cf/get :auto-file-snapshot-total 10)]))] - (let [last-date (-> snapshots peek :created-at) - result (db/exec-one! conn [sql:delete-snapshots id last-date])] - (l/trc :hint "delete old snapshots" :file-id (str id) :total (db/get-update-count result))))) - (def ^:private sql:lagged-changes "select s.id, s.revn, s.file_id, s.session_id, s.changes diff --git a/backend/src/app/tasks/file_gc.clj b/backend/src/app/tasks/file_gc.clj index a903a6730..279ab63dc 100644 --- a/backend/src/app/tasks/file_gc.clj +++ b/backend/src/app/tasks/file_gc.clj @@ -44,7 +44,7 @@ f.data_ref_id FROM file_change AS f WHERE f.file_id = ? - AND f.label IS NOT NULL + AND f.data IS NOT NULL ORDER BY f.created_at ASC") (def ^:private sql:mark-file-media-object-deleted diff --git a/backend/src/app/tasks/file_xlog_gc.clj b/backend/src/app/tasks/file_xlog_gc.clj index 6bbacd250..f430e107d 100644 --- a/backend/src/app/tasks/file_xlog_gc.clj +++ b/backend/src/app/tasks/file_xlog_gc.clj @@ -5,47 +5,51 @@ ;; Copyright (c) KALEIDOS INC (ns app.tasks.file-xlog-gc - "A maintenance task that performs a garbage collection of the file - change (transaction) log." (:require [app.common.logging :as l] + [app.config :as cf] [app.db :as db] - [app.features.fdata :as feat.fdata] - [app.storage :as sto] - [app.util.time :as dt] [clojure.spec.alpha :as s] [integrant.core :as ig])) -(def ^:private - sql:delete-files-xlog - "DELETE FROM file_change - WHERE id IN (SELECT id FROM file_change - WHERE label IS NULL - AND created_at < ? - ORDER BY created_at LIMIT ?) - RETURNING id, data_backend, data_ref_id") +;; Get the latest available snapshots without exceeding the total +;; snapshot limit +(def ^:private sql:get-latest-snapshots + "SELECT fch.id, fch.created_at + FROM file_change AS fch + WHERE fch.file_id = ? + AND fch.created_by = 'system' + AND fch.data IS NOT NULL + AND fch.deleted_at > now() + ORDER BY fch.created_at DESC + LIMIT ?") -(def xf:filter-offloded - (comp - (filter feat.fdata/offloaded?) - (keep :data-ref-id))) +;; Mark all snapshots that are outside the allowed total threshold +;; available for the GC +(def ^:private sql:delete-snapshots + "UPDATE file_change + SET deleted_at = now() + WHERE file_id = ? + AND deleted_at > now() + AND data IS NOT NULL + AND created_by = 'system' + AND created_at < ?") -(defn- delete-in-chunks - [{:keys [::chunk-size ::threshold] :as cfg}] - (let [storage (sto/resolve cfg ::db/reuse-conn true)] - (loop [total 0] - (let [chunk (db/exec! cfg [sql:delete-files-xlog threshold chunk-size]) - length (count chunk)] +(defn- get-alive-snapshots + [conn file-id] + (let [total (cf/get :auto-file-snapshot-total 10) + snapshots (db/exec! conn [sql:get-latest-snapshots file-id total])] + (not-empty snapshots))) - ;; touch all references on offloaded changes entries - (doseq [data-ref-id (sequence xf:filter-offloded chunk)] - (l/trc :hint "touching referenced storage object" - :storage-object-id (str data-ref-id)) - (sto/touch-object! storage data-ref-id)) - - (if (pos? length) - (recur (+ total length)) - total))))) +(defn- delete-old-snapshots! + [{:keys [::db/conn] :as cfg} file-id] + (when-let [snapshots (get-alive-snapshots conn file-id)] + (let [last-date (-> snapshots peek :created-at) + result (db/exec-one! conn [sql:delete-snapshots file-id last-date])] + (l/inf :hint "delete old file snapshots" + :file-id (str file-id) + :current (count snapshots) + :deleted (db/get-update-count result))))) (defmethod ig/pre-init-spec ::handler [_] (s/keys :req [::db/pool])) @@ -53,16 +57,8 @@ (defmethod ig/init-key ::handler [_ cfg] (fn [{:keys [props] :as task}] - (let [min-age (or (:min-age props) - (dt/duration "72h")) - chunk-size (:chunk-size props 5000) - threshold (dt/minus (dt/now) min-age)] - + (let [file-id (:file-id props)] + (assert (uuid? file-id) "expected file-id on props") (-> cfg (assoc ::db/rollback (:rollback props false)) - (assoc ::threshold threshold) - (assoc ::chunk-size chunk-size) - (db/tx-run! (fn [cfg] - (let [total (delete-in-chunks cfg)] - (l/trc :hint "file xlog cleaned" :total total) - total))))))) + (db/tx-run! delete-old-snapshots! file-id))))) diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index 67ed8f9aa..76fead713 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -27,7 +27,7 @@ (defn- delete-profiles! [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-profiles min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-profiles min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [id photo-id]}] (l/trc :hint "permanently delete" :rel "profile" :id (str id)) @@ -50,7 +50,7 @@ (defn- delete-teams! [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-teams min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-teams min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [id photo-id deleted-at]}] (l/trc :hint "permanently delete" :rel "team" @@ -78,7 +78,7 @@ (defn- delete-fonts! [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-fonts min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-fonts min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [id team-id deleted-at] :as font}] (l/trc :hint "permanently delete" :rel "team-font-variant" @@ -110,7 +110,7 @@ (defn- delete-projects! [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] - (->> (db/cursor conn [sql:get-projects min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-projects min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [id team-id deleted-at]}] (l/trc :hint "permanently delete" :rel "project" @@ -136,7 +136,7 @@ (defn- delete-files! [{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}] - (->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [id deleted-at project-id] :as file}] (l/trc :hint "permanently delete" :rel "file" @@ -165,7 +165,7 @@ (defn delete-file-thumbnails! [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-thumbnails min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-file-thumbnails min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [file-id revn media-id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-thumbnail" @@ -194,7 +194,7 @@ (defn delete-file-object-thumbnails! [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-object-thumbnails min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-file-object-thumbnails min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [file-id object-id media-id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-tagged-object-thumbnail" @@ -223,7 +223,7 @@ (defn- delete-file-data-fragments! [{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}] - (->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [file-id id deleted-at data-ref-id]}] (l/trc :hint "permanently delete" :rel "file-data-fragment" @@ -249,7 +249,7 @@ (defn- delete-file-media-objects! [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-media-objects min-age chunk-size] {:chunk-size 1}) + (->> (db/cursor conn [sql:get-file-media-objects min-age chunk-size] {:chunk-size 5}) (reduce (fn [total {:keys [id file-id deleted-at] :as fmo}] (l/trc :hint "permanently delete" :rel "file-media-object" @@ -266,6 +266,34 @@ (inc total)) 0))) +(def ^:private sql:get-file-change + "SELECT id, file_id, deleted_at, data_backend, data_ref_id + FROM file_change + WHERE deleted_at IS NOT NULL + AND deleted_at < now() - ?::interval + ORDER BY deleted_at ASC + LIMIT ? + FOR UPDATE + SKIP LOCKED") + +(defn- delete-file-change! + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-change min-age chunk-size] {:chunk-size 5}) + (reduce (fn [total {:keys [id file-id deleted-at] :as xlog}] + (l/trc :hint "permanently delete" + :rel "file-change" + :id (str id) + :file-id (str file-id) + :deleted-at (dt/format-instant deleted-at)) + + (when (= "objects-storage" (:data-backend xlog)) + (sto/touch-object! storage (:data-ref-id xlog))) + + (db/delete! conn :file-change {:id id}) + + (inc total)) + 0))) + (def ^:private deletion-proc-vars [#'delete-profiles! #'delete-file-media-objects! @@ -275,7 +303,8 @@ #'delete-files! #'delete-projects! #'delete-fonts! - #'delete-teams!]) + #'delete-teams! + #'delete-file-change!]) (defn- execute-proc! "A generic function that executes the specified proc iterativelly @@ -296,7 +325,7 @@ [_ cfg] (assoc cfg ::min-age (cf/get-deletion-delay) - ::chunk-size 10)) + ::chunk-size 50)) (defmethod ig/init-key ::handler [_ cfg]