♻️ Refactor websockets impl to use virtual threads

Removing the use of core.async code and implement code using
plain old and familiar synchronous code
Andrey Antukh 2023-02-20 12:44:35 +01:00
8 changed files with 465 additions and 537 deletions

@ -19,8 +19,8 @@
java-http-clj/java-http-clj {:mvn/version "0.4.3"}
{:git/tag "v9.12"
:git/sha "51646d8"
{:git/tag "v9.13"
:git/sha "e2d25db"
:git/url "https://github.com/funcool/yetti.git"
:exclusions [org.slf4j/slf4j-api]}

@ -42,6 +42,9 @@ export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3
export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000
export OPTIONS="
-A:jmx-remote -A:dev \
-J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager \
@ -49,7 +52,9 @@ export OPTIONS="
-J-Dlog4j2.configurationFile=log4j2-devenv.xml \
-J-XX:-OmitStackTraceInFastThrow \
-J-XX:+UnlockDiagnosticVMOptions \
-J-XX:+DebugNonSafepoints \
-J-Djdk.tracePinnedThreads=full \
# Setup HEAP
export OPTIONS="$OPTIONS -J-Xms50m -J-Xmx1024m"

@ -17,9 +17,9 @@
[app.msgbus :as mbus]
[app.util.time :as dt]
[app.util.websocket :as ws]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.exec.csp :as sp]
[yetti.websocket :as yws]))
(def recv-labels
@ -34,70 +34,38 @@
(def state (atom {}))
(defn- on-connect
[{:keys [::mtx/metrics]} wsp]
(let [created-at (dt/now)]
(swap! state assoc (::ws/id @wsp) wsp)
(mtx/run! metrics
:id :websocket-active-connections
:inc 1)
(fn []
(swap! state dissoc (::ws/id @wsp))
(mtx/run! metrics :id :websocket-active-connections :dec 1)
(mtx/run! metrics
:id :websocket-session-timing
:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)))))
(defn- on-rcv-message
[{:keys [::mtx/metrics]} _ message]
(mtx/run! metrics
:id :websocket-messages-total
:labels recv-labels
:inc 1)
(defn- on-snd-message
[{:keys [::mtx/metrics]} _ message]
(mtx/run! metrics
:id :websocket-messages-total
:labels send-labels
:inc 1)
(defn repl-get-connections-for-file
(->> (vals @state)
(filter #(= file-id (-> % deref ::file-subscription :file-id)))
(map deref)
(map ::ws/id)))
(defn repl-get-connections-for-team
(->> (vals @state)
(filter #(= team-id (-> % deref ::team-subscription :team-id)))
(map deref)
(map ::ws/id)))
(defn repl-close-connection
(when-let [wsp (get @state id)]
(a/>!! (::ws/close-ch @wsp) [8899 "closed from server"])
(a/close! (::ws/close-ch @wsp))))
(when-let [{:keys [::ws/close-ch] :as wsp} (get @state id)]
(sp/put! close-ch [8899 "closed from server"])
(sp/close! close-ch)))
(defn repl-get-connection-info
(when-let [wsp (get @state id)]
{:id id
:created-at (::created-at @wsp)
:profile-id (::profile-id @wsp)
:session-id (::session-id @wsp)
:user-agent (::ws/user-agent @wsp)
:ip-addr (::ws/remote-addr @wsp)
:last-activity-at (::ws/last-activity-at @wsp)
:subscribed-file (-> wsp deref ::file-subscription :file-id)
:subscribed-team (-> wsp deref ::team-subscription :team-id)}))
:created-at (::created-at wsp)
:profile-id (::profile-id wsp)
:session-id (::session-id wsp)
:user-agent (::ws/user-agent wsp)
:ip-addr (::ws/remote-addr wsp)
:last-activity-at (::ws/last-activity-at wsp)
:subscribed-file (-> wsp ::file-subscription :file-id)
:subscribed-team (-> wsp ::team-subscription :team-id)}))
(defn repl-print-connection-info
@ -117,202 +85,195 @@
(fn [_ _ message]
(:type message)))
(defmethod handle-message :connect
[cfg wsp _]
(defmethod handle-message :open
[{:keys [::mbus/msgbus]} {:keys [::ws/id ::ws/output-ch ::ws/state ::profile-id ::session-id] :as wsp} _]
(l/trace :fn "handle-message" :event "open" :conn-id id)
(let [ch (sp/chan :buf (sp/dropping-buffer 16)
:xf (remove #(= (:session-id %) session-id)))]
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
;; Subscribe to the profile channel and forward all messages to websocket output
;; channel (send them to the client).
(swap! state assoc ::profile-subscription {:channel ch})
xform (remove #(= (:session-id %) session-id))
channel (a/chan (a/dropping-buffer 16) xform)]
;; Forward the subscription messages directly to the websocket output channel
(sp/pipe ch output-ch false)
(l/trace :fn "handle-message" :event "connect" :conn-id conn-id)
;; Subscribe to the profile topic on msgbus/redis
(mbus/sub! msgbus :topic profile-id :chan ch)))
;; Subscribe to the profile channel and forward all messages to
;; websocket output channel (send them to the client).
(swap! wsp assoc ::profile-subscription channel)
(a/pipe channel output-ch false)
(mbus/sub! msgbus :topic profile-id :chan channel)))
(defmethod handle-message :close
[{:keys [::mbus/msgbus]} {:keys [::ws/id ::ws/state ::profile-id ::session-id]} _]
(l/trace :fn "handle-message" :event "close" :conn-id id)
(let [psub (::profile-subscription @state)
fsub (::file-subscription @state)
tsub (::team-subscription @state)
msg {:type :disconnect
:subs-id profile-id
:profile-id profile-id
:session-id session-id}]
(defmethod handle-message :disconnect
[cfg wsp _]
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
profile-ch (::profile-subscription @wsp)
fsub (::file-subscription @wsp)
tsub (::team-subscription @wsp)
;; Close profile subscription if exists
(when-let [ch (:channel psub)]
(sp/close! ch)
(mbus/purge! msgbus [ch]))
message {:type :disconnect
:subs-id profile-id
:profile-id profile-id
:session-id session-id}]
(l/trace :fn "handle-message"
:event :disconnect
:conn-id conn-id)
;; Close the main profile subscription
(a/close! profile-ch)
(a/<! (mbus/purge! msgbus [profile-ch]))
;; Close tram subscription if exists
(when-let [channel (:channel tsub)]
(a/close! channel)
(a/<! (mbus/purge! msgbus channel)))
;; Close team subscription if exists
(when-let [ch (:channel tsub)]
(sp/close! ch)
(mbus/purge! msgbus [ch]))
;; Close file subscription if exists
(when-let [{:keys [topic channel]} fsub]
(a/close! channel)
(a/<! (mbus/purge! msgbus channel))
(a/<! (mbus/pub! msgbus :topic topic :message message))))))
(sp/close! channel)
(mbus/purge! msgbus [channel])
(mbus/pub! msgbus :topic topic :message msg))))
(defmethod handle-message :subscribe-team
[cfg wsp {:keys [team-id] :as params}]
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
prev-subs (get @wsp ::team-subscription)
xform (comp
(remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id team-id)))
[{:keys [::mbus/msgbus]} {:keys [::ws/id ::ws/state ::ws/output-ch ::session-id]} {:keys [team-id] :as params}]
(l/trace :fn "handle-message" :event "subscribe-team" :team-id team-id :conn-id id)
(let [prev-subs (get @state ::team-subscription)
channel (sp/chan :buf (sp/dropping-buffer 64)
:xf (comp
(remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id team-id))))]
channel (a/chan (a/dropping-buffer 64) xform)]
(sp/pipe channel output-ch false)
(mbus/sub! msgbus :topic team-id :chan channel)
(l/trace :fn "handle-message"
:event :subscribe-team
:team-id team-id
:conn-id conn-id)
(let [subs {:team-id team-id :channel channel :topic team-id}]
(swap! state assoc ::team-subscription subs))
(a/pipe channel output-ch false)
;; Close previous subscription if exists
(when-let [ch (:channel prev-subs)]
(sp/close! ch)
(mbus/purge! msgbus [ch]))))
(let [state {:team-id team-id :channel channel :topic team-id}]
(swap! wsp assoc ::team-subscription state))
;; Close previous subscription if exists
(when-let [channel (:channel prev-subs)]
(a/close! channel)
(a/<! (mbus/purge! msgbus channel))))
(a/<! (mbus/sub! msgbus :topic team-id :chan channel)))))
(defmethod handle-message :subscribe-file
[cfg wsp {:keys [file-id] :as params}]
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
prev-subs (::file-subscription @wsp)
xform (comp (remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id file-id)))
channel (a/chan (a/dropping-buffer 64) xform)]
[{:keys [::mbus/msgbus]} {:keys [::ws/id ::ws/state ::ws/output-ch ::session-id ::profile-id]} {:keys [file-id] :as params}]
(l/trace :fn "handle-message" :event "subscribe-file" :file-id file-id :conn-id id)
(let [psub (::file-subscription @state)
fch (sp/chan :buf (sp/dropping-buffer 64)
:xf (comp (remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id file-id))))]
(l/trace :fn "handle-message"
:event :subscribe-file
:file-id file-id
:conn-id conn-id)
(let [subs {:file-id file-id :channel fch :topic file-id}]
(swap! state assoc ::file-subscription subs))
(let [state {:file-id file-id :channel channel :topic file-id}]
(swap! wsp assoc ::file-subscription state))
;; Close previous subscription if exists
(when-let [ch (:channel psub)]
(sp/close! ch)
(mbus/purge! msgbus [ch]))
;; Close previous subscription if exists
(when-let [channel (:channel prev-subs)]
(a/close! channel)
(a/<! (mbus/purge! msgbus channel))))
;; Message forwarding
(loop []
(when-let [{:keys [type] :as message} (a/<! channel)]
(when (or (= :join-file type)
(= :leave-file type)
(= :disconnect type))
(let [message {:type :presence
:file-id file-id
:session-id session-id
(sp/go-loop []
(when-let [{:keys [type] :as message} (sp/take! fch)]
(sp/put! output-ch message)
(when (or (= :join-file type)
(= :leave-file type)
(= :disconnect type))
(let [message {:type :presence
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(a/<! (mbus/pub! msgbus :topic file-id :message message))))
(a/>! output-ch message)
(mbus/pub! msgbus
:topic file-id
:message message)))
;; Subscribe to file topic
(a/<! (mbus/sub! msgbus :topic file-id :chan channel))
;; Subscribe to file topic
(mbus/sub! msgbus :topic file-id :chan fch)
;; Notifify the rest of participants of the new connection.
(let [message {:type :join-file
:file-id file-id
:subs-id file-id
:session-id session-id
:profile-id profile-id}]
(a/<! (mbus/pub! msgbus :topic file-id :message message))))))
;; Notifify the rest of participants of the new connection.
(let [message {:type :join-file
:file-id file-id
:subs-id file-id
:session-id session-id
:profile-id profile-id}]
(mbus/pub! msgbus :topic file-id :message message))))
(defmethod handle-message :unsubscribe-file
[cfg wsp {:keys [file-id] :as params}]
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
session-id (::session-id @wsp)
profile-id (::profile-id @wsp)
subs (::file-subscription @wsp)
[{:keys [::mbus/msgbus]} {:keys [::ws/id ::ws/state ::session-id ::profile-id]} {:keys [file-id] :as params}]
(l/trace :fn "handle-message" :event "unsubscribe-file" :file-id file-id :conn-id id)
message {:type :leave-file
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(let [subs (::file-subscription @state)
message {:type :leave-file
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(l/trace :fn "handle-message"
:event :unsubscribe-file
:file-id file-id
:conn-id conn-id)
(when (= (:file-id subs) file-id)
(let [channel (:channel subs)]
(a/close! channel)
(a/<! (mbus/purge! msgbus channel))
(a/<! (mbus/pub! msgbus :topic file-id :message message)))))))
(when (= (:file-id subs) file-id)
(mbus/pub! msgbus :topic file-id :message message)
(let [ch (:channel subs)]
(sp/close! ch)
(mbus/purge! msgbus [ch])))))
(defmethod handle-message :keepalive
[_ _ _]
(l/trace :fn "handle-message" :event :keepalive)
(a/go :nothing))
(l/trace :fn "handle-message" :event :keepalive))
(defmethod handle-message :broadcast
[{:keys [::mbus/msgbus]} {:keys [::ws/id ::session-id ::profile-id]} message]
(l/trace :fn "handle-message" :event "broadcast" :conn-id id)
(let [message (-> message
(assoc :subs-id profile-id)
(assoc :profile-id profile-id)
(assoc :session-id session-id))]
(mbus/pub! msgbus :topic profile-id :message message)))
(defmethod handle-message :pointer-update
[cfg wsp {:keys [file-id] :as message}]
(let [msgbus (::mbus/msgbus cfg)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
subs (::file-subscription @wsp)
message (-> message
(assoc :subs-id file-id)
(assoc :profile-id profile-id)
(assoc :session-id session-id))]
;; Only allow receive pointer updates when active subscription
(when subs
(a/<! (mbus/pub! msgbus :topic file-id :message message))))))
[{:keys [::mbus/msgbus]} {:keys [::ws/state ::session-id ::profile-id]} {:keys [file-id] :as message}]
(when (::file-subscription @state)
(let [message (-> message
(assoc :subs-id file-id)
(assoc :profile-id profile-id)
(assoc :session-id session-id))]
(mbus/pub! msgbus :topic file-id :message message))))
(defmethod handle-message :default
[_ wsp message]
(let [conn-id (::ws/id @wsp)]
(l/warn :hint "received unexpected message"
:message message
:conn-id conn-id)
(a/go :none)))
[_ {:keys [::ws/id]} message]
(l/warn :hint "received unexpected message"
:message message
:conn-id id))
(defn- on-connect
[{:keys [::mtx/metrics]} {:keys [::ws/id] :as wsp}]
(let [created-at (dt/now)]
(l/trace :fn "on-connect" :conn-id id)
(swap! state assoc id wsp)
(mtx/run! metrics
:id :websocket-active-connections
:inc 1)
(assoc wsp ::ws/on-disconnect
(fn []
(l/trace :fn "on-disconnect" :conn-id id)
(swap! state dissoc id)
(mtx/run! metrics :id :websocket-active-connections :dec 1)
(mtx/run! metrics
:id :websocket-session-timing
:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0))))))
(defn- on-rcv-message
[{:keys [::mtx/metrics ::profile-id ::session-id]} message]
(mtx/run! metrics
:id :websocket-messages-total
:labels recv-labels
:inc 1)
(assoc message :profile-id profile-id :session-id session-id))
(defn- on-snd-message
[{:keys [::mtx/metrics]} message]
(mtx/run! metrics
:id :websocket-messages-total
:labels send-labels
:inc 1)
(s/def ::session-id ::us/uuid)
(s/def ::handler-params
(s/keys :req-un [::session-id]))

@ -195,9 +195,8 @@
::mtx/metrics (ig/ref ::mtx/metrics)}
{:backend (cf/get :msgbus-backend :redis)
:executor (ig/ref ::wrk/executor)
:redis (ig/ref ::rds/redis)}
{::wrk/executor (ig/ref ::wrk/executor)
::rds/redis (ig/ref ::rds/redis)}
{::wrk/executor (ig/ref ::wrk/executor)}

@ -8,20 +8,18 @@
"The msgbus abstraction implemented using redis as underlying backend."
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.spec :as us]
[app.common.transit :as t]
[app.config :as cfg]
[app.redis :as redis]
[app.util.async :as aa]
[app.redis :as rds]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]))
[promesa.exec :as px]
[promesa.exec.csp :as sp]))
(set! *warn-on-reflection* true)
@ -34,132 +32,116 @@
(def ^:private xform-prefix-topic
(map (fn [obj] (update obj :topic prefix-topic))))
(declare ^:private redis-connect)
(declare ^:private redis-disconnect)
(declare ^:private redis-pub)
(declare ^:private redis-sub)
(declare ^:private redis-unsub)
(declare ^:private redis-pub!)
(declare ^:private redis-sub!)
(declare ^:private redis-unsub!)
(declare ^:private start-io-loop!)
(declare ^:private subscribe-to-topics)
(declare ^:private unsubscribe-channels)
(defmethod ig/prep-key ::msgbus
[_ cfg]
(merge {:buffer-size 128
:timeout (dt/duration {:seconds 30})}
(d/without-nils cfg)))
(s/def ::cmd-ch ::aa/channel)
(s/def ::rcv-ch ::aa/channel)
(s/def ::pub-ch ::aa/channel)
(s/def ::cmd-ch sp/chan?)
(s/def ::rcv-ch sp/chan?)
(s/def ::pub-ch sp/chan?)
(s/def ::state ::us/agent)
(s/def ::pconn ::redis/connection-holder)
(s/def ::sconn ::redis/connection-holder)
(s/def ::pconn ::rds/connection-holder)
(s/def ::sconn ::rds/connection-holder)
(s/def ::msgbus
(s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor]))
(s/def ::buffer-size ::us/integer)
(defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::buffer-size ::redis/timeout ::redis/redis ::wrk/executor]))
(s/keys :req [::rds/redis ::wrk/executor]))
(defmethod ig/prep-key ::msgbus
[_ cfg]
(-> cfg
(assoc ::buffer-size 128)
(assoc ::timeout (dt/duration {:seconds 30}))))
(defmethod ig/init-key ::msgbus
[_ {:keys [buffer-size executor] :as cfg}]
[_ {:keys [::buffer-size ::wrk/executor ::timeout ::rds/redis] :as cfg}]
(l/info :hint "initialize msgbus" :buffer-size buffer-size)
(let [cmd-ch (a/chan buffer-size)
rcv-ch (a/chan (a/dropping-buffer buffer-size))
pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic)
(let [cmd-ch (sp/chan :buf buffer-size)
rcv-ch (sp/chan :buf (sp/dropping-buffer buffer-size))
pub-ch (sp/chan :buf (sp/dropping-buffer buffer-size)
:xf xform-prefix-topic)
state (agent {})
msgbus (-> (redis-connect cfg)
pconn (rds/connect redis :timeout timeout)
sconn (rds/connect redis :type :pubsub :timeout timeout)
msgbus (-> cfg
(assoc ::pconn pconn)
(assoc ::sconn sconn)
(assoc ::cmd-ch cmd-ch)
(assoc ::rcv-ch rcv-ch)
(assoc ::pub-ch pub-ch)
(assoc ::state state)
(assoc ::wrk/executor executor))]
(us/verify! ::msgbus msgbus)
(set-error-handler! state #(l/error :cause % :hint "unexpected error on agent" ::l/sync? true))
(set-error-mode! state :continue)
(start-io-loop! msgbus)
(defn sub!
[{:keys [::state ::wrk/executor] :as cfg} & {:keys [topic topics chan]}]
(let [done-ch (a/chan)
topics (into [] (map prefix-topic) (if topic [topic] topics))]
(l/debug :hint "subscribe" :topics topics)
(send-via executor state subscribe-to-topics cfg topics chan done-ch)
(defn pub!
[{::keys [pub-ch]} & {:as params}]
(a/>! pub-ch params)))
(defn purge!
[{:keys [::state ::wrk/executor] :as msgbus} chans]
(l/trace :hint "purge" :chans (count chans))
(let [done-ch (a/chan)]
(send-via executor state unsubscribe-channels msgbus chans done-ch)
(assoc msgbus ::io-thr (start-io-loop! msgbus))))
(defmethod ig/halt-key! ::msgbus
[_ msgbus]
(redis-disconnect msgbus)
(a/close! (::cmd-ch msgbus))
(a/close! (::rcv-ch msgbus))
(a/close! (::pub-ch msgbus)))
(px/interrupt! (::io-thr msgbus))
(sp/close! (::cmd-ch msgbus))
(sp/close! (::rcv-ch msgbus))
(sp/close! (::pub-ch msgbus))
(d/close! (::pconn msgbus))
(d/close! (::sconn msgbus)))
(defn sub!
[{:keys [::state ::wrk/executor] :as cfg} & {:keys [topic topics chan]}]
(let [topics (into [] (map prefix-topic) (if topic [topic] topics))]
(l/debug :hint "subscribe" :topics topics :chan (hash chan))
(send-via executor state subscribe-to-topics cfg topics chan)
(defn pub!
[{::keys [pub-ch]} & {:as params}]
(sp/put! pub-ch params))
(defn purge!
[{:keys [::state ::wrk/executor] :as msgbus} chans]
(l/debug :hint "purge" :chans (count chans))
(send-via executor state unsubscribe-channels msgbus chans)
;; --- IMPL
(defn- redis-connect
[{:keys [timeout redis] :as cfg}]
(let [pconn (redis/connect redis :timeout timeout)
sconn (redis/connect redis :type :pubsub :timeout timeout)]
{::pconn pconn
::sconn sconn}))
(defn- redis-disconnect
[{:keys [::pconn ::sconn] :as cfg}]
(d/close! pconn)
(d/close! sconn))
(defn- conj-subscription
"A low level function that is responsible to create on-demand
subscriptions on redis. It reuses the same subscription if it is
already established. Intended to be executed in agent."
already established."
[nsubs cfg topic chan]
(let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))]
(when (= 1 (count nsubs))
(l/trace :hint "open subscription" :topic topic ::l/sync? true)
(redis-sub cfg topic))
(redis-sub! cfg topic))
(defn- disj-subscription
"A low level function responsible on removing subscriptions. The
subscription is truly removed from redis once no single local
subscription is look for it. Intended to be executed in agent."
subscription is look for it."
[nsubs cfg topic chan]
(let [nsubs (disj nsubs chan)]
(when (empty? nsubs)
(l/trace :hint "close subscription" :topic topic ::l/sync? true)
(redis-unsub cfg topic))
(redis-unsub! cfg topic))
(defn- subscribe-to-topics
"Function responsible to attach local subscription to the
state. Intended to be used in agent."
[state cfg topics chan done-ch]
(aa/with-closing done-ch
(let [state (update state :chans assoc chan topics)]
(reduce (fn [state topic]
(update-in state [:topics topic] conj-subscription cfg topic chan))
"Function responsible to attach local subscription to the state."
[state cfg topics chan]
(let [state (update state :chans assoc chan topics)]
(reduce (fn [state topic]
(update-in state [:topics topic] conj-subscription cfg topic chan))
(defn- unsubscribe-single-channel
(defn- unsubscribe-channel
"Auxiliary function responsible on removing a single local
subscription from the state."
[state cfg chan]
@ -174,87 +156,113 @@
"Function responsible from detach from state a seq of channels,
useful when client disconnects or in-bulk unsubscribe
operations. Intended to be executed in agent."
[state cfg channels done-ch]
(aa/with-closing done-ch
(reduce #(unsubscribe-single-channel %1 cfg %2) state channels)))
[state cfg channels]
(reduce #(unsubscribe-channel %1 cfg %2) state channels))
(defn- create-listener
:on-message (fn [_ topic message]
;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends
;; more messages that we can process.
(let [val {:topic topic :message (t/decode message)}]
(when-not (a/offer! rcv-ch val)
(when-not (sp/offer! rcv-ch val)
(l/warn :msg "dropping message on subscription loop"))))))
(defn- process-input!
[{:keys [::state ::wrk/executor] :as cfg} topic message]
(let [chans (get-in @state [:topics topic])]
(when-let [closed (loop [chans (seq chans)
closed #{}]
@ -5,7 +5,7 @@
(if (sp/put! ch message)
(recur (rest chans) closed)
(recur (rest chans) (conj closed ch)))
(seq closed)))]
(send-via executor state unsubscribe-channels cfg closed))))
(defn start-io-loop!
[{:keys [::sconn ::rcv-ch ::pub-ch ::state ::wrk/executor] :as cfg}]
(redis/add-listener! sconn (create-listener rcv-ch))
(letfn [(send-to-topic [topic message]
(a/go-loop [chans (seq (get-in @state [:topics topic]))
closed #{}]
(if-let [ch (first chans)]
(if (a/>! ch message)
(recur (rest chans) closed)
(recur (rest chans) (conj closed ch)))
(seq closed))))
(rds/add-listener! sconn (create-listener rcv-ch))
(process-incoming [{:keys [topic message]}]
(when-let [closed (a/<! (send-to-topic topic message))]
(send-via executor state unsubscribe-channels cfg closed nil))))
{:name "penpot/msgbus-io-loop"}
{:name "penpot/msgbus/io-loop"
:virtual true}
(loop []
(let [[val port] (a/alts!! [pub-ch rcv-ch])]
(let [timeout-ch (sp/timeout-chan 1000)
[val port] (sp/alts! [timeout-ch pub-ch rcv-ch])]
(nil? val)
(l/trace :hint "stopping io-loop, nil received")
(send-via executor state (fn [state]
(->> (vals state)
(mapcat identity)
(filter some?)
(run! a/close!))
(= port rcv-ch)
(a/<!! (process-incoming val))
(identical? port timeout-ch)
(let [closed (->> (:chans @state)
(map key)
(filter sp/closed?))]
(when (seq closed)
(send-via executor state unsubscribe-channels cfg closed)
(l/debug :hint "proactively purge channels" :count (count closed)))
(= port pub-ch)
(let [result (a/<!! (redis-pub cfg val))]
(when (ex/exception? result)
(l/error :hint "unexpected error on publishing"
:message val
:cause result))
(nil? val)
(throw (InterruptedException. "internally interrupted"))
(defn- redis-pub
(identical? port rcv-ch)
(let [{:keys [topic message]} val]
(process-input! cfg topic message)
(identical? port pub-ch)
(redis-pub! cfg val)
(catch InterruptedException _
(l/trace :hint "io-loop thread interrumpted"))
(catch Throwable cause
(l/error :hint "unexpected exception on io-loop thread"
:cause cause))
(l/trace :hint "clearing io-loop state")
(when-let [chans (:chans @state)]
(run! sp/close! (keys chans)))
(l/debug :hint "io-loop thread terminated")))))
(defn- redis-pub!
"Publish a message to the redis server. Asynchronous operation,
intended to be used in core.async go blocks."
[{:keys [::pconn] :as cfg} {:keys [topic message]}]
(let [message (t/encode message)
res (a/chan 1)]
(-> (redis/publish! pconn topic message)
(p/finally (fn [_ cause]
(when (and cause (redis/open? pconn))
(a/offer! res cause))
(a/close! res))))
(p/await! (rds/publish! pconn topic (t/encode message)))
(catch InterruptedException cause
(throw cause))
(catch Throwable cause
(l/error :hint "unexpected error on publishing"
:message message
:cause cause))))
(defn redis-sub
(defn- redis-sub!
"Create redis subscription. Blocking operation, intended to be used
inside an agent."
[{:keys [::sconn] :as cfg} topic]
(redis/subscribe! sconn topic))
(rds/subscribe! sconn topic)
(catch InterruptedException cause
(throw cause))
(catch Throwable cause
(l/trace :hint "exception on subscribing" :topic topic :cause cause))))
(defn redis-unsub
(defn- redis-unsub!
"Removes redis subscription. Blocking operation, intended to be used
inside an agent."
[{:keys [::sconn] :as cfg} topic]
(redis/unsubscribe! sconn topic))
(rds/unsubscribe! sconn topic)
(catch InterruptedException cause
(throw cause))
(catch Throwable cause
(l/trace :hint "exception on unsubscribing" :topic topic :cause cause))))

@ -18,7 +18,8 @@
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[promesa.core :as p])
[promesa.core :as p]
[promesa.exec :as px])
@ -99,11 +100,11 @@
(defmethod ig/prep-key ::redis
[_ cfg]
(let [runtime (Runtime/getRuntime)
cpus (.availableProcessors ^Runtime runtime)]
(let [cpus (px/get-available-processors)
threads (max 1 (int (* cpus 0.2)))]
(merge {::timeout (dt/duration "10s")
::io-threads (max 3 cpus)
::worker-threads (max 3 cpus)}
::io-threads (max 3 threads)
::worker-threads (max 3 threads)}
(d/without-nils cfg))))
(defmethod ig/pre-init-spec ::redis [_]

@ -277,7 +277,6 @@
(let [lchanges (filter library-change? changes)
msgbus (::mbus/msgbus cfg)]
;; Asynchronously publish message to the msgbus
(mbus/pub! msgbus
:topic (:id file)
:message {:type :file-change
@ -290,7 +289,6 @@
(when (and (:is-shared file) (seq lchanges))
(let [team-id (or (:team-id file)
(files/get-team-id conn (:project-id file)))]
;; Asynchronously publish message to the msgbus
(mbus/pub! msgbus
:topic team-id
:message {:type :library-change

@ -5,7 +5,7 @@
;; Copyright (c) KALEIDOS INC
(ns app.util.websocket
"A general protocol implementation on top of websockets."
"A general protocol implementation on top of websockets using vthreads."
[app.common.exceptions :as ex]
[app.common.logging :as l]
@ -13,22 +13,42 @@
[app.common.uuid :as uuid]
[app.loggers.audit :refer [parse-client-ip]]
[app.util.time :as dt]
[clojure.core.async :as a]
[promesa.exec :as px]
[promesa.exec.csp :as sp]
[yetti.request :as yr]
[yetti.util :as yu]
[yetti.websocket :as yws])
(declare decode-beat)
(declare encode-beat)
(declare start-io-loop)
(declare ws-ping!)
(declare ws-send!)
(declare filter-options)
(def noop (constantly nil))
(def identity-3 (fn [_ _ o] o))
(def max-missed-heartbeats 3)
(def heartbeat-interval 5000)
(defn- encode-beat
(doto (ByteBuffer/allocate 8)
(.putLong n)
(defn- decode-beat
[^ByteBuffer buffer]
(when (= 8 (.capacity buffer))
(.rewind buffer)
(.getLong buffer)))
(defn- wrap-handler
(fn [wsp message]
(handler wsp message)
(catch Throwable cause
(if (ex/error? cause)
{:type :error :error (ex-data cause)}
{:type :error :error {:message (ex-message cause)}})))))
(declare start-io-loop!)
(defn handler
"A WebSocket upgrade handler factory. Returns a handler that can be
@ -46,12 +66,11 @@
:or {input-buff-size 64
output-buff-size 64
idle-timeout 60000
on-connect noop
on-connect identity
on-snd-message identity-3
on-rcv-message identity-3}
:as options}]
@ -61,91 +80,65 @@
(assert (fn? on-connect) "'on-connect' should be a function")
(fn [{:keys [::yws/channel] :as request}]
(let [input-ch (a/chan input-buff-size)
output-ch (a/chan output-buff-size)
hbeat-ch (a/chan (a/sliding-buffer 6))
close-ch (a/chan)
stop-ch (a/chan)
(let [input-ch (sp/chan :buf input-buff-size)
output-ch (sp/chan :buf output-buff-size)
hbeat-ch (sp/chan :buf (sp/sliding-buffer 6))
close-ch (sp/chan)
ip-addr (parse-client-ip request)
uagent (yr/get-header request "user-agent")
id (uuid/next)
state (atom {})
beats (atom #{})
options (-> (filter-options options)
(merge {::id id
::created-at (dt/now)
::input-ch input-ch
::heartbeat-ch hbeat-ch
::output-ch output-ch
::close-ch close-ch
::stop-ch stop-ch
::channel channel
::remote-addr ip-addr
::user-agent uagent})
;; call the on-connect hook and memoize the on-terminate instance
on-terminate (on-connect options)
options (-> options
(update ::handler wrap-handler)
(assoc ::id id)
(assoc ::state state)
(assoc ::beats beats)
(assoc ::created-at (dt/now))
(assoc ::input-ch input-ch)
(assoc ::heartbeat-ch hbeat-ch)
(assoc ::output-ch output-ch)
(assoc ::close-ch close-ch)
(assoc ::channel channel)
(assoc ::remote-addr ip-addr)
(assoc ::user-agent uagent)
(fn [channel]
(l/trace :fn "on-ws-open" :conn-id id)
(yws/idle-timeout! channel (dt/duration idle-timeout)))
(let [timeout (dt/duration idle-timeout)
name (str "penpot/websocket/io-loop/" id)]
(yws/idle-timeout! channel timeout)
(px/fn->thread (partial start-io-loop! options)
{:name name :virtual true})))
(fn [_ code reason]
(l/trace :fn "on-ws-terminate" :conn-id id :code code :reason reason)
(a/close! close-ch))
(l/trace :fn "on-ws-terminate"
:conn-id id
:code code
:reason reason)
(sp/close! close-ch))
(fn [_ error]
(when-not (or (instance? java.nio.channels.ClosedChannelException error)
(instance? java.net.SocketException error)
(instance? java.io.IOException error))
(l/error :fn "on-ws-error" :conn-id id
:hint (ex-message error)
:cause error))
(on-ws-terminate nil 8801 "close after error"))
(fn [_ cause]
(sp/close! close-ch cause))
(fn [_ message]
(let [message (on-rcv-message options message)
message (t/decode-str message)]
(a/offer! input-ch message)
(swap! options assoc ::last-activity-at (dt/now)))
(catch Throwable e
(l/warn :hint "error on decoding incoming message from websocket"
:wsmsg (pr-str message)
:cause e)
(a/>! close-ch [8802 "decode error"])
(a/close! close-ch))))
(sp/offer! input-ch message)
(swap! state assoc ::last-activity-at (dt/now)))
(fn [_ buffers]
(a/>!! hbeat-ch (yu/copy-many buffers)))]
;; (l/trace :fn "on-ws-pong" :buffers (pr-str buffers))
(sp/put! hbeat-ch (yu/copy-many buffers)))]
;; Wait a close signal
(let [[code reason] (a/<! close-ch)]
(a/close! stop-ch)
(a/close! hbeat-ch)
(a/close! output-ch)
(a/close! input-ch)
(when (and code reason)
(l/trace :hint "close channel condition" :code code :reason reason)
(yws/close! channel code reason))
(when (fn? on-terminate)
(l/trace :hint "connection terminated")))
;; React on messages received from the client
(a/<! (start-io-loop options handler on-snd-message on-ws-terminate))
(l/trace :hint "io loop terminated"))
(yws/on-close! channel (fn [_]
(sp/close! close-ch)))
{:on-open on-ws-open
:on-error on-ws-error
@ -153,118 +146,81 @@
:on-text on-ws-message
:on-pong on-ws-pong})))
(defn- ws-send!
[channel s]
(let [ch (a/chan 1)]
(defn- handle-ping!
[{:keys [::id ::beats ::channel] :as wsp} beat-id]
(l/trace :hint "ping" :beat beat-id :conn-id id)
(yws/ping! channel (encode-beat beat-id))
(let [issued (swap! beats conj (long beat-id))]
(not (>= (count issued) max-missed-heartbeats))))
(defn- start-io-loop!
[{:keys [::id ::close-ch ::input-ch ::output-ch ::heartbeat-ch ::channel ::handler ::beats ::on-rcv-message ::on-snd-message] :as wsp}]
{:name (str "penpot/websocket/io-loop/" id)
:virtual true}
(yws/send! channel s (fn [e]
(when e (a/offer! ch e))
(a/close! ch)))
(handler wsp {:type :open})
(loop [i 0]
(let [ping-ch (sp/timeout-chan heartbeat-interval)
[msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])]
(when (yws/connected? channel)
(identical? p ping-ch)
(if (handle-ping! wsp i)
(recur (inc i))
(yws/close! channel 8802 "missing to many pings"))
(or (identical? p close-ch) (nil? msg))
(do :nothing)
(identical? p heartbeat-ch)
(let [beat (decode-beat msg)]
;; (l/trace :hint "pong" :beat beat :conn-id id)
(swap! beats disj beat)
(recur i))
(identical? p input-ch)
(let [message (t/decode-str msg)
message (on-rcv-message message)
{:keys [request-id] :as response} (handler wsp message)]
(when (map? response)
(sp/put! output-ch
(cond-> response
(some? request-id)
(assoc :request-id request-id))))
(recur i))
(identical? p output-ch)
(let [message (on-snd-message msg)
message (t/encode-str message {:type :json-verbose})]
;; (l/trace :hint "writing message to output" :message msg)
(yws/send! channel message)
(recur i))))))
(catch java.nio.channels.ClosedChannelException _)
(catch java.net.SocketException _)
(catch java.io.IOException _)
(catch InterruptedException _
(l/debug :hint "websocket thread interrumpted" :conn-id id))
(catch Throwable cause
(a/offer! ch cause)
(a/close! ch)))
(l/error :hint "unhandled exception on websocket thread"
:conn-id id
:cause cause))
(defn- ws-ping!
[channel s]
(let [ch (a/chan 1)]
(yws/ping! channel s (fn [e]
(when e (a/offer! ch e))
(a/close! ch)))
(catch Throwable cause
(a/offer! ch cause)
(a/close! ch)))
(handler wsp {:type :close})
(defn- encode-beat
(doto (ByteBuffer/allocate 8)
(.putLong n)
(when (yws/connected? channel)
;; NOTE: we need to ignore all exceptions here because
;; there can be a race condition that first returns that
;; channel is connected but on closing, will raise that
;; channel is already closed.
(yws/close! channel 8899 "terminated")))
(defn- decode-beat
[^ByteBuffer buffer]
(when (= 8 (.capacity buffer))
(.rewind buffer)
(.getLong buffer)))
(when-let [on-disconnect (::on-disconnect wsp)]
(defn- wrap-handler
(fn [wsp message]
(locking wsp
(handler wsp message))))
(def max-missed-heartbeats 3)
(def heartbeat-interval 5000)
(defn- start-io-loop
[wsp handler on-snd-message on-ws-terminate]
(let [input-ch (::input-ch @wsp)
output-ch (::output-ch @wsp)
stop-ch (::stop-ch @wsp)
hbeat-pong-ch (::heartbeat-ch @wsp)
channel (::channel @wsp)
conn-id (::id @wsp)
handler (wrap-handler handler)
beats (atom #{})
choices [stop-ch
;; Start IO loop
(a/<! (handler wsp {:type :connect}))
(a/<! (a/go-loop [i 0]
(let [hbeat-ping-ch (a/timeout heartbeat-interval)
[v p] (a/alts! (conj choices hbeat-ping-ch))]
(not (yws/connected? channel))
(on-ws-terminate nil 8800 "channel disconnected")
(= p hbeat-ping-ch)
(l/trace :hint "ping" :beat i :conn-id conn-id)
(a/<! (ws-ping! channel (encode-beat i)))
(let [issued (swap! beats conj (long i))]
(if (>= (count issued) max-missed-heartbeats)
(on-ws-terminate nil 8802 "heartbeat: timeout")
(recur (inc i)))))
(= p hbeat-pong-ch)
(let [beat (decode-beat v)]
(l/trace :hint "pong" :beat beat :conn-id conn-id)
(swap! beats disj beat)
(recur i))
(= p input-ch)
(let [result (a/<! (handler wsp v))]
;; (l/trace :hint "message received" :message v)
(ex/error? result)
(a/>! output-ch {:type :error :error (ex-data result)})
(ex/exception? result)
(a/>! output-ch {:type :error :error {:message (ex-message result)}})
(map? result)
(a/>! output-ch (cond-> result (:request-id v) (assoc :request-id (:request-id v)))))
(recur i))
(= p output-ch)
(let [v (on-snd-message wsp v)]
;; (l/trace :hint "writing message to output" :message v)
(a/<! (ws-send! channel (t/encode-str v)))
(recur i))))))
(a/<! (handler wsp {:type :disconnect})))))
(defn- filter-options
"Remove from options all namespace qualified keys that matches the
current namespace."
(into {}
(remove (fn [[key]]
(= (namespace key) "app.util.websocket")))
(l/trace :hint "websocket thread terminated" :conn-id id)))))