0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-19 11:11:21 -05:00

🎉 Add msgbus abstraction.

As a replacement for the current pubsub approach.

It now uses a single connection for multiple
subscriptions (instead of conn per subscription);
has asynchronous publish and uses more efficient
blob encoding for message encoding (the same used
as page storage).
This commit is contained in:
Andrey Antukh 2021-02-12 16:01:59 +01:00 committed by Andrés Moya
parent 60f4f863df
commit 0f9b2923c2
8 changed files with 269 additions and 305 deletions

View file

@ -8,6 +8,8 @@
- Bounce & Complaint handling.
- Disable groups interactions when holding "Ctrl" key (deep selection)
- New action in context menu to "edit" some shapes (binded to key "Enter")
- Add major refactor of internal pubsub/redis code; improves
scalability and performance #640
### Bugs fixed

View file

@ -34,7 +34,7 @@
expound/expound {:mvn/version "0.8.7"}
com.cognitect/transit-clj {:mvn/version "1.0.324"}
io.lettuce/lettuce-core {:mvn/version "5.2.2.RELEASE"}
io.lettuce/lettuce-core {:mvn/version "6.1.0.M1"}
java-http-clj/java-http-clj {:mvn/version "0.4.1"}
info.sunng/ring-jetty9-adapter {:mvn/version "0.14.2"}

View file

@ -49,7 +49,7 @@
:app.telemetry/migrations
{}
:app.redis/redis
:app.msgbus/msgbus
{:uri (:redis-uri config)}
:app.tokens/tokens
@ -170,12 +170,12 @@
:tokens (ig/ref :app.tokens/tokens)
:metrics (ig/ref :app.metrics/metrics)
:storage (ig/ref :app.storage/storage)
:redis (ig/ref :app.redis/redis)
:msgbus (ig/ref :app.msgbus/msgbus)
:rlimits (ig/ref :app.rlimits/all)
:svgc (ig/ref :app.svgparse/svgc)}
:app.notifications/handler
{:redis (ig/ref :app.redis/redis)
{:msgbus (ig/ref :app.msgbus/msgbus)
:pool (ig/ref :app.db/pool)
:session (ig/ref :app.http.session/session)
:metrics (ig/ref :app.metrics/metrics)}

185
backend/src/app/msgbus.clj Normal file
View file

@ -0,0 +1,185 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2021 UXBOX Labs SL
(ns app.msgbus
"The msgbus abstraction implemented using redis as underlying backend."
(:require
[app.common.spec :as us]
[app.util.blob :as blob]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[promesa.core :as p])
(:import
io.lettuce.core.RedisClient
io.lettuce.core.RedisURI
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.codec.ByteArrayCodec
io.lettuce.core.codec.RedisCodec
io.lettuce.core.codec.StringCodec
io.lettuce.core.pubsub.RedisPubSubListener
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands))
(declare impl-publish-loop)
(declare impl-redis-pub)
(declare impl-redis-sub)
(declare impl-redis-unsub)
(declare impl-subscribe-loop)
;; --- STATE INIT: Publisher
(s/def ::uri ::us/string)
(s/def ::buffer-size ::us/integer)
(defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::uri]
:opt-un [::buffer-size]))
(defmethod ig/prep-key ::msgbus
[_ cfg]
(merge {:buffer-size 128} cfg))
(defmethod ig/init-key ::msgbus
[_ {:keys [uri buffer-size] :as cfg}]
(let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE)
uri (RedisURI/create uri)
rclient (RedisClient/create ^RedisURI uri)
snd-conn (.connect ^RedisClient rclient ^RedisCodec codec)
rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)
snd-buff (a/chan (a/sliding-buffer buffer-size))
rcv-buff (a/chan (a/sliding-buffer buffer-size))
sub-buff (a/chan 1)
cch (a/chan 1)]
;; Start the sending (publishing) loop
(impl-publish-loop snd-conn snd-buff cch)
;; Start the receiving (subscribing) loop
(impl-subscribe-loop rcv-conn rcv-buff sub-buff cch)
(with-meta
(fn run
([command] (run command nil))
([command params]
(a/go
(case command
:pub (a/>! snd-buff params)
:sub (a/>! sub-buff params)))))
{::snd-conn snd-conn
::rcv-conn rcv-conn
::cch cch
::snd-buff snd-buff
::rcv-buff rcv-buff})))
(defmethod ig/halt-key! ::msgbus
[_ f]
(let [mdata (meta f)]
(.close ^StatefulRedisConnection (::snd-conn mdata))
(.close ^StatefulRedisPubSubConnection (::rcv-conn mdata))
(a/close! (::cch mdata))
(a/close! (::snd-buff mdata))
(a/close! (::rcv-buff mdata))))
(defn- impl-redis-pub
[rac {:keys [topic message]}]
(let [topic (str topic)
message (blob/encode message)
res (a/chan 1)]
(-> (.publish ^RedisAsyncCommands rac ^String topic ^bytes message)
(p/finally (fn [_ e]
(when e (a/>!! res e))
(a/close! res))))
res))
(defn- impl-publish-loop
[conn in-buff cch]
(let [rac (.async ^StatefulRedisConnection conn)]
(a/go-loop []
(let [[val _] (a/alts! [in-buff cch])]
(when (some? val)
(let [result (a/<! (impl-redis-pub rac val))]
(when (instance? Throwable result)
(log/errorf result "unexpected error on publish message to redis"))
(recur)))))))
(defn- impl-subscribe-loop
[conn in-buff sub-buff cch]
;; Add a unique listener to connection
(.addListener conn (reify RedisPubSubListener
(message [it pattern topic message])
(message [it topic message]
;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends
;; more messages that we can process.
(a/put! in-buff {:topic topic :message (blob/decode message)}))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it topic count])
(unsubscribed [it topic count])))
(a/go-loop [chans {}]
(let [[val port] (a/alts! [sub-buff cch in-buff] :priority true)]
(cond
;; Stop condition; just do nothing
(= port cch)
nil
(= port sub-buff)
(let [topic (:topic val)
output (:chan val)
chans (update chans topic (fnil conj #{}) output)]
(when (= 1 (count (get chans topic)))
(a/<! (impl-redis-sub conn topic)))
(recur chans))
;; This means we receive data from redis and we need to
;; forward it to the underlying subscriptions.
(= port in-buff)
(let [topic (:topic val)
pending (loop [chans (seq (get chans topic))
pending #{}]
(if-let [ch (first chans)]
(if (a/>! ch (:message val))
(recur (rest chans) pending)
(recur (rest chans) (conj pending ch)))
pending))
chans (update chans topic #(reduce disj % pending))]
(when (empty? (get chans topic))
(a/<! (impl-redis-unsub conn topic)))
(recur chans))))))
(defn impl-redis-sub
[conn topic]
(let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn)
res (a/chan 1)]
(-> (.subscribe cmd (into-array String [topic]))
(p/finally (fn [_ e]
(when e (a/>!! res e))
(a/close! res))))
res))
(defn impl-redis-unsub
[conn topic]
(let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn)
res (a/chan 1)]
(-> (.unsubscribe cmd (into-array String [topic]))
(p/finally (fn [_ e]
(when e (a/>!! res e))
(a/close! res))))
res))

View file

@ -13,7 +13,6 @@
[app.common.spec :as us]
[app.db :as db]
[app.metrics :as mtx]
[app.redis :as rd]
[app.util.async :as aa]
[app.util.transit :as t]
[clojure.core.async :as a]
@ -34,9 +33,10 @@
(declare handler)
(s/def ::session map?)
(s/def ::msgbus fn?)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::rd/redis ::db/pool ::session ::mtx/metrics]))
(s/keys :req-un [::msgbus ::db/pool ::session ::mtx/metrics]))
(defmethod ig/init-key ::handler
[_ {:keys [session metrics] :as cfg}]
@ -127,7 +127,7 @@
false)))
(defn websocket
[{:keys [file-id team-id redis] :as cfg}]
[{:keys [file-id team-id msgbus] :as cfg}]
(let [in (a/chan 32)
out (a/chan 32)
mtx-active-connections (:mtx-active-connections cfg)
@ -138,10 +138,13 @@
(letfn [(on-connect [conn]
(mtx-active-connections :inc)
(let [sub (rd/subscribe redis {:xform (map t/decode-str)
:topics [file-id team-id]})
(let [sub (a/chan)
ws (WebSocket. conn in out sub nil cfg)]
;; Subscribe to corresponding topics
(a/<!! (msgbus :sub {:topic (str file-id) :chan sub}))
(a/<!! (msgbus :sub {:topic (str team-id) :chan sub}))
;; message forwarding loop
(a/go-loop []
(let [val (a/<! out)]
@ -195,10 +198,6 @@
(let [timeout (a/timeout 30000)
[val port] (a/alts! [in sub timeout])]
;; (prn "alts" val "from" (cond (= port in) "input"
;; (= port sub) "redis"
;; :else "timeout"))
(cond
;; Process message coming from connected client
(and (= port in) (not (nil? val)))
@ -224,13 +223,6 @@
;; Incoming Messages Handling
(defn- publish
[redis channel message]
(aa/go-try
(let [message (t/encode-str message)]
(aa/<? (rd/run redis :publish {:channel (str channel)
:message message})))))
(def ^:private
sql:retrieve-presence
"select * from presence
@ -270,31 +262,34 @@
;; single use token for avoid explicit database query).
(defmethod handle-message :connect
[{:keys [file-id profile-id session-id pool redis] :as ws} _message]
[{:keys [file-id profile-id session-id pool msgbus] :as ws} _message]
;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id)
(aa/go-try
(aa/<? (update-presence pool file-id session-id profile-id))
(let [members (aa/<? (retrieve-presence pool file-id))]
(aa/<? (publish redis file-id {:type :presence :sessions members})))))
(aa/<? (msgbus :pub {:topic file-id
:message {:type :presence :sessions members}})))))
(defmethod handle-message :disconnect
[{:keys [profile-id file-id session-id redis pool] :as ws} _message]
[{:keys [profile-id file-id session-id pool msgbus] :as ws} _message]
;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id)
(aa/go-try
(aa/<? (delete-presence pool file-id session-id profile-id))
(let [members (aa/<? (retrieve-presence pool file-id))]
(aa/<? (publish redis file-id {:type :presence :sessions members})))))
(aa/<? (msgbus :pub {:topic file-id
:message {:type :presence :sessions members}})))))
(defmethod handle-message :keepalive
[{:keys [profile-id file-id session-id pool] :as ws} _message]
(update-presence pool file-id session-id profile-id))
(defmethod handle-message :pointer-update
[{:keys [profile-id file-id session-id redis] :as ws} message]
[{:keys [profile-id file-id session-id msgbus] :as ws} message]
(let [message (assoc message
:profile-id profile-id
:session-id session-id)]
(publish redis file-id message)))
(msgbus :pub {:topic file-id
:message message})))
(defmethod handle-message :default
[_ws message]

View file

@ -1,58 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns app.redis
(:refer-clojure :exclude [run!])
(:require
[app.common.spec :as us]
[app.util.redis :as redis]
[clojure.spec.alpha :as s]
[integrant.core :as ig])
(:import
java.lang.AutoCloseable))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; State
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmethod ig/pre-init-spec ::redis [_]
(s/keys :req-un [::uri]))
(defmethod ig/init-key ::redis
[_ cfg]
(let [client (redis/client (:uri cfg "redis://redis/0"))
conn (redis/connect client)]
{::client client
::conn conn}))
(defmethod ig/halt-key! ::redis
[_ {:keys [::client ::conn]}]
(.close ^AutoCloseable conn)
(.close ^AutoCloseable client))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::client some?)
(s/def ::conn some?)
(s/def ::redis (s/keys :req [::client ::conn]))
(defn subscribe
[client opts]
(us/assert ::redis client)
(redis/subscribe (::client client) opts))
(defn run!
[client cmd params]
(us/assert ::redis client)
(redis/run! (::conn client) cmd params))
(defn run
[client cmd params]
(us/assert ::redis client)
(redis/run (::conn client) cmd params))

View file

@ -16,14 +16,12 @@
[app.common.uuid :as uuid]
[app.config :as cfg]
[app.db :as db]
[app.redis :as rd]
[app.rpc.queries.files :as files]
[app.rpc.queries.projects :as proj]
[app.tasks :as tasks]
[app.util.blob :as blob]
[app.util.services :as sv]
[app.util.time :as dt]
[app.util.transit :as t]
[clojure.spec.alpha :as s]))
;; --- Helpers & Specs
@ -252,19 +250,22 @@
:reg-objects :mov-objects} (:type change))
(some? (:component-id change)))))
(declare update-file)
(declare retrieve-lagged-changes)
(declare insert-change)
(declare retrieve-lagged-changes)
(declare retrieve-team-id)
(declare send-notifications)
(declare update-file)
(sv/defmethod ::update-file
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [id] :as file} (db/get-by-id conn :file id {:for-update true})]
(files/check-edition-permissions! conn profile-id id)
(update-file (assoc cfg :conn conn) file params))))
(update-file (assoc cfg :conn conn)
(assoc params :file file)))))
(defn- update-file
[{:keys [conn redis]} file params]
[{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}]
(when (> (:revn params)
(:revn file))
(ex/raise :type :validation
@ -272,64 +273,69 @@
:hint "The incoming revision number is greater that stored version."
:context {:incoming-revn (:revn params)
:stored-revn (:revn file)}))
(let [sid (:session-id params)
changes (:changes params)
file (-> file
(update :data blob/decode)
(update :data assoc :id (:id file))
(update :data pmg/migrate-data)
(update :data cp/process-changes changes)
(update :data blob/encode)
(update :revn inc)
(assoc :changes (blob/encode changes)
:session-id sid))
_ (insert-change conn file)
msg {:type :file-change
:profile-id (:profile-id params)
:file-id (:id file)
:session-id sid
:revn (:revn file)
:changes changes}
library-changes (filter library-change? changes)]
@(rd/run! redis :publish {:channel (str (:id file))
:message (t/encode-str msg)})
(when (and (:is-shared file) (seq library-changes))
(let [{:keys [team-id] :as project}
(db/get-by-id conn :project (:project-id file))
msg {:type :library-change
:profile-id (:profile-id params)
(let [file (-> file
(update :revn inc)
(update :data (fn [data]
(-> data
(blob/decode)
(assoc :id (:id file))
(pmg/migrate-data)
(cp/process-changes changes)
(blob/encode)))))]
;; Insert change to the xlog
(db/insert! conn :file-change
{:id (uuid/next)
:session-id session-id
:file-id (:id file)
:session-id sid
:revn (:revn file)
:modified-at (dt/now)
:changes library-changes}]
@(rd/run! redis :publish {:channel (str team-id)
:message (t/encode-str msg)})))
:data (:data file)
:changes (blob/encode changes)})
;; Update file
(db/update! conn :file
{:revn (:revn file)
:data (:data file)}
:data (:data file)
:has-media-trimmed false}
{:id (:id file)})
(retrieve-lagged-changes conn params)))
(let [params (assoc params :file file)]
;; Send asynchronous notifications
(send-notifications cfg params)
(defn- insert-change
[conn {:keys [revn data changes session-id] :as file}]
(let [id (uuid/next)
file-id (:id file)]
(db/insert! conn :file-change
{:id id
:session-id session-id
:file-id file-id
:revn revn
:data data
:changes changes})))
;; Retrieve and return lagged data
(retrieve-lagged-changes conn params))))
(defn- send-notifications
[{:keys [msgbus conn] :as cfg} {:keys [file changes session-id] :as params}]
(let [lchanges (filter library-change? changes)]
;; Asynchronously publish message to the msgbus
(msgbus :pub {:topic (str (:id file))
:message
{:type :file-change
:profile-id (:profile-id params)
:file-id (:id file)
:session-id (:session-id params)
:revn (:revn file)
:changes changes}})
(when (and (:is-shared file) (seq lchanges))
(let [team-id (retrieve-team-id conn (:project-id file))]
;; Asynchronously publish message to the msgbus
(msgbus :pub {:topic (str team-id)
:message
{:type :library-change
:profile-id (:profile-id params)
:file-id (:id file)
:session-id session-id
:revn (:revn file)
:modified-at (dt/now)
:changes lchanges}})))))
(defn- retrieve-team-id
[conn project-id]
(:team-id (db/get-by-id conn :project project-id {:columns [:team-id]})))
(def ^:private
sql:lagged-changes

View file

@ -1,166 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 UXBOX Labs SL
(ns app.util.redis
"Asynchronous posgresql client."
(:refer-clojure :exclude [run!])
(:require
[clojure.core.async :as a]
[promesa.core :as p])
(:import
io.lettuce.core.RedisClient
io.lettuce.core.RedisURI
io.lettuce.core.codec.StringCodec
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.pubsub.RedisPubSubListener
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
))
(defrecord Client [^RedisClient inner
^RedisURI uri]
clojure.lang.IDeref
(deref [_] inner)
java.lang.AutoCloseable
(close [_]
(.shutdown inner)))
(defrecord Connection [^StatefulRedisConnection inner
^RedisAsyncCommands cmd]
clojure.lang.IDeref
(deref [_] inner)
java.lang.AutoCloseable
(close [_]
(.close ^StatefulRedisConnection inner)))
(defn client
[uri]
(->Client (RedisClient/create)
(RedisURI/create uri)))
(defn connect
[{:keys [uri] :as client}]
(let [conn (.connect ^RedisClient @client StringCodec/UTF8 ^RedisURI uri)]
(->Connection conn (.async ^StatefulRedisConnection conn))))
(defn- impl-subscribe
[topics xform ^StatefulRedisPubSubConnection conn]
(let [cmd (.sync conn)
output (a/chan 1 (comp (filter string?) xform))
buffer (a/chan (a/sliding-buffer 64))
sub (reify RedisPubSubListener
(message [it pattern channel message])
(message [it channel message]
;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends
;; more messages that we can process.
(a/put! buffer message))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it channel count])
(unsubscribed [it channel count]))]
;; Start message event-loop (with keepalive mechanism)
(a/go-loop []
(let [[val port] (a/alts! [buffer (a/timeout 5000)])
message (if (= port buffer) val ::keepalive)]
(if (a/>! output message)
(recur)
(do
(a/close! buffer)
(.removeListener conn sub)
(when (.isOpen conn)
(.close conn))))))
;; Synchronously subscribe to topics
(.addListener conn sub)
(.subscribe ^RedisPubSubCommands cmd topics)
;; Return the output channel
output))
(defn subscribe
[{:keys [uri] :as client} {:keys [topics xform]}]
(let [topics (if (vector? topics)
(into-array String (map str topics))
(into-array String [(str topics)]))]
(->> (.connectPubSub ^RedisClient @client StringCodec/UTF8 ^RedisURI uri)
(impl-subscribe topics xform))))
(defn- resolve-to-bool
[v]
(if (= v 1)
true
false))
(defmulti impl-run (fn [_ cmd _] cmd))
(defn run!
[conn cmd params]
(let [^RedisAsyncCommands conn (:cmd conn)]
(impl-run conn cmd params)))
(defn run
[conn cmd params]
(let [res (a/chan 1)]
(if (instance? Connection conn)
(-> (run! conn cmd params)
(p/finally (fn [v e]
(if e
(a/offer! res e)
(a/offer! res v)))))
(a/close! res))
res))
(defmethod impl-run :get
[conn _ {:keys [key]}]
(.get ^RedisAsyncCommands conn ^String key))
(defmethod impl-run :set
[conn _ {:keys [key val]}]
(.set ^RedisAsyncCommands conn ^String key ^String val))
(defmethod impl-run :smembers
[conn _ {:keys [key]}]
(-> (.smembers ^RedisAsyncCommands conn ^String key)
(p/then' #(into #{} %))))
(defmethod impl-run :sadd
[conn _ {:keys [key val]}]
(let [keys (into-array String [val])]
(-> (.sadd ^RedisAsyncCommands conn ^String key ^"[S;" keys)
(p/then resolve-to-bool))))
(defmethod impl-run :srem
[conn _ {:keys [key val]}]
(let [keys (into-array String [val])]
(-> (.srem ^RedisAsyncCommands conn ^String key ^"[S;" keys)
(p/then resolve-to-bool))))
(defmethod impl-run :publish
[conn _ {:keys [channel message]}]
(-> (.publish ^RedisAsyncCommands conn ^String channel ^String message)
(p/then resolve-to-bool)))
(defmethod impl-run :hset
[^RedisAsyncCommands conn _ {:keys [key field value]}]
(.hset conn key field value))
(defmethod impl-run :hgetall
[^RedisAsyncCommands conn _ {:keys [key]}]
(.hgetall conn key))
(defmethod impl-run :hdel
[^RedisAsyncCommands conn _ {:keys [key field]}]
(let [fields (into-array String [field])]
(.hdel conn key fields)))