2021-02-12 16:01:59 +01:00
|
|
|
;; This Source Code Form is subject to the terms of the Mozilla Public
|
|
|
|
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
|
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
;;
|
2021-04-10 09:43:04 +02:00
|
|
|
;; Copyright (c) UXBOX Labs SL
|
2021-02-12 16:01:59 +01:00
|
|
|
|
|
|
|
(ns app.msgbus
|
|
|
|
"The msgbus abstraction implemented using redis as underlying backend."
|
|
|
|
(:require
|
2021-02-22 23:40:42 +01:00
|
|
|
[app.common.exceptions :as ex]
|
2021-09-29 16:39:25 +02:00
|
|
|
[app.common.logging :as l]
|
2021-02-12 16:01:59 +01:00
|
|
|
[app.common.spec :as us]
|
2021-02-24 13:08:44 +01:00
|
|
|
[app.config :as cfg]
|
2021-02-12 16:01:59 +01:00
|
|
|
[app.util.blob :as blob]
|
2021-02-22 23:14:53 +01:00
|
|
|
[app.util.time :as dt]
|
2021-02-12 16:01:59 +01:00
|
|
|
[clojure.core.async :as a]
|
|
|
|
[clojure.spec.alpha :as s]
|
|
|
|
[integrant.core :as ig]
|
|
|
|
[promesa.core :as p])
|
|
|
|
(:import
|
2021-02-22 23:14:53 +01:00
|
|
|
java.time.Duration
|
2021-02-12 16:01:59 +01:00
|
|
|
io.lettuce.core.RedisClient
|
|
|
|
io.lettuce.core.RedisURI
|
2021-05-14 12:35:22 +02:00
|
|
|
io.lettuce.core.api.StatefulConnection
|
2021-02-12 16:01:59 +01:00
|
|
|
io.lettuce.core.api.StatefulRedisConnection
|
|
|
|
io.lettuce.core.api.async.RedisAsyncCommands
|
|
|
|
io.lettuce.core.codec.ByteArrayCodec
|
|
|
|
io.lettuce.core.codec.RedisCodec
|
|
|
|
io.lettuce.core.codec.StringCodec
|
|
|
|
io.lettuce.core.pubsub.RedisPubSubListener
|
|
|
|
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
|
|
|
|
io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands))
|
|
|
|
|
2021-03-22 12:13:11 +01:00
|
|
|
(def ^:private prefix (cfg/get :tenant))
|
|
|
|
|
|
|
|
(defn- prefix-topic
|
|
|
|
[topic]
|
|
|
|
(str prefix "." topic))
|
|
|
|
|
|
|
|
(def xform-prefix (map prefix-topic))
|
|
|
|
(def xform-topics (map (fn [m] (update m :topics #(into #{} xform-prefix %)))))
|
|
|
|
(def xform-topic (map (fn [m] (update m :topic prefix-topic))))
|
|
|
|
|
2021-03-07 20:30:41 +01:00
|
|
|
(s/def ::redis-uri ::us/string)
|
2021-02-12 16:01:59 +01:00
|
|
|
(s/def ::buffer-size ::us/integer)
|
|
|
|
|
2021-03-07 20:30:41 +01:00
|
|
|
(defmulti init-backend :backend)
|
|
|
|
(defmulti stop-backend :backend)
|
|
|
|
(defmulti init-pub-loop :backend)
|
|
|
|
(defmulti init-sub-loop :backend)
|
|
|
|
|
2021-02-12 16:01:59 +01:00
|
|
|
(defmethod ig/pre-init-spec ::msgbus [_]
|
2021-03-22 12:13:11 +01:00
|
|
|
(s/keys :opt-un [::buffer-size ::redis-uri]))
|
2021-02-12 16:01:59 +01:00
|
|
|
|
|
|
|
(defmethod ig/prep-key ::msgbus
|
|
|
|
[_ cfg]
|
|
|
|
(merge {:buffer-size 128} cfg))
|
|
|
|
|
|
|
|
(defmethod ig/init-key ::msgbus
|
2021-03-07 20:30:41 +01:00
|
|
|
[_ {:keys [backend buffer-size] :as cfg}]
|
2021-04-06 23:25:34 +02:00
|
|
|
(l/debug :action "initialize msgbus"
|
|
|
|
:backend (name backend))
|
2021-03-22 12:13:11 +01:00
|
|
|
(let [cfg (init-backend cfg)
|
2021-02-12 16:01:59 +01:00
|
|
|
|
2021-02-23 13:17:01 +01:00
|
|
|
;; Channel used for receive publications from the application.
|
2021-03-22 12:13:11 +01:00
|
|
|
pub-ch (-> (a/dropping-buffer buffer-size)
|
|
|
|
(a/chan xform-topic))
|
2021-02-23 13:17:01 +01:00
|
|
|
|
|
|
|
;; Channel used for receive subscription requests.
|
2021-03-22 12:13:11 +01:00
|
|
|
sub-ch (a/chan 1 xform-topics)
|
2021-02-22 19:10:05 +01:00
|
|
|
|
2021-03-22 12:13:11 +01:00
|
|
|
cfg (-> cfg
|
|
|
|
(assoc ::pub-ch pub-ch)
|
|
|
|
(assoc ::sub-ch sub-ch))]
|
|
|
|
|
|
|
|
(init-pub-loop cfg)
|
|
|
|
(init-sub-loop cfg)
|
2021-02-12 16:01:59 +01:00
|
|
|
|
|
|
|
(with-meta
|
|
|
|
(fn run
|
|
|
|
([command] (run command nil))
|
|
|
|
([command params]
|
|
|
|
(a/go
|
|
|
|
(case command
|
2021-03-07 20:30:41 +01:00
|
|
|
:pub (a/>! pub-ch params)
|
|
|
|
:sub (a/>! sub-ch params)))))
|
2021-03-22 12:13:11 +01:00
|
|
|
cfg)))
|
2021-02-12 16:01:59 +01:00
|
|
|
|
|
|
|
(defmethod ig/halt-key! ::msgbus
|
|
|
|
[_ f]
|
|
|
|
(let [mdata (meta f)]
|
2021-03-22 12:13:11 +01:00
|
|
|
(stop-backend mdata)
|
|
|
|
(a/close! (::pub-ch mdata))
|
|
|
|
(a/close! (::sub-ch mdata))))
|
|
|
|
|
|
|
|
;; --- IN-MEMORY BACKEND IMPL
|
|
|
|
|
|
|
|
(defmethod init-backend :memory [cfg] cfg)
|
|
|
|
(defmethod stop-backend :memory [_])
|
|
|
|
(defmethod init-pub-loop :memory [_])
|
|
|
|
|
|
|
|
(defmethod init-sub-loop :memory
|
|
|
|
[{:keys [::sub-ch ::pub-ch]}]
|
|
|
|
(a/go-loop [state {}]
|
|
|
|
(let [[val port] (a/alts! [pub-ch sub-ch])]
|
|
|
|
(cond
|
|
|
|
(and (= port sub-ch) (some? val))
|
|
|
|
(let [{:keys [topics chan]} val]
|
|
|
|
(recur (reduce #(update %1 %2 (fnil conj #{}) chan) state topics)))
|
|
|
|
|
|
|
|
(and (= port pub-ch) (some? val))
|
|
|
|
(let [topic (:topic val)
|
|
|
|
message (:message val)
|
|
|
|
state (loop [state state
|
|
|
|
chans (get state topic)]
|
|
|
|
(if-let [c (first chans)]
|
|
|
|
(if (a/>! c message)
|
|
|
|
(recur state (rest chans))
|
|
|
|
(recur (update state topic disj c)
|
|
|
|
(rest chans)))
|
|
|
|
state))]
|
|
|
|
(recur state))
|
|
|
|
|
|
|
|
:else
|
|
|
|
(->> (vals state)
|
|
|
|
(mapcat identity)
|
|
|
|
(run! a/close!))))))
|
|
|
|
|
|
|
|
|
|
|
|
;; Add a unique listener to connection
|
2021-03-07 20:30:41 +01:00
|
|
|
|
|
|
|
;; --- REDIS BACKEND IMPL
|
|
|
|
|
2021-05-14 12:35:22 +02:00
|
|
|
(declare impl-redis-open?)
|
2021-03-07 20:30:41 +01:00
|
|
|
(declare impl-redis-pub)
|
|
|
|
(declare impl-redis-sub)
|
|
|
|
(declare impl-redis-unsub)
|
|
|
|
|
|
|
|
(defmethod init-backend :redis
|
|
|
|
[{:keys [redis-uri] :as cfg}]
|
|
|
|
(let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE)
|
|
|
|
|
|
|
|
uri (RedisURI/create redis-uri)
|
|
|
|
rclient (RedisClient/create ^RedisURI uri)
|
|
|
|
|
|
|
|
pub-conn (.connect ^RedisClient rclient ^RedisCodec codec)
|
|
|
|
sub-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)]
|
|
|
|
|
|
|
|
(.setTimeout ^StatefulRedisConnection pub-conn ^Duration (dt/duration {:seconds 10}))
|
|
|
|
(.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10}))
|
|
|
|
|
|
|
|
(-> cfg
|
2021-03-22 12:13:11 +01:00
|
|
|
(assoc ::pub-conn pub-conn)
|
|
|
|
(assoc ::sub-conn sub-conn))))
|
2021-03-07 20:30:41 +01:00
|
|
|
|
|
|
|
(defmethod stop-backend :redis
|
2021-03-22 12:13:11 +01:00
|
|
|
[{:keys [::pub-conn ::sub-conn] :as cfg}]
|
2021-03-07 20:30:41 +01:00
|
|
|
(.close ^StatefulRedisConnection pub-conn)
|
2021-03-22 12:13:11 +01:00
|
|
|
(.close ^StatefulRedisPubSubConnection sub-conn))
|
2021-03-07 20:30:41 +01:00
|
|
|
|
|
|
|
(defmethod init-pub-loop :redis
|
2021-03-22 12:13:11 +01:00
|
|
|
[{:keys [::pub-conn ::pub-ch]}]
|
2021-03-07 20:30:41 +01:00
|
|
|
(let [rac (.async ^StatefulRedisConnection pub-conn)]
|
2021-02-12 16:01:59 +01:00
|
|
|
(a/go-loop []
|
2021-03-22 12:13:11 +01:00
|
|
|
(when-let [val (a/<! pub-ch)]
|
|
|
|
(let [result (a/<! (impl-redis-pub rac val))]
|
2021-05-14 12:35:22 +02:00
|
|
|
(when (and (impl-redis-open? pub-conn)
|
|
|
|
(ex/exception? result))
|
2021-04-06 23:25:34 +02:00
|
|
|
(l/error :cause result
|
|
|
|
:hint "unexpected error on publish message to redis")))
|
2021-03-22 12:13:11 +01:00
|
|
|
(recur)))))
|
2021-02-12 16:01:59 +01:00
|
|
|
|
2021-03-07 20:30:41 +01:00
|
|
|
(defmethod init-sub-loop :redis
|
2021-03-22 12:13:11 +01:00
|
|
|
[{:keys [::sub-conn ::sub-ch buffer-size]}]
|
2021-03-07 20:30:41 +01:00
|
|
|
(let [rcv-ch (a/chan (a/dropping-buffer buffer-size))
|
2021-04-06 23:25:34 +02:00
|
|
|
chans (agent {} :error-handler #(l/error :cause % :hint "unexpected error on agent"))
|
2021-03-07 20:30:41 +01:00
|
|
|
rac (.async ^StatefulRedisPubSubConnection sub-conn)]
|
|
|
|
|
|
|
|
;; Add a unique listener to connection
|
|
|
|
(.addListener sub-conn
|
|
|
|
(reify RedisPubSubListener
|
2021-12-23 00:04:15 +01:00
|
|
|
(message [_ _pattern _topic _message])
|
|
|
|
(message [_ topic message]
|
|
|
|
;; There are no back pressure, so we use a slidding
|
2021-03-07 20:30:41 +01:00
|
|
|
;; buffer for cases when the pubsub broker sends
|
|
|
|
;; more messages that we can process.
|
|
|
|
(let [val {:topic topic :message (blob/decode message)}]
|
|
|
|
(when-not (a/offer! rcv-ch val)
|
2021-04-06 23:25:34 +02:00
|
|
|
(l/warn :msg "dropping message on subscription loop"))))
|
2021-12-23 00:04:15 +01:00
|
|
|
(psubscribed [_ _pattern _count])
|
|
|
|
(punsubscribed [_ _pattern _count])
|
|
|
|
(subscribed [_ _topic _count])
|
|
|
|
(unsubscribed [_ _topic _count])))
|
2021-03-07 20:30:41 +01:00
|
|
|
|
|
|
|
(letfn [(subscribe-to-single-topic [nsubs topic chan]
|
|
|
|
(let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))]
|
|
|
|
(when (= 1 (count nsubs))
|
|
|
|
(let [result (a/<!! (impl-redis-sub rac topic))]
|
2021-04-06 23:25:34 +02:00
|
|
|
(l/trace :action "open subscription"
|
|
|
|
:topic topic)
|
2021-03-07 20:30:41 +01:00
|
|
|
(when (ex/exception? result)
|
2021-04-06 23:25:34 +02:00
|
|
|
(l/error :cause result
|
|
|
|
:hint "unexpected exception on subscribing"
|
|
|
|
:topic topic))))
|
2021-03-07 20:30:41 +01:00
|
|
|
nsubs))
|
|
|
|
|
|
|
|
(subscribe-to-topics [state topics chan]
|
|
|
|
(let [state (update state :chans assoc chan topics)]
|
|
|
|
(reduce (fn [state topic]
|
|
|
|
(update-in state [:topics topic] subscribe-to-single-topic topic chan))
|
|
|
|
state
|
|
|
|
topics)))
|
|
|
|
|
|
|
|
(unsubscribe-from-single-topic [nsubs topic chan]
|
|
|
|
(let [nsubs (disj nsubs chan)]
|
|
|
|
(when (empty? nsubs)
|
|
|
|
(let [result (a/<!! (impl-redis-unsub rac topic))]
|
2021-04-06 23:25:34 +02:00
|
|
|
(l/trace :action "close subscription"
|
|
|
|
:topic topic)
|
2021-05-14 12:35:22 +02:00
|
|
|
(when (and (impl-redis-open? sub-conn)
|
|
|
|
(ex/exception? result))
|
2021-04-06 23:25:34 +02:00
|
|
|
(l/error :cause result
|
|
|
|
:hint "unexpected exception on unsubscribing"
|
|
|
|
:topic topic))))
|
2021-03-07 20:30:41 +01:00
|
|
|
nsubs))
|
|
|
|
|
|
|
|
(unsubscribe-channels [state pending]
|
|
|
|
(reduce (fn [state ch]
|
|
|
|
(let [topics (get-in state [:chans ch])
|
|
|
|
state (update state :chans dissoc ch)]
|
|
|
|
(reduce (fn [state topic]
|
|
|
|
(update-in state [:topics topic] unsubscribe-from-single-topic topic ch))
|
|
|
|
state
|
|
|
|
topics)))
|
|
|
|
state
|
|
|
|
pending))]
|
|
|
|
|
|
|
|
;; Asynchronous subscription loop;
|
|
|
|
(a/go-loop []
|
2021-03-22 12:13:11 +01:00
|
|
|
(if-let [{:keys [topics chan]} (a/<! sub-ch)]
|
|
|
|
(do
|
|
|
|
(send-off chans subscribe-to-topics topics chan)
|
|
|
|
(recur))
|
|
|
|
(a/close! rcv-ch)))
|
2021-03-07 20:30:41 +01:00
|
|
|
|
2021-11-15 09:53:10 -05:00
|
|
|
;; Asynchronous message processing loop;x
|
2021-03-07 20:30:41 +01:00
|
|
|
(a/go-loop []
|
2021-03-22 12:13:11 +01:00
|
|
|
(if-let [{:keys [topic message]} (a/<! rcv-ch)]
|
|
|
|
;; This means we receive data from redis and we need to
|
|
|
|
;; forward it to the underlying subscriptions.
|
|
|
|
(let [pending (loop [chans (seq (get-in @chans [:topics topic]))
|
|
|
|
pending #{}]
|
|
|
|
(if-let [ch (first chans)]
|
|
|
|
(if (a/>! ch message)
|
|
|
|
(recur (rest chans) pending)
|
|
|
|
(recur (rest chans) (conj pending ch)))
|
|
|
|
pending))]
|
|
|
|
(some->> (seq pending)
|
|
|
|
(send-off chans unsubscribe-channels))
|
|
|
|
|
|
|
|
(recur))
|
|
|
|
|
|
|
|
;; Stop condition; close all underlying subscriptions and
|
|
|
|
;; exit. The close operation is performed asynchronously.
|
|
|
|
(send-off chans (fn [state]
|
|
|
|
(->> (vals state)
|
|
|
|
(mapcat identity)
|
|
|
|
(filter some?)
|
|
|
|
(run! a/close!)))))))))
|
|
|
|
|
2021-02-12 16:01:59 +01:00
|
|
|
|
2021-05-14 12:35:22 +02:00
|
|
|
(defn- impl-redis-open?
|
|
|
|
[^StatefulConnection conn]
|
|
|
|
(.isOpen conn))
|
|
|
|
|
2021-02-22 23:14:53 +01:00
|
|
|
(defn- impl-redis-pub
|
2021-03-07 20:30:41 +01:00
|
|
|
[^RedisAsyncCommands rac {:keys [topic message]}]
|
2021-03-22 12:13:11 +01:00
|
|
|
(let [message (blob/encode message)
|
2021-02-22 23:14:53 +01:00
|
|
|
res (a/chan 1)]
|
2021-03-07 20:30:41 +01:00
|
|
|
(-> (.publish rac ^String topic ^bytes message)
|
2021-02-22 23:14:53 +01:00
|
|
|
(p/finally (fn [_ e]
|
|
|
|
(when e (a/>!! res e))
|
|
|
|
(a/close! res))))
|
|
|
|
res))
|
2021-02-12 16:01:59 +01:00
|
|
|
|
|
|
|
(defn impl-redis-sub
|
2021-03-07 20:30:41 +01:00
|
|
|
[^RedisPubSubAsyncCommands rac topic]
|
|
|
|
(let [res (a/chan 1)]
|
|
|
|
(-> (.subscribe rac (into-array String [topic]))
|
2021-02-12 16:01:59 +01:00
|
|
|
(p/finally (fn [_ e]
|
|
|
|
(when e (a/>!! res e))
|
|
|
|
(a/close! res))))
|
|
|
|
res))
|
|
|
|
|
|
|
|
(defn impl-redis-unsub
|
2021-03-07 20:30:41 +01:00
|
|
|
[rac topic]
|
|
|
|
(let [res (a/chan 1)]
|
|
|
|
(-> (.unsubscribe rac (into-array String [topic]))
|
2021-02-12 16:01:59 +01:00
|
|
|
(p/finally (fn [_ e]
|
|
|
|
(when e (a/>!! res e))
|
|
|
|
(a/close! res))))
|
|
|
|
res))
|