diff --git a/backend/src/app/features/fdata.clj b/backend/src/app/features/fdata.clj index e44488539..1d9a649f3 100644 --- a/backend/src/app/features/fdata.clj +++ b/backend/src/app/features/fdata.clj @@ -17,6 +17,14 @@ [app.util.objects-map :as omap] [app.util.pointer-map :as pmap])) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; OFFLOAD +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn offloaded? + [file] + (= "objects-storage" (:data-backend file))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; OBJECTS-MAP ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -58,18 +66,16 @@ (defn get-file-data "Get file data given a file instance." - [system {:keys [data-backend data-ref-id] :as file} & {:keys [touch]}] - (if (= data-backend "objects-storage") - (let [storage (sto/resolve system ::db/reuse-conn true) - object (sto/get-object storage data-ref-id)] - - (when touch (sto/touch-object! storage data-ref-id)) - (sto/get-object-bytes storage object)) + [system file] + (if (offloaded? 
file) + (let [storage (sto/resolve system ::db/reuse-conn true)] + (->> (sto/get-object storage (:data-ref-id file)) + (sto/get-object-bytes storage))) (:data file))) (defn resolve-file-data - [system file & {:as opts}] - (let [data (get-file-data system file opts)] + [system file] + (let [data (get-file-data system file)] (assoc file :data data))) (defn load-pointer diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 692d9c190..314732d9f 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -404,7 +404,8 @@ ::sto/storage (ig/ref ::sto/storage)} :app.tasks.file-xlog-gc/handler - {::db/pool (ig/ref ::db/pool)} + {::db/pool (ig/ref ::db/pool) + ::sto/storage (ig/ref ::sto/storage)} :app.tasks.telemetry/handler {::db/pool (ig/ref ::db/pool) diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 112ea6be0..bc61a89d1 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -406,7 +406,10 @@ :fn (mg/resource "app/migrations/sql/0127-mod-storage-object-table.sql")} {:name "0128-mod-task-table" - :fn (mg/resource "app/migrations/sql/0128-mod-task-table.sql")}]) + :fn (mg/resource "app/migrations/sql/0128-mod-task-table.sql")} + + {:name "0129-mod-file-change-table" + :fn (mg/resource "app/migrations/sql/0129-mod-file-change-table.sql")}]) (defn apply-migrations! 
[pool name migrations] diff --git a/backend/src/app/migrations/sql/0129-mod-file-change-table.sql b/backend/src/app/migrations/sql/0129-mod-file-change-table.sql new file mode 100644 index 000000000..fcf1d4f4c --- /dev/null +++ b/backend/src/app/migrations/sql/0129-mod-file-change-table.sql @@ -0,0 +1,6 @@ +ALTER TABLE file_change + ADD COLUMN data_backend text NULL, + ADD COLUMN data_ref_id uuid NULL; + +CREATE INDEX IF NOT EXISTS file_change__data_ref_id__idx + ON file_change (data_ref_id); diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index f49c77b41..d3317ac2b 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -273,7 +273,7 @@ (defn get-minimal-file [cfg id & {:as opts}] - (let [opts (assoc opts ::sql/columns [:id :modified-at :revn])] + (let [opts (assoc opts ::sql/columns [:id :modified-at :revn :data-ref-id :data-backend])] (db/get cfg :file {:id id} opts))) (defn get-file-etag diff --git a/backend/src/app/rpc/commands/files_snapshot.clj b/backend/src/app/rpc/commands/files_snapshot.clj index 99b241317..c4a2fa3ce 100644 --- a/backend/src/app/rpc/commands/files_snapshot.clj +++ b/backend/src/app/rpc/commands/files_snapshot.clj @@ -13,12 +13,15 @@ [app.config :as cf] [app.db :as db] [app.db.sql :as-alias sql] + [app.features.fdata :as feat.fdata] [app.main :as-alias main] [app.rpc :as-alias rpc] [app.rpc.commands.files :as files] [app.rpc.commands.profile :as profile] [app.rpc.doc :as-alias doc] [app.storage :as sto] + [app.util.blob :as blob] + [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] [cuerdas.core :as str])) @@ -33,20 +36,22 @@ :code :authentication-required :hint "only admins allowed"))) +(def sql:get-file-snapshots + "SELECT id, label, revn, created_at + FROM file_change + WHERE file_id = ? + AND created_at < ? 
+ AND label is not null + ORDER BY created_at desc + LIMIT ?") + (defn get-file-snapshots [{:keys [::db/conn]} {:keys [file-id limit start-at] :or {limit Long/MAX_VALUE}}] - (let [query (str "select id, label, revn, created_at " - " from file_change " - " where file_id = ? " - " and created_at < ? " - " and data is not null " - " order by created_at desc " - " limit ?") - start-at (or start-at (dt/now)) + (let [start-at (or start-at (dt/now)) limit (min limit 20)] - (->> (db/exec! conn [query file-id start-at limit]) + (->> (db/exec! conn [sql:get-file-snapshots file-id start-at limit]) (mapv (fn [row] (update row :created-at dt/format-instant :rfc1123)))))) @@ -77,43 +82,52 @@ :id id :file-id file-id)) - (when-not (:data snapshot) - (ex/raise :type :precondition - :code :snapshot-without-data - :hint "snapshot has no data" - :label (:label snapshot) - :file-id file-id)) + (let [snapshot (feat.fdata/resolve-file-data cfg snapshot)] + (when-not (:data snapshot) + (ex/raise :type :precondition + :code :snapshot-without-data + :hint "snapshot has no data" + :label (:label snapshot) + :file-id file-id)) - (l/dbg :hint "restoring snapshot" - :file-id (str file-id) - :label (:label snapshot) - :snapshot-id (str (:id snapshot))) + (l/dbg :hint "restoring snapshot" + :file-id (str file-id) + :label (:label snapshot) + :snapshot-id (str (:id snapshot))) - (db/update! conn :file - {:data (:data snapshot) - :revn (inc (:revn file)) - :features (:features snapshot)} - {:id file-id}) + ;; If the file was already offloaded, on restoring the snapshot + ;; we are going to replace the file data, so we need to touch + ;; the old referenced storage object and avoid possible leaks + (when (feat.fdata/offloaded? file) + (sto/touch-object! storage (:data-ref-id file))) - ;; clean object thumbnails - (let [sql (str "update file_tagged_object_thumbnail " - " set deleted_at = now() " - " where file_id=? returning media_id") - res (db/exec! conn [sql file-id])] + (db/update! 
conn :file + {:data (:data snapshot) + :revn (inc (:revn file)) + :data-backend nil + :data-ref-id nil + :has-media-trimmed false + :features (:features snapshot)} + {:id file-id}) - (doseq [media-id (into #{} (keep :media-id) res)] - (sto/touch-object! storage media-id))) + ;; clean object thumbnails + (let [sql (str "update file_tagged_object_thumbnail " + " set deleted_at = now() " + " where file_id=? returning media_id") + res (db/exec! conn [sql file-id])] + (doseq [media-id (into #{} (keep :media-id) res)] + (sto/touch-object! storage media-id))) - ;; clean object thumbnails - (let [sql (str "update file_thumbnail " - " set deleted_at = now() " - " where file_id=? returning media_id") - res (db/exec! conn [sql file-id])] - (doseq [media-id (into #{} (keep :media-id) res)] - (sto/touch-object! storage media-id))) + ;; clean file thumbnails + (let [sql (str "update file_thumbnail " + " set deleted_at = now() " + " where file_id=? returning media_id") + res (db/exec! conn [sql file-id])] + (doseq [media-id (into #{} (keep :media-id) res)] + (sto/touch-object! storage media-id))) - {:id (:id snapshot) - :label (:label snapshot)})) + {:id (:id snapshot) + :label (:label snapshot)}))) (defn- resolve-snapshot-by-label [conn file-id label] @@ -145,17 +159,27 @@ (merge (resolve-snapshot-by-label conn file-id label)))] (restore-file-snapshot! cfg params))))) +(defn- get-file + [cfg file-id] + (let [file (->> (db/get cfg :file {:id file-id}) + (feat.fdata/resolve-file-data cfg))] + (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg file-id)] + (-> file + (update :data blob/decode) + (update :data feat.fdata/process-pointers deref) + (update :data feat.fdata/process-objects (partial into {})) + (update :data blob/encode))))) + (defn take-file-snapshot! 
[cfg {:keys [file-id label]}] - (let [conn (db/get-connection cfg) - file (db/get conn :file {:id file-id}) + (let [file (get-file cfg file-id) id (uuid/next)] (l/debug :hint "creating file snapshot" :file-id (str file-id) :label label) - (db/insert! conn :file-change + (db/insert! cfg :file-change {:id id :revn (:revn file) :data (:data file) diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index e0d07eeb8..e4114a2be 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -29,6 +29,7 @@ [app.rpc.commands.teams :as teams] [app.rpc.doc :as-alias doc] [app.rpc.helpers :as rph] + [app.storage :as sto] [app.util.blob :as blob] [app.util.pointer-map :as pmap] [app.util.services :as sv] @@ -228,7 +229,7 @@ [{:keys [::db/conn ::wrk/executor] :as cfg} {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] (let [;; Retrieve the file data - file (feat.fdata/resolve-file-data cfg file {:touch true}) + file (feat.fdata/resolve-file-data cfg file) ;; Process the file data on separated thread for avoid to do ;; the CPU intensive operation on vthread. @@ -236,6 +237,13 @@ file (px/invoke! executor (partial update-file-data cfg file changes skip-validate)) features (db/create-array conn "text" (:features file))] + ;; NOTE: if file was offloaded, we need to touch the referenced + ;; storage object because on this update operation the data will + ;; be overwritten. + (when (= "objects-storage" (:data-backend file)) + (let [storage (sto/resolve cfg ::db/reuse-conn true)] + (sto/touch-object! storage (:data-ref-id file)))) + (db/insert! conn :file-change {:id (uuid/next) :session-id session-id @@ -406,7 +414,7 @@ "UPDATE file_change SET label = NULL WHERE file_id = ? - AND label IS NOT NULL + AND label LIKE 'internal/%' AND created_at < ?") (defn- delete-old-snapshots! 
diff --git a/backend/src/app/storage/gc_touched.clj b/backend/src/app/storage/gc_touched.clj index 155496b41..03fe0f426 100644 --- a/backend/src/app/storage/gc_touched.clj +++ b/backend/src/app/storage/gc_touched.clj @@ -94,6 +94,15 @@ (-> (db/exec-one! conn [sql:has-file-data-fragment-refs id]) (get :has-refs))) +(def ^:private + sql:has-file-change-refs + "SELECT EXISTS (SELECT 1 FROM file_change WHERE data_ref_id = ?) AS has_refs") + +(defn- has-file-change-refs? + [conn id] + (-> (db/exec-one! conn [sql:has-file-change-refs id]) + (get :has-refs))) + (def ^:private sql:mark-freeze-in-bulk "UPDATE storage_object SET touched_at = NULL @@ -168,6 +177,7 @@ "profile" (process-objects! conn has-profile-refs? ids bucket) "file-data" (process-objects! conn has-file-data-refs? ids bucket) "file-data-fragment" (process-objects! conn has-file-data-fragment-refs? ids bucket) + "file-change" (process-objects! conn has-file-change-refs? ids bucket) (ex/raise :type :internal :code :unexpected-unknown-reference :hint (dm/fmt "unknown reference '%'" bucket)))) diff --git a/backend/src/app/tasks/file_gc.clj b/backend/src/app/tasks/file_gc.clj index 730dbe8ae..e88cfcef0 100644 --- a/backend/src/app/tasks/file_gc.clj +++ b/backend/src/app/tasks/file_gc.clj @@ -11,6 +11,7 @@ inactivity (the default threshold is 72h)." (:require [app.binfile.common :as bfc] + [app.common.exceptions :as ex] [app.common.files.migrations :as fmg] [app.common.files.validate :as cfv] [app.common.logging :as l] @@ -235,6 +236,16 @@ (defn- decode-file [cfg {:keys [id] :as file}] + ;; NOTE: a preventive check that does not allow the gc to proceed + ;; for an already offloaded file; if this exception happens, it + ;; means that something external modified the file flag without + ;; preloading the file back again into the table + (when (feat.fdata/offloaded? 
file) + (ex/raise :hint "unable to run file-gc on an already offloaded file" + :type :internal + :code :file-already-offloaded + :file-id id)) + (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)] (-> file (update :features db/decode-pgarray #{}) diff --git a/backend/src/app/tasks/file_xlog_gc.clj b/backend/src/app/tasks/file_xlog_gc.clj index ebc5993d3..07253f0d8 100644 --- a/backend/src/app/tasks/file_xlog_gc.clj +++ b/backend/src/app/tasks/file_xlog_gc.clj @@ -10,6 +10,7 @@ (:require [app.common.logging :as l] [app.db :as db] + [app.storage :as sto] [app.util.time :as dt] [clojure.spec.alpha :as s] [integrant.core :as ig])) @@ -20,22 +21,36 @@ WHERE id IN (SELECT id FROM file_change WHERE label IS NULL AND created_at < ? - ORDER BY created_at LIMIT ?)") + ORDER BY created_at LIMIT ?) + RETURNING id, data_backend, data_ref_id") + +(def xf:filter-offloded + (comp + (filter #(= "objects-storage" (:data-backend %))) + (map :data-ref-id))) (defn- delete-in-chunks [{:keys [::chunk-size ::threshold] :as cfg}] - (loop [total 0] - (let [result (-> (db/exec-one! cfg [sql:delete-files-xlog threshold chunk-size]) - (db/get-update-count))] - (if (pos? result) - (recur (+ total result)) - total)))) + (let [storage (sto/resolve cfg ::db/reuse-conn true)] + (loop [total 0] + (let [chunk (db/exec! cfg [sql:delete-files-xlog threshold chunk-size]) + length (count chunk)] + + ;; touch all references on offloaded changes entries + (doseq [data-ref-id (sequence xf:filter-offloded chunk)] + (l/trc :hint "touching referenced storage object" + :storage-object-id (str data-ref-id)) + (sto/touch-object! storage data-ref-id)) + + (if (pos? 
length) + (recur (+ total length)) + total))))) (defmethod ig/pre-init-spec ::handler [_] (s/keys :req [::db/pool])) (defmethod ig/init-key ::handler - [_ {:keys [::db/pool] :as cfg}] + [_ cfg] (fn [{:keys [props] :as task}] (let [min-age (or (:min-age props) (dt/duration "72h")) @@ -43,7 +58,7 @@ threshold (dt/minus (dt/now) min-age)] (-> cfg - (assoc ::db/rollback (:rollback? props false)) + (assoc ::db/rollback (:rollback props false)) (assoc ::threshold threshold) (assoc ::chunk-size chunk-size) (db/tx-run! (fn [cfg] diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index 95d3128bd..67ed8f9aa 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -125,7 +125,7 @@ 0))) (def ^:private sql:get-files - "SELECT id, deleted_at, project_id, data_ref_id + "SELECT id, deleted_at, project_id, data_backend, data_ref_id FROM file WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval @@ -137,14 +137,15 @@ (defn- delete-files! [{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}] (->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1}) - (reduce (fn [total {:keys [id deleted-at project-id data-ref-id]}] + (reduce (fn [total {:keys [id deleted-at project-id] :as file}] (l/trc :hint "permanently delete" :rel "file" :id (str id) :project-id (str project-id) :deleted-at (dt/format-instant deleted-at)) - (some->> data-ref-id (sto/touch-object! storage)) + (when (= "objects-storage" (:data-backend file)) + (sto/touch-object! storage (:data-ref-id file))) ;; And finally, permanently delete the file. (db/delete! 
conn :file {:id id}) diff --git a/backend/src/app/tasks/offload_file_data.clj b/backend/src/app/tasks/offload_file_data.clj index 99788cb9f..cfe50970f 100644 --- a/backend/src/app/tasks/offload_file_data.clj +++ b/backend/src/app/tasks/offload_file_data.clj @@ -73,6 +73,38 @@ {:id (:id fragment)} {::db/return-keys false})))) +(def sql:get-snapshots + "SELECT fc.* + FROM file_change AS fc + WHERE fc.file_id = ? + AND fc.label IS NOT NULL + AND fc.data IS NOT NULL + AND fc.data_backend IS NULL") + +(defn- offload-file-snapshots! + [{:keys [::db/conn ::sto/storage ::file-id] :as cfg}] + (doseq [snapshot (db/exec! conn [sql:get-snapshots file-id])] + (let [data (sto/content (:data snapshot)) + sobj (sto/put-object! storage + {::sto/content data + ::sto/touch true + :bucket "file-change" + :content-type "application/octet-stream" + :file-id file-id + :file-change-id (:id snapshot)})] + + (l/trc :hint "offload file change" + :file-id (str file-id) + :file-change-id (str (:id snapshot)) + :storage-id (str (:id sobj))) + + (db/update! conn :file-change + {:data-backend "objects-storage" + :data-ref-id (:id sobj) + :data nil} + {:id (:id snapshot)} + {::db/return-keys false})))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HANDLER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -88,4 +120,5 @@ (assoc ::file-id (:file-id props)) (db/tx-run! (fn [cfg] (offload-file-data! cfg) - (offload-file-data-fragments! cfg)))))) + (offload-file-data-fragments! cfg) + (offload-file-snapshots! cfg)))))) diff --git a/frontend/src/debug.cljs b/frontend/src/debug.cljs index 9d79a2c24..01b11784c 100644 --- a/frontend/src/debug.cljs +++ b/frontend/src/debug.cljs @@ -499,17 +499,16 @@ (js/console.log "EE:" cause)))))) (defn ^:export restore-snapshot - [id file-id] + [label file-id] (when-let [file-id (or (d/parse-uuid file-id) (:current-file-id @st/state))] - (when-let [id (d/parse-uuid id)] - (->> (http/send! 
{:method :post - :uri (u/join cf/public-uri "api/rpc/command/restore-file-snapshot") - :body (http/transit-data {:file-id file-id :id id})}) - (rx/map http/conditional-decode-transit) - (rx/mapcat rp/handle-response) - (rx/subs! (fn [_] - (println "Snapshot restored " id) - #_(.reload js/location)) - (fn [cause] - (js/console.log "EE:" cause))))))) + (->> (http/send! {:method :post + :uri (u/join cf/public-uri "api/rpc/command/restore-file-snapshot") + :body (http/transit-data {:file-id file-id :label label})}) + (rx/map http/conditional-decode-transit) + (rx/mapcat rp/handle-response) + (rx/subs! (fn [_] + (println "Snapshot restored " label) + #_(.reload js/location)) + (fn [cause] + (js/console.log "EE:" cause))))))