From 04af15cba59ed551e808fd93caafbe773d7eab36 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 24 Feb 2021 13:08:44 +0100 Subject: [PATCH] :bug: Add prefix on topics (msgbus module). --- backend/src/app/msgbus.clj | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index b1e13d164..143d5c08f 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -12,6 +12,7 @@ (:require [app.common.exceptions :as ex] [app.common.spec :as us] + [app.config :as cfg] [app.util.blob :as blob] [app.util.time :as dt] [clojure.core.async :as a] @@ -135,7 +136,8 @@ (subscribed [it topic count]) (unsubscribed [it topic count]))) - (let [chans (agent {} :error-handler #(log/error % "unexpected error on agent")) + (let [chans (agent {} :error-handler #(log/error % "unexpected error on agent")) + tprefix (str (cfg/get :tenant) ".") subscribe-to-single-topic (fn [nsubs topic chan] @@ -149,8 +151,7 @@ subscribe-to-topics (fn [state topics chan] - (let [topics (into #{} (map str) topics) - state (update state :chans assoc chan topics)] + (let [state (update state :chans assoc chan topics)] (reduce (fn [state topic] (update-in state [:topics topic] subscribe-to-single-topic topic chan)) state @@ -158,7 +159,6 @@ unsubscribe-from-single-topic (fn [nsubs topic chan] - ;; (log/tracef "unsubscribe-from-single-topic %s | %s | %s" nsubs topic chan) (let [nsubs (disj nsubs chan)] (when (empty? nsubs) (let [result (a/