From e7085571bfaecc8bcc2bde88b823cfae54717701 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 22 Mar 2021 12:13:11 +0100 Subject: [PATCH] :tada: Add :memory backend to the msgbus module. --- backend/src/app/db/sql.clj | 1 - backend/src/app/main.clj | 1 - backend/src/app/msgbus.clj | 160 +++++++++++++++++++----------- backend/src/app/notifications.clj | 88 +++++++--------- 4 files changed, 137 insertions(+), 113 deletions(-) diff --git a/backend/src/app/db/sql.clj b/backend/src/app/db/sql.clj index a2abf9883..e82d8e077 100644 --- a/backend/src/app/db/sql.clj +++ b/backend/src/app/db/sql.clj @@ -58,4 +58,3 @@ ([table where-params opts] (let [opts (merge default-opts opts)] (sql/for-delete table where-params opts)))) - diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 7892c97d2..ead323cd2 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -59,7 +59,6 @@ :app.msgbus/msgbus {:backend (:msgbus-backend config :redis) - :pool (ig/ref :app.db/pool) :redis-uri (:redis-uri config)} :app.tokens/tokens diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 41d4b8fe4..c6d7c217b 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -13,7 +13,6 @@ [app.common.exceptions :as ex] [app.common.spec :as us] [app.config :as cfg] - [app.db :as db] [app.util.blob :as blob] [app.util.time :as dt] [clojure.core.async :as a] @@ -34,6 +33,16 @@ io.lettuce.core.pubsub.StatefulRedisPubSubConnection io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands)) +(def ^:private prefix (cfg/get :tenant)) + +(defn- prefix-topic + [topic] + (str prefix "." topic)) + +(def xform-prefix (map prefix-topic)) +(def xform-topics (map (fn [m] (update m :topics #(into #{} xform-prefix %))))) +(def xform-topic (map (fn [m] (update m :topic prefix-topic)))) + (s/def ::redis-uri ::us/string) (s/def ::buffer-size ::us/integer) @@ -43,8 +52,7 @@ (defmulti init-sub-loop :backend) (defmethod ig/pre-init-spec ::msgbus [_] - (s/keys :req-un [::db/pool] - :opt-un [::buffer-size ::redis-uri])) + (s/keys :opt-un [::buffer-size ::redis-uri])) (defmethod ig/prep-key ::msgbus [_ cfg] @@ -53,17 +61,21 @@ (defmethod ig/init-key ::msgbus [_ {:keys [backend buffer-size] :as cfg}] (log/debugf "initializing msgbus (backend=%s)" (name backend)) - - (let [backend (init-backend cfg) + (let [cfg (init-backend cfg) ;; Channel used for receive publications from the application. - pub-ch (a/chan (a/dropping-buffer buffer-size)) + pub-ch (-> (a/dropping-buffer buffer-size) + (a/chan xform-topic)) ;; Channel used for receive subscription requests. - sub-ch (a/chan)] + sub-ch (a/chan 1 xform-topics) - (init-pub-loop (assoc backend :ch pub-ch)) - (init-sub-loop (assoc backend :ch sub-ch)) + cfg (-> cfg + (assoc ::pub-ch pub-ch) + (assoc ::sub-ch sub-ch))] + + (init-pub-loop cfg) + (init-sub-loop cfg) (with-meta (fn run @@ -73,14 +85,50 @@ (case command :pub (a/>! pub-ch params) :sub (a/>! sub-ch params))))) - - {::backend backend}))) + cfg))) (defmethod ig/halt-key! ::msgbus [_ f] (let [mdata (meta f)] - (stop-backend (::backend mdata)))) + (stop-backend mdata) + (a/close! (::pub-ch mdata)) + (a/close! (::sub-ch mdata)))) +;; --- IN-MEMORY BACKEND IMPL + +(defmethod init-backend :memory [cfg] cfg) +(defmethod stop-backend :memory [_]) +(defmethod init-pub-loop :memory [_]) + +(defmethod init-sub-loop :memory + [{:keys [::sub-ch ::pub-ch]}] + (a/go-loop [state {}] + (let [[val port] (a/alts! [pub-ch sub-ch])] + (cond + (and (= port sub-ch) (some? val)) + (let [{:keys [topics chan]} val] + (recur (reduce #(update %1 %2 (fnil conj #{}) chan) state topics))) + + (and (= port pub-ch) (some? val)) + (let [topic (:topic val) + message (:message val) + state (loop [state state + chans (get state topic)] + (if-let [c (first chans)] + (if (a/>! c message) + (recur state (rest chans)) + (recur (update state topic disj c) + (rest chans))) + state))] + (recur state)) + + :else + (->> (vals state) + (mapcat identity) + (run! a/close!)))))) + + +;; Add a unique listener to connection ;; --- REDIS BACKEND IMPL @@ -102,32 +150,28 @@ (.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10})) (-> cfg - (assoc :pub-conn pub-conn) - (assoc :sub-conn sub-conn) - (assoc :close-ch (a/chan 1))))) + (assoc ::pub-conn pub-conn) + (assoc ::sub-conn sub-conn)))) (defmethod stop-backend :redis - [{:keys [pub-conn sub-conn close-ch] :as cfg}] + [{:keys [::pub-conn ::sub-conn] :as cfg}] (.close ^StatefulRedisConnection pub-conn) - (.close ^StatefulRedisPubSubConnection sub-conn) - (a/close! close-ch)) + (.close ^StatefulRedisPubSubConnection sub-conn)) (defmethod init-pub-loop :redis - [{:keys [pub-conn ch close-ch]}] + [{:keys [::pub-conn ::pub-ch]}] (let [rac (.async ^StatefulRedisConnection pub-conn)] (a/go-loop [] - (let [[val _] (a/alts! [close-ch ch] :priority true)] - (when (some? val) - (let [result (a/> (vals state) - (mapcat identity) - (filter some?) - (run! a/close!)))) + (if-let [{:keys [topic message]} (a/! ch message) + (recur (rest chans) pending) + (recur (rest chans) (conj pending ch))) + pending))] + ;; (log/tracef "received message => pending: %s" (pr-str pending)) + (some->> (seq pending) + (send-off chans unsubscribe-channels)) - ;; This means we receive data from redis and we need to - ;; forward it to the underlying subscriptions. - (= port rcv-ch) - (let [topic (:topic val) ; topic is already string - pending (loop [chans (seq (get-in @chans [:topics topic])) - pending #{}] - (if-let [ch (first chans)] - (if (a/>! ch (:message val)) - (recur (rest chans) pending) - (recur (rest chans) (conj pending ch))) - pending))] - ;; (log/tracef "received message => pending: %s" (pr-str pending)) - (some->> (seq pending) - (send-off chans unsubscribe-channels)) + (recur)) + + ;; Stop condition; close all underlying subscriptions and + ;; exit. The close operation is performed asynchronously. + (send-off chans (fn [state] + (->> (vals state) + (mapcat identity) + (filter some?) + (run! a/close!))))))))) - (recur)))))))) (defn- impl-redis-pub [^RedisAsyncCommands rac {:keys [topic message]}] - (let [topic (str (cfg/get :tenant) "." topic) - message (blob/encode message) + (let [message (blob/encode message) res (a/chan 1)] (-> (.publish rac ^String topic ^bytes message) (p/finally (fn [_ e] diff --git a/backend/src/app/notifications.clj b/backend/src/app/notifications.clj index cd145e955..754502b7c 100644 --- a/backend/src/app/notifications.clj +++ b/backend/src/app/notifications.clj @@ -24,9 +24,7 @@ [ring.adapter.jetty9 :as jetty] [ring.middleware.cookies :refer [wrap-cookies]] [ring.middleware.keyword-params :refer [wrap-keyword-params]] - [ring.middleware.params :refer [wrap-params]]) - (:import - org.eclipse.jetty.websocket.api.WebSocketAdapter)) + [ring.middleware.params :refer [wrap-params]])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Http Handler @@ -199,56 +197,44 @@ (declare start-loop!) (defn- handle-connect - [{:keys [conn] :as cfg}] + [cfg] (a/go - (try - (aa/! out-ch val)) + (recur)) - ;; Process message coming from pubsub. - (and (= port sub-ch) (some? val)) - (do - (when-not (= (:session-id val) session-id) - ;; If we receive a connect message of other user, we need - ;; to send an update presence to all participants. - (when (= :connect (:type val)) - (a/! out-ch val)) - (recur)) - - ;; When timeout channel is signaled, we need to send a ping - ;; message to the output channel. TODO: we need to make this - ;; more smart. - (= port timeout) - (do - (a/>! out-ch {:type :ping}) - (recur)) - - :else - nil))))) + ;; When timeout channel is signaled, we need to send a ping + ;; message to the output channel. TODO: we need to make this + ;; more smart. + (= port timeout) + (do + (a/>! out-ch {:type :ping}) + (recur)))))) (defn send-presence ([cfg] (send-presence cfg :presence)) @@ -265,18 +251,18 @@ (fn [_ message] (:type message))) (defmethod handle-message :connect - [{:keys [file-id msgbus] :as cfg} _message] + [cfg _] ;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id) (send-presence cfg :connect)) (defmethod handle-message :disconnect - [{:keys [file-id msgbus] :as cfg} _message] + [cfg _] ;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id) (send-presence cfg :disconnect)) (defmethod handle-message :keepalive - [cfg _message] - (a/go (do :nothing))) + [_ _] + (a/go :nothing)) (defmethod handle-message :pointer-update [{:keys [profile-id file-id session-id msgbus] :as cfg} message]