0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-23 06:58:58 -05:00

Merge pull request #2229 from penpot/niwinz-msgbus-improvements

 Improve msgbus internal API
This commit is contained in:
Alejandro 2022-09-06 10:50:42 +02:00 committed by GitHub
commit d41c2388c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 131 additions and 103 deletions

View file

@ -13,6 +13,7 @@
[app.common.spec :as us]
[app.db :as db]
[app.metrics :as mtx]
[app.msgbus :as mbus]
[app.util.time :as dt]
[app.util.websocket :as ws]
[clojure.core.async :as a]
@ -20,6 +21,12 @@
[integrant.core :as ig]
[yetti.websocket :as yws]))
(def recv-labels
(into-array String ["recv"]))
(def send-labels
(into-array String ["send"]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; WEBSOCKET HOOKS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -30,21 +37,30 @@
[{:keys [metrics]} wsp]
(let [created-at (dt/now)]
(swap! state assoc (::ws/id @wsp) wsp)
(mtx/run! metrics {:id :websocket-active-connections :inc 1})
(mtx/run! metrics
:id :websocket-active-connections
:inc 1)
(fn []
(swap! state dissoc (::ws/id @wsp))
(mtx/run! metrics {:id :websocket-active-connections :dec 1})
(mtx/run! metrics {:id :websocket-session-timing
:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}))))
(mtx/run! metrics :id :websocket-active-connections :dec 1)
(mtx/run! metrics
:id :websocket-session-timing
:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)))))
(defn- on-rcv-message
[{:keys [metrics]} _ message]
(mtx/run! metrics {:id :websocket-messages-total :labels ["recv"] :inc 1})
(mtx/run! metrics
:id :websocket-messages-total
:labels recv-labels
:inc 1)
message)
(defn- on-snd-message
[{:keys [metrics]} _ message]
(mtx/run! metrics {:id :websocket-messages-total :labels ["send"] :inc 1})
(mtx/run! metrics
:id :websocket-messages-total
:labels send-labels
:inc 1)
message)
;; REPL HELPERS
@ -72,12 +88,12 @@
(defn repl-get-connection-info
[id]
(when-let [wsp (get @state id)]
{:id id
:created-at (dt/instant id)
:profile-id (::profile-id @wsp)
:session-id (::session-id @wsp)
:user-agent (::ws/user-agent @wsp)
:ip-addr (::ws/remote-addr @wsp)
{:id id
:created-at (dt/instant id)
:profile-id (::profile-id @wsp)
:session-id (::session-id @wsp)
:user-agent (::ws/user-agent @wsp)
:ip-addr (::ws/remote-addr @wsp)
:last-activity-at (::ws/last-activity-at @wsp)
:http-session-id (::ws/http-session-id @wsp)
:subscribed-file (-> wsp deref ::file-subscription :file-id)
@ -104,7 +120,7 @@
(defmethod handle-message :connect
[cfg wsp _]
(let [msgbus-fn (:msgbus cfg)
(let [msgbus (:msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
@ -113,17 +129,17 @@
xform (remove #(= (:session-id %) session-id))
channel (a/chan (a/dropping-buffer 16) xform)]
(l/trace :fn "handle-message" :event :connect :conn-id conn-id)
(l/trace :fn "handle-message" :event "connect" :conn-id conn-id)
;; Subscribe to the profile channel and forward all messages to
;; websocket output channel (send them to the client).
(swap! wsp assoc ::profile-subscription channel)
(a/pipe channel output-ch false)
(msgbus-fn :cmd :sub :topic profile-id :chan channel)))
(mbus/sub! msgbus :topic profile-id :chan channel)))
(defmethod handle-message :disconnect
[cfg wsp _]
(let [msgbus-fn (:msgbus cfg)
(let [msgbus (:msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
@ -143,21 +159,21 @@
(a/go
;; Close the main profile subscription
(a/close! profile-ch)
(a/<! (msgbus-fn :cmd :purge :chans [profile-ch]))
(a/<! (mbus/purge! msgbus [profile-ch]))
;; Close tram subscription if exists
(when-let [channel (:channel tsub)]
(a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel])))
(a/<! (mbus/purge! msgbus channel)))
(when-let [{:keys [topic channel]} fsub]
(a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel]))
(a/<! (msgbus-fn :cmd :pub :topic topic :message message))))))
(a/<! (mbus/purge! msgbus channel))
(a/<! (mbus/pub! msgbus :topic topic :message message))))))
(defmethod handle-message :subscribe-team
[cfg wsp {:keys [team-id] :as params}]
(let [msgbus-fn (:msgbus cfg)
(let [msgbus (:msgbus cfg)
conn-id (::ws/id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
@ -182,14 +198,14 @@
;; Close previous subscription if exists
(when-let [channel (:channel prev-subs)]
(a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel]))))
(a/<! (mbus/purge! msgbus channel))))
(a/go
(a/<! (msgbus-fn :cmd :sub :topic team-id :chan channel)))))
(a/<! (mbus/sub! msgbus :topic team-id :chan channel)))))
(defmethod handle-message :subscribe-file
[cfg wsp {:keys [file-id] :as params}]
(let [msgbus-fn (:msgbus cfg)
(let [msgbus (:msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
@ -211,7 +227,7 @@
;; Close previous subscription if exists
(when-let [channel (:channel prev-subs)]
(a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel]))))
(a/<! (mbus/purge! msgbus channel))))
;; Message forwarding
(a/go
@ -224,15 +240,13 @@
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(a/<! (msgbus-fn :cmd :pub
:topic file-id
:message message))))
(a/<! (mbus/pub! msgbus :topic file-id :message message))))
(a/>! output-ch message)
(recur))))
(a/go
;; Subscribe to file topic
(a/<! (msgbus-fn :cmd :sub :topic file-id :chan channel))
(a/<! (mbus/sub! msgbus :topic file-id :chan channel))
;; Notifify the rest of participants of the new connection.
(let [message {:type :join-file
@ -240,13 +254,11 @@
:subs-id file-id
:session-id session-id
:profile-id profile-id}]
(a/<! (msgbus-fn :cmd :pub
:topic file-id
:message message))))))
(a/<! (mbus/pub! msgbus :topic file-id :message message))))))
(defmethod handle-message :unsubscribe-file
[cfg wsp {:keys [file-id] :as params}]
(let [msgbus-fn (:msgbus cfg)
(let [msgbus (:msgbus cfg)
conn-id (::ws/id @wsp)
session-id (::session-id @wsp)
profile-id (::profile-id @wsp)
@ -266,8 +278,8 @@
(when (= (:file-id subs) file-id)
(let [channel (:channel subs)]
(a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel]))
(a/<! (msgbus-fn :cmd :pub :topic file-id :message message)))))))
(a/<! (mbus/purge! msgbus channel))
(a/<! (mbus/pub! msgbus :topic file-id :message message)))))))
(defmethod handle-message :keepalive
[_ _ _]
@ -276,7 +288,7 @@
(defmethod handle-message :pointer-update
[cfg wsp {:keys [file-id] :as message}]
(let [msgbus-fn (:msgbus cfg)
(let [msgbus (:msgbus cfg)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
subs (::file-subscription @wsp)
@ -287,9 +299,7 @@
(a/go
;; Only allow receive pointer updates when active subscription
(when subs
(a/<! (msgbus-fn :cmd :pub
:topic file-id
:message message))))))
(a/<! (mbus/pub! msgbus :topic file-id :message message))))))
(defmethod handle-message :default
[_ wsp message]
@ -303,7 +313,7 @@
;; HTTP HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::msgbus fn?)
(s/def ::msgbus ::mbus/msgbus)
(s/def ::session-id ::us/uuid)
(s/def ::handler-params

View file

@ -212,7 +212,7 @@
(defmulti create-collector ::mdef/type)
(defn run!
[{:keys [::definitions]} {:keys [id] :as params}]
[{:keys [::definitions]} & {:keys [id] :as params}]
(when-let [mobj (get definitions id)]
(run-collector! mobj params)
true))

View file

@ -35,12 +35,12 @@
(declare ^:private redis-connect)
(declare ^:private redis-disconnect)
(declare ^:private start-io-loop)
(declare ^:private subscribe)
(declare ^:private purge)
(declare ^:private redis-pub)
(declare ^:private redis-sub)
(declare ^:private redis-unsub)
(declare ^:private start-io-loop!)
(declare ^:private subscribe-to-topics)
(declare ^:private unsubscribe-channels)
(defmethod ig/prep-key ::msgbus
[_ cfg]
@ -48,42 +48,68 @@
:timeout (dt/duration {:seconds 30})}
(d/without-nils cfg)))
(s/def ::cmd-ch ::aa/channel)
(s/def ::rcv-ch ::aa/channel)
(s/def ::pub-ch ::aa/channel)
(s/def ::state ::us/agent)
(s/def ::pconn ::redis/connection)
(s/def ::sconn ::redis/connection)
(s/def ::msgbus
(s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor]))
(s/def ::buffer-size ::us/integer)
(defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::buffer-size ::redis/timeout ::redis/redis ::wrk/executor]))
(defmethod ig/init-key ::msgbus
[_ {:keys [buffer-size] :as cfg}]
[_ {:keys [buffer-size executor] :as cfg}]
(l/info :hint "initialize msgbus" :buffer-size buffer-size)
(let [cmd-ch (a/chan buffer-size)
rcv-ch (a/chan (a/dropping-buffer buffer-size))
pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic)
state (agent {} :error-handler #(l/error :cause % :hint "unexpected error on agent" ::l/async false))
cfg (-> (redis-connect cfg)
state (agent {})
msgbus (-> (redis-connect cfg)
(assoc ::cmd-ch cmd-ch)
(assoc ::rcv-ch rcv-ch)
(assoc ::pub-ch pub-ch)
(assoc ::state state))]
(assoc ::state state)
(assoc ::wrk/executor executor))]
(start-io-loop cfg)
(us/verify! ::msgbus msgbus)
(with-meta
(fn [& {:keys [cmd] :as params}]
(a/go
(case cmd
:pub (a/>! pub-ch params)
:sub (a/<! (subscribe cfg params))
:purge (a/<! (purge cfg params))
(l/error :hint "unexpeced error on msgbus command processing" :params params))))
cfg)))
(set-error-handler! state #(l/error :cause % :hint "unexpected error on agent" ::l/async false))
(set-error-mode! state :continue)
(start-io-loop! msgbus)
msgbus))
(defn sub!
[{:keys [::state ::wrk/executor] :as cfg} & {:keys [topic topics chan]}]
(let [done-ch (a/chan)
topics (into [] (map prefix-topic) (if topic [topic] topics))]
(l/debug :hint "subscribe" :topics topics)
(send-via executor state subscribe-to-topics cfg topics chan done-ch)
done-ch))
(defn pub!
[{::keys [pub-ch]} & {:as params}]
(a/go
(a/>! pub-ch params)))
(defn purge!
[{:keys [::state ::wrk/executor] :as msgbus} chans]
(l/trace :hint "purge" :chans (count chans))
(let [done-ch (a/chan)]
(send-via executor state unsubscribe-channels msgbus chans done-ch)
done-ch))
(defmethod ig/halt-key! ::msgbus
[_ f]
(let [mdata (meta f)]
(redis-disconnect mdata)
(a/close! (::cmd-ch mdata))
(a/close! (::rcv-ch mdata))))
[_ msgbus]
(redis-disconnect msgbus)
(a/close! (::cmd-ch msgbus))
(a/close! (::rcv-ch msgbus))
(a/close! (::pub-ch msgbus)))
;; --- IMPL
@ -91,9 +117,8 @@
[{:keys [timeout redis] :as cfg}]
(let [pconn (redis/connect redis :timeout timeout)
sconn (redis/connect redis :type :pubsub :timeout timeout)]
(-> cfg
(assoc ::pconn pconn)
(assoc ::sconn sconn))))
{::pconn pconn
::sconn sconn}))
(defn- redis-disconnect
[{:keys [::pconn ::sconn] :as cfg}]
@ -152,22 +177,6 @@
(aa/with-closing done-ch
(reduce #(unsubscribe-single-channel %1 cfg %2) state channels)))
(defn- subscribe
[{:keys [::state executor] :as cfg} {:keys [topic topics chan]}]
(let [done-ch (a/chan)
topics (into [] (map prefix-topic) (if topic [topic] topics))]
(l/debug :hint "subscribe" :topics topics)
(send-via executor state subscribe-to-topics cfg topics chan done-ch)
done-ch))
(defn- purge
[{:keys [::state executor] :as cfg} {:keys [chans]}]
(l/trace :hint "purge" :chans (count chans))
(let [done-ch (a/chan)]
(send-via executor state unsubscribe-channels cfg chans done-ch)
done-ch))
(defn- create-listener
[rcv-ch]
(redis/pubsub-listener
@ -179,8 +188,8 @@
(when-not (a/offer! rcv-ch val)
(l/warn :msg "dropping message on subscription loop"))))))
(defn start-io-loop
[{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}]
(defn start-io-loop!
[{:keys [::sconn ::rcv-ch ::pub-ch ::state ::wrk/executor] :as cfg}]
(redis/add-listener! sconn (create-listener rcv-ch))
(letfn [(send-to-topic [topic message]
(a/go-loop [chans (seq (get-in @state [:topics topic]))

View file

@ -13,6 +13,7 @@
[app.http :as-alias http]
[app.loggers.audit :as audit]
[app.metrics :as mtx]
[app.msgbus :as-alias mbus]
[app.rpc.retry :as retry]
[app.rpc.rlimit :as rlimit]
[app.rpc.semaphore :as rsem]
@ -248,7 +249,7 @@
(s/def ::executors map?)
(s/def ::http-client fn?)
(s/def ::ldap (s/nilable map?))
(s/def ::msgbus fn?)
(s/def ::msgbus ::mbus/msgbus)
(s/def ::public-uri ::us/not-empty-string)
(s/def ::session map?)
(s/def ::storage some?)

View file

@ -17,6 +17,7 @@
[app.db :as db]
[app.loggers.audit :as audit]
[app.metrics :as mtx]
[app.msgbus :as mbus]
[app.rpc.permissions :as perms]
[app.rpc.queries.files :as files]
[app.rpc.queries.projects :as proj]
@ -439,12 +440,12 @@
(defn- send-notifications
[{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}]
(let [lchanges (filter library-change? changes)
msgbus-fn (:msgbus cfg)]
(let [lchanges (filter library-change? changes)
msgbus (:msgbus cfg)]
;; Asynchronously publish message to the msgbus
(msgbus-fn :cmd :pub
(mbus/pub! msgbus
:topic (:id file)
:message {:type :file-change
:profile-id (:profile-id params)
@ -456,7 +457,7 @@
(when (and (:is-shared file) (seq lchanges))
(let [team-id (retrieve-team-id conn (:project-id file))]
;; Asynchronously publish message to the msgbus
(msgbus-fn :cmd :pub
(mbus/pub! msgbus
:topic team-id
:message {:type :library-change
:profile-id (:profile-id params)

View file

@ -6,12 +6,16 @@
(ns app.util.async
(:require
[app.common.exceptions :as ex]
[clojure.core.async :as a]
[clojure.core.async.impl.protocols :as ap]
[clojure.spec.alpha :as s])
(:import
java.util.concurrent.Executor))
java.util.concurrent.Executor
java.util.concurrent.RejectedExecutionException))
(s/def ::executor #(instance? Executor %))
(s/def ::channel #(satisfies? ap/Channel %))
(defonce processors
(delay (.availableProcessors (Runtime/getRuntime))))
@ -23,7 +27,7 @@
~@body
(catch Exception e# e#))))
(defmacro thread-try
(defmacro thread
[& body]
`(a/thread
(try
@ -47,19 +51,19 @@
(defn thread-call
[^Executor executor f]
(let [c (a/chan 1)]
(let [ch (a/chan 1)
f' (fn []
(try
(let [ret (ex/try* f identity)]
(when (some? ret) (a/>!! ch ret)))
(finally
(a/close! ch))))]
(try
(.execute executor
(fn []
(try
(let [ret (try (f) (catch Exception e e))]
(when (some? ret) (a/>!! c ret)))
(finally
(a/close! c)))))
c
(catch java.util.concurrent.RejectedExecutionException _e
(a/close! c)
c))))
(.execute executor f')
(catch RejectedExecutionException _cause
(a/close! ch)))
ch))
(defmacro with-thread
[executor & body]

View file

@ -217,6 +217,9 @@
(s/def ::coll-of-uuid (s/every ::uuid))
(s/def ::set-of-uuid (s/every ::uuid :kind set?))
#?(:clj
(s/def ::agent #(instance? clojure.lang.Agent %)))
(defn bytes?
"Test if a first parameter is a byte
array or not."