From 049f4ce784d0f5e1b7b1486b0931b304cdcf5ad8 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 21 Apr 2022 18:11:24 +0200 Subject: [PATCH] :recycle: Refactor persistence flow --- frontend/src/app/main/data/workspace.cljs | 2 +- .../app/main/data/workspace/persistence.cljs | 165 +++++++++--------- 2 files changed, 81 insertions(+), 86 deletions(-) diff --git a/frontend/src/app/main/data/workspace.cljs b/frontend/src/app/main/data/workspace.cljs index 3016f2e19..5cee62881 100644 --- a/frontend/src/app/main/data/workspace.cljs +++ b/frontend/src/app/main/data/workspace.cljs @@ -163,7 +163,7 @@ (defn finalize-file [_project-id file-id] - (ptk/reify ::finalize + (ptk/reify ::finalize-file ptk/UpdateEvent (update [_ state] (dissoc state diff --git a/frontend/src/app/main/data/workspace/persistence.cljs b/frontend/src/app/main/data/workspace/persistence.cljs index bea0e622c..c27766a08 100644 --- a/frontend/src/app/main/data/workspace/persistence.cljs +++ b/frontend/src/app/main/data/workspace/persistence.cljs @@ -6,8 +6,8 @@ (ns app.main.data.workspace.persistence (:require + [app.common.logging :as log] [app.common.pages :as cp] - [app.common.pages.helpers :as cph] [app.common.spec :as us] [app.common.spec.change :as spec.change] [app.common.spec.file :as spec.file] @@ -16,19 +16,19 @@ [app.main.data.dashboard :as dd] [app.main.data.fonts :as df] [app.main.data.workspace.changes :as dch] - [app.main.data.workspace.common :as dwc] - [app.main.data.workspace.selection :as dws] [app.main.data.workspace.state-helpers :as wsh] - [app.main.refs :as refs] [app.main.repo :as rp] [app.main.store :as st] [app.util.http :as http] + [app.util.router :as rt] [app.util.time :as dt] [beicon.core :as rx] [cljs.spec.alpha :as s] - [clojure.set :as set] + [okulary.core :as l] [potok.core :as ptk])) +(log/set-level! :info) + (declare persist-changes) (declare persist-synchronous-changes) (declare shapes-changes-persisted) @@ -39,18 +39,17 @@ (defn initialize-file-persistence [file-id] (ptk/reify ::initialize-persistence - ptk/EffectEvent - (effect [_ _ stream] + ptk/WatchEvent + (watch [_ _ stream] + (log/debug :hint "initialize persistence") (let [stoper (rx/filter #(= ::finalize %) stream) - forcer (rx/filter #(= ::force-persist %) stream) - notifier (->> stream - (rx/filter dch/commit-changes?) - (rx/debounce 2000) - (rx/merge stoper forcer)) + commits (l/atom []) + local-file? #(as-> (:file-id %) event-file-id (or (nil? event-file-id) (= event-file-id file-id))) + library-file? #(as-> (:file-id %) event-file-id (and (some? event-file-id) @@ -71,93 +70,89 @@ ;; Disable reload stoper (swap! st/ongoing-tasks disj :workspace-change) (st/emit! (update-persistence-status {:status :saved})))] - (->> (rx/merge - (->> stream - (rx/filter dch/commit-changes?) - (rx/map deref) - (rx/filter local-file?) - (rx/tap on-dirty) - (rx/buffer-until notifier) - (rx/filter (complement empty?)) - (rx/map (fn [buf] - (->> (into [] (comp (map #(assoc % :id (uuid/next))) - (map #(assoc % :file-id file-id))) - buf) - (persist-changes file-id)))) - (rx/tap on-saving) - (rx/take-until (rx/delay 100 stoper))) - (->> stream - (rx/filter dch/commit-changes?) - (rx/map deref) - (rx/filter library-file?) - (rx/filter (complement #(empty? (:changes %)))) - (rx/map persist-synchronous-changes) - (rx/take-until (rx/delay 100 stoper))) - (->> stream - (rx/filter (ptk/type? ::changes-persisted)) - (rx/tap on-saved) - (rx/ignore) - (rx/take-until stoper))) - (rx/subs #(st/emit! %) - (constantly nil) - (fn [] - (on-saved)))))))) + + (rx/merge + (->> stream + (rx/filter dch/commit-changes?) + (rx/map deref) + (rx/filter local-file?) + (rx/tap on-dirty) + (rx/filter (complement empty?)) + (rx/map (fn [commit] + (-> commit + (assoc :id (uuid/next)) + (assoc :file-id file-id)))) + (rx/observe-on :async) + (rx/tap #(swap! commits conj %)) + (rx/take-until (rx/delay 100 stoper)) + (rx/finalize (fn [] + (log/debug :hint "finalize persistence: changes watcher")))) + + (->> (rx/from-atom commits) + (rx/filter (complement empty?)) + (rx/sample-when (rx/merge + (rx/interval 5000) + (rx/filter #(= ::force-persist %) stream) + (->> (rx/from-atom commits) + (rx/filter (complement empty?)) + (rx/debounce 2000)))) + (rx/tap #(reset! commits [])) + (rx/tap on-saving) + (rx/mapcat (fn [changes] + ;; NOTE: this is needed for don't start the + ;; next persistence before this one is + ;; finished. + (rx/merge + (rx/of (persist-changes file-id changes)) + (->> stream + (rx/filter (ptk/type? ::changes-persisted)) + (rx/take 1) + (rx/tap on-saved) + (rx/ignore))))) + (rx/take-until (rx/delay 100 stoper)) + (rx/finalize (fn [] + (log/debug :hint "finalize persistence: save loop")))) + + ;; Synchronous changes + (->> stream + (rx/filter dch/commit-changes?) + (rx/map deref) + (rx/filter library-file?) + (rx/filter (complement #(empty? (:changes %)))) + (rx/map persist-synchronous-changes) + (rx/take-until (rx/delay 100 stoper)) + (rx/finalize (fn [] + (log/debug :hint "finalize persistence: synchronous save loop")))) + ))))) (defn persist-changes [file-id changes] + (log/debug :hint "persist changes" :changes (count changes)) (us/verify ::us/uuid file-id) (ptk/reify ::persist-changes - ptk/UpdateEvent - (update [_ state] - (let [into* (fnil into [])] - (update-in state [:workspace-persistence :queue] into* changes))) - ptk/WatchEvent (watch [_ state _] (let [sid (:session-id state) file (get state :workspace-file) - queue (get-in state [:workspace-persistence :queue] []) - params {:id (:id file) :revn (:revn file) :session-id sid - :changes-with-metadata (into [] queue)} - - ids (into #{} (map :id) queue) - - update-persistence-queue - (fn [state] - (update-in state [:workspace-persistence :queue] - (fn [items] (into [] (remove #(ids (:id %))) items)))) - - handle-response - (fn [lagged] - (let [lagged (cond->> lagged - (= #{sid} (into #{} (map :session-id) lagged)) - (map #(assoc % :changes [])))] - (rx/concat - (rx/of update-persistence-queue) - (->> (rx/of lagged) - (rx/mapcat seq) - (rx/map #(shapes-changes-persisted file-id %)))))) - - on-error - (fn [{:keys [type] :as error}] - (if (or (= :bad-gateway type) - (= :service-unavailable type)) - (rx/of (update-persistence-status {:status :error :reason type})) - (rx/concat - (rx/of update-persistence-queue) - (rx/of (update-persistence-status {:status :error :reason type})) - (rx/of (dws/deselect-all)) - (->> (rx/of nil) - (rx/delay 200) - (rx/mapcat #(rx/throw error))))))] - + :changes-with-metadata (into [] changes)}] (when (= file-id (:id params)) (->> (rp/mutation :update-file params) - (rx/mapcat handle-response) - (rx/catch on-error))))))) + (rx/mapcat (fn [lagged] + (log/debug :hint "changes persisted" :lagged (count lagged)) + (let [lagged (cond->> lagged + (= #{sid} (into #{} (map :session-id) lagged)) + (map #(assoc % :changes [])))] + (->> (rx/of lagged) + (rx/mapcat seq) + (rx/map #(shapes-changes-persisted file-id %)))))) + (rx/catch (fn [cause] + (rx/concat + (rx/of (rt/assign-exception cause)) + (rx/throw cause)))))))))) + (defn persist-synchronous-changes [{:keys [file-id changes]}]