From 253b9e5bd839c9a0e382c130175269c80d1c1043 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 1 Aug 2024 16:11:25 +0200 Subject: [PATCH 1/6] :sparkles: Split file-gc task in two separated tasks Add a new file-gc-scheduler task for analizing all files for elegibility and leave file-gc task with the responsability to performn the GC operation. --- backend/src/app/main.clj | 6 +- backend/src/app/tasks/file_gc.clj | 175 +++++++++----------- backend/src/app/tasks/file_gc_scheduler.clj | 65 ++++++++ 3 files changed, 148 insertions(+), 98 deletions(-) create mode 100644 backend/src/app/tasks/file_gc_scheduler.clj diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index ee58a21b5..07946e59a 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -344,6 +344,7 @@ {:sendmail (ig/ref ::email/handler) :objects-gc (ig/ref :app.tasks.objects-gc/handler) :file-gc (ig/ref :app.tasks.file-gc/handler) + :file-gc-scheduler (ig/ref :app.tasks.file-gc-scheduler/handler) :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) @@ -394,6 +395,9 @@ {::db/pool (ig/ref ::db/pool) ::sto/storage (ig/ref ::sto/storage)} + :app.tasks.file-gc-scheduler/handler + {::db/pool (ig/ref ::db/pool)} + :app.tasks.file-xlog-gc/handler {::db/pool (ig/ref ::db/pool)} @@ -485,7 +489,7 @@ :task :tasks-gc} {:cron #app/cron "0 0 2 * * ?" ;; daily - :task :file-gc} + :task :file-gc-scheduler} {:cron #app/cron "0 30 */3,23 * * ?" :task :telemetry} diff --git a/backend/src/app/tasks/file_gc.clj b/backend/src/app/tasks/file_gc.clj index 79f5ff8b9..e007485f5 100644 --- a/backend/src/app/tasks/file_gc.clj +++ b/backend/src/app/tasks/file_gc.clj @@ -10,6 +10,7 @@ file is eligible to be garbage collected after some period of inactivity (the default threshold is 72h)." (:require + [app.common.data :as d] [app.binfile.common :as bfc] [app.common.files.migrations :as fmg] [app.common.files.validate :as cfv] @@ -30,69 +31,9 @@ [clojure.spec.alpha :as s] [integrant.core :as ig])) -(declare ^:private clean-file!) - -(defn- decode-file - [cfg {:keys [id] :as file}] - (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)] - (-> file - (update :features db/decode-pgarray #{}) - (update :data blob/decode) - (update :data feat.fdata/process-pointers deref) - (update :data feat.fdata/process-objects (partial into {})) - (update :data assoc :id id) - (fmg/migrate-file)))) - -(defn- update-file! - [{:keys [::db/conn] :as cfg} {:keys [id] :as file}] - (let [file (if (contains? (:features file) "fdata/objects-map") - (feat.fdata/enable-objects-map file) - file) - - file (if (contains? (:features file) "fdata/pointer-map") - (binding [pmap/*tracked* (pmap/create-tracked)] - (let [file (feat.fdata/enable-pointer-map file)] - (feat.fdata/persist-pointers! cfg id) - file)) - file) - - file (-> file - (update :features db/encode-pgarray conn "text") - (update :data blob/encode))] - - (db/update! conn :file - {:has-media-trimmed true - :features (:features file) - :version (:version file) - :data (:data file)} - {:id id} - {::db/return-keys true}))) - -(def ^:private - sql:get-candidates - "SELECT f.id, - f.data, - f.revn, - f.version, - f.features, - f.modified_at - FROM file AS f - WHERE f.has_media_trimmed IS false - AND f.modified_at < now() - ?::interval - AND f.deleted_at IS NULL - ORDER BY f.modified_at DESC - FOR UPDATE - SKIP LOCKED") - -(defn- get-candidates - [{:keys [::db/conn ::min-age ::file-id]}] - (if (uuid? file-id) - (do - (l/warn :hint "explicit file id passed on params" :file-id (str file-id)) - (db/query conn :file {:id file-id})) - - (let [min-age (db/interval min-age)] - (db/cursor conn [sql:get-candidates min-age] {:chunk-size 1})))) +(declare ^:private get-file) +(declare ^:private decode-file) +(declare ^:private persist-file!) (def ^:private sql:mark-file-media-object-deleted "UPDATE file_media_object @@ -172,7 +113,6 @@ file)) - (def ^:private sql:get-files-for-library "SELECT f.id, f.data, f.modified_at, f.features, f.version FROM file AS f @@ -274,16 +214,74 @@ (cfv/validate-file-schema! file) file)) +(def ^:private sql:get-file + "SELECT f.id, + f.data, + f.revn, + f.version, + f.features, + f.modified_at + FROM file AS f + WHERE f.has_media_trimmed IS false + AND f.modified_at < now() - ?::interval + AND f.deleted_at IS NULL + AND f.id = ? + FOR UPDATE + SKIP LOCKED") + +(defn- get-file + [{:keys [::db/conn ::min-age ::file-id]}] + (->> (db/exec! conn [sql:get-file min-age file-id]) + (first))) + +(defn- decode-file + [cfg {:keys [id] :as file}] + (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)] + (-> file + (update :features db/decode-pgarray #{}) + (update :data blob/decode) + (update :data feat.fdata/process-pointers deref) + (update :data feat.fdata/process-objects (partial into {})) + (update :data assoc :id id) + (fmg/migrate-file)))) + +(defn- persist-file! + [{:keys [::db/conn] :as cfg} {:keys [id] :as file}] + (let [file (if (contains? (:features file) "fdata/objects-map") + (feat.fdata/enable-objects-map file) + file) + + file (if (contains? (:features file) "fdata/pointer-map") + (binding [pmap/*tracked* (pmap/create-tracked)] + (let [file (feat.fdata/enable-pointer-map file)] + (feat.fdata/persist-pointers! cfg id) + file)) + file) + + file (-> file + (update :features db/encode-pgarray conn "text") + (update :data blob/encode))] + + (db/update! conn :file + {:has-media-trimmed true + :features (:features file) + :version (:version file) + :data (:data file)} + {:id id} + {::db/return-keys true}))) + (defn- process-file! - [cfg file] + [cfg] (try - (let [file (decode-file cfg file) - file (clean-media! cfg file) - file (update-file! cfg file)] - (clean-data-fragments! cfg file)) + (if-let [file (get-file cfg)] + (let [file (decode-file cfg file) + file (clean-media! cfg file) + file (persist-file! cfg file)] + (clean-data-fragments! cfg file)) + (l/dbg :hint "skip" :file-id (str (::file-id cfg)))) (catch Throwable cause - (l/err :hint "error on cleaning file (skiping)" - :file-id (str (:id file)) + (l/err :hint "error on cleaning file" + :file-id (str (::file-id cfg)) :cause cause)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -293,33 +291,16 @@ (defmethod ig/pre-init-spec ::handler [_] (s/keys :req [::db/pool ::sto/storage])) -(defmethod ig/prep-key ::handler - [_ cfg] - (assoc cfg ::min-age (cf/get-deletion-delay))) - (defmethod ig/init-key ::handler [_ cfg] (fn [{:keys [props] :as task}] - (db/tx-run! cfg - (fn [{:keys [::db/conn] :as cfg}] - (let [min-age (dt/duration (or (:min-age props) (::min-age cfg))) - cfg (-> cfg - (update ::sto/storage media/configure-assets-storage conn) - (assoc ::file-id (:file-id props)) - (assoc ::min-age min-age)) + (let [min-age (dt/duration (:min-age props 0)) + cfg (-> cfg + (assoc ::db/rollback (:rollback? props)) + (assoc ::file-id (:file-id props)) + (assoc ::min-age (db/interval min-age)))] - total (reduce (fn [total file] - (process-file! cfg file) - (inc total)) - 0 - (get-candidates cfg))] - - (l/inf :hint "finished" - :min-age (dt/format-duration min-age) - :processed total) - - ;; Allow optional rollback passed by params - (when (:rollback? props) - (db/rollback! conn)) - - {:processed total}))))) + (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] + (let [cfg (update cfg ::sto/storage media/configure-assets-storage conn)] + (process-file! cfg)))) + nil))) diff --git a/backend/src/app/tasks/file_gc_scheduler.clj b/backend/src/app/tasks/file_gc_scheduler.clj new file mode 100644 index 000000000..11c24e889 --- /dev/null +++ b/backend/src/app/tasks/file_gc_scheduler.clj @@ -0,0 +1,65 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.tasks.file-gc-scheduler + "A maintenance task that is responsible of properly scheduling the + file-gc task for all files that matches the eligibility threshold." + (:require + [app.common.logging :as l] + [app.config :as cf] + [app.db :as db] + [app.util.time :as dt] + [app.worker :as wrk] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(def ^:private + sql:get-candidates + "SELECT f.id, + f.modified_at + FROM file AS f + WHERE f.has_media_trimmed IS false + AND f.modified_at < now() - ?::interval + AND f.deleted_at IS NULL + ORDER BY f.modified_at DESC + FOR UPDATE + SKIP LOCKED") + +(defn- get-candidates + [{:keys [::db/conn ::min-age] :as cfg}] + (let [min-age (db/interval min-age)] + (db/cursor conn [sql:get-candidates min-age] {:chunk-size 10}))) + +(defn- schedule! + [{:keys [::min-age] :as cfg}] + (let [total (reduce (fn [total {:keys [id]}] + (let [params {:file-id id :min-age min-age}] + (wrk/submit! (assoc cfg ::wrk/params params)) + (inc total))) + 0 + (get-candidates cfg))] + + {:processed total})) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/prep-key ::handler + [_ cfg] + (assoc cfg ::min-age (cf/get-deletion-delay))) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [{:keys [props] :as task}] + (let [min-age (dt/duration (or (:min-age props) (::min-age cfg)))] + (-> cfg + (assoc ::db/rollback (:rollback? props)) + (assoc ::min-age min-age) + (assoc ::wrk/task :file-gc) + (assoc ::wrk/priority 10) + (assoc ::wrk/mark-retries 0) + (assoc ::wrk/delay 1000) + (db/tx-run! schedule!))))) From f6bfe3931cc761f4e8ad745832305bdf04279095 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 2 Aug 2024 12:25:47 +0200 Subject: [PATCH 2/6] :zap: Add performance enhacements to storage/gc-touched task --- backend/src/app/storage/gc_deleted.clj | 2 - backend/src/app/storage/gc_touched.clj | 151 ++++++++++++------------- 2 files changed, 74 insertions(+), 79 deletions(-) diff --git a/backend/src/app/storage/gc_deleted.clj b/backend/src/app/storage/gc_deleted.clj index 52cdce4b1..7f903b000 100644 --- a/backend/src/app/storage/gc_deleted.clj +++ b/backend/src/app/storage/gc_deleted.clj @@ -121,5 +121,3 @@ :total total) {:deleted total})))))) - - diff --git a/backend/src/app/storage/gc_touched.clj b/backend/src/app/storage/gc_touched.clj index bd499bb65..4e805bae6 100644 --- a/backend/src/app/storage/gc_touched.clj +++ b/backend/src/app/storage/gc_touched.clj @@ -28,58 +28,53 @@ [clojure.spec.alpha :as s] [integrant.core :as ig])) -(def ^:private sql:get-team-font-variant-nrefs - "SELECT ((SELECT count(*) FROM team_font_variant WHERE woff1_file_id = ?) + - (SELECT count(*) FROM team_font_variant WHERE woff2_file_id = ?) + - (SELECT count(*) FROM team_font_variant WHERE otf_file_id = ?) + - (SELECT count(*) FROM team_font_variant WHERE ttf_file_id = ?)) AS nrefs") +(def ^:private sql:has-team-font-variant-refs + "SELECT ((SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE woff1_file_id = ?)) OR + (SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE woff2_file_id = ?)) OR + (SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE otf_file_id = ?)) OR + (SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE ttf_file_id = ?))) AS has_refs") -(defn- get-team-font-variant-nrefs +(defn- has-team-font-variant-refs? [conn id] - (-> (db/exec-one! conn [sql:get-team-font-variant-nrefs id id id id]) - (get :nrefs))) - + (-> (db/exec-one! conn [sql:has-team-font-variant-refs id id id id]) + (get :has-refs))) (def ^:private - sql:get-file-media-object-nrefs - "SELECT ((SELECT count(*) FROM file_media_object WHERE media_id = ?) + - (SELECT count(*) FROM file_media_object WHERE thumbnail_id = ?)) AS nrefs") + sql:has-file-media-object-refs + "SELECT ((SELECT EXISTS (SELECT 1 FROM file_media_object WHERE media_id = ?)) OR + (SELECT EXISTS (SELECT 1 FROM file_media_object WHERE thumbnail_id = ?))) AS has_refs") -(defn- get-file-media-object-nrefs +(defn- has-file-media-object-refs? [conn id] - (-> (db/exec-one! conn [sql:get-file-media-object-nrefs id id]) - (get :nrefs))) + (-> (db/exec-one! conn [sql:has-file-media-object-refs id id]) + (get :has-refs))) +(def ^:private sql:has-profile-refs + "SELECT ((SELECT EXISTS (SELECT 1 FROM profile WHERE photo_id = ?)) OR + (SELECT EXISTS (SELECT 1 FROM team WHERE photo_id = ?))) AS has_refs") -(def ^:private sql:get-profile-nrefs - "SELECT ((SELECT count(*) FROM profile WHERE photo_id = ?) + - (SELECT count(*) FROM team WHERE photo_id = ?)) AS nrefs") - -(defn- get-profile-nrefs +(defn- has-profile-refs? [conn id] - (-> (db/exec-one! conn [sql:get-profile-nrefs id id]) - (get :nrefs))) - + (-> (db/exec-one! conn [sql:has-profile-refs id id]) + (get :has-refs))) (def ^:private - sql:get-file-object-thumbnail-nrefs - "SELECT (SELECT count(*) FROM file_tagged_object_thumbnail WHERE media_id = ?) AS nrefs") + sql:has-file-object-thumbnail-refs + "SELECT EXISTS (SELECT 1 FROM file_tagged_object_thumbnail WHERE media_id = ?) AS has_refs") -(defn- get-file-object-thumbnails +(defn- has-file-object-thumbnails-refs? [conn id] - (-> (db/exec-one! conn [sql:get-file-object-thumbnail-nrefs id]) - (get :nrefs))) - + (-> (db/exec-one! conn [sql:has-file-object-thumbnail-refs id]) + (get :has-refs))) (def ^:private - sql:get-file-thumbnail-nrefs - "SELECT (SELECT count(*) FROM file_thumbnail WHERE media_id = ?) AS nrefs") + sql:has-file-thumbnail-refs + "SELECT EXISTS (SELECT 1 FROM file_thumbnail WHERE media_id = ?) AS has_refs") -(defn- get-file-thumbnails +(defn- has-file-thumbnails-refs? [conn id] - (-> (db/exec-one! conn [sql:get-file-thumbnail-nrefs id]) - (get :nrefs))) - + (-> (db/exec-one! conn [sql:has-file-thumbnail-refs id]) + (get :has-refs))) (def ^:private sql:mark-freeze-in-bulk "UPDATE storage_object @@ -91,7 +86,6 @@ (let [ids (db/create-array conn "uuid" ids)] (db/exec-one! conn [sql:mark-freeze-in-bulk ids]))) - (def ^:private sql:mark-delete-in-bulk "UPDATE storage_object SET deleted_at = now(), @@ -123,25 +117,24 @@ "file-media-object")) (defn- process-objects! - [conn get-fn ids bucket] + [conn has-refs? ids bucket] (loop [to-freeze #{} to-delete #{} ids (seq ids)] (if-let [id (first ids)] - (let [nrefs (get-fn conn id)] - (if (pos? nrefs) - (do - (l/debug :hint "processing object" - :id (str id) - :status "freeze" - :bucket bucket :refs nrefs) - (recur (conj to-freeze id) to-delete (rest ids))) - (do - (l/debug :hint "processing object" - :id (str id) - :status "delete" - :bucket bucket :refs nrefs) - (recur to-freeze (conj to-delete id) (rest ids))))) + (if (has-refs? conn id) + (do + (l/debug :hint "processing object" + :id (str id) + :status "freeze" + :bucket bucket) + (recur (conj to-freeze id) to-delete (rest ids))) + (do + (l/debug :hint "processing object" + :id (str id) + :status "delete" + :bucket bucket) + (recur to-freeze (conj to-delete id) (rest ids)))) (do (some->> (seq to-freeze) (mark-freeze-in-bulk! conn)) (some->> (seq to-delete) (mark-delete-in-bulk! conn)) @@ -150,15 +143,23 @@ (defn- process-bucket! [conn bucket ids] (case bucket - "file-media-object" (process-objects! conn get-file-media-object-nrefs ids bucket) - "team-font-variant" (process-objects! conn get-team-font-variant-nrefs ids bucket) - "file-object-thumbnail" (process-objects! conn get-file-object-thumbnails ids bucket) - "file-thumbnail" (process-objects! conn get-file-thumbnails ids bucket) - "profile" (process-objects! conn get-profile-nrefs ids bucket) + "file-media-object" (process-objects! conn has-file-media-object-refs? ids bucket) + "team-font-variant" (process-objects! conn has-team-font-variant-refs? ids bucket) + "file-object-thumbnail" (process-objects! conn has-file-object-thumbnails-refs? ids bucket) + "file-thumbnail" (process-objects! conn has-file-thumbnails-refs? ids bucket) + "profile" (process-objects! conn has-profile-refs? ids bucket) (ex/raise :type :internal :code :unexpected-unknown-reference - :hint (dm/fmt "unknown reference %" bucket)))) + :hint (dm/fmt "unknown reference '%'" bucket)))) +(defn process-chunk! + [{:keys [::db/conn]} chunk] + (reduce-kv (fn [[nfo ndo] bucket ids] + (let [[nfo' ndo'] (process-bucket! conn bucket ids)] + [(+ nfo nfo') + (+ ndo ndo')])) + [0 0] + (d/group-by lookup-bucket :id #{} chunk))) (def ^:private sql:get-touched-storage-objects @@ -167,29 +168,22 @@ WHERE so.touched_at IS NOT NULL ORDER BY touched_at ASC FOR UPDATE - SKIP LOCKED") + SKIP LOCKED + LIMIT 10") -(defn- group-by-bucket - [row] - (d/group-by lookup-bucket :id #{} row)) - -(defn- get-buckets +(defn get-chunk [conn] - (sequence - (comp (map impl/decode-row) - (partition-all 25) - (mapcat group-by-bucket)) - (db/cursor conn sql:get-touched-storage-objects))) + (->> (db/exec! conn [sql:get-touched-storage-objects]) + (map impl/decode-row) + (not-empty))) (defn- process-touched! - [{:keys [::db/conn]}] - (loop [buckets (get-buckets conn) - freezed 0 + [{:keys [::db/pool] :as cfg}] + (loop [freezed 0 deleted 0] - (if-let [[bucket ids] (first buckets)] - (let [[nfo ndo] (process-bucket! conn bucket ids)] - (recur (rest buckets) - (+ freezed nfo) + (if-let [chunk (get-chunk pool)] + (let [[nfo ndo] (db/tx-run! cfg process-chunk! chunk)] + (recur (+ freezed nfo) (+ deleted ndo))) (do (l/inf :hint "task finished" @@ -198,11 +192,14 @@ {:freeze freezed :delete deleted})))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; HANDLER +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + (defmethod ig/pre-init-spec ::handler [_] (s/keys :req [::db/pool])) (defmethod ig/init-key ::handler [_ cfg] - (fn [_] - (db/tx-run! cfg process-touched!))) + (fn [_] (process-touched! cfg))) From 0e92bcc0de2526fcd58cf5cbfccd02c44dff74c3 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 1 Aug 2024 16:17:22 +0200 Subject: [PATCH 3/6] :tada: Add file-data offload mechanism --- backend/scripts/repl | 8 +- backend/scripts/start-dev | 7 +- backend/src/app/binfile/v1.clj | 9 +- backend/src/app/binfile/v2.clj | 6 +- backend/src/app/config.clj | 16 +- backend/src/app/db.clj | 2 +- backend/src/app/features/components_v2.clj | 3 +- backend/src/app/features/fdata.clj | 23 ++- backend/src/app/http/assets.clj | 9 +- backend/src/app/main.clj | 34 +++- backend/src/app/media.clj | 14 -- backend/src/app/migrations.clj | 11 +- .../sql/0122-mod-file-data-fragment-table.sql | 6 + .../sql/0122-mod-file-fragment-table.sql | 6 + .../migrations/sql/0122-mod-file-table.sql | 4 + .../sql/0123-mod-file-change-table.sql | 2 + backend/src/app/rpc/commands/auth.clj | 1 - backend/src/app/rpc/commands/files.clj | 23 ++- .../src/app/rpc/commands/files_snapshot.clj | 5 +- .../src/app/rpc/commands/files_thumbnails.clj | 8 +- backend/src/app/rpc/commands/files_update.clj | 7 +- backend/src/app/rpc/commands/fonts.clj | 30 +--- backend/src/app/rpc/commands/media.clj | 31 ++-- backend/src/app/rpc/commands/profile.clj | 3 +- backend/src/app/rpc/commands/teams.clj | 3 +- backend/src/app/storage.clj | 88 +++++++--- backend/src/app/storage/gc_touched.clj | 20 +++ backend/src/app/storage/impl.clj | 6 +- backend/src/app/tasks/file_gc.clj | 48 +++-- backend/src/app/tasks/file_gc_scheduler.clj | 1 - backend/src/app/tasks/objects_gc.clj | 20 +-- backend/src/app/tasks/offload_file_data.clj | 85 +++++++++ backend/test/backend_tests/rpc_file_test.clj | 165 ++++++++++++++---- .../rpc_file_thumbnails_test.clj | 8 +- backend/test/backend_tests/rpc_font_test.clj | 6 +- 35 files changed, 502 insertions(+), 216 deletions(-) create mode 100644 backend/src/app/migrations/sql/0122-mod-file-data-fragment-table.sql create mode 100644 backend/src/app/migrations/sql/0122-mod-file-fragment-table.sql create mode 100644 backend/src/app/migrations/sql/0122-mod-file-table.sql create mode 100644 backend/src/app/migrations/sql/0123-mod-file-change-table.sql create mode 100644 backend/src/app/tasks/offload_file_data.clj diff --git a/backend/scripts/repl b/backend/scripts/repl index 0debeece2..a9efcb623 100755 --- a/backend/scripts/repl +++ b/backend/scripts/repl @@ -27,6 +27,7 @@ export PENPOT_FLAGS="\ enable-file-snapshot \ enable-webhooks \ enable-access-tokens \ + enable-tiered-file-data-storage \ enable-file-validation \ enable-file-schema-validation"; @@ -62,9 +63,10 @@ mc mb penpot-s3/penpot -p -q export AWS_ACCESS_KEY_ID=penpot-devenv export AWS_SECRET_ACCESS_KEY=penpot-devenv -export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3 -export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000 -export PENPOT_STORAGE_ASSETS_S3_BUCKET=penpot + +export PENPOT_OBJECTS_STORAGE_BACKEND=s3 +export PENPOT_OBJECTS_STORAGE_S3_ENDPOINT=http://minio:9000 +export PENPOT_OBJECTS_STORAGE_S3_BUCKET=penpot export OPTIONS=" -A:jmx-remote -A:dev \ diff --git a/backend/scripts/start-dev b/backend/scripts/start-dev index 564151bef..b137af101 100755 --- a/backend/scripts/start-dev +++ b/backend/scripts/start-dev @@ -19,6 +19,7 @@ export PENPOT_FLAGS="\ enable-smtp \ enable-file-snapshot \ enable-access-tokens \ + enable-tiered-file-data-storage \ enable-file-validation \ enable-file-schema-validation"; @@ -56,9 +57,9 @@ mc mb penpot-s3/penpot -p -q export AWS_ACCESS_KEY_ID=penpot-devenv export AWS_SECRET_ACCESS_KEY=penpot-devenv -export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3 -export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000 -export PENPOT_STORAGE_ASSETS_S3_BUCKET=penpot +export PENPOT_OBJECTS_STORAGE_BACKEND=s3 +export PENPOT_OBJECTS_STORAGE_S3_ENDPOINT=http://minio:9000 +export PENPOT_OBJECTS_STORAGE_S3_BUCKET=penpot entrypoint=${1:-app.main}; diff --git a/backend/src/app/binfile/v1.clj b/backend/src/app/binfile/v1.clj index 3e1c93aa0..87f02d391 100644 --- a/backend/src/app/binfile/v1.clj +++ b/backend/src/app/binfile/v1.clj @@ -22,7 +22,6 @@ [app.db :as db] [app.loggers.audit :as-alias audit] [app.loggers.webhooks :as-alias webhooks] - [app.media :as media] [app.rpc :as-alias rpc] [app.rpc.commands.teams :as teams] [app.rpc.doc :as-alias doc] @@ -403,9 +402,9 @@ (write-obj! output rels))) (defmethod write-section :v1/sobjects - [{:keys [::sto/storage ::output]}] + [{:keys [::output] :as cfg}] (let [sids (-> bfc/*state* deref :sids) - storage (media/configure-assets-storage storage)] + storage (sto/resolve cfg)] (l/dbg :hint "found sobjects" :items (count sids) @@ -620,8 +619,8 @@ ::l/sync? true)))))) (defmethod read-section :v1/sobjects - [{:keys [::sto/storage ::db/conn ::input ::bfc/overwrite ::bfc/timestamp]}] - (let [storage (media/configure-assets-storage storage) + [{:keys [::db/conn ::input ::bfc/overwrite ::bfc/timestamp] :as cfg}] + (let [storage (sto/resolve cfg) ids (read-obj! input) thumb? (into #{} (map :media-id) (:thumbnails @bfc/*state*))] diff --git a/backend/src/app/binfile/v2.clj b/backend/src/app/binfile/v2.clj index 1a5f10342..bef327acc 100644 --- a/backend/src/app/binfile/v2.clj +++ b/backend/src/app/binfile/v2.clj @@ -20,7 +20,6 @@ [app.db.sql :as sql] [app.loggers.audit :as-alias audit] [app.loggers.webhooks :as-alias webhooks] - [app.media :as media] [app.storage :as sto] [app.storage.tmp :as tmp] [app.util.events :as events] @@ -347,9 +346,7 @@ [cfg team-id] (let [id (uuid/next) tp (dt/tpoint) - - cfg (-> (create-database cfg) - (update ::sto/storage media/configure-assets-storage))] + cfg (create-database cfg)] (l/inf :hint "start" :operation "export" @@ -390,7 +387,6 @@ tp (dt/tpoint) cfg (-> (create-database cfg path) - (update ::sto/storage media/configure-assets-storage) (assoc ::bfc/timestamp (dt/now)))] (l/inf :hint "start" diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index d1315d48b..161ccc854 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -52,8 +52,8 @@ :redis-uri "redis://redis/0" - :assets-storage-backend :assets-fs - :storage-assets-fs-directory "assets" + :objects-storage-backend "fs" + :objects-storage-fs-directory "assets" :assets-path "/internal/assets/" :smtp-default-reply-to "Penpot " @@ -207,16 +207,24 @@ [:prepl-host {:optional true} :string] [:prepl-port {:optional true} :int] - [:assets-storage-backend {:optional true} :keyword] [:media-directory {:optional true} :string] ;; REVIEW [:media-uri {:optional true} :string] [:assets-path {:optional true} :string] + ;; Legacy, will be removed in 2.5 + [:assets-storage-backend {:optional true} :keyword] [:storage-assets-fs-directory {:optional true} :string] [:storage-assets-s3-bucket {:optional true} :string] [:storage-assets-s3-region {:optional true} :keyword] [:storage-assets-s3-endpoint {:optional true} :string] - [:storage-assets-s3-io-threads {:optional true} :int]])) + [:storage-assets-s3-io-threads {:optional true} :int] + + [:objects-storage-backend {:optional true} :keyword] + [:objects-storage-fs-directory {:optional true} :string] + [:objects-storage-s3-bucket {:optional true} :string] + [:objects-storage-s3-region {:optional true} :keyword] + [:objects-storage-s3-endpoint {:optional true} :string] + [:objects-storage-s3-io-threads {:optional true} :int]])) (def default-flags [:enable-backend-api-doc diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index 097ada50a..8c03be660 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -153,7 +153,7 @@ (s/def ::conn some?) (s/def ::nilable-pool (s/nilable ::pool)) (s/def ::pool pool?) -(s/def ::pool-or-conn some?) +(s/def ::connectable some?) (defn closed? [pool] diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index 47dc3fad0..d0e03eca6 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -62,6 +62,7 @@ [datoteka.io :as io] [promesa.util :as pu])) + (def ^:dynamic *stats* "A dynamic var for setting up state for collect stats globally." nil) @@ -1742,7 +1743,7 @@ :validate validate? :skip-on-graphic-error skip-on-graphic-error?) - (db/tx-run! (update system ::sto/storage media/configure-assets-storage) + (db/tx-run! system (fn [system] (binding [*system* system] (when (string? label) diff --git a/backend/src/app/features/fdata.clj b/backend/src/app/features/fdata.clj index 5370df038..e44488539 100644 --- a/backend/src/app/features/fdata.clj +++ b/backend/src/app/features/fdata.clj @@ -12,6 +12,7 @@ [app.common.logging :as l] [app.db :as db] [app.db.sql :as-alias sql] + [app.storage :as sto] [app.util.blob :as blob] [app.util.objects-map :as omap] [app.util.pointer-map :as pmap])) @@ -55,12 +56,28 @@ ;; POINTER-MAP ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(defn get-file-data + "Get file data given a file instance." + [system {:keys [data-backend data-ref-id] :as file} & {:keys [touch]}] + (if (= data-backend "objects-storage") + (let [storage (sto/resolve system ::db/reuse-conn true) + object (sto/get-object storage data-ref-id)] + + (when touch (sto/touch-object! storage data-ref-id)) + (sto/get-object-bytes storage object)) + (:data file))) + +(defn resolve-file-data + [system file & {:as opts}] + (let [data (get-file-data system file opts)] + (assoc file :data data))) + (defn load-pointer "A database loader pointer helper" [system file-id id] (let [fragment (db/get* system :file-data-fragment {:id id :file-id file-id} - {::sql/columns [:data]})] + {::sql/columns [:data :data-backend :data-ref-id :id]})] (l/trc :hint "load pointer" :file-id (str file-id) @@ -74,7 +91,9 @@ :file-id file-id :fragment-id id)) - (blob/decode (:data fragment)))) + (let [data (get-file-data system fragment)] + ;; FIXME: conditional thread scheduling for decoding big objects + (blob/decode data)))) (defn persist-pointers! "Persist all currently tracked pointer objects" diff --git a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index 06c331849..9a8e69dbf 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -57,11 +57,10 @@ (defn- serve-object "Helper function that returns the appropriate response depending on the storage object backend type." - [{:keys [::sto/storage] :as cfg} {:keys [backend] :as obj}] - (let [backend (sto/resolve-backend storage backend)] - (case (::sto/type backend) - :s3 (serve-object-from-s3 cfg obj) - :fs (serve-object-from-fs cfg obj)))) + [cfg {:keys [backend] :as obj}] + (case backend + (:s3 :assets-s3) (serve-object-from-s3 cfg obj) + (:fs :assets-fs) (serve-object-from-fs cfg obj))) (defn objects-handler "Handler that servers storage objects by id." diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 07946e59a..692d9c190 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -345,6 +345,7 @@ :objects-gc (ig/ref :app.tasks.objects-gc/handler) :file-gc (ig/ref :app.tasks.file-gc/handler) :file-gc-scheduler (ig/ref :app.tasks.file-gc-scheduler/handler) + :offload-file-data (ig/ref :app.tasks.offload-file-data/handler) :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) @@ -398,6 +399,10 @@ :app.tasks.file-gc-scheduler/handler {::db/pool (ig/ref ::db/pool)} + :app.tasks.offload-file-data/handler + {::db/pool (ig/ref ::db/pool) + ::sto/storage (ig/ref ::sto/storage)} + :app.tasks.file-xlog-gc/handler {::db/pool (ig/ref ::db/pool)} @@ -452,17 +457,28 @@ ::sto/storage {::db/pool (ig/ref ::db/pool) ::sto/backends - {:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) - :assets-fs (ig/ref [::assets :app.storage.fs/backend])}} + {:s3 (ig/ref :app.storage.s3/backend) + :fs (ig/ref :app.storage.fs/backend) - [::assets :app.storage.s3/backend] - {::sto.s3/region (cf/get :storage-assets-s3-region) - ::sto.s3/endpoint (cf/get :storage-assets-s3-endpoint) - ::sto.s3/bucket (cf/get :storage-assets-s3-bucket) - ::sto.s3/io-threads (cf/get :storage-assets-s3-io-threads)} + ;; LEGACY (should not be removed, can only be removed after an + ;; explicit migration because the database objects/rows will + ;; still reference the old names). + :assets-s3 (ig/ref :app.storage.s3/backend) + :assets-fs (ig/ref :app.storage.fs/backend)}} - [::assets :app.storage.fs/backend] - {::sto.fs/directory (cf/get :storage-assets-fs-directory)}}) + :app.storage.s3/backend + {::sto.s3/region (or (cf/get :storage-assets-s3-region) + (cf/get :objects-storage-s3-region)) + ::sto.s3/endpoint (or (cf/get :storage-assets-s3-endpoint) + (cf/get :objects-storage-s3-endpoint)) + ::sto.s3/bucket (or (cf/get :storage-assets-s3-bucket) + (cf/get :objects-storage-s3-bucket)) + ::sto.s3/io-threads (or (cf/get :storage-assets-s3-io-threads) + (cf/get :objects-storage-s3-io-threads))} + + :app.storage.fs/backend + {::sto.fs/directory (or (cf/get :storage-assets-fs-directory) + (cf/get :objects-storage-fs-directory))}}) (def worker-config diff --git a/backend/src/app/media.clj b/backend/src/app/media.clj index 9e1a120fe..ba40a7a52 100644 --- a/backend/src/app/media.clj +++ b/backend/src/app/media.clj @@ -313,17 +313,3 @@ (= stype :ttf) (-> (assoc "font/otf" (ttf->otf sfnt)) (assoc "font/ttf" sfnt))))))))) - - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Utility functions -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defn configure-assets-storage - "Given storage map, returns a storage configured with the appropriate - backend for assets and optional connection attached." - ([storage] - (assoc storage ::sto/backend (cf/get :assets-storage-backend :assets-fs))) - ([storage pool-or-conn] - (-> (configure-assets-storage storage) - (assoc ::db/pool-or-conn pool-or-conn)))) diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 32768abf9..965b0ce77 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -382,7 +382,16 @@ :fn (mg/resource "app/migrations/sql/0120-mod-audit-log-table.sql")} {:name "0121-mod-file-data-fragment-table" - :fn (mg/resource "app/migrations/sql/0121-mod-file-data-fragment-table.sql")}]) + :fn (mg/resource "app/migrations/sql/0121-mod-file-data-fragment-table.sql")} + + {:name "0122-mod-file-table" + :fn (mg/resource "app/migrations/sql/0122-mod-file-table.sql")} + + {:name "0122-mod-file-data-fragment-table" + :fn (mg/resource "app/migrations/sql/0122-mod-file-data-fragment-table.sql")} + + {:name "0123-mod-file-change-table" + :fn (mg/resource "app/migrations/sql/0123-mod-file-change-table.sql")}]) (defn apply-migrations! [pool name migrations] diff --git a/backend/src/app/migrations/sql/0122-mod-file-data-fragment-table.sql b/backend/src/app/migrations/sql/0122-mod-file-data-fragment-table.sql new file mode 100644 index 000000000..87955aea8 --- /dev/null +++ b/backend/src/app/migrations/sql/0122-mod-file-data-fragment-table.sql @@ -0,0 +1,6 @@ +ALTER TABLE file_data_fragment + ADD COLUMN data_backend text NULL, + ADD COLUMN data_ref_id uuid NULL; + +CREATE INDEX IF NOT EXISTS file_data_fragment__data_ref_id__idx + ON file_data_fragment (data_ref_id); diff --git a/backend/src/app/migrations/sql/0122-mod-file-fragment-table.sql b/backend/src/app/migrations/sql/0122-mod-file-fragment-table.sql new file mode 100644 index 000000000..87955aea8 --- /dev/null +++ b/backend/src/app/migrations/sql/0122-mod-file-fragment-table.sql @@ -0,0 +1,6 @@ +ALTER TABLE file_data_fragment + ADD COLUMN data_backend text NULL, + ADD COLUMN data_ref_id uuid NULL; + +CREATE INDEX IF NOT EXISTS file_data_fragment__data_ref_id__idx + ON file_data_fragment (data_ref_id); diff --git a/backend/src/app/migrations/sql/0122-mod-file-table.sql b/backend/src/app/migrations/sql/0122-mod-file-table.sql new file mode 100644 index 000000000..4f0a05155 --- /dev/null +++ b/backend/src/app/migrations/sql/0122-mod-file-table.sql @@ -0,0 +1,4 @@ +ALTER TABLE file ADD COLUMN data_ref_id uuid NULL; + +CREATE INDEX IF NOT EXISTS file__data_ref_id__idx + ON file (data_ref_id); diff --git a/backend/src/app/migrations/sql/0123-mod-file-change-table.sql b/backend/src/app/migrations/sql/0123-mod-file-change-table.sql new file mode 100644 index 000000000..37fccfd51 --- /dev/null +++ b/backend/src/app/migrations/sql/0123-mod-file-change-table.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS file_change__created_at__label__idx + ON file_change (created_at, label); diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index ff8bfdb8f..a8fc218ec 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -522,7 +522,6 @@ (create-recovery-token) (send-email-notification conn))))))) - (def schema:request-profile-recovery [:map {:title "request-profile-recovery"} [:email ::sm/email]]) diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index 4c8497693..336c4aeb2 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -68,6 +68,9 @@ :max-version fmg/version)) file)) + +;; --- FILE DATA + ;; --- FILE PERMISSIONS (def ^:private sql:file-permissions @@ -258,11 +261,12 @@ (let [params (merge {:id id} (when (some? project-id) {:project-id project-id})) - file (-> (db/get conn :file params - {::db/check-deleted (not include-deleted?) - ::db/remove-deleted (not include-deleted?) - ::sql/for-update lock-for-update?}) - (decode-row))] + file (->> (db/get conn :file params + {::db/check-deleted (not include-deleted?) + ::db/remove-deleted (not include-deleted?) + ::sql/for-update lock-for-update?}) + (feat.fdata/resolve-file-data cfg) + (decode-row))] (if (and migrate? (fmg/need-migration? file)) (migrate-file cfg file) file))) @@ -328,8 +332,10 @@ (defn- get-file-fragment [cfg file-id fragment-id] - (some-> (db/get cfg :file-data-fragment {:file-id file-id :id fragment-id}) - (update :data blob/decode))) + (let [resolve-file-data (partial feat.fdata/resolve-file-data cfg)] + (some-> (db/get cfg :file-data-fragment {:file-id file-id :id fragment-id}) + (resolve-file-data) + (update :data blob/decode)))) (sv/defmethod ::get-file-fragment "Retrieve a file fragment by its ID. Only authenticated users." @@ -802,7 +808,8 @@ (db/update! cfg :file {:revn (inc (:revn file)) :data (blob/encode (:data file)) - :modified-at (dt/now)} + :modified-at (dt/now) + :has-media-trimmed false} {:id file-id}) (feat.fdata/persist-pointers! cfg file-id)))) diff --git a/backend/src/app/rpc/commands/files_snapshot.clj b/backend/src/app/rpc/commands/files_snapshot.clj index 1e9c3081a..99b241317 100644 --- a/backend/src/app/rpc/commands/files_snapshot.clj +++ b/backend/src/app/rpc/commands/files_snapshot.clj @@ -14,7 +14,6 @@ [app.db :as db] [app.db.sql :as-alias sql] [app.main :as-alias main] - [app.media :as media] [app.rpc :as-alias rpc] [app.rpc.commands.files :as files] [app.rpc.commands.profile :as profile] @@ -63,8 +62,8 @@ (db/run! cfg get-file-snapshots params)) (defn restore-file-snapshot! - [{:keys [::db/conn ::sto/storage] :as cfg} {:keys [file-id id]}] - (let [storage (media/configure-assets-storage storage conn) + [{:keys [::db/conn] :as cfg} {:keys [file-id id]}] + (let [storage (sto/resolve cfg {::db/reuse-conn true}) file (files/get-minimal-file conn file-id {::db/for-update true}) snapshot (db/get* conn :file-change {:file-id file-id diff --git a/backend/src/app/rpc/commands/files_thumbnails.clj b/backend/src/app/rpc/commands/files_thumbnails.clj index 446de5378..411f4fef4 100644 --- a/backend/src/app/rpc/commands/files_thumbnails.clj +++ b/backend/src/app/rpc/commands/files_thumbnails.clj @@ -295,8 +295,7 @@ (db/run! cfg files/check-edition-permissions! profile-id file-id) - (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] - (create-file-object-thumbnail! cfg file-id object-id media (or tag "frame")))) + (create-file-object-thumbnail! cfg file-id object-id media (or tag "frame"))) ;; --- MUTATION COMMAND: delete-file-object-thumbnail @@ -327,7 +326,7 @@ (files/check-edition-permissions! cfg profile-id file-id) (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (-> cfg - (update ::sto/storage media/configure-assets-storage conn) + (update ::sto/storage sto/configure conn) (delete-file-object-thumbnail! file-id object-id)) nil))) @@ -405,7 +404,6 @@ (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (files/check-edition-permissions! conn profile-id file-id) (when-not (db/read-only? conn) - (let [cfg (update cfg ::sto/storage media/configure-assets-storage) - media (create-file-thumbnail! cfg params)] + (let [media (create-file-thumbnail! cfg params)] {:uri (files/resolve-public-uri (:id media)) :id (:id media)}))))) diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index 76b621b3c..c25d05c24 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -227,8 +227,12 @@ (defn- update-file* [{:keys [::db/conn ::wrk/executor] :as cfg} {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] - (let [;; Process the file data on separated thread for avoid to do + (let [;; Retrieve the file data + file (feat.fdata/resolve-file-data cfg file {:touch true}) + + ;; Process the file data on separated thread for avoid to do ;; the CPU intensive operation on vthread. + file (px/invoke! executor (partial update-file-data cfg file changes skip-validate)) features (db/create-array conn "text" (:features file))] @@ -254,6 +258,7 @@ :version (:version file) :features features :data-backend nil + :data-ref-id nil :modified-at created-at :has-media-trimmed false} {:id (:id file)}) diff --git a/backend/src/app/rpc/commands/fonts.clj b/backend/src/app/rpc/commands/fonts.clj index 0942da601..51081eb19 100644 --- a/backend/src/app/rpc/commands/fonts.clj +++ b/backend/src/app/rpc/commands/fonts.clj @@ -95,12 +95,11 @@ [cfg {:keys [::rpc/profile-id team-id] :as params}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] - (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] - (teams/check-edition-permissions! conn profile-id team-id) - (quotes/check-quote! conn {::quotes/id ::quotes/font-variants-per-team - ::quotes/profile-id profile-id - ::quotes/team-id team-id}) - (create-font-variant cfg (assoc params :profile-id profile-id)))))) + (teams/check-edition-permissions! conn profile-id team-id) + (quotes/check-quote! conn {::quotes/id ::quotes/font-variants-per-team + ::quotes/profile-id profile-id + ::quotes/team-id team-id}) + (create-font-variant cfg (assoc params :profile-id profile-id))))) (defn create-font-variant [{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [data] :as params}] @@ -203,14 +202,13 @@ ::sm/params schema:delete-font} [cfg {:keys [::rpc/profile-id id team-id]}] (db/tx-run! cfg - (fn [{:keys [::db/conn ::sto/storage] :as cfg}] + (fn [{:keys [::db/conn] :as cfg}] (teams/check-edition-permissions! conn profile-id team-id) (let [fonts (db/query conn :team-font-variant {:team-id team-id :font-id id :deleted-at nil} {::sql/for-update true}) - storage (media/configure-assets-storage storage conn) tnow (dt/now)] (when-not (seq fonts) @@ -220,11 +218,7 @@ (doseq [font fonts] (db/update! conn :team-font-variant {:deleted-at tnow} - {:id (:id font)}) - (some->> (:woff1-file-id font) (sto/touch-object! storage)) - (some->> (:woff2-file-id font) (sto/touch-object! storage)) - (some->> (:ttf-file-id font) (sto/touch-object! storage)) - (some->> (:otf-file-id font) (sto/touch-object! storage))) + {:id (:id font)})) (rph/with-meta (rph/wrap) {::audit/props {:id id @@ -245,22 +239,16 @@ ::sm/params schema:delete-font-variant} [cfg {:keys [::rpc/profile-id id team-id]}] (db/tx-run! cfg - (fn [{:keys [::db/conn ::sto/storage] :as cfg}] + (fn [{:keys [::db/conn] :as cfg}] (teams/check-edition-permissions! conn profile-id team-id) (let [variant (db/get conn :team-font-variant {:id id :team-id team-id} - {::sql/for-update true}) - storage (media/configure-assets-storage storage conn)] + {::sql/for-update true})] (db/update! conn :team-font-variant {:deleted-at (dt/now)} {:id (:id variant)}) - (some->> (:woff1-file-id variant) (sto/touch-object! storage)) - (some->> (:woff2-file-id variant) (sto/touch-object! storage)) - (some->> (:ttf-file-id variant) (sto/touch-object! storage)) - (some->> (:otf-file-id variant) (sto/touch-object! storage)) - (rph/with-meta (rph/wrap) {::audit/props {:font-family (:font-family variant) :font-id (:font-id variant)}}))))) diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index 992c5d1da..d915933b6 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -56,21 +56,19 @@ ::climit/id [[:process-image/by-profile ::rpc/profile-id] [:process-image/global]]} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}] - (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] + (files/check-edition-permissions! pool profile-id file-id) + (media/validate-media-type! content) + (media/validate-media-size! content) - (files/check-edition-permissions! pool profile-id file-id) - (media/validate-media-type! content) - (media/validate-media-size! content) - - (db/run! cfg (fn [cfg] - (let [object (create-file-media-object cfg params) - props {:name (:name params) - :file-id file-id - :is-local (:is-local params) - :size (:size content) - :mtype (:mtype content)}] - (with-meta object - {::audit/replace-props props})))))) + (db/run! cfg (fn [cfg] + (let [object (create-file-media-object cfg params) + props {:name (:name params) + :file-id file-id + :is-local (:is-local params) + :size (:size content) + :mtype (:mtype content)}] + (with-meta object + {::audit/replace-props props}))))) (defn- big-enough-for-thumbnail? "Checks if the provided image info is big enough for @@ -183,9 +181,8 @@ {::doc/added "1.17" ::sm/params schema:create-file-media-object-from-url} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] - (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] - (files/check-edition-permissions! pool profile-id file-id) - (create-file-media-object-from-url cfg (assoc params :profile-id profile-id)))) + (files/check-edition-permissions! pool profile-id file-id) + (create-file-media-object-from-url cfg (assoc params :profile-id profile-id))) (defn download-image [{:keys [::http/client]} uri] diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index e3b453e26..0e594f978 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -210,8 +210,7 @@ [cfg {:keys [::rpc/profile-id file] :as params}] ;; Validate incoming mime type (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) - (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] - (update-profile-photo cfg (assoc params :profile-id profile-id)))) + (update-profile-photo cfg (assoc params :profile-id profile-id))) (defn update-profile-photo [{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id file] :as params}] diff --git a/backend/src/app/rpc/commands/teams.clj b/backend/src/app/rpc/commands/teams.clj index 74918de97..553257560 100644 --- a/backend/src/app/rpc/commands/teams.clj +++ b/backend/src/app/rpc/commands/teams.clj @@ -674,8 +674,7 @@ [cfg {:keys [::rpc/profile-id file] :as params}] ;; Validate incoming mime type (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) - (let [cfg (update cfg ::sto/storage media/configure-assets-storage)] - (update-team-photo cfg (assoc params :profile-id profile-id)))) + (update-team-photo cfg (assoc params :profile-id profile-id))) (defn update-team-photo [{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id team-id] :as params}] diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index c818b03fa..861730e33 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -6,11 +6,13 @@ (ns app.storage "Objects storage abstraction layer." + (:refer-clojure :exclude [resolve]) (:require [app.common.data :as d] [app.common.data.macros :as dm] [app.common.spec :as us] [app.common.uuid :as uuid] + [app.config :as cf] [app.db :as db] [app.storage.fs :as sfs] [app.storage.impl :as impl] @@ -18,16 +20,23 @@ [app.util.time :as dt] [clojure.spec.alpha :as s] [datoteka.fs :as fs] - [integrant.core :as ig] - [promesa.core :as p]) + [integrant.core :as ig]) (:import java.io.InputStream)) +(defn get-legacy-backend + [] + (let [name (cf/get :assets-storage-backend)] + (case name + :assets-fs :fs + :assets-s3 :s3 + :fs))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Storage Module State ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::id #{:assets-fs :assets-s3}) +(s/def ::id #{:assets-fs :assets-s3 :fs :s3}) (s/def ::s3 ::ss3/backend) (s/def ::fs ::sfs/backend) (s/def ::type #{:fs :s3}) @@ -45,11 +54,13 @@ [_ {:keys [::backends ::db/pool] :as cfg}] (-> (d/without-nils cfg) (assoc ::backends (d/without-nils backends)) - (assoc ::db/pool-or-conn pool))) + (assoc ::backend (or (get-legacy-backend) + (cf/get :objects-storage-backend :fs))) + (assoc ::db/connectable pool))) (s/def ::backend keyword?) (s/def ::storage - (s/keys :req [::backends ::db/pool ::db/pool-or-conn] + (s/keys :req [::backends ::db/pool ::db/connectable] :opt [::backend])) (s/def ::storage-with-backend @@ -61,23 +72,26 @@ (defn get-metadata [params] - (into {} - (remove (fn [[k _]] (qualified-keyword? k))) - params)) + (reduce-kv (fn [res k _] + (if (qualified-keyword? k) + (dissoc res k) + res)) + params + params)) (defn- get-database-object-by-hash - [pool-or-conn backend bucket hash] + [connectable backend bucket hash] (let [sql (str "select * from storage_object " " where (metadata->>'~:hash') = ? " " and (metadata->>'~:bucket') = ? " " and backend = ?" " and deleted_at is null" " limit 1")] - (some-> (db/exec-one! pool-or-conn [sql hash bucket (name backend)]) + (some-> (db/exec-one! connectable [sql hash bucket (name backend)]) (update :metadata db/decode-transit-pgobject)))) (defn- create-database-object - [{:keys [::backend ::db/pool-or-conn]} {:keys [::content ::expired-at ::touched-at] :as params}] + [{:keys [::backend ::db/connectable]} {:keys [::content ::expired-at ::touched-at ::touch] :as params}] (let [id (or (:id params) (uuid/random)) mdata (cond-> (get-metadata params) (satisfies? impl/IContentHash content) @@ -86,7 +100,9 @@ :always (dissoc :id)) - ;; FIXME: touch object on deduplicated put operation ?? + touched-at (if touch + (or touched-at (dt/now)) + touched-at) ;; NOTE: for now we don't reuse the deleted objects, but in ;; futute we can consider reusing deleted objects if we @@ -95,10 +111,20 @@ result (when (and (::deduplicate? params) (:hash mdata) (:bucket mdata)) - (get-database-object-by-hash pool-or-conn backend (:bucket mdata) (:hash mdata))) + (let [result (get-database-object-by-hash connectable backend + (:bucket mdata) + (:hash mdata))] + (if touch + (do + (db/update! connectable :storage-object + {:touched-at touched-at} + {:id (:id result)} + {::db/return-keys false}) + (assoc result :touced-at touched-at)) + result))) result (or result - (-> (db/insert! pool-or-conn :storage-object + (-> (db/insert! connectable :storage-object {:id id :size (impl/get-size content) :backend (name backend) @@ -154,9 +180,9 @@ (dm/export impl/object?) (defn get-object - [{:keys [::db/pool-or-conn] :as storage} id] + [{:keys [::db/connectable] :as storage} id] (us/assert! ::storage storage) - (retrieve-database-object pool-or-conn id)) + (retrieve-database-object connectable id)) (defn put-object! "Creates a new object with the provided content." @@ -172,10 +198,10 @@ (defn touch-object! "Mark object as touched." - [{:keys [::db/pool-or-conn] :as storage} object-or-id] + [{:keys [::db/connectable] :as storage} object-or-id] (us/assert! ::storage storage) (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)] - (-> (db/update! pool-or-conn :storage-object + (-> (db/update! connectable :storage-object {:touched-at (dt/now)} {:id id}) (db/get-update-count) @@ -195,11 +221,10 @@ "Returns a byte array of object content." [storage object] (us/assert! ::storage storage) - (if (or (nil? (:expired-at object)) - (dt/is-after? (:expired-at object) (dt/now))) + (when (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now))) (-> (impl/resolve-backend storage (:backend object)) - (impl/get-object-bytes object)) - (p/resolved nil))) + (impl/get-object-bytes object)))) (defn get-object-url ([storage object] @@ -223,13 +248,26 @@ (-> (impl/get-object-url backend object nil) file-url->path)))) (defn del-object! - [{:keys [::db/pool-or-conn] :as storage} object-or-id] + [{:keys [::db/connectable] :as storage} object-or-id] (us/assert! ::storage storage) (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) - res (db/update! pool-or-conn :storage-object + res (db/update! connectable :storage-object {:deleted-at (dt/now)} {:id id})] (pos? (db/get-update-count res)))) -(dm/export impl/resolve-backend) (dm/export impl/calculate-hash) + +(defn configure + [storage connectable] + (assoc storage ::db/connectable connectable)) + +(defn resolve + "Resolves the storage instance with preconfigured backend. You can + specify to reuse the database connection from provided + cfg/system (default false)." + [cfg & {:as opts}] + (let [storage (::storage cfg)] + (if (::db/reuse-conn opts false) + (configure storage (db/get-connectable cfg)) + storage))) diff --git a/backend/src/app/storage/gc_touched.clj b/backend/src/app/storage/gc_touched.clj index 4e805bae6..155496b41 100644 --- a/backend/src/app/storage/gc_touched.clj +++ b/backend/src/app/storage/gc_touched.clj @@ -76,6 +76,24 @@ (-> (db/exec-one! conn [sql:has-file-thumbnail-refs id]) (get :has-refs))) +(def ^:private + sql:has-file-data-refs + "SELECT EXISTS (SELECT 1 FROM file WHERE data_ref_id = ?) AS has_refs") + +(defn- has-file-data-refs? + [conn id] + (-> (db/exec-one! conn [sql:has-file-data-refs id]) + (get :has-refs))) + +(def ^:private + sql:has-file-data-fragment-refs + "SELECT EXISTS (SELECT 1 FROM file_data_fragment WHERE data_ref_id = ?) AS has_refs") + +(defn- has-file-data-fragment-refs? + [conn id] + (-> (db/exec-one! conn [sql:has-file-data-fragment-refs id]) + (get :has-refs))) + (def ^:private sql:mark-freeze-in-bulk "UPDATE storage_object SET touched_at = NULL @@ -148,6 +166,8 @@ "file-object-thumbnail" (process-objects! conn has-file-object-thumbnails-refs? ids bucket) "file-thumbnail" (process-objects! conn has-file-thumbnails-refs? ids bucket) "profile" (process-objects! conn has-profile-refs? ids bucket) + "file-data" (process-objects! conn has-file-data-refs? ids bucket) + "file-data-fragment" (process-objects! conn has-file-data-fragment-refs? ids bucket) (ex/raise :type :internal :code :unexpected-unknown-reference :hint (dm/fmt "unknown reference '%'" bucket)))) diff --git a/backend/src/app/storage/impl.clj b/backend/src/app/storage/impl.clj index 156d86b87..6de48b682 100644 --- a/backend/src/app/storage/impl.clj +++ b/backend/src/app/storage/impl.clj @@ -207,15 +207,13 @@ (str "blake2b:" result))) (defn resolve-backend - [{:keys [::db/pool] :as storage} backend-id] + [storage backend-id] (let [backend (get-in storage [::sto/backends backend-id])] (when-not backend (ex/raise :type :internal :code :backend-not-configured :hint (dm/fmt "backend '%' not configured" backend-id))) - (-> backend - (assoc ::sto/id backend-id) - (assoc ::db/pool pool)))) + (assoc backend ::sto/id backend-id))) (defrecord StorageObject [id size created-at expired-at touched-at backend]) diff --git a/backend/src/app/tasks/file_gc.clj b/backend/src/app/tasks/file_gc.clj index e007485f5..e84e5a450 100644 --- a/backend/src/app/tasks/file_gc.clj +++ b/backend/src/app/tasks/file_gc.clj @@ -10,7 +10,6 @@ file is eligible to be garbage collected after some period of inactivity (the default threshold is 72h)." (:require - [app.common.data :as d] [app.binfile.common :as bfc] [app.common.files.migrations :as fmg] [app.common.files.validate :as cfv] @@ -22,11 +21,11 @@ [app.config :as cf] [app.db :as db] [app.features.fdata :as feat.fdata] - [app.media :as media] [app.storage :as sto] [app.util.blob :as blob] [app.util.pointer-map :as pmap] [app.util.time :as dt] + [app.worker :as wrk] [clojure.set :as set] [clojure.spec.alpha :as s] [integrant.core :as ig])) @@ -272,17 +271,16 @@ (defn- process-file! [cfg] - (try - (if-let [file (get-file cfg)] - (let [file (decode-file cfg file) - file (clean-media! cfg file) - file (persist-file! cfg file)] - (clean-data-fragments! cfg file)) - (l/dbg :hint "skip" :file-id (str (::file-id cfg)))) - (catch Throwable cause - (l/err :hint "error on cleaning file" - :file-id (str (::file-id cfg)) - :cause cause)))) + (if-let [file (get-file cfg)] + (let [file (decode-file cfg file) + file (clean-media! cfg file) + file (persist-file! cfg file)] + (clean-data-fragments! cfg file) + true) + + (do + (l/dbg :hint "skip" :file-id (str (::file-id cfg))) + false))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HANDLER @@ -294,13 +292,27 @@ (defmethod ig/init-key ::handler [_ cfg] (fn [{:keys [props] :as task}] - (let [min-age (dt/duration (:min-age props 0)) + (let [min-age (dt/duration (or (:min-age props) + (cf/get-deletion-delay))) cfg (-> cfg (assoc ::db/rollback (:rollback? props)) (assoc ::file-id (:file-id props)) (assoc ::min-age (db/interval min-age)))] - (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] - (let [cfg (update cfg ::sto/storage media/configure-assets-storage conn)] - (process-file! cfg)))) - nil))) + (try + (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] + (let [cfg (update cfg ::sto/storage sto/configure conn) + res (process-file! cfg)] + + (when (contains? cf/flags :tiered-file-data-storage) + (wrk/submit! (-> cfg + (assoc ::wrk/task :offload-file-data) + (assoc ::wrk/params props) + (assoc ::wrk/priority 10) + (assoc ::wrk/delay 1000)))) + res))) + + (catch Throwable cause + (l/err :hint "error on cleaning file" + :file-id (str (:file-id props)) + :cause cause)))))) diff --git a/backend/src/app/tasks/file_gc_scheduler.clj b/backend/src/app/tasks/file_gc_scheduler.clj index 11c24e889..a133b6c41 100644 --- a/backend/src/app/tasks/file_gc_scheduler.clj +++ b/backend/src/app/tasks/file_gc_scheduler.clj @@ -8,7 +8,6 @@ "A maintenance task that is responsible of properly scheduling the file-gc task for all files that matches the eligibility threshold." (:require - [app.common.logging :as l] [app.config :as cf] [app.db :as db] [app.util.time :as dt] diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index 9858585cc..95d3128bd 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -11,7 +11,6 @@ [app.common.logging :as l] [app.config :as cf] [app.db :as db] - [app.media :as media] [app.storage :as sto] [app.util.time :as dt] [clojure.spec.alpha :as s] @@ -126,7 +125,7 @@ 0))) (def ^:private sql:get-files - "SELECT id, deleted_at, project_id + "SELECT id, deleted_at, project_id, data_ref_id FROM file WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval @@ -136,15 +135,17 @@ SKIP LOCKED") (defn- delete-files! - [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + [{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}] (->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1}) - (reduce (fn [total {:keys [id deleted-at project-id]}] + (reduce (fn [total {:keys [id deleted-at project-id data-ref-id]}] (l/trc :hint "permanently delete" :rel "file" :id (str id) :project-id (str project-id) :deleted-at (dt/format-instant deleted-at)) + (some->> data-ref-id (sto/touch-object! storage)) + ;; And finally, permanently delete the file. (db/delete! conn :file {:id id}) @@ -210,7 +211,7 @@ 0))) (def ^:private sql:get-file-data-fragments - "SELECT file_id, id, deleted_at + "SELECT file_id, id, deleted_at, data_ref_id FROM file_data_fragment WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval @@ -220,15 +221,16 @@ SKIP LOCKED") (defn- delete-file-data-fragments! - [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + [{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}] (->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1}) - (reduce (fn [total {:keys [file-id id deleted-at]}] + (reduce (fn [total {:keys [file-id id deleted-at data-ref-id]}] (l/trc :hint "permanently delete" :rel "file-data-fragment" :id (str id) :file-id (str file-id) :deleted-at (dt/format-instant deleted-at)) + (some->> data-ref-id (sto/touch-object! storage)) (db/delete! conn :file-data-fragment {:file-id file-id :id id}) (inc total)) @@ -299,9 +301,7 @@ [_ cfg] (fn [{:keys [props] :as task}] (let [min-age (dt/duration (or (:min-age props) (::min-age cfg))) - cfg (-> cfg - (assoc ::min-age (db/interval min-age)) - (update ::sto/storage media/configure-assets-storage))] + cfg (assoc cfg ::min-age (db/interval min-age))] (loop [procs (map deref deletion-proc-vars) total 0] diff --git a/backend/src/app/tasks/offload_file_data.clj b/backend/src/app/tasks/offload_file_data.clj new file mode 100644 index 000000000..ec8739179 --- /dev/null +++ b/backend/src/app/tasks/offload_file_data.clj @@ -0,0 +1,85 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.tasks.offload-file-data + "A maintenance task responsible of moving file data from hot + storage (the database row) to a cold storage (fs or s3)." + (:require + [app.common.logging :as l] + [app.db :as db] + [app.db.sql :as-alias sql] + [app.storage :as sto] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(defn- offload-file-data! + [{:keys [::db/conn ::sto/storage ::file-id] :as cfg}] + (let [file (db/get conn :file {:id file-id} + {::sql/for-update true}) + + data (sto/content (:data file)) + sobj (sto/put-object! storage + {::sto/content data + ::sto/touch true + :bucket "file-data" + :content-type "application/octet-stream" + :file-id file-id})] + + (l/trc :hint "offload file data" + :file-id (str file-id) + :storage-id (str (:id sobj))) + + (db/update! conn :file + {:data-backend "objects-storage" + :data-ref-id (:id sobj) + :data nil} + {:id file-id} + {::db/return-keys false}))) + +(defn- offload-file-data-fragments! + [{:keys [::db/conn ::sto/storage ::file-id] :as cfg}] + (doseq [fragment (db/query conn :file-data-fragment + {:file-id file-id + :deleted-at nil + :data-backend nil} + {::db/for-update true})] + (let [data (sto/content (:data fragment)) + sobj (sto/put-object! storage + {::sto/content data + ::sto/touch true + :bucket "file-data-fragment" + :content-type "application/octet-stream" + :file-id file-id + :file-fragment-id (:id fragment)})] + + (l/trc :hint "offload file data fragment" + :file-id (str file-id) + :file-fragment-id (str (:id fragment)) + :storage-id (str (:id sobj))) + + (db/update! conn :file-data-fragment + {:data-backend "objects-storage" + :data-ref-id (:id sobj) + :data nil} + {:id (:id fragment)} + {::db/return-keys false})))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; HANDLER +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool ::sto/storage])) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [{:keys [props] :as task}] + (-> cfg + (assoc ::db/rollback (:rollback? props)) + (assoc ::file-id (:file-id props)) + (db/tx-run! (fn [cfg] + (offload-file-data! cfg) + (offload-file-data-fragments! cfg)))))) diff --git a/backend/test/backend_tests/rpc_file_test.clj b/backend/test/backend_tests/rpc_file_test.clj index 5d1fe1824..8dd9ce412 100644 --- a/backend/test/backend_tests/rpc_file_test.clj +++ b/backend/test/backend_tests/rpc_file_test.clj @@ -149,8 +149,7 @@ shape-id (uuid/random)] ;; Preventive file-gc - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) ;; Check the number of fragments before adding the page (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] @@ -171,8 +170,7 @@ (t/is (= 3 (count rows)))) ;; The file-gc should mark for remove unused fragments - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) ;; Check the number of fragments (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] @@ -210,15 +208,13 @@ (t/is (= 3 (count rows)))) ;; The file-gc should mark for remove unused fragments - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) ;; The objects-gc should remove unused fragments (let [res (th/run-task! :objects-gc {:min-age 0})] (t/is (= 3 (:processed res)))) - ;; Check the number of fragments; should be 3 because changes - ;; are also holding pointers to fragments; + ;; Check the number of fragments; (let [rows (th/db-query :file-data-fragment {:file-id (:id file) :deleted-at nil})] (t/is (= 2 (count rows)))) @@ -231,8 +227,7 @@ ;; The file-gc should remove fragments related to changes ;; snapshots previously deleted. - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) ;; Check the number of fragments; (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] @@ -325,12 +320,10 @@ (t/is (= 0 (:delete res)))) ;; run the file-gc task immediately without forced min-age - (let [res (th/run-task! :file-gc)] - (t/is (= 0 (:processed res)))) + (t/is (false? (th/run-task! :file-gc {:file-id (:id file)}))) ;; run the task again - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) ;; retrieve file and check trimmed attribute (let [row (th/db-get :file {:id (:id file)})] @@ -367,8 +360,7 @@ ;; Now, we have deleted the usage of pointers to the ;; file-media-objects, if we paste file-gc, they should be marked ;; as deleted. - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) (let [res (th/run-task! :objects-gc {:min-age 0})] (t/is (= 3 (:processed res)))) @@ -490,12 +482,10 @@ :strokes [{:opacity 1 :stroke-image {:id (:id fmo5) :width 100 :height 100 :mtype "image/jpeg"}}]})}]) ;; run the file-gc task immediately without forced min-age - (let [res (th/run-task! :file-gc)] - (t/is (= 0 (:processed res)))) + (t/is (false? (th/run-task! :file-gc {:file-id (:id file)}))) ;; run the task again - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) (let [res (th/run-task! :objects-gc {:min-age 0})] (t/is (= 2 (:processed res)))) @@ -534,9 +524,7 @@ ;; Now, we have deleted the usage of pointers to the ;; file-media-objects, if we paste file-gc, they should be marked ;; as deleted. - - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) (let [res (th/run-task! :objects-gc {:min-age 0})] (t/is (= 7 (:processed res)))) @@ -659,12 +647,10 @@ (t/is (= 0 (:delete res)))) ;; run the file-gc task immediately without forced min-age - (let [res (th/run-task! :file-gc)] - (t/is (= 0 (:processed res)))) + (t/is (false? (th/run-task! :file-gc {:file-id (:id file)}))) ;; run the task again - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) ;; retrieve file and check trimmed attribute (let [row (th/db-get :file {:id (:id file)})] @@ -693,8 +679,7 @@ :page-id page-id :id frame-id-2}]) - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) (let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})] (t/is (= 2 (count rows))) @@ -727,8 +712,7 @@ :page-id page-id :id frame-id-1}]) - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) (let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})] (t/is (= 1 (count rows))) @@ -1127,8 +1111,7 @@ (th/sleep 300) ;; run the task - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) ;; check that object thumbnails are still here (let [rows (th/db-query :file-tagged-object-thumbnail {:file-id (:id file)})] @@ -1157,8 +1140,7 @@ (t/is (= 2 (count rows)))) ;; run the task again - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) ;; check that we have all object thumbnails (let [rows (th/db-query :file-tagged-object-thumbnail {:file-id (:id file)})] @@ -1220,8 +1202,7 @@ (t/is (= 2 (count rows))))) (t/testing "gc task" - (let [res (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed res)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) (let [rows (th/db-query :file-thumbnail {:file-id (:id file)})] (t/is (= 2 (count rows))) @@ -1232,3 +1213,113 @@ (let [rows (th/db-query :file-thumbnail {:file-id (:id file)})] (t/is (= 1 (count rows))))))) + + +(defn- update-file! + [& {:keys [profile-id file-id changes revn] :or {revn 0}}] + (let [params {::th/type :update-file + ::rpc/profile-id profile-id + :id file-id + :session-id (uuid/random) + :revn revn + :features cfeat/supported-features + :changes changes} + out (th/command! params)] + ;; (th/print-result! out) + (t/is (nil? (:error out))) + (:result out))) + +(t/deftest file-tiered-storage + (let [profile (th/create-profile* 1) + file (th/create-file* 1 {:profile-id (:id profile) + :project-id (:default-project-id profile) + :is-shared false}) + + page-id (uuid/random) + shape-id (uuid/random)] + + ;; Preventive file-gc + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) + + ;; Preventive objects-gc + (let [result (th/run-task! :objects-gc {:min-age 0})] + (t/is (= 1 (:processed result)))) + + ;; Check the number of fragments before adding the page + (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] + (t/is (= 1 (count rows))) + (t/is (every? #(some? (:data %)) rows))) + + ;; Mark the file ellegible again for GC + (th/db-update! :file + {:has-media-trimmed false} + {:id (:id file)}) + + ;; Run FileGC again, with tiered storage activated + (with-redefs [app.config/flags (conj app.config/flags :tiered-file-data-storage)] + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) + + ;; The FileGC task will schedule an inner taskq + (th/run-pending-tasks!)) + + ;; Clean objects after file-gc + (let [result (th/run-task! :objects-gc {:min-age 0})] + (t/is (= 1 (:processed result)))) + + ;; Check the number of fragments before adding the page + (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] + (t/is (= 1 (count rows))) + (t/is (every? #(nil? (:data %)) rows)) + (t/is (every? #(uuid? (:data-ref-id %)) rows)) + (t/is (every? #(= "objects-storage" (:data-backend %)) rows))) + + (let [file (th/db-get :file {:id (:id file)}) + storage (sto/resolve th/*system*)] + (t/is (= "objects-storage" (:data-backend file))) + (t/is (nil? (:data file))) + (t/is (uuid? (:data-ref-id file))) + + (let [sobj (sto/get-object storage (:data-ref-id file))] + (t/is (= "file-data" (:bucket (meta sobj)))) + (t/is (= (:id file) (:file-id (meta sobj)))))) + + ;; Add shape to page that should load from cold storage again into the hot storage (db) + (update-file! + :file-id (:id file) + :profile-id (:id profile) + :revn 0 + :changes + [{:type :add-page + :name "test" + :id page-id}]) + + ;; Check the number of fragments + (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] + (t/is (= 2 (count rows)))) + + ;; Check the number of fragments + (let [[row1 row2 :as rows] + (th/db-query :file-data-fragment + {:file-id (:id file) + :deleted-at nil} + {:order-by [:created-at]})] + ;; (pp/pprint rows) + (t/is (= 2 (count rows))) + (t/is (nil? (:data row1))) + (t/is (= "objects-storage" (:data-backend row1))) + (t/is (bytes? (:data row2))) + (t/is (nil? (:data-backend row2)))) + + ;; The file-gc should mark for remove unused fragments + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) + + ;; The objects-gc should remove unused fragments + (let [res (th/run-task! :objects-gc {:min-age 0})] + (t/is (= 2 (:processed res)))) + + ;; Check the number of fragments before adding the page + (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] + (t/is (= 2 (count rows))) + (t/is (every? #(bytes? (:data %)) rows)) + (t/is (every? #(nil? (:data-ref-id %)) rows)) + (t/is (every? #(nil? (:data-backend %)) rows))))) diff --git a/backend/test/backend_tests/rpc_file_thumbnails_test.clj b/backend/test/backend_tests/rpc_file_thumbnails_test.clj index c73941aff..2ceffbddf 100644 --- a/backend/test/backend_tests/rpc_file_thumbnails_test.clj +++ b/backend/test/backend_tests/rpc_file_thumbnails_test.clj @@ -114,8 +114,7 @@ ;; Run the File GC task that should remove unused file object ;; thumbnails - (let [result (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed result)))) + (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}) (let [result (th/run-task! :objects-gc {:min-age 0})] (t/is (= 3 (:processed result)))) @@ -134,7 +133,7 @@ (t/is (some? (sto/get-object storage (:media-id row2)))) ;; run the task again - (let [res (th/run-task! "storage-gc-touched" {:min-age 0})] + (let [res (th/run-task! :storage-gc-touched {:min-age 0})] (t/is (= 1 (:delete res))) (t/is (= 0 (:freeze res)))) @@ -217,8 +216,7 @@ ;; Run the File GC task that should remove unused file object ;; thumbnails - (let [result (th/run-task! :file-gc {:min-age 0})] - (t/is (= 1 (:processed result)))) + (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)}))) (let [result (th/run-task! :objects-gc {:min-age 0})] (t/is (= 2 (:processed result)))) diff --git a/backend/test/backend_tests/rpc_font_test.clj b/backend/test/backend_tests/rpc_font_test.clj index 2d6404435..ab9b57f4b 100644 --- a/backend/test/backend_tests/rpc_font_test.clj +++ b/backend/test/backend_tests/rpc_font_test.clj @@ -145,7 +145,7 @@ (t/is (nil? (:result out)))) (let [res (th/run-task! :storage-gc-touched {:min-age 0})] - (t/is (= 6 (:freeze res))) + (t/is (= 0 (:freeze res))) (t/is (= 0 (:delete res)))) (let [res (th/run-task! :objects-gc {:min-age 0})] @@ -207,7 +207,7 @@ (t/is (nil? (:result out)))) (let [res (th/run-task! :storage-gc-touched {:min-age 0})] - (t/is (= 3 (:freeze res))) + (t/is (= 0 (:freeze res))) (t/is (= 0 (:delete res)))) (let [res (th/run-task! :objects-gc {:min-age 0})] @@ -268,7 +268,7 @@ (t/is (nil? (:result out)))) (let [res (th/run-task! :storage-gc-touched {:min-age 0})] - (t/is (= 3 (:freeze res))) + (t/is (= 0 (:freeze res))) (t/is (= 0 (:delete res)))) (let [res (th/run-task! :objects-gc {:min-age 0})] From ba167f256bbef2849b62f3f3f100cb46100a3d2d Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 2 Aug 2024 13:07:44 +0200 Subject: [PATCH 4/6] :zap: Add performance enhancements on telemetry related queries --- backend/src/app/migrations.clj | 5 ++++- .../migrations/sql/0124-mod-profile-table.sql | 2 ++ backend/src/app/tasks/telemetry.clj | 22 ++++++++++++------- 3 files changed, 20 insertions(+), 9 deletions(-) create mode 100644 backend/src/app/migrations/sql/0124-mod-profile-table.sql diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 965b0ce77..2a4728c8e 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -391,7 +391,10 @@ :fn (mg/resource "app/migrations/sql/0122-mod-file-data-fragment-table.sql")} {:name "0123-mod-file-change-table" - :fn (mg/resource "app/migrations/sql/0123-mod-file-change-table.sql")}]) + :fn (mg/resource "app/migrations/sql/0123-mod-file-change-table.sql")} + + {:name "0124-mod-profile-table" + :fn (mg/resource "app/migrations/sql/0124-mod-profile-table.sql")}]) (defn apply-migrations! [pool name migrations] diff --git a/backend/src/app/migrations/sql/0124-mod-profile-table.sql b/backend/src/app/migrations/sql/0124-mod-profile-table.sql new file mode 100644 index 000000000..e9624abd6 --- /dev/null +++ b/backend/src/app/migrations/sql/0124-mod-profile-table.sql @@ -0,0 +1,2 @@ +CREATE INDEX profile__props__newsletter1__idx ON profile (email) WHERE props->>'~:newsletter-news' = 'true'; +CREATE INDEX profile__props__newsletter2__idx ON profile (email) WHERE props->>'~:newsletter-updates' = 'true'; diff --git a/backend/src/app/tasks/telemetry.clj b/backend/src/app/tasks/telemetry.clj index 410595f72..204d6be0c 100644 --- a/backend/src/app/tasks/telemetry.clj +++ b/backend/src/app/tasks/telemetry.clj @@ -62,19 +62,25 @@ [conn] (-> (db/exec-one! conn ["SELECT count(*) AS count FROM file"]) :count)) +(def ^:private sql:num-file-changes + "SELECT count(*) AS count + FROM file_change + WHERE created_at < date_trunc('day', now()) + '24 hours'::interval + AND created_at > date_trunc('day', now())") + (defn- get-num-file-changes [conn] - (let [sql (str "SELECT count(*) AS count " - " FROM file_change " - " where date_trunc('day', created_at) = date_trunc('day', now())")] - (-> (db/exec-one! conn [sql]) :count))) + (-> (db/exec-one! conn [sql:num-file-changes]) :count)) + +(def ^:private sql:num-touched-files + "SELECT count(distinct file_id) AS count + FROM file_change + WHERE created_at < date_trunc('day', now()) + '24 hours'::interval + AND created_at > date_trunc('day', now())") (defn- get-num-touched-files [conn] - (let [sql (str "SELECT count(distinct file_id) AS count " - " FROM file_change " - " where date_trunc('day', created_at) = date_trunc('day', now())")] - (-> (db/exec-one! conn [sql]) :count))) + (-> (db/exec-one! conn [sql:num-touched-files]) :count)) (defn- get-num-users [conn] From 3219c150d4b3cd94aac780cd6c397cc10b50b763 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 5 Aug 2024 13:18:12 +0200 Subject: [PATCH 5/6] :zap: Add better internal fillfactor setting for file table Increasing the change for HOT updates on db for this heavy-update table --- backend/src/app/migrations.clj | 5 ++++- backend/src/app/migrations/sql/0125-mod-file-table.sql | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 backend/src/app/migrations/sql/0125-mod-file-table.sql diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 2a4728c8e..0efd24613 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -394,7 +394,10 @@ :fn (mg/resource "app/migrations/sql/0123-mod-file-change-table.sql")} {:name "0124-mod-profile-table" - :fn (mg/resource "app/migrations/sql/0124-mod-profile-table.sql")}]) + :fn (mg/resource "app/migrations/sql/0124-mod-profile-table.sql")} + + {:name "0125-mod-file-table" + :fn (mg/resource "app/migrations/sql/0125-mod-file-table.sql")}]) (defn apply-migrations! [pool name migrations] diff --git a/backend/src/app/migrations/sql/0125-mod-file-table.sql b/backend/src/app/migrations/sql/0125-mod-file-table.sql new file mode 100644 index 000000000..20d560bbb --- /dev/null +++ b/backend/src/app/migrations/sql/0125-mod-file-table.sql @@ -0,0 +1,3 @@ +--- This setting allow to optimize the table for heavy write workload +--- leaving space on the page for HOT updates +ALTER TABLE file SET (FILLFACTOR=50); From d2937a76d9792c6b8a2ce68023e9e40d65418181 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 9 Aug 2024 14:16:16 +0200 Subject: [PATCH 6/6] :bug: Fix error handling issue on login with oidc happens when no oidc backend is configured on backend --- frontend/src/app/main/ui/auth/login.cljs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/frontend/src/app/main/ui/auth/login.cljs b/frontend/src/app/main/ui/auth/login.cljs index c43741175..2881458d8 100644 --- a/frontend/src/app/main/ui/auth/login.cljs +++ b/frontend/src/app/main/ui/auth/login.cljs @@ -53,14 +53,15 @@ (.replace js/location redirect-uri) (log/error :hint "unexpected response from OIDC method" :resp (pr-str rsp)))) - (fn [{:keys [type code] :as error}] - (cond - (and (= type :restriction) - (= code :provider-not-configured)) - (st/emit! (ntf/error (tr "errors.auth-provider-not-configured"))) + (fn [cause] + (let [{:keys [type code] :as error} (ex-data cause)] + (cond + (and (= type :restriction) + (= code :provider-not-configured)) + (st/emit! (ntf/error (tr "errors.auth-provider-not-configured"))) - :else - (st/emit! (ntf/error (tr "errors.generic")))))))) + :else + (st/emit! (ntf/error (tr "errors.generic"))))))))) (def ^:private schema:login-form [:map {:title "LoginForm"}