0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-04-13 15:31:26 -05:00

Improve how indexes are updated on concurrent sessions

This commit is contained in:
Andrey Antukh 2024-06-18 15:50:17 +02:00 committed by Alonso Torres
parent d977b4e27c
commit a2cc7764fb
2 changed files with 64 additions and 49 deletions

View file

@ -29,18 +29,24 @@
(def commit?
(ptk/type? ::commit))
(defn update-indexes
(defn- fix-page-id
"For events that modifies the page, page-id does not comes
as a property so we assign it from the `id` property."
[{:keys [id type page] :as change}]
(cond-> change
(and (page-change? type)
(nil? (:page-id change)))
(assoc :page-id (or id (:id page)))))
(defn- update-indexes
"Given a commit, send the changes to the worker for updating the
indexes."
[{:keys [changes] :as commit}]
[commit attr]
(ptk/reify ::update-indexes
ptk/WatchEvent
(watch [_ _ _]
(let [changes (->> changes
(map (fn [{:keys [id type page] :as change}]
(cond-> change
(and (page-change? type) (nil? (:page-id change)))
(assoc :page-id (or id (:id page))))))
(let [changes (->> (get commit attr)
(map fix-page-id)
(filter :page-id)
(group-by :page-id))]
@ -58,6 +64,41 @@
(map (d/getf (:index persistence)))
(not-empty)))
(def ^:private xf:map-page-id
(map :page-id))
(defn- apply-changes-localy
[{:keys [file-id redo-changes] :as commit} pending]
(ptk/reify ::apply-changes-localy
ptk/UpdateEvent
(update [_ state]
(let [current-file-id (get state :current-file-id)
path (if (= file-id current-file-id)
[:workspace-data]
[:workspace-libraries file-id :data])
undo-changes (if pending
(->> pending
(map :undo-changes)
(reverse)
(mapcat identity)
(vec))
nil)
redo-changes (if pending
(into redo-changes
(mapcat :redo-changes)
pending)
redo-changes)]
(d/update-in-when state path
(fn [file]
(let [file (cpc/process-changes file undo-changes false)
file (cpc/process-changes file redo-changes false)
pids (into #{} xf:map-page-id redo-changes)]
(reduce #(ctst/update-object-indices %1 %2) file pids))))))))
(defn commit
"Create a commit event instance"
[{:keys [commit-id redo-changes undo-changes origin save-undo? features
@ -70,6 +111,7 @@
(let [commit-id (or commit-id (uuid/next))
source (d/nilv source :local)
local? (= source :local)
commit {:id commit-id
:created-at (dt/now)
:source source
@ -89,38 +131,20 @@
cljs.core/IDeref
(-deref [_] commit)
ptk/UpdateEvent
(update [_ state]
(let [current-file-id (get state :current-file-id)
path (if (= file-id current-file-id)
[:workspace-data]
[:workspace-libraries file-id :data])
not-local? (not= source :local)
pending (if not-local?
(get-pending-commits state)
nil)
undo-changes (if pending
(->> pending
(map :undo-changes)
(reverse)
(mapcat identity)
(vec))
nil)
redo-changes (if pending
(into redo-changes
(mapcat :redo-changes)
pending)
redo-changes)]
(d/update-in-when state path
(fn [file]
(let [file (cpc/process-changes file undo-changes false)
file (cpc/process-changes file redo-changes false)
pids (into #{} (map :page-id) redo-changes)]
(reduce #(ctst/update-object-indices %1 %2) file pids)))))))))
ptk/WatchEvent
(watch [_ state _]
(let [pending (when-not local?
(get-pending-commits state))]
(rx/concat
(rx/of (apply-changes-localy commit pending))
(if pending
(rx/concat
(->> (rx/from (reverse pending))
(rx/map (fn [commit] (update-indexes commit :undo-changes))))
(rx/of (update-indexes commit :redo-changes))
(->> (rx/from pending)
(rx/map (fn [commit] (update-indexes commit :redo-changes)))))
(rx/of (update-indexes commit :redo-changes)))))))))
(defn- resolve-file-revn
[state file-id]

View file

@ -211,14 +211,6 @@
(update-status :pending)))
(rx/take-until stoper-s))
(->> local-commits-s
(rx/buffer-time 200)
(rx/mapcat merge-commit)
(rx/map dch/update-indexes)
(rx/take-until stoper-s)
(rx/finalize (fn []
(log/debug :hint "finalize persistence: changes watcher [index]"))))
;; Here we watch for local commits, buffer them in a small
;; chunks (very near in time commits) and append them to the
;; persistence queue
@ -237,6 +229,5 @@
(rx/map deref)
(rx/filter #(= :remote (:source %)))
(rx/mapcat (fn [{:keys [file-id file-revn] :as commit}]
(rx/of (update-file-revn file-id file-revn)
(dch/update-indexes commit))))
(rx/of (update-file-revn file-id file-revn))))
(rx/take-until stoper-s)))))))