;; This Source Code Form is subject to the terms of the Mozilla Public ;; License, v. 2.0. If a copy of the MPL was not distributed with this ;; file, You can obtain one at http://mozilla.org/MPL/2.0/. ;; ;; Copyright (c) UXBOX Labs SL (ns app.msgbus "The msgbus abstraction implemented using redis as underlying backend." (:require [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] [app.config :as cfg] [app.util.blob :as blob] [app.util.time :as dt] [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p]) (:import java.time.Duration io.lettuce.core.RedisClient io.lettuce.core.RedisURI io.lettuce.core.api.StatefulConnection io.lettuce.core.api.StatefulRedisConnection io.lettuce.core.api.async.RedisAsyncCommands io.lettuce.core.codec.ByteArrayCodec io.lettuce.core.codec.RedisCodec io.lettuce.core.codec.StringCodec io.lettuce.core.pubsub.RedisPubSubListener io.lettuce.core.pubsub.StatefulRedisPubSubConnection io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands)) (def ^:private prefix (cfg/get :tenant)) (defn- prefix-topic [topic] (str prefix "." topic)) (def xform-prefix (map prefix-topic)) (def xform-topics (map (fn [m] (update m :topics #(into #{} xform-prefix %))))) (def xform-topic (map (fn [m] (update m :topic prefix-topic)))) (s/def ::redis-uri ::us/string) (s/def ::buffer-size ::us/integer) (defmulti init-backend :backend) (defmulti stop-backend :backend) (defmulti init-pub-loop :backend) (defmulti init-sub-loop :backend) (defmethod ig/pre-init-spec ::msgbus [_] (s/keys :opt-un [::buffer-size ::redis-uri])) (defmethod ig/prep-key ::msgbus [_ cfg] (merge {:buffer-size 128} cfg)) (defmethod ig/init-key ::msgbus [_ {:keys [backend buffer-size] :as cfg}] (l/debug :action "initialize msgbus" :backend (name backend)) (let [cfg (init-backend cfg) ;; Channel used for receive publications from the application. pub-ch (-> (a/dropping-buffer buffer-size) (a/chan xform-topic)) ;; Channel used for receive subscription requests. sub-ch (a/chan 1 xform-topics) cfg (-> cfg (assoc ::pub-ch pub-ch) (assoc ::sub-ch sub-ch))] (init-pub-loop cfg) (init-sub-loop cfg) (with-meta (fn run ([command] (run command nil)) ([command params] (a/go (case command :pub (a/>! pub-ch params) :sub (a/>! sub-ch params))))) cfg))) (defmethod ig/halt-key! ::msgbus [_ f] (let [mdata (meta f)] (stop-backend mdata) (a/close! (::pub-ch mdata)) (a/close! (::sub-ch mdata)))) ;; --- IN-MEMORY BACKEND IMPL (defmethod init-backend :memory [cfg] cfg) (defmethod stop-backend :memory [_]) (defmethod init-pub-loop :memory [_]) (defmethod init-sub-loop :memory [{:keys [::sub-ch ::pub-ch]}] (a/go-loop [state {}] (let [[val port] (a/alts! [pub-ch sub-ch])] (cond (and (= port sub-ch) (some? val)) (let [{:keys [topics chan]} val] (recur (reduce #(update %1 %2 (fnil conj #{}) chan) state topics))) (and (= port pub-ch) (some? val)) (let [topic (:topic val) message (:message val) state (loop [state state chans (get state topic)] (if-let [c (first chans)] (if (a/>! c message) (recur state (rest chans)) (recur (update state topic disj c) (rest chans))) state))] (recur state)) :else (->> (vals state) (mapcat identity) (run! a/close!)))))) ;; Add a unique listener to connection ;; --- REDIS BACKEND IMPL (declare impl-redis-open?) (declare impl-redis-pub) (declare impl-redis-sub) (declare impl-redis-unsub) (defmethod init-backend :redis [{:keys [redis-uri] :as cfg}] (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) uri (RedisURI/create redis-uri) rclient (RedisClient/create ^RedisURI uri) pub-conn (.connect ^RedisClient rclient ^RedisCodec codec) sub-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)] (.setTimeout ^StatefulRedisConnection pub-conn ^Duration (dt/duration {:seconds 10})) (.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10})) (-> cfg (assoc ::pub-conn pub-conn) (assoc ::sub-conn sub-conn)))) (defmethod stop-backend :redis [{:keys [::pub-conn ::sub-conn] :as cfg}] (.close ^StatefulRedisConnection pub-conn) (.close ^StatefulRedisPubSubConnection sub-conn)) (defmethod init-pub-loop :redis [{:keys [::pub-conn ::pub-ch]}] (let [rac (.async ^StatefulRedisConnection pub-conn)] (a/go-loop [] (when-let [val (a/! ch message) (recur (rest chans) pending) (recur (rest chans) (conj pending ch))) pending))] (some->> (seq pending) (send-off chans unsubscribe-channels)) (recur)) ;; Stop condition; close all underlying subscriptions and ;; exit. The close operation is performed asynchronously. (send-off chans (fn [state] (->> (vals state) (mapcat identity) (filter some?) (run! a/close!))))))))) (defn- impl-redis-open? [^StatefulConnection conn] (.isOpen conn)) (defn- impl-redis-pub [^RedisAsyncCommands rac {:keys [topic message]}] (let [message (blob/encode message) res (a/chan 1)] (-> (.publish rac ^String topic ^bytes message) (p/finally (fn [_ e] (when e (a/>!! res e)) (a/close! res)))) res)) (defn impl-redis-sub [^RedisPubSubAsyncCommands rac topic] (let [res (a/chan 1)] (-> (.subscribe rac (into-array String [topic])) (p/finally (fn [_ e] (when e (a/>!! res e)) (a/close! res)))) res)) (defn impl-redis-unsub [rac topic] (let [res (a/chan 1)] (-> (.unsubscribe rac (into-array String [topic])) (p/finally (fn [_ e] (when e (a/>!! res e)) (a/close! res)))) res))