@ -9,12 +9,14 @@
(:refer-clojure :exclude [parse-uuid])
[app.binfile.common :as bfc]
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.features :as cfeat]
[app.common.files.changes :as cpc]
[app.common.files.migrations :as pmg]
[app.common.files.migrations :as fmg]
[app.common.files.repair :as repair]
[app.common.files.validate :as cfv]
[app.common.files.validate :as validate]
[app.common.logging :as l]
[app.common.pprint :refer [pprint]]
@ -38,7 +40,8 @@
[expound.alpha :as expound]
[promesa.core :as p]
[promesa.exec :as px]
[promesa.exec.csp :as sp]))
[promesa.exec.semaphore :as ps]
[promesa.util :as pu]))
(def ^:dynamic *system* nil)
@ -62,15 +65,21 @@
{:data data}
{:id id}))))
(defn get-file
(defn- get-file*
"Get the migrated data of one file."
[id & {:keys [migrate?] :or {migrate? true}}]
(db/run! main/system
[system id]
(db/run! system
(fn [system]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? migrate?)
(-> (files/get-file system id :migrate? false)
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {})))))))
(update :data feat.fdata/process-objects (partial into {}))
(defn get-file
"Get the migrated data of one file."
(get-file* main/system id))
(defn validate
"Validate structure, referencial integrity and semantic coherence of
@ -78,92 +87,89 @@
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(let [id (if (string? id) (parse-uuid id) id)
file (files/get-file system id)
file (get-file* system id)
libs (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? false)
(update :data feat.fdata/process-pointers deref)
(get-file* system id))))
(d/index-by :id))]
(validate/validate-file file libs))))))
(validate/validate-file file libs)))))
(defn repair!
"Repair the list of errors detected by validation."
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
(binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(let [id (if (string? id) (parse-uuid id) id)
file (files/get-file system id)
file (get-file* system id)
libs (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? false)
(update :data feat.fdata/process-pointers deref)
(get-file* system id))))
(d/index-by :id))
errors (validate/validate-file file libs)
changes (repair/repair-file file libs errors)
file (-> file
(update :revn inc)
(update :data cpc/process-changes changes)
(update :data blob/encode))]
(update :data cpc/process-changes changes))
(when (contains? (:features file) "fdata/pointer-map")
(feat.fdata/persist-pointers! system id))
file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system id)
(db/update! conn :file
{:revn (:revn file)
:data (:data file)
:data (blob/encode (:data file))
:data-backend nil
:modified-at (dt/now)
:has-media-trimmed false}
{:id (:id file)})
(defn update-file!
"Apply a function to the data of one file. Optionally save the changes or not.
The function receives the decoded and migrated file data."
[& {:keys [update-fn id rollback? migrate? inc-revn?]
:or {rollback? true migrate? true inc-revn? true}}]
(letfn [(process-file [{:keys [::db/conn] :as system} {:keys [features] :as file}]
(binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer system id)
(if (contains? features "fdata/pointer-map") pmap/wrap identity)
(if (contains? features "fdata/objectd-map") omap/wrap identity)]
(let [file (cond-> (update-fn file)
[& {:keys [update-fn id rollback? inc-revn?]
:or {rollback? true inc-revn? true}}]
(letfn [(process-file [{:keys [::db/conn] :as system} file-id]
(let [file (get-file* system file-id)
file (cond-> (update-fn file)
inc-revn? (update :revn inc))
features (db/create-array conn "text" (:features file))
data (blob/encode (:data file))]
_ (cfv/validate-file-schema! file)
file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system id)
(db/update! conn :file
{:data data
:revn (:revn file)
:features features}
{:id id}))
(when (contains? (:features file) "fdata/pointer-map")
(feat.fdata/persist-pointers! system id))
{:data (blob/encode (:data file))
:features (db/create-array conn "text" (:features file))
:revn (:revn file)}
{:id (:id file)})
(dissoc file :data)))]
(db/tx-run! (or *system* main/system)
(db/tx-run! (or *system* (assoc main/system ::db/rollback rollback?))
(fn [system]
(binding [*system* system]
(->> (files/get-file system id :migrate? migrate?)
(process-file system))
(when rollback?
(db/rollback! system)))))))))
(process-file system id))))))
(def ^:private sql:get-file-ids
@ -190,16 +196,11 @@
(strace/print-stack-trace cause))
(process-file [{:keys [::db/conn] :as system} file-id]
(let [file (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system file-id)]
(-> (files/get-file system file-id)
(update :data feat.fdata/process-pointers deref)))
(let [file (get-file* system file-id)
libs (when with-libraries?
(->> (files/get-file-libraries conn file-id)
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id)
(update :data feat.fdata/process-pointers deref))))))
(get-file* system id))))
(d/index-by :id)))]
(if with-libraries?
@ -208,7 +209,7 @@
(catch Throwable cause
((or on-error on-error*) cause file)))))]
(db/tx-run! main/system
(db/tx-run! (assoc main/system ::db/rollback true)
(fn [{:keys [::db/conn] :as system}]
(binding [*system* system]
@ -217,83 +218,125 @@
(get-candidates conn)))
(when (fn? on-end)
(ex/ignoring (on-end)))
(db/rollback! system)))))))
(ex/ignoring (on-end)))))))))
(defn repair-file-media
[{:keys [id data] :as file}]
(let [conn (db/get-connection *system*)
used (bfc/collect-used-media data)
ids (db/create-array conn "uuid" used)
sql (str "SELECT * FROM file_media_object WHERE id = ANY(?)")
rows (db/exec! conn [sql ids])
index (reduce (fn [index media]
(if (not= (:file-id media) id)
(let [media-id (uuid/next)]
(l/wrn :hint "found not referenced media"
:file-id (str id)
:media-id (str (:id media)))
(db/insert! *system* :file-media-object
(-> media
(assoc :file-id id)
(assoc :id media-id)))
(assoc index (:id media) media-id))
(when (seq index)
(binding [bfc/*state* (atom {:index index})]
(update file :data (fn [fdata]
(-> fdata
(update :pages-index #'bfc/relink-shapes)
(update :components #'bfc/relink-shapes)
(update :media #'bfc/relink-media)
(defn process-files!
"Apply a function to all files in the database, reading them in
"Apply a function to all files in the database"
[& {:keys [max-items
:or {workers 1
:or {max-jobs 1
rollback? true}}]
(letfn [(get-candidates [conn]
(cond->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))])
(some? max-items)
(take max-items)))
(on-error* [cause file]
(println! "unexpected exception happened on processing file: " (:id file))
(strace/print-stack-trace cause))
(l/dbg :hint "process:start"
:rollback rollback?
:max-jobs max-jobs
:max-items max-items)
(process-file [system file-id]
(let [tpoint (dt/tpoint)
factory (px/thread-factory :virtual false :prefix "penpot/file-process/")
executor (px/cached-executor :factory factory)
sjobs (ps/create :permits max-jobs)
(fn [file-id tpoint]
(let [{:keys [features] :as file} (files/get-file system file-id)]
(binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer system file-id)
(if (contains? features "fdata/pointer-map") pmap/wrap identity)
(if (contains? features "fdata/objectd-map") omap/wrap identity)]
(l/trc :hint "process:file:start" :file-id (str file-id))
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [{:keys [::db/conn] :as system}]
(let [file' (get-file* system file-id)
file (binding [*system* system]
(on-file file'))]
(on-file file)
(when (and (some? file)
(not (identical? file file')))
(when (contains? features "fdata/pointer-map")
(feat.fdata/persist-pointers! system file-id))))
(cfv/validate-file-schema! file)
(let [file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system file-id)
(db/update! conn :file
{:data (blob/encode (:data file))
:features (db/create-array conn "text" (:features file))
:revn (:revn file)}
{:id file-id}))))))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id)
:cause cause))
(ps/release! sjobs)
(let [elapsed (dt/format-duration (tpoint))]
(l/trc :hint "process:file:end"
:file-id (str file-id)
:elapsed elapsed)))))]
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
(run! (fn [file-id]
(ps/acquire! sjobs)
(px/run! executor (partial process-file file-id (dt/tpoint))))
(->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))])
(take max-items)
(map :id)))
;; Close and await tasks
(pu/close! executor)))
(catch Throwable cause
((or on-error on-error*) cause file-id))))
(l/dbg :hint "process:error" :cause cause))
(run-worker [in index]
(db/tx-run! main/system
(fn [system]
(binding [*system* system]
(loop [i 0]
(when-let [file-id (sp/take! in)]
(println! "=> worker: index:" index "| loop:" i "| file:" (str file-id) "|" (px/get-name))
(process-file system file-id)
(recur (inc i)))))
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "process:end"
:rollback rollback?
:elapsed elapsed))))))
(when rollback?
(db/rollback! system)))))
(run-producer [input]
(db/tx-run! main/system
(fn [{:keys [::db/conn]}]
(doseq [file-id (get-candidates conn)]
(println! "=> producer:" file-id "|" (px/get-name))
(sp/put! input file-id))
(sp/close! input))))]
(when (fn? on-init) (on-init))
(let [input (sp/chan :buf 25)
producer (px/thread
{:name "penpot/srepl/producer"}
(run-producer input))
threads (->> (range workers)
(map (fn [index]
{:name (str "penpot/srepl/worker/" index)}
(run-worker input index))))
(cons producer)
(run! p/await! threads)
(when (fn? on-end) (on-end)))))