0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-22 14:39:45 -05:00

Add offload mechanism for file snapshots

This commit is contained in:
Andrey Antukh 2024-08-23 16:32:02 +02:00
parent 8dea5d5158
commit ceaafdbb1c
13 changed files with 199 additions and 82 deletions

View file

@ -17,6 +17,14 @@
[app.util.objects-map :as omap]
[app.util.pointer-map :as pmap]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; OFFLOAD
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn offloaded?
[file]
(= "objects-storage" (:data-backend file)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; OBJECTS-MAP
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -58,18 +66,16 @@
(defn get-file-data
"Get file data given a file instance."
[system {:keys [data-backend data-ref-id] :as file} & {:keys [touch]}]
(if (= data-backend "objects-storage")
(let [storage (sto/resolve system ::db/reuse-conn true)
object (sto/get-object storage data-ref-id)]
(when touch (sto/touch-object! storage data-ref-id))
(sto/get-object-bytes storage object))
[system file]
(if (offloaded? file)
(let [storage (sto/resolve system ::db/reuse-conn true)]
(->> (sto/get-object storage (:data-ref-id file))
(sto/get-object-bytes storage)))
(:data file)))
(defn resolve-file-data
[system file & {:as opts}]
(let [data (get-file-data system file opts)]
[system file]
(let [data (get-file-data system file)]
(assoc file :data data)))
(defn load-pointer

View file

@ -404,7 +404,8 @@
::sto/storage (ig/ref ::sto/storage)}
:app.tasks.file-xlog-gc/handler
{::db/pool (ig/ref ::db/pool)}
{::db/pool (ig/ref ::db/pool)
::sto/storage (ig/ref ::sto/storage)}
:app.tasks.telemetry/handler
{::db/pool (ig/ref ::db/pool)

View file

@ -406,7 +406,10 @@
:fn (mg/resource "app/migrations/sql/0127-mod-storage-object-table.sql")}
{:name "0128-mod-task-table"
:fn (mg/resource "app/migrations/sql/0128-mod-task-table.sql")}])
:fn (mg/resource "app/migrations/sql/0128-mod-task-table.sql")}
{:name "0129-mod-file-change-table"
:fn (mg/resource "app/migrations/sql/0129-mod-file-change-table.sql")}])
(defn apply-migrations!
[pool name migrations]

View file

@ -0,0 +1,6 @@
ALTER TABLE file_change
ADD COLUMN data_backend text NULL,
ADD COLUMN data_ref_id uuid NULL;
CREATE INDEX IF NOT EXISTS file_change__data_ref_id__idx
ON file_change (data_ref_id);

View file

@ -273,7 +273,7 @@
(defn get-minimal-file
[cfg id & {:as opts}]
(let [opts (assoc opts ::sql/columns [:id :modified-at :revn])]
(let [opts (assoc opts ::sql/columns [:id :modified-at :revn :data-ref-id :data-backend])]
(db/get cfg :file {:id id} opts)))
(defn get-file-etag

View file

@ -13,12 +13,15 @@
[app.config :as cf]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.features.fdata :as feat.fdata]
[app.main :as-alias main]
[app.rpc :as-alias rpc]
[app.rpc.commands.files :as files]
[app.rpc.commands.profile :as profile]
[app.rpc.doc :as-alias doc]
[app.storage :as sto]
[app.util.blob :as blob]
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
[app.util.time :as dt]
[cuerdas.core :as str]))
@ -33,20 +36,22 @@
:code :authentication-required
:hint "only admins allowed")))
(def sql:get-file-snapshots
"SELECT id, label, revn, created_at
FROM file_change
WHERE file_id = ?
AND created_at < ?
AND label is not null
ORDER BY created_at desc
LIMIT ?")
(defn get-file-snapshots
[{:keys [::db/conn]} {:keys [file-id limit start-at]
:or {limit Long/MAX_VALUE}}]
(let [query (str "select id, label, revn, created_at "
" from file_change "
" where file_id = ? "
" and created_at < ? "
" and data is not null "
" order by created_at desc "
" limit ?")
start-at (or start-at (dt/now))
(let [start-at (or start-at (dt/now))
limit (min limit 20)]
(->> (db/exec! conn [query file-id start-at limit])
(->> (db/exec! conn [sql:get-file-snapshots file-id start-at limit])
(mapv (fn [row]
(update row :created-at dt/format-instant :rfc1123))))))
@ -77,43 +82,52 @@
:id id
:file-id file-id))
(when-not (:data snapshot)
(ex/raise :type :precondition
:code :snapshot-without-data
:hint "snapshot has no data"
:label (:label snapshot)
:file-id file-id))
(let [snapshot (feat.fdata/resolve-file-data cfg snapshot)]
(when-not (:data snapshot)
(ex/raise :type :precondition
:code :snapshot-without-data
:hint "snapshot has no data"
:label (:label snapshot)
:file-id file-id))
(l/dbg :hint "restoring snapshot"
:file-id (str file-id)
:label (:label snapshot)
:snapshot-id (str (:id snapshot)))
(l/dbg :hint "restoring snapshot"
:file-id (str file-id)
:label (:label snapshot)
:snapshot-id (str (:id snapshot)))
(db/update! conn :file
{:data (:data snapshot)
:revn (inc (:revn file))
:features (:features snapshot)}
{:id file-id})
;; If the file was already offloaded, on restring the snapshot
;; we are going to replace the file data, so we need to touch
;; the old referenced storage object and avoid possible leaks
(when (feat.fdata/offloaded? file)
(sto/touch-object! storage (:data-ref-id file)))
;; clean object thumbnails
(let [sql (str "update file_tagged_object_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(db/update! conn :file
{:data (:data snapshot)
:revn (inc (:revn file))
:data-backend nil
:data-ref-id nil
:has-media-trimmed false
:features (:features snapshot)}
{:id file-id})
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; clean object thumbnails
(let [sql (str "update file_tagged_object_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; clean object thumbnails
(let [sql (str "update file_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; clean file thumbnails
(let [sql (str "update file_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
{:id (:id snapshot)
:label (:label snapshot)}))
{:id (:id snapshot)
:label (:label snapshot)})))
(defn- resolve-snapshot-by-label
[conn file-id label]
@ -145,17 +159,27 @@
(merge (resolve-snapshot-by-label conn file-id label)))]
(restore-file-snapshot! cfg params)))))
(defn- get-file
[cfg file-id]
(let [file (->> (db/get cfg :file {:id file-id})
(feat.fdata/resolve-file-data cfg))]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg file-id)]
(-> file
(update :data blob/decode)
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))
(update :data blob/encode)))))
(defn take-file-snapshot!
[cfg {:keys [file-id label]}]
(let [conn (db/get-connection cfg)
file (db/get conn :file {:id file-id})
(let [file (get-file cfg file-id)
id (uuid/next)]
(l/debug :hint "creating file snapshot"
:file-id (str file-id)
:label label)
(db/insert! conn :file-change
(db/insert! cfg :file-change
{:id id
:revn (:revn file)
:data (:data file)

View file

@ -29,6 +29,7 @@
[app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.storage :as sto]
[app.util.blob :as blob]
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
@ -228,7 +229,7 @@
[{:keys [::db/conn ::wrk/executor] :as cfg}
{:keys [profile-id file changes session-id ::created-at skip-validate] :as params}]
(let [;; Retrieve the file data
file (feat.fdata/resolve-file-data cfg file {:touch true})
file (feat.fdata/resolve-file-data cfg file)
;; Process the file data on separated thread for avoid to do
;; the CPU intensive operation on vthread.
@ -236,6 +237,13 @@
file (px/invoke! executor (partial update-file-data cfg file changes skip-validate))
features (db/create-array conn "text" (:features file))]
;; NOTE: if file was offloaded, we need to touch the referenced
;; storage object because on this update operation the data will
;; be overwritted.
(when (= "objects-storage" (:data-backend file))
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
(sto/touch-object! storage (:data-ref-id file))))
(db/insert! conn :file-change
{:id (uuid/next)
:session-id session-id
@ -406,7 +414,7 @@
"UPDATE file_change
SET label = NULL
WHERE file_id = ?
AND label IS NOT NULL
AND label LIKE 'internal/%'
AND created_at < ?")
(defn- delete-old-snapshots!

View file

@ -94,6 +94,15 @@
(-> (db/exec-one! conn [sql:has-file-data-fragment-refs id])
(get :has-refs)))
(def ^:private
sql:has-file-change-refs
"SELECT EXISTS (SELECT 1 FROM file_change WHERE data_ref_id = ?) AS has_refs")
(defn- has-file-change-refs?
[conn id]
(-> (db/exec-one! conn [sql:has-file-change-refs id])
(get :has-refs)))
(def ^:private sql:mark-freeze-in-bulk
"UPDATE storage_object
SET touched_at = NULL
@ -168,6 +177,7 @@
"profile" (process-objects! conn has-profile-refs? ids bucket)
"file-data" (process-objects! conn has-file-data-refs? ids bucket)
"file-data-fragment" (process-objects! conn has-file-data-fragment-refs? ids bucket)
"file-change" (process-objects! conn has-file-change-refs? ids bucket)
(ex/raise :type :internal
:code :unexpected-unknown-reference
:hint (dm/fmt "unknown reference '%'" bucket))))

View file

@ -11,6 +11,7 @@
inactivity (the default threshold is 72h)."
(:require
[app.binfile.common :as bfc]
[app.common.exceptions :as ex]
[app.common.files.migrations :as fmg]
[app.common.files.validate :as cfv]
[app.common.logging :as l]
@ -235,6 +236,16 @@
(defn- decode-file
[cfg {:keys [id] :as file}]
;; NOTE: a preventive check that does not allow proceed the gc for
;; already offloaded file; if this exception happens, means that
;; something external modified the file flag without preloading the
;; file back again to the table
(when (feat.fdata/offloaded? file)
(ex/raise :hint "unable to run file-gc on an already offloaded file"
:type :internal
:code :file-already-offloaded
:file-id id))
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
(-> file
(update :features db/decode-pgarray #{})

View file

@ -10,6 +10,7 @@
(:require
[app.common.logging :as l]
[app.db :as db]
[app.storage :as sto]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
@ -20,22 +21,36 @@
WHERE id IN (SELECT id FROM file_change
WHERE label IS NULL
AND created_at < ?
ORDER BY created_at LIMIT ?)")
ORDER BY created_at LIMIT ?)
RETURNING id, data_backend, data_ref_id")
(def xf:filter-offloded
(comp
(filter #(= "objects-storage" (:data-backend %)))
(map :data-ref-id)))
(defn- delete-in-chunks
[{:keys [::chunk-size ::threshold] :as cfg}]
(loop [total 0]
(let [result (-> (db/exec-one! cfg [sql:delete-files-xlog threshold chunk-size])
(db/get-update-count))]
(if (pos? result)
(recur (+ total result))
total))))
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
(loop [total 0]
(let [chunk (db/exec! cfg [sql:delete-files-xlog threshold chunk-size])
length (count chunk)]
;; touch all references on offloaded changes entries
(doseq [data-ref-id (sequence xf:filter-offloded chunk)]
(l/trc :hint "touching referenced storage object"
:storage-object-id (str data-ref-id))
(sto/touch-object! storage data-ref-id))
(if (pos? length)
(recur (+ total length))
total)))))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::handler
[_ {:keys [::db/pool] :as cfg}]
[_ cfg]
(fn [{:keys [props] :as task}]
(let [min-age (or (:min-age props)
(dt/duration "72h"))
@ -43,7 +58,7 @@
threshold (dt/minus (dt/now) min-age)]
(-> cfg
(assoc ::db/rollback (:rollback? props false))
(assoc ::db/rollback (:rollback props false))
(assoc ::threshold threshold)
(assoc ::chunk-size chunk-size)
(db/tx-run! (fn [cfg]

View file

@ -125,7 +125,7 @@
0)))
(def ^:private sql:get-files
"SELECT id, deleted_at, project_id, data_ref_id
"SELECT id, deleted_at, project_id, data_backend, data_ref_id
FROM file
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
@ -137,14 +137,15 @@
(defn- delete-files!
[{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
(->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id deleted-at project-id data-ref-id]}]
(reduce (fn [total {:keys [id deleted-at project-id] :as file}]
(l/trc :hint "permanently delete"
:rel "file"
:id (str id)
:project-id (str project-id)
:deleted-at (dt/format-instant deleted-at))
(some->> data-ref-id (sto/touch-object! storage))
(when (= "objects-storage" (:data-backend file))
(sto/touch-object! storage (:data-ref-id file)))
;; And finally, permanently delete the file.
(db/delete! conn :file {:id id})

View file

@ -73,6 +73,38 @@
{:id (:id fragment)}
{::db/return-keys false}))))
(def sql:get-snapshots
"SELECT fc.*
FROM file_change AS fc
WHERE fc.file_id = ?
AND fc.label IS NOT NULL
AND fc.data IS NOT NULL
AND fc.data_backend IS NULL")
(defn- offload-file-snapshots!
[{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
(doseq [snapshot (db/exec! conn [sql:get-snapshots file-id])]
(let [data (sto/content (:data snapshot))
sobj (sto/put-object! storage
{::sto/content data
::sto/touch true
:bucket "file-change"
:content-type "application/octet-stream"
:file-id file-id
:file-change-id (:id snapshot)})]
(l/trc :hint "offload file change"
:file-id (str file-id)
:file-change-id (str (:id snapshot))
:storage-id (str (:id sobj)))
(db/update! conn :file-change
{:data-backend "objects-storage"
:data-ref-id (:id sobj)
:data nil}
{:id (:id snapshot)}
{::db/return-keys false}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -88,4 +120,5 @@
(assoc ::file-id (:file-id props))
(db/tx-run! (fn [cfg]
(offload-file-data! cfg)
(offload-file-data-fragments! cfg))))))
(offload-file-data-fragments! cfg)
(offload-file-snapshots! cfg))))))

View file

@ -499,17 +499,16 @@
(js/console.log "EE:" cause))))))
(defn ^:export restore-snapshot
[id file-id]
[label file-id]
(when-let [file-id (or (d/parse-uuid file-id)
(:current-file-id @st/state))]
(when-let [id (d/parse-uuid id)]
(->> (http/send! {:method :post
:uri (u/join cf/public-uri "api/rpc/command/restore-file-snapshot")
:body (http/transit-data {:file-id file-id :id id})})
(rx/map http/conditional-decode-transit)
(rx/mapcat rp/handle-response)
(rx/subs! (fn [_]
(println "Snapshot restored " id)
#_(.reload js/location))
(fn [cause]
(js/console.log "EE:" cause)))))))
(->> (http/send! {:method :post
:uri (u/join cf/public-uri "api/rpc/command/restore-file-snapshot")
:body (http/transit-data {:file-id file-id :label label})})
(rx/map http/conditional-decode-transit)
(rx/mapcat rp/handle-response)
(rx/subs! (fn [_]
(println "Snapshot restored " label)
#_(.reload js/location))
(fn [cause]
(js/console.log "EE:" cause))))))