diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index 20bea852d..0b7f1553e 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -45,6 +45,7 @@ [buddy.core.codecs :as bc] [cuerdas.core :as str] [datoteka.io :as io] + [promesa.exec :as px] [promesa.exec.semaphore :as ps])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -53,6 +54,8 @@ (def ^:dynamic *system* nil) (def ^:dynamic *stats* nil) +(def ^:dynamic *file-stats* nil) +(def ^:dynamic *team-stats* nil) (def ^:dynamic *semaphore* nil) (def ^:dynamic *skip-on-error* true) @@ -380,12 +383,10 @@ fdata (d/zip components positions))))] - (when (some? *stats*) - (let [total (count components)] - (swap! *stats* (fn [stats] - (-> stats - (update :processed/components (fnil + 0) total) - (assoc :current/components total)))))) + (let [total (count components)] + (some-> *stats* (swap! update :processed/components (fnil + 0) total)) + (some-> *team-stats* (swap! update :processed/components (fnil + 0) total)) + (some-> *file-stats* (swap! assoc :processed/components total))) (-> file-data (prepare-file-data libraries) @@ -575,8 +576,7 @@ cfsh/prepare-create-artboard-from-selection) changes (pcb/concat-changes changes changes2)] - (cp/process-changes (assoc-in fdata [:options :components-v2] true) ; Process component creation in v2 way - (:redo-changes changes) false))) + (:redo-changes changes))) (defn- migrate-graphics [fdata] @@ -591,30 +591,48 @@ (grc/rect->points))] (assoc media :points points))))) - ;; FIXME: think about what to do with existing media entries ?? grid (ctst/generate-shape-grid media position grid-gap)] - (when (some? *stats*) - (let [total (count media)] - (swap! *stats* (fn [stats] - (-> stats - (update :processed/graphics (fnil + 0) total) - (assoc :current/graphics total)))))) + (let [total (count media)] + (some-> *stats* (swap! update :processed/graphics (fnil + 0) total)) + (some-> *team-stats* (swap! update :processed/graphics (fnil + 0) total)) + (some-> *file-stats* (swap! assoc :processed/graphics total))) - (->> (d/zip media grid) - (reduce (fn [fdata [mobj position]] - (try - (process-media-object fdata page-id mobj position) - (catch Throwable cause - (l/warn :hint "unable to process file media object (skiping)" - :file-id (str (:id fdata)) - :id (str (:id mobj)) - :cause cause) + (let [factory (px/thread-factory :virtual true) + executor (px/fixed-executor :parallelism 10 :factory factory) + process (fn [mobj position] + (let [tp1 (dt/tpoint)] + (try + (process-media-object fdata page-id mobj position) + (catch Throwable cause + (l/wrn :hint "unable to process file media object (skiping)" + :file-id (str (:id fdata)) + :id (str (:id mobj)) + :cause cause) - (if-not *skip-on-error* - (throw cause) - fdata)))) - fdata))))) + (if-not *skip-on-error* + (throw cause) + fdata)) + (finally + (l/trc :hint "graphic processed" + :file-id (str (:id fdata)) + :media-id (str (:id mobj)) + :elapsed (dt/format-duration (tp1))))))) + + process (px/wrap-bindings process)] + + (try + (->> (d/zip media grid) + (map (fn [[mobj position]] + (l/trc :hint "submit graphic processing" :file-id (str (:id fdata)) :id (str (:id mobj))) + (px/submit! executor (partial process mobj position)))) + (reduce (fn [fdata promise] + (if-let [changes (deref promise)] + (cp/process-changes fdata changes false) + fdata)) + fdata)) + (finally + (.close ^java.lang.AutoCloseable executor))))))) (defn- migrate-file-data [fdata libs] @@ -665,7 +683,7 @@ (when validate? (let [errors (cfv/validate-file file libs)] (when (seq errors) - (l/err :hint "migrate:file:validation-error" + (l/wrn :hint "migrate:file:validation-error" :file-id (str (:id file)) :file-name (:name file) :errors errors)))) @@ -674,43 +692,38 @@ (defn migrate-file! [system file-id & {:keys [validate?]}] + (let [tpoint (dt/tpoint) file-id (if (string? file-id) (parse-uuid file-id) file-id)] - (try - (l/dbg :hint "migrate:file:start" :file-id (str file-id)) - (let [system (update system ::sto/storage media/configure-assets-storage)] - (db/tx-run! system - (fn [{:keys [::db/conn] :as system}] - (fsnap/take-file-snapshot! system {:file-id file-id - :label "migration/components-v2"}) + (binding [*file-stats* (atom {})] + (try + (l/dbg :hint "migrate:file:start" :file-id (str file-id)) - (binding [*system* system] - (-> (db/get conn :file {:id file-id}) - (update :features db/decode-pgarray #{}) - (process-file :validate? validate?)))))) + (let [system (update system ::sto/storage media/configure-assets-storage)] + (db/tx-run! system + (fn [{:keys [::db/conn] :as system}] + (binding [*system* system] + (fsnap/take-file-snapshot! system {:file-id file-id + :label "migration/components-v2"}) + (-> (db/get conn :file {:id file-id}) + (update :features db/decode-pgarray #{}) + (process-file :validate? validate?)))))) - (finally - (let [elapsed (tpoint) - stats (some-> *stats* deref)] - (l/dbg :hint "migrate:file:end" - :file-id (str file-id) - :components (:current/components stats 0) - :graphics (:current/graphics stats 0) - :elapsed (dt/format-duration elapsed)) + (finally + (let [elapsed (tpoint) + components (get @*file-stats* :processed/components 0) + graphics (get @*file-stats* :processed/graphics 0)] - (when (some? *stats*) - (swap! *stats* (fn [stats] - (let [elapsed (inst-ms elapsed) - completed (inc (get stats :processed/files 0)) - total (+ (get stats :elapsed/total-by-file 0) elapsed) - avg (/ (double elapsed) completed)] - (-> stats - (update :elapsed/max-by-file (fnil max 0) elapsed) - (assoc :elapsed/avg-by-file avg) - (assoc :elapsed/total-by-file total) - (assoc :processed/files completed))))))))))) + (l/dbg :hint "migrate:file:end" + :file-id (str file-id) + :graphics graphics + :components components + :elapsed (dt/format-duration elapsed)) + + (some-> *stats* (swap! update :processed/files (fnil inc 0))) + (some-> *team-stats* (swap! update :processed/files (fnil inc 0))))))))) (defn migrate-team! [system team-id & {:keys [validate?]}] @@ -719,72 +732,66 @@ (parse-uuid team-id) team-id)] (l/dbg :hint "migrate:team:start" :team-id (dm/str team-id)) - (try - ;; We execute this out of transaction because we want this - ;; change to be visible to all other sessions before starting - ;; the migration - (let [sql (str "UPDATE team SET features = " - " array_append(features, 'ephimeral/v2-migration') " - " WHERE id = ?")] - (db/exec-one! system [sql team-id])) + (binding [*team-stats* (atom {})] + (try + ;; We execute this out of transaction because we want this + ;; change to be visible to all other sessions before starting + ;; the migration + (let [sql (str "UPDATE team SET features = " + " array_append(features, 'ephimeral/v2-migration') " + " WHERE id = ?")] + (db/exec-one! system [sql team-id])) - (db/tx-run! system - (fn [{:keys [::db/conn] :as system}] - ;; Lock the team - (db/exec-one! conn ["SET idle_in_transaction_session_timeout = 0"]) + (db/tx-run! system + (fn [{:keys [::db/conn] :as system}] + ;; Lock the team + (db/exec-one! conn ["SET idle_in_transaction_session_timeout = 0"]) - (let [{:keys [features] :as team} (-> (db/get conn :team {:id team-id}) - (update :features db/decode-pgarray #{}))] + (let [{:keys [features] :as team} (-> (db/get conn :team {:id team-id}) + (update :features db/decode-pgarray #{}))] - (if (contains? features "components/v2") - (l/dbg :hint "team already migrated") - (let [sql (str/concat - "SELECT f.id FROM file AS f " - " JOIN project AS p ON (p.id = f.project_id) " - "WHERE p.team_id = ? AND f.deleted_at IS NULL AND p.deleted_at IS NULL " - "FOR UPDATE") + (if (contains? features "components/v2") + (l/dbg :hint "team already migrated") + (let [sql (str/concat + "SELECT f.id FROM file AS f " + " JOIN project AS p ON (p.id = f.project_id) " + "WHERE p.team_id = ? AND f.deleted_at IS NULL AND p.deleted_at IS NULL " + "FOR UPDATE") - rows (->> (db/exec! conn [sql team-id]) - (map :id))] + rows (->> (db/exec! conn [sql team-id]) + (map :id))] - (run! #(migrate-file! system % :validate? validate?) rows) - (some-> *stats* (swap! assoc :current/files (count rows))) + (run! #(migrate-file! system % :validate? validate?) rows) - (let [features (-> features - (disj "ephimeral/v2-migration") - (conj "components/v2") - (conj "layout/grid") - (conj "styles/v2"))] - (db/update! conn :team - {:features (db/create-array conn "text" features)} - {:id team-id}))))))) - (finally - (some-> *semaphore* ps/release!) - (let [elapsed (tpoint) - stats (some-> *stats* deref)] - (when (some? *stats*) - (swap! *stats* (fn [stats] - (let [elapsed (inst-ms elapsed) - completed (inc (get stats :processed/teams 0)) - total (+ (get stats :elapsed/total-by-team 0) elapsed) - avg (/ (double elapsed) completed)] - (-> stats - (update :elapsed/max-by-team (fnil max 0) elapsed) - (assoc :elapsed/avg-by-team avg) - (assoc :elapsed/total-by-team total) - (assoc :processed/teams completed)))))) + (let [features (-> features + (disj "ephimeral/v2-migration") + (conj "components/v2") + (conj "layout/grid") + (conj "styles/v2"))] + (db/update! conn :team + {:features (db/create-array conn "text" features)} + {:id team-id}))))))) + (finally + (some-> *semaphore* ps/release!) + (let [elapsed (tpoint)] + (some-> *stats* (swap! update :processed/teams (fnil inc 0))) - ;; We execute this out of transaction because we want this - ;; change to be visible to all other sessions before starting - ;; the migration - (let [sql (str "UPDATE team SET features = " - " array_remove(features, 'ephimeral/v2-migration') " - " WHERE id = ?")] - (db/exec-one! system [sql team-id])) + ;; We execute this out of transaction because we want this + ;; change to be visible to all other sessions before starting + ;; the migration + (let [sql (str "UPDATE team SET features = " + " array_remove(features, 'ephimeral/v2-migration') " + " WHERE id = ?")] + (db/exec-one! system [sql team-id])) - (l/dbg :hint "migrate:team:end" - :team-id (dm/str team-id) - :files (:current/files stats 0) - :elapsed (dt/format-duration elapsed))))))) + (let [components (get @*team-stats* :processed/components 0) + graphics (get @*team-stats* :processed/graphics 0) + files (get @*team-stats* :processed/files 0)] + (l/dbg :hint "migrate:team:end" + :team-id (dm/str team-id) + :files files + :components components + :graphics graphics + :elapsed (dt/format-duration elapsed))))))))) diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj index e8ffb1a7f..8a14ccbc5 100644 --- a/backend/src/app/srepl/components_v2.clj +++ b/backend/src/app/srepl/components_v2.clj @@ -21,17 +21,9 @@ (defn- print-stats! [stats] - (let [stats (-> stats - (d/update-when :elapsed/max-by-team (comp dt/format-duration dt/duration int)) - (d/update-when :elapsed/avg-by-team (comp dt/format-duration dt/duration int)) - (d/update-when :elapsed/total-by-team (comp dt/format-duration dt/duration int)) - (d/update-when :elapsed/max-by-file (comp dt/format-duration dt/duration int)) - (d/update-when :elapsed/avg-by-file (comp dt/format-duration dt/duration int)) - (d/update-when :elapsed/total-by-file (comp dt/format-duration dt/duration int)) - )] - (->> stats - (into (sorted-map)) - (pp/pprint)))) + (->> stats + (into (sorted-map)) + (pp/pprint))) (defn- report-progress-files [tpoint] @@ -42,7 +34,7 @@ completed (:processed/files newv) progress (/ (* completed 100.0) total) elapsed (tpoint)] - (l/trc :hint "progress" + (l/dbg :hint "progress" :completed (:processed/files newv) :total (:total/files newv) :progress (str (int progress) "%") @@ -57,8 +49,11 @@ completed (:processed/teams newv) progress (/ (* completed 100.0) total) elapsed (tpoint)] - (l/trc :hint "progress" - :completed (:processed/teams newv) + (l/dbg :hint "progress" + :completed-teams (:processed/teams newv) + :completed-files (:processed/files newv) + :completed-graphics (:processed/graphics newv) + :completed-components (:processed/components newv) :progress (str (int progress) "%") :elapsed (dt/format-duration elapsed)))))) @@ -88,36 +83,35 @@ (:count res))) (defn migrate-file! - [system file-id & {:keys [rollback] :or {rollback true}}] + [system file-id & {:keys [rollback?] :or {rollback? true}}] (l/dbg :hint "migrate:start") (let [tpoint (dt/tpoint)] (try (binding [feat/*stats* (atom {})] - (-> (assoc system ::db/rollback rollback) + (-> (assoc system ::db/rollback rollback?) (feat/migrate-file! file-id)) + (-> (deref feat/*stats*) - (assoc :elapsed (dt/format-duration (tpoint))) - (dissoc :current/graphics) - (dissoc :current/components) - (dissoc :current/files))) + (assoc :elapsed (dt/format-duration (tpoint))))) (catch Throwable cause - (l/dbg :hint "migrate:error" :cause cause)) + (l/wrn :hint "migrate:error" :cause cause)) (finally (let [elapsed (dt/format-duration (tpoint))] (l/dbg :hint "migrate:end" :elapsed elapsed)))))) (defn migrate-files! - [{:keys [::db/pool] :as system} & {:keys [chunk-size max-jobs max-items start-at preset rollback skip-on-error validate] - :or {chunk-size 10 - skip-on-error true - max-jobs 10 - max-items Long/MAX_VALUE - preset :shutdown-on-failure - rollback true - validate false}}] + [{:keys [::db/pool] :as system} + & {:keys [chunk-size max-jobs max-items start-at preset rollback? skip-on-error validate?] + :or {chunk-size 10 + skip-on-error true + max-jobs 10 + max-items Long/MAX_VALUE + preset :shutdown-on-failure + rollback? true + validate? false}}] (letfn [(get-chunk [cursor] (let [sql (str/concat "SELECT id, created_at FROM file " @@ -151,17 +145,14 @@ (run! (fn [file-id] (ps/acquire! feat/*semaphore*) (px/submit! scope (fn [] - (-> (assoc system ::db/rollback rollback) - (feat/migrate-file! file-id :validate? validate))))) + (-> (assoc system ::db/rollback rollback?) + (feat/migrate-file! file-id :validate? validate?))))) (get-candidates)) (p/await! scope)) (-> (deref feat/*stats*) - (assoc :elapsed (dt/format-duration (tpoint))) - (dissoc :current/graphics) - (dissoc :current/components) - (dissoc :current/files)) + (assoc :elapsed (dt/format-duration (tpoint)))) (catch Throwable cause (l/dbg :hint "migrate:error" :cause cause)) @@ -172,8 +163,8 @@ (defn migrate-team! [{:keys [::db/pool] :as system} team-id - & {:keys [rollback skip-on-error validate] - :or {rollback true skip-on-error true validate false}}] + & {:keys [rollback? skip-on-error validate?] + :or {rollback? true skip-on-error true validate? false}}] (l/dbg :hint "migrate:start") (let [total (get-total-files pool :team-id team-id) @@ -185,15 +176,13 @@ (try (binding [feat/*stats* stats feat/*skip-on-error* skip-on-error] - (-> (assoc system ::db/rollback rollback) - (feat/migrate-team! team-id :validate? validate)) + (-> (assoc system ::db/rollback rollback?) + (feat/migrate-team! team-id :validate? validate?)) (print-stats! (-> (deref feat/*stats*) (dissoc :total/files) - (dissoc :current/graphics) - (dissoc :current/components) - (dissoc :current/files)))) + (assoc :elapsed (dt/format-duration (tpoint)))))) (catch Throwable cause (l/dbg :hint "migrate:error" :cause cause)) @@ -204,14 +193,14 @@ (defn migrate-teams! [{:keys [::db/pool] :as system} - & {:keys [chunk-size max-jobs max-items start-at rollback preset skip-on-error max-time validate] + & {:keys [chunk-size max-jobs max-items start-at rollback? preset skip-on-error max-time validate?] :or {chunk-size 10000 - rollback true + validate? false + rollback? true skip-on-error true preset :shutdown-on-failure max-jobs Integer/MAX_VALUE - max-items Long/MAX_VALUE - validate false}}] + max-items Long/MAX_VALUE}}] (letfn [(get-chunk [cursor] (let [sql (str/concat @@ -233,8 +222,8 @@ (migrate-team [team-id] (try - (-> (assoc system ::db/rollback rollback) - (feat/migrate-team! team-id :validate? validate)) + (-> (assoc system ::db/rollback rollback?) + (feat/migrate-team! team-id :validate? validate?)) (catch Throwable cause (l/err :hint "unexpected error on processing team" :team-id (dm/str team-id) :cause cause)))) @@ -242,7 +231,7 @@ (ps/acquire! feat/*semaphore*) (let [ts (tpoint)] (if (and mtime (neg? (compare mtime ts))) - (l/trc :hint "max time constraint reached" :elapsed (dt/format-duration ts)) + (l/inf :hint "max time constraint reached" :elapsed (dt/format-duration ts)) (px/submit! scope (partial migrate-team team-id)))))] (l/dbg :hint "migrate:start") @@ -270,10 +259,8 @@ (print-stats! (-> (deref feat/*stats*) - (dissoc :total/teams) - (dissoc :current/graphics) - (dissoc :current/components) - (dissoc :current/files))) + (assoc :elapsed/total (dt/format-duration (tpoint))) + (dissoc :total/teams))) (catch Throwable cause (l/dbg :hint "migrate:error" :cause cause))