0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-25 07:58:49 -05:00

Add more logging to msgbus module.

This commit is contained in:
Andrey Antukh 2021-02-22 23:40:42 +01:00
parent ca1a97a52e
commit 1eddc9de33

View file

@ -10,6 +10,7 @@
(ns app.msgbus (ns app.msgbus
"The msgbus abstraction implemented using redis as underlying backend." "The msgbus abstraction implemented using redis as underlying backend."
(:require (:require
[app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.time :as dt] [app.util.time :as dt]
@ -61,8 +62,8 @@
snd-conn (.connect ^RedisClient rclient ^RedisCodec codec) snd-conn (.connect ^RedisClient rclient ^RedisCodec codec)
rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec) rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)
pub-buff (a/chan (a/sliding-buffer buffer-size)) pub-buff (a/chan (a/dropping-buffer buffer-size))
rcv-buff (a/chan (a/sliding-buffer buffer-size)) rcv-buff (a/chan (a/dropping-buffer buffer-size))
sub-buff (a/chan 1) sub-buff (a/chan 1)
cch (a/chan 1)] cch (a/chan 1)]
@ -102,15 +103,15 @@
(a/close! (::rcv-buff mdata)))) (a/close! (::rcv-buff mdata))))
(defn- impl-publish-loop (defn- impl-publish-loop
[conn rcv-buff cch] [conn pub-buff cch]
(let [rac (.async ^StatefulRedisConnection conn)] (let [rac (.async ^StatefulRedisConnection conn)]
(a/go-loop [] (a/go-loop []
(let [[val _] (a/alts! [rcv-buff cch])] (let [[val _] (a/alts! [cch pub-buff] :priority true)]
(when (some? val) (when (some? val)
(let [result (a/<! (impl-redis-pub rac val))] (let [result (a/<! (impl-redis-pub rac val))]
(when (instance? Throwable result) (when (ex/exception? result)
(log/errorf result "unexpected error on publish message to redis")) (log/error result "unexpected error on publish message to redis")))
(recur))))))) (recur))))))
(defn- impl-subscribe-loop (defn- impl-subscribe-loop
[conn rcv-buff sub-buff cch] [conn rcv-buff sub-buff cch]
@ -134,12 +135,16 @@
(= port cch) (= port cch)
nil nil
;; If we receive a message on sub-buff this means that a new
;; subscription is requested by the notifications module.
(= port sub-buff) (= port sub-buff)
(let [topic (:topic val) (let [topic (:topic val)
output (:chan val) output (:chan val)
chans (update chans topic (fnil conj #{}) output)] chans (update chans topic (fnil conj #{}) output)]
(when (= 1 (count (get chans topic))) (when (= 1 (count (get chans topic)))
(a/<! (impl-redis-sub conn topic))) (let [result (a/<! (impl-redis-sub conn topic))]
(when (ex/exception? result)
(log/errorf result "unexpected exception on subscribing to '%s'" topic))))
(recur chans)) (recur chans))
;; This means we receive data from redis and we need to ;; This means we receive data from redis and we need to
@ -153,9 +158,11 @@
(recur (rest chans) pending) (recur (rest chans) pending)
(recur (rest chans) (conj pending ch))) (recur (rest chans) (conj pending ch)))
pending)) pending))
chans (update chans topic #(reduce disj % pending))] chans (update chans topic #(reduce disj % pending))]
(when (empty? (get chans topic)) (when (empty? (get chans topic))
(a/<! (impl-redis-unsub conn topic))) (let [result (a/<! (impl-redis-unsub conn topic))]
(when (ex/exception? result)
(log/errorf result "unexpected exception on unsubscribing from '%s'" topic))))
(recur chans)))))) (recur chans))))))
(defn- impl-redis-pub (defn- impl-redis-pub