mirror of
https://github.com/penpot/penpot.git
synced 2025-01-24 07:29:08 -05:00
Merge pull request #5027 from penpot/niwinz-backend-improvements
✨ Add efficiency improvements to xlog task
This commit is contained in:
commit
67cd855e97
15 changed files with 229 additions and 92 deletions
|
@ -17,6 +17,14 @@
|
||||||
[app.util.objects-map :as omap]
|
[app.util.objects-map :as omap]
|
||||||
[app.util.pointer-map :as pmap]))
|
[app.util.pointer-map :as pmap]))
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; OFFLOAD
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
(defn offloaded?
|
||||||
|
[file]
|
||||||
|
(= "objects-storage" (:data-backend file)))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; OBJECTS-MAP
|
;; OBJECTS-MAP
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
@ -58,18 +66,16 @@
|
||||||
|
|
||||||
(defn get-file-data
|
(defn get-file-data
|
||||||
"Get file data given a file instance."
|
"Get file data given a file instance."
|
||||||
[system {:keys [data-backend data-ref-id] :as file} & {:keys [touch]}]
|
[system file]
|
||||||
(if (= data-backend "objects-storage")
|
(if (offloaded? file)
|
||||||
(let [storage (sto/resolve system ::db/reuse-conn true)
|
(let [storage (sto/resolve system ::db/reuse-conn true)]
|
||||||
object (sto/get-object storage data-ref-id)]
|
(->> (sto/get-object storage (:data-ref-id file))
|
||||||
|
(sto/get-object-bytes storage)))
|
||||||
(when touch (sto/touch-object! storage data-ref-id))
|
|
||||||
(sto/get-object-bytes storage object))
|
|
||||||
(:data file)))
|
(:data file)))
|
||||||
|
|
||||||
(defn resolve-file-data
|
(defn resolve-file-data
|
||||||
[system file & {:as opts}]
|
[system file]
|
||||||
(let [data (get-file-data system file opts)]
|
(let [data (get-file-data system file)]
|
||||||
(assoc file :data data)))
|
(assoc file :data data)))
|
||||||
|
|
||||||
(defn load-pointer
|
(defn load-pointer
|
||||||
|
|
|
@ -404,7 +404,8 @@
|
||||||
::sto/storage (ig/ref ::sto/storage)}
|
::sto/storage (ig/ref ::sto/storage)}
|
||||||
|
|
||||||
:app.tasks.file-xlog-gc/handler
|
:app.tasks.file-xlog-gc/handler
|
||||||
{::db/pool (ig/ref ::db/pool)}
|
{::db/pool (ig/ref ::db/pool)
|
||||||
|
::sto/storage (ig/ref ::sto/storage)}
|
||||||
|
|
||||||
:app.tasks.telemetry/handler
|
:app.tasks.telemetry/handler
|
||||||
{::db/pool (ig/ref ::db/pool)
|
{::db/pool (ig/ref ::db/pool)
|
||||||
|
|
|
@ -400,7 +400,16 @@
|
||||||
:fn (mg/resource "app/migrations/sql/0125-mod-file-table.sql")}
|
:fn (mg/resource "app/migrations/sql/0125-mod-file-table.sql")}
|
||||||
|
|
||||||
{:name "0126-add-team-access-request-table"
|
{:name "0126-add-team-access-request-table"
|
||||||
:fn (mg/resource "app/migrations/sql/0126-add-team-access-request-table.sql")}])
|
:fn (mg/resource "app/migrations/sql/0126-add-team-access-request-table.sql")}
|
||||||
|
|
||||||
|
{:name "0127-mod-storage-object-table"
|
||||||
|
:fn (mg/resource "app/migrations/sql/0127-mod-storage-object-table.sql")}
|
||||||
|
|
||||||
|
{:name "0128-mod-task-table"
|
||||||
|
:fn (mg/resource "app/migrations/sql/0128-mod-task-table.sql")}
|
||||||
|
|
||||||
|
{:name "0129-mod-file-change-table"
|
||||||
|
:fn (mg/resource "app/migrations/sql/0129-mod-file-change-table.sql")}])
|
||||||
|
|
||||||
(defn apply-migrations!
|
(defn apply-migrations!
|
||||||
[pool name migrations]
|
[pool name migrations]
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
--- This setting allow to optimize the table for heavy write workload
|
||||||
|
--- leaving space on the page for HOT updates
|
||||||
|
ALTER TABLE storage_object SET (FILLFACTOR=60);
|
3
backend/src/app/migrations/sql/0128-mod-task-table.sql
Normal file
3
backend/src/app/migrations/sql/0128-mod-task-table.sql
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
--- This setting allow to optimize the table for heavy write workload
|
||||||
|
--- leaving space on the page for HOT updates
|
||||||
|
ALTER TABLE task SET (FILLFACTOR=60);
|
|
@ -0,0 +1,6 @@
|
||||||
|
ALTER TABLE file_change
|
||||||
|
ADD COLUMN data_backend text NULL,
|
||||||
|
ADD COLUMN data_ref_id uuid NULL;
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS file_change__data_ref_id__idx
|
||||||
|
ON file_change (data_ref_id);
|
|
@ -273,7 +273,7 @@
|
||||||
|
|
||||||
(defn get-minimal-file
|
(defn get-minimal-file
|
||||||
[cfg id & {:as opts}]
|
[cfg id & {:as opts}]
|
||||||
(let [opts (assoc opts ::sql/columns [:id :modified-at :revn])]
|
(let [opts (assoc opts ::sql/columns [:id :modified-at :revn :data-ref-id :data-backend])]
|
||||||
(db/get cfg :file {:id id} opts)))
|
(db/get cfg :file {:id id} opts)))
|
||||||
|
|
||||||
(defn get-file-etag
|
(defn get-file-etag
|
||||||
|
|
|
@ -13,12 +13,15 @@
|
||||||
[app.config :as cf]
|
[app.config :as cf]
|
||||||
[app.db :as db]
|
[app.db :as db]
|
||||||
[app.db.sql :as-alias sql]
|
[app.db.sql :as-alias sql]
|
||||||
|
[app.features.fdata :as feat.fdata]
|
||||||
[app.main :as-alias main]
|
[app.main :as-alias main]
|
||||||
[app.rpc :as-alias rpc]
|
[app.rpc :as-alias rpc]
|
||||||
[app.rpc.commands.files :as files]
|
[app.rpc.commands.files :as files]
|
||||||
[app.rpc.commands.profile :as profile]
|
[app.rpc.commands.profile :as profile]
|
||||||
[app.rpc.doc :as-alias doc]
|
[app.rpc.doc :as-alias doc]
|
||||||
[app.storage :as sto]
|
[app.storage :as sto]
|
||||||
|
[app.util.blob :as blob]
|
||||||
|
[app.util.pointer-map :as pmap]
|
||||||
[app.util.services :as sv]
|
[app.util.services :as sv]
|
||||||
[app.util.time :as dt]
|
[app.util.time :as dt]
|
||||||
[cuerdas.core :as str]))
|
[cuerdas.core :as str]))
|
||||||
|
@ -33,20 +36,22 @@
|
||||||
:code :authentication-required
|
:code :authentication-required
|
||||||
:hint "only admins allowed")))
|
:hint "only admins allowed")))
|
||||||
|
|
||||||
|
(def sql:get-file-snapshots
|
||||||
|
"SELECT id, label, revn, created_at
|
||||||
|
FROM file_change
|
||||||
|
WHERE file_id = ?
|
||||||
|
AND created_at < ?
|
||||||
|
AND label is not null
|
||||||
|
ORDER BY created_at desc
|
||||||
|
LIMIT ?")
|
||||||
|
|
||||||
(defn get-file-snapshots
|
(defn get-file-snapshots
|
||||||
[{:keys [::db/conn]} {:keys [file-id limit start-at]
|
[{:keys [::db/conn]} {:keys [file-id limit start-at]
|
||||||
:or {limit Long/MAX_VALUE}}]
|
:or {limit Long/MAX_VALUE}}]
|
||||||
(let [query (str "select id, label, revn, created_at "
|
(let [start-at (or start-at (dt/now))
|
||||||
" from file_change "
|
|
||||||
" where file_id = ? "
|
|
||||||
" and created_at < ? "
|
|
||||||
" and data is not null "
|
|
||||||
" order by created_at desc "
|
|
||||||
" limit ?")
|
|
||||||
start-at (or start-at (dt/now))
|
|
||||||
limit (min limit 20)]
|
limit (min limit 20)]
|
||||||
|
|
||||||
(->> (db/exec! conn [query file-id start-at limit])
|
(->> (db/exec! conn [sql:get-file-snapshots file-id start-at limit])
|
||||||
(mapv (fn [row]
|
(mapv (fn [row]
|
||||||
(update row :created-at dt/format-instant :rfc1123))))))
|
(update row :created-at dt/format-instant :rfc1123))))))
|
||||||
|
|
||||||
|
@ -77,43 +82,52 @@
|
||||||
:id id
|
:id id
|
||||||
:file-id file-id))
|
:file-id file-id))
|
||||||
|
|
||||||
(when-not (:data snapshot)
|
(let [snapshot (feat.fdata/resolve-file-data cfg snapshot)]
|
||||||
(ex/raise :type :precondition
|
(when-not (:data snapshot)
|
||||||
:code :snapshot-without-data
|
(ex/raise :type :precondition
|
||||||
:hint "snapshot has no data"
|
:code :snapshot-without-data
|
||||||
:label (:label snapshot)
|
:hint "snapshot has no data"
|
||||||
:file-id file-id))
|
:label (:label snapshot)
|
||||||
|
:file-id file-id))
|
||||||
|
|
||||||
(l/dbg :hint "restoring snapshot"
|
(l/dbg :hint "restoring snapshot"
|
||||||
:file-id (str file-id)
|
:file-id (str file-id)
|
||||||
:label (:label snapshot)
|
:label (:label snapshot)
|
||||||
:snapshot-id (str (:id snapshot)))
|
:snapshot-id (str (:id snapshot)))
|
||||||
|
|
||||||
(db/update! conn :file
|
;; If the file was already offloaded, on restring the snapshot
|
||||||
{:data (:data snapshot)
|
;; we are going to replace the file data, so we need to touch
|
||||||
:revn (inc (:revn file))
|
;; the old referenced storage object and avoid possible leaks
|
||||||
:features (:features snapshot)}
|
(when (feat.fdata/offloaded? file)
|
||||||
{:id file-id})
|
(sto/touch-object! storage (:data-ref-id file)))
|
||||||
|
|
||||||
;; clean object thumbnails
|
(db/update! conn :file
|
||||||
(let [sql (str "update file_tagged_object_thumbnail "
|
{:data (:data snapshot)
|
||||||
" set deleted_at = now() "
|
:revn (inc (:revn file))
|
||||||
" where file_id=? returning media_id")
|
:data-backend nil
|
||||||
res (db/exec! conn [sql file-id])]
|
:data-ref-id nil
|
||||||
|
:has-media-trimmed false
|
||||||
|
:features (:features snapshot)}
|
||||||
|
{:id file-id})
|
||||||
|
|
||||||
(doseq [media-id (into #{} (keep :media-id) res)]
|
;; clean object thumbnails
|
||||||
(sto/touch-object! storage media-id)))
|
(let [sql (str "update file_tagged_object_thumbnail "
|
||||||
|
" set deleted_at = now() "
|
||||||
|
" where file_id=? returning media_id")
|
||||||
|
res (db/exec! conn [sql file-id])]
|
||||||
|
(doseq [media-id (into #{} (keep :media-id) res)]
|
||||||
|
(sto/touch-object! storage media-id)))
|
||||||
|
|
||||||
;; clean object thumbnails
|
;; clean file thumbnails
|
||||||
(let [sql (str "update file_thumbnail "
|
(let [sql (str "update file_thumbnail "
|
||||||
" set deleted_at = now() "
|
" set deleted_at = now() "
|
||||||
" where file_id=? returning media_id")
|
" where file_id=? returning media_id")
|
||||||
res (db/exec! conn [sql file-id])]
|
res (db/exec! conn [sql file-id])]
|
||||||
(doseq [media-id (into #{} (keep :media-id) res)]
|
(doseq [media-id (into #{} (keep :media-id) res)]
|
||||||
(sto/touch-object! storage media-id)))
|
(sto/touch-object! storage media-id)))
|
||||||
|
|
||||||
{:id (:id snapshot)
|
{:id (:id snapshot)
|
||||||
:label (:label snapshot)}))
|
:label (:label snapshot)})))
|
||||||
|
|
||||||
(defn- resolve-snapshot-by-label
|
(defn- resolve-snapshot-by-label
|
||||||
[conn file-id label]
|
[conn file-id label]
|
||||||
|
@ -145,17 +159,27 @@
|
||||||
(merge (resolve-snapshot-by-label conn file-id label)))]
|
(merge (resolve-snapshot-by-label conn file-id label)))]
|
||||||
(restore-file-snapshot! cfg params)))))
|
(restore-file-snapshot! cfg params)))))
|
||||||
|
|
||||||
|
(defn- get-file
|
||||||
|
[cfg file-id]
|
||||||
|
(let [file (->> (db/get cfg :file {:id file-id})
|
||||||
|
(feat.fdata/resolve-file-data cfg))]
|
||||||
|
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg file-id)]
|
||||||
|
(-> file
|
||||||
|
(update :data blob/decode)
|
||||||
|
(update :data feat.fdata/process-pointers deref)
|
||||||
|
(update :data feat.fdata/process-objects (partial into {}))
|
||||||
|
(update :data blob/encode)))))
|
||||||
|
|
||||||
(defn take-file-snapshot!
|
(defn take-file-snapshot!
|
||||||
[cfg {:keys [file-id label]}]
|
[cfg {:keys [file-id label]}]
|
||||||
(let [conn (db/get-connection cfg)
|
(let [file (get-file cfg file-id)
|
||||||
file (db/get conn :file {:id file-id})
|
|
||||||
id (uuid/next)]
|
id (uuid/next)]
|
||||||
|
|
||||||
(l/debug :hint "creating file snapshot"
|
(l/debug :hint "creating file snapshot"
|
||||||
:file-id (str file-id)
|
:file-id (str file-id)
|
||||||
:label label)
|
:label label)
|
||||||
|
|
||||||
(db/insert! conn :file-change
|
(db/insert! cfg :file-change
|
||||||
{:id id
|
{:id id
|
||||||
:revn (:revn file)
|
:revn (:revn file)
|
||||||
:data (:data file)
|
:data (:data file)
|
||||||
|
|
|
@ -29,6 +29,7 @@
|
||||||
[app.rpc.commands.teams :as teams]
|
[app.rpc.commands.teams :as teams]
|
||||||
[app.rpc.doc :as-alias doc]
|
[app.rpc.doc :as-alias doc]
|
||||||
[app.rpc.helpers :as rph]
|
[app.rpc.helpers :as rph]
|
||||||
|
[app.storage :as sto]
|
||||||
[app.util.blob :as blob]
|
[app.util.blob :as blob]
|
||||||
[app.util.pointer-map :as pmap]
|
[app.util.pointer-map :as pmap]
|
||||||
[app.util.services :as sv]
|
[app.util.services :as sv]
|
||||||
|
@ -228,7 +229,7 @@
|
||||||
[{:keys [::db/conn ::wrk/executor] :as cfg}
|
[{:keys [::db/conn ::wrk/executor] :as cfg}
|
||||||
{:keys [profile-id file changes session-id ::created-at skip-validate] :as params}]
|
{:keys [profile-id file changes session-id ::created-at skip-validate] :as params}]
|
||||||
(let [;; Retrieve the file data
|
(let [;; Retrieve the file data
|
||||||
file (feat.fdata/resolve-file-data cfg file {:touch true})
|
file (feat.fdata/resolve-file-data cfg file)
|
||||||
|
|
||||||
;; Process the file data on separated thread for avoid to do
|
;; Process the file data on separated thread for avoid to do
|
||||||
;; the CPU intensive operation on vthread.
|
;; the CPU intensive operation on vthread.
|
||||||
|
@ -236,6 +237,13 @@
|
||||||
file (px/invoke! executor (partial update-file-data cfg file changes skip-validate))
|
file (px/invoke! executor (partial update-file-data cfg file changes skip-validate))
|
||||||
features (db/create-array conn "text" (:features file))]
|
features (db/create-array conn "text" (:features file))]
|
||||||
|
|
||||||
|
;; NOTE: if file was offloaded, we need to touch the referenced
|
||||||
|
;; storage object because on this update operation the data will
|
||||||
|
;; be overwritted.
|
||||||
|
(when (= "objects-storage" (:data-backend file))
|
||||||
|
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
|
||||||
|
(sto/touch-object! storage (:data-ref-id file))))
|
||||||
|
|
||||||
(db/insert! conn :file-change
|
(db/insert! conn :file-change
|
||||||
{:id (uuid/next)
|
{:id (uuid/next)
|
||||||
:session-id session-id
|
:session-id session-id
|
||||||
|
@ -406,7 +414,7 @@
|
||||||
"UPDATE file_change
|
"UPDATE file_change
|
||||||
SET label = NULL
|
SET label = NULL
|
||||||
WHERE file_id = ?
|
WHERE file_id = ?
|
||||||
AND label IS NOT NULL
|
AND label LIKE 'internal/%'
|
||||||
AND created_at < ?")
|
AND created_at < ?")
|
||||||
|
|
||||||
(defn- delete-old-snapshots!
|
(defn- delete-old-snapshots!
|
||||||
|
|
|
@ -94,6 +94,15 @@
|
||||||
(-> (db/exec-one! conn [sql:has-file-data-fragment-refs id])
|
(-> (db/exec-one! conn [sql:has-file-data-fragment-refs id])
|
||||||
(get :has-refs)))
|
(get :has-refs)))
|
||||||
|
|
||||||
|
(def ^:private
|
||||||
|
sql:has-file-change-refs
|
||||||
|
"SELECT EXISTS (SELECT 1 FROM file_change WHERE data_ref_id = ?) AS has_refs")
|
||||||
|
|
||||||
|
(defn- has-file-change-refs?
|
||||||
|
[conn id]
|
||||||
|
(-> (db/exec-one! conn [sql:has-file-change-refs id])
|
||||||
|
(get :has-refs)))
|
||||||
|
|
||||||
(def ^:private sql:mark-freeze-in-bulk
|
(def ^:private sql:mark-freeze-in-bulk
|
||||||
"UPDATE storage_object
|
"UPDATE storage_object
|
||||||
SET touched_at = NULL
|
SET touched_at = NULL
|
||||||
|
@ -168,6 +177,7 @@
|
||||||
"profile" (process-objects! conn has-profile-refs? ids bucket)
|
"profile" (process-objects! conn has-profile-refs? ids bucket)
|
||||||
"file-data" (process-objects! conn has-file-data-refs? ids bucket)
|
"file-data" (process-objects! conn has-file-data-refs? ids bucket)
|
||||||
"file-data-fragment" (process-objects! conn has-file-data-fragment-refs? ids bucket)
|
"file-data-fragment" (process-objects! conn has-file-data-fragment-refs? ids bucket)
|
||||||
|
"file-change" (process-objects! conn has-file-change-refs? ids bucket)
|
||||||
(ex/raise :type :internal
|
(ex/raise :type :internal
|
||||||
:code :unexpected-unknown-reference
|
:code :unexpected-unknown-reference
|
||||||
:hint (dm/fmt "unknown reference '%'" bucket))))
|
:hint (dm/fmt "unknown reference '%'" bucket))))
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
inactivity (the default threshold is 72h)."
|
inactivity (the default threshold is 72h)."
|
||||||
(:require
|
(:require
|
||||||
[app.binfile.common :as bfc]
|
[app.binfile.common :as bfc]
|
||||||
|
[app.common.exceptions :as ex]
|
||||||
[app.common.files.migrations :as fmg]
|
[app.common.files.migrations :as fmg]
|
||||||
[app.common.files.validate :as cfv]
|
[app.common.files.validate :as cfv]
|
||||||
[app.common.logging :as l]
|
[app.common.logging :as l]
|
||||||
|
@ -235,6 +236,16 @@
|
||||||
|
|
||||||
(defn- decode-file
|
(defn- decode-file
|
||||||
[cfg {:keys [id] :as file}]
|
[cfg {:keys [id] :as file}]
|
||||||
|
;; NOTE: a preventive check that does not allow proceed the gc for
|
||||||
|
;; already offloaded file; if this exception happens, means that
|
||||||
|
;; something external modified the file flag without preloading the
|
||||||
|
;; file back again to the table
|
||||||
|
(when (feat.fdata/offloaded? file)
|
||||||
|
(ex/raise :hint "unable to run file-gc on an already offloaded file"
|
||||||
|
:type :internal
|
||||||
|
:code :file-already-offloaded
|
||||||
|
:file-id id))
|
||||||
|
|
||||||
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
|
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
|
||||||
(-> file
|
(-> file
|
||||||
(update :features db/decode-pgarray #{})
|
(update :features db/decode-pgarray #{})
|
||||||
|
|
|
@ -10,35 +10,58 @@
|
||||||
(:require
|
(:require
|
||||||
[app.common.logging :as l]
|
[app.common.logging :as l]
|
||||||
[app.db :as db]
|
[app.db :as db]
|
||||||
|
[app.storage :as sto]
|
||||||
[app.util.time :as dt]
|
[app.util.time :as dt]
|
||||||
[clojure.spec.alpha :as s]
|
[clojure.spec.alpha :as s]
|
||||||
[integrant.core :as ig]))
|
[integrant.core :as ig]))
|
||||||
|
|
||||||
(def ^:private
|
(def ^:private
|
||||||
sql:delete-files-xlog
|
sql:delete-files-xlog
|
||||||
"delete from file_change
|
"DELETE FROM file_change
|
||||||
where created_at < now() - ?::interval
|
WHERE id IN (SELECT id FROM file_change
|
||||||
and label is NULL")
|
WHERE label IS NULL
|
||||||
|
AND created_at < ?
|
||||||
|
ORDER BY created_at LIMIT ?)
|
||||||
|
RETURNING id, data_backend, data_ref_id")
|
||||||
|
|
||||||
|
(def xf:filter-offloded
|
||||||
|
(comp
|
||||||
|
(filter #(= "objects-storage" (:data-backend %)))
|
||||||
|
(map :data-ref-id)))
|
||||||
|
|
||||||
|
(defn- delete-in-chunks
|
||||||
|
[{:keys [::chunk-size ::threshold] :as cfg}]
|
||||||
|
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
|
||||||
|
(loop [total 0]
|
||||||
|
(let [chunk (db/exec! cfg [sql:delete-files-xlog threshold chunk-size])
|
||||||
|
length (count chunk)]
|
||||||
|
|
||||||
|
;; touch all references on offloaded changes entries
|
||||||
|
(doseq [data-ref-id (sequence xf:filter-offloded chunk)]
|
||||||
|
(l/trc :hint "touching referenced storage object"
|
||||||
|
:storage-object-id (str data-ref-id))
|
||||||
|
(sto/touch-object! storage data-ref-id))
|
||||||
|
|
||||||
|
(if (pos? length)
|
||||||
|
(recur (+ total length))
|
||||||
|
total)))))
|
||||||
|
|
||||||
(defmethod ig/pre-init-spec ::handler [_]
|
(defmethod ig/pre-init-spec ::handler [_]
|
||||||
(s/keys :req [::db/pool]))
|
(s/keys :req [::db/pool]))
|
||||||
|
|
||||||
(defmethod ig/prep-key ::handler
|
|
||||||
[_ cfg]
|
|
||||||
(assoc cfg ::min-age (dt/duration {:hours 72})))
|
|
||||||
|
|
||||||
(defmethod ig/init-key ::handler
|
(defmethod ig/init-key ::handler
|
||||||
[_ {:keys [::db/pool] :as cfg}]
|
[_ cfg]
|
||||||
(fn [{:keys [props] :as task}]
|
(fn [{:keys [props] :as task}]
|
||||||
(let [min-age (or (:min-age props) (::min-age cfg))]
|
(let [min-age (or (:min-age props)
|
||||||
(db/with-atomic [conn pool]
|
(dt/duration "72h"))
|
||||||
(let [interval (db/interval min-age)
|
chunk-size (:chunk-size props 5000)
|
||||||
result (db/exec-one! conn [sql:delete-files-xlog interval])
|
threshold (dt/minus (dt/now) min-age)]
|
||||||
result (db/get-update-count result)]
|
|
||||||
|
|
||||||
(l/info :hint "task finished" :min-age (dt/format-duration min-age) :total result)
|
(-> cfg
|
||||||
|
(assoc ::db/rollback (:rollback props false))
|
||||||
(when (:rollback? props)
|
(assoc ::threshold threshold)
|
||||||
(db/rollback! conn))
|
(assoc ::chunk-size chunk-size)
|
||||||
|
(db/tx-run! (fn [cfg]
|
||||||
result)))))
|
(let [total (delete-in-chunks cfg)]
|
||||||
|
(l/trc :hint "file xlog cleaned" :total total)
|
||||||
|
total)))))))
|
||||||
|
|
|
@ -125,7 +125,7 @@
|
||||||
0)))
|
0)))
|
||||||
|
|
||||||
(def ^:private sql:get-files
|
(def ^:private sql:get-files
|
||||||
"SELECT id, deleted_at, project_id, data_ref_id
|
"SELECT id, deleted_at, project_id, data_backend, data_ref_id
|
||||||
FROM file
|
FROM file
|
||||||
WHERE deleted_at IS NOT NULL
|
WHERE deleted_at IS NOT NULL
|
||||||
AND deleted_at < now() - ?::interval
|
AND deleted_at < now() - ?::interval
|
||||||
|
@ -137,14 +137,15 @@
|
||||||
(defn- delete-files!
|
(defn- delete-files!
|
||||||
[{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
|
[{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
|
||||||
(->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1})
|
(->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1})
|
||||||
(reduce (fn [total {:keys [id deleted-at project-id data-ref-id]}]
|
(reduce (fn [total {:keys [id deleted-at project-id] :as file}]
|
||||||
(l/trc :hint "permanently delete"
|
(l/trc :hint "permanently delete"
|
||||||
:rel "file"
|
:rel "file"
|
||||||
:id (str id)
|
:id (str id)
|
||||||
:project-id (str project-id)
|
:project-id (str project-id)
|
||||||
:deleted-at (dt/format-instant deleted-at))
|
:deleted-at (dt/format-instant deleted-at))
|
||||||
|
|
||||||
(some->> data-ref-id (sto/touch-object! storage))
|
(when (= "objects-storage" (:data-backend file))
|
||||||
|
(sto/touch-object! storage (:data-ref-id file)))
|
||||||
|
|
||||||
;; And finally, permanently delete the file.
|
;; And finally, permanently delete the file.
|
||||||
(db/delete! conn :file {:id id})
|
(db/delete! conn :file {:id id})
|
||||||
|
|
|
@ -73,6 +73,38 @@
|
||||||
{:id (:id fragment)}
|
{:id (:id fragment)}
|
||||||
{::db/return-keys false}))))
|
{::db/return-keys false}))))
|
||||||
|
|
||||||
|
(def sql:get-snapshots
|
||||||
|
"SELECT fc.*
|
||||||
|
FROM file_change AS fc
|
||||||
|
WHERE fc.file_id = ?
|
||||||
|
AND fc.label IS NOT NULL
|
||||||
|
AND fc.data IS NOT NULL
|
||||||
|
AND fc.data_backend IS NULL")
|
||||||
|
|
||||||
|
(defn- offload-file-snapshots!
|
||||||
|
[{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
|
||||||
|
(doseq [snapshot (db/exec! conn [sql:get-snapshots file-id])]
|
||||||
|
(let [data (sto/content (:data snapshot))
|
||||||
|
sobj (sto/put-object! storage
|
||||||
|
{::sto/content data
|
||||||
|
::sto/touch true
|
||||||
|
:bucket "file-change"
|
||||||
|
:content-type "application/octet-stream"
|
||||||
|
:file-id file-id
|
||||||
|
:file-change-id (:id snapshot)})]
|
||||||
|
|
||||||
|
(l/trc :hint "offload file change"
|
||||||
|
:file-id (str file-id)
|
||||||
|
:file-change-id (str (:id snapshot))
|
||||||
|
:storage-id (str (:id sobj)))
|
||||||
|
|
||||||
|
(db/update! conn :file-change
|
||||||
|
{:data-backend "objects-storage"
|
||||||
|
:data-ref-id (:id sobj)
|
||||||
|
:data nil}
|
||||||
|
{:id (:id snapshot)}
|
||||||
|
{::db/return-keys false}))))
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; HANDLER
|
;; HANDLER
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
@ -88,4 +120,5 @@
|
||||||
(assoc ::file-id (:file-id props))
|
(assoc ::file-id (:file-id props))
|
||||||
(db/tx-run! (fn [cfg]
|
(db/tx-run! (fn [cfg]
|
||||||
(offload-file-data! cfg)
|
(offload-file-data! cfg)
|
||||||
(offload-file-data-fragments! cfg))))))
|
(offload-file-data-fragments! cfg)
|
||||||
|
(offload-file-snapshots! cfg))))))
|
||||||
|
|
|
@ -499,17 +499,16 @@
|
||||||
(js/console.log "EE:" cause))))))
|
(js/console.log "EE:" cause))))))
|
||||||
|
|
||||||
(defn ^:export restore-snapshot
|
(defn ^:export restore-snapshot
|
||||||
[id file-id]
|
[label file-id]
|
||||||
(when-let [file-id (or (d/parse-uuid file-id)
|
(when-let [file-id (or (d/parse-uuid file-id)
|
||||||
(:current-file-id @st/state))]
|
(:current-file-id @st/state))]
|
||||||
(when-let [id (d/parse-uuid id)]
|
(->> (http/send! {:method :post
|
||||||
(->> (http/send! {:method :post
|
:uri (u/join cf/public-uri "api/rpc/command/restore-file-snapshot")
|
||||||
:uri (u/join cf/public-uri "api/rpc/command/restore-file-snapshot")
|
:body (http/transit-data {:file-id file-id :label label})})
|
||||||
:body (http/transit-data {:file-id file-id :id id})})
|
(rx/map http/conditional-decode-transit)
|
||||||
(rx/map http/conditional-decode-transit)
|
(rx/mapcat rp/handle-response)
|
||||||
(rx/mapcat rp/handle-response)
|
(rx/subs! (fn [_]
|
||||||
(rx/subs! (fn [_]
|
(println "Snapshot restored " label)
|
||||||
(println "Snapshot restored " id)
|
#_(.reload js/location))
|
||||||
#_(.reload js/location))
|
(fn [cause]
|
||||||
(fn [cause]
|
(js/console.log "EE:" cause))))))
|
||||||
(js/console.log "EE:" cause)))))))
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue