0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-24 15:39:50 -05:00

🐛 Fix race conditions issues on concurrent edition

This commit is contained in:
Andrey Antukh 2022-12-27 17:03:43 +01:00 committed by Alejandro Alonso
parent b827037f90
commit de72dc5769
2 changed files with 32 additions and 34 deletions

View file

@ -168,7 +168,18 @@
(->> changes-with-metadata (mapcat :changes) vec) (->> changes-with-metadata (mapcat :changes) vec)
(vec changes)) (vec changes))
params (assoc params :file file :changes changes)] params (-> params
(assoc :file file)
(assoc :changes changes)
(assoc ::created-at (dt/now)))]
(when (> (:revn params)
(:revn file))
(ex/raise :type :validation
:code :revn-conflict
:hint "The incoming revision number is greater that stored version."
:context {:incoming-revn (:revn params)
:stored-revn (:revn file)}))
(mtx/run! metrics {:id :update-file-changes :inc (count changes)}) (mtx/run! metrics {:id :update-file-changes :inc (count changes)})
@ -180,24 +191,15 @@
(-> (update-fn cfg params) (-> (update-fn cfg params)
(vary-meta assoc ::audit/replace-props (vary-meta assoc ::audit/replace-props
{:id (:id file) {:id (:id file)
:name (:name file) :name (:name file)
:features (:features file) :features (:features file)
:project-id (:project-id file) :project-id (:project-id file)
:team-id (:team-id file)})))))) :team-id (:team-id file)}))))))
(defn- update-file* (defn- update-file*
[{:keys [conn] :as cfg} {:keys [profile-id file changes session-id] :as params}] [{:keys [conn] :as cfg} {:keys [profile-id file changes session-id ::created-at] :as params}]
(when (> (:revn params) (let [file (-> file
(:revn file))
(ex/raise :type :validation
:code :revn-conflict
:hint "The incoming revision number is greater that stored version."
:context {:incoming-revn (:revn params)
:stored-revn (:revn file)}))
(let [ts (dt/now)
file (-> file
(update :revn inc) (update :revn inc)
(update :data (fn [data] (update :data (fn [data]
(cond-> data (cond-> data
@ -217,7 +219,7 @@
{:id (uuid/next) {:id (uuid/next)
:session-id session-id :session-id session-id
:profile-id profile-id :profile-id profile-id
:created-at ts :created-at created-at
:file-id (:id file) :file-id (:id file)
:revn (:revn file) :revn (:revn file)
:features (db/create-array conn "text" (:features file)) :features (db/create-array conn "text" (:features file))
@ -229,12 +231,12 @@
{:revn (:revn file) {:revn (:revn file)
:data (:data file) :data (:data file)
:data-backend nil :data-backend nil
:modified-at ts :modified-at created-at
:has-media-trimmed false} :has-media-trimmed false}
{:id (:id file)}) {:id (:id file)})
(db/update! conn :project (db/update! conn :project
{:modified-at ts} {:modified-at created-at}
{:id (:project-id file)}) {:id (:project-id file)})
(let [params (assoc params :file file)] (let [params (assoc params :file file)]
@ -265,13 +267,10 @@
order by s.created_at asc") order by s.created_at asc")
(defn- get-lagged-changes (defn- get-lagged-changes
[conn params] [conn {:keys [id revn] :as params}]
(->> (db/exec! conn [sql:lagged-changes (:id params) (:revn params)]) (->> (db/exec! conn [sql:lagged-changes id revn])
(into [] (comp (map files/decode-row) (map files/decode-row)
(map (fn [row] (vec)))
(cond-> row
(= (:revn row) (:revn (:file params)))
(assoc :changes []))))))))
(defn- send-notifications! (defn- send-notifications!
[{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}] [{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}]

View file

@ -148,12 +148,7 @@
(->> (rp/cmd! :update-file params) (->> (rp/cmd! :update-file params)
(rx/mapcat (fn [lagged] (rx/mapcat (fn [lagged]
(log/debug :hint "changes persisted" :lagged (count lagged)) (log/debug :hint "changes persisted" :lagged (count lagged))
(let [lagged-updates (let [frame-updates
(cond->> lagged
(= #{sid} (into #{} (map :session-id) lagged))
(map #(assoc % :changes [])))
frame-updates
(-> (group-by :page-id changes) (-> (group-by :page-id changes)
(update-vals #(into #{} (mapcat :frames) %)))] (update-vals #(into #{} (mapcat :frames) %)))]
@ -162,9 +157,13 @@
(rx/mapcat (fn [[page-id frames]] (rx/mapcat (fn [[page-id frames]]
(->> frames (map #(vector page-id %))))) (->> frames (map #(vector page-id %)))))
(rx/map (fn [[page-id frame-id]] (dwt/update-thumbnail (:id file) page-id frame-id)))) (rx/map (fn [[page-id frame-id]] (dwt/update-thumbnail (:id file) page-id frame-id))))
(->> (rx/of lagged-updates) (->> (rx/from lagged)
(rx/mapcat seq) (rx/merge-map (fn [{:keys [changes] :as entry}]
(rx/map #(shapes-changes-persisted file-id %))))))) (rx/merge
(rx/from
(for [[page-id changes] (group-by :page-id changes)]
(dch/update-indices page-id changes)))
(rx/of (shapes-changes-persisted file-id entry))))))))))
(rx/catch (fn [cause] (rx/catch (fn [cause]
(rx/concat (rx/concat
(if (= :authentication (:type cause)) (if (= :authentication (:type cause))