mirror of
https://github.com/penpot/penpot.git
synced 2025-01-06 14:50:20 -05:00
♻️ Refactor file changes gc tasks
Make it more friendly with the current snapshoting mechanism
This commit is contained in:
parent
5f4af76d28
commit
32126d1874
8 changed files with 133 additions and 97 deletions
|
@ -484,10 +484,7 @@
|
|||
{::wrk/registry (ig/ref ::wrk/registry)
|
||||
::db/pool (ig/ref ::db/pool)
|
||||
::wrk/entries
|
||||
[{:cron #app/cron "0 0 * * * ?" ;; hourly
|
||||
:task :file-xlog-gc}
|
||||
|
||||
{:cron #app/cron "0 0 0 * * ?" ;; daily
|
||||
[{:cron #app/cron "0 0 0 * * ?" ;; daily
|
||||
:task :session-gc}
|
||||
|
||||
{:cron #app/cron "0 0 0 * * ?" ;; daily
|
||||
|
|
|
@ -421,7 +421,10 @@
|
|||
:fn (mg/resource "app/migrations/sql/0132-mod-file-change-table.sql")}
|
||||
|
||||
{:name "0133-mod-file-table"
|
||||
:fn (mg/resource "app/migrations/sql/0133-mod-file-table.sql")}])
|
||||
:fn (mg/resource "app/migrations/sql/0133-mod-file-table.sql")}
|
||||
|
||||
{:name "0134-mod-file-change-table"
|
||||
:fn (mg/resource "app/migrations/sql/0134-mod-file-change-table.sql")}])
|
||||
|
||||
(defn apply-migrations!
|
||||
[pool name migrations]
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
ALTER TABLE file_change
|
||||
ADD COLUMN updated_at timestamptz DEFAULT now(),
|
||||
ADD COLUMN deleted_at timestamptz DEFAULT NULL,
|
||||
ALTER COLUMN created_at SET DEFAULT now();
|
||||
|
||||
DROP INDEX file_change__created_at__idx;
|
||||
DROP INDEX file_change__created_at__label__idx;
|
||||
DROP INDEX file_change__label__idx;
|
||||
|
||||
CREATE INDEX file_change__deleted_at__idx
|
||||
ON file_change (deleted_at, id)
|
||||
WHERE deleted_at IS NOT NULL;
|
||||
|
||||
CREATE INDEX file_change__system_snapshots__idx
|
||||
ON file_change (file_id, created_at)
|
||||
WHERE data IS NOT NULL
|
||||
AND created_by = 'system'
|
||||
AND deleted_at IS NULL;
|
|
@ -10,6 +10,7 @@
|
|||
[app.common.logging :as l]
|
||||
[app.common.schema :as sm]
|
||||
[app.common.uuid :as uuid]
|
||||
[app.config :as cf]
|
||||
[app.db :as db]
|
||||
[app.db.sql :as-alias sql]
|
||||
[app.features.fdata :as feat.fdata]
|
||||
|
@ -30,6 +31,7 @@
|
|||
FROM file_change
|
||||
WHERE file_id = ?
|
||||
AND data IS NOT NULL
|
||||
AND (deleted_at IS NULL OR deleted_at > now())
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 20")
|
||||
|
||||
|
@ -85,6 +87,11 @@
|
|||
"system"
|
||||
"user")
|
||||
|
||||
deleted-at
|
||||
(if (= label :system)
|
||||
(dt/plus (dt/now) (cf/get-deletion-delay))
|
||||
nil)
|
||||
|
||||
label
|
||||
(if (= label :system)
|
||||
(str "internal/snapshot/" (:revn file))
|
||||
|
@ -113,6 +120,7 @@
|
|||
:profile-id profile-id
|
||||
:file-id (:id file)
|
||||
:label label
|
||||
:deleted-at deleted-at
|
||||
:created-by created-by}
|
||||
{::db/return-keys false})
|
||||
|
||||
|
@ -235,7 +243,8 @@
|
|||
[conn snapshot-id label]
|
||||
(-> (db/update! conn :file-change
|
||||
{:label label
|
||||
:created-by "user"}
|
||||
:created-by "user"
|
||||
:deleted-at nil}
|
||||
{:id snapshot-id}
|
||||
{::db/return-keys true})
|
||||
(dissoc :data :features)))
|
||||
|
@ -245,7 +254,7 @@
|
|||
[conn id]
|
||||
(db/get conn :file-change
|
||||
{:id id}
|
||||
{::sql/columns [:id :file-id :created-by]
|
||||
{::sql/columns [:id :file-id :created-by :deleted-at]
|
||||
::db/for-update true}))
|
||||
|
||||
(sv/defmethod ::update-file-snapshot
|
||||
|
@ -264,7 +273,8 @@
|
|||
|
||||
(defn- delete-file-snapshot!
|
||||
[conn snapshot-id]
|
||||
(db/delete! conn :file-change
|
||||
(db/update! conn :file-change
|
||||
{:deleted-at (dt/now)}
|
||||
{:id snapshot-id}
|
||||
{::db/return-keys false})
|
||||
nil)
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
[app.util.pointer-map :as pmap]
|
||||
[app.util.services :as sv]
|
||||
[app.util.time :as dt]
|
||||
[app.worker :as-alias wrk]
|
||||
[app.worker :as wrk]
|
||||
[clojure.set :as set]
|
||||
[promesa.exec :as px]))
|
||||
|
||||
|
@ -44,7 +44,6 @@
|
|||
(declare ^:private update-file*)
|
||||
(declare ^:private process-changes-and-validate)
|
||||
(declare ^:private take-snapshot?)
|
||||
(declare ^:private delete-old-snapshots!)
|
||||
|
||||
;; PUBLIC API; intended to be used outside of this module
|
||||
(declare update-file!)
|
||||
|
@ -224,23 +223,34 @@
|
|||
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
|
||||
(some->> (:data-ref-id file) (sto/touch-object! storage))))
|
||||
|
||||
;; TODO: move this to asynchronous task
|
||||
(when (::snapshot-data file)
|
||||
(delete-old-snapshots! cfg file))
|
||||
(-> cfg
|
||||
(assoc ::wrk/task :file-xlog-gc)
|
||||
(assoc ::wrk/label (str "xlog:" (:id file)))
|
||||
(assoc ::wrk/params {:file-id (:id file)})
|
||||
(assoc ::wrk/delay (dt/duration "5m"))
|
||||
(assoc ::wrk/dedupe true)
|
||||
(assoc ::wrk/priority 1)
|
||||
(wrk/submit!))
|
||||
|
||||
(persist-file! cfg file)
|
||||
|
||||
(let [params (assoc params :file file)
|
||||
response {:revn (:revn file)
|
||||
:lagged (get-lagged-changes conn params)}
|
||||
features (db/create-array conn "text" (:features file))]
|
||||
features (db/create-array conn "text" (:features file))
|
||||
deleted-at (if (::snapshot-data file)
|
||||
(dt/plus timestamp (cf/get-deletion-delay))
|
||||
(dt/plus timestamp (dt/duration {:hours 1})))]
|
||||
|
||||
;; Insert change (xlog)
|
||||
;; Insert change (xlog) with deleted_at in a future data for
|
||||
;; make them automatically eleggible for GC once they expires
|
||||
(db/insert! conn :file-change
|
||||
{:id (uuid/next)
|
||||
:session-id session-id
|
||||
:profile-id profile-id
|
||||
:created-at timestamp
|
||||
:updated-at timestamp
|
||||
:deleted-at deleted-at
|
||||
:file-id (:id file)
|
||||
:revn (:revn file)
|
||||
:version (:version file)
|
||||
|
@ -458,33 +468,6 @@
|
|||
(> (inst-ms (dt/diff modified-at (dt/now)))
|
||||
(inst-ms timeout))))))
|
||||
|
||||
;; Get the latest available snapshots without exceeding the total
|
||||
;; snapshot limit.
|
||||
(def ^:private sql:get-latest-snapshots
|
||||
"SELECT fch.id, fch.created_at
|
||||
FROM file_change AS fch
|
||||
WHERE fch.file_id = ?
|
||||
AND fch.created_by = 'system'
|
||||
ORDER BY fch.created_at DESC
|
||||
LIMIT ?")
|
||||
|
||||
;; Mark all snapshots that are outside the allowed total threshold
|
||||
;; available for the GC.
|
||||
(def ^:private sql:delete-snapshots
|
||||
"UPDATE file_change
|
||||
SET label = NULL
|
||||
WHERE file_id = ?
|
||||
AND created_by LIKE 'system'
|
||||
AND created_at < ?")
|
||||
|
||||
(defn- delete-old-snapshots!
|
||||
[{:keys [::db/conn] :as cfg} {:keys [id] :as file}]
|
||||
(when-let [snapshots (not-empty (db/exec! conn [sql:get-latest-snapshots id
|
||||
(cf/get :auto-file-snapshot-total 10)]))]
|
||||
(let [last-date (-> snapshots peek :created-at)
|
||||
result (db/exec-one! conn [sql:delete-snapshots id last-date])]
|
||||
(l/trc :hint "delete old snapshots" :file-id (str id) :total (db/get-update-count result)))))
|
||||
|
||||
(def ^:private sql:lagged-changes
|
||||
"select s.id, s.revn, s.file_id,
|
||||
s.session_id, s.changes
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
f.data_ref_id
|
||||
FROM file_change AS f
|
||||
WHERE f.file_id = ?
|
||||
AND f.label IS NOT NULL
|
||||
AND f.data IS NOT NULL
|
||||
ORDER BY f.created_at ASC")
|
||||
|
||||
(def ^:private sql:mark-file-media-object-deleted
|
||||
|
|
|
@ -5,47 +5,51 @@
|
|||
;; Copyright (c) KALEIDOS INC
|
||||
|
||||
(ns app.tasks.file-xlog-gc
|
||||
"A maintenance task that performs a garbage collection of the file
|
||||
change (transaction) log."
|
||||
(:require
|
||||
[app.common.logging :as l]
|
||||
[app.config :as cf]
|
||||
[app.db :as db]
|
||||
[app.features.fdata :as feat.fdata]
|
||||
[app.storage :as sto]
|
||||
[app.util.time :as dt]
|
||||
[clojure.spec.alpha :as s]
|
||||
[integrant.core :as ig]))
|
||||
|
||||
(def ^:private
|
||||
sql:delete-files-xlog
|
||||
"DELETE FROM file_change
|
||||
WHERE id IN (SELECT id FROM file_change
|
||||
WHERE label IS NULL
|
||||
AND created_at < ?
|
||||
ORDER BY created_at LIMIT ?)
|
||||
RETURNING id, data_backend, data_ref_id")
|
||||
;; Get the latest available snapshots without exceeding the total
|
||||
;; snapshot limit
|
||||
(def ^:private sql:get-latest-snapshots
|
||||
"SELECT fch.id, fch.created_at
|
||||
FROM file_change AS fch
|
||||
WHERE fch.file_id = ?
|
||||
AND fch.created_by = 'system'
|
||||
AND fch.data IS NOT NULL
|
||||
AND fch.deleted_at > now()
|
||||
ORDER BY fch.created_at DESC
|
||||
LIMIT ?")
|
||||
|
||||
(def xf:filter-offloded
|
||||
(comp
|
||||
(filter feat.fdata/offloaded?)
|
||||
(keep :data-ref-id)))
|
||||
;; Mark all snapshots that are outside the allowed total threshold
|
||||
;; available for the GC
|
||||
(def ^:private sql:delete-snapshots
|
||||
"UPDATE file_change
|
||||
SET deleted_at = now()
|
||||
WHERE file_id = ?
|
||||
AND deleted_at > now()
|
||||
AND data IS NOT NULL
|
||||
AND created_by = 'system'
|
||||
AND created_at < ?")
|
||||
|
||||
(defn- delete-in-chunks
|
||||
[{:keys [::chunk-size ::threshold] :as cfg}]
|
||||
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
|
||||
(loop [total 0]
|
||||
(let [chunk (db/exec! cfg [sql:delete-files-xlog threshold chunk-size])
|
||||
length (count chunk)]
|
||||
(defn- get-alive-snapshots
|
||||
[conn file-id]
|
||||
(let [total (cf/get :auto-file-snapshot-total 10)
|
||||
snapshots (db/exec! conn [sql:get-latest-snapshots file-id total])]
|
||||
(not-empty snapshots)))
|
||||
|
||||
;; touch all references on offloaded changes entries
|
||||
(doseq [data-ref-id (sequence xf:filter-offloded chunk)]
|
||||
(l/trc :hint "touching referenced storage object"
|
||||
:storage-object-id (str data-ref-id))
|
||||
(sto/touch-object! storage data-ref-id))
|
||||
|
||||
(if (pos? length)
|
||||
(recur (+ total length))
|
||||
total)))))
|
||||
(defn- delete-old-snapshots!
|
||||
[{:keys [::db/conn] :as cfg} file-id]
|
||||
(when-let [snapshots (get-alive-snapshots conn file-id)]
|
||||
(let [last-date (-> snapshots peek :created-at)
|
||||
result (db/exec-one! conn [sql:delete-snapshots file-id last-date])]
|
||||
(l/inf :hint "delete old file snapshots"
|
||||
:file-id (str file-id)
|
||||
:current (count snapshots)
|
||||
:deleted (db/get-update-count result)))))
|
||||
|
||||
(defmethod ig/pre-init-spec ::handler [_]
|
||||
(s/keys :req [::db/pool]))
|
||||
|
@ -53,16 +57,8 @@
|
|||
(defmethod ig/init-key ::handler
|
||||
[_ cfg]
|
||||
(fn [{:keys [props] :as task}]
|
||||
(let [min-age (or (:min-age props)
|
||||
(dt/duration "72h"))
|
||||
chunk-size (:chunk-size props 5000)
|
||||
threshold (dt/minus (dt/now) min-age)]
|
||||
|
||||
(let [file-id (:file-id props)]
|
||||
(assert (uuid? file-id) "expected file-id on props")
|
||||
(-> cfg
|
||||
(assoc ::db/rollback (:rollback props false))
|
||||
(assoc ::threshold threshold)
|
||||
(assoc ::chunk-size chunk-size)
|
||||
(db/tx-run! (fn [cfg]
|
||||
(let [total (delete-in-chunks cfg)]
|
||||
(l/trc :hint "file xlog cleaned" :total total)
|
||||
total)))))))
|
||||
(db/tx-run! delete-old-snapshots! file-id)))))
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
|
||||
(defn- delete-profiles!
|
||||
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-profiles min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-profiles min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [id photo-id]}]
|
||||
(l/trc :hint "permanently delete" :rel "profile" :id (str id))
|
||||
|
||||
|
@ -50,7 +50,7 @@
|
|||
|
||||
(defn- delete-teams!
|
||||
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-teams min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-teams min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [id photo-id deleted-at]}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "team"
|
||||
|
@ -78,7 +78,7 @@
|
|||
|
||||
(defn- delete-fonts!
|
||||
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-fonts min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-fonts min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [id team-id deleted-at] :as font}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "team-font-variant"
|
||||
|
@ -110,7 +110,7 @@
|
|||
|
||||
(defn- delete-projects!
|
||||
[{:keys [::db/conn ::min-age ::chunk-size] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-projects min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-projects min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [id team-id deleted-at]}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "project"
|
||||
|
@ -136,7 +136,7 @@
|
|||
|
||||
(defn- delete-files!
|
||||
[{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [id deleted-at project-id] :as file}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "file"
|
||||
|
@ -165,7 +165,7 @@
|
|||
|
||||
(defn delete-file-thumbnails!
|
||||
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-file-thumbnails min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-file-thumbnails min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [file-id revn media-id deleted-at]}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "file-thumbnail"
|
||||
|
@ -194,7 +194,7 @@
|
|||
|
||||
(defn delete-file-object-thumbnails!
|
||||
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-file-object-thumbnails min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-file-object-thumbnails min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [file-id object-id media-id deleted-at]}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "file-tagged-object-thumbnail"
|
||||
|
@ -223,7 +223,7 @@
|
|||
|
||||
(defn- delete-file-data-fragments!
|
||||
[{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [file-id id deleted-at data-ref-id]}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "file-data-fragment"
|
||||
|
@ -249,7 +249,7 @@
|
|||
|
||||
(defn- delete-file-media-objects!
|
||||
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-file-media-objects min-age chunk-size] {:chunk-size 1})
|
||||
(->> (db/cursor conn [sql:get-file-media-objects min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [id file-id deleted-at] :as fmo}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "file-media-object"
|
||||
|
@ -266,6 +266,34 @@
|
|||
(inc total))
|
||||
0)))
|
||||
|
||||
(def ^:private sql:get-file-change
|
||||
"SELECT id, file_id, deleted_at, data_backend, data_ref_id
|
||||
FROM file_change
|
||||
WHERE deleted_at IS NOT NULL
|
||||
AND deleted_at < now() - ?::interval
|
||||
ORDER BY deleted_at ASC
|
||||
LIMIT ?
|
||||
FOR UPDATE
|
||||
SKIP LOCKED")
|
||||
|
||||
(defn- delete-file-change!
|
||||
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
|
||||
(->> (db/cursor conn [sql:get-file-change min-age chunk-size] {:chunk-size 5})
|
||||
(reduce (fn [total {:keys [id file-id deleted-at] :as xlog}]
|
||||
(l/trc :hint "permanently delete"
|
||||
:rel "file-change"
|
||||
:id (str id)
|
||||
:file-id (str file-id)
|
||||
:deleted-at (dt/format-instant deleted-at))
|
||||
|
||||
(when (= "objects-storage" (:data-backend xlog))
|
||||
(sto/touch-object! storage (:data-ref-id xlog)))
|
||||
|
||||
(db/delete! conn :file-change {:id id})
|
||||
|
||||
(inc total))
|
||||
0)))
|
||||
|
||||
(def ^:private deletion-proc-vars
|
||||
[#'delete-profiles!
|
||||
#'delete-file-media-objects!
|
||||
|
@ -275,7 +303,8 @@
|
|||
#'delete-files!
|
||||
#'delete-projects!
|
||||
#'delete-fonts!
|
||||
#'delete-teams!])
|
||||
#'delete-teams!
|
||||
#'delete-file-change!])
|
||||
|
||||
(defn- execute-proc!
|
||||
"A generic function that executes the specified proc iterativelly
|
||||
|
@ -296,7 +325,7 @@
|
|||
[_ cfg]
|
||||
(assoc cfg
|
||||
::min-age (cf/get-deletion-delay)
|
||||
::chunk-size 10))
|
||||
::chunk-size 50))
|
||||
|
||||
(defmethod ig/init-key ::handler
|
||||
[_ cfg]
|
||||
|
|
Loading…
Reference in a new issue