0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-12 15:51:37 -05:00

🎉 Add :memory backend to the msgbus module.

This commit is contained in:
Andrey Antukh 2021-03-22 12:13:11 +01:00 committed by Andrés Moya
parent 28691e2bf2
commit e7085571bf
4 changed files with 137 additions and 113 deletions

View file

@ -58,4 +58,3 @@
([table where-params opts] ([table where-params opts]
(let [opts (merge default-opts opts)] (let [opts (merge default-opts opts)]
(sql/for-delete table where-params opts)))) (sql/for-delete table where-params opts))))

View file

@ -59,7 +59,6 @@
:app.msgbus/msgbus :app.msgbus/msgbus
{:backend (:msgbus-backend config :redis) {:backend (:msgbus-backend config :redis)
:pool (ig/ref :app.db/pool)
:redis-uri (:redis-uri config)} :redis-uri (:redis-uri config)}
:app.tokens/tokens :app.tokens/tokens

View file

@ -13,7 +13,6 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.config :as cfg] [app.config :as cfg]
[app.db :as db]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.core.async :as a] [clojure.core.async :as a]
@ -34,6 +33,16 @@
io.lettuce.core.pubsub.StatefulRedisPubSubConnection io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands)) io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands))
(def ^:private prefix (cfg/get :tenant))
(defn- prefix-topic
[topic]
(str prefix "." topic))
(def xform-prefix (map prefix-topic))
(def xform-topics (map (fn [m] (update m :topics #(into #{} xform-prefix %)))))
(def xform-topic (map (fn [m] (update m :topic prefix-topic))))
(s/def ::redis-uri ::us/string) (s/def ::redis-uri ::us/string)
(s/def ::buffer-size ::us/integer) (s/def ::buffer-size ::us/integer)
@ -43,8 +52,7 @@
(defmulti init-sub-loop :backend) (defmulti init-sub-loop :backend)
(defmethod ig/pre-init-spec ::msgbus [_] (defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::db/pool] (s/keys :opt-un [::buffer-size ::redis-uri]))
:opt-un [::buffer-size ::redis-uri]))
(defmethod ig/prep-key ::msgbus (defmethod ig/prep-key ::msgbus
[_ cfg] [_ cfg]
@ -53,17 +61,21 @@
(defmethod ig/init-key ::msgbus (defmethod ig/init-key ::msgbus
[_ {:keys [backend buffer-size] :as cfg}] [_ {:keys [backend buffer-size] :as cfg}]
(log/debugf "initializing msgbus (backend=%s)" (name backend)) (log/debugf "initializing msgbus (backend=%s)" (name backend))
(let [cfg (init-backend cfg)
(let [backend (init-backend cfg)
;; Channel used for receive publications from the application. ;; Channel used for receive publications from the application.
pub-ch (a/chan (a/dropping-buffer buffer-size)) pub-ch (-> (a/dropping-buffer buffer-size)
(a/chan xform-topic))
;; Channel used for receive subscription requests. ;; Channel used for receive subscription requests.
sub-ch (a/chan)] sub-ch (a/chan 1 xform-topics)
(init-pub-loop (assoc backend :ch pub-ch)) cfg (-> cfg
(init-sub-loop (assoc backend :ch sub-ch)) (assoc ::pub-ch pub-ch)
(assoc ::sub-ch sub-ch))]
(init-pub-loop cfg)
(init-sub-loop cfg)
(with-meta (with-meta
(fn run (fn run
@ -73,14 +85,50 @@
(case command (case command
:pub (a/>! pub-ch params) :pub (a/>! pub-ch params)
:sub (a/>! sub-ch params))))) :sub (a/>! sub-ch params)))))
cfg)))
{::backend backend})))
(defmethod ig/halt-key! ::msgbus (defmethod ig/halt-key! ::msgbus
[_ f] [_ f]
(let [mdata (meta f)] (let [mdata (meta f)]
(stop-backend (::backend mdata)))) (stop-backend mdata)
(a/close! (::pub-ch mdata))
(a/close! (::sub-ch mdata))))
;; --- IN-MEMORY BACKEND IMPL
(defmethod init-backend :memory [cfg] cfg)
(defmethod stop-backend :memory [_])
(defmethod init-pub-loop :memory [_])
(defmethod init-sub-loop :memory
[{:keys [::sub-ch ::pub-ch]}]
(a/go-loop [state {}]
(let [[val port] (a/alts! [pub-ch sub-ch])]
(cond
(and (= port sub-ch) (some? val))
(let [{:keys [topics chan]} val]
(recur (reduce #(update %1 %2 (fnil conj #{}) chan) state topics)))
(and (= port pub-ch) (some? val))
(let [topic (:topic val)
message (:message val)
state (loop [state state
chans (get state topic)]
(if-let [c (first chans)]
(if (a/>! c message)
(recur state (rest chans))
(recur (update state topic disj c)
(rest chans)))
state))]
(recur state))
:else
(->> (vals state)
(mapcat identity)
(run! a/close!))))))
;; Add a unique listener to connection
;; --- REDIS BACKEND IMPL ;; --- REDIS BACKEND IMPL
@ -102,32 +150,28 @@
(.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10})) (.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10}))
(-> cfg (-> cfg
(assoc :pub-conn pub-conn) (assoc ::pub-conn pub-conn)
(assoc :sub-conn sub-conn) (assoc ::sub-conn sub-conn))))
(assoc :close-ch (a/chan 1)))))
(defmethod stop-backend :redis (defmethod stop-backend :redis
[{:keys [pub-conn sub-conn close-ch] :as cfg}] [{:keys [::pub-conn ::sub-conn] :as cfg}]
(.close ^StatefulRedisConnection pub-conn) (.close ^StatefulRedisConnection pub-conn)
(.close ^StatefulRedisPubSubConnection sub-conn) (.close ^StatefulRedisPubSubConnection sub-conn))
(a/close! close-ch))
(defmethod init-pub-loop :redis (defmethod init-pub-loop :redis
[{:keys [pub-conn ch close-ch]}] [{:keys [::pub-conn ::pub-ch]}]
(let [rac (.async ^StatefulRedisConnection pub-conn)] (let [rac (.async ^StatefulRedisConnection pub-conn)]
(a/go-loop [] (a/go-loop []
(let [[val _] (a/alts! [close-ch ch] :priority true)] (when-let [val (a/<! pub-ch)]
(when (some? val) (let [result (a/<! (impl-redis-pub rac val))]
(let [result (a/<! (impl-redis-pub rac val))] (when (ex/exception? result)
(when (ex/exception? result) (log/error result "unexpected error on publish message to redis")))
(log/error result "unexpected error on publish message to redis"))) (recur)))))
(recur))))))
(defmethod init-sub-loop :redis (defmethod init-sub-loop :redis
[{:keys [sub-conn ch close-ch buffer-size]}] [{:keys [::sub-conn ::sub-ch buffer-size]}]
(let [rcv-ch (a/chan (a/dropping-buffer buffer-size)) (let [rcv-ch (a/chan (a/dropping-buffer buffer-size))
chans (agent {} :error-handler #(log/error % "unexpected error on agent")) chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
tprefix (str (cfg/get :tenant) ".")
rac (.async ^StatefulRedisPubSubConnection sub-conn)] rac (.async ^StatefulRedisPubSubConnection sub-conn)]
;; Add a unique listener to connection ;; Add a unique listener to connection
@ -184,46 +228,42 @@
;; Asynchronous subscription loop; ;; Asynchronous subscription loop;
(a/go-loop [] (a/go-loop []
(let [[val _] (a/alts! [close-ch ch])] (if-let [{:keys [topics chan]} (a/<! sub-ch)]
(when-let [{:keys [topics chan]} val] (do
(let [topics (into #{} (map #(str tprefix %)) topics)] (send-off chans subscribe-to-topics topics chan)
(send-off chans subscribe-to-topics topics chan) (recur))
(recur))))) (a/close! rcv-ch)))
;; Asyncrhonous message processing loop;x
(a/go-loop [] (a/go-loop []
(let [[val port] (a/alts! [close-ch rcv-ch])] (if-let [{:keys [topic message]} (a/<! rcv-ch)]
(cond ;; This means we receive data from redis and we need to
;; Stop condition; close all underlying subscriptions and ;; forward it to the underlying subscriptions.
;; exit. The close operation is performed asynchronously. (let [pending (loop [chans (seq (get-in @chans [:topics topic]))
(= port close-ch) pending #{}]
(send-off chans (fn [state] (if-let [ch (first chans)]
(log/tracef "close") (if (a/>! ch message)
(->> (vals state) (recur (rest chans) pending)
(mapcat identity) (recur (rest chans) (conj pending ch)))
(filter some?) pending))]
(run! a/close!)))) ;; (log/tracef "received message => pending: %s" (pr-str pending))
(some->> (seq pending)
(send-off chans unsubscribe-channels))
;; This means we receive data from redis and we need to (recur))
;; forward it to the underlying subscriptions.
(= port rcv-ch) ;; Stop condition; close all underlying subscriptions and
(let [topic (:topic val) ; topic is already string ;; exit. The close operation is performed asynchronously.
pending (loop [chans (seq (get-in @chans [:topics topic])) (send-off chans (fn [state]
pending #{}] (->> (vals state)
(if-let [ch (first chans)] (mapcat identity)
(if (a/>! ch (:message val)) (filter some?)
(recur (rest chans) pending) (run! a/close!)))))))))
(recur (rest chans) (conj pending ch)))
pending))]
;; (log/tracef "received message => pending: %s" (pr-str pending))
(some->> (seq pending)
(send-off chans unsubscribe-channels))
(recur))))))))
(defn- impl-redis-pub (defn- impl-redis-pub
[^RedisAsyncCommands rac {:keys [topic message]}] [^RedisAsyncCommands rac {:keys [topic message]}]
(let [topic (str (cfg/get :tenant) "." topic) (let [message (blob/encode message)
message (blob/encode message)
res (a/chan 1)] res (a/chan 1)]
(-> (.publish rac ^String topic ^bytes message) (-> (.publish rac ^String topic ^bytes message)
(p/finally (fn [_ e] (p/finally (fn [_ e]

View file

@ -24,9 +24,7 @@
[ring.adapter.jetty9 :as jetty] [ring.adapter.jetty9 :as jetty]
[ring.middleware.cookies :refer [wrap-cookies]] [ring.middleware.cookies :refer [wrap-cookies]]
[ring.middleware.keyword-params :refer [wrap-keyword-params]] [ring.middleware.keyword-params :refer [wrap-keyword-params]]
[ring.middleware.params :refer [wrap-params]]) [ring.middleware.params :refer [wrap-params]]))
(:import
org.eclipse.jetty.websocket.api.WebSocketAdapter))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Http Handler ;; Http Handler
@ -199,56 +197,44 @@
(declare start-loop!) (declare start-loop!)
(defn- handle-connect (defn- handle-connect
[{:keys [conn] :as cfg}] [cfg]
(a/go (a/go
(try (a/<! (handle-message cfg {:type :connect}))
(aa/<? (handle-message cfg {:type :connect})) (a/<! (start-loop! cfg))
(aa/<? (start-loop! cfg)) (a/<! (handle-message cfg {:type :disconnect}))))
(aa/<? (handle-message cfg {:type :disconnect}))
(catch Throwable err
(log/errorf err "unexpected exception on websocket handler")
(let [session (.getSession ^WebSocketAdapter conn)]
(when session
(.disconnect session)))))))
(defn- start-loop! (defn- start-loop!
[{:keys [rcv-ch out-ch sub-ch session-id profile-id] :as cfg}] [{:keys [rcv-ch out-ch sub-ch session-id] :as cfg}]
(aa/go-try (a/go-loop []
(loop [] (let [timeout (a/timeout 30000)
(let [timeout (a/timeout 30000) [val port] (a/alts! [rcv-ch sub-ch timeout])]
[val port] (a/alts! [rcv-ch sub-ch timeout])] (cond
;; Process message coming from connected client
(and (= port rcv-ch) (some? val))
(do
(a/<! (handle-message cfg val))
(recur))
(cond ;; Process message coming from pubsub.
;; Process message coming from connected client (and (= port sub-ch) (some? val))
(and (= port rcv-ch) (some? val)) (do
(do (when-not (= (:session-id val) session-id)
(aa/<? (handle-message cfg val)) ;; If we receive a connect message of other user, we need
(recur)) ;; to send an update presence to all participants.
(when (= :connect (:type val))
(a/<! (send-presence cfg :presence)))
;; Then, just forward the message
(a/>! out-ch val))
(recur))
;; Process message coming from pubsub. ;; When timeout channel is signaled, we need to send a ping
(and (= port sub-ch) (some? val)) ;; message to the output channel. TODO: we need to make this
(do ;; more smart.
(when-not (= (:session-id val) session-id) (= port timeout)
;; If we receive a connect message of other user, we need (do
;; to send an update presence to all participants. (a/>! out-ch {:type :ping})
(when (= :connect (:type val)) (recur))))))
(a/<! (send-presence cfg)))
;; Then, just forward the message
(a/>! out-ch val))
(recur))
;; When timeout channel is signaled, we need to send a ping
;; message to the output channel. TODO: we need to make this
;; more smart.
(= port timeout)
(do
(a/>! out-ch {:type :ping})
(recur))
:else
nil)))))
(defn send-presence (defn send-presence
([cfg] (send-presence cfg :presence)) ([cfg] (send-presence cfg :presence))
@ -265,18 +251,18 @@
(fn [_ message] (:type message))) (fn [_ message] (:type message)))
(defmethod handle-message :connect (defmethod handle-message :connect
[{:keys [file-id msgbus] :as cfg} _message] [cfg _]
;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id) ;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id)
(send-presence cfg :connect)) (send-presence cfg :connect))
(defmethod handle-message :disconnect (defmethod handle-message :disconnect
[{:keys [file-id msgbus] :as cfg} _message] [cfg _]
;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id) ;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id)
(send-presence cfg :disconnect)) (send-presence cfg :disconnect))
(defmethod handle-message :keepalive (defmethod handle-message :keepalive
[cfg _message] [_ _]
(a/go (do :nothing))) (a/go :nothing))
(defmethod handle-message :pointer-update (defmethod handle-message :pointer-update
[{:keys [profile-id file-id session-id msgbus] :as cfg} message] [{:keys [profile-id file-id session-id msgbus] :as cfg} message]