diff --git a/backend/resources/log4j2-experiments.xml b/backend/resources/log4j2-experiments.xml index 88542c277..3357aae31 100644 --- a/backend/resources/log4j2-experiments.xml +++ b/backend/resources/log4j2-experiments.xml @@ -48,12 +48,6 @@ - - - - - - diff --git a/backend/src/app/auth/oidc.clj b/backend/src/app/auth/oidc.clj index a189b42e6..243c08da5 100644 --- a/backend/src/app/auth/oidc.clj +++ b/backend/src/app/auth/oidc.clj @@ -474,6 +474,7 @@ [{:keys [::db/pool] :as cfg} info] (dm/with-open [conn (db/open pool)] (some->> (:email info) + (profile/clean-email) (profile/get-profile-by-email conn)))) (defn- redirect-response diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index 71b0c538a..1aaa5e268 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -77,10 +77,6 @@ internal functions without the need to explicitly pass it top down." nil) -(def ^:dynamic ^:private *team-id* - "A dynamic var that holds the current processing team-id." - nil) - (def ^:dynamic ^:private *file-stats* "An internal dynamic var for collect stats by file." nil) @@ -1194,12 +1190,11 @@ ;; The media processing adds the data to the ;; input map and returns it. (media/run {:cmd :info :input item})) + (catch Throwable _ - (let [team-id *team-id*] - (l/wrn :hint "unable to process embedded images on svg file" - :team-id (str team-id) - :file-id (str file-id) - :media-id (str media-id))) + (l/wrn :hint "unable to process embedded images on svg file" + :file-id (str file-id) + :media-id (str media-id)) nil))) (persist-image [acc {:keys [path size width height mtype href] :as item}] @@ -1332,24 +1327,20 @@ (catch Throwable cause (vreset! err true) (let [cause (pu/unwrap-exception cause) - edata (ex-data cause) - team-id *team-id*] + edata (ex-data cause)] (cond (instance? org.xml.sax.SAXParseException cause) (l/inf :hint "skip processing media object: invalid svg found" - :team-id (str team-id) :file-id (str (:id fdata)) :id (str (:id mobj))) (instance? org.graalvm.polyglot.PolyglotException cause) (l/inf :hint "skip processing media object: invalid svg found" - :team-id (str team-id) :file-id (str (:id fdata)) :id (str (:id mobj))) (= (:type edata) :not-found) (l/inf :hint "skip processing media object: underlying object does not exist" - :team-id (str team-id) :file-id (str (:id fdata)) :id (str (:id mobj))) @@ -1357,7 +1348,6 @@ (let [skip? *skip-on-graphic-error*] (l/wrn :hint "unable to process file media object" :skiped skip? - :team-id (str team-id) :file-id (str (:id fdata)) :id (str (:id mobj)) :cause cause) @@ -1452,7 +1442,7 @@ data))) (fmg/migrate-file)))) -(defn- get-team +(defn get-team [system team-id] (-> (db/get system :team {:id team-id} {::db/remove-deleted false @@ -1506,17 +1496,19 @@ AND f.deleted_at IS NULL FOR UPDATE") -(defn- get-and-lock-files +(defn get-and-lock-files [conn team-id] (->> (db/cursor conn [sql:get-and-lock-team-files team-id]) (map :id))) -(defn- update-team-features! - [conn team-id features] - (let [features (db/create-array conn "text" features)] +(defn update-team! + [conn team] + (let [params (-> team + (update :features db/encode-pgarray conn "text") + (dissoc :id))] (db/update! conn :team - {:features features} - {:id team-id}))) + params + {:id (:id team)}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; PUBLIC API @@ -1524,7 +1516,9 @@ (defn migrate-file! [system file-id & {:keys [validate? skip-on-graphic-error? label]}] - (let [tpoint (dt/tpoint)] + (let [tpoint (dt/tpoint) + err (volatile! false)] + (binding [*file-stats* (atom {}) *skip-on-graphic-error* skip-on-graphic-error?] (try @@ -1533,40 +1527,50 @@ :validate validate? :skip-on-graphic-error skip-on-graphic-error?) - (let [system (update system ::sto/storage media/configure-assets-storage)] - (db/tx-run! system - (fn [system] - (try - (binding [*system* system] - (when (string? label) - (fsnap/take-file-snapshot! system {:file-id file-id - :label (str "migration/" label)})) - (let [file (get-file system file-id)] - (events/tap :progress - {:op :migrate-file - :name (:name file) - :id (:id file)}) + (db/tx-run! (update system ::sto/storage media/configure-assets-storage) + (fn [system] + (binding [*system* system] + (when (string? label) + (fsnap/take-file-snapshot! system {:file-id file-id + :label (str "migration/" label)})) + (let [file (get-file system file-id)] + (events/tap :progress + {:op :migrate-file + :name (:name file) + :id (:id file)}) - (process-file system file :validate? validate?))) + (process-file system file :validate? validate?))))) - (catch Throwable cause - (let [team-id *team-id*] - (l/wrn :hint "error on processing file" - :team-id (str team-id) - :file-id (str file-id)) - (throw cause))))))) + (catch Throwable cause + (vreset! err true) + (l/wrn :hint "error on processing file" + :file-id (str file-id) + :cause cause) + (throw cause)) (finally (let [elapsed (tpoint) components (get @*file-stats* :processed-components 0) graphics (get @*file-stats* :processed-graphics 0)] - (l/dbg :hint "migrate:file:end" - :file-id (str file-id) - :graphics graphics - :components components - :validate validate? - :elapsed (dt/format-duration elapsed)) + (if (cache/cache? *cache*) + (let [cache-stats (cache/stats *cache*)] + (l/dbg :hint "migrate:file:end" + :file-id (str file-id) + :graphics graphics + :components components + :validate validate? + :crt (mth/to-fixed (:hit-rate cache-stats) 2) + :crq (str (:req-count cache-stats)) + :error @err + :elapsed (dt/format-duration elapsed))) + (l/dbg :hint "migrate:file:end" + :file-id (str file-id) + :graphics graphics + :components components + :validate validate? + :error @err + :elapsed (dt/format-duration elapsed))) (some-> *stats* (swap! update :processed-files (fnil inc 0))) (some-> *team-stats* (swap! update :processed-files (fnil inc 0))))))))) @@ -1588,7 +1592,7 @@ :skip-on-graphic-error? skip-on-graphic-error?)) migrate-team (fn [{:keys [::db/conn] :as system} team-id] - (let [{:keys [id features name]} (get-team system team-id)] + (let [{:keys [id features] :as team} (get-team system team-id)] (if (contains? features "components/v2") (l/inf :hint "team already migrated") (let [features (-> features @@ -1599,21 +1603,24 @@ (events/tap :progress {:op :migrate-team - :name name + :name (:name team) :id id}) (run! (partial migrate-file system) (get-and-lock-files conn id)) - (update-team-features! conn id features)))))] + (->> (assoc team :features features) + (update-team! conn))))))] - (binding [*team-stats* (atom {}) - *team-id* team-id] + (binding [*team-stats* (atom {})] (try (db/tx-run! system migrate-team team-id) (catch Throwable cause (vreset! err true) + (l/wrn :hint "error on processing team" + :team-id (str team-id) + :cause cause) (throw cause)) (finally diff --git a/backend/src/app/http/debug.clj b/backend/src/app/http/debug.clj index fe1fddc40..569e6d09d 100644 --- a/backend/src/app/http/debug.clj +++ b/backend/src/app/http/debug.clj @@ -347,7 +347,10 @@ :code :missing-force :hint "missing force checkbox")) - (let [profile (some->> params :email (profile/get-profile-by-email pool))] + (let [profile (some->> params + :email + (profile/clean-email) + (profile/get-profile-by-email pool))] (when-not profile (ex/raise :type :validation diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 900ef75f5..9cfcf9fe2 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -370,7 +370,10 @@ :fn (mg/resource "app/migrations/sql/0116-mod-file-table.sql")} {:name "0117-mod-file-object-thumbnail-table" - :fn (mg/resource "app/migrations/sql/0117-mod-file-object-thumbnail-table.sql")}]) + :fn (mg/resource "app/migrations/sql/0117-mod-file-object-thumbnail-table.sql")} + + {:name "0118-mod-task-table" + :fn (mg/resource "app/migrations/sql/0118-mod-task-table.sql")}]) (defn apply-migrations! [pool name migrations] diff --git a/backend/src/app/migrations/sql/0118-mod-task-table.sql b/backend/src/app/migrations/sql/0118-mod-task-table.sql new file mode 100644 index 000000000..d6ede0e97 --- /dev/null +++ b/backend/src/app/migrations/sql/0118-mod-task-table.sql @@ -0,0 +1,12 @@ +-- Removes the partitioning. +CREATE TABLE new_task (LIKE task INCLUDING ALL); +INSERT INTO new_task SELECT * FROM task; +ALTER TABLE task RENAME TO old_task; +ALTER TABLE new_task RENAME TO task; +DROP TABLE old_task; +ALTER INDEX new_task_label_name_queue_idx RENAME TO task__label_name_queue__idx; +ALTER INDEX new_task_scheduled_at_queue_idx RENAME TO task__scheduled_at_queue__idx; +ALTER TABLE task DROP CONSTRAINT new_task_pkey; +ALTER TABLE task ADD PRIMARY KEY (id); +ALTER TABLE task ALTER COLUMN created_at SET DEFAULT now(); +ALTER TABLE task ALTER COLUMN modified_at SET DEFAULT now(); diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index 2e82e5640..66bec377d 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -82,7 +82,8 @@ profile) (login [{:keys [::db/conn] :as cfg}] - (let [profile (->> (profile/get-profile-by-email conn email) + (let [profile (->> (profile/clean-email email) + (profile/get-profile-by-email conn) (validate-profile cfg) (profile/strip-private-attrs)) @@ -202,11 +203,12 @@ (pos? (compare elapsed register-retry-threshold)))) (defn prepare-register - [{:keys [::db/pool] :as cfg} params] + [{:keys [::db/pool] :as cfg} {:keys [email] :as params}] (validate-register-attempt! cfg params) - (let [profile (when-let [profile (profile/get-profile-by-email pool (:email params))] + (let [email (profile/clean-email email) + profile (when-let [profile (profile/get-profile-by-email pool email)] (cond (:is-blocked profile) (ex/raise :type :restriction @@ -221,7 +223,7 @@ :code :email-already-exists :hint "profile already exists"))) - params {:email (:email params) + params {:email email :password (:password params) :invitation-token (:invitation-token params) :backend "penpot" @@ -447,7 +449,8 @@ nil))] (db/with-atomic [conn pool] - (when-let [profile (profile/get-profile-by-email conn email)] + (when-let [profile (->> (profile/clean-email email) + (profile/get-profile-by-email conn))] (when-not (eml/allow-send-emails? conn profile) (ex/raise :type :validation :code :profile-is-muted diff --git a/backend/src/app/rpc/commands/files_snapshot.clj b/backend/src/app/rpc/commands/files_snapshot.clj index 68b3a017c..0e902047c 100644 --- a/backend/src/app/rpc/commands/files_snapshot.clj +++ b/backend/src/app/rpc/commands/files_snapshot.clj @@ -63,34 +63,38 @@ [{:keys [::db/conn ::sto/storage] :as cfg} {:keys [file-id id]}] (let [storage (media/configure-assets-storage storage conn) params {:id id :file-id file-id} - options {:columns [:id :data :revn]} + options {:columns [:id :data :revn :features]} snapshot (db/get* conn :file-change params options)] (when (and (some? snapshot) (some? (:data snapshot))) - (l/debug :hint "snapshot found" - :snapshot-id (:id snapshot) - :file-id file-id) + (l/dbg :hint "restoring snapshot" + :file-id (str file-id) + :snapshot-id (str (:id snapshot))) (db/update! conn :file - {:data (:data snapshot)} + {:data (:data snapshot) + :revn (:revn snapshot) + :features (:features snapshot)} {:id file-id}) ;; clean object thumbnails - (let [sql (str "delete from file_object_thumbnail " + (let [sql (str "update file_tagged_object_thumbnail " + " set deleted_at = now() " " where file_id=? returning media_id") res (db/exec! conn [sql file-id])] (doseq [media-id (into #{} (keep :media-id) res)] - (sto/del-object! storage media-id))) + (sto/touch-object! storage media-id))) ;; clean object thumbnails - (let [sql (str "delete from file_thumbnail " + (let [sql (str "update file_thumbnail " + " set deleted_at = now() " " where file_id=? returning media_id") res (db/exec! conn [sql file-id])] (doseq [media-id (into #{} (keep :media-id) res)] - (sto/del-object! storage media-id))) + (sto/touch-object! storage media-id))) {:id (:id snapshot)}))) @@ -112,7 +116,9 @@ (when-let [file (db/get* conn :file {:id file-id})] (let [id (uuid/next) label (or label (str "Snapshot at " (dt/format-instant (dt/now) :rfc1123)))] - (l/debug :hint "persisting file snapshot" :file-id file-id :label label) + (l/debug :hint "persisting file snapshot" + :file-id (str file-id) + :label label) (db/insert! conn :file-change {:id id :revn (:revn file) diff --git a/backend/src/app/rpc/commands/ldap.clj b/backend/src/app/rpc/commands/ldap.clj index afcb48420..bb86aec90 100644 --- a/backend/src/app/rpc/commands/ldap.clj +++ b/backend/src/app/rpc/commands/ldap.clj @@ -82,8 +82,8 @@ (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (or (some->> (:email info) - (profile/get-profile-by-email conn) - (profile/decode-row)) + (profile/clean-email) + (profile/get-profile-by-email conn)) (->> (assoc info :is-active true :is-demo false) (auth/create-profile! conn) (auth/create-profile-rels! conn) diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index a2fa82ba4..6ef2ef90d 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -39,6 +39,15 @@ (declare strip-private-attrs) (declare verify-password) +(defn clean-email + "Clean and normalizes email address string" + [email] + (let [email (str/lower email) + email (if (str/starts-with? email "mailto:") + (subs email 7) + email)] + email)) + (def ^:private schema:profile (sm/define @@ -147,8 +156,7 @@ (let [profile (validate-password! cfg (assoc params :profile-id profile-id)) session-id (::session/id params)] - (when (= (str/lower (:email profile)) - (str/lower (:password params))) + (when (= (:email profile) (str/lower (:password params))) (ex/raise :type :validation :code :email-as-password :hint "you can't use your email as password")) @@ -270,7 +278,7 @@ cfg (assoc cfg ::conn conn) params (assoc params :profile profile - :email (str/lower email))] + :email (clean-email email))] (if (contains? cf/flags :smtp) (request-email-change! cfg params) (change-email-immediately! cfg params))))) @@ -409,10 +417,9 @@ where email = ? and deleted_at is null) as val") -(defn check-profile-existence! +(defn- check-profile-existence! [conn {:keys [email] :as params}] - (let [email (str/lower email) - result (db/exec-one! conn [sql:profile-existence email])] + (let [result (db/exec-one! conn [sql:profile-existence email])] (when (:val result) (ex/raise :type :validation :code :email-already-exists)) @@ -427,7 +434,7 @@ (defn get-profile-by-email "Returns a profile looked up by email or `nil` if not match found." [conn email] - (->> (db/exec! conn [sql:profile-by-email (str/lower email)]) + (->> (db/exec! conn [sql:profile-by-email (clean-email email)]) (map decode-row) (first))) diff --git a/backend/src/app/rpc/commands/teams.clj b/backend/src/app/rpc/commands/teams.clj index 381611f81..4b5f07700 100644 --- a/backend/src/app/rpc/commands/teams.clj +++ b/backend/src/app/rpc/commands/teams.clj @@ -709,7 +709,8 @@ (defn- create-invitation [{:keys [::db/conn] :as cfg} {:keys [team profile role email] :as params}] - (let [member (profile/get-profile-by-email conn email)] + (let [email (profile/clean-email email) + member (profile/get-profile-by-email conn email)] (when (and member (not (eml/allow-send-emails? conn member))) (ex/raise :type :validation @@ -803,7 +804,8 @@ (db/with-atomic [conn pool] (let [perms (get-permissions conn profile-id team-id) profile (db/get-by-id conn :profile profile-id) - team (db/get-by-id conn :team team-id)] + team (db/get-by-id conn :team team-id) + emails (into #{} (map profile/clean-email) emails)] (run! (partial quotes/check-quote! conn) (list {::quotes/id ::quotes/invitations-per-team @@ -834,7 +836,7 @@ ;; We don't re-send inviation to already existing members (remove (partial contains? members)) (map (fn [email] - {:email (str/lower email) + {:email email :team team :profile profile :role role})) @@ -869,14 +871,15 @@ (let [params (assoc params :profile-id profile-id) cfg (assoc cfg ::db/conn conn) team (create-team cfg params) - profile (db/get-by-id conn :profile profile-id)] + profile (db/get-by-id conn :profile profile-id) + emails (into #{} (map profile/clean-email) emails)] ;; Create invitations for all provided emails. (->> emails (map (fn [email] {:team team :profile profile - :email (str/lower email) + :email email :role role})) (run! (partial create-invitation cfg))) @@ -913,17 +916,20 @@ {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id email] :as params}] (check-read-permissions! pool profile-id team-id) - (let [invit (-> (db/get pool :team-invitation + (let [email (profile/clean-email email) + invit (-> (db/get pool :team-invitation {:team-id team-id - :email-to (str/lower email)}) + :email-to email}) (update :role keyword)) + member (profile/get-profile-by-email pool (:email-to invit)) token (create-invitation-token cfg {:team-id (:team-id invit) :profile-id profile-id :valid-until (:valid-until invit) :role (:role invit) :member-id (:id member) - :member-email (or (:email member) (:email-to invit))})] + :member-email (or (:email member) + (profile/clean-email (:email-to invit)))})] {:token token})) ;; --- Mutation: Update invitation role @@ -944,7 +950,7 @@ (db/update! conn :team-invitation {:role (name role) :updated-at (dt/now)} - {:team-id team-id :email-to (str/lower email)}) + {:team-id team-id :email-to (profile/clean-email email)}) nil))) ;; --- Mutation: Delete invitation @@ -965,6 +971,6 @@ (let [invitation (db/delete! conn :team-invitation {:team-id team-id - :email-to (str/lower email)} + :email-to (profile/clean-email email)} {::db/return-keys true})] (rph/wrap nil {::audit/props {:invitation-id (:id invitation)}}))))) diff --git a/backend/src/app/rpc/commands/verify_token.clj b/backend/src/app/rpc/commands/verify_token.clj index 559933a13..49c76c110 100644 --- a/backend/src/app/rpc/commands/verify_token.clj +++ b/backend/src/app/rpc/commands/verify_token.clj @@ -44,18 +44,19 @@ (defmethod process-token :change-email [{:keys [conn] :as cfg} _params {:keys [profile-id email] :as claims}] - (when (profile/get-profile-by-email conn email) - (ex/raise :type :validation - :code :email-already-exists)) + (let [email (profile/clean-email email)] + (when (profile/get-profile-by-email conn email) + (ex/raise :type :validation + :code :email-already-exists)) - (db/update! conn :profile - {:email email} - {:id profile-id}) + (db/update! conn :profile + {:email email} + {:id profile-id}) - (rph/with-meta claims - {::audit/name "update-profile-email" - ::audit/props {:email email} - ::audit/profile-id profile-id})) + (rph/with-meta claims + {::audit/name "update-profile-email" + ::audit/props {:email email} + ::audit/profile-id profile-id}))) (defmethod process-token :verify-email [{:keys [conn] :as cfg} _ {:keys [profile-id] :as claims}] diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj index 4adf55293..3b145d1cd 100644 --- a/backend/src/app/srepl/components_v2.clj +++ b/backend/src/app/srepl/components_v2.clj @@ -12,6 +12,7 @@ [app.db :as db] [app.features.components-v2 :as feat] [app.main :as main] + [app.rpc.commands.files-snapshot :as rpc] [app.svgo :as svgo] [app.util.cache :as cache] [app.util.events :as events] @@ -32,12 +33,20 @@ (defn- report-progress-files [tpoint] (fn [_ _ oldv newv] - (when (not= (:processed-files oldv) - (:processed-files newv)) - (let [elapsed (tpoint)] + (when (or (not= (:processed-files oldv) + (:processed-files newv)) + (not= (:errors oldv) + (:errors newv))) + (let [completed (:processed-files newv 0) + errors (:errors newv 0) + elapsed (dt/format-duration (tpoint))] + (events/tap :progress-report + {:elapsed elapsed + :completed completed + :errors errors}) (l/dbg :hint "progress" - :completed (:processed-files newv) - :elapsed (dt/format-duration elapsed)))))) + :completed completed + :elapsed elapsed))))) (defn- report-progress-teams [tpoint] @@ -101,13 +110,47 @@ (def ^:private sql:get-teams-by-report "WITH teams AS ( SELECT t.id t.features, mr.name - FROM migration_report AS mr + FROM migration_team_report AS mr JOIN team AS t ON (t.id = mr.team_id) WHERE t.deleted_at IS NULL AND mr.error IS NOT NULL ORDER BY mr.created_at ) SELECT id, features FROM teams %(pred)s") +(def ^:private sql:get-files-by-created-at + "SELECT id, features + FROM file + WHERE deleted_at IS NULL + ORDER BY created_at DESC") + +(def ^:private sql:get-files-by-modified-at + "SELECT id, features + FROM file + WHERE deleted_at IS NULL + ORDER BY modified_at DESC") + +(def ^:private sql:get-files-by-graphics + "WITH files AS ( + SELECT f.id, f.features, + (SELECT count(*) FROM file_media_object AS fmo + WHERE fmo.mtype = 'image/svg+xml' + AND fmo.is_local = false + AND fmo.file_id = f.id) AS graphics + FROM file AS f + WHERE f.deleted_at IS NULL + ORDER BY 3 ASC + ) SELECT * FROM files %(pred)s") + +(def ^:private sql:get-files-by-report + "WITH files AS ( + SELECT t.id t.features, mr.name + FROM migration_file_report AS mr + JOIN file AS t ON (t.id = mr.file_id) + WHERE t.deleted_at IS NULL + AND mr.error IS NOT NULL + ORDER BY mr.created_at + ) SELECT id, features FROM files %(pred)s") + (defn- read-pred [entries] (let [entries (if (and (vector? entries) @@ -140,7 +183,6 @@ :activity sql:get-teams-by-activity :graphics sql:get-teams-by-graphics :report sql:get-teams-by-report) - sql (if pred (let [[pred-sql & pred-params] (read-pred pred)] (apply vector @@ -154,34 +196,78 @@ (contains? features "components/v2"))) (map :id)))) -(def ^:private sql:report-table - "CREATE UNLOGGED TABLE IF NOT EXISTS migration_report ( +(defn- get-files + [conn query pred] + (let [query (d/nilv query :created-at) + sql (case query + :created-at sql:get-files-by-created-at + :modified-at sql:get-files-by-modified-at + :graphics sql:get-files-by-graphics + :report sql:get-files-by-report) + sql (if pred + (let [[pred-sql & pred-params] (read-pred pred)] + (apply vector + (str/format sql {:pred pred-sql}) + pred-params)) + [(str/format sql {:pred ""})])] + + (->> (db/cursor conn sql {:chunk-size 500}) + (map feat/decode-row) + (remove (fn [{:keys [features]}] + (contains? features "components/v2"))) + (map :id)))) + +(def ^:private sql:team-report-table + "CREATE UNLOGGED TABLE IF NOT EXISTS migration_team_report ( id bigserial NOT NULL, label text NOT NULL, team_id UUID NOT NULL, error text NULL, created_at timestamptz NOT NULL DEFAULT now(), elapsed bigint NOT NULL, - PRIMARY KEY (label, created_at, id) - )") + PRIMARY KEY (label, created_at, id))") -(defn- create-report-table! +(def ^:private sql:file-report-table + "CREATE UNLOGGED TABLE IF NOT EXISTS migration_file_report ( + id bigserial NOT NULL, + label text NOT NULL, + file_id UUID NOT NULL, + error text NULL, + created_at timestamptz NOT NULL DEFAULT now(), + elapsed bigint NOT NULL, + PRIMARY KEY (label, created_at, id))") + +(defn- create-report-tables! [system] - (db/exec-one! system [sql:report-table])) + (db/exec-one! system [sql:team-report-table]) + (db/exec-one! system [sql:file-report-table])) -(defn- clean-reports! +(defn- clean-team-reports! [system label] - (db/delete! system :migration-report {:label label})) + (db/delete! system :migration-team-report {:label label})) -(defn- report! +(defn- team-report! [system team-id label elapsed error] - (db/insert! system :migration-report + (db/insert! system :migration-team-report {:label label :team-id team-id :elapsed (inst-ms elapsed) :error error} {::db/return-keys false})) +(defn- clean-file-reports! + [system label] + (db/delete! system :migration-file-report {:label label})) + +(defn- file-report! + [system file-id label elapsed error] + (db/insert! system :migration-file-report + {:label label + :file-id file-id + :elapsed (inst-ms elapsed) + :error error} + {::db/return-keys false})) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; PUBLIC API @@ -318,12 +404,11 @@ :skip-on-graphic-error? skip-on-graphic-error?))) (when (string? label) - (report! main/system team-id label (tpoint) nil)) + (team-report! main/system team-id label (tpoint) nil)) (catch Throwable cause (l/wrn :hint "unexpected error on processing team (skiping)" - :team-id (str team-id) - :cause cause) + :team-id (str team-id)) (events/tap :error (ex-info "unexpected error on processing team (skiping)" @@ -333,7 +418,7 @@ (swap! stats update :errors (fnil inc 0)) (when (string? label) - (report! main/system team-id label (tpoint) (ex-message cause)))) + (team-report! main/system team-id label (tpoint) (ex-message cause)))) (finally (ps/release! sjobs))))) @@ -365,8 +450,8 @@ svgo/*semaphore* sprocs] (try (when (string? label) - (create-report-table! main/system) - (clean-reports! main/system label)) + (create-report-tables! main/system) + (clean-team-reports! main/system label)) (db/tx-run! main/system (fn [{:keys [::db/conn] :as system}] @@ -399,6 +484,146 @@ :rollback rollback? :elapsed elapsed))))))) + +(defn migrate-files! + "A REPL helper for migrate all files. + + This function starts multiple concurrent file migration processes + until thw maximum number of jobs is reached which by default has the + value of `1`. This is controled with the `:max-jobs` option. + + If you want to run this on multiple machines you will need to specify + the total number of partitions and the current partition. + + In order to get the report table populated, you will need to provide + a correct `:label`. That label is also used for persist a file + snaphot before continue with the migration." + [& {:keys [max-jobs max-items max-time rollback? validate? query + pred max-procs cache skip-on-graphic-error? + label partitions current-partition] + :or {validate? false + rollback? true + max-jobs 1 + current-partition 1 + skip-on-graphic-error? true + max-items Long/MAX_VALUE}}] + + (when (int? partitions) + (when-not (int? current-partition) + (throw (IllegalArgumentException. "missing `current-partition` parameter"))) + (when-not (<= 0 current-partition partitions) + (throw (IllegalArgumentException. "invalid value on `current-partition` parameter")))) + + (let [stats (atom {}) + tpoint (dt/tpoint) + mtime (some-> max-time dt/duration) + + factory (px/thread-factory :virtual false :prefix "penpot/migration/") + executor (px/cached-executor :factory factory) + + max-procs (or max-procs max-jobs) + sjobs (ps/create :permits max-jobs) + sprocs (ps/create :permits max-procs) + + cache (if (int? cache) + (cache/create :executor (::wrk/executor main/system) + :max-items cache) + nil) + + migrate-file + (fn [file-id] + (let [tpoint (dt/tpoint)] + (try + (db/tx-run! (assoc main/system ::db/rollback rollback?) + (fn [system] + (db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) + (feat/migrate-file! system file-id + :label label + :validate? validate? + :skip-on-graphic-error? skip-on-graphic-error?))) + + (when (string? label) + (file-report! main/system file-id label (tpoint) nil)) + + (catch Throwable cause + (l/wrn :hint "unexpected error on processing file (skiping)" + :file-id (str file-id)) + + (events/tap :error + (ex-info "unexpected error on processing file (skiping)" + {:file-id file-id} + cause)) + + (swap! stats update :errors (fnil inc 0)) + + (when (string? label) + (file-report! main/system file-id label (tpoint) (ex-message cause)))) + + (finally + (ps/release! sjobs))))) + + process-file + (fn [file-id] + (ps/acquire! sjobs) + (let [ts (tpoint)] + (if (and mtime (neg? (compare mtime ts))) + (do + (l/inf :hint "max time constraint reached" + :file-id (str file-id) + :elapsed (dt/format-duration ts)) + (ps/release! sjobs) + (reduced nil)) + + (px/run! executor (partial migrate-file file-id)))))] + + (l/dbg :hint "migrate:start" + :label label + :rollback rollback? + :max-jobs max-jobs + :max-items max-items) + + (add-watch stats :progress-report (report-progress-files tpoint)) + + (binding [feat/*stats* stats + feat/*cache* cache + svgo/*semaphore* sprocs] + (try + (when (string? label) + (create-report-tables! main/system) + (clean-file-reports! main/system label)) + + (db/tx-run! main/system + (fn [{:keys [::db/conn] :as system}] + (db/exec! conn ["SET statement_timeout = 0"]) + (db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) + + (run! process-file + (->> (get-files conn query pred) + (filter (fn [file-id] + (if (int? partitions) + (= current-partition (-> (uuid/hash-int file-id) + (mod partitions) + (inc))) + true))) + (take max-items))) + + ;; Close and await tasks + (pu/close! executor))) + + (-> (deref stats) + (assoc :elapsed (dt/format-duration (tpoint)))) + + (catch Throwable cause + (l/dbg :hint "migrate:error" :cause cause) + (events/tap :error cause)) + + (finally + (let [elapsed (dt/format-duration (tpoint))] + (l/dbg :hint "migrate:end" + :rollback rollback? + :elapsed elapsed))))))) + + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; FILE PROCESS HELPERS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -412,3 +637,44 @@ :file-name (:name file)) (assoc file :deleted-at (dt/now))) file)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; RESTORE SNAPSHOT +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(def ^:private sql:snapshots-with-file + "SELECT f.id AS file_id, + fc.id AS id + FROM file AS f + JOIN file_change AS fc ON (fc.file_id = f.id) + WHERE fc.label = ? AND f.id = ANY(?)") + +(defn restore-team! + [team-id label & {:keys [rollback?] :or {rollback? true}}] + (let [team-id (if (string? team-id) + (parse-uuid team-id) + team-id) + + get-file-snapshots + (fn [conn ids] + (let [label (str "migration/" label)] + (db/exec! conn [sql:snapshots-with-file label + (db/create-array conn "uuid" ids)]))) + + restore-snapshot + (fn [{:keys [::db/conn] :as system}] + (let [ids (into #{} (feat/get-and-lock-files conn team-id)) + snap (get-file-snapshots conn ids) + ids' (into #{} (map :file-id) snap) + team (-> (feat/get-team conn team-id) + (update :features disj "components/v2"))] + + (when (not= ids ids') + (throw (RuntimeException. "no uniform snapshot available"))) + + (feat/update-team! conn team) + (run! (partial rpc/restore-file-snapshot! system) snap)))] + + + (-> (assoc main/system ::db/rollback rollback?) + (db/tx-run! restore-snapshot)))) diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 599afa646..9539c79bc 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -78,6 +78,7 @@ [email] (let [sprops (:app.setup/props main/system) pool (:app.db/pool main/system) + email (profile/clean-email email) profile (profile/get-profile-by-email pool email)] (auth/send-email-verification! pool sprops profile) @@ -331,11 +332,18 @@ (defn duplicate-team [team-id & {:keys [name]}] - (let [team-id (if (string? team-id) (parse-uuid team-id) team-id) - name (or name (fn [prev-name] - (str/ffmt "Cloned: % (%)" prev-name (dt/format-instant (dt/now)))))] + (let [team-id (if (string? team-id) (parse-uuid team-id) team-id)] (db/tx-run! main/system - (fn [cfg] - (db/exec-one! cfg ["SET CONSTRAINTS ALL DEFERRED"]) - (-> (assoc cfg ::bfc/timestamp (dt/now)) - (mgmt/duplicate-team :team-id team-id :name name)))))) + (fn [{:keys [::db/conn] :as cfg}] + (db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"]) + (let [team (-> (assoc cfg ::bfc/timestamp (dt/now)) + (mgmt/duplicate-team :team-id team-id :name name)) + rels (db/query conn :team-profile-rel {:team-id team-id})] + + (doseq [rel rels] + (let [params (-> rel + (assoc :id (uuid/next)) + (assoc :team-id (:id team)))] + (db/insert! conn :team-profile-rel params + {::db/return-keys false})))))))) +