0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-14 19:19:09 -05:00

Merge pull request #4107 from penpot/staging-migration

 Improvements to migration process
This commit is contained in:
Alejandro 2024-02-08 08:32:33 +01:00 committed by GitHub
commit de7c61e5ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 453 additions and 136 deletions

View file

@ -48,12 +48,6 @@
<Logger name="app.features" level="all" additivity="true"> <Logger name="app.features" level="all" additivity="true">
<AppenderRef ref="reports" level="warn" /> <AppenderRef ref="reports" level="warn" />
<!-- <AppenderRef ref="main" level="debug" /> -->
</Logger>
<Logger name="app.srepl" level="all" additivity="true">
<AppenderRef ref="reports" level="warn" />
<!-- <AppenderRef ref="main" level="trace" /> -->
</Logger> </Logger>
<Logger name="app" level="all" additivity="false"> <Logger name="app" level="all" additivity="false">

View file

@ -474,6 +474,7 @@
[{:keys [::db/pool] :as cfg} info] [{:keys [::db/pool] :as cfg} info]
(dm/with-open [conn (db/open pool)] (dm/with-open [conn (db/open pool)]
(some->> (:email info) (some->> (:email info)
(profile/clean-email)
(profile/get-profile-by-email conn)))) (profile/get-profile-by-email conn))))
(defn- redirect-response (defn- redirect-response

View file

@ -77,10 +77,6 @@
internal functions without the need to explicitly pass it top down." internal functions without the need to explicitly pass it top down."
nil) nil)
(def ^:dynamic ^:private *team-id*
"A dynamic var that holds the current processing team-id."
nil)
(def ^:dynamic ^:private *file-stats* (def ^:dynamic ^:private *file-stats*
"An internal dynamic var for collect stats by file." "An internal dynamic var for collect stats by file."
nil) nil)
@ -1194,12 +1190,11 @@
;; The media processing adds the data to the ;; The media processing adds the data to the
;; input map and returns it. ;; input map and returns it.
(media/run {:cmd :info :input item})) (media/run {:cmd :info :input item}))
(catch Throwable _ (catch Throwable _
(let [team-id *team-id*] (l/wrn :hint "unable to process embedded images on svg file"
(l/wrn :hint "unable to process embedded images on svg file" :file-id (str file-id)
:team-id (str team-id) :media-id (str media-id))
:file-id (str file-id)
:media-id (str media-id)))
nil))) nil)))
(persist-image [acc {:keys [path size width height mtype href] :as item}] (persist-image [acc {:keys [path size width height mtype href] :as item}]
@ -1332,24 +1327,20 @@
(catch Throwable cause (catch Throwable cause
(vreset! err true) (vreset! err true)
(let [cause (pu/unwrap-exception cause) (let [cause (pu/unwrap-exception cause)
edata (ex-data cause) edata (ex-data cause)]
team-id *team-id*]
(cond (cond
(instance? org.xml.sax.SAXParseException cause) (instance? org.xml.sax.SAXParseException cause)
(l/inf :hint "skip processing media object: invalid svg found" (l/inf :hint "skip processing media object: invalid svg found"
:team-id (str team-id)
:file-id (str (:id fdata)) :file-id (str (:id fdata))
:id (str (:id mobj))) :id (str (:id mobj)))
(instance? org.graalvm.polyglot.PolyglotException cause) (instance? org.graalvm.polyglot.PolyglotException cause)
(l/inf :hint "skip processing media object: invalid svg found" (l/inf :hint "skip processing media object: invalid svg found"
:team-id (str team-id)
:file-id (str (:id fdata)) :file-id (str (:id fdata))
:id (str (:id mobj))) :id (str (:id mobj)))
(= (:type edata) :not-found) (= (:type edata) :not-found)
(l/inf :hint "skip processing media object: underlying object does not exist" (l/inf :hint "skip processing media object: underlying object does not exist"
:team-id (str team-id)
:file-id (str (:id fdata)) :file-id (str (:id fdata))
:id (str (:id mobj))) :id (str (:id mobj)))
@ -1357,7 +1348,6 @@
(let [skip? *skip-on-graphic-error*] (let [skip? *skip-on-graphic-error*]
(l/wrn :hint "unable to process file media object" (l/wrn :hint "unable to process file media object"
:skiped skip? :skiped skip?
:team-id (str team-id)
:file-id (str (:id fdata)) :file-id (str (:id fdata))
:id (str (:id mobj)) :id (str (:id mobj))
:cause cause) :cause cause)
@ -1452,7 +1442,7 @@
data))) data)))
(fmg/migrate-file)))) (fmg/migrate-file))))
(defn- get-team (defn get-team
[system team-id] [system team-id]
(-> (db/get system :team {:id team-id} (-> (db/get system :team {:id team-id}
{::db/remove-deleted false {::db/remove-deleted false
@ -1506,17 +1496,19 @@
AND f.deleted_at IS NULL AND f.deleted_at IS NULL
FOR UPDATE") FOR UPDATE")
(defn- get-and-lock-files (defn get-and-lock-files
[conn team-id] [conn team-id]
(->> (db/cursor conn [sql:get-and-lock-team-files team-id]) (->> (db/cursor conn [sql:get-and-lock-team-files team-id])
(map :id))) (map :id)))
(defn- update-team-features! (defn update-team!
[conn team-id features] [conn team]
(let [features (db/create-array conn "text" features)] (let [params (-> team
(update :features db/encode-pgarray conn "text")
(dissoc :id))]
(db/update! conn :team (db/update! conn :team
{:features features} params
{:id team-id}))) {:id (:id team)})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API ;; PUBLIC API
@ -1524,7 +1516,9 @@
(defn migrate-file! (defn migrate-file!
[system file-id & {:keys [validate? skip-on-graphic-error? label]}] [system file-id & {:keys [validate? skip-on-graphic-error? label]}]
(let [tpoint (dt/tpoint)] (let [tpoint (dt/tpoint)
err (volatile! false)]
(binding [*file-stats* (atom {}) (binding [*file-stats* (atom {})
*skip-on-graphic-error* skip-on-graphic-error?] *skip-on-graphic-error* skip-on-graphic-error?]
(try (try
@ -1533,40 +1527,50 @@
:validate validate? :validate validate?
:skip-on-graphic-error skip-on-graphic-error?) :skip-on-graphic-error skip-on-graphic-error?)
(let [system (update system ::sto/storage media/configure-assets-storage)] (db/tx-run! (update system ::sto/storage media/configure-assets-storage)
(db/tx-run! system (fn [system]
(fn [system] (binding [*system* system]
(try (when (string? label)
(binding [*system* system] (fsnap/take-file-snapshot! system {:file-id file-id
(when (string? label) :label (str "migration/" label)}))
(fsnap/take-file-snapshot! system {:file-id file-id (let [file (get-file system file-id)]
:label (str "migration/" label)})) (events/tap :progress
(let [file (get-file system file-id)] {:op :migrate-file
(events/tap :progress :name (:name file)
{:op :migrate-file :id (:id file)})
:name (:name file)
:id (:id file)})
(process-file system file :validate? validate?))) (process-file system file :validate? validate?)))))
(catch Throwable cause (catch Throwable cause
(let [team-id *team-id*] (vreset! err true)
(l/wrn :hint "error on processing file" (l/wrn :hint "error on processing file"
:team-id (str team-id) :file-id (str file-id)
:file-id (str file-id)) :cause cause)
(throw cause))))))) (throw cause))
(finally (finally
(let [elapsed (tpoint) (let [elapsed (tpoint)
components (get @*file-stats* :processed-components 0) components (get @*file-stats* :processed-components 0)
graphics (get @*file-stats* :processed-graphics 0)] graphics (get @*file-stats* :processed-graphics 0)]
(l/dbg :hint "migrate:file:end" (if (cache/cache? *cache*)
:file-id (str file-id) (let [cache-stats (cache/stats *cache*)]
:graphics graphics (l/dbg :hint "migrate:file:end"
:components components :file-id (str file-id)
:validate validate? :graphics graphics
:elapsed (dt/format-duration elapsed)) :components components
:validate validate?
:crt (mth/to-fixed (:hit-rate cache-stats) 2)
:crq (str (:req-count cache-stats))
:error @err
:elapsed (dt/format-duration elapsed)))
(l/dbg :hint "migrate:file:end"
:file-id (str file-id)
:graphics graphics
:components components
:validate validate?
:error @err
:elapsed (dt/format-duration elapsed)))
(some-> *stats* (swap! update :processed-files (fnil inc 0))) (some-> *stats* (swap! update :processed-files (fnil inc 0)))
(some-> *team-stats* (swap! update :processed-files (fnil inc 0))))))))) (some-> *team-stats* (swap! update :processed-files (fnil inc 0)))))))))
@ -1588,7 +1592,7 @@
:skip-on-graphic-error? skip-on-graphic-error?)) :skip-on-graphic-error? skip-on-graphic-error?))
migrate-team migrate-team
(fn [{:keys [::db/conn] :as system} team-id] (fn [{:keys [::db/conn] :as system} team-id]
(let [{:keys [id features name]} (get-team system team-id)] (let [{:keys [id features] :as team} (get-team system team-id)]
(if (contains? features "components/v2") (if (contains? features "components/v2")
(l/inf :hint "team already migrated") (l/inf :hint "team already migrated")
(let [features (-> features (let [features (-> features
@ -1599,21 +1603,24 @@
(events/tap :progress (events/tap :progress
{:op :migrate-team {:op :migrate-team
:name name :name (:name team)
:id id}) :id id})
(run! (partial migrate-file system) (run! (partial migrate-file system)
(get-and-lock-files conn id)) (get-and-lock-files conn id))
(update-team-features! conn id features)))))] (->> (assoc team :features features)
(update-team! conn))))))]
(binding [*team-stats* (atom {}) (binding [*team-stats* (atom {})]
*team-id* team-id]
(try (try
(db/tx-run! system migrate-team team-id) (db/tx-run! system migrate-team team-id)
(catch Throwable cause (catch Throwable cause
(vreset! err true) (vreset! err true)
(l/wrn :hint "error on processing team"
:team-id (str team-id)
:cause cause)
(throw cause)) (throw cause))
(finally (finally

View file

@ -347,7 +347,10 @@
:code :missing-force :code :missing-force
:hint "missing force checkbox")) :hint "missing force checkbox"))
(let [profile (some->> params :email (profile/get-profile-by-email pool))] (let [profile (some->> params
:email
(profile/clean-email)
(profile/get-profile-by-email pool))]
(when-not profile (when-not profile
(ex/raise :type :validation (ex/raise :type :validation

View file

@ -370,7 +370,10 @@
:fn (mg/resource "app/migrations/sql/0116-mod-file-table.sql")} :fn (mg/resource "app/migrations/sql/0116-mod-file-table.sql")}
{:name "0117-mod-file-object-thumbnail-table" {:name "0117-mod-file-object-thumbnail-table"
:fn (mg/resource "app/migrations/sql/0117-mod-file-object-thumbnail-table.sql")}]) :fn (mg/resource "app/migrations/sql/0117-mod-file-object-thumbnail-table.sql")}
{:name "0118-mod-task-table"
:fn (mg/resource "app/migrations/sql/0118-mod-task-table.sql")}])
(defn apply-migrations! (defn apply-migrations!
[pool name migrations] [pool name migrations]

View file

@ -0,0 +1,12 @@
-- Removes the partitioning.
CREATE TABLE new_task (LIKE task INCLUDING ALL);
INSERT INTO new_task SELECT * FROM task;
ALTER TABLE task RENAME TO old_task;
ALTER TABLE new_task RENAME TO task;
DROP TABLE old_task;
ALTER INDEX new_task_label_name_queue_idx RENAME TO task__label_name_queue__idx;
ALTER INDEX new_task_scheduled_at_queue_idx RENAME TO task__scheduled_at_queue__idx;
ALTER TABLE task DROP CONSTRAINT new_task_pkey;
ALTER TABLE task ADD PRIMARY KEY (id);
ALTER TABLE task ALTER COLUMN created_at SET DEFAULT now();
ALTER TABLE task ALTER COLUMN modified_at SET DEFAULT now();

View file

@ -82,7 +82,8 @@
profile) profile)
(login [{:keys [::db/conn] :as cfg}] (login [{:keys [::db/conn] :as cfg}]
(let [profile (->> (profile/get-profile-by-email conn email) (let [profile (->> (profile/clean-email email)
(profile/get-profile-by-email conn)
(validate-profile cfg) (validate-profile cfg)
(profile/strip-private-attrs)) (profile/strip-private-attrs))
@ -202,11 +203,12 @@
(pos? (compare elapsed register-retry-threshold)))) (pos? (compare elapsed register-retry-threshold))))
(defn prepare-register (defn prepare-register
[{:keys [::db/pool] :as cfg} params] [{:keys [::db/pool] :as cfg} {:keys [email] :as params}]
(validate-register-attempt! cfg params) (validate-register-attempt! cfg params)
(let [profile (when-let [profile (profile/get-profile-by-email pool (:email params))] (let [email (profile/clean-email email)
profile (when-let [profile (profile/get-profile-by-email pool email)]
(cond (cond
(:is-blocked profile) (:is-blocked profile)
(ex/raise :type :restriction (ex/raise :type :restriction
@ -221,7 +223,7 @@
:code :email-already-exists :code :email-already-exists
:hint "profile already exists"))) :hint "profile already exists")))
params {:email (:email params) params {:email email
:password (:password params) :password (:password params)
:invitation-token (:invitation-token params) :invitation-token (:invitation-token params)
:backend "penpot" :backend "penpot"
@ -447,7 +449,8 @@
nil))] nil))]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(when-let [profile (profile/get-profile-by-email conn email)] (when-let [profile (->> (profile/clean-email email)
(profile/get-profile-by-email conn))]
(when-not (eml/allow-send-emails? conn profile) (when-not (eml/allow-send-emails? conn profile)
(ex/raise :type :validation (ex/raise :type :validation
:code :profile-is-muted :code :profile-is-muted

View file

@ -63,34 +63,38 @@
[{:keys [::db/conn ::sto/storage] :as cfg} {:keys [file-id id]}] [{:keys [::db/conn ::sto/storage] :as cfg} {:keys [file-id id]}]
(let [storage (media/configure-assets-storage storage conn) (let [storage (media/configure-assets-storage storage conn)
params {:id id :file-id file-id} params {:id id :file-id file-id}
options {:columns [:id :data :revn]} options {:columns [:id :data :revn :features]}
snapshot (db/get* conn :file-change params options)] snapshot (db/get* conn :file-change params options)]
(when (and (some? snapshot) (when (and (some? snapshot)
(some? (:data snapshot))) (some? (:data snapshot)))
(l/debug :hint "snapshot found" (l/dbg :hint "restoring snapshot"
:snapshot-id (:id snapshot) :file-id (str file-id)
:file-id file-id) :snapshot-id (str (:id snapshot)))
(db/update! conn :file (db/update! conn :file
{:data (:data snapshot)} {:data (:data snapshot)
:revn (:revn snapshot)
:features (:features snapshot)}
{:id file-id}) {:id file-id})
;; clean object thumbnails ;; clean object thumbnails
(let [sql (str "delete from file_object_thumbnail " (let [sql (str "update file_tagged_object_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id") " where file_id=? returning media_id")
res (db/exec! conn [sql file-id])] res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)] (doseq [media-id (into #{} (keep :media-id) res)]
(sto/del-object! storage media-id))) (sto/touch-object! storage media-id)))
;; clean object thumbnails ;; clean object thumbnails
(let [sql (str "delete from file_thumbnail " (let [sql (str "update file_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id") " where file_id=? returning media_id")
res (db/exec! conn [sql file-id])] res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)] (doseq [media-id (into #{} (keep :media-id) res)]
(sto/del-object! storage media-id))) (sto/touch-object! storage media-id)))
{:id (:id snapshot)}))) {:id (:id snapshot)})))
@ -112,7 +116,9 @@
(when-let [file (db/get* conn :file {:id file-id})] (when-let [file (db/get* conn :file {:id file-id})]
(let [id (uuid/next) (let [id (uuid/next)
label (or label (str "Snapshot at " (dt/format-instant (dt/now) :rfc1123)))] label (or label (str "Snapshot at " (dt/format-instant (dt/now) :rfc1123)))]
(l/debug :hint "persisting file snapshot" :file-id file-id :label label) (l/debug :hint "persisting file snapshot"
:file-id (str file-id)
:label label)
(db/insert! conn :file-change (db/insert! conn :file-change
{:id id {:id id
:revn (:revn file) :revn (:revn file)

View file

@ -82,8 +82,8 @@
(db/tx-run! cfg (db/tx-run! cfg
(fn [{:keys [::db/conn] :as cfg}] (fn [{:keys [::db/conn] :as cfg}]
(or (some->> (:email info) (or (some->> (:email info)
(profile/get-profile-by-email conn) (profile/clean-email)
(profile/decode-row)) (profile/get-profile-by-email conn))
(->> (assoc info :is-active true :is-demo false) (->> (assoc info :is-active true :is-demo false)
(auth/create-profile! conn) (auth/create-profile! conn)
(auth/create-profile-rels! conn) (auth/create-profile-rels! conn)

View file

@ -39,6 +39,15 @@
(declare strip-private-attrs) (declare strip-private-attrs)
(declare verify-password) (declare verify-password)
(defn clean-email
"Clean and normalizes email address string"
[email]
(let [email (str/lower email)
email (if (str/starts-with? email "mailto:")
(subs email 7)
email)]
email))
(def ^:private (def ^:private
schema:profile schema:profile
(sm/define (sm/define
@ -147,8 +156,7 @@
(let [profile (validate-password! cfg (assoc params :profile-id profile-id)) (let [profile (validate-password! cfg (assoc params :profile-id profile-id))
session-id (::session/id params)] session-id (::session/id params)]
(when (= (str/lower (:email profile)) (when (= (:email profile) (str/lower (:password params)))
(str/lower (:password params)))
(ex/raise :type :validation (ex/raise :type :validation
:code :email-as-password :code :email-as-password
:hint "you can't use your email as password")) :hint "you can't use your email as password"))
@ -270,7 +278,7 @@
cfg (assoc cfg ::conn conn) cfg (assoc cfg ::conn conn)
params (assoc params params (assoc params
:profile profile :profile profile
:email (str/lower email))] :email (clean-email email))]
(if (contains? cf/flags :smtp) (if (contains? cf/flags :smtp)
(request-email-change! cfg params) (request-email-change! cfg params)
(change-email-immediately! cfg params))))) (change-email-immediately! cfg params)))))
@ -409,10 +417,9 @@
where email = ? where email = ?
and deleted_at is null) as val") and deleted_at is null) as val")
(defn check-profile-existence! (defn- check-profile-existence!
[conn {:keys [email] :as params}] [conn {:keys [email] :as params}]
(let [email (str/lower email) (let [result (db/exec-one! conn [sql:profile-existence email])]
result (db/exec-one! conn [sql:profile-existence email])]
(when (:val result) (when (:val result)
(ex/raise :type :validation (ex/raise :type :validation
:code :email-already-exists)) :code :email-already-exists))
@ -427,7 +434,7 @@
(defn get-profile-by-email (defn get-profile-by-email
"Returns a profile looked up by email or `nil` if not match found." "Returns a profile looked up by email or `nil` if not match found."
[conn email] [conn email]
(->> (db/exec! conn [sql:profile-by-email (str/lower email)]) (->> (db/exec! conn [sql:profile-by-email (clean-email email)])
(map decode-row) (map decode-row)
(first))) (first)))

View file

@ -709,7 +709,8 @@
(defn- create-invitation (defn- create-invitation
[{:keys [::db/conn] :as cfg} {:keys [team profile role email] :as params}] [{:keys [::db/conn] :as cfg} {:keys [team profile role email] :as params}]
(let [member (profile/get-profile-by-email conn email)] (let [email (profile/clean-email email)
member (profile/get-profile-by-email conn email)]
(when (and member (not (eml/allow-send-emails? conn member))) (when (and member (not (eml/allow-send-emails? conn member)))
(ex/raise :type :validation (ex/raise :type :validation
@ -803,7 +804,8 @@
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(let [perms (get-permissions conn profile-id team-id) (let [perms (get-permissions conn profile-id team-id)
profile (db/get-by-id conn :profile profile-id) profile (db/get-by-id conn :profile profile-id)
team (db/get-by-id conn :team team-id)] team (db/get-by-id conn :team team-id)
emails (into #{} (map profile/clean-email) emails)]
(run! (partial quotes/check-quote! conn) (run! (partial quotes/check-quote! conn)
(list {::quotes/id ::quotes/invitations-per-team (list {::quotes/id ::quotes/invitations-per-team
@ -834,7 +836,7 @@
;; We don't re-send inviation to already existing members ;; We don't re-send inviation to already existing members
(remove (partial contains? members)) (remove (partial contains? members))
(map (fn [email] (map (fn [email]
{:email (str/lower email) {:email email
:team team :team team
:profile profile :profile profile
:role role})) :role role}))
@ -869,14 +871,15 @@
(let [params (assoc params :profile-id profile-id) (let [params (assoc params :profile-id profile-id)
cfg (assoc cfg ::db/conn conn) cfg (assoc cfg ::db/conn conn)
team (create-team cfg params) team (create-team cfg params)
profile (db/get-by-id conn :profile profile-id)] profile (db/get-by-id conn :profile profile-id)
emails (into #{} (map profile/clean-email) emails)]
;; Create invitations for all provided emails. ;; Create invitations for all provided emails.
(->> emails (->> emails
(map (fn [email] (map (fn [email]
{:team team {:team team
:profile profile :profile profile
:email (str/lower email) :email email
:role role})) :role role}))
(run! (partial create-invitation cfg))) (run! (partial create-invitation cfg)))
@ -913,17 +916,20 @@
{::doc/added "1.17"} {::doc/added "1.17"}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id email] :as params}] [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id email] :as params}]
(check-read-permissions! pool profile-id team-id) (check-read-permissions! pool profile-id team-id)
(let [invit (-> (db/get pool :team-invitation (let [email (profile/clean-email email)
invit (-> (db/get pool :team-invitation
{:team-id team-id {:team-id team-id
:email-to (str/lower email)}) :email-to email})
(update :role keyword)) (update :role keyword))
member (profile/get-profile-by-email pool (:email-to invit)) member (profile/get-profile-by-email pool (:email-to invit))
token (create-invitation-token cfg {:team-id (:team-id invit) token (create-invitation-token cfg {:team-id (:team-id invit)
:profile-id profile-id :profile-id profile-id
:valid-until (:valid-until invit) :valid-until (:valid-until invit)
:role (:role invit) :role (:role invit)
:member-id (:id member) :member-id (:id member)
:member-email (or (:email member) (:email-to invit))})] :member-email (or (:email member)
(profile/clean-email (:email-to invit)))})]
{:token token})) {:token token}))
;; --- Mutation: Update invitation role ;; --- Mutation: Update invitation role
@ -944,7 +950,7 @@
(db/update! conn :team-invitation (db/update! conn :team-invitation
{:role (name role) :updated-at (dt/now)} {:role (name role) :updated-at (dt/now)}
{:team-id team-id :email-to (str/lower email)}) {:team-id team-id :email-to (profile/clean-email email)})
nil))) nil)))
;; --- Mutation: Delete invitation ;; --- Mutation: Delete invitation
@ -965,6 +971,6 @@
(let [invitation (db/delete! conn :team-invitation (let [invitation (db/delete! conn :team-invitation
{:team-id team-id {:team-id team-id
:email-to (str/lower email)} :email-to (profile/clean-email email)}
{::db/return-keys true})] {::db/return-keys true})]
(rph/wrap nil {::audit/props {:invitation-id (:id invitation)}}))))) (rph/wrap nil {::audit/props {:invitation-id (:id invitation)}})))))

View file

@ -44,18 +44,19 @@
(defmethod process-token :change-email (defmethod process-token :change-email
[{:keys [conn] :as cfg} _params {:keys [profile-id email] :as claims}] [{:keys [conn] :as cfg} _params {:keys [profile-id email] :as claims}]
(when (profile/get-profile-by-email conn email) (let [email (profile/clean-email email)]
(ex/raise :type :validation (when (profile/get-profile-by-email conn email)
:code :email-already-exists)) (ex/raise :type :validation
:code :email-already-exists))
(db/update! conn :profile (db/update! conn :profile
{:email email} {:email email}
{:id profile-id}) {:id profile-id})
(rph/with-meta claims (rph/with-meta claims
{::audit/name "update-profile-email" {::audit/name "update-profile-email"
::audit/props {:email email} ::audit/props {:email email}
::audit/profile-id profile-id})) ::audit/profile-id profile-id})))
(defmethod process-token :verify-email (defmethod process-token :verify-email
[{:keys [conn] :as cfg} _ {:keys [profile-id] :as claims}] [{:keys [conn] :as cfg} _ {:keys [profile-id] :as claims}]

View file

@ -12,6 +12,7 @@
[app.db :as db] [app.db :as db]
[app.features.components-v2 :as feat] [app.features.components-v2 :as feat]
[app.main :as main] [app.main :as main]
[app.rpc.commands.files-snapshot :as rpc]
[app.svgo :as svgo] [app.svgo :as svgo]
[app.util.cache :as cache] [app.util.cache :as cache]
[app.util.events :as events] [app.util.events :as events]
@ -32,12 +33,20 @@
(defn- report-progress-files (defn- report-progress-files
[tpoint] [tpoint]
(fn [_ _ oldv newv] (fn [_ _ oldv newv]
(when (not= (:processed-files oldv) (when (or (not= (:processed-files oldv)
(:processed-files newv)) (:processed-files newv))
(let [elapsed (tpoint)] (not= (:errors oldv)
(:errors newv)))
(let [completed (:processed-files newv 0)
errors (:errors newv 0)
elapsed (dt/format-duration (tpoint))]
(events/tap :progress-report
{:elapsed elapsed
:completed completed
:errors errors})
(l/dbg :hint "progress" (l/dbg :hint "progress"
:completed (:processed-files newv) :completed completed
:elapsed (dt/format-duration elapsed)))))) :elapsed elapsed)))))
(defn- report-progress-teams (defn- report-progress-teams
[tpoint] [tpoint]
@ -101,13 +110,47 @@
(def ^:private sql:get-teams-by-report (def ^:private sql:get-teams-by-report
"WITH teams AS ( "WITH teams AS (
SELECT t.id t.features, mr.name SELECT t.id t.features, mr.name
FROM migration_report AS mr FROM migration_team_report AS mr
JOIN team AS t ON (t.id = mr.team_id) JOIN team AS t ON (t.id = mr.team_id)
WHERE t.deleted_at IS NULL WHERE t.deleted_at IS NULL
AND mr.error IS NOT NULL AND mr.error IS NOT NULL
ORDER BY mr.created_at ORDER BY mr.created_at
) SELECT id, features FROM teams %(pred)s") ) SELECT id, features FROM teams %(pred)s")
(def ^:private sql:get-files-by-created-at
"SELECT id, features
FROM file
WHERE deleted_at IS NULL
ORDER BY created_at DESC")
(def ^:private sql:get-files-by-modified-at
"SELECT id, features
FROM file
WHERE deleted_at IS NULL
ORDER BY modified_at DESC")
(def ^:private sql:get-files-by-graphics
"WITH files AS (
SELECT f.id, f.features,
(SELECT count(*) FROM file_media_object AS fmo
WHERE fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false
AND fmo.file_id = f.id) AS graphics
FROM file AS f
WHERE f.deleted_at IS NULL
ORDER BY 3 ASC
) SELECT * FROM files %(pred)s")
(def ^:private sql:get-files-by-report
"WITH files AS (
SELECT t.id t.features, mr.name
FROM migration_file_report AS mr
JOIN file AS t ON (t.id = mr.file_id)
WHERE t.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM files %(pred)s")
(defn- read-pred (defn- read-pred
[entries] [entries]
(let [entries (if (and (vector? entries) (let [entries (if (and (vector? entries)
@ -140,7 +183,6 @@
:activity sql:get-teams-by-activity :activity sql:get-teams-by-activity
:graphics sql:get-teams-by-graphics :graphics sql:get-teams-by-graphics
:report sql:get-teams-by-report) :report sql:get-teams-by-report)
sql (if pred sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)] (let [[pred-sql & pred-params] (read-pred pred)]
(apply vector (apply vector
@ -154,34 +196,78 @@
(contains? features "components/v2"))) (contains? features "components/v2")))
(map :id)))) (map :id))))
(def ^:private sql:report-table (defn- get-files
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_report ( [conn query pred]
(let [query (d/nilv query :created-at)
sql (case query
:created-at sql:get-files-by-created-at
:modified-at sql:get-files-by-modified-at
:graphics sql:get-files-by-graphics
:report sql:get-files-by-report)
sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
(str/format sql {:pred pred-sql})
pred-params))
[(str/format sql {:pred ""})])]
(->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2")))
(map :id))))
(def ^:private sql:team-report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_team_report (
id bigserial NOT NULL, id bigserial NOT NULL,
label text NOT NULL, label text NOT NULL,
team_id UUID NOT NULL, team_id UUID NOT NULL,
error text NULL, error text NULL,
created_at timestamptz NOT NULL DEFAULT now(), created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL, elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id) PRIMARY KEY (label, created_at, id))")
)")
(defn- create-report-table! (def ^:private sql:file-report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_file_report (
id bigserial NOT NULL,
label text NOT NULL,
file_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id))")
(defn- create-report-tables!
[system] [system]
(db/exec-one! system [sql:report-table])) (db/exec-one! system [sql:team-report-table])
(db/exec-one! system [sql:file-report-table]))
(defn- clean-reports! (defn- clean-team-reports!
[system label] [system label]
(db/delete! system :migration-report {:label label})) (db/delete! system :migration-team-report {:label label}))
(defn- report! (defn- team-report!
[system team-id label elapsed error] [system team-id label elapsed error]
(db/insert! system :migration-report (db/insert! system :migration-team-report
{:label label {:label label
:team-id team-id :team-id team-id
:elapsed (inst-ms elapsed) :elapsed (inst-ms elapsed)
:error error} :error error}
{::db/return-keys false})) {::db/return-keys false}))
(defn- clean-file-reports!
[system label]
(db/delete! system :migration-file-report {:label label}))
(defn- file-report!
[system file-id label elapsed error]
(db/insert! system :migration-file-report
{:label label
:file-id file-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API ;; PUBLIC API
@ -318,12 +404,11 @@
:skip-on-graphic-error? skip-on-graphic-error?))) :skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label) (when (string? label)
(report! main/system team-id label (tpoint) nil)) (team-report! main/system team-id label (tpoint) nil))
(catch Throwable cause (catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)" (l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id) :team-id (str team-id))
:cause cause)
(events/tap :error (events/tap :error
(ex-info "unexpected error on processing team (skiping)" (ex-info "unexpected error on processing team (skiping)"
@ -333,7 +418,7 @@
(swap! stats update :errors (fnil inc 0)) (swap! stats update :errors (fnil inc 0))
(when (string? label) (when (string? label)
(report! main/system team-id label (tpoint) (ex-message cause)))) (team-report! main/system team-id label (tpoint) (ex-message cause))))
(finally (finally
(ps/release! sjobs))))) (ps/release! sjobs)))))
@ -365,8 +450,8 @@
svgo/*semaphore* sprocs] svgo/*semaphore* sprocs]
(try (try
(when (string? label) (when (string? label)
(create-report-table! main/system) (create-report-tables! main/system)
(clean-reports! main/system label)) (clean-team-reports! main/system label))
(db/tx-run! main/system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
@ -399,6 +484,146 @@
:rollback rollback? :rollback rollback?
:elapsed elapsed))))))) :elapsed elapsed)))))))
(defn migrate-files!
"A REPL helper for migrate all files.
This function starts multiple concurrent file migration processes
until thw maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option.
If you want to run this on multiple machines you will need to specify
the total number of partitions and the current partition.
In order to get the report table populated, you will need to provide
a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration."
[& {:keys [max-jobs max-items max-time rollback? validate? query
pred max-procs cache skip-on-graphic-error?
label partitions current-partition]
:or {validate? false
rollback? true
max-jobs 1
current-partition 1
skip-on-graphic-error? true
max-items Long/MAX_VALUE}}]
(when (int? partitions)
(when-not (int? current-partition)
(throw (IllegalArgumentException. "missing `current-partition` parameter")))
(when-not (<= 0 current-partition partitions)
(throw (IllegalArgumentException. "invalid value on `current-partition` parameter"))))
(let [stats (atom {})
tpoint (dt/tpoint)
mtime (some-> max-time dt/duration)
factory (px/thread-factory :virtual false :prefix "penpot/migration/")
executor (px/cached-executor :factory factory)
max-procs (or max-procs max-jobs)
sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs)
cache (if (int? cache)
(cache/create :executor (::wrk/executor main/system)
:max-items cache)
nil)
migrate-file
(fn [file-id]
(let [tpoint (dt/tpoint)]
(try
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"])
(feat/migrate-file! system file-id
:label label
:validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label)
(file-report! main/system file-id label (tpoint) nil))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id))
(events/tap :error
(ex-info "unexpected error on processing file (skiping)"
{:file-id file-id}
cause))
(swap! stats update :errors (fnil inc 0))
(when (string? label)
(file-report! main/system file-id label (tpoint) (ex-message cause))))
(finally
(ps/release! sjobs)))))
process-file
(fn [file-id]
(ps/acquire! sjobs)
(let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts)))
(do
(l/inf :hint "max time constraint reached"
:file-id (str file-id)
:elapsed (dt/format-duration ts))
(ps/release! sjobs)
(reduced nil))
(px/run! executor (partial migrate-file file-id)))))]
(l/dbg :hint "migrate:start"
:label label
:rollback rollback?
:max-jobs max-jobs
:max-items max-items)
(add-watch stats :progress-report (report-progress-files tpoint))
(binding [feat/*stats* stats
feat/*cache* cache
svgo/*semaphore* sprocs]
(try
(when (string? label)
(create-report-tables! main/system)
(clean-file-reports! main/system label))
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
(run! process-file
(->> (get-files conn query pred)
(filter (fn [file-id]
(if (int? partitions)
(= current-partition (-> (uuid/hash-int file-id)
(mod partitions)
(inc)))
true)))
(take max-items)))
;; Close and await tasks
(pu/close! executor)))
(-> (deref stats)
(assoc :elapsed (dt/format-duration (tpoint))))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause)
(events/tap :error cause))
(finally
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end"
:rollback rollback?
:elapsed elapsed)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FILE PROCESS HELPERS ;; FILE PROCESS HELPERS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -412,3 +637,44 @@
:file-name (:name file)) :file-name (:name file))
(assoc file :deleted-at (dt/now))) (assoc file :deleted-at (dt/now)))
file)) file))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RESTORE SNAPSHOT
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:private sql:snapshots-with-file
"SELECT f.id AS file_id,
fc.id AS id
FROM file AS f
JOIN file_change AS fc ON (fc.file_id = f.id)
WHERE fc.label = ? AND f.id = ANY(?)")
(defn restore-team!
[team-id label & {:keys [rollback?] :or {rollback? true}}]
(let [team-id (if (string? team-id)
(parse-uuid team-id)
team-id)
get-file-snapshots
(fn [conn ids]
(let [label (str "migration/" label)]
(db/exec! conn [sql:snapshots-with-file label
(db/create-array conn "uuid" ids)])))
restore-snapshot
(fn [{:keys [::db/conn] :as system}]
(let [ids (into #{} (feat/get-and-lock-files conn team-id))
snap (get-file-snapshots conn ids)
ids' (into #{} (map :file-id) snap)
team (-> (feat/get-team conn team-id)
(update :features disj "components/v2"))]
(when (not= ids ids')
(throw (RuntimeException. "no uniform snapshot available")))
(feat/update-team! conn team)
(run! (partial rpc/restore-file-snapshot! system) snap)))]
(-> (assoc main/system ::db/rollback rollback?)
(db/tx-run! restore-snapshot))))

View file

@ -78,6 +78,7 @@
[email] [email]
(let [sprops (:app.setup/props main/system) (let [sprops (:app.setup/props main/system)
pool (:app.db/pool main/system) pool (:app.db/pool main/system)
email (profile/clean-email email)
profile (profile/get-profile-by-email pool email)] profile (profile/get-profile-by-email pool email)]
(auth/send-email-verification! pool sprops profile) (auth/send-email-verification! pool sprops profile)
@ -331,11 +332,18 @@
(defn duplicate-team (defn duplicate-team
[team-id & {:keys [name]}] [team-id & {:keys [name]}]
(let [team-id (if (string? team-id) (parse-uuid team-id) team-id) (let [team-id (if (string? team-id) (parse-uuid team-id) team-id)]
name (or name (fn [prev-name]
(str/ffmt "Cloned: % (%)" prev-name (dt/format-instant (dt/now)))))]
(db/tx-run! main/system (db/tx-run! main/system
(fn [cfg] (fn [{:keys [::db/conn] :as cfg}]
(db/exec-one! cfg ["SET CONSTRAINTS ALL DEFERRED"]) (db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"])
(-> (assoc cfg ::bfc/timestamp (dt/now)) (let [team (-> (assoc cfg ::bfc/timestamp (dt/now))
(mgmt/duplicate-team :team-id team-id :name name)))))) (mgmt/duplicate-team :team-id team-id :name name))
rels (db/query conn :team-profile-rel {:team-id team-id})]
(doseq [rel rels]
(let [params (-> rel
(assoc :id (uuid/next))
(assoc :team-id (:id team)))]
(db/insert! conn :team-profile-rel params
{::db/return-keys false}))))))))