diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index f39a89891..39f961afb 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -9,28 +9,103 @@ (:require [app.common.exceptions :as ex] [app.common.logging :as l] + [app.common.pprint :as pp] [app.common.spec :as us] [app.db :as db] [app.metrics :as mtx] + [app.util.time :as dt] [app.util.websocket :as ws] [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] [yetti.websocket :as yws])) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; WEBSOCKET HOOKS +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(def state (atom {})) + +(defn- on-connect + [{:keys [metrics]} wsp] + (let [created-at (dt/now)] + (swap! state assoc (::ws/id @wsp) wsp) + (mtx/run! metrics {:id :websocket-active-connections :inc 1}) + (fn [] + (swap! state dissoc (::ws/id @wsp)) + (mtx/run! metrics {:id :websocket-active-connections :dec 1}) + (mtx/run! metrics {:id :websocket-session-timing + :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)})))) + +(defn- on-rcv-message + [{:keys [metrics]} _ message] + (mtx/run! metrics {:id :websocket-messages-total :labels ["recv"] :inc 1}) + message) + +(defn- on-snd-message + [{:keys [metrics]} _ message] + (mtx/run! metrics {:id :websocket-messages-total :labels ["send"] :inc 1}) + message) + +;; REPL HELPERS + +(defn repl-get-connections-for-file + [file-id] + (->> (vals @state) + (filter #(= file-id (-> % deref ::file-subscription :file-id))) + (map deref) + (map ::ws/id))) + +(defn repl-get-connections-for-team + [team-id] + (->> (vals @state) + (filter #(= team-id (-> % deref ::team-subscription :team-id))) + (map deref) + (map ::ws/id))) + +(defn repl-close-connection + [id] + (when-let [wsp (get @state id)] + (a/>!! (::ws/close-ch @wsp) [8899 "closed from server"]) + (a/close! (::ws/close-ch @wsp)))) + +(defn repl-get-connection-info + [id] + (when-let [wsp (get @state id)] + {:id id + :created-at (dt/instant id) + :profile-id (::profile-id @wsp) + :session-id (::session-id @wsp) + :user-agent (::ws/user-agent @wsp) + :ip-addr (::ws/remote-addr @wsp) + :last-activity-at (::ws/last-activity-at @wsp) + :http-session-id (::ws/http-session-id @wsp) + :subscribed-file (-> wsp deref ::file-subscription :file-id) + :subscribed-team (-> wsp deref ::team-subscription :team-id)})) + +(defn repl-print-connection-info + [id] + (some-> id repl-get-connection-info pp/pprint)) + +(defn repl-print-connection-info-for-file + [file-id] + (some->> (repl-get-connections-for-file file-id) + (map repl-get-connection-info) + (pp/pprint))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; WEBSOCKET HANDLER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defmulti handle-message - (fn [_ message] + (fn [_ _ message] (:type message))) (defmethod handle-message :connect - [wsp _] - (l/trace :fn "handle-message" :event :connect) + [cfg wsp _] - (let [msgbus-fn (:msgbus @wsp) + (let [msgbus-fn (:msgbus cfg) + conn-id (::ws/id @wsp) profile-id (::profile-id @wsp) session-id (::session-id @wsp) output-ch (::ws/output-ch @wsp) @@ -38,94 +113,122 @@ xform (remove #(= (:session-id %) session-id)) channel (a/chan (a/dropping-buffer 16) xform)] - (swap! wsp assoc ::profile-subs-channel channel) + (l/trace :fn "handle-message" :event :connect :conn-id conn-id) + + ;; Subscribe to the profile channel and forward all messages to + ;; websocket output channel (send them to the client). + (swap! wsp assoc ::profile-subscription channel) (a/pipe channel output-ch false) (msgbus-fn :cmd :sub :topic profile-id :chan channel))) (defmethod handle-message :disconnect - [wsp _] - (l/trace :fn "handle-message" :event :disconnect) - (a/go - (let [msgbus-fn (:msgbus @wsp) - profile-id (::profile-id @wsp) - session-id (::session-id @wsp) - profile-ch (::profile-subs-channel @wsp) - subs (::subscriptions @wsp)] + [cfg wsp _] + (let [msgbus-fn (:msgbus cfg) + conn-id (::ws/id @wsp) + profile-id (::profile-id @wsp) + session-id (::session-id @wsp) + profile-ch (::profile-subscription @wsp) + fsub (::file-subscription @wsp) + tsub (::team-subscription @wsp) + message {:type :disconnect + :subs-id profile-id + :profile-id profile-id + :session-id session-id}] + + (l/trace :fn "handle-message" + :event :disconnect + :conn-id conn-id) + + (a/go ;; Close the main profile subscription (a/close! profile-ch) (a/! output-ch message) - (recur))) + (l/trace :fn "handle-message" + :event :subscribe-team + :team-id team-id + :conn-id conn-id) + + (a/pipe channel output-ch false) + + (let [state {:team-id team-id :channel channel :topic team-id}] + (swap! wsp assoc ::team-subscription state)) + + (a/go + ;; Close previous subscription if exists + (when-let [channel (:channel prev-subs)] + (a/close! channel) + (a/! output-ch message) + (recur)))) (a/go ;; Subscribe to file topic @@ -134,6 +237,7 @@ ;; Notifify the rest of participants of the new connection. (let [message {:type :join-file :file-id file-id + :subs-id file-id :session-id session-id :profile-id profile-id}] (a/ message - (dissoc :subs-id) - (assoc :profile-id profile-id) - (assoc :session-id session-id))] - + [cfg wsp {:keys [file-id] :as message}] + (let [msgbus-fn (:msgbus cfg) + profile-id (::profile-id @wsp) + session-id (::session-id @wsp) + subs (::file-subscription @wsp) + message (-> message + (assoc :subs-id file-id) + (assoc :profile-id profile-id) + (assoc :session-id session-id))] + (a/go + ;; Only allow receive pointer updates when active subscription + (when subs (a/ cfg - (assoc ::profile-id profile-id) - (assoc ::session-id session-id))] - - (l/trace :hint "http request to websocket" :profile-id profile-id :session-id session-id) + (let [{:keys [session-id]} (us/conform ::handler-params params)] (cond (not profile-id) (raise (ex/error :type :authentication @@ -218,6 +327,15 @@ :hint "this endpoint only accepts websocket connections")) :else - (->> (ws/handler handle-message cfg) - (yws/upgrade req) - (respond)))))) + (do + (l/trace :hint "websocket request" :profile-id profile-id :session-id session-id) + + (->> (ws/handler + ::ws/on-rcv-message (partial on-rcv-message cfg) + ::ws/on-snd-message (partial on-snd-message cfg) + ::ws/on-connect (partial on-connect cfg) + ::ws/handler (partial handle-message cfg) + ::profile-id profile-id + ::session-id session-id) + (yws/upgrade req) + (respond))))))) diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index 89ffb1611..dc1289a09 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -32,7 +32,7 @@ [request] (or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) (yrq/get-header request "x-real-ip") - (yrq/remote-addr request))) + (some-> (yrq/remote-addr request) str))) (defn extract-utm-params "Extracts additional data from params and namespace them under diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 4bae83abd..e14bf9e12 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -160,7 +160,6 @@ "Function responsible to attach local subscription to the state. Intended to be used in agent." [state cfg topics chan done-ch] - (l/trace :hint "subscribe-to-topics" :topics topics ::l/async false) (aa/with-closing done-ch (let [state (update state :chans assoc chan topics)] (reduce (fn [state topic] @@ -184,15 +183,15 @@ useful when client disconnects or in-bulk unsubscribe operations. Intended to be executed in agent." [state cfg channels done-ch] - (l/trace :hint "unsubscribe-channels" :chans (count channels) ::l/async false) (aa/with-closing done-ch (reduce #(unsubscribe-single-channel %1 cfg %2) state channels))) + (defn- subscribe [{:keys [::state executor] :as cfg} {:keys [topic topics chan]}] (let [done-ch (a/chan) topics (into [] (map prefix-topic) (if topic [topic] topics))] - (l/trace :hint "subscribe" :topics topics) + (l/debug :hint "subscribe" :topics topics) (send-via executor state subscribe-to-topics cfg topics chan done-ch) done-ch)) diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index e4f8a12fe..4909049fe 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -10,9 +10,10 @@ [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.transit :as t] - [app.metrics :as mtx] + [app.loggers.audit :refer [parse-client-ip]] [app.util.time :as dt] [clojure.core.async :as a] + [yetti.request :as yr] [yetti.util :as yu] [yetti.websocket :as yws]) (:import @@ -25,8 +26,10 @@ (declare process-output) (declare ws-ping!) (declare ws-send!) +(declare filter-options) (def noop (constantly nil)) +(def identity-3 (fn [_ _ o] o)) (defn handler "A WebSocket upgrade handler factory. Returns a handler that can be @@ -39,94 +42,123 @@ It also accepts some options that allows you parametrize the protocol behavior. The options map will be used as-as for the initial data of the `ws` data structure" - ([handle-message] (handler handle-message {})) - ([handle-message {:keys [::input-buff-size - ::output-buff-size - ::idle-timeout - metrics] - :or {input-buff-size 64 - output-buff-size 64 - idle-timeout 30000} - :as options}] - (fn [{:keys [::yws/channel] :as request}] - (let [input-ch (a/chan input-buff-size) - output-ch (a/chan output-buff-size) - pong-ch (a/chan (a/sliding-buffer 6)) - close-ch (a/chan) + [& {:keys [::on-rcv-message + ::on-snd-message + ::on-connect + ::input-buff-size + ::output-buff-size + ::handler + ::idle-timeout] + :or {input-buff-size 64 + output-buff-size 64 + idle-timeout 30000 + on-connect noop + on-snd-message identity-3 + on-rcv-message identity-3} + :as options}] - options (atom - (-> options - (assoc ::input-ch input-ch) - (assoc ::output-ch output-ch) - (assoc ::close-ch close-ch) - (assoc ::channel channel) - (dissoc ::metrics))) + (assert (fn? on-rcv-message) "'on-rcv-message' should be a function") + (assert (fn? on-snd-message) "'on-snd-message' should be a function") + (assert (fn? on-connect) "'on-connect' should be a function") - terminated (atom false) - created-at (dt/now) + (fn [{:keys [::yws/channel session-id] :as request}] + (let [input-ch (a/chan input-buff-size) + output-ch (a/chan output-buff-size) + pong-ch (a/chan (a/sliding-buffer 6)) + close-ch (a/chan) + stop-ch (a/chan) - on-open - (fn [channel] - (mtx/run! metrics {:id :websocket-active-connections :inc 1}) - (yws/idle-timeout! channel (dt/duration idle-timeout))) + ip-addr (parse-client-ip request) + uagent (yr/get-header request "user-agent") + id (inst-ms (dt/now)) - on-terminate - (fn [& _args] - (when (compare-and-set! terminated false true) - (mtx/run! metrics {:id :websocket-active-connections :dec 1}) - (mtx/run! metrics {:id :websocket-session-timing :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}) + options (-> (filter-options options) + (merge {::id id + ::input-ch input-ch + ::output-ch output-ch + ::close-ch close-ch + ::stop-ch stop-ch + ::channel channel + ::remote-addr ip-addr + ::http-session-id session-id + ::user-agent uagent}) + (atom)) - (a/close! close-ch) - (a/close! pong-ch) - (a/close! output-ch) - (a/close! input-ch))) + ;; call the on-connect hook and memoize the on-terminate instance + on-terminate (on-connect options) - on-error - (fn [_ error] - (on-terminate) - ;; TODO: properly log timeout exceptions - (when-not (or (instance? java.nio.channels.ClosedChannelException error) - (instance? java.net.SocketException error)) - (l/error :hint (ex-message error) :cause error))) + on-ws-open + (fn [channel] + (l/trace :fn "on-ws-open" :conn-id id) + (yws/idle-timeout! channel (dt/duration idle-timeout))) - on-message - (fn [_ message] - (mtx/run! metrics {:id :websocket-messages-total :labels ["recv"] :inc 1}) - (try - (let [message (t/decode-str message)] - (a/offer! input-ch message)) - (catch Throwable e - (l/warn :hint "error on decoding incoming message from websocket" - :wsmsg (pr-str message) - :cause e) - (on-terminate)))) + on-ws-terminate + (fn [_ code reason] + (l/trace :fn "on-ws-terminate" :conn-id id :code code :reason reason) + (a/close! close-ch)) - on-pong - (fn [_ buffers] - (a/>!! pong-ch (yu/copy-many buffers)))] + on-ws-error + (fn [_ error] + (a/close! close-ch) + (when-not (or (instance? java.nio.channels.ClosedChannelException error) + (instance? java.net.SocketException error)) + (l/error :hint (ex-message error) :cause error))) - ;; launch heartbeat process - (-> @options - (assoc ::pong-ch pong-ch) - (assoc ::on-close on-terminate) - (process-heartbeat)) + on-ws-message + (fn [_ message] + (try + (let [message (on-rcv-message options message) + message (t/decode-str message)] + (a/offer! input-ch message) + (swap! options assoc ::last-activity-at (dt/now))) + (catch Throwable e + (l/warn :hint "error on decoding incoming message from websocket" + :wsmsg (pr-str message) + :cause e) + (a/>! close-ch [8801 "decode error"]) + (a/close! close-ch)))) - ;; Forward all messages from output-ch to the websocket - ;; connection - (a/go-loop [] - (when-let [val (a/!! pong-ch (yu/copy-many buffers)))] - ;; React on messages received from the client - (process-input options handle-message) + ;; Launch heartbeat process + (-> @options + (assoc ::pong-ch pong-ch) + (process-heartbeat)) - {:on-open on-open - :on-error on-error - :on-close on-terminate - :on-text on-message - :on-pong on-pong})))) + ;; Wait a close signal + (a/go + (let [[code reason] (a/! output-ch {:type :error :error (ex-data val)}) @@ -193,19 +225,21 @@ (a/= (count issued) max-missed-heartbeats) - (on-close channel -1 "heartbeat-timeout") + (do + (a/>! close-ch [8802 "heart-beat timeout"]) + (a/close! close-ch)) (recur (inc i))))))) (a/go-loop [] @@ -213,3 +247,11 @@ (swap! beats disj (decode-beat buffer)) (recur))))) +(defn- filter-options + "Remove from options all namespace qualified keys that matches the + current namespace." + [options] + (into {} + (remove (fn [[key]] + (= (namespace key) "app.util.websocket"))) + options)) diff --git a/common/deps.edn b/common/deps.edn index 370e6d2d9..2f5e6bdf7 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -42,7 +42,7 @@ {:extra-deps {org.clojure/tools.namespace {:mvn/version "RELEASE"} org.clojure/test.check {:mvn/version "RELEASE"} - thheller/shadow-cljs {:mvn/version "2.19.3"} + thheller/shadow-cljs {:mvn/version "2.19.5"} com.bhauman/rebel-readline {:mvn/version "RELEASE"} criterium/criterium {:mvn/version "RELEASE"} mockery/mockery {:mvn/version "RELEASE"}} diff --git a/common/package.json b/common/package.json index d8e1a16c0..9e714e3e3 100644 --- a/common/package.json +++ b/common/package.json @@ -13,7 +13,7 @@ "test": "yarn run compile-test && yarn run run-test" }, "devDependencies": { - "shadow-cljs": "2.19.3", + "shadow-cljs": "2.19.5", "source-map-support": "^0.5.19", "ws": "^7.4.6" } diff --git a/docker/devenv/Dockerfile b/docker/devenv/Dockerfile index 4523423ed..7299b9904 100644 --- a/docker/devenv/Dockerfile +++ b/docker/devenv/Dockerfile @@ -1,11 +1,11 @@ -FROM ubuntu:20.04 +FROM ubuntu:22.04 LABEL maintainer="Andrey Antukh " ARG DEBIAN_FRONTEND=noninteractive ENV NODE_VERSION=v16.15.1 \ - CLOJURE_VERSION=1.11.1.1124 \ - CLJKONDO_VERSION=2022.05.31 \ + CLOJURE_VERSION=1.11.1.1149 \ + CLJKONDO_VERSION=2022.06.22 \ BABASHKA_VERSION=0.8.156 \ LANG=en_US.UTF-8 \ LC_ALL=en_US.UTF-8 @@ -44,7 +44,6 @@ RUN set -ex; \ RUN set -ex; \ apt-get -qq update; \ apt-get -qqy install --no-install-recommends \ - python \ build-essential \ imagemagick \ ghostscript \ @@ -104,7 +103,7 @@ RUN set -ex; \ rm -rf /var/lib/apt/lists/*; RUN set -ex; \ - curl -LfsSo /tmp/openjdk.tar.gz https://github.com/adoptium/temurin18-binaries/releases/download/jdk-18%2B36/OpenJDK18U-jdk_x64_linux_hotspot_18_36.tar.gz; \ + curl -LfsSo /tmp/openjdk.tar.gz https://github.com/adoptium/temurin18-binaries/releases/download/jdk-18.0.1%2B10/OpenJDK18U-jdk_x64_linux_hotspot_18.0.1_10.tar.gz; \ mkdir -p /usr/lib/jvm/openjdk; \ cd /usr/lib/jvm/openjdk; \ tar -xf /tmp/openjdk.tar.gz --strip-components=1; \ @@ -120,7 +119,7 @@ RUN set -ex; \ RUN set -ex; \ curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -; \ - echo "deb http://apt.postgresql.org/pub/repos/apt focal-pgdg main" >> /etc/apt/sources.list.d/postgresql.list; \ + echo "deb http://apt.postgresql.org/pub/repos/apt jammy-pgdg main" >> /etc/apt/sources.list.d/postgresql.list; \ apt-get -qq update; \ apt-get -qqy install postgresql-client-13; \ rm -rf /var/lib/apt/lists/*; @@ -132,8 +131,8 @@ RUN set -ex; \ tar -xf /tmp/nodejs.tar.xz --strip-components=1; \ chown -R root /usr/local/nodejs; \ PATH="$PATH:/usr/local/nodejs/bin"; \ - /usr/local/nodejs/bin/npm install -g yarn; \ - /usr/local/nodejs/bin/npm install -g svgo; \ + /usr/local/nodejs/bin/npm install --location=global yarn; \ + /usr/local/nodejs/bin/npm install --location=global svgo; \ rm -rf /tmp/nodejs.tar.xz; # Install clj-kondo @@ -143,7 +142,6 @@ RUN set -ex; \ unzip /tmp/clj-kondo.zip; \ rm /tmp/clj-kondo.zip; -# Install babashka RUN set -ex; \ cd /tmp; \ curl -LfsSo /tmp/babashka.tar.gz https://github.com/babashka/babashka/releases/download/v$BABASHKA_VERSION/babashka-$BABASHKA_VERSION-linux-amd64.tar.gz; \ @@ -151,8 +149,10 @@ RUN set -ex; \ tar -xf /tmp/babashka.tar.gz; \ rm -rf /tmp/babashka.tar.gz; + +# Install minio client RUN set -ex; \ - curl -LfsSo /tmp/mc https://dl.min.io/client/mc/release/linux-amd64/mc --user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15"; \ + wget -O /tmp/mc https://dl.min.io/client/mc/release/linux-amd64/mc; \ mv /tmp/mc /usr/local/bin/; \ chmod +x /usr/local/bin/mc; diff --git a/frontend/deps.edn b/frontend/deps.edn index c94b5246b..15fc6a3c8 100644 --- a/frontend/deps.edn +++ b/frontend/deps.edn @@ -32,7 +32,7 @@ :dev {:extra-paths ["dev"] :extra-deps - {thheller/shadow-cljs {:mvn/version "2.19.3"} + {thheller/shadow-cljs {:mvn/version "2.19.5"} org.clojure/tools.namespace {:mvn/version "RELEASE"} cider/cider-nrepl {:mvn/version "0.28.4"}}} diff --git a/frontend/package.json b/frontend/package.json index d1e140ca7..f3e80cf20 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -48,7 +48,7 @@ "prettier": "^2.6.1", "rimraf": "^3.0.0", "sass": "^1.49.9", - "shadow-cljs": "2.19.3" + "shadow-cljs": "2.19.5" }, "dependencies": { "@sentry/browser": "^6.17.4", diff --git a/frontend/src/app/main/data/users.cljs b/frontend/src/app/main/data/users.cljs index 7ea945437..eee3b49cc 100644 --- a/frontend/src/app/main/data/users.cljs +++ b/frontend/src/app/main/data/users.cljs @@ -173,8 +173,7 @@ (when (is-authenticated? profile) (->> (rx/of (profile-fetched profile) (fetch-teams) - (get-redirect-event) - (ws/initialize)) + (get-redirect-event)) (rx/observe-on :async))))))) (s/def ::invitation-token ::us/not-empty-string) diff --git a/frontend/src/app/main/data/websocket.cljs b/frontend/src/app/main/data/websocket.cljs index 1fbb26770..3612770d4 100644 --- a/frontend/src/app/main/data/websocket.cljs +++ b/frontend/src/app/main/data/websocket.cljs @@ -7,14 +7,19 @@ (ns app.main.data.websocket (:require [app.common.data.macros :as dm] + [app.common.logging :as l] [app.common.uri :as u] [app.config :as cf] [app.util.websocket :as ws] [beicon.core :as rx] [potok.core :as ptk])) +(l/set-level! :error) + (dm/export ws/send!) +(defonce ws-conn (volatile! nil)) + (defn- prepare-uri [params] (let [base (-> (u/join cf/public-uri "ws/notifications") @@ -30,35 +35,34 @@ [message] (ptk/reify ::send-message ptk/EffectEvent - (effect [_ state _] - (let [ws-conn (:ws-conn state)] - (ws/send! ws-conn message))))) + (effect [_ _ _] + (some-> @ws-conn (ws/send! message))))) (defn initialize [] (ptk/reify ::initialize - ptk/UpdateEvent - (update [_ state] - (let [sid (:session-id state) - uri (prepare-uri {:session-id sid})] - (assoc state :ws-conn (ws/create uri)))) - ptk/WatchEvent (watch [_ state stream] - (let [ws-conn (:ws-conn state) - stoper (rx/merge - (rx/filter (ptk/type? ::finalize) stream) - (rx/filter (ptk/type? ::initialize) stream))] + (l/trace :hint "event:initialize" :fn "watch") + (let [sid (:session-id state) + uri (prepare-uri {:session-id sid}) + ws (ws/create uri)] - (->> (rx/merge - (->> (ws/get-rcv-stream ws-conn) - (rx/filter ws/message-event?) - (rx/map :payload) - (rx/map #(ptk/data-event ::message %))) - (->> (ws/get-rcv-stream ws-conn) - (rx/filter ws/opened-event?) - (rx/map (fn [_] (ptk/data-event ::opened {}))))) - (rx/take-until stoper)))))) + (vreset! ws-conn ws) + + (let [stoper (rx/merge + (rx/filter (ptk/type? ::finalize) stream) + (rx/filter (ptk/type? ::initialize) stream))] + + (->> (rx/merge + (->> (ws/get-rcv-stream ws) + (rx/filter ws/message-event?) + (rx/map :payload) + (rx/map #(ptk/data-event ::message %))) + (->> (ws/get-rcv-stream ws) + (rx/filter ws/opened-event?) + (rx/map (fn [_] (ptk/data-event ::opened {}))))) + (rx/take-until stoper))))))) ;; --- Finalize Websocket @@ -66,5 +70,6 @@ [] (ptk/reify ::finalize ptk/EffectEvent - (effect [_ state _] - (some-> (:ws-conn state) ws/close!)))) + (effect [_ _ _] + (l/trace :hint "event:finalize" :fn "effect") + (some-> @ws-conn ws/close!)))) diff --git a/frontend/src/app/main/data/workspace/notifications.cljs b/frontend/src/app/main/data/workspace/notifications.cljs index 6d1527b7c..a9388898c 100644 --- a/frontend/src/app/main/data/workspace/notifications.cljs +++ b/frontend/src/app/main/data/workspace/notifications.cljs @@ -9,7 +9,6 @@ [app.common.data :as d] [app.common.pages.changes-spec :as pcs] [app.common.spec :as us] - [app.common.uuid :as uuid] [app.main.data.websocket :as dws] [app.main.data.workspace.changes :as dch] [app.main.data.workspace.libraries :as dwl] @@ -34,51 +33,53 @@ (ptk/reify ::initialize ptk/WatchEvent (watch [_ state stream] - (let [subs-id (uuid/next) - stoper (rx/filter (ptk/type? ::finalize) stream) + (let [stoper (rx/filter (ptk/type? ::finalize) stream) + profile-id (:profile-id state) - initmsg [{:type :subscribe-file - :subs-id subs-id - :file-id file-id} - {:type :subscribe-team - :team-id team-id}] + initmsg [{:type :subscribe-file + :file-id file-id} + {:type :subscribe-team + :team-id team-id}] - endmsg {:type :unsubscribe-file - :subs-id subs-id} + endmsg {:type :unsubscribe-file + :file-id file-id} - stream (->> (rx/merge - ;; Send the subscription message - (->> (rx/from initmsg) - (rx/map dws/send)) + stream (->> (rx/merge + ;; Send the subscription message + (->> (rx/from initmsg) + (rx/map dws/send)) - ;; Subscribe to notifications of the subscription - (->> stream - (rx/filter (ptk/type? ::dws/message)) - (rx/map deref) ;; :library-change events occur in a different file, but need to be processed anyway - (rx/filter #(or (= subs-id (:subs-id %)) (= (:type %) :library-change))) - (rx/map process-message)) + ;; Subscribe to notifications of the subscription + (->> stream + (rx/filter (ptk/type? ::dws/message)) + (rx/map deref) + (rx/filter (fn [{:keys [subs-id] :as msg}] + (or (= subs-id team-id) + (= subs-id profile-id) + (= subs-id file-id)))) + (rx/map process-message)) - ;; On reconnect, send again the subscription messages - (->> stream - (rx/filter (ptk/type? ::dws/opened)) - (rx/mapcat #(->> (rx/from initmsg) - (rx/map dws/send)))) + ;; On reconnect, send again the subscription messages + (->> stream + (rx/filter (ptk/type? ::dws/opened)) + (rx/mapcat #(->> (rx/from initmsg) + (rx/map dws/send)))) - ;; Emit presence event for current user; - ;; this is because websocket server don't - ;; emits this for the same user. - (rx/of (handle-presence {:type :connect - :session-id (:session-id state) - :profile-id (:profile-id state)})) + ;; Emit presence event for current user; + ;; this is because websocket server don't + ;; emits this for the same user. + (rx/of (handle-presence {:type :connect + :session-id (:session-id state) + :profile-id (:profile-id state)})) - ;; Emit to all other connected users the current pointer - ;; position changes. - (->> stream - (rx/filter ms/pointer-event?) - (rx/sample 50) - (rx/map #(handle-pointer-send subs-id file-id (:pt %))))) + ;; Emit to all other connected users the current pointer + ;; position changes. + (->> stream + (rx/filter ms/pointer-event?) + (rx/sample 50) + (rx/map #(handle-pointer-send file-id (:pt %))))) - (rx/take-until stoper))] + (rx/take-until stoper))] (rx/concat stream (rx/of (dws/send endmsg))))))) @@ -95,13 +96,12 @@ nil)) (defn- handle-pointer-send - [subs-id file-id point] + [file-id point] (ptk/reify ::handle-pointer-send ptk/WatchEvent (watch [_ state _] (let [page-id (:current-page-id state) message {:type :pointer-update - :subs-id subs-id :file-id file-id :page-id page-id :position point}] diff --git a/frontend/src/app/main/data/workspace/persistence.cljs b/frontend/src/app/main/data/workspace/persistence.cljs index 7b81dc8cd..445f5001c 100644 --- a/frontend/src/app/main/data/workspace/persistence.cljs +++ b/frontend/src/app/main/data/workspace/persistence.cljs @@ -163,7 +163,9 @@ (rx/map #(shapes-changes-persisted file-id %))))))) (rx/catch (fn [cause] (rx/concat - (rx/of (rt/assign-exception cause)) + (if (= :authentication (:type cause)) + (rx/empty) + (rx/of (rt/assign-exception cause))) (rx/throw cause)))))))))) diff --git a/frontend/src/app/main/errors.cljs b/frontend/src/app/main/errors.cljs index b55d1561f..7a8edbbbe 100644 --- a/frontend/src/app/main/errors.cljs +++ b/frontend/src/app/main/errors.cljs @@ -106,7 +106,6 @@ (js/console.groupEnd msg))) - ;; Error on parsing an SVG ;; TODO: looks unused and deprecated (defmethod ptk/handle-error :svg-parser diff --git a/frontend/yarn.lock b/frontend/yarn.lock index f3c3236e2..1364fe0ff 100644 --- a/frontend/yarn.lock +++ b/frontend/yarn.lock @@ -5013,10 +5013,10 @@ shadow-cljs-jar@1.3.2: resolved "https://registry.yarnpkg.com/shadow-cljs-jar/-/shadow-cljs-jar-1.3.2.tgz#97273afe1747b6a2311917c1c88d9e243c81957b" integrity sha512-XmeffAZHv8z7451kzeq9oKh8fh278Ak+UIOGGrapyqrFBB773xN8vMQ3O7J7TYLnb9BUwcqadKkmgaq7q6fhZg== -shadow-cljs@2.19.3: - version "2.19.3" - resolved "https://registry.yarnpkg.com/shadow-cljs/-/shadow-cljs-2.19.3.tgz#115a33917f8bca1495e0f815dca7ec3957f669af" - integrity sha512-9TsTCRlmR8m1g2ekwblgomRUgJpbifQI99VlRrlH9NMqEzklev3zYAD1dvy4d5h8BoAhgdxOOEg7ld2d45CWTA== +shadow-cljs@2.19.5: + version "2.19.5" + resolved "https://registry.yarnpkg.com/shadow-cljs/-/shadow-cljs-2.19.5.tgz#e51c758d2f942db18e6e4015bcacf1857ad1e751" + integrity sha512-uZelOtmTYg4MOZP1ehJilhQcGDxkdybPKkGZ11qxp8awmfgAQMe+W/QEyZw4aVwFxVXyHIIerzCGkCqAgo/FuA== dependencies: node-libs-browser "^2.2.1" readline-sync "^1.4.7"