📎 Update migration scripts

This commit is contained in:
Andrey Antukh 2024-04-07 14:06:42 +02:00
parent da5f452db5
commit e01f8d6fdf
2 changed files with 200 additions and 121 deletions

@ -16,6 +16,7 @@
[app.common.files.migrations :as fmg]
[app.common.files.shapes-helpers :as cfsh]
[app.common.files.validate :as cfv]
[app.common.fressian :as fres]
[app.common.geom.matrix :as gmt]
[app.common.geom.point :as gpt]
[app.common.geom.rect :as grc]
@ -48,18 +49,18 @@
[app.rpc.commands.files-snapshot :as fsnap]
[app.rpc.commands.media :as cmd.media]
[app.storage :as sto]
[app.storage.impl :as impl]
[app.storage.tmp :as tmp]
[app.svgo :as svgo]
[app.util.blob :as blob]
[app.util.cache :as cache]
[app.util.events :as events]
[app.util.pointer-map :as pmap]
[app.util.time :as dt]
[buddy.core.codecs :as bc]
[clojure.set :refer [rename-keys]]
[cuerdas.core :as str]
[datoteka.fs :as fs]
[datoteka.io :as io]
[promesa.exec :as px]
[promesa.util :as pu]))
(def ^:dynamic *stats*
@ -68,7 +69,7 @@
(def ^:dynamic *cache*
"A dynamic var for setting up a cache instance."
(def ^:dynamic *skip-on-graphic-error*
"A dynamic var for setting up the default error behavior for graphics processing."
@ -100,6 +101,8 @@
(some? data)
(assoc :data (blob/decode data))))
(set! *warn-on-reflection* true)
@ -1296,7 +1299,7 @@
(let [item (if (str/starts-with? href "data:")
(let [[mtype data] (parse-datauri href)
size (alength data)
size (alength ^bytes data)
path (tmp/tempfile :prefix "penpot.media.download.")
written (io/write-to-file! data path :size size)]
@ -1365,27 +1368,49 @@
{::sql/columns [:media-id]})]
(:media-id fmobject)))
;; Returns the content of the storage object `id` as a string: the object is
;; looked up in the storage taken from `*system*` and its data stream is read
;; with `slurp`.
;;
;; Throws RuntimeException when no storage object is found ("sobject is nil")
;; or when its `:size` is greater than 1135899 ("svg too big").
;; NOTE(review): the origin of the 1135899 limit is not visible here; the same
;; literal appears in the storage-object query used for cache population —
;; confirm both are meant to stay in sync.
(defn- get-sobject-content
(defn get-sobject-content
(let [storage (::sto/storage *system*)
sobject (sto/get-object storage id)]
(when-not sobject
(throw (RuntimeException. "sobject is nil")))
(when (> (:size sobject) 1135899)
(throw (RuntimeException. "svg too big")))
;; with-open closes the data stream once slurp has consumed it
(with-open [stream (sto/get-object-data storage sobject)]
(slurp stream))))
;; Loads the SVG text of the storage object `sid` through
;; `get-sobject-content`, passes it through `svgo/optimize` using `*system*`,
;; and returns the result of `csvg/parse` on the optimized text.
;; Exceptions thrown by `get-sobject-content` are not handled here.
(defn get-optimized-svg
(let [svg-text (get-sobject-content sid)
svg-text (svgo/optimize *system* svg-text)]
(csvg/parse svg-text)))
;; Root directory under which `get-sobject-cache-path` places cache files.
;; NOTE(review): hard-coded absolute path — confirm it exists and is writable
;; in every environment where the cache is populated or read.
(def base-path "/data/cache")
;; Returns the cache file location for the storage object `sid`: the path
;; produced by `impl/id->path` joined onto `base-path`.
(defn get-sobject-cache-path
(let [path (impl/id->path sid)]
(fs/join base-path path)))
;; Returns the SVG data for the storage object `sid`.
;;
;; When a file exists at the path given by `get-sobject-cache-path`, a single
;; value is read from it with a fressian reader. Otherwise the data is computed
;; with `get-optimized-svg`; that fallback result is NOT written to the cache
;; by this function.
(defn get-cached-svg
(let [path (get-sobject-cache-path sid)]
(if (fs/exists? path)
;; the AutoCloseable hint avoids a reflective .close call in with-open
(with-open [^java.lang.AutoCloseable stream (io/input-stream path)]
(let [reader (fres/reader stream)]
(fres/read! reader)))
(get-optimized-svg sid))))
(defn- create-shapes-for-svg
[{:keys [id] :as mobj} file-id objects frame-id position]
(let [get-svg (fn [sid]
(let [svg-text (get-sobject-content sid)
svg-text (svgo/optimize *system* svg-text)]
(-> (csvg/parse svg-text)
(assoc :name (:name mobj)))))
sid (resolve-sobject-id id)
svg-data (if (cache/cache? *cache*)
(cache/get *cache* sid (px/wrap-bindings get-svg))
(get-svg sid))
svg-data (collect-and-persist-images svg-data file-id id)]
(let [sid (resolve-sobject-id id)
svg-data (if *cache*
(get-cached-svg sid)
(get-optimized-svg sid))
svg-data (collect-and-persist-images svg-data file-id id)
svg-data (assoc svg-data :name (:name mobj))]
(sbuilder/create-svg-shapes svg-data position objects frame-id frame-id #{} false)))
@ -1714,7 +1739,7 @@
(defn migrate-file!
[system file-id & {:keys [validate? skip-on-graphic-error? label]}]
[system file-id & {:keys [validate? skip-on-graphic-error? label rown]}]
(let [tpoint (dt/tpoint)
err (volatile! false)]
@ -1754,24 +1779,14 @@
components (get @*file-stats* :processed-components 0)
graphics (get @*file-stats* :processed-graphics 0)]
(if (cache/cache? *cache*)
(let [cache-stats (cache/stats *cache*)]
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:validate validate?
:crt (mth/to-fixed (:hit-rate cache-stats) 2)
:crq (str (:req-count cache-stats))
:error @err
:elapsed (dt/format-duration elapsed)))
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:validate validate?
:error @err
:elapsed (dt/format-duration elapsed)))
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:validate validate?
:rown rown
:error @err
:elapsed (dt/format-duration elapsed))
(some-> *stats* (swap! update :processed-files (fnil inc 0)))
(some-> *team-stats* (swap! update :processed-files (fnil inc 0)))))))))
@ -1833,21 +1848,9 @@
(when-not @err
(some-> *stats* (swap! update :processed-teams (fnil inc 0))))
(if (cache/cache? *cache*)
(let [cache-stats (cache/stats *cache*)]
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:files files
:components components
:graphics graphics
:crt (mth/to-fixed (:hit-rate cache-stats) 2)
:crq (str (:req-count cache-stats))
:error @err
:elapsed (dt/format-duration elapsed)))
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:files files
:components components
:graphics graphics
:elapsed (dt/format-duration elapsed)))))))))
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:files files
:components components
:graphics graphics
:elapsed (dt/format-duration elapsed))))))))

@ -7,18 +7,19 @@
(ns app.srepl.components-v2
[app.common.data :as d]
[app.common.fressian :as fres]
[app.common.logging :as l]
[app.common.uuid :as uuid]
[app.db :as db]
[app.features.components-v2 :as feat]
[app.main :as main]
[app.srepl.helpers :as h]
[app.svgo :as svgo]
[app.util.cache :as cache]
[app.util.events :as events]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[cuerdas.core :as str]
[datoteka.fs :as fs]
[datoteka.io :as io]
[promesa.exec :as px]
[promesa.exec.semaphore :as ps]
[promesa.util :as pu]))
@ -112,14 +113,14 @@
;; Query returning `id` and `features` for every file whose `deleted_at` is
;; NULL, ordered by `created_at` descending, together with a `rown` row number
;; computed by a window over `created_at`.
(def ^:private sql:get-files-by-created-at
"SELECT id, features,
row_number() OVER (ORDER BY created_at) AS rown
row_number() OVER (ORDER BY created_at DESC) AS rown
FROM file
WHERE deleted_at IS NULL
ORDER BY created_at DESC")
;; Query returning `id` and `features` for every file whose `deleted_at` is
;; NULL, most recently modified first. `rown` is the row number of each file
;; in that same `modified_at DESC` ordering.
;;
;; The comma after `features` is required: without it the select list is not
;; valid SQL and the query fails before returning any row.
(def ^:private sql:get-files-by-modified-at
  "SELECT id, features,
          row_number() OVER (ORDER BY modified_at DESC) AS rown
     FROM file
    WHERE deleted_at IS NULL
    ORDER BY modified_at DESC")
@ -210,11 +211,7 @@
skip-on-graphic-error? true}}]
(l/dbg :hint "migrate:start" :rollback rollback?)
(let [tpoint (dt/tpoint)
file-id (h/parse-uuid file-id)
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
file-id (h/parse-uuid file-id)]
(binding [feat/*stats* (atom {})
feat/*cache* cache]
@ -245,12 +242,7 @@
(let [team-id (h/parse-uuid team-id)
stats (atom {})
tpoint (dt/tpoint)
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
tpoint (dt/tpoint)]
(add-watch stats :progress-report (report-progress-files tpoint))
@ -313,35 +305,30 @@
sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs)
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
(fn [team-id]
(let [tpoint (dt/tpoint)]
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(feat/migrate-team! system team-id
:label label
:validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)))
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(feat/migrate-team! system team-id
:label label
:validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id))
(events/tap :error
(ex-info "unexpected error on processing team (skiping)"
{:team-id team-id}
(events/tap :error
(ex-info "unexpected error on processing team (skiping)"
{:team-id team-id}
(swap! stats update :errors (fnil inc 0)))
(swap! stats update :errors (fnil inc 0)))
(ps/release! sjobs)))))
(ps/release! sjobs))))
(fn [team-id]
@ -439,50 +426,45 @@
sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs)
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
(fn [file-id]
(let [tpoint (dt/tpoint)]
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(feat/migrate-file! system file-id
:label label
:validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)))
(fn [file-id rown]
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(feat/migrate-file! system file-id
:rown rown
:label label
:validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id))
(events/tap :error
(ex-info "unexpected error on processing file (skiping)"
{:file-id file-id}
(events/tap :error
(ex-info "unexpected error on processing file (skiping)"
{:file-id file-id}
(swap! stats update :errors (fnil inc 0)))
(swap! stats update :errors (fnil inc 0)))
(ps/release! sjobs)))))
(ps/release! sjobs))))
(fn [file-id]
(fn [{:keys [id rown]}]
(ps/acquire! sjobs)
(let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts)))
(l/inf :hint "max time constraint reached"
:file-id (str file-id)
:file-id (str id)
:elapsed (dt/format-duration ts))
(ps/release! sjobs)
(reduced nil))
(px/run! executor (partial migrate-file file-id)))))]
(px/run! executor (partial migrate-file id rown)))))]
(l/dbg :hint "migrate:start"
:label label
@ -507,7 +489,6 @@
(if (int? partitions)
(= current-partition (inc (mod rown partitions)))
(map :id)
(take max-items)))
;; Close and await tasks
@ -526,6 +507,101 @@
:rollback rollback?
:elapsed elapsed)))))))
(def sql:sobjects-for-cache
row_number() OVER (ORDER BY created_at) AS index
FROM storage_object
WHERE (metadata->>'~:bucket' = 'file-media-object' OR
metadata->>'~:bucket' IS NULL)
AND metadata->>'~:content-type' = 'image/svg+xml'
AND deleted_at IS NULL
AND size < 1135899
ORDER BY created_at ASC")
(defn populate-cache!
"A REPL helper that pre-populates the on-disk cache of optimized SVGs.

For each row returned by `sql:sobjects-for-cache` the cache path is
resolved with `feat/get-sobject-cache-path`. When no file exists at
that path, the data returned by `feat/get-optimized-svg` is written
to it with a fressian writer; entries that already exist are only
logged. A failure on a single entry is logged as a warning.

Work is dispatched to an executor and throttled by a semaphore with
`:max-jobs` permits (default `1`). `:max-jobs` is the only option
this function reads."
[& {:keys [max-jobs] :or {max-jobs 1}}]
(let [tpoint (dt/tpoint)
factory (px/thread-factory :virtual false :prefix "penpot/cache/")
executor (px/cached-executor :factory factory)
sjobs (ps/create :permits max-jobs)
;; Worker: creates the cache entry for one storage object, or logs that it
;; already exists; releases the `sjobs` permit taken by the scheduler below.
(fn [id index]
(let [path (feat/get-sobject-cache-path id)
parent (fs/parent path)]
(when-not (fs/exists? parent)
(fs/create-dir parent))
(if (fs/exists? path)
(l/inf :hint "create cache entry" :status "exists" :index index :id (str id) :path (str path))
(let [svg-data (feat/get-optimized-svg id)]
(with-open [^java.lang.AutoCloseable stream (io/output-stream path)]
(let [writer (fres/writer stream)]
(fres/write! writer svg-data)))
(l/inf :hint "create cache entry" :status "created"
:index index
:id (str id)
:path (str path))))
(catch Throwable cause
(l/wrn :hint "create cache entry"
:status "error"
:index index
:id (str id)
:path (str path)
:cause cause))
(ps/release! sjobs)))))
;; Scheduler: blocks on the semaphore before submitting, so submission is
;; gated by the number of available `sjobs` permits.
(fn [{:keys [id index]}]
(ps/acquire! sjobs)
(px/run! executor (partial retrieve-sobject id index)))]
;; NOTE(review): this hint says "migrate:start" while the other hints in
;; this function use the "populate:" prefix — confirm whether it should be
;; "populate:start".
(l/dbg :hint "migrate:start"
:max-jobs max-jobs)
;; feat/get-optimized-svg reads feat/*system*, so it is bound for the run
(binding [feat/*system* main/system]
(run! process-sobject
(db/exec! main/system [sql:sobjects-for-cache]))
;; Close and await tasks
(pu/close! executor))
{:elapsed (dt/format-duration (tpoint))}
(catch Throwable cause
(l/dbg :hint "populate:error" :cause cause))
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "populate:end"
:elapsed elapsed))))))