diff --git a/src/uxbox/rstore.cljs b/src/uxbox/rstore.cljs index 3ec2097e7..1ba27e711 100644 --- a/src/uxbox/rstore.cljs +++ b/src/uxbox/rstore.cljs @@ -81,23 +81,25 @@ "Initializes the stream event loop and return a stream with model changes." [state] - (let [update-s (rx/filter update? bus) - watch-s (rx/filter watch? bus) + (let [watch-s (rx/filter watch? bus) effect-s (rx/filter effect? bus) + update-s (rx/filter update? bus) state-s (->> update-s (rx/scan #(-apply-update %2 %1) state) (rx/share))] - ;; Process effects: combine with the latest model to process the new effect - (-> (rx/with-latest-from vector state-s effect-s) - (rx/subscribe (fn [[event model]] (-apply-effect event model)))) - ;; Process event sources: combine with the latest model and the result will be ;; pushed to the event-stream bus - (as-> (rx/with-latest-from vector state-s watch-s) $ + (as-> watch-s $ + (rx/with-latest-from vector state-s $) (rx/flat-map (fn [[event model]] (-apply-watch event model)) $) (rx/on-value $ emit!)) + ;; Process effects: combine with the latest model to process the new effect + (as-> effect-s $ + (rx/with-latest-from vector state-s $) + (rx/subscribe $ (fn [[event model]] (-apply-effect event model)))) + ;; Initialize the stream machinary with initial state. (emit! (swap-state (fn [s] (merge s state)))) state-s))