0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-24 23:49:45 -05:00

Add performance and reporting improvements to migration script

This commit is contained in:
Andrey Antukh 2023-11-15 13:27:40 +01:00
parent 4c190e385e
commit 1457b7cf38
2 changed files with 165 additions and 171 deletions

View file

@ -45,6 +45,7 @@
[buddy.core.codecs :as bc]
[cuerdas.core :as str]
[datoteka.io :as io]
[promesa.exec :as px]
[promesa.exec.semaphore :as ps]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -53,6 +54,8 @@
(def ^:dynamic *system* nil)
(def ^:dynamic *stats* nil)
(def ^:dynamic *file-stats* nil)
(def ^:dynamic *team-stats* nil)
(def ^:dynamic *semaphore* nil)
(def ^:dynamic *skip-on-error* true)
@ -380,12 +383,10 @@
fdata
(d/zip components positions))))]
(when (some? *stats*)
(let [total (count components)]
(swap! *stats* (fn [stats]
(-> stats
(update :processed/components (fnil + 0) total)
(assoc :current/components total))))))
(let [total (count components)]
(some-> *stats* (swap! update :processed/components (fnil + 0) total))
(some-> *team-stats* (swap! update :processed/components (fnil + 0) total))
(some-> *file-stats* (swap! assoc :processed/components total)))
(-> file-data
(prepare-file-data libraries)
@ -575,8 +576,7 @@
cfsh/prepare-create-artboard-from-selection)
changes (pcb/concat-changes changes changes2)]
(cp/process-changes (assoc-in fdata [:options :components-v2] true) ; Process component creation in v2 way
(:redo-changes changes) false)))
(:redo-changes changes)))
(defn- migrate-graphics
[fdata]
@ -591,30 +591,48 @@
(grc/rect->points))]
(assoc media :points points)))))
;; FIXME: think about what to do with existing media entries ??
grid (ctst/generate-shape-grid media position grid-gap)]
(when (some? *stats*)
(let [total (count media)]
(swap! *stats* (fn [stats]
(-> stats
(update :processed/graphics (fnil + 0) total)
(assoc :current/graphics total))))))
(let [total (count media)]
(some-> *stats* (swap! update :processed/graphics (fnil + 0) total))
(some-> *team-stats* (swap! update :processed/graphics (fnil + 0) total))
(some-> *file-stats* (swap! assoc :processed/graphics total)))
(->> (d/zip media grid)
(reduce (fn [fdata [mobj position]]
(try
(process-media-object fdata page-id mobj position)
(catch Throwable cause
(l/warn :hint "unable to process file media object (skiping)"
:file-id (str (:id fdata))
:id (str (:id mobj))
:cause cause)
(let [factory (px/thread-factory :virtual true)
executor (px/fixed-executor :parallelism 10 :factory factory)
process (fn [mobj position]
(let [tp1 (dt/tpoint)]
(try
(process-media-object fdata page-id mobj position)
(catch Throwable cause
(l/wrn :hint "unable to process file media object (skiping)"
:file-id (str (:id fdata))
:id (str (:id mobj))
:cause cause)
(if-not *skip-on-error*
(throw cause)
fdata))))
fdata)))))
(if-not *skip-on-error*
(throw cause)
fdata))
(finally
(l/trc :hint "graphic processed"
:file-id (str (:id fdata))
:media-id (str (:id mobj))
:elapsed (dt/format-duration (tp1)))))))
process (px/wrap-bindings process)]
(try
(->> (d/zip media grid)
(map (fn [[mobj position]]
(l/trc :hint "submit graphic processing" :file-id (str (:id fdata)) :id (str (:id mobj)))
(px/submit! executor (partial process mobj position))))
(reduce (fn [fdata promise]
(if-let [changes (deref promise)]
(cp/process-changes fdata changes false)
fdata))
fdata))
(finally
(.close ^java.lang.AutoCloseable executor)))))))
(defn- migrate-file-data
[fdata libs]
@ -665,7 +683,7 @@
(when validate?
(let [errors (cfv/validate-file file libs)]
(when (seq errors)
(l/err :hint "migrate:file:validation-error"
(l/wrn :hint "migrate:file:validation-error"
:file-id (str (:id file))
:file-name (:name file)
:errors errors))))
@ -674,43 +692,38 @@
(defn migrate-file!
[system file-id & {:keys [validate?]}]
(let [tpoint (dt/tpoint)
file-id (if (string? file-id)
(parse-uuid file-id)
file-id)]
(try
(l/dbg :hint "migrate:file:start" :file-id (str file-id))
(let [system (update system ::sto/storage media/configure-assets-storage)]
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
(fsnap/take-file-snapshot! system {:file-id file-id
:label "migration/components-v2"})
(binding [*file-stats* (atom {})]
(try
(l/dbg :hint "migrate:file:start" :file-id (str file-id))
(binding [*system* system]
(-> (db/get conn :file {:id file-id})
(update :features db/decode-pgarray #{})
(process-file :validate? validate?))))))
(let [system (update system ::sto/storage media/configure-assets-storage)]
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
(binding [*system* system]
(fsnap/take-file-snapshot! system {:file-id file-id
:label "migration/components-v2"})
(-> (db/get conn :file {:id file-id})
(update :features db/decode-pgarray #{})
(process-file :validate? validate?))))))
(finally
(let [elapsed (tpoint)
stats (some-> *stats* deref)]
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:components (:current/components stats 0)
:graphics (:current/graphics stats 0)
:elapsed (dt/format-duration elapsed))
(finally
(let [elapsed (tpoint)
components (get @*file-stats* :processed/components 0)
graphics (get @*file-stats* :processed/graphics 0)]
(when (some? *stats*)
(swap! *stats* (fn [stats]
(let [elapsed (inst-ms elapsed)
completed (inc (get stats :processed/files 0))
total (+ (get stats :elapsed/total-by-file 0) elapsed)
avg (/ (double elapsed) completed)]
(-> stats
(update :elapsed/max-by-file (fnil max 0) elapsed)
(assoc :elapsed/avg-by-file avg)
(assoc :elapsed/total-by-file total)
(assoc :processed/files completed)))))))))))
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:elapsed (dt/format-duration elapsed))
(some-> *stats* (swap! update :processed/files (fnil inc 0)))
(some-> *team-stats* (swap! update :processed/files (fnil inc 0)))))))))
(defn migrate-team!
[system team-id & {:keys [validate?]}]
@ -719,72 +732,66 @@
(parse-uuid team-id)
team-id)]
(l/dbg :hint "migrate:team:start" :team-id (dm/str team-id))
(try
;; We execute this out of transaction because we want this
;; change to be visible to all other sessions before starting
;; the migration
(let [sql (str "UPDATE team SET features = "
" array_append(features, 'ephimeral/v2-migration') "
" WHERE id = ?")]
(db/exec-one! system [sql team-id]))
(binding [*team-stats* (atom {})]
(try
;; We execute this out of transaction because we want this
;; change to be visible to all other sessions before starting
;; the migration
(let [sql (str "UPDATE team SET features = "
" array_append(features, 'ephimeral/v2-migration') "
" WHERE id = ?")]
(db/exec-one! system [sql team-id]))
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
;; Lock the team
(db/exec-one! conn ["SET idle_in_transaction_session_timeout = 0"])
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
;; Lock the team
(db/exec-one! conn ["SET idle_in_transaction_session_timeout = 0"])
(let [{:keys [features] :as team} (-> (db/get conn :team {:id team-id})
(update :features db/decode-pgarray #{}))]
(let [{:keys [features] :as team} (-> (db/get conn :team {:id team-id})
(update :features db/decode-pgarray #{}))]
(if (contains? features "components/v2")
(l/dbg :hint "team already migrated")
(let [sql (str/concat
"SELECT f.id FROM file AS f "
" JOIN project AS p ON (p.id = f.project_id) "
"WHERE p.team_id = ? AND f.deleted_at IS NULL AND p.deleted_at IS NULL "
"FOR UPDATE")
(if (contains? features "components/v2")
(l/dbg :hint "team already migrated")
(let [sql (str/concat
"SELECT f.id FROM file AS f "
" JOIN project AS p ON (p.id = f.project_id) "
"WHERE p.team_id = ? AND f.deleted_at IS NULL AND p.deleted_at IS NULL "
"FOR UPDATE")
rows (->> (db/exec! conn [sql team-id])
(map :id))]
rows (->> (db/exec! conn [sql team-id])
(map :id))]
(run! #(migrate-file! system % :validate? validate?) rows)
(some-> *stats* (swap! assoc :current/files (count rows)))
(run! #(migrate-file! system % :validate? validate?) rows)
(let [features (-> features
(disj "ephimeral/v2-migration")
(conj "components/v2")
(conj "layout/grid")
(conj "styles/v2"))]
(db/update! conn :team
{:features (db/create-array conn "text" features)}
{:id team-id})))))))
(finally
(some-> *semaphore* ps/release!)
(let [elapsed (tpoint)
stats (some-> *stats* deref)]
(when (some? *stats*)
(swap! *stats* (fn [stats]
(let [elapsed (inst-ms elapsed)
completed (inc (get stats :processed/teams 0))
total (+ (get stats :elapsed/total-by-team 0) elapsed)
avg (/ (double elapsed) completed)]
(-> stats
(update :elapsed/max-by-team (fnil max 0) elapsed)
(assoc :elapsed/avg-by-team avg)
(assoc :elapsed/total-by-team total)
(assoc :processed/teams completed))))))
(let [features (-> features
(disj "ephimeral/v2-migration")
(conj "components/v2")
(conj "layout/grid")
(conj "styles/v2"))]
(db/update! conn :team
{:features (db/create-array conn "text" features)}
{:id team-id})))))))
(finally
(some-> *semaphore* ps/release!)
(let [elapsed (tpoint)]
(some-> *stats* (swap! update :processed/teams (fnil inc 0)))
;; We execute this out of transaction because we want this
;; change to be visible to all other sessions before starting
;; the migration
(let [sql (str "UPDATE team SET features = "
" array_remove(features, 'ephimeral/v2-migration') "
" WHERE id = ?")]
(db/exec-one! system [sql team-id]))
;; We execute this out of transaction because we want this
;; change to be visible to all other sessions before starting
;; the migration
(let [sql (str "UPDATE team SET features = "
" array_remove(features, 'ephimeral/v2-migration') "
" WHERE id = ?")]
(db/exec-one! system [sql team-id]))
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:files (:current/files stats 0)
:elapsed (dt/format-duration elapsed)))))))
(let [components (get @*team-stats* :processed/components 0)
graphics (get @*team-stats* :processed/graphics 0)
files (get @*team-stats* :processed/files 0)]
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:files files
:components components
:graphics graphics
:elapsed (dt/format-duration elapsed)))))))))

View file

@ -21,17 +21,9 @@
(defn- print-stats!
[stats]
(let [stats (-> stats
(d/update-when :elapsed/max-by-team (comp dt/format-duration dt/duration int))
(d/update-when :elapsed/avg-by-team (comp dt/format-duration dt/duration int))
(d/update-when :elapsed/total-by-team (comp dt/format-duration dt/duration int))
(d/update-when :elapsed/max-by-file (comp dt/format-duration dt/duration int))
(d/update-when :elapsed/avg-by-file (comp dt/format-duration dt/duration int))
(d/update-when :elapsed/total-by-file (comp dt/format-duration dt/duration int))
)]
(->> stats
(into (sorted-map))
(pp/pprint))))
(->> stats
(into (sorted-map))
(pp/pprint)))
(defn- report-progress-files
[tpoint]
@ -42,7 +34,7 @@
completed (:processed/files newv)
progress (/ (* completed 100.0) total)
elapsed (tpoint)]
(l/trc :hint "progress"
(l/dbg :hint "progress"
:completed (:processed/files newv)
:total (:total/files newv)
:progress (str (int progress) "%")
@ -57,8 +49,11 @@
completed (:processed/teams newv)
progress (/ (* completed 100.0) total)
elapsed (tpoint)]
(l/trc :hint "progress"
:completed (:processed/teams newv)
(l/dbg :hint "progress"
:completed-teams (:processed/teams newv)
:completed-files (:processed/files newv)
:completed-graphics (:processed/graphics newv)
:completed-components (:processed/components newv)
:progress (str (int progress) "%")
:elapsed (dt/format-duration elapsed))))))
@ -88,36 +83,35 @@
(:count res)))
(defn migrate-file!
[system file-id & {:keys [rollback] :or {rollback true}}]
[system file-id & {:keys [rollback?] :or {rollback? true}}]
(l/dbg :hint "migrate:start")
(let [tpoint (dt/tpoint)]
(try
(binding [feat/*stats* (atom {})]
(-> (assoc system ::db/rollback rollback)
(-> (assoc system ::db/rollback rollback?)
(feat/migrate-file! file-id))
(-> (deref feat/*stats*)
(assoc :elapsed (dt/format-duration (tpoint)))
(dissoc :current/graphics)
(dissoc :current/components)
(dissoc :current/files)))
(assoc :elapsed (dt/format-duration (tpoint)))))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause))
(l/wrn :hint "migrate:error" :cause cause))
(finally
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end" :elapsed elapsed))))))
(defn migrate-files!
[{:keys [::db/pool] :as system} & {:keys [chunk-size max-jobs max-items start-at preset rollback skip-on-error validate]
:or {chunk-size 10
skip-on-error true
max-jobs 10
max-items Long/MAX_VALUE
preset :shutdown-on-failure
rollback true
validate false}}]
[{:keys [::db/pool] :as system}
& {:keys [chunk-size max-jobs max-items start-at preset rollback? skip-on-error validate?]
:or {chunk-size 10
skip-on-error true
max-jobs 10
max-items Long/MAX_VALUE
preset :shutdown-on-failure
rollback? true
validate? false}}]
(letfn [(get-chunk [cursor]
(let [sql (str/concat
"SELECT id, created_at FROM file "
@ -151,17 +145,14 @@
(run! (fn [file-id]
(ps/acquire! feat/*semaphore*)
(px/submit! scope (fn []
(-> (assoc system ::db/rollback rollback)
(feat/migrate-file! file-id :validate? validate)))))
(-> (assoc system ::db/rollback rollback?)
(feat/migrate-file! file-id :validate? validate?)))))
(get-candidates))
(p/await! scope))
(-> (deref feat/*stats*)
(assoc :elapsed (dt/format-duration (tpoint)))
(dissoc :current/graphics)
(dissoc :current/components)
(dissoc :current/files))
(assoc :elapsed (dt/format-duration (tpoint))))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause))
@ -172,8 +163,8 @@
(defn migrate-team!
[{:keys [::db/pool] :as system} team-id
& {:keys [rollback skip-on-error validate]
:or {rollback true skip-on-error true validate false}}]
& {:keys [rollback? skip-on-error validate?]
:or {rollback? true skip-on-error true validate? false}}]
(l/dbg :hint "migrate:start")
(let [total (get-total-files pool :team-id team-id)
@ -185,15 +176,13 @@
(try
(binding [feat/*stats* stats
feat/*skip-on-error* skip-on-error]
(-> (assoc system ::db/rollback rollback)
(feat/migrate-team! team-id :validate? validate))
(-> (assoc system ::db/rollback rollback?)
(feat/migrate-team! team-id :validate? validate?))
(print-stats!
(-> (deref feat/*stats*)
(dissoc :total/files)
(dissoc :current/graphics)
(dissoc :current/components)
(dissoc :current/files))))
(assoc :elapsed (dt/format-duration (tpoint))))))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause))
@ -204,14 +193,14 @@
(defn migrate-teams!
[{:keys [::db/pool] :as system}
& {:keys [chunk-size max-jobs max-items start-at rollback preset skip-on-error max-time validate]
& {:keys [chunk-size max-jobs max-items start-at rollback? preset skip-on-error max-time validate?]
:or {chunk-size 10000
rollback true
validate? false
rollback? true
skip-on-error true
preset :shutdown-on-failure
max-jobs Integer/MAX_VALUE
max-items Long/MAX_VALUE
validate false}}]
max-items Long/MAX_VALUE}}]
(letfn [(get-chunk [cursor]
(let [sql (str/concat
@ -233,8 +222,8 @@
(migrate-team [team-id]
(try
(-> (assoc system ::db/rollback rollback)
(feat/migrate-team! team-id :validate? validate))
(-> (assoc system ::db/rollback rollback?)
(feat/migrate-team! team-id :validate? validate?))
(catch Throwable cause
(l/err :hint "unexpected error on processing team" :team-id (dm/str team-id) :cause cause))))
@ -242,7 +231,7 @@
(ps/acquire! feat/*semaphore*)
(let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts)))
(l/trc :hint "max time constraint reached" :elapsed (dt/format-duration ts))
(l/inf :hint "max time constraint reached" :elapsed (dt/format-duration ts))
(px/submit! scope (partial migrate-team team-id)))))]
(l/dbg :hint "migrate:start")
@ -270,10 +259,8 @@
(print-stats!
(-> (deref feat/*stats*)
(dissoc :total/teams)
(dissoc :current/graphics)
(dissoc :current/components)
(dissoc :current/files)))
(assoc :elapsed/total (dt/format-duration (tpoint)))
(dissoc :total/teams)))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause))