0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-12 18:18:24 -05:00

🐛 Add prefix on topics (msgbus module).

This commit is contained in:
Andrey Antukh 2021-02-24 13:08:44 +01:00 committed by Andrés Moya
parent 65a3126f15
commit 04af15cba5

View file

@ -12,6 +12,7 @@
(:require
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.config :as cfg]
[app.util.blob :as blob]
[app.util.time :as dt]
[clojure.core.async :as a]
@ -135,7 +136,8 @@
(subscribed [it topic count])
(unsubscribed [it topic count])))
(let [chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
(let [chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
tprefix (str (cfg/get :tenant) ".")
subscribe-to-single-topic
(fn [nsubs topic chan]
@ -149,8 +151,7 @@
subscribe-to-topics
(fn [state topics chan]
(let [topics (into #{} (map str) topics)
state (update state :chans assoc chan topics)]
(let [state (update state :chans assoc chan topics)]
(reduce (fn [state topic]
(update-in state [:topics topic] subscribe-to-single-topic topic chan))
state
@ -158,7 +159,6 @@
unsubscribe-from-single-topic
(fn [nsubs topic chan]
;; (log/tracef "unsubscribe-from-single-topic %s | %s | %s" nsubs topic chan)
(let [nsubs (disj nsubs chan)]
(when (empty? nsubs)
(let [result (a/<!! (impl-redis-unsub conn topic))]
@ -169,11 +169,9 @@
unsubscribe-channels
(fn [state pending]
;; (log/tracef "unsubscribe-channels %s" (pr-str pending))
(reduce (fn [state ch]
(let [topics (get-in state [:chans ch])
state (update state :chans dissoc ch)]
;; (log/tracef "unsubscribe-channels topics=%s" topics)
(reduce (fn [state topic]
(update-in state [:topics topic] unsubscribe-from-single-topic topic ch))
state
@ -185,8 +183,9 @@
;; closed.
(a/go-loop []
(when-let [{:keys [topics chan]} (a/<! sub-chan)]
(send-off chans subscribe-to-topics topics chan)
(recur)))
(let [topics (into #{} (map #(str tprefix %)) topics)]
(send-off chans subscribe-to-topics topics chan)
(recur))))
(a/go-loop []
(let [[val port] (a/alts! [cch rcv-chan])]