0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-12 18:18:24 -05:00

♻️ Refactor websockets subsystem (on backend)

- Refactor msgbus subsystem, simplifying many parts.
- Enable persistent websocket connection for the all session duration.
This commit is contained in:
Andrey Antukh 2022-03-18 12:36:42 +01:00 committed by Alonso Torres
parent 4a9e38a221
commit f60d8c6c96
12 changed files with 482 additions and 362 deletions

View file

@ -19,7 +19,7 @@
io.lettuce/lettuce-core {:mvn/version "6.1.6.RELEASE"} io.lettuce/lettuce-core {:mvn/version "6.1.6.RELEASE"}
java-http-clj/java-http-clj {:mvn/version "0.4.3"} java-http-clj/java-http-clj {:mvn/version "0.4.3"}
funcool/yetti {:git/tag "v6.0" :git/sha "4c8690e" funcool/yetti {:git/tag "v8.0" :git/sha "ea7162d"
:git/url "https://github.com/funcool/yetti.git" :git/url "https://github.com/funcool/yetti.git"
:exclusions [org.slf4j/slf4j-api]} :exclusions [org.slf4j/slf4j-api]}

View file

@ -22,51 +22,163 @@
;; WEBSOCKET HANDLER ;; WEBSOCKET HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare send-presence!)
(defmulti handle-message (defmulti handle-message
(fn [_wsp message] (:type message))) (fn [_ message]
(:type message)))
(defmethod handle-message :connect (defmethod handle-message :connect
[wsp _] [wsp _]
(let [{:keys [msgbus file-id team-id session-id ::ws/output-ch]} @wsp (l/trace :fn "handle-message" :event :connect)
sub-ch (a/chan (a/dropping-buffer 32))]
(swap! wsp assoc :sub-ch sub-ch) (let [msgbus-fn (:msgbus @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
;; Start a subscription forwarding goroutine xform (remove #(= (:session-id %) session-id))
(a/go-loop [] channel (a/chan (a/dropping-buffer 16) xform)]
(when-let [val (a/<! sub-ch)]
(when-not (= (:session-id val) session-id)
;; If we receive a connect message of other user, we need
;; to send an update presence to all participants.
(when (= :connect (:type val))
(a/<! (send-presence! @wsp :presence)))
;; Then, just forward the message (swap! wsp assoc ::profile-subs-channel channel)
(a/>! output-ch val)) (a/pipe channel output-ch false)
(recur))) (msgbus-fn :cmd :sub :topic profile-id :chan channel)))
(a/go
(a/<! (msgbus :sub {:topics [file-id team-id] :chan sub-ch}))
(a/<! (send-presence! @wsp :connect)))))
(defmethod handle-message :disconnect (defmethod handle-message :disconnect
[wsp _] [wsp _]
(a/close! (:sub-ch @wsp)) (l/trace :fn "handle-message" :event :disconnect)
(send-presence! @wsp :disconnect)) (a/go
(let [msgbus-fn (:msgbus @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
profile-ch (::profile-subs-channel @wsp)
subs (::subscriptions @wsp)]
;; Close the main profile subscription
(a/close! profile-ch)
(a/<! (msgbus-fn :cmd :purge :chans [profile-ch]))
;; Close all other active subscrption on this websocket context.
(doseq [{:keys [channel topic]} (map second subs)]
(a/close! channel)
(a/<! (msgbus-fn :cmd :pub :topic topic
:message {:type :disconnect
:profile-id profile-id
:session-id session-id}))
(a/<! (msgbus-fn :cmd :purge :chans [channel]))))))
(defmethod handle-message :subscribe-team
[wsp {:keys [team-id] :as params}]
(l/trace :fn "handle-message" :event :subscribe-team :team-id team-id)
(let [msgbus-fn (:msgbus @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
subs (get-in @wsp [::subscriptions team-id])
xform (comp
(remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id team-id)))]
(a/go
(when (not= (:team-id subs) team-id)
;; if it exists we just need to close that
(when-let [channel (:channel subs)]
(a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel])))
(let [channel (a/chan (a/dropping-buffer 64) xform)]
;; Message forwarding
(a/pipe channel output-ch false)
(let [state {:team-id team-id :channel channel :topic team-id}]
(swap! wsp update ::subscriptions assoc team-id state))
(a/<! (msgbus-fn :cmd :sub :topic team-id :chan channel)))))))
(defmethod handle-message :subscribe-file
[wsp {:keys [subs-id file-id] :as params}]
(l/trace :fn "handle-message" :event :subscribe-file :subs-id subs-id :file-id file-id)
(let [msgbus-fn (:msgbus @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
xform (comp
(remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id subs-id)))
channel (a/chan (a/dropping-buffer 64) xform)]
;; Message forwarding
(a/go-loop []
(when-let [{:keys [type] :as message} (a/<! channel)]
(when (or (= :join-file type)
(= :leave-file type)
(= :disconnect type))
(let [message {:type :presence
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(a/<! (msgbus-fn :cmd :pub
:topic file-id
:message message))))
(a/>! output-ch message)
(recur)))
(let [state {:file-id file-id :channel channel :topic file-id}]
(swap! wsp update ::subscriptions assoc subs-id state))
(a/go
;; Subscribe to file topic
(a/<! (msgbus-fn :cmd :sub :topic file-id :chan channel))
;; Notifify the rest of participants of the new connection.
(let [message {:type :join-file
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(a/<! (msgbus-fn :cmd :pub
:topic file-id
:message message))))))
(defmethod handle-message :unsubscribe-file
[wsp {:keys [subs-id] :as params}]
(l/trace :fn "handle-message" :event :unsubscribe-file :subs-id subs-id)
(let [msgbus-fn (:msgbus @wsp)
session-id (::session-id @wsp)
profile-id (::profile-id @wsp)]
(a/go
(when-let [{:keys [file-id channel]} (get-in @wsp [::subscriptions subs-id])]
(let [message {:type :leave-file
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(a/close! channel)
(a/<! (msgbus-fn :cmd :pub :topic file-id :message message))
(a/<! (msgbus-fn :cmd :purge :chans [channel])))))))
(defmethod handle-message :keepalive (defmethod handle-message :keepalive
[_ _] [_ _]
(l/trace :fn "handle-message" :event :keepalive)
(a/go :nothing)) (a/go :nothing))
(defmethod handle-message :pointer-update (defmethod handle-message :pointer-update
[wsp message] [wsp {:keys [subs-id] :as message}]
(let [{:keys [profile-id file-id session-id msgbus]} @wsp] (a/go
(msgbus :pub {:topic file-id ;; Only allow receive pointer updates when active subscription
:message (assoc message (when-let [{:keys [topic]} (get-in @wsp [::subscriptions subs-id])]
:profile-id profile-id (l/trace :fn "handle-message" :event :pointer-update :message message)
:session-id session-id)}))) (let [msgbus-fn (:msgbus @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
message (-> message
(dissoc :subs-id)
(assoc :profile-id profile-id)
(assoc :session-id session-id))]
(a/<! (msgbus-fn :cmd :pub
:topic topic
:message message))))))
(defmethod handle-message :default (defmethod handle-message :default
[_ message] [_ message]
@ -75,51 +187,33 @@
:msg "received unexpected message" :msg "received unexpected message"
:message message))) :message message)))
;; --- IMPL
(defn- send-presence!
([ws] (send-presence! ws :presence))
([{:keys [msgbus session-id profile-id file-id]} type]
(msgbus :pub {:topic file-id
:message {:type type
:session-id session-id
:profile-id profile-id}})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HTTP HANDLER ;; HTTP HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare retrieve-file)
(s/def ::msgbus fn?) (s/def ::msgbus fn?)
(s/def ::file-id ::us/uuid)
(s/def ::session-id ::us/uuid) (s/def ::session-id ::us/uuid)
(s/def ::handler-params (s/def ::handler-params
(s/keys :req-un [::file-id ::session-id])) (s/keys :req-un [::session-id]))
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::msgbus ::db/pool ::mtx/metrics])) (s/keys :req-un [::msgbus ::db/pool ::mtx/metrics]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [pool] :as cfg}] [_ cfg]
(fn [{:keys [profile-id params] :as req} respond raise] (fn [{:keys [profile-id params] :as req} respond raise]
(let [params (us/conform ::handler-params params) (let [{:keys [session-id]} (us/conform ::handler-params params)
file (retrieve-file pool (:file-id params)) cfg (-> cfg
cfg (-> (merge cfg params) (assoc ::profile-id profile-id)
(assoc :profile-id profile-id) (assoc ::session-id session-id))]
(assoc :team-id (:team-id file)))]
(l/trace :hint "http request to websocket" :profile-id profile-id :session-id session-id)
(cond (cond
(not profile-id) (not profile-id)
(raise (ex/error :type :authentication (raise (ex/error :type :authentication
:hint "Authentication required.")) :hint "Authentication required."))
(not file)
(raise (ex/error :type :not-found
:code :object-not-found))
(not (yws/upgrade-request? req)) (not (yws/upgrade-request? req))
(raise (ex/error :type :validation (raise (ex/error :type :validation
:code :websocket-request-expected :code :websocket-request-expected
@ -129,16 +223,3 @@
(->> (ws/handler handle-message cfg) (->> (ws/handler handle-message cfg)
(yws/upgrade req) (yws/upgrade req)
(respond)))))) (respond))))))
(def ^:private
sql:retrieve-file
"select f.id as id,
p.team_id as team_id
from file as f
join project as p on (p.id = f.project_id)
where f.id = ?")
(defn- retrieve-file
[conn id]
(db/exec-one! conn [sql:retrieve-file id]))

View file

@ -65,6 +65,7 @@
:app.msgbus/msgbus :app.msgbus/msgbus
{:backend (cf/get :msgbus-backend :redis) {:backend (cf/get :msgbus-backend :redis)
:executor (ig/ref [::default :app.worker/executor])
:redis-uri (cf/get :redis-uri)} :redis-uri (cf/get :redis-uri)}
:app.tokens/tokens :app.tokens/tokens

View file

@ -7,12 +7,15 @@
(ns app.msgbus (ns app.msgbus
"The msgbus abstraction implemented using redis as underlying backend." "The msgbus abstraction implemented using redis as underlying backend."
(:require (:require
[app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.transit :as t]
[app.config :as cfg] [app.config :as cfg]
[app.util.blob :as blob] [app.util.async :as aa]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk]
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
@ -28,119 +31,82 @@
io.lettuce.core.codec.StringCodec io.lettuce.core.codec.StringCodec
io.lettuce.core.pubsub.RedisPubSubListener io.lettuce.core.pubsub.RedisPubSubListener
io.lettuce.core.pubsub.StatefulRedisPubSubConnection io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
io.lettuce.core.resource.ClientResources io.lettuce.core.resource.ClientResources
io.lettuce.core.resource.DefaultClientResources io.lettuce.core.resource.DefaultClientResources
java.time.Duration)) java.time.Duration))
(set! *warn-on-reflection* true)
(def ^:private prefix (cfg/get :tenant)) (def ^:private prefix (cfg/get :tenant))
(defn- prefix-topic (defn- prefix-topic
[topic] [topic]
(str prefix "." topic)) (str prefix "." topic))
(def xform-prefix (map prefix-topic)) (def ^:private xform-prefix-topic
(def xform-topics (map (fn [m] (update m :topics #(into #{} xform-prefix %))))) (map (fn [obj] (update obj :topic prefix-topic))))
(def xform-topic (map (fn [m] (update m :topic prefix-topic))))
(s/def ::redis-uri ::us/string) (declare ^:private redis-connect)
(s/def ::buffer-size ::us/integer) (declare ^:private redis-disconnect)
(declare ^:private start-io-loop)
(defmulti init-backend :backend) (declare ^:private subscribe)
(defmulti stop-backend :backend) (declare ^:private purge)
(defmulti init-pub-loop :backend) (declare ^:private redis-pub)
(defmulti init-sub-loop :backend) (declare ^:private redis-sub)
(declare ^:private redis-unsub)
(defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :opt-un [::buffer-size ::redis-uri]))
(defmethod ig/prep-key ::msgbus (defmethod ig/prep-key ::msgbus
[_ cfg] [_ cfg]
(merge {:buffer-size 128} cfg)) (merge {:buffer-size 128
:timeout (dt/duration {:seconds 30})}
(d/without-nils cfg)))
(s/def ::timeout ::dt/duration)
(s/def ::redis-uri ::us/string)
(s/def ::buffer-size ::us/integer)
(defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::buffer-size ::redis-uri ::timeout ::wrk/executor]))
(defmethod ig/init-key ::msgbus (defmethod ig/init-key ::msgbus
[_ {:keys [backend buffer-size] :as cfg}] [_ {:keys [buffer-size redis-uri] :as cfg}]
(l/debug :action "initialize msgbus" (l/info :hint "initialize msgbus"
:backend (name backend)) :buffer-size buffer-size
(let [cfg (init-backend cfg) :redis-uri redis-uri)
(let [cmd-ch (a/chan buffer-size)
;; Channel used for receive publications from the application. rcv-ch (a/chan (a/dropping-buffer buffer-size))
pub-ch (-> (a/dropping-buffer buffer-size) pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic)
(a/chan xform-topic)) state (agent {} :error-handler #(l/error :cause % :hint "unexpected error on agent" ::l/async false))
cfg (-> (redis-connect cfg)
;; Channel used for receive subscription requests. (assoc ::cmd-ch cmd-ch)
sub-ch (a/chan 1 xform-topics) (assoc ::rcv-ch rcv-ch)
cfg (-> cfg
(assoc ::pub-ch pub-ch) (assoc ::pub-ch pub-ch)
(assoc ::sub-ch sub-ch))] (assoc ::state state))]
(init-pub-loop cfg) (start-io-loop cfg)
(init-sub-loop cfg)
(with-meta (with-meta
(fn run (fn [& {:keys [cmd] :as params}]
([command] (run command nil))
([command params]
(a/go (a/go
(case command (case cmd
:pub (a/>! pub-ch params) :pub (a/>! pub-ch params)
:sub (a/>! sub-ch params))))) :sub (a/<! (subscribe cfg params))
:purge (a/<! (purge cfg params))
(l/error :hint "unexpeced error on msgbus command processing" :params params))))
cfg))) cfg)))
(defmethod ig/halt-key! ::msgbus (defmethod ig/halt-key! ::msgbus
[_ f] [_ f]
(let [mdata (meta f)] (let [mdata (meta f)]
(stop-backend mdata) (redis-disconnect mdata)
(a/close! (::pub-ch mdata)) (a/close! (::cmd-ch mdata))
(a/close! (::sub-ch mdata)))) (a/close! (::rcv-ch mdata))))
;; --- IN-MEMORY BACKEND IMPL ;; --- IMPL
(defmethod init-backend :memory [cfg] cfg) (defn- redis-connect
(defmethod stop-backend :memory [_]) [{:keys [redis-uri timeout] :as cfg}]
(defmethod init-pub-loop :memory [_])
(defmethod init-sub-loop :memory
[{:keys [::sub-ch ::pub-ch]}]
(a/go-loop [state {}]
(let [[val port] (a/alts! [pub-ch sub-ch])]
(cond
(and (= port sub-ch) (some? val))
(let [{:keys [topics chan]} val]
(recur (reduce #(update %1 %2 (fnil conj #{}) chan) state topics)))
(and (= port pub-ch) (some? val))
(let [topic (:topic val)
message (:message val)
state (loop [state state
chans (get state topic)]
(if-let [c (first chans)]
(if (a/>! c message)
(recur state (rest chans))
(recur (update state topic disj c)
(rest chans)))
state))]
(recur state))
:else
(->> (vals state)
(mapcat identity)
(run! a/close!))))))
;; Add a unique listener to connection
;; --- REDIS BACKEND IMPL
(declare impl-redis-open?)
(declare impl-redis-pub)
(declare impl-redis-sub)
(declare impl-redis-unsub)
(defmethod init-backend :redis
[{:keys [redis-uri] :as cfg}]
(let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE)
resources (.. (DefaultClientResources/builder) resources (.. (DefaultClientResources/builder)
@ -151,50 +117,101 @@
uri (RedisURI/create redis-uri) uri (RedisURI/create redis-uri)
rclient (RedisClient/create ^ClientResources resources ^RedisURI uri) rclient (RedisClient/create ^ClientResources resources ^RedisURI uri)
pub-conn (.connect ^RedisClient rclient ^RedisCodec codec) pconn (.connect ^RedisClient rclient ^RedisCodec codec)
sub-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)] sconn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)]
(.setTimeout ^StatefulRedisConnection pub-conn ^Duration (dt/duration {:seconds 10})) (.setTimeout ^StatefulRedisConnection pconn ^Duration timeout)
(.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10})) (.setTimeout ^StatefulRedisPubSubConnection sconn ^Duration timeout)
(-> cfg (-> cfg
(assoc ::resources resources) (assoc ::resources resources)
(assoc ::pub-conn pub-conn) (assoc ::pconn pconn)
(assoc ::sub-conn sub-conn)))) (assoc ::sconn sconn))))
(defmethod stop-backend :redis (defn- redis-disconnect
[{:keys [::pub-conn ::sub-conn ::resources] :as cfg}] [{:keys [::pconn ::sconn ::resources] :as cfg}]
(.close ^StatefulRedisConnection pub-conn) (.. ^StatefulConnection pconn close)
(.close ^StatefulRedisPubSubConnection sub-conn) (.. ^StatefulConnection sconn close)
(.shutdown ^ClientResources resources)) (.shutdown ^ClientResources resources))
(defmethod init-pub-loop :redis (defn- conj-subscription
[{:keys [::pub-conn ::pub-ch]}] "A low level function that is responsible to create on-demand
(let [rac (.async ^StatefulRedisConnection pub-conn)] subscriptions on redis. It reuses the same subscription if it is
(a/go-loop [] already established. Intended to be executed in agent."
(when-let [val (a/<! pub-ch)] [nsubs cfg topic chan]
(let [result (a/<! (impl-redis-pub rac val))] (let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))]
(when (and (impl-redis-open? pub-conn) (when (= 1 (count nsubs))
(ex/exception? result)) (l/trace :hint "open subscription" :topic topic ::l/async false)
(l/error :cause result (redis-sub cfg topic))
:hint "unexpected error on publish message to redis"))) nsubs))
(recur)))))
(defmethod init-sub-loop :redis (defn- disj-subscription
[{:keys [::sub-conn ::sub-ch buffer-size]}] "A low level function responsible on removing subscriptions. The
(let [rcv-ch (a/chan (a/dropping-buffer buffer-size)) subscription is trully removed from redis once no single local
chans (agent {} :error-handler #(l/error :cause % :hint "unexpected error on agent")) subscription is look for it. Intended to be executed in agent."
rac (.async ^StatefulRedisPubSubConnection sub-conn)] [nsubs cfg topic chan]
(let [nsubs (disj nsubs chan)]
(when (empty? nsubs)
(l/trace :hint "close subscription" :topic topic ::l/async false)
(redis-unsub cfg topic))
nsubs))
;; Add a unique listener to connection (defn- subscribe-to-topics
(.addListener sub-conn "Function responsible to attach local subscription to the
state. Intended to be used in agent."
[state cfg topics chan done-ch]
(l/trace :hint "subscribe-to-topics" :topics topics ::l/async false)
(aa/with-closing done-ch
(let [state (update state :chans assoc chan topics)]
(reduce (fn [state topic]
(update-in state [:topics topic] conj-subscription cfg topic chan))
state
topics))))
(defn- unsubscribe-single-channel
"Auxiliar function responsible on removing a single local
subscription from the state."
[state cfg chan]
(let [topics (get-in state [:chans chan])
state (update state :chans dissoc chan)]
(reduce (fn [state topic]
(update-in state [:topics topic] disj-subscription cfg topic chan))
state
topics)))
(defn- unsubscribe-channels
"Function responsible from detach from state a seq of channels,
useful when client disconnects or in-bulk unsubscribe
operations. Intended to be executed in agent."
[state cfg channels done-ch]
(l/trace :hint "unsubscribe-channels" :chans (count channels) ::l/async false)
(aa/with-closing done-ch
(reduce #(unsubscribe-single-channel %1 cfg %2) state channels)))
(defn- subscribe
[{:keys [::state executor] :as cfg} {:keys [topic topics chan]}]
(let [done-ch (a/chan)
topics (into [] (map prefix-topic) (if topic [topic] topics))]
(l/trace :hint "subscribe" :topics topics)
(send-via executor state subscribe-to-topics cfg topics chan done-ch)
done-ch))
(defn- purge
[{:keys [::state executor] :as cfg} {:keys [chans]}]
(l/trace :hint "purge" :chans (count chans))
(let [done-ch (a/chan)]
(send-via executor state unsubscribe-channels cfg chans done-ch)
done-ch))
(defn- create-listener
[rcv-ch]
(reify RedisPubSubListener (reify RedisPubSubListener
(message [_ _pattern _topic _message]) (message [_ _pattern _topic _message])
(message [_ topic message] (message [_ topic message]
;; There are no back pressure, so we use a slidding ;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends ;; buffer for cases when the pubsub broker sends
;; more messages that we can process. ;; more messages that we can process.
(let [val {:topic topic :message (blob/decode message)}] (let [val {:topic topic :message (t/decode message)}]
(when-not (a/offer! rcv-ch val) (when-not (a/offer! rcv-ch val)
(l/warn :msg "dropping message on subscription loop")))) (l/warn :msg "dropping message on subscription loop"))))
(psubscribed [_ _pattern _count]) (psubscribed [_ _pattern _count])
@ -202,111 +219,79 @@
(subscribed [_ _topic _count]) (subscribed [_ _topic _count])
(unsubscribed [_ _topic _count]))) (unsubscribed [_ _topic _count])))
(letfn [(subscribe-to-single-topic [nsubs topic chan] (defn start-io-loop
(let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))] [{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}]
(when (= 1 (count nsubs))
(let [result (a/<!! (impl-redis-sub rac topic))]
(l/trace :action "open subscription"
:topic topic)
(when (ex/exception? result)
(l/error :cause result
:hint "unexpected exception on subscribing"
:topic topic))))
nsubs))
(subscribe-to-topics [state topics chan] ;; Add a single listener to the pubsub connection
(let [state (update state :chans assoc chan topics)] (.addListener ^StatefulRedisPubSubConnection sconn
(reduce (fn [state topic] ^RedisPubSubListener (create-listener rcv-ch))
(update-in state [:topics topic] subscribe-to-single-topic topic chan))
state
topics)))
(unsubscribe-from-single-topic [nsubs topic chan] (letfn [(send-to-topic [topic message]
(let [nsubs (disj nsubs chan)] (a/go-loop [chans (seq (get-in @state [:topics topic]))
(when (empty? nsubs) closed #{}]
(let [result (a/<!! (impl-redis-unsub rac topic))]
(l/trace :action "close subscription"
:topic topic)
(when (and (impl-redis-open? sub-conn)
(ex/exception? result))
(l/error :cause result
:hint "unexpected exception on unsubscribing"
:topic topic))))
nsubs))
(unsubscribe-channels [state pending]
(reduce (fn [state ch]
(let [topics (get-in state [:chans ch])
state (update state :chans dissoc ch)]
(reduce (fn [state topic]
(update-in state [:topics topic] unsubscribe-from-single-topic topic ch))
state
topics)))
state
pending))]
;; Asynchronous subscription loop;
(a/go-loop []
(if-let [{:keys [topics chan]} (a/<! sub-ch)]
(do
(send-off chans subscribe-to-topics topics chan)
(recur))
(a/close! rcv-ch)))
;; Asynchronous message processing loop;x
(a/go-loop []
(if-let [{:keys [topic message]} (a/<! rcv-ch)]
;; This means we receive data from redis and we need to
;; forward it to the underlying subscriptions.
(let [pending (loop [chans (seq (get-in @chans [:topics topic]))
pending #{}]
(if-let [ch (first chans)] (if-let [ch (first chans)]
(if (a/>! ch message) (if (a/>! ch message)
(recur (rest chans) pending) (recur (rest chans) closed)
(recur (rest chans) (conj pending ch))) (recur (rest chans) (conj closed ch)))
pending))] (seq closed))))
(some->> (seq pending)
(send-off chans unsubscribe-channels))
(recur)) (process-incoming [{:keys [topic message]}]
(a/go
(when-let [closed (a/<! (send-to-topic topic message))]
(send-via executor state unsubscribe-channels cfg closed nil))))
]
;; Stop condition; close all underlying subscriptions and (a/go-loop []
;; exit. The close operation is performed asynchronously. (let [[val port] (a/alts! [pub-ch rcv-ch])]
(send-off chans (fn [state] (cond
(nil? val)
(do
(l/trace :hint "stoping io-loop, nil received")
(send-via executor state (fn [state]
(->> (vals state) (->> (vals state)
(mapcat identity) (mapcat identity)
(filter some?) (filter some?)
(run! a/close!))))))))) (run! a/close!))
nil)))
(= port rcv-ch)
(do
(a/<! (process-incoming val))
(recur))
(defn- impl-redis-open? (= port pub-ch)
[^StatefulConnection conn] (let [result (a/<! (redis-pub cfg val))]
(.isOpen conn)) (when (ex/exception? result)
(l/error :hint "unexpected error on publishing" :message val
:cause result))
(recur)))))))
(defn- impl-redis-pub (defn- redis-pub
[^RedisAsyncCommands rac {:keys [topic message]}] "Publish a message to the redis server. Asynchronous operation,
(let [message (blob/encode message) intended to be used in core.async go blocks."
res (a/chan 1)] [{:keys [::pconn] :as cfg} {:keys [topic message]}]
(-> (.publish rac ^String topic ^bytes message) (let [message (t/encode message)
(p/finally (fn [_ e] res (a/chan 1)
(when e (a/>!! res e)) pcomm (.async ^StatefulRedisConnection pconn)]
(-> (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)
(p/finally (fn [_ cause]
(when (and cause (.isOpen ^StatefulConnection pconn))
(a/offer! res cause))
(a/close! res)))) (a/close! res))))
res)) res))
(defn impl-redis-sub (defn redis-sub
[^RedisPubSubAsyncCommands rac topic] "Create redis subscription. Blocking operation, intended to be used
(let [res (a/chan 1)] inside an agent."
(-> (.subscribe rac (into-array String [topic])) [{:keys [::sconn] :as cfg} topic]
(p/finally (fn [_ e] (let [topic (into-array String [topic])
(when e (a/>!! res e)) scomm (.sync ^StatefulRedisPubSubConnection sconn)]
(a/close! res)))) (.subscribe ^RedisPubSubCommands scomm topic)))
res))
(defn impl-redis-unsub (defn redis-unsub
[rac topic] "Removes redis subscription. Blocking operation, intended to be used
(let [res (a/chan 1)] inside an agent."
(-> (.unsubscribe rac (into-array String [topic])) [{:keys [::sconn] :as cfg} topic]
(p/finally (fn [_ e] (let [topic (into-array String [topic])
(when e (a/>!! res e)) scomm (.sync ^StatefulRedisPubSubConnection sconn)]
(a/close! res)))) (.unsubscribe ^RedisPubSubCommands scomm topic)))
res))

View file

@ -386,31 +386,33 @@
(assoc :changes [])))))))) (assoc :changes []))))))))
(defn- send-notifications (defn- send-notifications
[{:keys [msgbus conn] :as cfg} {:keys [file changes session-id] :as params}] [{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}]
(let [lchanges (filter library-change? changes)] (let [lchanges (filter library-change? changes)
msgbus-fn (:msgbus cfg)]
;; Asynchronously publish message to the msgbus ;; Asynchronously publish message to the msgbus
(msgbus :pub {:topic (:id file) (msgbus-fn :cmd :pub
:message :topic (:id file)
{:type :file-change :message {:type :file-change
:profile-id (:profile-id params) :profile-id (:profile-id params)
:file-id (:id file) :file-id (:id file)
:session-id (:session-id params) :session-id (:session-id params)
:revn (:revn file) :revn (:revn file)
:changes changes}}) :changes changes})
(when (and (:is-shared file) (seq lchanges)) (when (and (:is-shared file) (seq lchanges))
(let [team-id (retrieve-team-id conn (:project-id file))] (let [team-id (retrieve-team-id conn (:project-id file))]
;; Asynchronously publish message to the msgbus ;; Asynchronously publish message to the msgbus
(msgbus :pub {:topic team-id (msgbus-fn :cmd :pub
:message :topic team-id
{:type :library-change :message {:type :library-change
:profile-id (:profile-id params) :profile-id (:profile-id params)
:file-id (:id file) :file-id (:id file)
:session-id session-id :session-id session-id
:revn (:revn file) :revn (:revn file)
:modified-at (dt/now) :modified-at (dt/now)
:changes lchanges}}))))) :changes lchanges})))))
(defn- retrieve-team-id (defn- retrieve-team-id
[conn project-id] [conn project-id]

View file

@ -38,6 +38,13 @@
(throw r#) (throw r#)
r#))) r#)))
(defmacro with-closing
[ch & body]
`(try
~@body
(finally
(some-> ~ch a/close!))))
(defn thread-call (defn thread-call
[^Executor executor f] [^Executor executor f]
(let [c (a/chan 1)] (let [c (a/chan 1)]

View file

@ -54,16 +54,22 @@
pong-ch (a/chan (a/sliding-buffer 6)) pong-ch (a/chan (a/sliding-buffer 6))
close-ch (a/chan) close-ch (a/chan)
options (-> options options (atom
(-> options
(assoc ::input-ch input-ch) (assoc ::input-ch input-ch)
(assoc ::output-ch output-ch) (assoc ::output-ch output-ch)
(assoc ::close-ch close-ch) (assoc ::close-ch close-ch)
(assoc ::channel channel) (assoc ::channel channel)
(dissoc ::metrics)) (dissoc ::metrics)))
terminated (atom false) terminated (atom false)
created-at (dt/now) created-at (dt/now)
on-open
(fn [channel]
(mtx/run! metrics {:id :websocket-active-connections :inc 1})
(yws/idle-timeout! channel (dt/duration idle-timeout)))
on-terminate on-terminate
(fn [& _args] (fn [& _args]
(when (compare-and-set! terminated false true) (when (compare-and-set! terminated false true)
@ -79,7 +85,8 @@
(fn [_ error] (fn [_ error]
(on-terminate) (on-terminate)
;; TODO: properly log timeout exceptions ;; TODO: properly log timeout exceptions
(when-not (instance? java.nio.channels.ClosedChannelException error) (when-not (or (instance? java.nio.channels.ClosedChannelException error)
(instance? java.net.SocketException error))
(l/error :hint (ex-message error) :cause error))) (l/error :hint (ex-message error) :cause error)))
on-message on-message
@ -98,12 +105,8 @@
(fn [_ buffers] (fn [_ buffers]
(a/>!! pong-ch (yu/copy-many buffers)))] (a/>!! pong-ch (yu/copy-many buffers)))]
(mtx/run! metrics {:id :websocket-active-connections :inc 1}) ;; launch heartbeat process
(-> @options
(let [wsp (atom options)]
;; Handle heartbeat
(yws/idle-timeout! channel (dt/duration idle-timeout))
(-> @wsp
(assoc ::pong-ch pong-ch) (assoc ::pong-ch pong-ch)
(assoc ::on-close on-terminate) (assoc ::on-close on-terminate)
(process-heartbeat)) (process-heartbeat))
@ -117,12 +120,13 @@
(recur))) (recur)))
;; React on messages received from the client ;; React on messages received from the client
(process-input wsp handle-message) (process-input options handle-message)
{:on-error on-error {:on-open on-open
:on-error on-error
:on-close on-terminate :on-close on-terminate
:on-text on-message :on-text on-message
:on-pong on-pong}))))) :on-pong on-pong}))))
(defn- ws-send! (defn- ws-send!
[channel s] [channel s]
@ -160,14 +164,21 @@
(.rewind buffer) (.rewind buffer)
(.getLong buffer))) (.getLong buffer)))
(defn- wrap-handler
[handler]
(fn [wsp message]
(locking wsp
(handler wsp message))))
(defn- process-input (defn- process-input
[wsp handler] [wsp handler]
(let [{:keys [::input-ch ::output-ch ::close-ch]} @wsp] (let [{:keys [::input-ch ::output-ch ::close-ch]} @wsp
handler (wrap-handler handler)]
(a/go (a/go
(a/<! (handler wsp {:type :connect})) (a/<! (handler wsp {:type :connect}))
(a/<! (a/go-loop [] (a/<! (a/go-loop []
(when-let [request (a/<! input-ch)] (when-let [message (a/<! input-ch)]
(let [[val port] (a/alts! [(handler wsp request) close-ch])] (let [[val port] (a/alts! [(handler wsp message) close-ch])]
(when-not (= port close-ch) (when-not (= port close-ch)
(cond (cond
(ex/ex-info? val) (ex/ex-info? val)
@ -177,8 +188,7 @@
(a/>! output-ch {:type :error :error {:message (ex-message val)}}) (a/>! output-ch {:type :error :error {:message (ex-message val)}})
(map? val) (map? val)
(a/>! output-ch (cond-> val (:request-id request) (assoc :request-id (:request-id request))))) (a/>! output-ch (cond-> val (:request-id message) (assoc :request-id (:request-id message)))))
(recur)))))) (recur))))))
(a/<! (handler wsp {:type :disconnect}))))) (a/<! (handler wsp {:type :disconnect})))))

View file

@ -6,11 +6,11 @@
org.clojure/clojurescript {:mvn/version "1.11.4"} org.clojure/clojurescript {:mvn/version "1.11.4"}
;; Logging ;; Logging
org.apache.logging.log4j/log4j-api {:mvn/version "2.17.1"} org.apache.logging.log4j/log4j-api {:mvn/version "2.17.2"}
org.apache.logging.log4j/log4j-core {:mvn/version "2.17.1"} org.apache.logging.log4j/log4j-core {:mvn/version "2.17.2"}
org.apache.logging.log4j/log4j-web {:mvn/version "2.17.1"} org.apache.logging.log4j/log4j-web {:mvn/version "2.17.2"}
org.apache.logging.log4j/log4j-jul {:mvn/version "2.17.1"} org.apache.logging.log4j/log4j-jul {:mvn/version "2.17.2"}
org.apache.logging.log4j/log4j-slf4j18-impl {:mvn/version "2.17.1"} org.apache.logging.log4j/log4j-slf4j18-impl {:mvn/version "2.17.2"}
org.slf4j/slf4j-api {:mvn/version "2.0.0-alpha1"} org.slf4j/slf4j-api {:mvn/version "2.0.0-alpha1"}
selmer/selmer {:mvn/version "1.12.50"} selmer/selmer {:mvn/version "1.12.50"}

View file

@ -132,7 +132,7 @@
(defn- interpolate (defn- interpolate
[s params] [s params]
(loop [items (->> (re-seq #"([^\%]+)*(\%(\d+)?)?" s) (loop [items (->> (re-seq #"([^\%]+)*(\%(\d+)?)?" s)
(remove (fn [[_ seg]] (nil? seg)))) (remove (fn [[full seg]] (and (nil? seg) (not full)))))
result [] result []
index 0] index 0]
(if-let [[_ segment var? sidx] (first items)] (if-let [[_ segment var? sidx] (first items)]
@ -156,7 +156,8 @@
(recur (rest items) (recur (rest items)
(conj result segment) (conj result segment)
(inc index))) (inc index)))
result)))
(remove nil? result))))
(defmacro fmt (defmacro fmt
"String interpolation helper. Can only be used with strings known at "String interpolation helper. Can only be used with strings known at

View file

@ -181,16 +181,18 @@
~level-sym (get-level ~level)] ~level-sym (get-level ~level)]
(when (enabled? ~logger-sym ~level-sym) (when (enabled? ~logger-sym ~level-sym)
~(if async ~(if async
`(send-off logging-agent `(do
(send-off logging-agent
(fn [_#] (fn [_#]
(with-context (merge {:id (uuid/next)} (with-context (into {:id (uuid/next)}
(get-error-context ~cause) (get-error-context ~cause)
~context) ~context)
(->> (or ~raw (build-map-message ~props)) (->> (or ~raw (build-map-message ~props))
(write-log! ~logger-sym ~level-sym ~cause))))) (write-log! ~logger-sym ~level-sym ~cause)))))
nil)
`(let [message# (or ~raw (build-map-message ~props))] `(let [message# (or ~raw (build-map-message ~props))]
(write-log! ~logger-sym ~level-sym ~cause message#)))))))) (write-log! ~logger-sym ~level-sym ~cause message#)
nil)))))))
(defmacro info (defmacro info
[& params] [& params]

View file

@ -16,6 +16,7 @@
;; because of some strange interaction with cljs.spec.alpha and ;; because of some strange interaction with cljs.spec.alpha and
;; modules splitting. ;; modules splitting.
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.uri :as u]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[cuerdas.core :as str] [cuerdas.core :as str]
[expound.alpha :as expound])) [expound.alpha :as expound]))
@ -96,6 +97,7 @@
:else :else
::s/invalid)) ::s/invalid))
;; --- Default Specs ;; --- Default Specs
(s/def ::keyword (s/conformer keyword-conformer name)) (s/def ::keyword (s/conformer keyword-conformer name))
@ -192,6 +194,15 @@
(fn [v] (fn [v]
(str/join " " v)))) (str/join " " v))))
(s/def ::uri
(s/conformer
(fn [s]
(cond
(u/uri? s) s
(string? s) (u/uri s)
:else ::s/invalid))
str))
;; --- SPEC: set-of-str ;; --- SPEC: set-of-str
(s/def ::set-of-str (s/def ::set-of-str

View file

@ -9,7 +9,9 @@
(:require (:require
[app.common.data.macros :as dm] [app.common.data.macros :as dm]
[lambdaisland.uri :as u] [lambdaisland.uri :as u]
[lambdaisland.uri.normalize :as un])) [lambdaisland.uri.normalize :as un])
#?(:clj
(:import lambdaisland.uri.URI)))
(dm/export u/uri) (dm/export u/uri)
(dm/export u/join) (dm/export u/join)
@ -25,6 +27,11 @@
[v] [v]
(if (keyword? v) (name v) v)) (if (keyword? v) (name v) v))
(defn get-domain
[{:keys [host port] :as uri}]
(cond-> host
port (str ":" port)))
(defn map->query-string (defn map->query-string
([params] (map->query-string params nil)) ([params] (map->query-string params nil))
([params {:keys [value-fn key-fn] ([params {:keys [value-fn key-fn]
@ -35,3 +42,16 @@
(remove #(nil? (second %))) (remove #(nil? (second %)))
(map (fn [[k v]] [(key-fn k) (value-fn v)])))) (map (fn [[k v]] [(key-fn k) (value-fn v)]))))
(u/map->query-string)))) (u/map->query-string))))
#?(:clj
(defmethod print-method lambdaisland.uri.URI [^URI this ^java.io.Writer writer]
(.write writer "#")
(.write writer (str u/edn-tag))
(.write writer " ")
(.write writer (pr-str (.toString this))))
:cljs
(extend-type u/URI
IPrintWithWriter
(-pr-writer [this writer _opts]
(write-all writer "#" (str u/edn-tag) " " (pr-str (.toString this))))))