diff --git a/backend/resources/log4j2.xml b/backend/resources/log4j2.xml
index a490272d0..5b2a91a2a 100644
--- a/backend/resources/log4j2.xml
+++ b/backend/resources/log4j2.xml
@@ -32,9 +32,9 @@
diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj
index 22d36d834..b1e13d164 100644
--- a/backend/src/app/msgbus.clj
+++ b/backend/src/app/msgbus.clj
@@ -62,9 +62,14 @@
snd-conn (.connect ^RedisClient rclient ^RedisCodec codec)
rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)
- pub-buff (a/chan (a/dropping-buffer buffer-size))
- rcv-buff (a/chan (a/dropping-buffer buffer-size))
- sub-buff (a/chan 1)
+ ;; Channel used for receive publications from the application.
+ pub-chan (a/chan (a/dropping-buffer buffer-size))
+ ;; Channel used for receive data from redis
+ rcv-chan (a/chan (a/dropping-buffer buffer-size))
+ ;; Channel used for receive subscription requests.
+ sub-chan (a/chan)
cch (a/chan 1)]
(.setTimeout ^StatefulRedisConnection snd-conn ^Duration (dt/duration {:seconds 10}))
@@ -73,10 +78,10 @@
(log/debugf "initializing msgbus (uri: '%s')" (str uri))
;; Start the sending (publishing) loop
- (impl-publish-loop snd-conn pub-buff cch)
+ (impl-publish-loop snd-conn pub-chan cch)
;; Start the receiving (subscribing) loop
- (impl-subscribe-loop rcv-conn rcv-buff sub-buff cch)
+ (impl-subscribe-loop rcv-conn rcv-chan sub-chan cch)
(fn run
@@ -84,14 +89,14 @@
([command params]
(case command
- :pub (a/>! pub-buff params)
- :sub (a/>! sub-buff params)))))
+ :pub (a/>! pub-chan params)
+ :sub (a/>! sub-chan params)))))
{::snd-conn snd-conn
::rcv-conn rcv-conn
::cch cch
- ::pub-buff pub-buff
- ::rcv-buff rcv-buff})))
+ ::pub-chan pub-chan
+ ::rcv-chan rcv-chan})))
(defmethod ig/halt-key! ::msgbus
[_ f]
@@ -99,14 +104,14 @@
(.close ^StatefulRedisConnection (::snd-conn mdata))
(.close ^StatefulRedisPubSubConnection (::rcv-conn mdata))
(a/close! (::cch mdata))
- (a/close! (::pub-buff mdata))
- (a/close! (::rcv-buff mdata))))
+ (a/close! (::pub-chan mdata))
+ (a/close! (::rcv-chan mdata))))
(defn- impl-publish-loop
- [conn pub-buff cch]
+ [conn pub-chan cch]
(let [rac (.async ^StatefulRedisConnection conn)]
(a/go-loop []
- (let [[val _] (a/alts! [cch pub-buff] :priority true)]
+ (let [[val _] (a/alts! [cch pub-chan] :priority true)]
(when (some? val)
(let [result (a/!! rcv-buff {:topic topic :message (blob/decode message)}))
+ (let [val {:topic topic :message (blob/decode message)}]
+ (when-not (a/offer! rcv-chan val)
+ (log/warn "dropping message on subscription loop"))))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it topic count])
(unsubscribed [it topic count])))
- (a/go-loop [chans {}]
- (let [[val port] (a/alts! [sub-buff cch rcv-buff] :priority true)]
- (cond
- ;; Stop condition; just do nothing
- (= port cch)
- nil
+ (let [chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
- ;; If we receive a message on sub-buff this means that a new
- ;; subscription is requested by the notifications module.
- (= port sub-buff)
- (let [topic (:topic val)
- output (:chan val)
- chans (update chans topic (fnil conj #{}) output)]
- (when (= 1 (count (get chans topic)))
- (let [result (a/! ch (:message val))
- (recur (rest chans) pending)
- (recur (rest chans) (conj pending ch)))
- pending))
- chans (update chans topic #(reduce disj % pending))]
- (when (empty? (get chans topic))
- (let [result (a/> (vals state)
+ (mapcat identity)
+ (filter some?)
+ (run! a/close!))))
+ ;; This means we receive data from redis and we need to
+ ;; forward it to the underlying subscriptions.
+ (= port rcv-chan)
+ (let [topic (:topic val) ; topic is already string
+ pending (loop [chans (seq (get-in @chans [:topics topic]))
+ pending #{}]
+ (if-let [ch (first chans)]
+ (if (a/>! ch (:message val))
+ (recur (rest chans) pending)
+ (recur (rest chans) (conj pending ch)))
+ pending))]
+ ;; (log/tracef "received message => pending: %s" (pr-str pending))
+ (some->> (seq pending)
+ (send-off chans unsubscribe-channels))
+ (recur)))))))
(defn- impl-redis-pub
[rac {:keys [topic message]}]
diff --git a/backend/src/app/notifications.clj b/backend/src/app/notifications.clj
index 36daf02e6..82ccd23d1 100644
--- a/backend/src/app/notifications.clj
+++ b/backend/src/app/notifications.clj
@@ -115,14 +115,11 @@
[conn id]
(db/exec-one! conn [sql:retrieve-file id]))
-;; WebSocket Http Handler
(declare handle-connect)
-(defrecord WebSocket [conn in out sub])
(defn- ws-send
[conn data]
@@ -134,8 +131,8 @@
(defn websocket
[{:keys [file-id team-id msgbus executor] :as cfg}]
- (let [in (a/chan (a/dropping-buffer 64))
- out (a/chan (a/dropping-buffer 64))
+ (let [rcv-ch (a/chan 32)
+ out-ch (a/chan 32)
mtx-aconn (:mtx-active-connections cfg)
mtx-messages (:mtx-messages cfg)
mtx-sessions (:mtx-sessions cfg)
@@ -143,46 +140,51 @@
ws-send (mtx/wrap-counter ws-send mtx-messages ["send"])]
(letfn [(on-connect [conn]
- (log/debugf "on-connect %s" (:session-id cfg))
(mtx-aconn :inc)
;; A subscription channel should use a lossy buffer
;; because we can't penalize normal clients when one
;; slow client is connected to the room.
- (let [sub (a/chan (a/dropping-buffer 64))
- ws (WebSocket. conn in out sub nil cfg)]
+ (let [sub-ch (a/chan (a/dropping-buffer 128))
+ cfg (assoc cfg
+ :conn conn
+ :rcv-ch rcv-ch
+ :out-ch out-ch
+ :sub-ch sub-ch)]
- ;; Subscribe to corresponding topics
- (a/!! in message)))]
+ (when-not (a/offer! rcv-ch message)
+ (log/warn "droping ws input message, channe full"))))]
{:on-connect on-connect
:on-error on-error
@@ -190,16 +192,18 @@
:on-text (mtx/wrap-counter on-message mtx-messages ["recv"])
:on-bytes (constantly nil)})))
(declare handle-message)
(declare start-loop!)
(defn- handle-connect
- [{:keys [conn] :as ws}]
+ [{:keys [conn] :as cfg}]
- (aa/ (handle-message ws {:type :connect}))
- (aa/ (start-loop! ws))
- (aa/ (handle-message ws {:type :disconnect}))
+ (aa/ (handle-message cfg {:type :connect}))
+ (aa/ (start-loop! cfg))
+ (aa/ (handle-message cfg {:type :disconnect}))
(catch Throwable err
(log/errorf err "unexpected exception on websocket handler")
(let [session (.getSession ^WebSocketAdapter conn)]
@@ -207,36 +211,39 @@
(.disconnect session)))))))
(defn- start-loop!
- [{:keys [in out sub session-id] :as ws}]
+ [{:keys [rcv-ch out-ch sub-ch session-id] :as cfg}]
(loop []
(let [timeout (a/timeout 30000)
- [val port] (a/alts! [in sub timeout])]
+ [val port] (a/alts! [rcv-ch sub-ch timeout])]
;; Process message coming from connected client
- (and (= port in) (not (nil? val)))
+ (and (= port rcv-ch) (some? val))
- (aa/ (handle-message ws val))
+ (aa/ (handle-message cfg val))
- ;; Forward message to the websocket
- (and (= port sub) (not (nil? val)))
+ ;; If message comes from subscription channel; we just need
+ ;; to foreward it to the output channel.
+ (and (= port sub-ch) (some? val))
(when-not (= (:session-id val) session-id)
- (a/>! out val))
+ (a/>! out-ch val))
- ;; Timeout channel signaling
+ ;; When timeout channel is signaled, we need to send a ping
+ ;; message to the output channel. TODO: we need to make this
+ ;; more smart.
(= port timeout)
- (a/>! out {:type :ping})
+ (a/>! out-ch {:type :ping})
-;; Incoming Messages Handling
(def ^:private
@@ -244,12 +251,6 @@
where file_id=?
and (clock_timestamp() - updated_at) < '5 min'::interval")
-(defn- retrieve-presence
- [pool file-id]
- (aa/thread-try
- (let [rows (db/exec! pool [sql:retrieve-presence file-id])]
- (mapv (juxt :session-id :profile-id) rows))))
(def ^:private
"insert into presence (file_id, session_id, profile_id, updated_at)
@@ -257,49 +258,66 @@
on conflict (file_id, session_id, profile_id)
do update set updated_at=clock_timestamp()")
+(defn- retrieve-presence
+ [{:keys [pool file-id] :as cfg}]
+ (let [rows (db/exec! pool [sql:retrieve-presence file-id])]
+ (mapv (juxt :session-id :profile-id) rows)))
+(defn- retrieve-presence*
+ [{:keys [executor] :as cfg}]
+ (aa/with-thread executor
+ (retrieve-presence cfg)))
(defn- update-presence
- [conn file-id session-id profile-id]
- (aa/thread-try
- (let [sql [sql:update-presence file-id session-id profile-id]]
- (db/exec-one! conn sql))))
+ [{:keys [pool file-id session-id profile-id] :as cfg}]
+ (let [sql [sql:update-presence file-id session-id profile-id]]
+ (db/exec-one! pool sql)))
+(defn- update-presence*
+ [{:keys [executor] :as cfg}]
+ (aa/with-thread executor
+ (update-presence cfg)))
(defn- delete-presence
- [pool file-id session-id profile-id]
- (aa/thread-try
- (db/delete! pool :presence {:file-id file-id
- :profile-id profile-id
- :session-id session-id})))
+ [{:keys [pool file-id session-id profile-id] :as cfg}]
+ (db/delete! pool :presence {:file-id file-id
+ :profile-id profile-id
+ :session-id session-id}))
+(defn- delete-presence*
+ [{:keys [executor] :as cfg}]
+ (aa/with-thread executor
+ (delete-presence cfg)))
(defmulti handle-message
(fn [_ message] (:type message)))
-;; TODO: check permissions for join a file-id channel (probably using
-;; single use token for avoid explicit database query).
(defmethod handle-message :connect
- [{:keys [file-id profile-id session-id pool msgbus] :as ws} _message]
+ [{:keys [file-id msgbus] :as cfg} _message]
;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id)
- (aa/ (update-presence pool file-id session-id profile-id))
- (let [members (aa/ (retrieve-presence pool file-id))]
- (a/!! c ret)))
+ (when (some? ret) (a/>!! c ret)))
(a/close! c)))))