From 4e9b92b857e8b25fd4debbb5562eea9f6c0f00f6 Mon Sep 17 00:00:00 2001 From: Andrey Antukh <niwi@niwi.nz> Date: Tue, 30 Jan 2024 19:14:40 +0100 Subject: [PATCH] :paperclip: Add helper for check not referenced media --- backend/src/app/srepl/helpers.clj | 329 +++++++++++++++++------------- 1 file changed, 186 insertions(+), 143 deletions(-) diff --git a/backend/src/app/srepl/helpers.clj b/backend/src/app/srepl/helpers.clj index 5e2a26ee9..cd5c3f56a 100644 --- a/backend/src/app/srepl/helpers.clj +++ b/backend/src/app/srepl/helpers.clj @@ -9,12 +9,14 @@ (:refer-clojure :exclude [parse-uuid]) #_:clj-kondo/ignore (:require + [app.binfile.common :as bfc] [app.common.data :as d] [app.common.exceptions :as ex] [app.common.features :as cfeat] [app.common.files.changes :as cpc] - [app.common.files.migrations :as pmg] + [app.common.files.migrations :as fmg] [app.common.files.repair :as repair] + [app.common.files.validate :as cfv] [app.common.files.validate :as validate] [app.common.logging :as l] [app.common.pprint :refer [pprint]] @@ -38,7 +40,8 @@ [expound.alpha :as expound] [promesa.core :as p] [promesa.exec :as px] - [promesa.exec.csp :as sp])) + [promesa.exec.semaphore :as ps] + [promesa.util :as pu])) (def ^:dynamic *system* nil) @@ -62,108 +65,111 @@ {:data data} {:id id})))) -(defn get-file +(defn- get-file* "Get the migrated data of one file." - [id & {:keys [migrate?] :or {migrate? true}}] - (db/run! main/system + [system id] + (db/run! system (fn [system] (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] - (-> (files/get-file system id :migrate? migrate?) + (-> (files/get-file system id :migrate? false) (update :data feat.fdata/process-pointers deref) - (update :data feat.fdata/process-objects (partial into {}))))))) + (update :data feat.fdata/process-objects (partial into {})) + (fmg/migrate-file)))))) + +(defn get-file + "Get the migrated data of one file." + [id] + (get-file* main/system id)) (defn validate "Validate structure, referencial integrity and semantic coherence of - all contents of a file. Returns a list of errors." + all contents of a file. Returns a list of errors." [id] (db/tx-run! main/system (fn [{:keys [::db/conn] :as system}] - (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] - (let [id (if (string? id) (parse-uuid id) id) - file (files/get-file system id) - libs (->> (files/get-file-libraries conn id) - (into [file] (map (fn [{:keys [id]}] - (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] - (-> (files/get-file system id :migrate? false) - (update :data feat.fdata/process-pointers deref) - (pmg/migrate-file)))))) - (d/index-by :id))] - (validate/validate-file file libs)))))) + (let [id (if (string? id) (parse-uuid id) id) + file (get-file* system id) + libs (->> (files/get-file-libraries conn id) + (into [file] (map (fn [{:keys [id]}] + (get-file* system id)))) + (d/index-by :id))] + (validate/validate-file file libs))))) (defn repair! "Repair the list of errors detected by validation." [id] (db/tx-run! main/system (fn [{:keys [::db/conn] :as system}] - (binding [pmap/*tracked* (pmap/create-tracked) - pmap/*load-fn* (partial feat.fdata/load-pointer system id)] - (let [id (if (string? id) (parse-uuid id) id) - file (files/get-file system id) - libs (->> (files/get-file-libraries conn id) - (into [file] (map (fn [{:keys [id]}] - (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] - (-> (files/get-file system id :migrate? false) - (update :data feat.fdata/process-pointers deref) - (pmg/migrate-file)))))) - (d/index-by :id)) - errors (validate/validate-file file libs) - changes (repair/repair-file file libs errors) + (let [id (if (string? id) (parse-uuid id) id) + file (get-file* system id) + libs (->> (files/get-file-libraries conn id) + (into [file] (map (fn [{:keys [id]}] + (get-file* system id)))) + (d/index-by :id)) + errors (validate/validate-file file libs) + changes (repair/repair-file file libs errors) - file (-> file - (update :revn inc) - (update :data cpc/process-changes changes) - (update :data blob/encode))] + file (-> file + (update :revn inc) + (update :data cpc/process-changes changes)) - (when (contains? (:features file) "fdata/pointer-map") - (feat.fdata/persist-pointers! system id)) + file (if (contains? (:features file) "fdata/objects-map") + (feat.fdata/enable-objects-map file) + file) + + file (if (contains? (:features file) "fdata/pointer-map") + (binding [pmap/*tracked* (pmap/create-tracked)] + (let [file (feat.fdata/enable-pointer-map file)] + (feat.fdata/persist-pointers! system id) + file)) + file)] + + (db/update! conn :file + {:revn (:revn file) + :data (blob/encode (:data file)) + :data-backend nil + :modified-at (dt/now) + :has-media-trimmed false} + {:id (:id file)}) + + :repaired)))) - (db/update! conn :file - {:revn (:revn file) - :data (:data file) - :data-backend nil - :modified-at (dt/now) - :has-media-trimmed false} - {:id (:id file)}) - :repaired))))) (defn update-file! "Apply a function to the data of one file. Optionally save the changes or not. The function receives the decoded and migrated file data." - [& {:keys [update-fn id rollback? migrate? inc-revn?] - :or {rollback? true migrate? true inc-revn? true}}] - (letfn [(process-file [{:keys [::db/conn] :as system} {:keys [features] :as file}] - (binding [pmap/*tracked* (pmap/create-tracked) - pmap/*load-fn* (partial feat.fdata/load-pointer system id) - cfeat/*wrap-with-pointer-map-fn* - (if (contains? features "fdata/pointer-map") pmap/wrap identity) - cfeat/*wrap-with-objects-map-fn* - (if (contains? features "fdata/objectd-map") omap/wrap identity)] + [& {:keys [update-fn id rollback? inc-revn?] + :or {rollback? true inc-revn? true}}] + (letfn [(process-file [{:keys [::db/conn] :as system} file-id] + (let [file (get-file* system file-id) + file (cond-> (update-fn file) + inc-revn? (update :revn inc)) - (let [file (cond-> (update-fn file) - inc-revn? (update :revn inc)) - features (db/create-array conn "text" (:features file)) - data (blob/encode (:data file))] + _ (cfv/validate-file-schema! file) - (db/update! conn :file - {:data data - :revn (:revn file) - :features features} - {:id id})) + file (if (contains? (:features file) "fdata/objects-map") + (feat.fdata/enable-objects-map file) + file) - (when (contains? (:features file) "fdata/pointer-map") - (feat.fdata/persist-pointers! system id)) + file (if (contains? (:features file) "fdata/pointer-map") + (binding [pmap/*tracked* (pmap/create-tracked)] + (let [file (feat.fdata/enable-pointer-map file)] + (feat.fdata/persist-pointers! system id) + file)) + file)] + + (db/update! conn :file + {:data (blob/encode (:data file)) + :features (db/create-array conn "text" (:features file)) + :revn (:revn file)} + {:id (:id file)}) (dissoc file :data)))] - (db/tx-run! (or *system* main/system) + (db/tx-run! (or *system* (assoc main/system ::db/rollback rollback?)) (fn [system] (binding [*system* system] - (try - (->> (files/get-file system id :migrate? migrate?) - (process-file system)) - (finally - (when rollback? - (db/rollback! system))))))))) + (process-file system id)))))) (def ^:private sql:get-file-ids @@ -190,16 +196,11 @@ (strace/print-stack-trace cause)) (process-file [{:keys [::db/conn] :as system} file-id] - (let [file (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system file-id)] - (-> (files/get-file system file-id) - (update :data feat.fdata/process-pointers deref))) - + (let [file (get-file* system file-id) libs (when with-libraries? (->> (files/get-file-libraries conn file-id) (into [file] (map (fn [{:keys [id]}] - (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] - (-> (files/get-file system id) - (update :data feat.fdata/process-pointers deref)))))) + (get-file* system id)))) (d/index-by :id)))] (try (if with-libraries? @@ -208,7 +209,7 @@ (catch Throwable cause ((or on-error on-error*) cause file)))))] - (db/tx-run! main/system + (db/tx-run! (assoc main/system ::db/rollback true) (fn [{:keys [::db/conn] :as system}] (try (binding [*system* system] @@ -217,83 +218,125 @@ (get-candidates conn))) (finally (when (fn? on-end) - (ex/ignoring (on-end))) - (db/rollback! system))))))) + (ex/ignoring (on-end))))))))) + +(defn repair-file-media + [{:keys [id data] :as file}] + (let [conn (db/get-connection *system*) + used (bfc/collect-used-media data) + ids (db/create-array conn "uuid" used) + sql (str "SELECT * FROM file_media_object WHERE id = ANY(?)") + rows (db/exec! conn [sql ids]) + index (reduce (fn [index media] + (if (not= (:file-id media) id) + (let [media-id (uuid/next)] + (l/wrn :hint "found not referenced media" + :file-id (str id) + :media-id (str (:id media))) + + (db/insert! *system* :file-media-object + (-> media + (assoc :file-id id) + (assoc :id media-id))) + (assoc index (:id media) media-id)) + index)) + {} + rows)] + + (when (seq index) + (binding [bfc/*state* (atom {:index index})] + (update file :data (fn [fdata] + (-> fdata + (update :pages-index #'bfc/relink-shapes) + (update :components #'bfc/relink-shapes) + (update :media #'bfc/relink-media) + (d/without-nils)))))))) (defn process-files! - "Apply a function to all files in the database, reading them in - batches." + "Apply a function to all files in the database" [& {:keys [max-items - workers + max-jobs start-at on-file - on-error - on-end - on-init rollback?] - :or {workers 1 + :or {max-jobs 1 rollback? true}}] - (letfn [(get-candidates [conn] - (cond->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))]) - (some? max-items) - (take max-items))) - (on-error* [cause file] - (println! "unexpected exception happened on processing file: " (:id file)) - (strace/print-stack-trace cause)) + (l/dbg :hint "process:start" + :rollback rollback? + :max-jobs max-jobs + :max-items max-items) - (process-file [system file-id] - (try - (let [{:keys [features] :as file} (files/get-file system file-id)] - (binding [pmap/*tracked* (pmap/create-tracked) - pmap/*load-fn* (partial feat.fdata/load-pointer system file-id) - cfeat/*wrap-with-pointer-map-fn* - (if (contains? features "fdata/pointer-map") pmap/wrap identity) - cfeat/*wrap-with-objects-map-fn* - (if (contains? features "fdata/objectd-map") omap/wrap identity)] + (let [tpoint (dt/tpoint) + factory (px/thread-factory :virtual false :prefix "penpot/file-process/") + executor (px/cached-executor :factory factory) + sjobs (ps/create :permits max-jobs) - (on-file file) + process-file + (fn [file-id tpoint] + (try + (l/trc :hint "process:file:start" :file-id (str file-id)) + (db/tx-run! (assoc main/system ::db/rollback rollback?) + (fn [{:keys [::db/conn] :as system}] + (let [file' (get-file* system file-id) + file (binding [*system* system] + (on-file file'))] - (when (contains? features "fdata/pointer-map") - (feat.fdata/persist-pointers! system file-id)))) + (when (and (some? file) + (not (identical? file file'))) - (catch Throwable cause - ((or on-error on-error*) cause file-id)))) + (cfv/validate-file-schema! file) - (run-worker [in index] - (db/tx-run! main/system - (fn [system] - (binding [*system* system] - (loop [i 0] - (when-let [file-id (sp/take! in)] - (println! "=> worker: index:" index "| loop:" i "| file:" (str file-id) "|" (px/get-name)) - (process-file system file-id) - (recur (inc i))))) + (let [file (if (contains? (:features file) "fdata/objects-map") + (feat.fdata/enable-objects-map file) + file) - (when rollback? - (db/rollback! system))))) + file (if (contains? (:features file) "fdata/pointer-map") + (binding [pmap/*tracked* (pmap/create-tracked)] + (let [file (feat.fdata/enable-pointer-map file)] + (feat.fdata/persist-pointers! system file-id) + file)) + file)] - (run-producer [input] - (db/tx-run! main/system - (fn [{:keys [::db/conn]}] - (doseq [file-id (get-candidates conn)] - (println! "=> producer:" file-id "|" (px/get-name)) - (sp/put! input file-id)) - (sp/close! input))))] + (db/update! conn :file + {:data (blob/encode (:data file)) + :features (db/create-array conn "text" (:features file)) + :revn (:revn file)} + {:id file-id})))))) + (catch Throwable cause + (l/wrn :hint "unexpected error on processing file (skiping)" + :file-id (str file-id) + :cause cause)) + (finally + (ps/release! sjobs) + (let [elapsed (dt/format-duration (tpoint))] + (l/trc :hint "process:file:end" + :file-id (str file-id) + :elapsed elapsed)))))] - (when (fn? on-init) (on-init)) - (let [input (sp/chan :buf 25) - producer (px/thread - {:name "penpot/srepl/producer"} - (run-producer input)) - threads (->> (range workers) - (map (fn [index] - (px/thread - {:name (str "penpot/srepl/worker/" index)} - (run-worker input index)))) - (cons producer) - (doall))] + (try + (db/tx-run! main/system + (fn [{:keys [::db/conn] :as system}] + (db/exec! conn ["SET statement_timeout = 0"]) + (db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) + + (run! (fn [file-id] + (ps/acquire! sjobs) + (px/run! executor (partial process-file file-id (dt/tpoint)))) + (->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))]) + (take max-items) + (map :id))) + + ;; Close and await tasks + (pu/close! executor))) + + (catch Throwable cause + (l/dbg :hint "process:error" :cause cause)) + + (finally + (let [elapsed (dt/format-duration (tpoint))] + (l/dbg :hint "process:end" + :rollback rollback? + :elapsed elapsed)))))) - (run! p/await! threads) - (when (fn? on-end) (on-end)))))