From 0f9b2923c2657590b619d2e06df840bcdb5428a5 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 12 Feb 2021 16:01:59 +0100 Subject: [PATCH] :tada: Add msgbus abstraction. As a replacement for the current pubsub approach. It now uses a single connection for multiple subscriptions (instead of conn per subscription); has asynchronous publish and uses more efficient blob encoding for message encoding (the same used as page storage). --- CHANGES.md | 2 + backend/deps.edn | 2 +- backend/src/app/main.clj | 6 +- backend/src/app/msgbus.clj | 185 ++++++++++++++++++++++++ backend/src/app/notifications.clj | 39 +++-- backend/src/app/redis.clj | 58 -------- backend/src/app/rpc/mutations/files.clj | 116 ++++++++------- backend/src/app/util/redis.clj | 166 --------------------- 8 files changed, 269 insertions(+), 305 deletions(-) create mode 100644 backend/src/app/msgbus.clj delete mode 100644 backend/src/app/redis.clj delete mode 100644 backend/src/app/util/redis.clj diff --git a/CHANGES.md b/CHANGES.md index 6ebc27f3d..4487e3a75 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,8 @@ - Bounce & Complaint handling. - Disable groups interactions when holding "Ctrl" key (deep selection) - New action in context menu to "edit" some shapes (binded to key "Enter") +- Add major refactor of internal pubsub/redis code; improves + scalability and performance #640 ### Bugs fixed diff --git a/backend/deps.edn b/backend/deps.edn index 2452df5d9..7220b11b3 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -34,7 +34,7 @@ expound/expound {:mvn/version "0.8.7"} com.cognitect/transit-clj {:mvn/version "1.0.324"} - io.lettuce/lettuce-core {:mvn/version "5.2.2.RELEASE"} + io.lettuce/lettuce-core {:mvn/version "6.1.0.M1"} java-http-clj/java-http-clj {:mvn/version "0.4.1"} info.sunng/ring-jetty9-adapter {:mvn/version "0.14.2"} diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index ba8594798..2d4745719 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -49,7 +49,7 @@ :app.telemetry/migrations {} - :app.redis/redis + :app.msgbus/msgbus {:uri (:redis-uri config)} :app.tokens/tokens @@ -170,12 +170,12 @@ :tokens (ig/ref :app.tokens/tokens) :metrics (ig/ref :app.metrics/metrics) :storage (ig/ref :app.storage/storage) - :redis (ig/ref :app.redis/redis) + :msgbus (ig/ref :app.msgbus/msgbus) :rlimits (ig/ref :app.rlimits/all) :svgc (ig/ref :app.svgparse/svgc)} :app.notifications/handler - {:redis (ig/ref :app.redis/redis) + {:msgbus (ig/ref :app.msgbus/msgbus) :pool (ig/ref :app.db/pool) :session (ig/ref :app.http.session/session) :metrics (ig/ref :app.metrics/metrics)} diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj new file mode 100644 index 000000000..ba38f76a3 --- /dev/null +++ b/backend/src/app/msgbus.clj @@ -0,0 +1,185 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2021 UXBOX Labs SL + +(ns app.msgbus + "The msgbus abstraction implemented using redis as underlying backend." + (:require + [app.common.spec :as us] + [app.util.blob :as blob] + [clojure.core.async :as a] + [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] + [integrant.core :as ig] + [promesa.core :as p]) + (:import + io.lettuce.core.RedisClient + io.lettuce.core.RedisURI + io.lettuce.core.api.StatefulRedisConnection + io.lettuce.core.api.async.RedisAsyncCommands + io.lettuce.core.codec.ByteArrayCodec + io.lettuce.core.codec.RedisCodec + io.lettuce.core.codec.StringCodec + io.lettuce.core.pubsub.RedisPubSubListener + io.lettuce.core.pubsub.StatefulRedisPubSubConnection + io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands)) + +(declare impl-publish-loop) +(declare impl-redis-pub) +(declare impl-redis-sub) +(declare impl-redis-unsub) +(declare impl-subscribe-loop) + + +;; --- STATE INIT: Publisher + +(s/def ::uri ::us/string) +(s/def ::buffer-size ::us/integer) + +(defmethod ig/pre-init-spec ::msgbus [_] + (s/keys :req-un [::uri] + :opt-un [::buffer-size])) + +(defmethod ig/prep-key ::msgbus + [_ cfg] + (merge {:buffer-size 128} cfg)) + +(defmethod ig/init-key ::msgbus + [_ {:keys [uri buffer-size] :as cfg}] + (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) + + uri (RedisURI/create uri) + rclient (RedisClient/create ^RedisURI uri) + + snd-conn (.connect ^RedisClient rclient ^RedisCodec codec) + rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec) + + snd-buff (a/chan (a/sliding-buffer buffer-size)) + rcv-buff (a/chan (a/sliding-buffer buffer-size)) + sub-buff (a/chan 1) + cch (a/chan 1)] + + ;; Start the sending (publishing) loop + (impl-publish-loop snd-conn snd-buff cch) + + ;; Start the receiving (subscribing) loop + (impl-subscribe-loop rcv-conn rcv-buff sub-buff cch) + + (with-meta + (fn run + ([command] (run command nil)) + ([command params] + (a/go + (case command + :pub (a/>! snd-buff params) + :sub (a/>! sub-buff params))))) + + {::snd-conn snd-conn + ::rcv-conn rcv-conn + ::cch cch + ::snd-buff snd-buff + ::rcv-buff rcv-buff}))) + +(defmethod ig/halt-key! ::msgbus + [_ f] + (let [mdata (meta f)] + (.close ^StatefulRedisConnection (::snd-conn mdata)) + (.close ^StatefulRedisPubSubConnection (::rcv-conn mdata)) + (a/close! (::cch mdata)) + (a/close! (::snd-buff mdata)) + (a/close! (::rcv-buff mdata)))) + +(defn- impl-redis-pub + [rac {:keys [topic message]}] + (let [topic (str topic) + message (blob/encode message) + res (a/chan 1)] + (-> (.publish ^RedisAsyncCommands rac ^String topic ^bytes message) + (p/finally (fn [_ e] + (when e (a/>!! res e)) + (a/close! res)))) + res)) + +(defn- impl-publish-loop + [conn in-buff cch] + (let [rac (.async ^StatefulRedisConnection conn)] + (a/go-loop [] + (let [[val _] (a/alts! [in-buff cch])] + (when (some? val) + (let [result (a/! ch (:message val)) + (recur (rest chans) pending) + (recur (rest chans) (conj pending ch))) + pending)) + chans (update chans topic #(reduce disj % pending))] + (when (empty? (get chans topic)) + (a/ (.subscribe cmd (into-array String [topic])) + (p/finally (fn [_ e] + (when e (a/>!! res e)) + (a/close! res)))) + res)) + + +(defn impl-redis-unsub + [conn topic] + (let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn) + res (a/chan 1)] + (-> (.unsubscribe cmd (into-array String [topic])) + (p/finally (fn [_ e] + (when e (a/>!! res e)) + (a/close! res)))) + res)) diff --git a/backend/src/app/notifications.clj b/backend/src/app/notifications.clj index e8543d9d7..7aefa4e45 100644 --- a/backend/src/app/notifications.clj +++ b/backend/src/app/notifications.clj @@ -13,7 +13,6 @@ [app.common.spec :as us] [app.db :as db] [app.metrics :as mtx] - [app.redis :as rd] [app.util.async :as aa] [app.util.transit :as t] [clojure.core.async :as a] @@ -34,9 +33,10 @@ (declare handler) (s/def ::session map?) +(s/def ::msgbus fn?) (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::rd/redis ::db/pool ::session ::mtx/metrics])) + (s/keys :req-un [::msgbus ::db/pool ::session ::mtx/metrics])) (defmethod ig/init-key ::handler [_ {:keys [session metrics] :as cfg}] @@ -127,7 +127,7 @@ false))) (defn websocket - [{:keys [file-id team-id redis] :as cfg}] + [{:keys [file-id team-id msgbus] :as cfg}] (let [in (a/chan 32) out (a/chan 32) mtx-active-connections (:mtx-active-connections cfg) @@ -138,10 +138,13 @@ (letfn [(on-connect [conn] (mtx-active-connections :inc) - (let [sub (rd/subscribe redis {:xform (map t/decode-str) - :topics [file-id team-id]}) + (let [sub (a/chan) ws (WebSocket. conn in out sub nil cfg)] + ;; Subscribe to corresponding topics + (a/ - -(ns app.redis - (:refer-clojure :exclude [run!]) - (:require - [app.common.spec :as us] - [app.util.redis :as redis] - [clojure.spec.alpha :as s] - [integrant.core :as ig]) - (:import - java.lang.AutoCloseable)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; State -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defmethod ig/pre-init-spec ::redis [_] - (s/keys :req-un [::uri])) - -(defmethod ig/init-key ::redis - [_ cfg] - (let [client (redis/client (:uri cfg "redis://redis/0")) - conn (redis/connect client)] - {::client client - ::conn conn})) - -(defmethod ig/halt-key! ::redis - [_ {:keys [::client ::conn]}] - (.close ^AutoCloseable conn) - (.close ^AutoCloseable client)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; API -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(s/def ::client some?) -(s/def ::conn some?) -(s/def ::redis (s/keys :req [::client ::conn])) - -(defn subscribe - [client opts] - (us/assert ::redis client) - (redis/subscribe (::client client) opts)) - -(defn run! - [client cmd params] - (us/assert ::redis client) - (redis/run! (::conn client) cmd params)) - -(defn run - [client cmd params] - (us/assert ::redis client) - (redis/run (::conn client) cmd params)) - diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index 9d04afeb2..990273881 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -16,14 +16,12 @@ [app.common.uuid :as uuid] [app.config :as cfg] [app.db :as db] - [app.redis :as rd] [app.rpc.queries.files :as files] [app.rpc.queries.projects :as proj] [app.tasks :as tasks] [app.util.blob :as blob] [app.util.services :as sv] [app.util.time :as dt] - [app.util.transit :as t] [clojure.spec.alpha :as s])) ;; --- Helpers & Specs @@ -252,19 +250,22 @@ :reg-objects :mov-objects} (:type change)) (some? (:component-id change))))) -(declare update-file) -(declare retrieve-lagged-changes) (declare insert-change) +(declare retrieve-lagged-changes) +(declare retrieve-team-id) +(declare send-notifications) +(declare update-file) (sv/defmethod ::update-file [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] (db/with-atomic [conn pool] (let [{:keys [id] :as file} (db/get-by-id conn :file id {:for-update true})] (files/check-edition-permissions! conn profile-id id) - (update-file (assoc cfg :conn conn) file params)))) + (update-file (assoc cfg :conn conn) + (assoc params :file file))))) (defn- update-file - [{:keys [conn redis]} file params] + [{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}] (when (> (:revn params) (:revn file)) (ex/raise :type :validation @@ -272,64 +273,69 @@ :hint "The incoming revision number is greater that stored version." :context {:incoming-revn (:revn params) :stored-revn (:revn file)})) - (let [sid (:session-id params) - changes (:changes params) - file (-> file - (update :data blob/decode) - (update :data assoc :id (:id file)) - (update :data pmg/migrate-data) - (update :data cp/process-changes changes) - (update :data blob/encode) - (update :revn inc) - (assoc :changes (blob/encode changes) - :session-id sid)) - _ (insert-change conn file) - msg {:type :file-change - :profile-id (:profile-id params) - :file-id (:id file) - :session-id sid - :revn (:revn file) - :changes changes} - - library-changes (filter library-change? changes)] - - @(rd/run! redis :publish {:channel (str (:id file)) - :message (t/encode-str msg)}) - - (when (and (:is-shared file) (seq library-changes)) - (let [{:keys [team-id] :as project} - (db/get-by-id conn :project (:project-id file)) - - msg {:type :library-change - :profile-id (:profile-id params) + (let [file (-> file + (update :revn inc) + (update :data (fn [data] + (-> data + (blob/decode) + (assoc :id (:id file)) + (pmg/migrate-data) + (cp/process-changes changes) + (blob/encode)))))] + ;; Insert change to the xlog + (db/insert! conn :file-change + {:id (uuid/next) + :session-id session-id :file-id (:id file) - :session-id sid :revn (:revn file) - :modified-at (dt/now) - :changes library-changes}] - - @(rd/run! redis :publish {:channel (str team-id) - :message (t/encode-str msg)}))) + :data (:data file) + :changes (blob/encode changes)}) + ;; Update file (db/update! conn :file {:revn (:revn file) - :data (:data file)} + :data (:data file) + :has-media-trimmed false} {:id (:id file)}) - (retrieve-lagged-changes conn params))) + (let [params (assoc params :file file)] + ;; Send asynchronous notifications + (send-notifications cfg params) -(defn- insert-change - [conn {:keys [revn data changes session-id] :as file}] - (let [id (uuid/next) - file-id (:id file)] - (db/insert! conn :file-change - {:id id - :session-id session-id - :file-id file-id - :revn revn - :data data - :changes changes}))) + ;; Retrieve and return lagged data + (retrieve-lagged-changes conn params)))) + +(defn- send-notifications + [{:keys [msgbus conn] :as cfg} {:keys [file changes session-id] :as params}] + (let [lchanges (filter library-change? changes)] + + ;; Asynchronously publish message to the msgbus + (msgbus :pub {:topic (str (:id file)) + :message + {:type :file-change + :profile-id (:profile-id params) + :file-id (:id file) + :session-id (:session-id params) + :revn (:revn file) + :changes changes}}) + + (when (and (:is-shared file) (seq lchanges)) + (let [team-id (retrieve-team-id conn (:project-id file))] + ;; Asynchronously publish message to the msgbus + (msgbus :pub {:topic (str team-id) + :message + {:type :library-change + :profile-id (:profile-id params) + :file-id (:id file) + :session-id session-id + :revn (:revn file) + :modified-at (dt/now) + :changes lchanges}}))))) + +(defn- retrieve-team-id + [conn project-id] + (:team-id (db/get-by-id conn :project project-id {:columns [:team-id]}))) (def ^:private sql:lagged-changes diff --git a/backend/src/app/util/redis.clj b/backend/src/app/util/redis.clj deleted file mode 100644 index 0be8b5b46..000000000 --- a/backend/src/app/util/redis.clj +++ /dev/null @@ -1,166 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; This Source Code Form is "Incompatible With Secondary Licenses", as -;; defined by the Mozilla Public License, v. 2.0. -;; -;; Copyright (c) 2020 UXBOX Labs SL - -(ns app.util.redis - "Asynchronous posgresql client." - (:refer-clojure :exclude [run!]) - (:require - [clojure.core.async :as a] - [promesa.core :as p]) - (:import - io.lettuce.core.RedisClient - io.lettuce.core.RedisURI - io.lettuce.core.codec.StringCodec - io.lettuce.core.api.async.RedisAsyncCommands - io.lettuce.core.api.StatefulRedisConnection - io.lettuce.core.pubsub.RedisPubSubListener - io.lettuce.core.pubsub.StatefulRedisPubSubConnection - io.lettuce.core.pubsub.api.sync.RedisPubSubCommands - )) - -(defrecord Client [^RedisClient inner - ^RedisURI uri] - clojure.lang.IDeref - (deref [_] inner) - - java.lang.AutoCloseable - (close [_] - (.shutdown inner))) - -(defrecord Connection [^StatefulRedisConnection inner - ^RedisAsyncCommands cmd] - clojure.lang.IDeref - (deref [_] inner) - - java.lang.AutoCloseable - (close [_] - (.close ^StatefulRedisConnection inner))) - -(defn client - [uri] - (->Client (RedisClient/create) - (RedisURI/create uri))) - -(defn connect - [{:keys [uri] :as client}] - (let [conn (.connect ^RedisClient @client StringCodec/UTF8 ^RedisURI uri)] - (->Connection conn (.async ^StatefulRedisConnection conn)))) - -(defn- impl-subscribe - [topics xform ^StatefulRedisPubSubConnection conn] - (let [cmd (.sync conn) - output (a/chan 1 (comp (filter string?) xform)) - buffer (a/chan (a/sliding-buffer 64)) - sub (reify RedisPubSubListener - (message [it pattern channel message]) - (message [it channel message] - ;; There are no back pressure, so we use a slidding - ;; buffer for cases when the pubsub broker sends - ;; more messages that we can process. - (a/put! buffer message)) - (psubscribed [it pattern count]) - (punsubscribed [it pattern count]) - (subscribed [it channel count]) - (unsubscribed [it channel count]))] - - ;; Start message event-loop (with keepalive mechanism) - (a/go-loop [] - (let [[val port] (a/alts! [buffer (a/timeout 5000)]) - message (if (= port buffer) val ::keepalive)] - (if (a/>! output message) - (recur) - (do - (a/close! buffer) - (.removeListener conn sub) - (when (.isOpen conn) - (.close conn)))))) - - ;; Synchronously subscribe to topics - (.addListener conn sub) - (.subscribe ^RedisPubSubCommands cmd topics) - - ;; Return the output channel - output)) - -(defn subscribe - [{:keys [uri] :as client} {:keys [topics xform]}] - (let [topics (if (vector? topics) - (into-array String (map str topics)) - (into-array String [(str topics)]))] - (->> (.connectPubSub ^RedisClient @client StringCodec/UTF8 ^RedisURI uri) - (impl-subscribe topics xform)))) - -(defn- resolve-to-bool - [v] - (if (= v 1) - true - false)) - -(defmulti impl-run (fn [_ cmd _] cmd)) - -(defn run! - [conn cmd params] - (let [^RedisAsyncCommands conn (:cmd conn)] - (impl-run conn cmd params))) - -(defn run - [conn cmd params] - (let [res (a/chan 1)] - (if (instance? Connection conn) - (-> (run! conn cmd params) - (p/finally (fn [v e] - (if e - (a/offer! res e) - (a/offer! res v))))) - (a/close! res)) - res)) - -(defmethod impl-run :get - [conn _ {:keys [key]}] - (.get ^RedisAsyncCommands conn ^String key)) - -(defmethod impl-run :set - [conn _ {:keys [key val]}] - (.set ^RedisAsyncCommands conn ^String key ^String val)) - -(defmethod impl-run :smembers - [conn _ {:keys [key]}] - (-> (.smembers ^RedisAsyncCommands conn ^String key) - (p/then' #(into #{} %)))) - -(defmethod impl-run :sadd - [conn _ {:keys [key val]}] - (let [keys (into-array String [val])] - (-> (.sadd ^RedisAsyncCommands conn ^String key ^"[S;" keys) - (p/then resolve-to-bool)))) - -(defmethod impl-run :srem - [conn _ {:keys [key val]}] - (let [keys (into-array String [val])] - (-> (.srem ^RedisAsyncCommands conn ^String key ^"[S;" keys) - (p/then resolve-to-bool)))) - -(defmethod impl-run :publish - [conn _ {:keys [channel message]}] - (-> (.publish ^RedisAsyncCommands conn ^String channel ^String message) - (p/then resolve-to-bool))) - -(defmethod impl-run :hset - [^RedisAsyncCommands conn _ {:keys [key field value]}] - (.hset conn key field value)) - -(defmethod impl-run :hgetall - [^RedisAsyncCommands conn _ {:keys [key]}] - (.hgetall conn key)) - -(defmethod impl-run :hdel - [^RedisAsyncCommands conn _ {:keys [key field]}] - (let [fields (into-array String [field])] - (.hdel conn key fields))) -