From 97a1bf15ef8ce0e85009d05c3d2e9a1367a92977 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 30 Oct 2024 12:32:03 +0100 Subject: [PATCH] :sparkles: Improve how topic is managed on ws notifications --- backend/src/app/http/websocket.clj | 9 ++------- backend/src/app/msgbus.clj | 6 +++--- backend/src/app/rpc/commands/teams.clj | 3 +-- .../playwright/data/workspace/ws-notifications.js | 2 +- frontend/src/app/main/data/dashboard.cljs | 6 +++--- .../src/app/main/data/workspace/notifications.cljs | 11 ++++++----- 6 files changed, 16 insertions(+), 21 deletions(-) diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index 84b4389d8..31cac2a56 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -112,7 +112,6 @@ fsub (::file-subscription @state) tsub (::team-subscription @state) msg {:type :disconnect - :subs-id profile-id :profile-id profile-id :session-id session-id}] @@ -137,9 +136,7 @@ (l/trace :fn "handle-message" :event "subscribe-team" :team-id team-id :conn-id id) (let [prev-subs (get @state ::team-subscription) channel (sp/chan :buf (sp/dropping-buffer 64) - :xf (comp - (remove #(= (:session-id %) session-id)) - (map #(assoc % :subs-id team-id))))] + :xf (remove #(= (:session-id %) session-id)))] (sp/pipe channel output-ch false) (mbus/sub! msgbus :topic team-id :chan channel) @@ -158,8 +155,7 @@ (l/trace :fn "handle-message" :event "subscribe-file" :file-id file-id :conn-id id) (let [psub (::file-subscription @state) fch (sp/chan :buf (sp/dropping-buffer 64) - :xf (comp (remove #(= (:session-id %) session-id)) - (map #(assoc % :subs-id file-id))))] + :xf (remove #(= (:session-id %) session-id)))] (let [subs {:file-id file-id :channel fch :topic file-id}] (swap! state assoc ::file-subscription subs)) @@ -190,7 +186,6 @@ ;; Notifify the rest of participants of the new connection. (let [message {:type :join-file :file-id file-id - :subs-id file-id :session-id session-id :profile-id profile-id}] (mbus/pub! msgbus :topic file-id :message message)))) diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index cdf9af501..4852734c0 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -99,8 +99,9 @@ nil)) (defn pub! - [{::keys [pub-ch]} & {:as params}] - (sp/put! pub-ch params)) + [{::keys [pub-ch]} & {:keys [topic] :as params}] + (let [params (update params :message assoc :topic topic)] + (sp/put! pub-ch params))) (defn purge! [{:keys [::state ::wrk/executor] :as msgbus} chans] @@ -230,7 +231,6 @@ (l/debug :hint "io-loop thread terminated"))))) - (defn- redis-pub! "Publish a message to the redis server. Asynchronous operation, intended to be used in core.async go blocks." diff --git a/backend/src/app/rpc/commands/teams.clj b/backend/src/app/rpc/commands/teams.clj index ffe7f4282..35fe16ea6 100644 --- a/backend/src/app/rpc/commands/teams.clj +++ b/backend/src/app/rpc/commands/teams.clj @@ -653,7 +653,7 @@ (mbus/pub! msgbus :topic member-id :message {:type :team-role-change - :subs-id member-id + :topic member-id :team-id team-id :role role}) @@ -713,7 +713,6 @@ :topic member-id :message {:type :team-membership-change :change :removed - :subs-id member-id :team-id team-id :team-name (:name team)}) diff --git a/frontend/playwright/data/workspace/ws-notifications.js b/frontend/playwright/data/workspace/ws-notifications.js index 4ab58d147..538b69201 100644 --- a/frontend/playwright/data/workspace/ws-notifications.js +++ b/frontend/playwright/data/workspace/ws-notifications.js @@ -3,5 +3,5 @@ export const presenceFixture = { "~:file-id": "~uc7ce0794-0992-8105-8004-38f280443849", "~:session-id": "~u37730924-d520-80f1-8004-4ae6e5c3942d", "~:profile-id": "~uc7ce0794-0992-8105-8004-38e630f29a9b", - "~:subs-id": "~uc7ce0794-0992-8105-8004-38f280443849", + "~:topic": "~uc7ce0794-0992-8105-8004-38f280443849", }; diff --git a/frontend/src/app/main/data/dashboard.cljs b/frontend/src/app/main/data/dashboard.cljs index 6a1ca65fe..5fbb25767 100644 --- a/frontend/src/app/main/data/dashboard.cljs +++ b/frontend/src/app/main/data/dashboard.cljs @@ -80,9 +80,9 @@ (->> stream (rx/filter (ptk/type? ::dws/message)) (rx/map deref) - (rx/filter (fn [{:keys [subs-id] :as msg}] - (or (= subs-id uuid/zero) - (= subs-id profile-id)))) + (rx/filter (fn [{:keys [topic] :as msg}] + (or (= topic uuid/zero) + (= topic profile-id)))) (rx/map process-message)) ;; Once the teams are fecthed, initialize features related diff --git a/frontend/src/app/main/data/workspace/notifications.cljs b/frontend/src/app/main/data/workspace/notifications.cljs index 8bc96033a..fc49d273f 100644 --- a/frontend/src/app/main/data/workspace/notifications.cljs +++ b/frontend/src/app/main/data/workspace/notifications.cljs @@ -65,15 +65,16 @@ (->> (rx/from initmsg) (rx/map dws/send)) + ;; Subscribe to notifications of the subscription (->> stream (rx/filter (ptk/type? ::dws/message)) (rx/map deref) - (rx/filter (fn [{:keys [subs-id] :as msg}] - (or (= subs-id uuid/zero) - (= subs-id profile-id) - (= subs-id team-id) - (= subs-id file-id)))) + (rx/filter (fn [{:keys [topic] :as msg}] + (or (= topic uuid/zero) + (= topic profile-id) + (= topic team-id) + (= topic file-id)))) (rx/map process-message)) ;; On reconnect, send again the subscription messages