0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-13 08:11:30 -05:00

Improve backpressure handling on websocket connection.

This commit is contained in:
Andrey Antukh 2021-02-22 23:14:53 +01:00
parent b14c98b76e
commit ca1a97a52e
3 changed files with 47 additions and 40 deletions

View file

@ -186,7 +186,8 @@
{:msgbus (ig/ref :app.msgbus/msgbus) {:msgbus (ig/ref :app.msgbus/msgbus)
:pool (ig/ref :app.db/pool) :pool (ig/ref :app.db/pool)
:session (ig/ref :app.http.session/session) :session (ig/ref :app.http.session/session)
:metrics (ig/ref :app.metrics/metrics)} :metrics (ig/ref :app.metrics/metrics)
:executor (ig/ref :app.worker/executor)}
:app.worker/executor :app.worker/executor
{:name "worker"} {:name "worker"}

View file

@ -12,12 +12,14 @@
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.time :as dt]
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p]) [promesa.core :as p])
(:import (:import
java.time.Duration
io.lettuce.core.RedisClient io.lettuce.core.RedisClient
io.lettuce.core.RedisURI io.lettuce.core.RedisURI
io.lettuce.core.api.StatefulRedisConnection io.lettuce.core.api.StatefulRedisConnection
@ -59,15 +61,18 @@
snd-conn (.connect ^RedisClient rclient ^RedisCodec codec) snd-conn (.connect ^RedisClient rclient ^RedisCodec codec)
rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec) rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)
snd-buff (a/chan (a/sliding-buffer buffer-size)) pub-buff (a/chan (a/sliding-buffer buffer-size))
rcv-buff (a/chan (a/sliding-buffer buffer-size)) rcv-buff (a/chan (a/sliding-buffer buffer-size))
sub-buff (a/chan 1) sub-buff (a/chan 1)
cch (a/chan 1)] cch (a/chan 1)]
(.setTimeout ^StatefulRedisConnection snd-conn ^Duration (dt/duration {:seconds 10}))
(.setTimeout ^StatefulRedisPubSubConnection rcv-conn ^Duration (dt/duration {:seconds 10}))
(log/debugf "initializing msgbus (uri: '%s')" (str uri)) (log/debugf "initializing msgbus (uri: '%s')" (str uri))
;; Start the sending (publishing) loop ;; Start the sending (publishing) loop
(impl-publish-loop snd-conn snd-buff cch) (impl-publish-loop snd-conn pub-buff cch)
;; Start the receiving (subscribing) loop ;; Start the receiving (subscribing) loop
(impl-subscribe-loop rcv-conn rcv-buff sub-buff cch) (impl-subscribe-loop rcv-conn rcv-buff sub-buff cch)
@ -78,13 +83,13 @@
([command params] ([command params]
(a/go (a/go
(case command (case command
:pub (a/>! snd-buff params) :pub (a/>! pub-buff params)
:sub (a/>! sub-buff params))))) :sub (a/>! sub-buff params)))))
{::snd-conn snd-conn {::snd-conn snd-conn
::rcv-conn rcv-conn ::rcv-conn rcv-conn
::cch cch ::cch cch
::snd-buff snd-buff ::pub-buff pub-buff
::rcv-buff rcv-buff}))) ::rcv-buff rcv-buff})))
(defmethod ig/halt-key! ::msgbus (defmethod ig/halt-key! ::msgbus
@ -93,25 +98,14 @@
(.close ^StatefulRedisConnection (::snd-conn mdata)) (.close ^StatefulRedisConnection (::snd-conn mdata))
(.close ^StatefulRedisPubSubConnection (::rcv-conn mdata)) (.close ^StatefulRedisPubSubConnection (::rcv-conn mdata))
(a/close! (::cch mdata)) (a/close! (::cch mdata))
(a/close! (::snd-buff mdata)) (a/close! (::pub-buff mdata))
(a/close! (::rcv-buff mdata)))) (a/close! (::rcv-buff mdata))))
(defn- impl-redis-pub
[rac {:keys [topic message]}]
(let [topic (str topic)
message (blob/encode message)
res (a/chan 1)]
(-> (.publish ^RedisAsyncCommands rac ^String topic ^bytes message)
(p/finally (fn [_ e]
(when e (a/>!! res e))
(a/close! res))))
res))
(defn- impl-publish-loop (defn- impl-publish-loop
[conn in-buff cch] [conn rcv-buff cch]
(let [rac (.async ^StatefulRedisConnection conn)] (let [rac (.async ^StatefulRedisConnection conn)]
(a/go-loop [] (a/go-loop []
(let [[val _] (a/alts! [in-buff cch])] (let [[val _] (a/alts! [rcv-buff cch])]
(when (some? val) (when (some? val)
(let [result (a/<! (impl-redis-pub rac val))] (let [result (a/<! (impl-redis-pub rac val))]
(when (instance? Throwable result) (when (instance? Throwable result)
@ -119,7 +113,7 @@
(recur))))))) (recur)))))))
(defn- impl-subscribe-loop (defn- impl-subscribe-loop
[conn in-buff sub-buff cch] [conn rcv-buff sub-buff cch]
;; Add a unique listener to connection ;; Add a unique listener to connection
(.addListener conn (reify RedisPubSubListener (.addListener conn (reify RedisPubSubListener
(message [it pattern topic message]) (message [it pattern topic message])
@ -127,14 +121,14 @@
;; There are no back pressure, so we use a slidding ;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends ;; buffer for cases when the pubsub broker sends
;; more messages that we can process. ;; more messages that we can process.
(a/put! in-buff {:topic topic :message (blob/decode message)})) (a/>!! rcv-buff {:topic topic :message (blob/decode message)}))
(psubscribed [it pattern count]) (psubscribed [it pattern count])
(punsubscribed [it pattern count]) (punsubscribed [it pattern count])
(subscribed [it topic count]) (subscribed [it topic count])
(unsubscribed [it topic count]))) (unsubscribed [it topic count])))
(a/go-loop [chans {}] (a/go-loop [chans {}]
(let [[val port] (a/alts! [sub-buff cch in-buff] :priority true)] (let [[val port] (a/alts! [sub-buff cch rcv-buff] :priority true)]
(cond (cond
;; Stop condition; just do nothing ;; Stop condition; just do nothing
(= port cch) (= port cch)
@ -150,7 +144,7 @@
;; This means we receive data from redis and we need to ;; This means we receive data from redis and we need to
;; forward it to the underlying subscriptions. ;; forward it to the underlying subscriptions.
(= port in-buff) (= port rcv-buff)
(let [topic (:topic val) (let [topic (:topic val)
pending (loop [chans (seq (get chans topic)) pending (loop [chans (seq (get chans topic))
pending #{}] pending #{}]
@ -164,6 +158,16 @@
(a/<! (impl-redis-unsub conn topic))) (a/<! (impl-redis-unsub conn topic)))
(recur chans)))))) (recur chans))))))
(defn- impl-redis-pub
[rac {:keys [topic message]}]
(let [topic (str topic)
message (blob/encode message)
res (a/chan 1)]
(-> (.publish ^RedisAsyncCommands rac ^String topic ^bytes message)
(p/finally (fn [_ e]
(when e (a/>!! res e))
(a/close! res))))
res))
(defn impl-redis-sub (defn impl-redis-sub
[conn topic] [conn topic]
@ -175,7 +179,6 @@
(a/close! res)))) (a/close! res))))
res)) res))
(defn impl-redis-unsub (defn impl-redis-unsub
[conn topic] [conn topic]
(let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn) (let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn)

View file

@ -16,6 +16,7 @@
[app.util.async :as aa] [app.util.async :as aa]
[app.util.time :as dt] [app.util.time :as dt]
[app.util.transit :as t] [app.util.transit :as t]
[app.worker :as wrk]
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
@ -39,7 +40,7 @@
(s/def ::msgbus fn?) (s/def ::msgbus fn?)
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::msgbus ::db/pool ::session ::mtx/metrics])) (s/keys :req-un [::msgbus ::db/pool ::session ::mtx/metrics ::wrk/executor]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [session metrics] :as cfg}] [_ {:keys [session metrics] :as cfg}]
@ -132,20 +133,22 @@
false))) false)))
(defn websocket (defn websocket
[{:keys [file-id team-id msgbus] :as cfg}] [{:keys [file-id team-id msgbus executor] :as cfg}]
(let [in (a/chan 32) (let [in (a/chan (a/dropping-buffer 64))
out (a/chan 32) out (a/chan (a/dropping-buffer 64))
mtx-aconn (:mtx-active-connections cfg) mtx-aconn (:mtx-active-connections cfg)
mtx-messages (:mtx-messages cfg) mtx-messages (:mtx-messages cfg)
mtx-sessions (:mtx-sessions cfg) mtx-sessions (:mtx-sessions cfg)
created-at (dt/now) created-at (dt/now)
ws-send (mtx/wrap-counter ws-send mtx-messages ["send"])] ws-send (mtx/wrap-counter ws-send mtx-messages ["send"])]
(letfn [(on-connect [conn] (letfn [(on-connect [conn]
(log/debugf "on-connect %s" (:session-id cfg)) (log/debugf "on-connect %s" (:session-id cfg))
(mtx-aconn :inc) (mtx-aconn :inc)
(let [sub (a/chan) ;; A subscription channel should use a lossy buffer
;; because we can't penalize normal clients when one
;; slow client is connected to the room.
(let [sub (a/chan (a/dropping-buffer 64))
ws (WebSocket. conn in out sub nil cfg)] ws (WebSocket. conn in out sub nil cfg)]
;; Subscribe to corresponding topics ;; Subscribe to corresponding topics
@ -155,8 +158,8 @@
;; message forwarding loop ;; message forwarding loop
(a/go-loop [] (a/go-loop []
(let [val (a/<! out)] (let [val (a/<! out)]
(when-not (nil? val) (when (some? val)
(when (ws-send conn (t/encode-str val)) (when (a/<! (aa/thread-call executor #(ws-send conn (t/encode-str val))))
(recur))))) (recur)))))
(a/go (a/go
@ -279,7 +282,7 @@
(aa/go-try (aa/go-try
(aa/<? (update-presence pool file-id session-id profile-id)) (aa/<? (update-presence pool file-id session-id profile-id))
(let [members (aa/<? (retrieve-presence pool file-id))] (let [members (aa/<? (retrieve-presence pool file-id))]
(aa/<? (msgbus :pub {:topic file-id (a/<! (msgbus :pub {:topic file-id
:message {:type :presence :sessions members}}))))) :message {:type :presence :sessions members}})))))
(defmethod handle-message :disconnect (defmethod handle-message :disconnect
@ -288,7 +291,7 @@
(aa/go-try (aa/go-try
(aa/<? (delete-presence pool file-id session-id profile-id)) (aa/<? (delete-presence pool file-id session-id profile-id))
(let [members (aa/<? (retrieve-presence pool file-id))] (let [members (aa/<? (retrieve-presence pool file-id))]
(aa/<? (msgbus :pub {:topic file-id (a/<! (msgbus :pub {:topic file-id
:message {:type :presence :sessions members}}))))) :message {:type :presence :sessions members}})))))
(defmethod handle-message :keepalive (defmethod handle-message :keepalive