From a2cc7764fb56c61a0b6346b544c9fb4e4338b073 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 18 Jun 2024 15:50:17 +0200 Subject: [PATCH] :sparkles: Improve how indexes are updated on concurrent sessions --- frontend/src/app/main/data/changes.cljs | 102 ++++++++++++-------- frontend/src/app/main/data/persistence.cljs | 11 +-- 2 files changed, 64 insertions(+), 49 deletions(-) diff --git a/frontend/src/app/main/data/changes.cljs b/frontend/src/app/main/data/changes.cljs index d16cfafc5..5f8229eb1 100644 --- a/frontend/src/app/main/data/changes.cljs +++ b/frontend/src/app/main/data/changes.cljs @@ -29,18 +29,24 @@ (def commit? (ptk/type? ::commit)) -(defn update-indexes +(defn- fix-page-id + "For events that modifies the page, page-id does not comes + as a property so we assign it from the `id` property." + [{:keys [id type page] :as change}] + (cond-> change + (and (page-change? type) + (nil? (:page-id change))) + (assoc :page-id (or id (:id page))))) + +(defn- update-indexes "Given a commit, send the changes to the worker for updating the indexes." - [{:keys [changes] :as commit}] + [commit attr] (ptk/reify ::update-indexes ptk/WatchEvent (watch [_ _ _] - (let [changes (->> changes - (map (fn [{:keys [id type page] :as change}] - (cond-> change - (and (page-change? type) (nil? (:page-id change))) - (assoc :page-id (or id (:id page)))))) + (let [changes (->> (get commit attr) + (map fix-page-id) (filter :page-id) (group-by :page-id))] @@ -58,6 +64,41 @@ (map (d/getf (:index persistence))) (not-empty))) +(def ^:private xf:map-page-id + (map :page-id)) + +(defn- apply-changes-localy + [{:keys [file-id redo-changes] :as commit} pending] + (ptk/reify ::apply-changes-localy + ptk/UpdateEvent + (update [_ state] + (let [current-file-id (get state :current-file-id) + path (if (= file-id current-file-id) + [:workspace-data] + [:workspace-libraries file-id :data]) + + undo-changes (if pending + (->> pending + (map :undo-changes) + (reverse) + (mapcat identity) + (vec)) + nil) + + redo-changes (if pending + (into redo-changes + (mapcat :redo-changes) + pending) + redo-changes)] + + (d/update-in-when state path + (fn [file] + (let [file (cpc/process-changes file undo-changes false) + file (cpc/process-changes file redo-changes false) + pids (into #{} xf:map-page-id redo-changes)] + (reduce #(ctst/update-object-indices %1 %2) file pids)))))))) + + (defn commit "Create a commit event instance" [{:keys [commit-id redo-changes undo-changes origin save-undo? features @@ -70,6 +111,7 @@ (let [commit-id (or commit-id (uuid/next)) source (d/nilv source :local) + local? (= source :local) commit {:id commit-id :created-at (dt/now) :source source @@ -89,38 +131,20 @@ cljs.core/IDeref (-deref [_] commit) - ptk/UpdateEvent - (update [_ state] - (let [current-file-id (get state :current-file-id) - path (if (= file-id current-file-id) - [:workspace-data] - [:workspace-libraries file-id :data]) - - not-local? (not= source :local) - pending (if not-local? - (get-pending-commits state) - nil) - - undo-changes (if pending - (->> pending - (map :undo-changes) - (reverse) - (mapcat identity) - (vec)) - nil) - - redo-changes (if pending - (into redo-changes - (mapcat :redo-changes) - pending) - redo-changes)] - - (d/update-in-when state path - (fn [file] - (let [file (cpc/process-changes file undo-changes false) - file (cpc/process-changes file redo-changes false) - pids (into #{} (map :page-id) redo-changes)] - (reduce #(ctst/update-object-indices %1 %2) file pids))))))))) + ptk/WatchEvent + (watch [_ state _] + (let [pending (when-not local? + (get-pending-commits state))] + (rx/concat + (rx/of (apply-changes-localy commit pending)) + (if pending + (rx/concat + (->> (rx/from (reverse pending)) + (rx/map (fn [commit] (update-indexes commit :undo-changes)))) + (rx/of (update-indexes commit :redo-changes)) + (->> (rx/from pending) + (rx/map (fn [commit] (update-indexes commit :redo-changes))))) + (rx/of (update-indexes commit :redo-changes))))))))) (defn- resolve-file-revn [state file-id] diff --git a/frontend/src/app/main/data/persistence.cljs b/frontend/src/app/main/data/persistence.cljs index 22f1cdb65..2e917e784 100644 --- a/frontend/src/app/main/data/persistence.cljs +++ b/frontend/src/app/main/data/persistence.cljs @@ -211,14 +211,6 @@ (update-status :pending))) (rx/take-until stoper-s)) - (->> local-commits-s - (rx/buffer-time 200) - (rx/mapcat merge-commit) - (rx/map dch/update-indexes) - (rx/take-until stoper-s) - (rx/finalize (fn [] - (log/debug :hint "finalize persistence: changes watcher [index]")))) - ;; Here we watch for local commits, buffer them in a small ;; chunks (very near in time commits) and append them to the ;; persistence queue @@ -237,6 +229,5 @@ (rx/map deref) (rx/filter #(= :remote (:source %))) (rx/mapcat (fn [{:keys [file-id file-revn] :as commit}] - (rx/of (update-file-revn file-id file-revn) - (dch/update-indexes commit)))) + (rx/of (update-file-revn file-id file-revn)))) (rx/take-until stoper-s)))))))