0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-24 07:29:08 -05:00

Add better progress reporting

For components migration and for binfile import process
This commit is contained in:
Andrey Antukh 2024-01-23 19:52:23 +01:00
parent 7f60946204
commit cdf312fdd9
10 changed files with 215 additions and 152 deletions

View file

@ -20,7 +20,6 @@
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.http.sse :as sse]
[app.loggers.audit :as-alias audit] [app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as-alias webhooks]
[app.media :as media] [app.media :as media]
@ -30,6 +29,7 @@
[app.storage :as sto] [app.storage :as sto]
[app.storage.tmp :as tmp] [app.storage.tmp :as tmp]
[app.tasks.file-gc] [app.tasks.file-gc]
[app.util.events :as events]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[clojure.java.io :as jio] [clojure.java.io :as jio]
@ -475,9 +475,6 @@
features (cfeat/get-team-enabled-features cf/flags team)] features (cfeat/get-team-enabled-features cf/flags team)]
(sse/tap {:type :import-progress
:section :read-import})
;; Process all sections ;; Process all sections
(run! (fn [section] (run! (fn [section]
(l/dbg :hint "reading section" :section section ::l/sync? true) (l/dbg :hint "reading section" :section section ::l/sync? true)
@ -487,8 +484,7 @@
(assoc ::section section) (assoc ::section section)
(assoc ::input input))] (assoc ::input input))]
(binding [bfc/*options* options] (binding [bfc/*options* options]
(sse/tap {:type :import-progress (events/tap :progress {:op :import :section section})
:section section})
(read-section options)))) (read-section options))))
[:v1/metadata :v1/files :v1/rels :v1/sobjects]) [:v1/metadata :v1/files :v1/rels :v1/sobjects])

View file

@ -18,12 +18,12 @@
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.db.sql :as sql] [app.db.sql :as sql]
[app.http.sse :as sse]
[app.loggers.audit :as-alias audit] [app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as-alias webhooks]
[app.media :as media] [app.media :as media]
[app.storage :as sto] [app.storage :as sto]
[app.storage.tmp :as tmp] [app.storage.tmp :as tmp]
[app.util.events :as events]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[clojure.set :as set] [clojure.set :as set]
@ -116,13 +116,15 @@
(defn- write-team! (defn- write-team!
[cfg team-id] [cfg team-id]
(sse/tap {:type :export-progress
:section :write-team
:id team-id})
(let [team (bfc/get-team cfg team-id) (let [team (bfc/get-team cfg team-id)
fonts (bfc/get-fonts cfg team-id)] fonts (bfc/get-fonts cfg team-id)]
(events/tap :progress
{:op :export
:section :write-team
:id team-id
:name (:name team)})
(l/trc :hint "write" :obj "team" (l/trc :hint "write" :obj "team"
:id (str team-id) :id (str team-id)
:fonts (count fonts)) :fonts (count fonts))
@ -138,28 +140,29 @@
(defn- write-project! (defn- write-project!
[cfg project-id] [cfg project-id]
(sse/tap {:type :export-progress
:section :write-project
:id project-id})
(let [project (bfc/get-project cfg project-id)] (let [project (bfc/get-project cfg project-id)]
(events/tap :progress
{:op :export
:section :write-project
:id project-id
:name (:name project)})
(l/trc :hint "write" :obj "project" :id (str project-id)) (l/trc :hint "write" :obj "project" :id (str project-id))
(write! cfg :project (str project-id) project) (write! cfg :project (str project-id) project)
(vswap! bfc/*state* update :projects conj project-id))) (vswap! bfc/*state* update :projects conj project-id)))
(defn- write-file! (defn- write-file!
[cfg file-id] [cfg file-id]
(sse/tap {:type :export-progress
:section :write-file
:id file-id})
(let [file (bfc/get-file cfg file-id) (let [file (bfc/get-file cfg file-id)
thumbs (bfc/get-file-object-thumbnails cfg file-id) thumbs (bfc/get-file-object-thumbnails cfg file-id)
media (bfc/get-file-media cfg file) media (bfc/get-file-media cfg file)
rels (bfc/get-files-rels cfg #{file-id})] rels (bfc/get-files-rels cfg #{file-id})]
(events/tap :progress
{:op :export
:section :write-file
:id file-id
:name (:name file)})
(vswap! bfc/*state* (fn [state] (vswap! bfc/*state* (fn [state]
(-> state (-> state
(update :files conj file-id) (update :files conj file-id)
@ -218,10 +221,6 @@
[{:keys [::db/conn ::bfc/timestamp] :as cfg} team-id] [{:keys [::db/conn ::bfc/timestamp] :as cfg} team-id]
(l/trc :hint "read" :obj "team" :id (str team-id)) (l/trc :hint "read" :obj "team" :id (str team-id))
(sse/tap {:type :import-progress
:section :read-team
:id team-id})
(let [team (read-obj cfg :team team-id) (let [team (read-obj cfg :team team-id)
team (-> team team (-> team
(update :id bfc/lookup-index) (update :id bfc/lookup-index)
@ -229,6 +228,12 @@
(assoc :created-at timestamp) (assoc :created-at timestamp)
(assoc :modified-at timestamp))] (assoc :modified-at timestamp))]
(events/tap :progress
{:op :import
:section :read-team
:id team-id
:name (:name team)})
(db/insert! conn :team (db/insert! conn :team
(update team :features db/encode-pgarray conn "text") (update team :features db/encode-pgarray conn "text")
::db/return-keys false) ::db/return-keys false)
@ -253,10 +258,6 @@
[{:keys [::db/conn ::bfc/timestamp] :as cfg} project-id] [{:keys [::db/conn ::bfc/timestamp] :as cfg} project-id]
(l/trc :hint "read" :obj "project" :id (str project-id)) (l/trc :hint "read" :obj "project" :id (str project-id))
(sse/tap {:type :import-progress
:section :read-project
:id project-id})
(let [project (read-obj cfg :project project-id) (let [project (read-obj cfg :project project-id)
project (-> project project (-> project
(update :id bfc/lookup-index) (update :id bfc/lookup-index)
@ -264,6 +265,12 @@
(assoc :created-at timestamp) (assoc :created-at timestamp)
(assoc :modified-at timestamp))] (assoc :modified-at timestamp))]
(events/tap :progress
{:op :import
:section :read-project
:id project-id
:name (:name project)})
(db/insert! conn :project project (db/insert! conn :project project
::db/return-keys false))) ::db/return-keys false)))
@ -271,15 +278,17 @@
[{:keys [::db/conn ::bfc/timestamp] :as cfg} file-id] [{:keys [::db/conn ::bfc/timestamp] :as cfg} file-id]
(l/trc :hint "read" :obj "file" :id (str file-id)) (l/trc :hint "read" :obj "file" :id (str file-id))
(sse/tap {:type :import-progress
:section :read-file
:id file-id})
(let [file (-> (read-obj cfg :file file-id) (let [file (-> (read-obj cfg :file file-id)
(update :id bfc/lookup-index) (update :id bfc/lookup-index)
(update :project-id bfc/lookup-index) (update :project-id bfc/lookup-index)
(bfc/process-file))] (bfc/process-file))]
(events/tap :progress
{:op :import
:section :read-file
:id file-id
:name (:name file)})
;; All features that are enabled and requires explicit migration are ;; All features that are enabled and requires explicit migration are
;; added to the state for a posterior migration step. ;; added to the state for a posterior migration step.
(doseq [feature (-> (::bfc/features cfg) (doseq [feature (-> (::bfc/features cfg)

View file

@ -41,7 +41,6 @@
[app.db :as db] [app.db :as db]
[app.db.sql :as sql] [app.db.sql :as sql]
[app.features.fdata :as fdata] [app.features.fdata :as fdata]
[app.http.sse :as sse]
[app.media :as media] [app.media :as media]
[app.rpc.commands.files :as files] [app.rpc.commands.files :as files]
[app.rpc.commands.files-snapshot :as fsnap] [app.rpc.commands.files-snapshot :as fsnap]
@ -51,6 +50,7 @@
[app.svgo :as svgo] [app.svgo :as svgo]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.cache :as cache] [app.util.cache :as cache]
[app.util.events :as events]
[app.util.pointer-map :as pmap] [app.util.pointer-map :as pmap]
[app.util.time :as dt] [app.util.time :as dt]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
@ -767,8 +767,6 @@
backup', generate main instances for all components there and remove backup', generate main instances for all components there and remove
shapes from library components. Mark the file with the :components-v2 option." shapes from library components. Mark the file with the :components-v2 option."
[file-data libraries] [file-data libraries]
(sse/tap {:type :migration-progress
:section :components})
(let [file-data (prepare-file-data file-data libraries) (let [file-data (prepare-file-data file-data libraries)
components (ctkl/components-seq file-data)] components (ctkl/components-seq file-data)]
(if (empty? components) (if (empty? components)
@ -843,9 +841,9 @@
add-instance-grid add-instance-grid
(fn [fdata frame-id grid assets] (fn [fdata frame-id grid assets]
(reduce (fn [result [component position]] (reduce (fn [result [component position]]
(sse/tap {:type :migration-progress (events/tap :progress {:op :migrate-component
:section :components :id (:id component)
:name (:name component)}) :name (:name component)})
(add-main-instance result component frame-id (gpt/add position (add-main-instance result component frame-id (gpt/add position
(gpt/point grid-gap grid-gap)))) (gpt/point grid-gap grid-gap))))
fdata fdata
@ -881,9 +879,9 @@
(gpt/add position (gpt/point 0 (+ height (* 2 grid-gap) frame-gap)))))))))] (gpt/add position (gpt/point 0 (+ height (* 2 grid-gap) frame-gap)))))))))]
(let [total (count components)] (let [total (count components)]
(some-> *stats* (swap! update :processed/components (fnil + 0) total)) (some-> *stats* (swap! update :processed-components (fnil + 0) total))
(some-> *team-stats* (swap! update :processed/components (fnil + 0) total)) (some-> *team-stats* (swap! update :processed-components (fnil + 0) total))
(some-> *file-stats* (swap! assoc :processed/components total))) (some-> *file-stats* (swap! assoc :processed-components total)))
(add-instance-grids file-data))))) (add-instance-grids file-data)))))
@ -1143,16 +1141,14 @@
(->> (d/zip media-group grid) (->> (d/zip media-group grid)
(reduce (fn [fdata [mobj position]] (reduce (fn [fdata [mobj position]]
(sse/tap {:type :migration-progress (events/tap :progress {:op :migrate-graphic
:section :graphics :id (:id mobj)
:name (:name mobj)}) :name (:name mobj)})
(or (process fdata mobj position) fdata)) (or (process fdata mobj position) fdata))
(assoc-in fdata [:options :components-v2] true))))) (assoc-in fdata [:options :components-v2] true)))))
(defn- migrate-graphics (defn- migrate-graphics
[fdata] [fdata]
(sse/tap {:type :migration-progress
:section :graphics})
(if (empty? (:media fdata)) (if (empty? (:media fdata))
fdata fdata
(let [[fdata page-id start-pos] (let [[fdata page-id start-pos]
@ -1167,9 +1163,9 @@
groups (get-asset-groups media "Graphics")] groups (get-asset-groups media "Graphics")]
(let [total (count media)] (let [total (count media)]
(some-> *stats* (swap! update :processed/graphics (fnil + 0) total)) (some-> *stats* (swap! update :processed-graphics (fnil + 0) total))
(some-> *team-stats* (swap! update :processed/graphics (fnil + 0) total)) (some-> *team-stats* (swap! update :processed-graphics (fnil + 0) total))
(some-> *file-stats* (swap! assoc :processed/graphics total))) (some-> *file-stats* (swap! assoc :processed-graphics total)))
(loop [groups (seq groups) (loop [groups (seq groups)
fdata fdata fdata fdata
@ -1236,10 +1232,8 @@
(cfv/validate-file-schema! file)) (cfv/validate-file-schema! file))
(defn- process-file (defn- process-file
[{:keys [::db/conn] :as system} id & {:keys [validate?]}] [{:keys [::db/conn] :as system} {:keys [id] :as file} & {:keys [validate?]}]
(let [file (get-file system id) (let [libs (->> (files/get-file-libraries conn id)
libs (->> (files/get-file-libraries conn id)
(into [file] (comp (map :id) (into [file] (comp (map :id)
(map (partial get-file system)))) (map (partial get-file system))))
(d/index-by :id)) (d/index-by :id))
@ -1314,7 +1308,13 @@
(when (string? label) (when (string? label)
(fsnap/take-file-snapshot! system {:file-id file-id (fsnap/take-file-snapshot! system {:file-id file-id
:label (str "migration/" label)})) :label (str "migration/" label)}))
(process-file system file-id :validate? validate?)) (let [file (get-file system file-id)]
(events/tap :progress
{:op :migrate-file
:name (:name file)
:id (:id file)})
(process-file system file :validate? validate?)))
(catch Throwable cause (catch Throwable cause
(let [team-id *team-id*] (let [team-id *team-id*]
@ -1325,8 +1325,8 @@
(finally (finally
(let [elapsed (tpoint) (let [elapsed (tpoint)
components (get @*file-stats* :processed/components 0) components (get @*file-stats* :processed-components 0)
graphics (get @*file-stats* :processed/graphics 0)] graphics (get @*file-stats* :processed-graphics 0)]
(l/dbg :hint "migrate:file:end" (l/dbg :hint "migrate:file:end"
:file-id (str file-id) :file-id (str file-id)
@ -1335,8 +1335,8 @@
:validate validate? :validate validate?
:elapsed (dt/format-duration elapsed)) :elapsed (dt/format-duration elapsed))
(some-> *stats* (swap! update :processed/files (fnil inc 0))) (some-> *stats* (swap! update :processed-files (fnil inc 0)))
(some-> *team-stats* (swap! update :processed/files (fnil inc 0))))))))) (some-> *team-stats* (swap! update :processed-files (fnil inc 0)))))))))
(defn migrate-team! (defn migrate-team!
[system team-id & {:keys [validate? skip-on-graphic-error? label]}] [system team-id & {:keys [validate? skip-on-graphic-error? label]}]
@ -1355,7 +1355,7 @@
:skip-on-graphic-error? skip-on-graphic-error?)) :skip-on-graphic-error? skip-on-graphic-error?))
migrate-team migrate-team
(fn [{:keys [::db/conn] :as system} team-id] (fn [{:keys [::db/conn] :as system} team-id]
(let [{:keys [id features]} (get-team system team-id)] (let [{:keys [id features name]} (get-team system team-id)]
(if (contains? features "components/v2") (if (contains? features "components/v2")
(l/inf :hint "team already migrated") (l/inf :hint "team already migrated")
(let [features (-> features (let [features (-> features
@ -1364,6 +1364,11 @@
(conj "layout/grid") (conj "layout/grid")
(conj "styles/v2"))] (conj "styles/v2"))]
(events/tap :progress
{:op :migrate-team
:name name
:id id})
(run! (partial migrate-file system) (run! (partial migrate-file system)
(get-and-lock-files conn id)) (get-and-lock-files conn id))
@ -1380,11 +1385,12 @@
(finally (finally
(let [elapsed (tpoint) (let [elapsed (tpoint)
components (get @*team-stats* :processed/components 0) components (get @*team-stats* :processed-components 0)
graphics (get @*team-stats* :processed/graphics 0) graphics (get @*team-stats* :processed-graphics 0)
files (get @*team-stats* :processed/files 0)] files (get @*team-stats* :processed-files 0)]
(some-> *stats* (swap! update :processed/teams (fnil inc 0))) (when-not @err
(some-> *stats* (swap! update :processed-teams (fnil inc 0))))
(if (cache/cache? *cache*) (if (cache/cache? *cache*)
(let [cache-stats (cache/stats *cache*)] (let [cache-stats (cache/stats *cache*)]

View file

@ -9,11 +9,10 @@
(:refer-clojure :exclude [tap]) (:refer-clojure :exclude [tap])
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.transit :as t] [app.common.transit :as t]
[app.http.errors :as errors] [app.http.errors :as errors]
[promesa.core :as p] [app.util.events :as events]
[promesa.exec :as px] [promesa.exec :as px]
[promesa.exec.csp :as sp] [promesa.exec.csp :as sp]
[promesa.util :as pu] [promesa.util :as pu]
@ -21,26 +20,12 @@
(:import (:import
java.io.OutputStream)) java.io.OutputStream))
(def ^:dynamic *channel* nil)
(defn- write! (defn- write!
[^OutputStream output ^bytes data] [^OutputStream output ^bytes data]
(l/trc :hint "writting data" :data data :length (alength data)) (l/trc :hint "writting data" :data data :length (alength data))
(.write output data) (.write output data)
(.flush output)) (.flush output))
(defn- create-writer-loop
[^OutputStream output]
(try
(loop []
(when-let [event (sp/take! *channel*)]
(let [result (ex/try! (write! output event))]
(if (ex/exception? result)
(l/wrn :hint "unexpected exception on sse writer" :cause result)
(recur)))))
(finally
(pu/close! output))))
(defn- encode (defn- encode
[[name data]] [[name data]]
(try (try
@ -61,13 +46,6 @@
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate" "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
"Pragma" "no-cache"}) "Pragma" "no-cache"})
(defn tap
([data] (tap "event" data))
([name data]
(when-let [channel *channel*]
(sp/put! channel [name data])
nil)))
(defn response (defn response
[handler & {:keys [buf] :or {buf 32} :as opts}] [handler & {:keys [buf] :or {buf 32} :as opts}]
(fn [request] (fn [request]
@ -75,15 +53,18 @@
::rres/status 200 ::rres/status 200
::rres/body (reify rres/StreamableResponseBody ::rres/body (reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output] (-write-body-to-stream [_ _ output]
(binding [*channel* (sp/chan :buf buf :xf (keep encode))] (binding [events/*channel* (sp/chan :buf buf :xf (keep encode))]
(let [writer (px/run! :virtual (partial create-writer-loop output))] (let [listener (events/start-listener
(partial write! output)
(partial pu/close! output))]
(try (try
(tap "end" (handler)) (let [result (handler)]
(events/tap :end result))
(catch Throwable cause (catch Throwable cause
(binding [l/*context* (errors/request->context request)] (binding [l/*context* (errors/request->context request)]
(l/err :hint "unexpected error process streaming response" (l/err :hint "unexpected error process streaming response"
:cause cause)) :cause cause))
(tap "error" (errors/handle' cause request))) (events/tap :error (errors/handle' cause request)))
(finally (finally
(sp/close! *channel*) (sp/close! events/*channel*)
(p/await! writer)))))))})) (px/await! listener)))))))}))

View file

@ -13,7 +13,8 @@
[app.db :as db] [app.db :as db]
[app.main :as main] [app.main :as main]
[app.rpc.commands.auth :as cmd.auth] [app.rpc.commands.auth :as cmd.auth]
[app.srepl.components-v2] [app.srepl.components-v2 :refer [migrate-teams!]]
[app.util.events :as events]
[app.util.json :as json] [app.util.json :as json]
[app.util.time :as dt] [app.util.time :as dt]
[cuerdas.core :as str])) [cuerdas.core :as str]))
@ -106,25 +107,36 @@
(defmethod exec-command :migrate-v2 (defmethod exec-command :migrate-v2
[_] [_]
(letfn [(on-start [{:keys [total rollback]}] (letfn [(on-progress-report [{:keys [elapsed completed errors]}]
(println (println (str/ffmt "-> Progress: completed: %, errors: %, elapsed: %"
(str/ffmt "The components/v2 migration started (rollback:%, teams:%)" completed errors elapsed)))
(if rollback "on" "off")
total))) (on-progress [{:keys [op name]}]
(case op
:migrate-team
(println (str/ffmt "-> Migrating team: \"%\"" name))
:migrate-file
(println (str/ffmt "=> Migrating file: \"%\"" name))
nil))
(on-event [[type payload]]
(case type
:progress-report (on-progress-report payload)
:progress (on-progress payload)
:error (on-error payload)
nil))
(on-progress [{:keys [total elapsed progress completed]}]
(println (str/ffmt "Progress % (total: %, completed: %, elapsed: %)"
progress total completed elapsed)))
(on-error [cause] (on-error [cause]
(println "ERR:" (ex-message cause))) (println "EE:" (ex-message cause)))]
(on-end [_] (println "The components/v2 migration started...")
(println "Migration finished"))]
(app.srepl.components-v2/migrate-teams! main/system (try
:on-start on-start (let [result (-> (partial migrate-teams! main/system {:rollback? true})
:on-error on-error (events/run-with! on-event))]
:on-progress on-progress (println (str/ffmt "Migration process finished (elapsed: %)" (:elapsed result))))
:on-end on-end))) (catch Throwable cause
(on-error cause)))))
(defmethod exec-command :default (defmethod exec-command :default
[{:keys [::cmd]}] [{:keys [::cmd]}]

View file

@ -8,13 +8,13 @@
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.pprint :as pp]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
[app.features.components-v2 :as feat] [app.features.components-v2 :as feat]
[app.main :as main] [app.main :as main]
[app.svgo :as svgo] [app.svgo :as svgo]
[app.util.cache :as cache] [app.util.cache :as cache]
[app.util.events :as events]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[cuerdas.core :as str] [cuerdas.core :as str]
@ -29,32 +29,30 @@
;; PRIVATE HELPERS ;; PRIVATE HELPERS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- print-stats!
[stats]
(->> stats
(into (sorted-map))
(pp/pprint)))
(defn- report-progress-files (defn- report-progress-files
[tpoint] [tpoint]
(fn [_ _ oldv newv] (fn [_ _ oldv newv]
(when (not= (:processed/files oldv) (when (not= (:processed-files oldv)
(:processed/files newv)) (:processed-files newv))
(let [elapsed (tpoint)] (let [elapsed (tpoint)]
(l/dbg :hint "progress" (l/dbg :hint "progress"
:completed (:processed/files newv) :completed (:processed-files newv)
:elapsed (dt/format-duration elapsed)))))) :elapsed (dt/format-duration elapsed))))))
(defn- report-progress-teams (defn- report-progress-teams
[tpoint on-progress] [tpoint]
(fn [_ _ oldv newv] (fn [_ _ oldv newv]
(when (not= (:processed/teams oldv) (when (or (not= (:processed-teams oldv)
(:processed/teams newv)) (:processed-teams newv))
(let [completed (:processed/teams newv) (not= (:errors oldv)
(:errors newv)))
(let [completed (:processed-teams newv 0)
errors (:errors newv 0)
elapsed (dt/format-duration (tpoint))] elapsed (dt/format-duration (tpoint))]
(when (fn? on-progress) (events/tap :progress-report
(on-progress {:elapsed elapsed {:elapsed elapsed
:completed completed})) :completed completed
:errors errors})
(l/dbg :hint "progress" (l/dbg :hint "progress"
:completed completed :completed completed
:elapsed elapsed))))) :elapsed elapsed)))))
@ -235,10 +233,10 @@
(feat/migrate-team! team-id (feat/migrate-team! team-id
:label label :label label
:validate? validate? :validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)) :skip-on-graphics-error? skip-on-graphic-error?))
(print-stats!
(-> (deref feat/*stats*) (-> (deref feat/*stats*)
(assoc :elapsed (dt/format-duration (tpoint))))) (assoc :elapsed (dt/format-duration (tpoint))))
(catch Throwable cause (catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause)) (l/dbg :hint "migrate:error" :cause cause))
@ -261,8 +259,8 @@
a correct `:label`. That label is also used for persist a file a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration." snaphot before continue with the migration."
[& {:keys [max-jobs max-items max-time rollback? validate? query [& {:keys [max-jobs max-items max-time rollback? validate? query
pred max-procs cache on-start on-progress on-error on-end pred max-procs cache skip-on-graphic-error?
skip-on-graphic-error? label partitions current-partition] label partitions current-partition]
:or {validate? false :or {validate? false
rollback? true rollback? true
max-jobs 1 max-jobs 1
@ -310,6 +308,14 @@
(l/wrn :hint "unexpected error on processing team (skiping)" (l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id) :team-id (str team-id)
:cause cause) :cause cause)
(events/tap :error
(ex-info "unexpected error on processing team (skiping)"
{:team-id team-id}
cause))
(swap! stats update :errors (fnil inc 0))
(when (string? label) (when (string? label)
(report! main/system team-id label (tpoint) (ex-message cause)))) (report! main/system team-id label (tpoint) (ex-message cause))))
@ -336,15 +342,12 @@
:max-jobs max-jobs :max-jobs max-jobs
:max-items max-items) :max-items max-items)
(add-watch stats :progress-report (report-progress-teams tpoint on-progress)) (add-watch stats :progress-report (report-progress-teams tpoint))
(binding [feat/*stats* stats (binding [feat/*stats* stats
feat/*cache* cache feat/*cache* cache
svgo/*semaphore* sprocs] svgo/*semaphore* sprocs]
(try (try
(when (fn? on-start)
(on-start {:rollback rollback?}))
(when (string? label) (when (string? label)
(create-report-table! main/system) (create-report-table! main/system)
(clean-reports! main/system label)) (clean-reports! main/system label))
@ -367,20 +370,12 @@
;; Close and await tasks ;; Close and await tasks
(pu/close! executor))) (pu/close! executor)))
(if (fn? on-end) (-> (deref stats)
(-> (deref stats) (assoc :elapsed (dt/format-duration (tpoint))))
(assoc :elapsed/total (tpoint))
(on-end))
(-> (deref stats)
(assoc :elapsed/total (tpoint))
(update :elapsed/total dt/format-duration)
(dissoc :total/teams)
(print-stats!)))
(catch Throwable cause (catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause) (l/dbg :hint "migrate:error" :cause cause)
(when (fn? on-error) (events/tap :error cause))
(on-error cause)))
(finally (finally
(let [elapsed (dt/format-duration (tpoint))] (let [elapsed (dt/format-duration (tpoint))]

View file

@ -0,0 +1,64 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.util.events
"A generic asynchronous events notifications subsystem; used mainly
for mark event points in functions and be able to attach listeners
to them. Mainly used in http.sse for progress reporting."
(:refer-clojure :exclude [tap run!])
(:require
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.common.logging :as l]
[promesa.exec :as px]
[promesa.exec.csp :as sp]))
(def ^:dynamic *channel* nil)
(defn channel
[]
(sp/chan :buf 32))
(defn tap
[type data]
(when-let [channel *channel*]
(sp/put! channel [type data])
nil))
(defn start-listener
[on-event on-close]
(dm/assert!
"expected active events channel"
(sp/chan? *channel*))
(px/thread
{:virtual true}
(try
(loop []
(when-let [event (sp/take! *channel*)]
(let [result (ex/try! (on-event event))]
(if (ex/exception? result)
(do
(l/wrn :hint "unexpected exception" :cause result)
(sp/close! *channel*))
(recur)))))
(finally
(on-close)))))
(defn run-with!
"A high-level facility for to run a function in context of event
emiter."
[f on-event]
(binding [*channel* (sp/chan :buf 32)]
(let [listener (start-listener on-event (constantly nil))]
(try
(f)
(finally
(sp/close! *channel*)
(px/await! listener))))))

View file

@ -612,7 +612,7 @@
(t/is (fn? result)) (t/is (fn? result))
(let [events (th/consume-sse result)] (let [events (th/consume-sse result)]
(t/is (= 8 (count events))) (t/is (= 6 (count events)))
(t/is (= :end (first (last events)))))))) (t/is (= :end (first (last events))))))))
(t/deftest get-list-of-buitin-templates (t/deftest get-list-of-buitin-templates

View file

@ -1013,7 +1013,7 @@
(rx/tap (fn [event] (rx/tap (fn [event]
(let [payload (sse/get-payload event) (let [payload (sse/get-payload event)
type (sse/get-type event)] type (sse/get-type event)]
(if (= type "event") (if (= type "progress")
(log/dbg :hint "clone-template: progress" :section (:section payload) :name (:name payload)) (log/dbg :hint "clone-template: progress" :section (:section payload) :name (:name payload))
(log/dbg :hint "clone-template: end"))))) (log/dbg :hint "clone-template: end")))))

View file

@ -735,7 +735,7 @@
(rx/tap (fn [event] (rx/tap (fn [event]
(let [payload (sse/get-payload event) (let [payload (sse/get-payload event)
type (sse/get-type event)] type (sse/get-type event)]
(if (= type "event") (if (= type "progress")
(log/dbg :hint "import-binfile: progress" :section (:section payload) :name (:name payload)) (log/dbg :hint "import-binfile: progress" :section (:section payload) :name (:name payload))
(log/dbg :hint "import-binfile: end"))))) (log/dbg :hint "import-binfile: end")))))
(rx/filter sse/end-of-stream?) (rx/filter sse/end-of-stream?)