diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index 39f961afb..2a142bc35 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -13,6 +13,7 @@ [app.common.spec :as us] [app.db :as db] [app.metrics :as mtx] + [app.msgbus :as mbus] [app.util.time :as dt] [app.util.websocket :as ws] [clojure.core.async :as a] @@ -20,6 +21,12 @@ [integrant.core :as ig] [yetti.websocket :as yws])) +(def recv-labels + (into-array String ["recv"])) + +(def send-labels + (into-array String ["send"])) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; WEBSOCKET HOOKS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -30,21 +37,30 @@ [{:keys [metrics]} wsp] (let [created-at (dt/now)] (swap! state assoc (::ws/id @wsp) wsp) - (mtx/run! metrics {:id :websocket-active-connections :inc 1}) + (mtx/run! metrics + :id :websocket-active-connections + :inc 1) (fn [] (swap! state dissoc (::ws/id @wsp)) - (mtx/run! metrics {:id :websocket-active-connections :dec 1}) - (mtx/run! metrics {:id :websocket-session-timing - :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)})))) + (mtx/run! metrics :id :websocket-active-connections :dec 1) + (mtx/run! metrics + :id :websocket-session-timing + :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0))))) (defn- on-rcv-message [{:keys [metrics]} _ message] - (mtx/run! metrics {:id :websocket-messages-total :labels ["recv"] :inc 1}) + (mtx/run! metrics + :id :websocket-messages-total + :labels recv-labels + :inc 1) message) (defn- on-snd-message [{:keys [metrics]} _ message] - (mtx/run! metrics {:id :websocket-messages-total :labels ["send"] :inc 1}) + (mtx/run! metrics + :id :websocket-messages-total + :labels send-labels + :inc 1) message) ;; REPL HELPERS @@ -72,12 +88,12 @@ (defn repl-get-connection-info [id] (when-let [wsp (get @state id)] - {:id id - :created-at (dt/instant id) - :profile-id (::profile-id @wsp) - :session-id (::session-id @wsp) - :user-agent (::ws/user-agent @wsp) - :ip-addr (::ws/remote-addr @wsp) + {:id id + :created-at (dt/instant id) + :profile-id (::profile-id @wsp) + :session-id (::session-id @wsp) + :user-agent (::ws/user-agent @wsp) + :ip-addr (::ws/remote-addr @wsp) :last-activity-at (::ws/last-activity-at @wsp) :http-session-id (::ws/http-session-id @wsp) :subscribed-file (-> wsp deref ::file-subscription :file-id) @@ -104,7 +120,7 @@ (defmethod handle-message :connect [cfg wsp _] - (let [msgbus-fn (:msgbus cfg) + (let [msgbus (:msgbus cfg) conn-id (::ws/id @wsp) profile-id (::profile-id @wsp) session-id (::session-id @wsp) @@ -113,17 +129,17 @@ xform (remove #(= (:session-id %) session-id)) channel (a/chan (a/dropping-buffer 16) xform)] - (l/trace :fn "handle-message" :event :connect :conn-id conn-id) + (l/trace :fn "handle-message" :event "connect" :conn-id conn-id) ;; Subscribe to the profile channel and forward all messages to ;; websocket output channel (send them to the client). (swap! wsp assoc ::profile-subscription channel) (a/pipe channel output-ch false) - (msgbus-fn :cmd :sub :topic profile-id :chan channel))) + (mbus/sub! msgbus :topic profile-id :chan channel))) (defmethod handle-message :disconnect [cfg wsp _] - (let [msgbus-fn (:msgbus cfg) + (let [msgbus (:msgbus cfg) conn-id (::ws/id @wsp) profile-id (::profile-id @wsp) session-id (::session-id @wsp) @@ -143,21 +159,21 @@ (a/go ;; Close the main profile subscription (a/close! profile-ch) - (a/! output-ch message) (recur)))) (a/go ;; Subscribe to file topic - (a/ (redis-connect cfg) + state (agent {}) + msgbus (-> (redis-connect cfg) (assoc ::cmd-ch cmd-ch) (assoc ::rcv-ch rcv-ch) (assoc ::pub-ch pub-ch) - (assoc ::state state))] + (assoc ::state state) + (assoc ::wrk/executor executor))] - (start-io-loop cfg) + (us/verify! ::msgbus msgbus) - (with-meta - (fn [& {:keys [cmd] :as params}] - (a/go - (case cmd - :pub (a/>! pub-ch params) - :sub (a/! pub-ch params))) + +(defn purge! + [{:keys [::state ::wrk/executor] :as msgbus} chans] + (l/trace :hint "purge" :chans (count chans)) + (let [done-ch (a/chan)] + (send-via executor state unsubscribe-channels msgbus chans done-ch) + done-ch)) (defmethod ig/halt-key! ::msgbus - [_ f] - (let [mdata (meta f)] - (redis-disconnect mdata) - (a/close! (::cmd-ch mdata)) - (a/close! (::rcv-ch mdata)))) + [_ msgbus] + (redis-disconnect msgbus) + (a/close! (::cmd-ch msgbus)) + (a/close! (::rcv-ch msgbus)) + (a/close! (::pub-ch msgbus))) ;; --- IMPL @@ -91,9 +117,8 @@ [{:keys [timeout redis] :as cfg}] (let [pconn (redis/connect redis :timeout timeout) sconn (redis/connect redis :type :pubsub :timeout timeout)] - (-> cfg - (assoc ::pconn pconn) - (assoc ::sconn sconn)))) + {::pconn pconn + ::sconn sconn})) (defn- redis-disconnect [{:keys [::pconn ::sconn] :as cfg}] @@ -152,22 +177,6 @@ (aa/with-closing done-ch (reduce #(unsubscribe-single-channel %1 cfg %2) state channels))) - -(defn- subscribe - [{:keys [::state executor] :as cfg} {:keys [topic topics chan]}] - (let [done-ch (a/chan) - topics (into [] (map prefix-topic) (if topic [topic] topics))] - (l/debug :hint "subscribe" :topics topics) - (send-via executor state subscribe-to-topics cfg topics chan done-ch) - done-ch)) - -(defn- purge - [{:keys [::state executor] :as cfg} {:keys [chans]}] - (l/trace :hint "purge" :chans (count chans)) - (let [done-ch (a/chan)] - (send-via executor state unsubscribe-channels cfg chans done-ch) - done-ch)) - (defn- create-listener [rcv-ch] (redis/pubsub-listener @@ -179,8 +188,8 @@ (when-not (a/offer! rcv-ch val) (l/warn :msg "dropping message on subscription loop")))))) -(defn start-io-loop - [{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}] +(defn start-io-loop! + [{:keys [::sconn ::rcv-ch ::pub-ch ::state ::wrk/executor] :as cfg}] (redis/add-listener! sconn (create-listener rcv-ch)) (letfn [(send-to-topic [topic message] (a/go-loop [chans (seq (get-in @state [:topics topic])) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 162d48444..ff0739949 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -13,6 +13,7 @@ [app.http :as-alias http] [app.loggers.audit :as audit] [app.metrics :as mtx] + [app.msgbus :as-alias mbus] [app.rpc.retry :as retry] [app.rpc.rlimit :as rlimit] [app.rpc.semaphore :as rsem] @@ -248,7 +249,7 @@ (s/def ::executors map?) (s/def ::http-client fn?) (s/def ::ldap (s/nilable map?)) -(s/def ::msgbus fn?) +(s/def ::msgbus ::mbus/msgbus) (s/def ::public-uri ::us/not-empty-string) (s/def ::session map?) (s/def ::storage some?) diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index 4cd4f1ab4..6c6cc1b04 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -17,6 +17,7 @@ [app.db :as db] [app.loggers.audit :as audit] [app.metrics :as mtx] + [app.msgbus :as mbus] [app.rpc.permissions :as perms] [app.rpc.queries.files :as files] [app.rpc.queries.projects :as proj] @@ -439,12 +440,12 @@ (defn- send-notifications [{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}] - (let [lchanges (filter library-change? changes) - msgbus-fn (:msgbus cfg)] + (let [lchanges (filter library-change? changes) + msgbus (:msgbus cfg)] ;; Asynchronously publish message to the msgbus - (msgbus-fn :cmd :pub + (mbus/pub! msgbus :topic (:id file) :message {:type :file-change :profile-id (:profile-id params) @@ -456,7 +457,7 @@ (when (and (:is-shared file) (seq lchanges)) (let [team-id (retrieve-team-id conn (:project-id file))] ;; Asynchronously publish message to the msgbus - (msgbus-fn :cmd :pub + (mbus/pub! msgbus :topic team-id :message {:type :library-change :profile-id (:profile-id params) diff --git a/backend/src/app/util/async.clj b/backend/src/app/util/async.clj index 96ee32af1..8be9ac3e3 100644 --- a/backend/src/app/util/async.clj +++ b/backend/src/app/util/async.clj @@ -6,12 +6,16 @@ (ns app.util.async (:require + [app.common.exceptions :as ex] [clojure.core.async :as a] + [clojure.core.async.impl.protocols :as ap] [clojure.spec.alpha :as s]) (:import - java.util.concurrent.Executor)) + java.util.concurrent.Executor + java.util.concurrent.RejectedExecutionException)) (s/def ::executor #(instance? Executor %)) +(s/def ::channel #(satisfies? ap/Channel %)) (defonce processors (delay (.availableProcessors (Runtime/getRuntime)))) @@ -23,7 +27,7 @@ ~@body (catch Exception e# e#)))) -(defmacro thread-try +(defmacro thread [& body] `(a/thread (try @@ -47,19 +51,19 @@ (defn thread-call [^Executor executor f] - (let [c (a/chan 1)] + (let [ch (a/chan 1) + f' (fn [] + (try + (let [ret (ex/try* f identity)] + (when (some? ret) (a/>!! ch ret))) + (finally + (a/close! ch))))] (try - (.execute executor - (fn [] - (try - (let [ret (try (f) (catch Exception e e))] - (when (some? ret) (a/>!! c ret))) - (finally - (a/close! c))))) - c - (catch java.util.concurrent.RejectedExecutionException _e - (a/close! c) - c)))) + (.execute executor f') + (catch RejectedExecutionException _cause + (a/close! ch))) + + ch)) (defmacro with-thread [executor & body] diff --git a/common/src/app/common/spec.cljc b/common/src/app/common/spec.cljc index 29d747aa0..8ac2307e0 100644 --- a/common/src/app/common/spec.cljc +++ b/common/src/app/common/spec.cljc @@ -217,6 +217,9 @@ (s/def ::coll-of-uuid (s/every ::uuid)) (s/def ::set-of-uuid (s/every ::uuid :kind set?)) +#?(:clj + (s/def ::agent #(instance? clojure.lang.Agent %))) + (defn bytes? "Test if a first parameter is a byte array or not."