Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-06 14:50:20 -05:00

Simplify v2 migration helpers on srepl ns

This commit is contained in:
Andrey Antukh 2024-04-09 13:39:30 +02:00
parent c6d92a2517
commit 5924f3bc41
7 changed files with 100 additions and 416 deletions

View file

@ -160,7 +160,6 @@ available_commands = (
parser = argparse.ArgumentParser(
@ -233,7 +232,4 @@ elif args.action == "search-profile":
elif args.action == "migrate-components-v2":

View file

@ -38,29 +38,9 @@ export PENPOT_MEDIA_MAX_FILE_SIZE=104857600
# Setup default multipart upload size to 300MiB
# Setup HEAP
# export OPTIONS="$OPTIONS -J-Xms50m -J-Xmx1024m"
# export OPTIONS="$OPTIONS -J-Xms1100m -J-Xmx1100m -J-XX:+AlwaysPreTouch"
# Increase virtual thread pool size
# export OPTIONS="$OPTIONS -J-Djdk.virtualThreadScheduler.parallelism=16"
# Disable C2 Compiler
# export OPTIONS="$OPTIONS -J-XX:TieredStopAtLevel=1"
# Disable all compilers
# export OPTIONS="$OPTIONS -J-Xint"
# Setup GC
# export OPTIONS="$OPTIONS -J-XX:+UseG1GC"
# Setup GC
# Enable ImageMagick v7.x support
# export OPTIONS="-J-Dim4java.useV7=true $OPTIONS";
# Initialize MINIO config
mc alias set penpot-s3/ http://minio:9000 minioadmin minioadmin -q
mc admin user add penpot-s3 penpot-devenv penpot-devenv -q
@ -76,24 +56,8 @@ export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3
export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000
if [ "$1" = "--watch" ]; then
trap "exit" INT TERM ERR
trap "kill 0" EXIT
echo "Start Watch..."
set -ex
clojure $OPTIONS -A:dev -M -m app.main &
npx nodemon \
--watch src \
--watch ../common \
--ext "clj" \
--signal SIGKILL \
--exec 'echo "(app.main/stop)\n\r(repl/refresh)\n\r(app.main/start)\n" | nc -N localhost 6062'
set -x
clojure $OPTIONS -A:dev -M -m app.main;
clojure $OPTIONS -A:dev -M -m $entrypoint;

View file

@ -53,7 +53,6 @@
[app.storage.tmp :as tmp]
[app.svgo :as svgo]
[app.util.blob :as blob]
[app.util.events :as events]
[app.util.pointer-map :as pmap]
[app.util.time :as dt]
[buddy.core.codecs :as bc]
@ -1196,9 +1195,6 @@
(fn [fdata frame-id grid assets]
(reduce (fn [result [component position]]
(events/tap :progress {:op :migrate-component
:id (:id component)
:name (:name component)})
(add-main-instance result component frame-id (gpt/add position
(gpt/point grid-gap grid-gap))))
@ -1518,9 +1514,6 @@
(->> (d/zip media-group grid)
(reduce (fn [fdata [mobj position]]
(events/tap :progress {:op :migrate-graphic
:id (:id mobj)
:name (:name mobj)})
(or (process fdata mobj position) fdata))
(assoc-in fdata [:options :components-v2] true)))))
@ -1759,11 +1752,6 @@
(let [file (get-file system file-id)
file (process-file! system file :validate? validate?)]
(events/tap :progress
{:op :migrate-file
:name (:name file)
:id (:id file)})
(persist-file! system file)))))
(catch Throwable cause
@ -1791,10 +1779,11 @@
(some-> *team-stats* (swap! update :processed-files (fnil inc 0)))))))))
(defn migrate-team!
[system team-id & {:keys [validate? skip-on-graphic-error? label]}]
[system team-id & {:keys [validate? rown skip-on-graphic-error? label]}]
(l/dbg :hint "migrate:team:start"
:team-id (dm/str team-id))
:team-id (dm/str team-id)
:rown rown)
(let [tpoint (dt/tpoint)
err (volatile! false)
@ -1816,11 +1805,6 @@
(conj "layout/grid")
(conj "styles/v2"))]
(events/tap :progress
{:op :migrate-team
:name (:name team)
:id id})
(run! (partial migrate-file system)
(get-and-lock-team-files conn id))
@ -1849,6 +1833,7 @@
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:rown rown
:files files
:components components
:graphics graphics

View file

@ -527,6 +527,15 @@
:worker? (contains? cf/flags :backend-worker)
:version (:full cf/version)))
(defn start-custom
(ig/load-namespaces config)
(alter-var-root #'system (fn [sys]
(when sys (ig/halt! sys))
(-> config
(defn stop
(alter-var-root #'system (fn [sys]

View file

@ -11,10 +11,7 @@
[app.common.exceptions :as ex]
[app.common.uuid :as uuid]
[app.db :as db]
[app.main :as main]
[app.rpc.commands.auth :as cmd.auth]
[app.srepl.components-v2 :refer [migrate-teams!]]
[app.util.events :as events]
[app.util.json :as json]
[app.util.time :as dt]
[cuerdas.core :as str]))
@ -105,39 +102,6 @@
[{:keys [password]}]
(auth/derive-password password))
(defmethod exec-command :migrate-v2
(letfn [(on-progress-report [{:keys [elapsed completed errors]}]
(println (str/ffmt "-> Progress: completed: %, errors: %, elapsed: %"
completed errors elapsed)))
(on-progress [{:keys [op name]}]
(case op
(println (str/ffmt "-> Migrating team: \"%\"" name))
(println (str/ffmt "=> Migrating file: \"%\"" name))
(on-event [[type payload]]
(case type
:progress-report (on-progress-report payload)
:progress (on-progress payload)
:error (on-error payload)
(on-error [cause]
(println "EE:" (ex-message cause)))]
(println "The components/v2 migration started...")
(let [result (-> (partial migrate-teams! main/system {:rollback? true})
(events/run-with! on-event))]
(println (str/ffmt "Migration process finished (elapsed: %)" (:elapsed result))))
(catch Throwable cause
(on-error cause)))))
(defmethod exec-command :default
[{:keys [::cmd]}]
(ex/raise :type :internal

View file

@ -6,18 +6,16 @@
(ns app.srepl.components-v2
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.fressian :as fres]
[app.common.logging :as l]
[app.db :as db]
[app.features.components-v2 :as feat]
[app.main :as main]
[app.srepl.helpers :as h]
[app.svgo :as svgo]
[app.util.events :as events]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[cuerdas.core :as str]
[datoteka.fs :as fs]
[datoteka.io :as io]
[promesa.exec :as px]
@ -31,86 +29,6 @@
(defn- report-progress-files
(fn [_ _ oldv newv]
(when (or (not= (:processed-files oldv)
(:processed-files newv))
(not= (:errors oldv)
(:errors newv)))
(let [completed (:processed-files newv 0)
errors (:errors newv 0)
elapsed (dt/format-duration (tpoint))]
(events/tap :progress-report
{:elapsed elapsed
:completed completed
:errors errors})
(l/dbg :hint "progress"
:completed completed
:elapsed elapsed)))))
(defn- report-progress-teams
(fn [_ _ oldv newv]
(when (or (not= (:processed-teams oldv)
(:processed-teams newv))
(not= (:errors oldv)
(:errors newv)))
(let [completed (:processed-teams newv 0)
errors (:errors newv 0)
elapsed (dt/format-duration (tpoint))]
(events/tap :progress-report
{:elapsed elapsed
:completed completed
:errors errors})
(l/dbg :hint "progress"
:completed completed
:elapsed elapsed)))))
(def ^:private sql:get-teams-by-created-at
"WITH teams AS (
SELECT id, features,
row_number() OVER (ORDER BY created_at) AS rown
FROM team
WHERE deleted_at IS NULL
ORDER BY created_at DESC
) SELECT * FROM TEAMS %(pred)s")
(def ^:private sql:get-teams-by-graphics
"WITH teams AS (
SELECT t.id, t.features,
row_number() OVER (ORDER BY t.created_at) AS rown,
(SELECT count(*)
FROM file_media_object AS fmo
JOIN file AS f ON (f.id = fmo.file_id)
JOIN project AS p ON (p.id = f.project_id)
WHERE p.team_id = t.id
AND fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false) AS graphics
FROM team AS t
WHERE t.deleted_at IS NULL
SELECT * FROM teams %(pred)s")
(def ^:private sql:get-teams-by-activity
"WITH teams AS (
SELECT t.id, t.features,
row_number() OVER (ORDER BY t.created_at) AS rown,
(SELECT coalesce(max(date_trunc('month', f.modified_at)), date_trunc('month', t.modified_at))
FROM file AS f
JOIN project AS p ON (f.project_id = p.id)
WHERE p.team_id = t.id) AS updated_at,
(SELECT coalesce(count(*), 0)
FROM file AS f
JOIN project AS p ON (f.project_id = p.id)
WHERE p.team_id = t.id) AS total_files
FROM team AS t
WHERE t.deleted_at IS NULL
SELECT * FROM teams %(pred)s")
(def ^:private sql:get-files-by-created-at
"SELECT id, features,
row_number() OVER (ORDER BY created_at DESC) AS rown
@ -118,87 +36,12 @@
WHERE deleted_at IS NULL
ORDER BY created_at DESC")
(def ^:private sql:get-files-by-modified-at
"SELECT id, features
row_number() OVER (ORDER BY modified_at DESC) AS rown
FROM file
WHERE deleted_at IS NULL
ORDER BY modified_at DESC")
(def ^:private sql:get-files-by-graphics
"WITH files AS (
SELECT f.id, f.features,
row_number() OVER (ORDER BY modified_at) AS rown,
(SELECT count(*) FROM file_media_object AS fmo
WHERE fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false
AND fmo.file_id = f.id) AS graphics
FROM file AS f
WHERE f.deleted_at IS NULL
) SELECT * FROM files %(pred)s")
(defn- read-pred
(let [entries (if (and (vector? entries)
(keyword? (first entries)))
(loop [params []
queries []
entries (seq entries)]
(if-let [[op val field] (first entries)]
(let [field (name field)
cond (case op
:lt (str/ffmt "% < ?" field)
:lte (str/ffmt "% <= ?" field)
:gt (str/ffmt "% > ?" field)
:gte (str/ffmt "% >= ?" field)
:eq (str/ffmt "% = ?" field))]
(recur (conj params val)
(conj queries cond)
(rest entries)))
(let [sql (apply str "WHERE " (str/join " AND " queries))]
(apply vector sql params))))))
(defn- get-teams
[conn query pred]
(let [query (d/nilv query :created-at)
sql (case query
:created-at sql:get-teams-by-created-at
:activity sql:get-teams-by-activity
:graphics sql:get-teams-by-graphics)
sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
(str/format sql {:pred pred-sql})
[(str/format sql {:pred ""})])]
(->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2"))))))
(defn- get-files
[conn query pred]
(let [query (d/nilv query :created-at)
sql (case query
:created-at sql:get-files-by-created-at
:modified-at sql:get-files-by-modified-at
:graphics sql:get-files-by-graphics)
sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
(str/format sql {:pred pred-sql})
[(str/format sql {:pred ""})])]
(->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2"))))))
(->> (db/cursor conn [sql:get-files-by-created-at] {:chunk-size 500})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2")))))
@ -244,8 +87,6 @@
stats (atom {})
tpoint (dt/tpoint)]
(add-watch stats :progress-report (report-progress-files tpoint))
(binding [feat/*stats* stats
feat/*cache* cache]
@ -265,127 +106,6 @@
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed)))))))
(defn migrate-teams!
"A REPL helper for migrate all teams.
This function starts multiple concurrent team migration processes
until the maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option.
If you want to run this on multiple machines you will need to specify
the total number of partitions and the current partition.
In order to get the report table populated, you will need to provide
a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration."
[& {:keys [max-jobs max-items max-time rollback? validate? query
pred max-procs cache skip-on-graphic-error?
label partitions current-partition]
:or {validate? false
rollback? true
max-jobs 1
current-partition 1
skip-on-graphic-error? true
max-items Long/MAX_VALUE}}]
(when (int? partitions)
(when-not (int? current-partition)
(throw (IllegalArgumentException. "missing `current-partition` parameter")))
(when-not (<= 0 current-partition partitions)
(throw (IllegalArgumentException. "invalid value on `current-partition` parameter"))))
(let [stats (atom {})
tpoint (dt/tpoint)
mtime (some-> max-time dt/duration)
factory (px/thread-factory :virtual false :prefix "penpot/migration/")
executor (px/cached-executor :factory factory)
max-procs (or max-procs max-jobs)
sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs)
(fn [team-id]
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(feat/migrate-team! system team-id
:label label
:validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id))
(events/tap :error
(ex-info "unexpected error on processing team (skiping)"
{:team-id team-id}
(swap! stats update :errors (fnil inc 0)))
(ps/release! sjobs))))
(fn [team-id]
(ps/acquire! sjobs)
(let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts)))
(l/inf :hint "max time constraint reached"
:team-id (str team-id)
:elapsed (dt/format-duration ts))
(ps/release! sjobs)
(reduced nil))
(px/run! executor (partial migrate-team team-id)))))]
(l/dbg :hint "migrate:start"
:label label
:rollback rollback?
:max-jobs max-jobs
:max-items max-items)
(add-watch stats :progress-report (report-progress-teams tpoint))
(binding [feat/*stats* stats
feat/*cache* cache
svgo/*semaphore* sprocs]
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET LOCAL statement_timeout = 0"])
(db/exec! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! process-team
(->> (get-teams conn query pred)
(filter (fn [{:keys [rown]}]
(if (int? partitions)
(= current-partition (inc (mod rown partitions)))
(map :id)
(take max-items)))
;; Close and await tasks
(pu/close! executor)))
(-> (deref stats)
(assoc :elapsed (dt/format-duration (tpoint))))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause)
(events/tap :error cause))
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end"
:rollback rollback?
:elapsed elapsed)))))))
(defn migrate-files!
"A REPL helper for migrate all files.
@ -399,8 +119,8 @@
In order to get the report table populated, you will need to provide
a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration."
[& {:keys [max-jobs max-items max-time rollback? validate? query
pred max-procs cache skip-on-graphic-error?
[& {:keys [max-jobs max-items rollback? validate?
cache skip-on-graphic-error?
label partitions current-partition]
:or {validate? false
rollback? true
@ -417,14 +137,10 @@
(let [stats (atom {})
tpoint (dt/tpoint)
mtime (some-> max-time dt/duration)
factory (px/thread-factory :virtual false :prefix "penpot/migration/")
executor (px/cached-executor :factory factory)
max-procs (or max-procs max-jobs)
sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs)
(fn [file-id rown]
@ -455,16 +171,7 @@
(fn [{:keys [id rown]}]
(ps/acquire! sjobs)
(let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts)))
(l/inf :hint "max time constraint reached"
:file-id (str id)
:elapsed (dt/format-duration ts))
(ps/release! sjobs)
(reduced nil))
(px/run! executor (partial migrate-file id rown)))))]
(px/run! executor (partial migrate-file id rown)))]
(l/dbg :hint "migrate:start"
:label label
@ -472,11 +179,8 @@
:max-jobs max-jobs
:max-items max-items)
(add-watch stats :progress-report (report-progress-files tpoint))
(binding [feat/*stats* stats
feat/*cache* cache
svgo/*semaphore* sprocs]
feat/*cache* cache]
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
@ -484,7 +188,7 @@
(db/exec! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! process-file
(->> (get-files conn query pred)
(->> (get-files conn)
(filter (fn [{:keys [rown] :as row}]
(if (int? partitions)
(= current-partition (inc (mod rown partitions)))
@ -603,15 +307,77 @@
:elapsed elapsed))))))
(defn delete-broken-files
[{:keys [id data] :as file}]
(if (-> data :options :components-v2 true?)
(l/wrn :hint "found old components-v2 format"
:file-id (str id)
:file-name (:name file))
(assoc file :deleted-at (dt/now)))
(def ^:private required-services
[[:app.main/assets :app.storage.s3/backend]
[:app.main/assets :app.storage.fs/backend]
(def ^:private sql:get-teams-by-created-at
"SELECT id, features,
row_number() OVER (ORDER BY created_at DESC) AS rown
FROM team
WHERE deleted_at IS NULL
ORDER BY created_at DESC")
(defn- get-teams
(->> (db/cursor conn [sql:get-teams-by-created-at] {:chunk-size 1})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2")))))
(defn- migrate-teams
[{:keys [::db/conn] :as system}]
(db/exec-one! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! (fn [{:keys [id rown]}]
(-> (assoc system ::db/rollback true)
(feat/migrate-team! id
:rown rown
:label "migration-v2"
:validate? false
:skip-on-graphics-error? true))
(catch Throwable _
(swap! feat/*stats* update :errors (fnil inc 0))
(l/wrn :hint "error on migrating team (skiping)"))))
(get-teams conn)))
(defn run-migration
(let [config (select-keys main/system-config required-services)
tpoint (dt/tpoint)
stats (atom {})]
(main/start-custom config)
(binding [feat/*stats* stats]
(db/tx-run! main/system migrate-teams))
(let [stats (deref stats)
elapsed (dt/format-duration (tpoint))]
(l/inf :hint "migration finished"
:files (:processed-files stats)
:teams (:processed-teams stats)
:errors (:errors stats)
:elapsed elapsed))
(defn -main
[& _args]
(System/exit 0)
(catch Throwable cause
(ex/print-throwable cause)
(System/exit -1))))

View file

@ -612,7 +612,7 @@
(t/is (fn? result))
(let [events (th/consume-sse result)]
(t/is (= 6 (count events)))
(t/is (= 5 (count events)))
(t/is (= :end (first (last events))))))))
(t/deftest get-list-of-buitin-templates