0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-12 07:41:43 -05:00

Merge pull request #4352 from penpot/niwinz-staging-migration

📎 MIgration related optimizations
This commit is contained in:
Alejandro 2024-04-07 14:18:26 +02:00 committed by GitHub
commit a4a70f81b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 284 additions and 281 deletions

View file

@ -91,8 +91,8 @@
:jmx-remote :jmx-remote
{:jvm-opts ["-Dcom.sun.management.jmxremote" {:jvm-opts ["-Dcom.sun.management.jmxremote"
"-Dcom.sun.management.jmxremote.port=9090" "-Dcom.sun.management.jmxremote.port=9091"
"-Dcom.sun.management.jmxremote.rmi.port=9090" "-Dcom.sun.management.jmxremote.rmi.port=9091"
"-Dcom.sun.management.jmxremote.local.only=false" "-Dcom.sun.management.jmxremote.local.only=false"
"-Dcom.sun.management.jmxremote.authenticate=false" "-Dcom.sun.management.jmxremote.authenticate=false"
"-Dcom.sun.management.jmxremote.ssl=false" "-Dcom.sun.management.jmxremote.ssl=false"

View file

@ -6,7 +6,7 @@
alwaysWriteExceptions="true" /> alwaysWriteExceptions="true" />
</Console> </Console>
<RollingFile name="main" fileName="logs/main.log" filePattern="logs/main-%i.log"> <RollingFile name="main" fileName="logs/main-latest.log" filePattern="logs/main-%i.log">
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] %level{length=1} %logger{36} - %msg%n" <PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] %level{length=1} %logger{36} - %msg%n"
alwaysWriteExceptions="true" /> alwaysWriteExceptions="true" />
<Policies> <Policies>
@ -15,7 +15,7 @@
<DefaultRolloverStrategy max="9"/> <DefaultRolloverStrategy max="9"/>
</RollingFile> </RollingFile>
<RollingFile name="reports" fileName="logs/reports.log" filePattern="logs/reports-%i.log"> <RollingFile name="reports" fileName="logs/reports-latest.log" filePattern="logs/reports-%i.log">
<PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] %level{length=1} %logger{36} - %msg%n" <PatternLayout pattern="[%d{YYYY-MM-dd HH:mm:ss.SSS}] %level{length=1} %logger{36} - %msg%n"
alwaysWriteExceptions="true" /> alwaysWriteExceptions="true" />
<Policies> <Policies>

View file

@ -1,6 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
source /home/penpot/backend/environ source /home/penpot/environ
export PENPOT_FLAGS="$PENPOT_FLAGS disable-backend-worker" export PENPOT_FLAGS="$PENPOT_FLAGS disable-backend-worker"
export OPTIONS=" export OPTIONS="
@ -12,13 +12,13 @@ export OPTIONS="
-J-XX:+UnlockDiagnosticVMOptions \ -J-XX:+UnlockDiagnosticVMOptions \
-J-XX:+DebugNonSafepoints \ -J-XX:+DebugNonSafepoints \
-J-Djdk.tracePinnedThreads=full \ -J-Djdk.tracePinnedThreads=full \
-J-XX:+UseTransparentHugePages \
-J-XX:ReservedCodeCacheSize=1g \
-J-Dpolyglot.engine.WarnInterpreterOnly=false \ -J-Dpolyglot.engine.WarnInterpreterOnly=false \
-J--enable-preview"; -J--enable-preview";
# Setup HEAP # Setup HEAP
#export OPTIONS="$OPTIONS -J-Xms900m -J-Xmx900m -J-XX:+AlwaysPreTouch" export OPTIONS="$OPTIONS -J-Xms320g -J-Xmx320g -J-XX:+AlwaysPreTouch"
export OPTIONS="$OPTIONS -J-Xms1g -J-Xmx25g"
#export OPTIONS="$OPTIONS -J-Xms900m -J-Xmx900m -J-XX:+AlwaysPreTouch"
export PENPOT_HTTP_SERVER_IO_THREADS=2 export PENPOT_HTTP_SERVER_IO_THREADS=2
export PENPOT_HTTP_SERVER_WORKER_THREADS=2 export PENPOT_HTTP_SERVER_WORKER_THREADS=2
@ -33,11 +33,10 @@ export PENPOT_HTTP_SERVER_WORKER_THREADS=2
# export OPTIONS="$OPTIONS -J-Xint" # export OPTIONS="$OPTIONS -J-Xint"
# Setup GC # Setup GC
export OPTIONS="$OPTIONS -J-XX:+UseG1GC -J-Xlog:gc:logs/gc.log" export OPTIONS="$OPTIONS -J-XX:+UseG1GC -J-Xlog:gc:logs/gc.log"
# Setup GC # Setup GC
#export OPTIONS="$OPTIONS -J-XX:+UseZGC -J-XX:+ZGenerational -J-Xlog:gc:gc.log" #export OPTIONS="$OPTIONS -J-XX:+UseZGC -J-XX:+ZGenerational -J-Xlog:gc:logs/gc.log"
# Enable ImageMagick v7.x support # Enable ImageMagick v7.x support
# export OPTIONS="-J-Dim4java.useV7=true $OPTIONS"; # export OPTIONS="-J-Dim4java.useV7=true $OPTIONS";
@ -46,4 +45,4 @@ export OPTIONS_EVAL="nil"
# export OPTIONS_EVAL="(set! *warn-on-reflection* true)" # export OPTIONS_EVAL="(set! *warn-on-reflection* true)"
set -ex set -ex
exec clojure $OPTIONS -M -e "$OPTIONS_EVAL" -m rebel-readline.main exec clojure $OPTIONS -M -e "$OPTIONS_EVAL" -m rebel-readline.main

View file

@ -16,6 +16,7 @@
[app.common.files.migrations :as fmg] [app.common.files.migrations :as fmg]
[app.common.files.shapes-helpers :as cfsh] [app.common.files.shapes-helpers :as cfsh]
[app.common.files.validate :as cfv] [app.common.files.validate :as cfv]
[app.common.fressian :as fres]
[app.common.geom.matrix :as gmt] [app.common.geom.matrix :as gmt]
[app.common.geom.point :as gpt] [app.common.geom.point :as gpt]
[app.common.geom.rect :as grc] [app.common.geom.rect :as grc]
@ -48,18 +49,18 @@
[app.rpc.commands.files-snapshot :as fsnap] [app.rpc.commands.files-snapshot :as fsnap]
[app.rpc.commands.media :as cmd.media] [app.rpc.commands.media :as cmd.media]
[app.storage :as sto] [app.storage :as sto]
[app.storage.impl :as impl]
[app.storage.tmp :as tmp] [app.storage.tmp :as tmp]
[app.svgo :as svgo] [app.svgo :as svgo]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.cache :as cache]
[app.util.events :as events] [app.util.events :as events]
[app.util.pointer-map :as pmap] [app.util.pointer-map :as pmap]
[app.util.time :as dt] [app.util.time :as dt]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
[clojure.set :refer [rename-keys]] [clojure.set :refer [rename-keys]]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.fs :as fs]
[datoteka.io :as io] [datoteka.io :as io]
[promesa.exec :as px]
[promesa.util :as pu])) [promesa.util :as pu]))
(def ^:dynamic *stats* (def ^:dynamic *stats*
@ -68,7 +69,7 @@
(def ^:dynamic *cache* (def ^:dynamic *cache*
"A dynamic var for setting up a cache instance." "A dynamic var for setting up a cache instance."
nil) false)
(def ^:dynamic *skip-on-graphic-error* (def ^:dynamic *skip-on-graphic-error*
"A dynamic var for setting up the default error behavior for graphics processing." "A dynamic var for setting up the default error behavior for graphics processing."
@ -100,6 +101,8 @@
(some? data) (some? data)
(assoc :data (blob/decode data)))) (assoc :data (blob/decode data))))
(set! *warn-on-reflection* true)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FILE PREPARATION BEFORE MIGRATION ;; FILE PREPARATION BEFORE MIGRATION
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -1296,7 +1299,7 @@
(try (try
(let [item (if (str/starts-with? href "data:") (let [item (if (str/starts-with? href "data:")
(let [[mtype data] (parse-datauri href) (let [[mtype data] (parse-datauri href)
size (alength data) size (alength ^bytes data)
path (tmp/tempfile :prefix "penpot.media.download.") path (tmp/tempfile :prefix "penpot.media.download.")
written (io/write-to-file! data path :size size)] written (io/write-to-file! data path :size size)]
@ -1365,27 +1368,49 @@
{::sql/columns [:media-id]})] {::sql/columns [:media-id]})]
(:media-id fmobject))) (:media-id fmobject)))
(defn- get-sobject-content (defn get-sobject-content
[id] [id]
(let [storage (::sto/storage *system*) (let [storage (::sto/storage *system*)
sobject (sto/get-object storage id)] sobject (sto/get-object storage id)]
(when-not sobject
(throw (RuntimeException. "sobject is nil")))
(when (> (:size sobject) 1135899)
(throw (RuntimeException. "svg too big")))
(with-open [stream (sto/get-object-data storage sobject)] (with-open [stream (sto/get-object-data storage sobject)]
(slurp stream)))) (slurp stream))))
(defn get-optimized-svg
[sid]
(let [svg-text (get-sobject-content sid)
svg-text (svgo/optimize *system* svg-text)]
(csvg/parse svg-text)))
(def base-path "/data/cache")
(defn get-sobject-cache-path
[sid]
(let [path (impl/id->path sid)]
(fs/join base-path path)))
(defn get-cached-svg
[sid]
(let [path (get-sobject-cache-path sid)]
(if (fs/exists? path)
(with-open [^java.lang.AutoCloseable stream (io/input-stream path)]
(let [reader (fres/reader stream)]
(fres/read! reader)))
(get-optimized-svg sid))))
(defn- create-shapes-for-svg (defn- create-shapes-for-svg
[{:keys [id] :as mobj} file-id objects frame-id position] [{:keys [id] :as mobj} file-id objects frame-id position]
(let [get-svg (fn [sid] (let [sid (resolve-sobject-id id)
(let [svg-text (get-sobject-content sid) svg-data (if *cache*
svg-text (svgo/optimize *system* svg-text)] (get-cached-svg sid)
(-> (csvg/parse svg-text) (get-optimized-svg sid))
(assoc :name (:name mobj))))) svg-data (collect-and-persist-images svg-data file-id id)
svg-data (assoc svg-data :name (:name mobj))]
sid (resolve-sobject-id id)
svg-data (if (cache/cache? *cache*)
(cache/get *cache* sid (px/wrap-bindings get-svg))
(get-svg sid))
svg-data (collect-and-persist-images svg-data file-id id)]
(sbuilder/create-svg-shapes svg-data position objects frame-id frame-id #{} false))) (sbuilder/create-svg-shapes svg-data position objects frame-id frame-id #{} false)))
@ -1664,6 +1689,7 @@
(db/update! conn :file (db/update! conn :file
{:data (blob/encode (:data file)) {:data (blob/encode (:data file))
:features (db/create-array conn "text" (:features file)) :features (db/create-array conn "text" (:features file))
:version (:version file)
:revn (:revn file)} :revn (:revn file)}
{:id (:id file)}))) {:id (:id file)})))
@ -1713,7 +1739,7 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn migrate-file! (defn migrate-file!
[system file-id & {:keys [validate? skip-on-graphic-error? label]}] [system file-id & {:keys [validate? skip-on-graphic-error? label rown]}]
(let [tpoint (dt/tpoint) (let [tpoint (dt/tpoint)
err (volatile! false)] err (volatile! false)]
@ -1753,24 +1779,14 @@
components (get @*file-stats* :processed-components 0) components (get @*file-stats* :processed-components 0)
graphics (get @*file-stats* :processed-graphics 0)] graphics (get @*file-stats* :processed-graphics 0)]
(if (cache/cache? *cache*) (l/dbg :hint "migrate:file:end"
(let [cache-stats (cache/stats *cache*)] :file-id (str file-id)
(l/dbg :hint "migrate:file:end" :graphics graphics
:file-id (str file-id) :components components
:graphics graphics :validate validate?
:components components :rown rown
:validate validate? :error @err
:crt (mth/to-fixed (:hit-rate cache-stats) 2) :elapsed (dt/format-duration elapsed))
:crq (str (:req-count cache-stats))
:error @err
:elapsed (dt/format-duration elapsed)))
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:validate validate?
:error @err
:elapsed (dt/format-duration elapsed)))
(some-> *stats* (swap! update :processed-files (fnil inc 0))) (some-> *stats* (swap! update :processed-files (fnil inc 0)))
(some-> *team-stats* (swap! update :processed-files (fnil inc 0))))))))) (some-> *team-stats* (swap! update :processed-files (fnil inc 0)))))))))
@ -1832,21 +1848,9 @@
(when-not @err (when-not @err
(some-> *stats* (swap! update :processed-teams (fnil inc 0)))) (some-> *stats* (swap! update :processed-teams (fnil inc 0))))
(if (cache/cache? *cache*) (l/dbg :hint "migrate:team:end"
(let [cache-stats (cache/stats *cache*)] :team-id (dm/str team-id)
(l/dbg :hint "migrate:team:end" :files files
:team-id (dm/str team-id) :components components
:files files :graphics graphics
:components components :elapsed (dt/format-duration elapsed))))))))
:graphics graphics
:crt (mth/to-fixed (:hit-rate cache-stats) 2)
:crq (str (:req-count cache-stats))
:error @err
:elapsed (dt/format-duration elapsed)))
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:files files
:components components
:graphics graphics
:elapsed (dt/format-duration elapsed)))))))))

View file

@ -7,18 +7,19 @@
(ns app.srepl.components-v2 (ns app.srepl.components-v2
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.fressian :as fres]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
[app.features.components-v2 :as feat] [app.features.components-v2 :as feat]
[app.main :as main] [app.main :as main]
[app.srepl.helpers :as h] [app.srepl.helpers :as h]
[app.svgo :as svgo] [app.svgo :as svgo]
[app.util.cache :as cache]
[app.util.events :as events] [app.util.events :as events]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.fs :as fs]
[datoteka.io :as io]
[promesa.exec :as px] [promesa.exec :as px]
[promesa.exec.semaphore :as ps] [promesa.exec.semaphore :as ps]
[promesa.util :as pu])) [promesa.util :as pu]))
@ -68,7 +69,8 @@
(def ^:private sql:get-teams-by-created-at (def ^:private sql:get-teams-by-created-at
"WITH teams AS ( "WITH teams AS (
SELECT id, features SELECT id, features,
row_number() OVER (ORDER BY created_at) AS rown
FROM team FROM team
WHERE deleted_at IS NULL WHERE deleted_at IS NULL
ORDER BY created_at DESC ORDER BY created_at DESC
@ -77,6 +79,7 @@
(def ^:private sql:get-teams-by-graphics (def ^:private sql:get-teams-by-graphics
"WITH teams AS ( "WITH teams AS (
SELECT t.id, t.features, SELECT t.id, t.features,
row_number() OVER (ORDER BY t.created_at) AS rown,
(SELECT count(*) (SELECT count(*)
FROM file_media_object AS fmo FROM file_media_object AS fmo
JOIN file AS f ON (f.id = fmo.file_id) JOIN file AS f ON (f.id = fmo.file_id)
@ -93,6 +96,7 @@
(def ^:private sql:get-teams-by-activity (def ^:private sql:get-teams-by-activity
"WITH teams AS ( "WITH teams AS (
SELECT t.id, t.features, SELECT t.id, t.features,
row_number() OVER (ORDER BY t.created_at) AS rown,
(SELECT coalesce(max(date_trunc('month', f.modified_at)), date_trunc('month', t.modified_at)) (SELECT coalesce(max(date_trunc('month', f.modified_at)), date_trunc('month', t.modified_at))
FROM file AS f FROM file AS f
JOIN project AS p ON (f.project_id = p.id) JOIN project AS p ON (f.project_id = p.id)
@ -107,24 +111,16 @@
) )
SELECT * FROM teams %(pred)s") SELECT * FROM teams %(pred)s")
(def ^:private sql:get-teams-by-report
"WITH teams AS (
SELECT t.id t.features, mr.name
FROM migration_team_report AS mr
JOIN team AS t ON (t.id = mr.team_id)
WHERE t.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM teams %(pred)s")
(def ^:private sql:get-files-by-created-at (def ^:private sql:get-files-by-created-at
"SELECT id, features "SELECT id, features,
row_number() OVER (ORDER BY created_at DESC) AS rown
FROM file FROM file
WHERE deleted_at IS NULL WHERE deleted_at IS NULL
ORDER BY created_at DESC") ORDER BY created_at DESC")
(def ^:private sql:get-files-by-modified-at (def ^:private sql:get-files-by-modified-at
"SELECT id, features "SELECT id, features
row_number() OVER (ORDER BY modified_at DESC) AS rown
FROM file FROM file
WHERE deleted_at IS NULL WHERE deleted_at IS NULL
ORDER BY modified_at DESC") ORDER BY modified_at DESC")
@ -132,6 +128,7 @@
(def ^:private sql:get-files-by-graphics (def ^:private sql:get-files-by-graphics
"WITH files AS ( "WITH files AS (
SELECT f.id, f.features, SELECT f.id, f.features,
row_number() OVER (ORDER BY modified_at) AS rown,
(SELECT count(*) FROM file_media_object AS fmo (SELECT count(*) FROM file_media_object AS fmo
WHERE fmo.mtype = 'image/svg+xml' WHERE fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false AND fmo.is_local = false
@ -141,16 +138,6 @@
ORDER BY 3 ASC ORDER BY 3 ASC
) SELECT * FROM files %(pred)s") ) SELECT * FROM files %(pred)s")
(def ^:private sql:get-files-by-report
"WITH files AS (
SELECT f.id, f.features, mr.label
FROM migration_file_report AS mr
JOIN file AS f ON (f.id = mr.file_id)
WHERE f.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM files %(pred)s")
(defn- read-pred (defn- read-pred
[entries] [entries]
(let [entries (if (and (vector? entries) (let [entries (if (and (vector? entries)
@ -181,8 +168,7 @@
sql (case query sql (case query
:created-at sql:get-teams-by-created-at :created-at sql:get-teams-by-created-at
:activity sql:get-teams-by-activity :activity sql:get-teams-by-activity
:graphics sql:get-teams-by-graphics :graphics sql:get-teams-by-graphics)
:report sql:get-teams-by-report)
sql (if pred sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)] (let [[pred-sql & pred-params] (read-pred pred)]
(apply vector (apply vector
@ -193,8 +179,7 @@
(->> (db/cursor conn sql {:chunk-size 500}) (->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row) (map feat/decode-row)
(remove (fn [{:keys [features]}] (remove (fn [{:keys [features]}]
(contains? features "components/v2"))) (contains? features "components/v2"))))))
(map :id))))
(defn- get-files (defn- get-files
[conn query pred] [conn query pred]
@ -202,8 +187,7 @@
sql (case query sql (case query
:created-at sql:get-files-by-created-at :created-at sql:get-files-by-created-at
:modified-at sql:get-files-by-modified-at :modified-at sql:get-files-by-modified-at
:graphics sql:get-files-by-graphics :graphics sql:get-files-by-graphics)
:report sql:get-files-by-report)
sql (if pred sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)] (let [[pred-sql & pred-params] (read-pred pred)]
(apply vector (apply vector
@ -214,60 +198,7 @@
(->> (db/cursor conn sql {:chunk-size 500}) (->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row) (map feat/decode-row)
(remove (fn [{:keys [features]}] (remove (fn [{:keys [features]}]
(contains? features "components/v2"))) (contains? features "components/v2"))))))
(map :id))))
(def ^:private sql:team-report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_team_report (
id bigserial NOT NULL,
label text NOT NULL,
team_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id))")
(def ^:private sql:file-report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_file_report (
id bigserial NOT NULL,
label text NOT NULL,
file_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id))")
(defn- create-report-tables!
[system]
(db/exec-one! system [sql:team-report-table])
(db/exec-one! system [sql:file-report-table]))
(defn- clean-team-reports!
[system label]
(db/delete! system :migration-team-report {:label label}))
(defn- team-report!
[system team-id label elapsed error]
(db/insert! system :migration-team-report
{:label label
:team-id team-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
(defn- clean-file-reports!
[system label]
(db/delete! system :migration-file-report {:label label}))
(defn- file-report!
[system file-id label elapsed error]
(db/insert! system :migration-file-report
{:label label
:file-id file-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API ;; PUBLIC API
@ -280,11 +211,7 @@
skip-on-graphic-error? true}}] skip-on-graphic-error? true}}]
(l/dbg :hint "migrate:start" :rollback rollback?) (l/dbg :hint "migrate:start" :rollback rollback?)
(let [tpoint (dt/tpoint) (let [tpoint (dt/tpoint)
file-id (h/parse-uuid file-id) file-id (h/parse-uuid file-id)]
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
nil)]
(binding [feat/*stats* (atom {}) (binding [feat/*stats* (atom {})
feat/*cache* cache] feat/*cache* cache]
@ -315,12 +242,7 @@
(let [team-id (h/parse-uuid team-id) (let [team-id (h/parse-uuid team-id)
stats (atom {}) stats (atom {})
tpoint (dt/tpoint) tpoint (dt/tpoint)]
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
nil)]
(add-watch stats :progress-report (report-progress-files tpoint)) (add-watch stats :progress-report (report-progress-files tpoint))
@ -347,7 +269,7 @@
"A REPL helper for migrate all teams. "A REPL helper for migrate all teams.
This function starts multiple concurrent team migration processes This function starts multiple concurrent team migration processes
until thw maximum number of jobs is reached which by default has the until the maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option. value of `1`. This is controled with the `:max-jobs` option.
If you want to run this on multiple machines you will need to specify If you want to run this on multiple machines you will need to specify
@ -383,41 +305,30 @@
sjobs (ps/create :permits max-jobs) sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs) sprocs (ps/create :permits max-procs)
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
nil)
migrate-team migrate-team
(fn [team-id] (fn [team-id]
(let [tpoint (dt/tpoint)] (try
(try (db/tx-run! (assoc main/system ::db/rollback rollback?)
(db/tx-run! (assoc main/system ::db/rollback rollback?) (fn [system]
(fn [system] (db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) (feat/migrate-team! system team-id
(feat/migrate-team! system team-id :label label
:label label :validate? validate?
:validate? validate? :skip-on-graphic-error? skip-on-graphic-error?)))
:skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label) (catch Throwable cause
(team-report! main/system team-id label (tpoint) nil)) (l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id))
(catch Throwable cause (events/tap :error
(l/wrn :hint "unexpected error on processing team (skiping)" (ex-info "unexpected error on processing team (skiping)"
:team-id (str team-id)) {:team-id team-id}
cause))
(events/tap :error (swap! stats update :errors (fnil inc 0)))
(ex-info "unexpected error on processing team (skiping)"
{:team-id team-id}
cause))
(swap! stats update :errors (fnil inc 0)) (finally
(ps/release! sjobs))))
(when (string? label)
(team-report! main/system team-id label (tpoint) (ex-message cause))))
(finally
(ps/release! sjobs)))))
process-team process-team
(fn [team-id] (fn [team-id]
@ -445,23 +356,18 @@
feat/*cache* cache feat/*cache* cache
svgo/*semaphore* sprocs] svgo/*semaphore* sprocs]
(try (try
(when (string? label)
(create-report-tables! main/system)
(clean-team-reports! main/system label))
(db/tx-run! main/system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"]) (db/exec! conn ["SET LOCAL statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) (db/exec! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! process-team (run! process-team
(->> (get-teams conn query pred) (->> (get-teams conn query pred)
(filter (fn [team-id] (filter (fn [{:keys [rown]}]
(if (int? partitions) (if (int? partitions)
(= current-partition (-> (uuid/hash-int team-id) (= current-partition (inc (mod rown partitions)))
(mod partitions)
(inc)))
true))) true)))
(map :id)
(take max-items))) (take max-items)))
;; Close and await tasks ;; Close and await tasks
@ -480,7 +386,6 @@
:rollback rollback? :rollback rollback?
:elapsed elapsed))))))) :elapsed elapsed)))))))
(defn migrate-files! (defn migrate-files!
"A REPL helper for migrate all files. "A REPL helper for migrate all files.
@ -521,56 +426,45 @@
sjobs (ps/create :permits max-jobs) sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs) sprocs (ps/create :permits max-procs)
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
nil)
migrate-file migrate-file
(fn [file-id] (fn [file-id rown]
(let [tpoint (dt/tpoint)] (try
(try (db/tx-run! (assoc main/system ::db/rollback rollback?)
(db/tx-run! (assoc main/system ::db/rollback rollback?) (fn [system]
(fn [system] (db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) (feat/migrate-file! system file-id
(feat/migrate-file! system file-id :rown rown
:label label :label label
:validate? validate? :validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?))) :skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label) (catch Throwable cause
(file-report! main/system file-id label (tpoint) nil)) (l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id))
(catch Throwable cause (events/tap :error
(l/wrn :hint "unexpected error on processing file (skiping)" (ex-info "unexpected error on processing file (skiping)"
:file-id (str file-id)) {:file-id file-id}
cause))
(events/tap :error (swap! stats update :errors (fnil inc 0)))
(ex-info "unexpected error on processing file (skiping)"
{:file-id file-id}
cause))
(swap! stats update :errors (fnil inc 0)) (finally
(ps/release! sjobs))))
(when (string? label)
(file-report! main/system file-id label (tpoint) (ex-message cause))))
(finally
(ps/release! sjobs)))))
process-file process-file
(fn [file-id] (fn [{:keys [id rown]}]
(ps/acquire! sjobs) (ps/acquire! sjobs)
(let [ts (tpoint)] (let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts))) (if (and mtime (neg? (compare mtime ts)))
(do (do
(l/inf :hint "max time constraint reached" (l/inf :hint "max time constraint reached"
:file-id (str file-id) :file-id (str id)
:elapsed (dt/format-duration ts)) :elapsed (dt/format-duration ts))
(ps/release! sjobs) (ps/release! sjobs)
(reduced nil)) (reduced nil))
(px/run! executor (partial migrate-file file-id)))))] (px/run! executor (partial migrate-file id rown)))))]
(l/dbg :hint "migrate:start" (l/dbg :hint "migrate:start"
:label label :label label
@ -584,22 +478,16 @@
feat/*cache* cache feat/*cache* cache
svgo/*semaphore* sprocs] svgo/*semaphore* sprocs]
(try (try
(when (string? label)
(create-report-tables! main/system)
(clean-file-reports! main/system label))
(db/tx-run! main/system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"]) (db/exec! conn ["SET LOCAL statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) (db/exec! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! process-file (run! process-file
(->> (get-files conn query pred) (->> (get-files conn query pred)
(filter (fn [file-id] (filter (fn [{:keys [rown] :as row}]
(if (int? partitions) (if (int? partitions)
(= current-partition (-> (uuid/hash-int file-id) (= current-partition (inc (mod rown partitions)))
(mod partitions)
(inc)))
true))) true)))
(take max-items))) (take max-items)))
@ -619,6 +507,100 @@
:rollback rollback? :rollback rollback?
:elapsed elapsed))))))) :elapsed elapsed)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; CACHE POPULATE
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def sql:sobjects-for-cache
"SELECT id,
row_number() OVER (ORDER BY created_at) AS index
FROM storage_object
WHERE (metadata->>'~:bucket' = 'file-media-object' OR
metadata->>'~:bucket' IS NULL)
AND metadata->>'~:content-type' = 'image/svg+xml'
AND deleted_at IS NULL
AND size < 1135899
ORDER BY created_at ASC")
(defn populate-cache!
"A REPL helper for migrate all files.
This function starts multiple concurrent file migration processes
until thw maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option.
If you want to run this on multiple machines you will need to specify
the total number of partitions and the current partition.
In order to get the report table populated, you will need to provide
a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration."
[& {:keys [max-jobs] :or {max-jobs 1}}]
(let [tpoint (dt/tpoint)
factory (px/thread-factory :virtual false :prefix "penpot/cache/")
executor (px/cached-executor :factory factory)
sjobs (ps/create :permits max-jobs)
retrieve-sobject
(fn [id index]
(let [path (feat/get-sobject-cache-path id)
parent (fs/parent path)]
(try
(when-not (fs/exists? parent)
(fs/create-dir parent))
(if (fs/exists? path)
(l/inf :hint "create cache entry" :status "exists" :index index :id (str id) :path (str path))
(let [svg-data (feat/get-optimized-svg id)]
(with-open [^java.lang.AutoCloseable stream (io/output-stream path)]
(let [writer (fres/writer stream)]
(fres/write! writer svg-data)))
(l/inf :hint "create cache entry" :status "created"
:index index
:id (str id)
:path (str path))))
(catch Throwable cause
(l/wrn :hint "create cache entry"
:status "error"
:index index
:id (str id)
:path (str path)
:cause cause))
(finally
(ps/release! sjobs)))))
process-sobject
(fn [{:keys [id index]}]
(ps/acquire! sjobs)
(px/run! executor (partial retrieve-sobject id index)))]
(l/dbg :hint "migrate:start"
:max-jobs max-jobs)
(try
(binding [feat/*system* main/system]
(run! process-sobject
(db/exec! main/system [sql:sobjects-for-cache]))
;; Close and await tasks
(pu/close! executor))
{:elapsed (dt/format-duration (tpoint))}
(catch Throwable cause
(l/dbg :hint "populate:error" :cause cause))
(finally
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "populate:end"
:elapsed elapsed))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FILE PROCESS HELPERS ;; FILE PROCESS HELPERS

View file

@ -16,6 +16,8 @@
[clojure.set :as set] [clojure.set :as set]
[cuerdas.core :as str])) [cuerdas.core :as str]))
#?(:clj (set! *warn-on-reflection* true))
(declare reduce-objects) (declare reduce-objects)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -327,12 +329,9 @@
"Selects the shape that will be the base to add the shapes over" "Selects the shape that will be the base to add the shapes over"
[objects selected] [objects selected]
(let [;; Gets the tree-index for all the shapes (let [;; Gets the tree-index for all the shapes
indexed-shapes (indexed-shapes objects) indexed-shapes (indexed-shapes objects selected)
;; Filters the selected and retrieve a list of ids ;; Filters the selected and retrieve a list of ids
sorted-ids (->> indexed-shapes sorted-ids (map val indexed-shapes)]
(filter (comp selected second))
(map second))]
;; The first id will be the top-most ;; The first id will be the top-most
(get objects (first sorted-ids)))) (get objects (first sorted-ids))))
@ -486,43 +485,62 @@
(reduce add-element (d/ordered-set) ids))) (reduce add-element (d/ordered-set) ids)))
(defn indexed-shapes (defn- indexed-shapes
"Retrieves a list with the indexes for each element in the layer tree. "Retrieves a vector with the indexes for each element in the layer
This will be used for shift+selection." tree. This will be used for shift+selection."
[objects] [objects selected]
(letfn [(red-fn [cur-idx id] (loop [index 1
(let [[prev-idx _] (first cur-idx) result (transient [])
prev-idx (or prev-idx 0) ;; Flag to start adding elements to the index
cur-idx (conj cur-idx (d/vec2 (inc prev-idx) id))] add? false
(rec-index cur-idx id))) ;; Only add elements while we're in the selection, we finish when the selection is over
(rec-index [cur-idx id] pending (set selected)
(let [object (get objects id)] shapes (-> objects
(reduce red-fn cur-idx (reverse (:shapes object)))))] (get uuid/zero)
(into {} (rec-index '() uuid/zero)))) (get :shapes)
(rseq))]
(let [shape-id (first shapes)]
(if (and (d/not-empty? pending) shape-id)
(let [shape (get objects shape-id)
add? (or add? (contains? selected shape-id))
pending (disj pending shape-id)
result (if add?
(conj! result (d/vec2 index shape-id))
result)]
(if-let [children (get shape :shapes)]
(recur (inc index)
result
add?
pending
(concat (rseq children) (rest shapes)))
(recur (inc index)
result
add?
pending
(rest shapes))))
(persistent! result)))))
(defn expand-region-selection (defn expand-region-selection
"Given a selection selects all the shapes between the first and last in "Given a selection selects all the shapes between the first and last in
an indexed manner (shift selection)" an indexed manner (shift selection)"
[objects selection] [objects selection]
(let [indexed-shapes (indexed-shapes objects) (let [selection (if (set? selection) selection (set selection))
filter-indexes (->> indexed-shapes indexed-shapes (indexed-shapes objects selection)
(filter (comp selection second)) indexes (map key indexed-shapes)
(map first)) from (apply min indexes)
to (apply max indexes)
from (apply min filter-indexes) xform (comp
to (apply max filter-indexes)] (filter (fn [[idx _]] (and (>= idx from) (<= idx to))))
(->> indexed-shapes (map val))]
(filter (fn [[idx _]] (and (>= idx from) (<= idx to)))) (into #{} xform indexed-shapes)))
(map second)
(into #{}))))
(defn order-by-indexed-shapes (defn order-by-indexed-shapes
[objects ids] "Retrieves a ordered vector for each element in the layer tree and
(let [ids (if (set? ids) ids (set ids))] filted by selected set"
(->> (indexed-shapes objects) [objects selected]
(filter (fn [o] (contains? ids (val o)))) (let [selected (if (set? selected) selected (set selected))]
(sort-by key) (sequence (map val) (indexed-shapes objects selected))))
(map val))))
(defn get-index-replacement (defn get-index-replacement
"Given a collection of shapes, calculate their positions "Given a collection of shapes, calculate their positions