diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 024310bd2..7892c97d2 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -58,7 +58,9 @@ {} :app.msgbus/msgbus - {:uri (:redis-uri config)} + {:backend (:msgbus-backend config :redis) + :pool (ig/ref :app.db/pool) + :redis-uri (:redis-uri config)} :app.tokens/tokens {:sprops (ig/ref :app.setup/props)} diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 63ea00ae4..41d4b8fe4 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -13,6 +13,7 @@ [app.common.exceptions :as ex] [app.common.spec :as us] [app.config :as cfg] + [app.db :as db] [app.util.blob :as blob] [app.util.time :as dt] [clojure.core.async :as a] @@ -33,56 +34,36 @@ io.lettuce.core.pubsub.StatefulRedisPubSubConnection io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands)) -(declare impl-publish-loop) -(declare impl-redis-pub) -(declare impl-redis-sub) -(declare impl-redis-unsub) -(declare impl-subscribe-loop) - - -;; --- STATE INIT: Publisher - -(s/def ::uri ::us/string) +(s/def ::redis-uri ::us/string) (s/def ::buffer-size ::us/integer) +(defmulti init-backend :backend) +(defmulti stop-backend :backend) +(defmulti init-pub-loop :backend) +(defmulti init-sub-loop :backend) + (defmethod ig/pre-init-spec ::msgbus [_] - (s/keys :req-un [::uri] - :opt-un [::buffer-size])) + (s/keys :req-un [::db/pool] + :opt-un [::buffer-size ::redis-uri])) (defmethod ig/prep-key ::msgbus [_ cfg] (merge {:buffer-size 128} cfg)) (defmethod ig/init-key ::msgbus - [_ {:keys [uri buffer-size] :as cfg}] - (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) + [_ {:keys [backend buffer-size] :as cfg}] + (log/debugf "initializing msgbus (backend=%s)" (name backend)) - uri (RedisURI/create uri) - rclient (RedisClient/create ^RedisURI uri) - - snd-conn (.connect ^RedisClient rclient ^RedisCodec codec) - rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec) + (let [backend (init-backend cfg) ;; Channel used for receive publications from the application. - pub-chan (a/chan (a/dropping-buffer buffer-size)) - - ;; Channel used for receive data from redis - rcv-chan (a/chan (a/dropping-buffer buffer-size)) + pub-ch (a/chan (a/dropping-buffer buffer-size)) ;; Channel used for receive subscription requests. - sub-chan (a/chan) - cch (a/chan 1)] + sub-ch (a/chan)] - (.setTimeout ^StatefulRedisConnection snd-conn ^Duration (dt/duration {:seconds 10})) - (.setTimeout ^StatefulRedisPubSubConnection rcv-conn ^Duration (dt/duration {:seconds 10})) - - (log/debugf "initializing msgbus (uri: '%s')" (str uri)) - - ;; Start the sending (publishing) loop - (impl-publish-loop snd-conn pub-chan cch) - - ;; Start the receiving (subscribing) loop - (impl-subscribe-loop rcv-conn rcv-chan sub-chan cch) + (init-pub-loop (assoc backend :ch pub-ch)) + (init-sub-loop (assoc backend :ch sub-ch)) (with-meta (fn run @@ -90,159 +71,179 @@ ([command params] (a/go (case command - :pub (a/>! pub-chan params) - :sub (a/>! sub-chan params))))) + :pub (a/>! pub-ch params) + :sub (a/>! sub-ch params))))) - {::snd-conn snd-conn - ::rcv-conn rcv-conn - ::cch cch - ::pub-chan pub-chan - ::rcv-chan rcv-chan}))) + {::backend backend}))) (defmethod ig/halt-key! ::msgbus [_ f] (let [mdata (meta f)] - (.close ^StatefulRedisConnection (::snd-conn mdata)) - (.close ^StatefulRedisPubSubConnection (::rcv-conn mdata)) - (a/close! (::cch mdata)) - (a/close! (::pub-chan mdata)) - (a/close! (::rcv-chan mdata)))) + (stop-backend (::backend mdata)))) -(defn- impl-publish-loop - [conn pub-chan cch] - (let [rac (.async ^StatefulRedisConnection conn)] + +;; --- REDIS BACKEND IMPL + +(declare impl-redis-pub) +(declare impl-redis-sub) +(declare impl-redis-unsub) + +(defmethod init-backend :redis + [{:keys [redis-uri] :as cfg}] + (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) + + uri (RedisURI/create redis-uri) + rclient (RedisClient/create ^RedisURI uri) + + pub-conn (.connect ^RedisClient rclient ^RedisCodec codec) + sub-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)] + + (.setTimeout ^StatefulRedisConnection pub-conn ^Duration (dt/duration {:seconds 10})) + (.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10})) + + (-> cfg + (assoc :pub-conn pub-conn) + (assoc :sub-conn sub-conn) + (assoc :close-ch (a/chan 1))))) + +(defmethod stop-backend :redis + [{:keys [pub-conn sub-conn close-ch] :as cfg}] + (.close ^StatefulRedisConnection pub-conn) + (.close ^StatefulRedisPubSubConnection sub-conn) + (a/close! close-ch)) + +(defmethod init-pub-loop :redis + [{:keys [pub-conn ch close-ch]}] + (let [rac (.async ^StatefulRedisConnection pub-conn)] (a/go-loop [] - (let [[val _] (a/alts! [cch pub-chan] :priority true)] + (let [[val _] (a/alts! [close-ch ch] :priority true)] (when (some? val) (let [result (a/> (vals state) - (mapcat identity) - (filter some?) - (run! a/close!)))) + ;; Asynchronous subscription loop; + (a/go-loop [] + (let [[val _] (a/alts! [close-ch ch])] + (when-let [{:keys [topics chan]} val] + (let [topics (into #{} (map #(str tprefix %)) topics)] + (send-off chans subscribe-to-topics topics chan) + (recur))))) - ;; This means we receive data from redis and we need to - ;; forward it to the underlying subscriptions. - (= port rcv-chan) - (let [topic (:topic val) ; topic is already string - pending (loop [chans (seq (get-in @chans [:topics topic])) - pending #{}] - (if-let [ch (first chans)] - (if (a/>! ch (:message val)) - (recur (rest chans) pending) - (recur (rest chans) (conj pending ch))) - pending))] - ;; (log/tracef "received message => pending: %s" (pr-str pending)) - (some->> (seq pending) - (send-off chans unsubscribe-channels)) + (a/go-loop [] + (let [[val port] (a/alts! [close-ch rcv-ch])] + (cond + ;; Stop condition; close all underlying subscriptions and + ;; exit. The close operation is performed asynchronously. + (= port close-ch) + (send-off chans (fn [state] + (log/tracef "close") + (->> (vals state) + (mapcat identity) + (filter some?) + (run! a/close!)))) - (recur))))))) + ;; This means we receive data from redis and we need to + ;; forward it to the underlying subscriptions. + (= port rcv-ch) + (let [topic (:topic val) ; topic is already string + pending (loop [chans (seq (get-in @chans [:topics topic])) + pending #{}] + (if-let [ch (first chans)] + (if (a/>! ch (:message val)) + (recur (rest chans) pending) + (recur (rest chans) (conj pending ch))) + pending))] + ;; (log/tracef "received message => pending: %s" (pr-str pending)) + (some->> (seq pending) + (send-off chans unsubscribe-channels)) + + (recur)))))))) (defn- impl-redis-pub - [rac {:keys [topic message]}] + [^RedisAsyncCommands rac {:keys [topic message]}] (let [topic (str (cfg/get :tenant) "." topic) message (blob/encode message) res (a/chan 1)] - (-> (.publish ^RedisAsyncCommands rac ^String topic ^bytes message) + (-> (.publish rac ^String topic ^bytes message) (p/finally (fn [_ e] (when e (a/>!! res e)) (a/close! res)))) res)) (defn impl-redis-sub - [conn topic] - (let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn) - res (a/chan 1)] - (-> (.subscribe cmd (into-array String [topic])) + [^RedisPubSubAsyncCommands rac topic] + (let [res (a/chan 1)] + (-> (.subscribe rac (into-array String [topic])) (p/finally (fn [_ e] (when e (a/>!! res e)) (a/close! res)))) res)) (defn impl-redis-unsub - [conn topic] - (let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn) - res (a/chan 1)] - (-> (.unsubscribe cmd (into-array String [topic])) + [rac topic] + (let [res (a/chan 1)] + (-> (.unsubscribe rac (into-array String [topic])) (p/finally (fn [_ e] (when e (a/>!! res e)) (a/close! res))))