From de72dc57699b1cad321133be83dc9a596ab39722 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 27 Dec 2022 17:03:43 +0100 Subject: [PATCH] :bug: Fix race conditions issues on concurrent edition --- backend/src/app/rpc/commands/files/update.clj | 49 +++++++++---------- .../app/main/data/workspace/persistence.cljs | 17 +++---- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/backend/src/app/rpc/commands/files/update.clj b/backend/src/app/rpc/commands/files/update.clj index 7c788553c..f24dede18 100644 --- a/backend/src/app/rpc/commands/files/update.clj +++ b/backend/src/app/rpc/commands/files/update.clj @@ -168,7 +168,18 @@ (->> changes-with-metadata (mapcat :changes) vec) (vec changes)) - params (assoc params :file file :changes changes)] + params (-> params + (assoc :file file) + (assoc :changes changes) + (assoc ::created-at (dt/now)))] + + (when (> (:revn params) + (:revn file)) + (ex/raise :type :validation + :code :revn-conflict + :hint "The incoming revision number is greater that stored version." + :context {:incoming-revn (:revn params) + :stored-revn (:revn file)})) (mtx/run! metrics {:id :update-file-changes :inc (count changes)}) @@ -180,24 +191,15 @@ (-> (update-fn cfg params) (vary-meta assoc ::audit/replace-props - {:id (:id file) - :name (:name file) - :features (:features file) + {:id (:id file) + :name (:name file) + :features (:features file) :project-id (:project-id file) :team-id (:team-id file)})))))) (defn- update-file* - [{:keys [conn] :as cfg} {:keys [profile-id file changes session-id] :as params}] - (when (> (:revn params) - (:revn file)) - (ex/raise :type :validation - :code :revn-conflict - :hint "The incoming revision number is greater that stored version." - :context {:incoming-revn (:revn params) - :stored-revn (:revn file)})) - - (let [ts (dt/now) - file (-> file + [{:keys [conn] :as cfg} {:keys [profile-id file changes session-id ::created-at] :as params}] + (let [file (-> file (update :revn inc) (update :data (fn [data] (cond-> data @@ -217,7 +219,7 @@ {:id (uuid/next) :session-id session-id :profile-id profile-id - :created-at ts + :created-at created-at :file-id (:id file) :revn (:revn file) :features (db/create-array conn "text" (:features file)) @@ -229,12 +231,12 @@ {:revn (:revn file) :data (:data file) :data-backend nil - :modified-at ts + :modified-at created-at :has-media-trimmed false} {:id (:id file)}) (db/update! conn :project - {:modified-at ts} + {:modified-at created-at} {:id (:project-id file)}) (let [params (assoc params :file file)] @@ -265,13 +267,10 @@ order by s.created_at asc") (defn- get-lagged-changes - [conn params] - (->> (db/exec! conn [sql:lagged-changes (:id params) (:revn params)]) - (into [] (comp (map files/decode-row) - (map (fn [row] - (cond-> row - (= (:revn row) (:revn (:file params))) - (assoc :changes [])))))))) + [conn {:keys [id revn] :as params}] + (->> (db/exec! conn [sql:lagged-changes id revn]) + (map files/decode-row) + (vec))) (defn- send-notifications! [{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}] diff --git a/frontend/src/app/main/data/workspace/persistence.cljs b/frontend/src/app/main/data/workspace/persistence.cljs index a18acb48c..29df1fd43 100644 --- a/frontend/src/app/main/data/workspace/persistence.cljs +++ b/frontend/src/app/main/data/workspace/persistence.cljs @@ -148,12 +148,7 @@ (->> (rp/cmd! :update-file params) (rx/mapcat (fn [lagged] (log/debug :hint "changes persisted" :lagged (count lagged)) - (let [lagged-updates - (cond->> lagged - (= #{sid} (into #{} (map :session-id) lagged)) - (map #(assoc % :changes []))) - - frame-updates + (let [frame-updates (-> (group-by :page-id changes) (update-vals #(into #{} (mapcat :frames) %)))] @@ -162,9 +157,13 @@ (rx/mapcat (fn [[page-id frames]] (->> frames (map #(vector page-id %))))) (rx/map (fn [[page-id frame-id]] (dwt/update-thumbnail (:id file) page-id frame-id)))) - (->> (rx/of lagged-updates) - (rx/mapcat seq) - (rx/map #(shapes-changes-persisted file-id %))))))) + (->> (rx/from lagged) + (rx/merge-map (fn [{:keys [changes] :as entry}] + (rx/merge + (rx/from + (for [[page-id changes] (group-by :page-id changes)] + (dch/update-indices page-id changes))) + (rx/of (shapes-changes-persisted file-id entry)))))))))) (rx/catch (fn [cause] (rx/concat (if (= :authentication (:type cause))