From 0d33779c955501a0f7968b4ab4aa86c696b300fa Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Sat, 20 Jan 2024 22:25:05 +0100 Subject: [PATCH] :sparkles: Add support for reporting and partitions on comp-v2 migration code --- backend/src/app/features/components_v2.clj | 38 ++-- backend/src/app/srepl/components_v2.clj | 199 ++++++++++++++++----- common/src/app/common/uuid.cljc | 9 + 3 files changed, 182 insertions(+), 64 deletions(-) diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index e4c18a261..2977a475f 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -1104,7 +1104,7 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn migrate-file! - [system file-id & {:keys [validate? skip-on-graphic-error?]}] + [system file-id & {:keys [validate? skip-on-graphic-error? label]}] (let [tpoint (dt/tpoint)] (binding [*file-stats* (atom {}) *skip-on-graphic-error* skip-on-graphic-error?] @@ -1119,7 +1119,9 @@ (fn [system] (try (binding [*system* system] - (fsnap/take-file-snapshot! system {:file-id file-id :label "migration/components-v2"}) + (when (string? label) + (fsnap/take-file-snapshot! system {:file-id file-id + :label (str "migration/" label)})) (process-file system file-id :validate? validate?)) (catch Throwable cause @@ -1145,7 +1147,7 @@ (some-> *team-stats* (swap! update :processed/files (fnil inc 0))))))))) (defn migrate-team! - [system team-id & {:keys [validate? skip-on-graphic-error?]}] + [system team-id & {:keys [validate? skip-on-graphic-error? label]}] (l/dbg :hint "migrate:team:start" :team-id (dm/str team-id)) @@ -1156,30 +1158,30 @@ migrate-file (fn [system file-id] (migrate-file! system file-id + :label label :validate? validate? :skip-on-graphics-error? skip-on-graphic-error?)) migrate-team - (fn [{:keys [::db/conn] :as system} {:keys [id features] :as team}] - (let [features (-> features - (disj "ephimeral/v2-migration") - (conj "components/v2") - (conj "layout/grid") - (conj "styles/v2"))] + (fn [{:keys [::db/conn] :as system} team-id] + (let [{:keys [id features]} (get-team system team-id)] + (if (contains? features "components/v2") + (l/inf :hint "team already migrated") + (let [features (-> features + (disj "ephimeral/v2-migration") + (conj "components/v2") + (conj "layout/grid") + (conj "styles/v2"))] - (run! (partial migrate-file system) - (get-and-lock-files conn id)) + (run! (partial migrate-file system) + (get-and-lock-files conn id)) - (update-team-features! conn id features)))] + (update-team-features! conn id features)))))] (binding [*team-stats* (atom {}) *team-id* team-id] (try - (db/tx-run! system (fn [system] - (db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) - (let [team (get-team system team-id)] - (if (contains? (:features team) "components/v2") - (l/inf :hint "team already migrated") - (migrate-team system team))))) + (db/tx-run! system migrate-team team-id) + (catch Throwable cause (vreset! err true) (throw cause)) diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj index ddc320701..5cc30fadf 100644 --- a/backend/src/app/srepl/components_v2.clj +++ b/backend/src/app/srepl/components_v2.clj @@ -6,8 +6,10 @@ (ns app.srepl.components-v2 (:require + [app.common.data :as d] [app.common.logging :as l] [app.common.pprint :as pp] + [app.common.uuid :as uuid] [app.db :as db] [app.features.components-v2 :as feat] [app.main :as main] @@ -57,13 +59,15 @@ :completed completed :elapsed elapsed))))) -(def ^:private sql:get-teams-1 - "SELECT id, features - FROM team - WHERE deleted_at IS NULL - ORDER BY created_at DESC") +(def ^:private sql:get-teams-by-created-at + "WITH teams AS ( + SELECT id, features + FROM team + WHERE deleted_at IS NULL + ORDER BY created_at DESC + ) SELECT * FROM TEAMS %(pred)s") -(def ^:private sql:get-teams-2 +(def ^:private sql:get-teams-by-graphics "WITH teams AS ( SELECT t.id, t.features, (SELECT count(*) @@ -74,9 +78,37 @@ AND fmo.mtype = 'image/svg+xml' AND fmo.is_local = false) AS graphics FROM team AS t + WHERE t.deleted_at IS NULL ORDER BY 3 ASC ) - SELECT * FROM teams ") + SELECT * FROM teams %(pred)s") + +(def ^:private sql:get-teams-by-activity + "WITH teams AS ( + SELECT t.id, t.features, + (SELECT coalesce(max(date_trunc('month', f.modified_at)), date_trunc('month', t.modified_at)) + FROM file AS f + JOIN project AS p ON (f.project_id = p.id) + WHERE p.team_id = t.id) AS updated_at, + (SELECT coalesce(count(*), 0) + FROM file AS f + JOIN project AS p ON (f.project_id = p.id) + WHERE p.team_id = t.id) AS total_files + FROM team AS t + WHERE t.deleted_at IS NULL + ORDER BY 3 DESC, 4 DESC + ) + SELECT * FROM teams %(pred)s") + +(def ^:private sql:get-teams-by-report + "WITH teams AS ( + SELECT t.id t.features, mr.name + FROM migration_report AS mr + JOIN team AS t ON (t.id = mr.team_id) + WHERE t.deleted_at IS NULL + AND mr.error IS NOT NULL + ORDER BY mr.created_at + ) SELECT id, features FROM teams %(pred)s") (defn- read-pred [entries] @@ -103,26 +135,62 @@ (apply vector sql params)))))) (defn- get-teams - [conn pred] - (let [sql (if pred - (let [[sql & params] (read-pred pred)] - (apply vector (str sql:get-teams-2 sql) params)) - [sql:get-teams-1])] + [conn query pred] + (let [query (d/nilv query :created-at) + sql (case query + :created-at sql:get-teams-by-created-at + :activity sql:get-teams-by-activity + :graphics sql:get-teams-by-graphics + :report sql:get-teams-by-report) - (->> (db/cursor conn sql) + sql (if pred + (let [[pred-sql & pred-params] (read-pred pred)] + (apply vector + (str/format sql {:pred pred-sql}) + pred-params)) + [(str/format sql {:pred ""})])] + + (->> (db/cursor conn sql {:chunk-size 500}) (map feat/decode-row) (remove (fn [{:keys [features]}] - (or (contains? features "ephimeral/v2-migration") - (contains? features "components/v2")))) + (contains? features "components/v2"))) (map :id)))) +(def ^:private sql:report-table + "CREATE UNLOGGED TABLE IF NOT EXISTS migration_report ( + id bigserial NOT NULL, + label text NOT NULL, + team_id UUID NOT NULL, + error text NULL, + created_at timestamptz NOT NULL DEFAULT now(), + elapsed bigint NOT NULL, + PRIMARY KEY (label, created_at, id) + )") + +(defn- create-report-table! + [system] + (db/exec-one! system [sql:report-table])) + +(defn- clean-reports! + [system label] + (db/delete! system :migration-report {:label label})) + +(defn- report! + [system team-id label elapsed error] + (db/insert! system :migration-report + {:label label + :team-id team-id + :elapsed (inst-ms elapsed) + :error error} + {::db/return-keys false})) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; PUBLIC API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn migrate-file! - [file-id & {:keys [rollback? validate?] :or {rollback? true validate? false}}] + [file-id & {:keys [rollback? validate? label] :or {rollback? true validate? false}}] (l/dbg :hint "migrate:start" :rollback rollback?) (let [tpoint (dt/tpoint) file-id (if (string? file-id) @@ -131,7 +199,9 @@ (binding [feat/*stats* (atom {})] (try (-> (assoc main/system ::db/rollback rollback?) - (feat/migrate-file! file-id :validate? validate?)) + (feat/migrate-file! file-id + :validate? validate? + :label label)) (-> (deref feat/*stats*) (assoc :elapsed (dt/format-duration (tpoint)))) @@ -144,7 +214,7 @@ (l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed))))))) (defn migrate-team! - [team-id & {:keys [rollback? skip-on-graphic-error? validate?] + [team-id & {:keys [rollback? skip-on-graphic-error? validate? label] :or {rollback? true validate? true skip-on-graphic-error? false}}] @@ -163,6 +233,7 @@ (try (-> (assoc main/system ::db/rollback rollback?) (feat/migrate-team! team-id + :label label :validate? validate? :skip-on-graphics-error? skip-on-graphic-error?)) (print-stats! @@ -181,17 +252,29 @@ This function starts multiple concurrent team migration processes until thw maximum number of jobs is reached which by default has the - value of `1`. This is controled with the `:max-jobs` option." + value of `1`. This is controled with the `:max-jobs` option. - [& {:keys [max-jobs max-items max-time rollback? validate? + If you want to run this on multiple machines you will need to specify + the total number of partitions and the current partition. + + In order to get the report table populated, you will need to provide + a correct `:label`. That label is also used for persist a file + snaphot before continue with the migration." + [& {:keys [max-jobs max-items max-time rollback? validate? query pred max-procs cache on-start on-progress on-error on-end - skip-on-graphic-error?] + skip-on-graphic-error? label partitions current-partition] :or {validate? false rollback? true max-jobs 1 skip-on-graphic-error? true max-items Long/MAX_VALUE}}] + (when (int? partitions) + (when-not (int? current-partition) + (throw (IllegalArgumentException. "missing `current-partition` parameter"))) + (when-not (<= 0 current-partition partitions) + (throw (IllegalArgumentException. "invalid value on `current-partition` parameter")))) + (let [stats (atom {}) tpoint (dt/tpoint) mtime (some-> max-time dt/duration) @@ -207,8 +290,32 @@ (cache/create :executor :same-thread :max-items cache) nil) - migrate-team + (fn [team-id] + (let [tpoint (dt/tpoint)] + (try + (db/tx-run! (assoc main/system ::db/rollback rollback?) + (fn [system] + (db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) + (feat/migrate-team! system team-id + :label label + :validate? validate? + :skip-on-graphics-error? skip-on-graphic-error?))) + + (when (string? label) + (report! main/system label team-id (tpoint) nil)) + + (catch Throwable cause + (l/wrn :hint "unexpected error on processing team (skiping)" + :team-id (str team-id) + :cause cause) + (when (string? label) + (report! main/system label team-id (tpoint) (ex-message cause)))) + + (finally + (ps/release! sjobs))))) + + process-team (fn [team-id] (ps/acquire! sjobs) (let [ts (tpoint)] @@ -220,25 +327,15 @@ (ps/release! sjobs) (reduced nil)) - (px/run! executor (fn [] - (try - (-> (assoc main/system ::db/rollback rollback?) - (feat/migrate-team! team-id - :validate? validate? - :skip-on-graphics-error? skip-on-graphic-error?)) - - (catch Throwable cause - (l/wrn :hint "unexpected error on processing team (skiping)" - :team-id (str team-id) - :cause cause)) - - (finally - (ps/release! sjobs))))))))] + (px/run! executor (partial migrate-team team-id)))))] (l/dbg :hint "migrate:start" + :label label :rollback rollback? :max-jobs max-jobs - :max-items max-items) + :max-items max-items + :partitions partitions + :current-partition current-partition) (add-watch stats :progress-report (report-progress-teams tpoint on-progress)) @@ -249,16 +346,26 @@ (when (fn? on-start) (on-start {:rollback rollback?})) - (db/tx-run! main/system - (fn [{:keys [::db/conn]}] - (db/exec! conn ["SET statement_timeout = 0;"]) - (db/exec! conn ["SET idle_in_transaction_session_timeout = 0;"]) - (run! migrate-team - (->> (get-teams conn pred) - (take max-items))))) + (when (string? label) + (create-report-table! main/system) + (clean-reports! main/system label)) - ;; Close and await tasks - (pu/close! executor) + (db/tx-run! main/system + (fn [{:keys [::db/conn] :as system}] + (db/exec! conn ["SET statement_timeout = 0"]) + (db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) + + (run! process-team + (->> (get-teams conn query pred) + (take max-items) + (filter (fn [team-id] + (if (and (int? current-partition) (int? partitions)) + (let [partition (mod (uuid/hash-int team-id) partitions)] + (= (dec current-partition) partition)) + true))))) + + ;; Close and await tasks + (pu/close! executor))) (if (fn? on-end) (-> (deref stats) diff --git a/common/src/app/common/uuid.cljc b/common/src/app/common/uuid.cljc index b205c6453..2086a0a5b 100644 --- a/common/src/app/common/uuid.cljc +++ b/common/src/app/common/uuid.cljc @@ -75,3 +75,12 @@ with base62. It is only safe to use with uuid v4 and penpot custom v8" [id] (impl/short-v8 (dm/str id)))) + +#?(:clj + (defn hash-int + [id] + (let [a (.getMostSignificantBits ^UUID id) + b (.getLeastSignificantBits ^UUID id)] + (+ (clojure.lang.Murmur3/hashLong a) + (clojure.lang.Murmur3/hashLong b))))) +