0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-04-13 15:31:26 -05:00

Improve how topic is managed on ws notifications

This commit is contained in:
Andrey Antukh 2024-10-30 12:32:03 +01:00 committed by Alonso Torres
parent 9409078069
commit 97a1bf15ef
6 changed files with 16 additions and 21 deletions

View file

@ -112,7 +112,6 @@
fsub (::file-subscription @state)
tsub (::team-subscription @state)
msg {:type :disconnect
:subs-id profile-id
:profile-id profile-id
:session-id session-id}]
@ -137,9 +136,7 @@
(l/trace :fn "handle-message" :event "subscribe-team" :team-id team-id :conn-id id)
(let [prev-subs (get @state ::team-subscription)
channel (sp/chan :buf (sp/dropping-buffer 64)
:xf (comp
(remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id team-id))))]
:xf (remove #(= (:session-id %) session-id)))]
(sp/pipe channel output-ch false)
(mbus/sub! msgbus :topic team-id :chan channel)
@ -158,8 +155,7 @@
(l/trace :fn "handle-message" :event "subscribe-file" :file-id file-id :conn-id id)
(let [psub (::file-subscription @state)
fch (sp/chan :buf (sp/dropping-buffer 64)
:xf (comp (remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id file-id))))]
:xf (remove #(= (:session-id %) session-id)))]
(let [subs {:file-id file-id :channel fch :topic file-id}]
(swap! state assoc ::file-subscription subs))
@ -190,7 +186,6 @@
;; Notifify the rest of participants of the new connection.
(let [message {:type :join-file
:file-id file-id
:subs-id file-id
:session-id session-id
:profile-id profile-id}]
(mbus/pub! msgbus :topic file-id :message message))))

View file

@ -99,8 +99,9 @@
nil))
(defn pub!
[{::keys [pub-ch]} & {:as params}]
(sp/put! pub-ch params))
[{::keys [pub-ch]} & {:keys [topic] :as params}]
(let [params (update params :message assoc :topic topic)]
(sp/put! pub-ch params)))
(defn purge!
[{:keys [::state ::wrk/executor] :as msgbus} chans]
@ -230,7 +231,6 @@
(l/debug :hint "io-loop thread terminated")))))
(defn- redis-pub!
"Publish a message to the redis server. Asynchronous operation,
intended to be used in core.async go blocks."

View file

@ -653,7 +653,7 @@
(mbus/pub! msgbus
:topic member-id
:message {:type :team-role-change
:subs-id member-id
:topic member-id
:team-id team-id
:role role})
@ -713,7 +713,6 @@
:topic member-id
:message {:type :team-membership-change
:change :removed
:subs-id member-id
:team-id team-id
:team-name (:name team)})

View file

@ -3,5 +3,5 @@ export const presenceFixture = {
"~:file-id": "~uc7ce0794-0992-8105-8004-38f280443849",
"~:session-id": "~u37730924-d520-80f1-8004-4ae6e5c3942d",
"~:profile-id": "~uc7ce0794-0992-8105-8004-38e630f29a9b",
"~:subs-id": "~uc7ce0794-0992-8105-8004-38f280443849",
"~:topic": "~uc7ce0794-0992-8105-8004-38f280443849",
};

View file

@ -80,9 +80,9 @@
(->> stream
(rx/filter (ptk/type? ::dws/message))
(rx/map deref)
(rx/filter (fn [{:keys [subs-id] :as msg}]
(or (= subs-id uuid/zero)
(= subs-id profile-id))))
(rx/filter (fn [{:keys [topic] :as msg}]
(or (= topic uuid/zero)
(= topic profile-id))))
(rx/map process-message))
;; Once the teams are fecthed, initialize features related

View file

@ -65,15 +65,16 @@
(->> (rx/from initmsg)
(rx/map dws/send))
;; Subscribe to notifications of the subscription
(->> stream
(rx/filter (ptk/type? ::dws/message))
(rx/map deref)
(rx/filter (fn [{:keys [subs-id] :as msg}]
(or (= subs-id uuid/zero)
(= subs-id profile-id)
(= subs-id team-id)
(= subs-id file-id))))
(rx/filter (fn [{:keys [topic] :as msg}]
(or (= topic uuid/zero)
(= topic profile-id)
(= topic team-id)
(= topic file-id))))
(rx/map process-message))
;; On reconnect, send again the subscription messages