From e01f8d6fdf89cf70033535048bbd36ffece5c793 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Sun, 7 Apr 2024 14:06:42 +0200 Subject: [PATCH] :paperclip: Update migration scripts --- backend/src/app/features/components_v2.clj | 111 +++++------ backend/src/app/srepl/components_v2.clj | 210 ++++++++++++++------- 2 files changed, 200 insertions(+), 121 deletions(-) diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index 00e8b5f4f..36f0a5ad5 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -16,6 +16,7 @@ [app.common.files.migrations :as fmg] [app.common.files.shapes-helpers :as cfsh] [app.common.files.validate :as cfv] + [app.common.fressian :as fres] [app.common.geom.matrix :as gmt] [app.common.geom.point :as gpt] [app.common.geom.rect :as grc] @@ -48,18 +49,18 @@ [app.rpc.commands.files-snapshot :as fsnap] [app.rpc.commands.media :as cmd.media] [app.storage :as sto] + [app.storage.impl :as impl] [app.storage.tmp :as tmp] [app.svgo :as svgo] [app.util.blob :as blob] - [app.util.cache :as cache] [app.util.events :as events] [app.util.pointer-map :as pmap] [app.util.time :as dt] [buddy.core.codecs :as bc] [clojure.set :refer [rename-keys]] [cuerdas.core :as str] + [datoteka.fs :as fs] [datoteka.io :as io] - [promesa.exec :as px] [promesa.util :as pu])) (def ^:dynamic *stats* @@ -68,7 +69,7 @@ (def ^:dynamic *cache* "A dynamic var for setting up a cache instance." - nil) + false) (def ^:dynamic *skip-on-graphic-error* "A dynamic var for setting up the default error behavior for graphics processing." @@ -100,6 +101,8 @@ (some? data) (assoc :data (blob/decode data)))) +(set! *warn-on-reflection* true) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; FILE PREPARATION BEFORE MIGRATION ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -1296,7 +1299,7 @@ (try (let [item (if (str/starts-with? href "data:") (let [[mtype data] (parse-datauri href) - size (alength data) + size (alength ^bytes data) path (tmp/tempfile :prefix "penpot.media.download.") written (io/write-to-file! data path :size size)] @@ -1365,27 +1368,49 @@ {::sql/columns [:media-id]})] (:media-id fmobject))) -(defn- get-sobject-content +(defn get-sobject-content [id] (let [storage (::sto/storage *system*) sobject (sto/get-object storage id)] + + (when-not sobject + (throw (RuntimeException. "sobject is nil"))) + (when (> (:size sobject) 1135899) + (throw (RuntimeException. "svg too big"))) + (with-open [stream (sto/get-object-data storage sobject)] (slurp stream)))) +(defn get-optimized-svg + [sid] + (let [svg-text (get-sobject-content sid) + svg-text (svgo/optimize *system* svg-text)] + (csvg/parse svg-text))) + +(def base-path "/data/cache") + +(defn get-sobject-cache-path + [sid] + (let [path (impl/id->path sid)] + (fs/join base-path path))) + +(defn get-cached-svg + [sid] + (let [path (get-sobject-cache-path sid)] + (if (fs/exists? path) + (with-open [^java.lang.AutoCloseable stream (io/input-stream path)] + (let [reader (fres/reader stream)] + (fres/read! reader))) + (get-optimized-svg sid)))) + (defn- create-shapes-for-svg [{:keys [id] :as mobj} file-id objects frame-id position] - (let [get-svg (fn [sid] - (let [svg-text (get-sobject-content sid) - svg-text (svgo/optimize *system* svg-text)] - (-> (csvg/parse svg-text) - (assoc :name (:name mobj))))) - - sid (resolve-sobject-id id) - svg-data (if (cache/cache? *cache*) - (cache/get *cache* sid (px/wrap-bindings get-svg)) - (get-svg sid)) - - svg-data (collect-and-persist-images svg-data file-id id)] + (let [sid (resolve-sobject-id id) + svg-data (if *cache* + (get-cached-svg sid) + (get-optimized-svg sid)) + svg-data (collect-and-persist-images svg-data file-id id) + svg-data (assoc svg-data :name (:name mobj))] (sbuilder/create-svg-shapes svg-data position objects frame-id frame-id #{} false))) @@ -1714,7 +1739,7 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn migrate-file! - [system file-id & {:keys [validate? skip-on-graphic-error? label]}] + [system file-id & {:keys [validate? skip-on-graphic-error? label rown]}] (let [tpoint (dt/tpoint) err (volatile! false)] @@ -1754,24 +1779,14 @@ components (get @*file-stats* :processed-components 0) graphics (get @*file-stats* :processed-graphics 0)] - (if (cache/cache? *cache*) - (let [cache-stats (cache/stats *cache*)] - (l/dbg :hint "migrate:file:end" - :file-id (str file-id) - :graphics graphics - :components components - :validate validate? - :crt (mth/to-fixed (:hit-rate cache-stats) 2) - :crq (str (:req-count cache-stats)) - :error @err - :elapsed (dt/format-duration elapsed))) - (l/dbg :hint "migrate:file:end" - :file-id (str file-id) - :graphics graphics - :components components - :validate validate? - :error @err - :elapsed (dt/format-duration elapsed))) + (l/dbg :hint "migrate:file:end" + :file-id (str file-id) + :graphics graphics + :components components + :validate validate? + :rown rown + :error @err + :elapsed (dt/format-duration elapsed)) (some-> *stats* (swap! update :processed-files (fnil inc 0))) (some-> *team-stats* (swap! update :processed-files (fnil inc 0))))))))) @@ -1833,21 +1848,9 @@ (when-not @err (some-> *stats* (swap! update :processed-teams (fnil inc 0)))) - (if (cache/cache? *cache*) - (let [cache-stats (cache/stats *cache*)] - (l/dbg :hint "migrate:team:end" - :team-id (dm/str team-id) - :files files - :components components - :graphics graphics - :crt (mth/to-fixed (:hit-rate cache-stats) 2) - :crq (str (:req-count cache-stats)) - :error @err - :elapsed (dt/format-duration elapsed))) - - (l/dbg :hint "migrate:team:end" - :team-id (dm/str team-id) - :files files - :components components - :graphics graphics - :elapsed (dt/format-duration elapsed))))))))) + (l/dbg :hint "migrate:team:end" + :team-id (dm/str team-id) + :files files + :components components + :graphics graphics + :elapsed (dt/format-duration elapsed)))))))) diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj index 6b28de25e..00a3c34fb 100644 --- a/backend/src/app/srepl/components_v2.clj +++ b/backend/src/app/srepl/components_v2.clj @@ -7,18 +7,19 @@ (ns app.srepl.components-v2 (:require [app.common.data :as d] + [app.common.fressian :as fres] [app.common.logging :as l] - [app.common.uuid :as uuid] [app.db :as db] [app.features.components-v2 :as feat] [app.main :as main] [app.srepl.helpers :as h] [app.svgo :as svgo] - [app.util.cache :as cache] [app.util.events :as events] [app.util.time :as dt] [app.worker :as-alias wrk] [cuerdas.core :as str] + [datoteka.fs :as fs] + [datoteka.io :as io] [promesa.exec :as px] [promesa.exec.semaphore :as ps] [promesa.util :as pu])) @@ -112,14 +113,14 @@ (def ^:private sql:get-files-by-created-at "SELECT id, features, - row_number() OVER (ORDER BY created_at) AS rown + row_number() OVER (ORDER BY created_at DESC) AS rown FROM file WHERE deleted_at IS NULL ORDER BY created_at DESC") (def ^:private sql:get-files-by-modified-at "SELECT id, features - row_number() OVER (ORDER BY modified_at) AS rown + row_number() OVER (ORDER BY modified_at DESC) AS rown FROM file WHERE deleted_at IS NULL ORDER BY modified_at DESC") @@ -210,11 +211,7 @@ skip-on-graphic-error? true}}] (l/dbg :hint "migrate:start" :rollback rollback?) (let [tpoint (dt/tpoint) - file-id (h/parse-uuid file-id) - cache (if (int? cache) - (cache/create :executor (::wrk/executor main/system) - :max-items cache) - nil)] + file-id (h/parse-uuid file-id)] (binding [feat/*stats* (atom {}) feat/*cache* cache] @@ -245,12 +242,7 @@ (let [team-id (h/parse-uuid team-id) stats (atom {}) - tpoint (dt/tpoint) - - cache (if (int? cache) - (cache/create :executor (::wrk/executor main/system) - :max-items cache) - nil)] + tpoint (dt/tpoint)] (add-watch stats :progress-report (report-progress-files tpoint)) @@ -313,35 +305,30 @@ sjobs (ps/create :permits max-jobs) sprocs (ps/create :permits max-procs) - cache (if (int? cache) - (cache/create :executor (::wrk/executor main/system) - :max-items cache) - nil) migrate-team (fn [team-id] - (let [tpoint (dt/tpoint)] - (try - (db/tx-run! (assoc main/system ::db/rollback rollback?) - (fn [system] - (db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"]) - (feat/migrate-team! system team-id - :label label - :validate? validate? - :skip-on-graphic-error? skip-on-graphic-error?))) + (try + (db/tx-run! (assoc main/system ::db/rollback rollback?) + (fn [system] + (db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"]) + (feat/migrate-team! system team-id + :label label + :validate? validate? + :skip-on-graphic-error? skip-on-graphic-error?))) - (catch Throwable cause - (l/wrn :hint "unexpected error on processing team (skiping)" - :team-id (str team-id)) + (catch Throwable cause + (l/wrn :hint "unexpected error on processing team (skiping)" + :team-id (str team-id)) - (events/tap :error - (ex-info "unexpected error on processing team (skiping)" - {:team-id team-id} - cause)) + (events/tap :error + (ex-info "unexpected error on processing team (skiping)" + {:team-id team-id} + cause)) - (swap! stats update :errors (fnil inc 0))) + (swap! stats update :errors (fnil inc 0))) - (finally - (ps/release! sjobs))))) + (finally + (ps/release! sjobs)))) process-team (fn [team-id] @@ -439,50 +426,45 @@ sjobs (ps/create :permits max-jobs) sprocs (ps/create :permits max-procs) - cache (if (int? cache) - (cache/create :executor (::wrk/executor main/system) - :max-items cache) - nil) - migrate-file - (fn [file-id] - (let [tpoint (dt/tpoint)] - (try - (db/tx-run! (assoc main/system ::db/rollback rollback?) - (fn [system] - (db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"]) - (feat/migrate-file! system file-id - :label label - :validate? validate? - :skip-on-graphic-error? skip-on-graphic-error?))) + (fn [file-id rown] + (try + (db/tx-run! (assoc main/system ::db/rollback rollback?) + (fn [system] + (db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"]) + (feat/migrate-file! system file-id + :rown rown + :label label + :validate? validate? + :skip-on-graphic-error? skip-on-graphic-error?))) - (catch Throwable cause - (l/wrn :hint "unexpected error on processing file (skiping)" - :file-id (str file-id)) + (catch Throwable cause + (l/wrn :hint "unexpected error on processing file (skiping)" + :file-id (str file-id)) - (events/tap :error - (ex-info "unexpected error on processing file (skiping)" - {:file-id file-id} - cause)) + (events/tap :error + (ex-info "unexpected error on processing file (skiping)" + {:file-id file-id} + cause)) - (swap! stats update :errors (fnil inc 0))) + (swap! stats update :errors (fnil inc 0))) - (finally - (ps/release! sjobs))))) + (finally + (ps/release! sjobs)))) process-file - (fn [file-id] + (fn [{:keys [id rown]}] (ps/acquire! sjobs) (let [ts (tpoint)] (if (and mtime (neg? (compare mtime ts))) (do (l/inf :hint "max time constraint reached" - :file-id (str file-id) + :file-id (str id) :elapsed (dt/format-duration ts)) (ps/release! sjobs) (reduced nil)) - (px/run! executor (partial migrate-file file-id)))))] + (px/run! executor (partial migrate-file id rown)))))] (l/dbg :hint "migrate:start" :label label @@ -507,7 +489,6 @@ (if (int? partitions) (= current-partition (inc (mod rown partitions))) true))) - (map :id) (take max-items))) ;; Close and await tasks @@ -526,6 +507,101 @@ :rollback rollback? :elapsed elapsed))))))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; CACHE POPULATE +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(def sql:sobjects-for-cache + "SELECT id, + row_number() OVER (ORDER BY created_at) AS index + FROM storage_object + WHERE (metadata->>'~:bucket' = 'file-media-object' OR + metadata->>'~:bucket' IS NULL) + AND metadata->>'~:content-type' = 'image/svg+xml' + AND deleted_at IS NULL + AND size < 1135899 + ORDER BY created_at ASC") + +(defn populate-cache! + "A REPL helper for migrate all files. + + This function starts multiple concurrent file migration processes + until thw maximum number of jobs is reached which by default has the + value of `1`. This is controled with the `:max-jobs` option. + + If you want to run this on multiple machines you will need to specify + the total number of partitions and the current partition. + + In order to get the report table populated, you will need to provide + a correct `:label`. That label is also used for persist a file + snaphot before continue with the migration." + [& {:keys [max-jobs] :or {max-jobs 1}}] + + (let [tpoint (dt/tpoint) + + factory (px/thread-factory :virtual false :prefix "penpot/cache/") + executor (px/cached-executor :factory factory) + + sjobs (ps/create :permits max-jobs) + + retrieve-sobject + (fn [id index] + (let [path (feat/get-sobject-cache-path id) + parent (fs/parent path)] + + (try + (when-not (fs/exists? parent) + (fs/create-dir parent)) + + (if (fs/exists? path) + (l/inf :hint "create cache entry" :status "exists" :index index :id (str id) :path (str path)) + (let [svg-data (feat/get-optimized-svg id)] + (with-open [^java.lang.AutoCloseable stream (io/output-stream path)] + (let [writer (fres/writer stream)] + (fres/write! writer svg-data))) + + (l/inf :hint "create cache entry" :status "created" + :index index + :id (str id) + :path (str path)))) + + (catch Throwable cause + (l/wrn :hint "create cache entry" + :status "error" + :index index + :id (str id) + :path (str path) + :cause cause)) + + (finally + (ps/release! sjobs))))) + + process-sobject + (fn [{:keys [id index]}] + (ps/acquire! sjobs) + (px/run! executor (partial retrieve-sobject id index)))] + + (l/dbg :hint "migrate:start" + :max-jobs max-jobs) + + (try + (binding [feat/*system* main/system] + (run! process-sobject + (db/exec! main/system [sql:sobjects-for-cache])) + + ;; Close and await tasks + (pu/close! executor)) + + {:elapsed (dt/format-duration (tpoint))} + + (catch Throwable cause + (l/dbg :hint "populate:error" :cause cause)) + + (finally + (let [elapsed (dt/format-duration (tpoint))] + (l/dbg :hint "populate:end" + :elapsed elapsed)))))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; FILE PROCESS HELPERS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;