From a6de12323e529a7d8cfd3539212ab07af94f5e60 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 4 Sep 2024 12:02:35 +0200 Subject: [PATCH] :recycle: Refactor file-update for make it more reusable --- backend/src/app/rpc/commands/files_update.clj | 415 ++++++++++-------- backend/test/backend_tests/helpers.clj | 12 +- 2 files changed, 239 insertions(+), 188 deletions(-) diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index f010ac7b7..7cf5dfa40 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -38,6 +38,20 @@ [clojure.set :as set] [promesa.exec :as px])) +(declare ^:private get-lagged-changes) +(declare ^:private send-notifications!) +(declare ^:private update-file) +(declare ^:private update-file*) +(declare ^:private process-changes-and-validate) +(declare ^:private take-snapshot?) +(declare ^:private delete-old-snapshots!) + +;; PUBLIC API; intended to be used outside of this module +(declare update-file!) +(declare update-file-data!) +(declare persist-file!) +(declare get-file) + ;; --- SCHEMA (def ^:private @@ -97,41 +111,6 @@ (or (contains? library-change-types type) (contains? file-change-types type))) -(def ^:private sql:get-file - "SELECT f.*, p.team_id - FROM file AS f - JOIN project AS p ON (p.id = f.project_id) - WHERE f.id = ? - AND (f.deleted_at IS NULL OR - f.deleted_at > now()) - FOR KEY SHARE") - -(defn get-file - [conn id] - (let [file (db/exec-one! conn [sql:get-file id])] - (when-not file - (ex/raise :type :not-found - :code :object-not-found - :hint (format "file with id '%s' does not exists" id))) - (update file :features db/decode-pgarray #{}))) - -(defn- wrap-with-pointer-map-context - [f] - (fn [cfg {:keys [id] :as file}] - (binding [pmap/*tracked* (pmap/create-tracked) - pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)] - (let [result (f cfg file)] - (feat.fdata/persist-pointers! 
cfg id) - result)))) - -(declare ^:private delete-old-snapshots!) -(declare ^:private get-lagged-changes) -(declare ^:private send-notifications!) -(declare ^:private take-snapshot?) -(declare ^:private update-file) -(declare ^:private update-file*) -(declare ^:private update-file-data) - ;; If features are specified from params and the final feature ;; set is different than the persisted one, update it on the ;; database. @@ -147,7 +126,8 @@ ::sm/result schema:update-file-result ::doc/module :files ::doc/added "1.17"} - [cfg {:keys [::rpc/profile-id id] :as params}] + [{:keys [::mtx/metrics] :as cfg} + {:keys [::rpc/profile-id id changes changes-with-metadata] :as params}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (files/check-edition-permissions! conn profile-id id) (db/xact-lock! conn id) @@ -161,14 +141,30 @@ (cfeat/check-client-features! (:features params)) (cfeat/check-file-features! (:features file) (:features params))) - params (assoc params - :profile-id profile-id - :features features - :team team - :file file) + changes (if changes-with-metadata + (->> changes-with-metadata (mapcat :changes) vec) + (vec changes)) + + params (-> params + (assoc :profile-id profile-id) + (assoc :features features) + (assoc :team team) + (assoc :file file) + (assoc :changes changes)) + + cfg (assoc cfg ::timestamp (dt/now)) tpoint (dt/tpoint)] + + (when (> (:revn params) + (:revn file)) + (ex/raise :type :validation + :code :revn-conflict + :hint "The incoming revision number is greater that stored version." + :context {:incoming-revn (:revn params) + :stored-revn (:revn file)})) + ;; When newly computed features does not match exactly with ;; the features defined on team row, we update it. (when (not= features (:features team)) @@ -177,90 +173,126 @@ {:features features} {:id (:id team)}))) + (mtx/run! 
metrics {:id :update-file-changes :inc (count changes)}) + (binding [l/*context* (some-> (meta params) (get :app.http/request) (errors/request->context))] - (-> (update-file cfg params) + (-> (update-file* cfg params) (rph/with-defer #(let [elapsed (tpoint)] (l/trace :hint "update-file" :time (dt/format-duration elapsed)))))))))) -(defn update-file - [{:keys [::mtx/metrics] :as cfg} - {:keys [file features changes changes-with-metadata] :as params}] - (let [features (-> features - (set/difference cfeat/frontend-only-features) - (set/union (:features file))) - - update-fn (cond-> update-file* - (contains? features "fdata/pointer-map") - (wrap-with-pointer-map-context)) - - changes (if changes-with-metadata - (->> changes-with-metadata (mapcat :changes) vec) - (vec changes))] - - (when (> (:revn params) - (:revn file)) - (ex/raise :type :validation - :code :revn-conflict - :hint "The incoming revision number is greater that stored version." - :context {:incoming-revn (:revn params) - :stored-revn (:revn file)})) - - (mtx/run! metrics {:id :update-file-changes :inc (count changes)}) - - (binding [cfeat/*current* features - cfeat/*previous* (:features file)] - (let [file (assoc file :features features) - params (-> params - (assoc :file file) - (assoc :changes changes) - (assoc ::created-at (dt/now)))] - - (-> (update-fn cfg params) - (vary-meta assoc ::audit/replace-props - {:id (:id file) - :name (:name file) - :features (:features file) - :project-id (:project-id file) - :team-id (:team-id file)})))))) - (defn- update-file* - [{:keys [::db/conn ::wrk/executor] :as cfg} - {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] + "Internal function, part of the update-file process, that encapsulates + the changes application offload to a separated thread and emit all + corresponding notifications. + + Follow the inner implementation to `update-file-data!` function. + + Only intended for internal use on this module." 
+ [{:keys [::db/conn ::wrk/executor ::timestamp] :as cfg} + {:keys [profile-id file features changes session-id skip-validate] :as params}] + (let [;; Retrieve the file data - file (feat.fdata/resolve-file-data cfg file) + file (feat.fdata/resolve-file-data cfg file) + + file (assoc file :features + (-> features + (set/difference cfeat/frontend-only-features) + (set/union (:features file)))) ;; Process the file data on separated thread for avoid to do ;; the CPU intensive operation on vthread. + file (px/invoke! executor + (fn [] + (binding [cfeat/*current* features + cfeat/*previous* (:features file)] + (update-file-data! cfg file + process-changes-and-validate + changes skip-validate))))] - file (px/invoke! executor (partial update-file-data cfg file changes skip-validate)) - features (db/create-array conn "text" (:features file))] - - ;; NOTE: if file was offloaded, we need to touch the referenced - ;; storage object because on this update operation the data will - ;; be overwritted. - (when (= "objects-storage" (:data-backend file)) + (when (feat.fdata/offloaded? file) (let [storage (sto/resolve cfg ::db/reuse-conn true)] - (sto/touch-object! storage (:data-ref-id file)))) - - (db/insert! conn :file-change - {:id (uuid/next) - :session-id session-id - :profile-id profile-id - :created-at created-at - :file-id (:id file) - :revn (:revn file) - :version (:version file) - :label (::snapshot-label file) - :data (::snapshot-data file) - :features (db/create-array conn "text" (:features file)) - :changes (blob/encode changes)} - {::db/return-keys false}) + (some->> (:data-ref-id file) (sto/touch-object! storage)))) + ;; TODO: move this to asynchronous task (when (::snapshot-data file) (delete-old-snapshots! cfg file)) + (persist-file! cfg file) + + (let [params (assoc params :file file) + response {:revn (:revn file) + :lagged (get-lagged-changes conn params)} + features (db/create-array conn "text" (:features file))] + + ;; Insert change (xlog) + (db/insert! 
conn :file-change + {:id (uuid/next) + :session-id session-id + :profile-id profile-id + :created-at timestamp + :file-id (:id file) + :revn (:revn file) + :version (:version file) + :features (db/create-array conn "text" (:features file)) + :label (::snapshot-label file) + :data (::snapshot-data file) + :changes (blob/encode changes)} + {::db/return-keys false}) + + ;; Send asynchronous notifications + (send-notifications! cfg params) + + (vary-meta response assoc ::audit/replace-props + {:id (:id file) + :name (:name file) + :features (:features file) + :project-id (:project-id file) + :team-id (:team-id file)})))) + +(defn update-file! + "A public api that allows apply a transformation to a file with all context setup." + [cfg file-id update-fn & args] + (let [file (get-file cfg file-id) + file (apply update-file-data! cfg file update-fn args)] + (persist-file! cfg file))) + +(def ^:private sql:get-file + "SELECT f.*, p.team_id + FROM file AS f + JOIN project AS p ON (p.id = f.project_id) + WHERE f.id = ? + AND (f.deleted_at IS NULL OR + f.deleted_at > now()) + FOR KEY SHARE") + +(defn get-file + "Get not-decoded file, only decodes the features set." + [conn id] + (let [file (db/exec-one! conn [sql:get-file id])] + (when-not file + (ex/raise :type :not-found + :code :object-not-found + :hint (format "file with id '%s' does not exists" id))) + (update file :features db/decode-pgarray #{}))) + +(defn persist-file! + "Function responsible of persisting already encoded file. Should be + used together with `get-file` and `update-file-data!`. + + It also updates the project modified-at attr." + [{:keys [::db/conn ::timestamp]} file] + (let [features (db/create-array conn "text" (:features file)) + ;; The timestamp can be nil because this function is also + ;; intended to be used outside of this module + modified-at (or timestamp (dt/now))] + + (db/update! conn :project + {:modified-at modified-at} + {:id (:project-id file)} + {::db/return-keys false}) + (db/update! 
conn :file {:revn (:revn file) :data (:data file) @@ -268,20 +300,95 @@ :features features :data-backend nil :data-ref-id nil - :modified-at created-at + :modified-at modified-at :has-media-trimmed false} - {:id (:id file)}) + {:id (:id file)} + {::db/return-keys false}))) - (db/update! conn :project - {:modified-at created-at} - {:id (:project-id file)}) +(defn- update-file-data! + "Perform a file data transformation with all update context set up. - (let [params (assoc params :file file)] - ;; Send asynchronous notifications - (send-notifications! cfg params) + This function expects a not-decoded file and a transformation function. Returns + an encoded file. - {:revn (:revn file) - :lagged (get-lagged-changes conn params)}))) + This function is not responsible for saving the file. It only saves + fdata/pointer-map modified fragments." + + [cfg {:keys [id] :as file} update-fn & args] + (binding [pmap/*tracked* (pmap/create-tracked) + pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)] + (let [file (update file :data (fn [data] + (-> data + (blob/decode) + (assoc :id (:id file))))) + + ;; To avoid the unnecessary overhead of creating multiple pointers + ;; and handling the objects map internally in the worst + ;; case (when probably all shapes and all pointers will be + ;; read in any case), we just realize/resolve them before + ;; applying the migration to the file + file (if (fmg/need-migration? file) + (-> file + (update :data feat.fdata/process-pointers deref) + (update :data feat.fdata/process-objects (partial into {})) + (fmg/migrate-file)) + file) + + file (apply update-fn cfg file args) + + ;; TODO: reuse operations if file is migrated + ;; TODO: move encoding to a separated thread + file (if (take-snapshot? 
file) + (let [tpoint (dt/tpoint) + snapshot (-> (:data file) + (feat.fdata/process-pointers deref) + (feat.fdata/process-objects (partial into {})) + (blob/encode)) + elapsed (tpoint) + label (str "internal/snapshot/" (:revn file))] + + (l/trc :hint "take snapshot" + :file-id (str (:id file)) + :revn (:revn file) + :label label + :elapsed (dt/format-duration elapsed)) + + (-> file + (assoc ::snapshot-data snapshot) + (assoc ::snapshot-label label))) + file) + + file (cond-> file + (contains? cfeat/*current* "fdata/objects-map") + (feat.fdata/enable-objects-map) + + (contains? cfeat/*current* "fdata/pointer-map") + (feat.fdata/enable-pointer-map) + + :always + (update :data blob/encode))] + + (feat.fdata/persist-pointers! cfg id) + + file))) + +(defn- get-file-libraries + "A helper to preload file libraries, mainly used to perform file + semantic and structural validation" + [{:keys [::db/conn] :as cfg} file] + (->> (files/get-file-libraries conn (:id file)) + (into [file] (map (fn [{:keys [id]}] + (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id) + pmap/*tracked* nil] + ;; We do not resolve the objects maps here + ;; because there is a lower probability that all + ;; shapes need to be loaded into memory, so we + ;; leave it in lazy status + (-> (files/get-file cfg id :migrate? false) + (update :data feat.fdata/process-pointers deref) ; ensure all pointers resolved + (update :data feat.fdata/process-objects (partial into {})) + (fmg/migrate-file)))))) + (d/index-by :id))) (defn- soft-validate-file-schema! 
[file] @@ -298,68 +405,19 @@ (l/error :hint "file validation error" :cause cause)))) -(defn- update-file-data - [{:keys [::db/conn] :as cfg} file changes skip-validate] - (let [file (update file :data (fn [data] - (-> data - (blob/decode) - (assoc :id (:id file))))) - ;; For avoid unnecesary overhead of creating multiple pointers - ;; and handly internally with objects map in their worst - ;; case (when probably all shapes and all pointers will be - ;; readed in any case), we just realize/resolve them before - ;; applying the migration to the file - file (if (fmg/need-migration? file) - (-> file - (update :data feat.fdata/process-pointers deref) - (update :data feat.fdata/process-objects (partial into {})) - (fmg/migrate-file)) - file) - - ;; WARNING: this ruins performance; maybe we need to find +(defn- process-changes-and-validate + [cfg file changes skip-validate] + (let [;; WARNING: this ruins performance; maybe we need to find ;; some other way to do general validation libs (when (and (or (contains? cf/flags :file-validation) (contains? cf/flags :soft-file-validation)) (not skip-validate)) - (->> (files/get-file-libraries conn (:id file)) - (into [file] (map (fn [{:keys [id]}] - (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id) - pmap/*tracked* nil] - ;; We do not resolve the objects maps here - ;; because there is a lower probability that all - ;; shapes needed to be loded into memory, so we - ;; leeave it on lazy status - (-> (files/get-file cfg id :migrate? false) - (update :data feat.fdata/process-pointers deref) ; ensure all pointers resolved - (update :data feat.fdata/process-objects (partial into {})) - (fmg/migrate-file)))))) - (d/index-by :id))) - + (get-file-libraries cfg file)) file (-> (files/check-version! file) (update :revn inc) (update :data cpc/process-changes changes) - (update :data d/without-nils)) - - file (if (take-snapshot? 
file) - (let [tpoint (dt/tpoint) - snapshot (-> (:data file) - (feat.fdata/process-pointers deref) - (feat.fdata/process-objects (partial into {})) - (blob/encode)) - elapsed (tpoint) - label (str "internal/snapshot/" (:revn file))] - - (l/trc :hint "take snapshot" - :file-id (str (:id file)) - :revn (:revn file) - :label label - :elapsed (dt/format-duration elapsed)) - - (-> file - (assoc ::snapshot-data snapshot) - (assoc ::snapshot-label label))) - file)] + (update :data d/without-nils))] (binding [pmap/*tracked* nil] (when (contains? cf/flags :soft-file-validation) @@ -376,15 +434,7 @@ (not skip-validate)) (val/validate-file-schema! file))) - (cond-> file - (contains? cfeat/*current* "fdata/objects-map") - (feat.fdata/enable-objects-map) - - (contains? cfeat/*current* "fdata/pointer-map") - (feat.fdata/enable-pointer-map) - - :always - (update :data blob/encode)))) + file)) (defn- take-snapshot? "Defines the rule when file `data` snapshot should be saved." @@ -426,8 +476,7 @@ result (db/exec-one! conn [sql:delete-snapshots id last-date])] (l/trc :hint "delete old snapshots" :file-id (str id) :total (db/get-update-count result))))) -(def ^:private - sql:lagged-changes +(def ^:private sql:lagged-changes "select s.id, s.revn, s.file_id, s.session_id, s.changes from file_change as s diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 8380ea13e..e77b51d6a 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -76,7 +76,7 @@ :enable-feature-fdata-pointer-map :enable-feature-fdata-objets-map :enable-feature-components-v2 - :enable-file-snapshot + :enable-auto-file-snapshot :disable-file-validation]) (defn state-init @@ -304,16 +304,18 @@ ([params] (update-file* *system* params)) ([system {:keys [file-id changes session-id profile-id revn] :or {session-id (uuid/next) revn 0}}] - (db/tx-run! 
system (fn [{:keys [::db/conn] :as system}] - (let [file (files.update/get-file conn file-id)] - (files.update/update-file system + (-> system + (assoc ::files.update/timestamp (dt/now)) + (db/tx-run! (fn [{:keys [::db/conn] :as system}] + (let [file (files.update/get-file conn file-id)] + (#'files.update/update-file* system {:id file-id :revn revn :file file :features (:features file) :changes changes :session-id session-id - :profile-id profile-id})))))) + :profile-id profile-id}))))))) (declare command!)