✨ Improve migration scripts
4 changed files with 300 additions and 76 deletions
@ -48,12 +48,6 @@
<Logger name="app.features" level="all" additivity="true">
<AppenderRef ref="reports" level="warn" />
<!-- <AppenderRef ref="main" level="debug" /> -->
<Logger name="app.srepl" level="all" additivity="true">
<AppenderRef ref="reports" level="warn" />
<!-- <AppenderRef ref="main" level="trace" /> -->
<Logger name="app" level="all" additivity="false">
@ -77,10 +77,6 @@
internal functions without the need to explicitly pass it top down."
(def ^:dynamic ^:private *team-id*
"A dynamic var that holds the current processing team-id."
(def ^:dynamic ^:private *file-stats*
"An internal dynamic var for collect stats by file."
@ -1194,12 +1190,11 @@
;; The media processing adds the data to the
;; input map and returns it.
(media/run {:cmd :info :input item}))
(catch Throwable _
(let [team-id *team-id*]
(l/wrn :hint "unable to process embedded images on svg file"
:team-id (str team-id)
:file-id (str file-id)
:media-id (str media-id)))
(l/wrn :hint "unable to process embedded images on svg file"
:file-id (str file-id)
:media-id (str media-id))
(persist-image [acc {:keys [path size width height mtype href] :as item}]
@ -1332,24 +1327,20 @@
(catch Throwable cause
(vreset! err true)
(let [cause (pu/unwrap-exception cause)
edata (ex-data cause)
team-id *team-id*]
edata (ex-data cause)]
(instance? org.xml.sax.SAXParseException cause)
(l/inf :hint "skip processing media object: invalid svg found"
:team-id (str team-id)
:file-id (str (:id fdata))
:id (str (:id mobj)))
(instance? org.graalvm.polyglot.PolyglotException cause)
(l/inf :hint "skip processing media object: invalid svg found"
:team-id (str team-id)
:file-id (str (:id fdata))
:id (str (:id mobj)))
(= (:type edata) :not-found)
(l/inf :hint "skip processing media object: underlying object does not exist"
:team-id (str team-id)
:file-id (str (:id fdata))
:id (str (:id mobj)))
@ -1357,7 +1348,6 @@
(let [skip? *skip-on-graphic-error*]
(l/wrn :hint "unable to process file media object"
:skiped skip?
:team-id (str team-id)
:file-id (str (:id fdata))
:id (str (:id mobj))
:cause cause)
@ -1524,7 +1514,9 @@
(defn migrate-file!
[system file-id & {:keys [validate? skip-on-graphic-error? label]}]
(let [tpoint (dt/tpoint)]
(let [tpoint (dt/tpoint)
err (volatile! false)]
(binding [*file-stats* (atom {})
*skip-on-graphic-error* skip-on-graphic-error?]
@ -1533,40 +1525,50 @@
:validate validate?
:skip-on-graphic-error skip-on-graphic-error?)
(let [system (update system ::sto/storage media/configure-assets-storage)]
(db/tx-run! system
(fn [system]
(binding [*system* system]
(when (string? label)
(fsnap/take-file-snapshot! system {:file-id file-id
:label (str "migration/" label)}))
(let [file (get-file system file-id)]
(events/tap :progress
{:op :migrate-file
:name (:name file)
:id (:id file)})
(db/tx-run! (update system ::sto/storage media/configure-assets-storage)
(fn [system]
(binding [*system* system]
(when (string? label)
(fsnap/take-file-snapshot! system {:file-id file-id
:label (str "migration/" label)}))
(let [file (get-file system file-id)]
(events/tap :progress
{:op :migrate-file
:name (:name file)
:id (:id file)})
(process-file system file :validate? validate?)))
(process-file system file :validate? validate?)))))
(catch Throwable cause
(let [team-id *team-id*]
(l/wrn :hint "error on processing file"
:team-id (str team-id)
:file-id (str file-id))
(throw cause)))))))
(catch Throwable cause
(vreset! err true)
(l/wrn :hint "error on processing file"
:file-id (str file-id)
:cause cause)
(throw cause))
(let [elapsed (tpoint)
components (get @*file-stats* :processed-components 0)
graphics (get @*file-stats* :processed-graphics 0)]
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:validate validate?
:elapsed (dt/format-duration elapsed))
(if (cache/cache? *cache*)
(let [cache-stats (cache/stats *cache*)]
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:validate validate?
:crt (mth/to-fixed (:hit-rate cache-stats) 2)
:crq (str (:req-count cache-stats))
:error @err
:elapsed (dt/format-duration elapsed)))
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:validate validate?
:error @err
:elapsed (dt/format-duration elapsed)))
(some-> *stats* (swap! update :processed-files (fnil inc 0)))
(some-> *team-stats* (swap! update :processed-files (fnil inc 0)))))))))
@ -1607,13 +1609,15 @@
(update-team-features! conn id features)))))]
(binding [*team-stats* (atom {})
*team-id* team-id]
(binding [*team-stats* (atom {})]
(db/tx-run! system migrate-team team-id)
(catch Throwable cause
(vreset! err true)
(l/wrn :hint "error on processing team"
:team-id (str team-id)
:cause cause)
(throw cause))
@ -70,8 +70,8 @@
(some? (:data snapshot)))
(l/debug :hint "snapshot found"
:snapshot-id (:id snapshot)
:file-id file-id)
:snapshot-id (str (:id snapshot))
:file-id (str file-id))
(db/update! conn :file
{:data (:data snapshot)}
@ -112,7 +112,9 @@
(when-let [file (db/get* conn :file {:id file-id})]
(let [id (uuid/next)
label (or label (str "Snapshot at " (dt/format-instant (dt/now) :rfc1123)))]
(l/debug :hint "persisting file snapshot" :file-id file-id :label label)
(l/debug :hint "persisting file snapshot"
:file-id (str file-id)
:label label)
(db/insert! conn :file-change
{:id id
:revn (:revn file)
@ -32,12 +32,20 @@
(defn- report-progress-files
(fn [_ _ oldv newv]
(when (not= (:processed-files oldv)
(:processed-files newv))
(let [elapsed (tpoint)]
(when (or (not= (:processed-files oldv)
(:processed-files newv))
(not= (:errors oldv)
(:errors newv)))
(let [completed (:processed-files newv 0)
errors (:errors newv 0)
elapsed (dt/format-duration (tpoint))]
(events/tap :progress-report
{:elapsed elapsed
:completed completed
:errors errors})
(l/dbg :hint "progress"
:completed (:processed-files newv)
:elapsed (dt/format-duration elapsed))))))
:completed completed
:elapsed elapsed)))))
(defn- report-progress-teams
@ -101,13 +109,47 @@
(def ^:private sql:get-teams-by-report
"WITH teams AS (
SELECT t.id t.features, mr.name
FROM migration_report AS mr
FROM migration_team_report AS mr
JOIN team AS t ON (t.id = mr.team_id)
WHERE t.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM teams %(pred)s")
(def ^:private sql:get-files-by-created-at
"SELECT id, features
FROM file
WHERE deleted_at IS NULL
ORDER BY created_at DESC")
(def ^:private sql:get-files-by-modified-at
"SELECT id, features
FROM file
WHERE deleted_at IS NULL
ORDER BY modified_at DESC")
(def ^:private sql:get-files-by-graphics
"WITH files AS (
SELECT f.id, f.features,
(SELECT count(*) FROM file_media_object AS fmo
WHERE fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false
AND fmo.file_id = f.id) AS graphics
FROM file AS f
WHERE f.deleted_at IS NULL
) SELECT * FROM files %(pred)s")
(def ^:private sql:get-files-by-report
"WITH files AS (
SELECT t.id t.features, mr.name
FROM migration_file_report AS mr
JOIN file AS t ON (t.id = mr.file_id)
WHERE t.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM files %(pred)s")
(defn- read-pred
(let [entries (if (and (vector? entries)
@ -140,7 +182,6 @@
:activity sql:get-teams-by-activity
:graphics sql:get-teams-by-graphics
:report sql:get-teams-by-report)
sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
@ -154,34 +195,78 @@
(contains? features "components/v2")))
(map :id))))
(def ^:private sql:report-table
(defn- get-files
[conn query pred]
(let [query (d/nilv query :created-at)
sql (case query
:created-at sql:get-files-by-created-at
:modified-at sql:get-files-by-modified-at
:graphics sql:get-files-by-graphics
:report sql:get-files-by-report)
sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
(str/format sql {:pred pred-sql})
[(str/format sql {:pred ""})])]
(->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2")))
(map :id))))
(def ^:private sql:team-report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_team_report (
id bigserial NOT NULL,
label text NOT NULL,
team_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id)
PRIMARY KEY (label, created_at, id))")
(defn- create-report-table!
(def ^:private sql:file-report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_file_report (
id bigserial NOT NULL,
label text NOT NULL,
file_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id))")
(defn- create-report-tables!
(db/exec-one! system [sql:report-table]))
(db/exec-one! system [sql:team-report-table])
(db/exec-one! system [sql:file-report-table]))
(defn- clean-reports!
(defn- clean-team-reports!
[system label]
(db/delete! system :migration-report {:label label}))
(db/delete! system :migration-team-report {:label label}))
(defn- report!
(defn- team-report!
[system team-id label elapsed error]
(db/insert! system :migration-report
(db/insert! system :migration-team-report
{:label label
:team-id team-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
(defn- clean-file-reports!
[system label]
(db/delete! system :migration-file-report {:label label}))
(defn- file-report!
[system file-id label elapsed error]
(db/insert! system :migration-file-report
{:label label
:file-id file-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
@ -318,12 +403,11 @@
:skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label)
(report! main/system team-id label (tpoint) nil))
(team-report! main/system team-id label (tpoint) nil))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id)
:cause cause)
:team-id (str team-id))
(events/tap :error
(ex-info "unexpected error on processing team (skiping)"
@ -333,7 +417,7 @@
(swap! stats update :errors (fnil inc 0))
(when (string? label)
(report! main/system team-id label (tpoint) (ex-message cause))))
(team-report! main/system team-id label (tpoint) (ex-message cause))))
(ps/release! sjobs)))))
@ -365,8 +449,8 @@
svgo/*semaphore* sprocs]
(when (string? label)
(create-report-table! main/system)
(clean-reports! main/system label))
(create-report-tables! main/system)
(clean-team-reports! main/system label))
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
@ -399,6 +483,146 @@
:rollback rollback?
:elapsed elapsed)))))))
(defn migrate-files!
"A REPL helper for migrate all files.
This function starts multiple concurrent file migration processes
until thw maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option.
If you want to run this on multiple machines you will need to specify
the total number of partitions and the current partition.
In order to get the report table populated, you will need to provide
a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration."
[& {:keys [max-jobs max-items max-time rollback? validate? query
pred max-procs cache skip-on-graphic-error?
label partitions current-partition]
:or {validate? false
rollback? true
max-jobs 1
current-partition 1
skip-on-graphic-error? true
max-items Long/MAX_VALUE}}]
(when (int? partitions)
(when-not (int? current-partition)
(throw (IllegalArgumentException. "missing `current-partition` parameter")))
(when-not (<= 0 current-partition partitions)
(throw (IllegalArgumentException. "invalid value on `current-partition` parameter"))))
(let [stats (atom {})
tpoint (dt/tpoint)
mtime (some-> max-time dt/duration)
factory (px/thread-factory :virtual false :prefix "penpot/migration/")
executor (px/cached-executor :factory factory)
max-procs (or max-procs max-jobs)
sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs)
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
(fn [file-id]
(let [tpoint (dt/tpoint)]
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"])
(feat/migrate-file! system file-id
:label label
:validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label)
(file-report! main/system file-id label (tpoint) nil))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id))
(events/tap :error
(ex-info "unexpected error on processing file (skiping)"
{:file-id file-id}
(swap! stats update :errors (fnil inc 0))
(when (string? label)
(file-report! main/system file-id label (tpoint) (ex-message cause))))
(ps/release! sjobs)))))
(fn [file-id]
(ps/acquire! sjobs)
(let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts)))
(l/inf :hint "max time constraint reached"
:file-id (str file-id)
:elapsed (dt/format-duration ts))
(ps/release! sjobs)
(reduced nil))
(px/run! executor (partial migrate-file file-id)))))]
(l/dbg :hint "migrate:start"
:label label
:rollback rollback?
:max-jobs max-jobs
:max-items max-items)
(add-watch stats :progress-report (report-progress-files tpoint))
(binding [feat/*stats* stats
feat/*cache* cache
svgo/*semaphore* sprocs]
(when (string? label)
(create-report-tables! main/system)
(clean-file-reports! main/system label))
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
(run! process-file
(->> (get-files conn query pred)
(filter (fn [file-id]
(if (int? partitions)
(= current-partition (-> (uuid/hash-int file-id)
(mod partitions)
(take max-items)))
;; Close and await tasks
(pu/close! executor)))
(-> (deref stats)
(assoc :elapsed (dt/format-duration (tpoint))))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause)
(events/tap :error cause))
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end"
:rollback rollback?
:elapsed elapsed)))))))
