From 2e717882f19153a7c2a241a5883bfc72e51e5cd3 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 20 Feb 2023 12:44:35 +0100 Subject: [PATCH] :recycle: Refactor websockets impl to use virtual threads Removing the use of core.async code and implement code using plain old and familiar synchronous code --- backend/deps.edn | 4 +- backend/scripts/repl | 7 +- backend/src/app/http/websocket.clj | 365 ++++++++---------- backend/src/app/main.clj | 5 +- backend/src/app/msgbus.clj | 284 +++++++------- backend/src/app/redis.clj | 11 +- backend/src/app/rpc/commands/files_update.clj | 2 - backend/src/app/util/websocket.clj | 324 +++++++--------- 8 files changed, 465 insertions(+), 537 deletions(-) diff --git a/backend/deps.edn b/backend/deps.edn index eef16de18..2a52c1930 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -19,8 +19,8 @@ java-http-clj/java-http-clj {:mvn/version "0.4.3"} funcool/yetti - {:git/tag "v9.12" - :git/sha "51646d8" + {:git/tag "v9.13" + :git/sha "e2d25db" :git/url "https://github.com/funcool/yetti.git" :exclusions [org.slf4j/slf4j-api]} diff --git a/backend/scripts/repl b/backend/scripts/repl index f690399cf..5cae7d7be 100755 --- a/backend/scripts/repl +++ b/backend/scripts/repl @@ -42,6 +42,9 @@ export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3 export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000 export PENPOT_STORAGE_ASSETS_S3_BUCKET=penpot +#-J-Djdk.virtualThreadScheduler.parallelism=16 + + export OPTIONS=" -A:jmx-remote -A:dev \ -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager \ @@ -49,7 +52,9 @@ export OPTIONS=" -J-Dlog4j2.configurationFile=log4j2-devenv.xml \ -J-XX:-OmitStackTraceInFastThrow \ -J-XX:+UnlockDiagnosticVMOptions \ - -J-XX:+DebugNonSafepoints"; + -J-XX:+DebugNonSafepoints \ + -J-Djdk.tracePinnedThreads=full \ + -J--enable-preview"; # Setup HEAP export OPTIONS="$OPTIONS -J-Xms50m -J-Xmx1024m" diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index f06fd1d7c..309458b7d 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -17,9 +17,9 @@ [app.msgbus :as mbus] [app.util.time :as dt] [app.util.websocket :as ws] - [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] + [promesa.exec.csp :as sp] [yetti.websocket :as yws])) (def recv-labels @@ -34,70 +34,38 @@ (def state (atom {})) -(defn- on-connect - [{:keys [::mtx/metrics]} wsp] - (let [created-at (dt/now)] - (swap! state assoc (::ws/id @wsp) wsp) - (mtx/run! metrics - :id :websocket-active-connections - :inc 1) - (fn [] - (swap! state dissoc (::ws/id @wsp)) - (mtx/run! metrics :id :websocket-active-connections :dec 1) - (mtx/run! metrics - :id :websocket-session-timing - :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0))))) - -(defn- on-rcv-message - [{:keys [::mtx/metrics]} _ message] - (mtx/run! metrics - :id :websocket-messages-total - :labels recv-labels - :inc 1) - message) - -(defn- on-snd-message - [{:keys [::mtx/metrics]} _ message] - (mtx/run! metrics - :id :websocket-messages-total - :labels send-labels - :inc 1) - message) - ;; REPL HELPERS (defn repl-get-connections-for-file [file-id] (->> (vals @state) (filter #(= file-id (-> % deref ::file-subscription :file-id))) - (map deref) (map ::ws/id))) (defn repl-get-connections-for-team [team-id] (->> (vals @state) (filter #(= team-id (-> % deref ::team-subscription :team-id))) - (map deref) (map ::ws/id))) (defn repl-close-connection [id] - (when-let [wsp (get @state id)] - (a/>!! (::ws/close-ch @wsp) [8899 "closed from server"]) - (a/close! (::ws/close-ch @wsp)))) + (when-let [{:keys [::ws/close-ch] :as wsp} (get @state id)] + (sp/put! close-ch [8899 "closed from server"]) + (sp/close! close-ch))) (defn repl-get-connection-info [id] (when-let [wsp (get @state id)] {:id id - :created-at (::created-at @wsp) - :profile-id (::profile-id @wsp) - :session-id (::session-id @wsp) - :user-agent (::ws/user-agent @wsp) - :ip-addr (::ws/remote-addr @wsp) - :last-activity-at (::ws/last-activity-at @wsp) - :subscribed-file (-> wsp deref ::file-subscription :file-id) - :subscribed-team (-> wsp deref ::team-subscription :team-id)})) + :created-at (::created-at wsp) + :profile-id (::profile-id wsp) + :session-id (::session-id wsp) + :user-agent (::ws/user-agent wsp) + :ip-addr (::ws/remote-addr wsp) + :last-activity-at (::ws/last-activity-at wsp) + :subscribed-file (-> wsp ::file-subscription :file-id) + :subscribed-team (-> wsp ::team-subscription :team-id)})) (defn repl-print-connection-info [id] @@ -117,202 +85,195 @@ (fn [_ _ message] (:type message))) -(defmethod handle-message :connect - [cfg wsp _] +(defmethod handle-message :open + [{:keys [::mbus/msgbus]} {:keys [::ws/id ::ws/output-ch ::ws/state ::profile-id ::session-id] :as wsp} _] + (l/trace :fn "handle-message" :event "open" :conn-id id) + (let [ch (sp/chan :buf (sp/dropping-buffer 16) + :xf (remove #(= (:session-id %) session-id)))] - (let [msgbus (::mbus/msgbus cfg) - conn-id (::ws/id @wsp) - profile-id (::profile-id @wsp) - session-id (::session-id @wsp) - output-ch (::ws/output-ch @wsp) + ;; Subscribe to the profile channel and forward all messages to websocket output + ;; channel (send them to the client). + (swap! state assoc ::profile-subscription {:channel ch}) - xform (remove #(= (:session-id %) session-id)) - channel (a/chan (a/dropping-buffer 16) xform)] + ;; Forward the subscription messages directly to the websocket output channel + (sp/pipe ch output-ch false) - (l/trace :fn "handle-message" :event "connect" :conn-id conn-id) + ;; Subscribe to the profile topic on msgbus/redis + (mbus/sub! msgbus :topic profile-id :chan ch))) - ;; Subscribe to the profile channel and forward all messages to - ;; websocket output channel (send them to the client). - (swap! wsp assoc ::profile-subscription channel) - (a/pipe channel output-ch false) - (mbus/sub! msgbus :topic profile-id :chan channel))) +(defmethod handle-message :close + [{:keys [::mbus/msgbus]} {:keys [::ws/id ::ws/state ::profile-id ::session-id]} _] + (l/trace :fn "handle-message" :event "close" :conn-id id) + (let [psub (::profile-subscription @state) + fsub (::file-subscription @state) + tsub (::team-subscription @state) + msg {:type :disconnect + :subs-id profile-id + :profile-id profile-id + :session-id session-id}] -(defmethod handle-message :disconnect - [cfg wsp _] - (let [msgbus (::mbus/msgbus cfg) - conn-id (::ws/id @wsp) - profile-id (::profile-id @wsp) - session-id (::session-id @wsp) - profile-ch (::profile-subscription @wsp) - fsub (::file-subscription @wsp) - tsub (::team-subscription @wsp) + ;; Close profile subscription if exists + (when-let [ch (:channel psub)] + (sp/close! ch) + (mbus/purge! msgbus [ch])) - message {:type :disconnect - :subs-id profile-id - :profile-id profile-id - :session-id session-id}] - - (l/trace :fn "handle-message" - :event :disconnect - :conn-id conn-id) - - (a/go - ;; Close the main profile subscription - (a/close! profile-ch) - (a/! output-ch message) - (recur)))) + (mbus/pub! msgbus + :topic file-id + :message message))) + (recur))) - (a/go - ;; Subscribe to file topic - (a/ message + (assoc :subs-id profile-id) + (assoc :profile-id profile-id) + (assoc :session-id session-id))] + (mbus/pub! msgbus :topic profile-id :message message))) (defmethod handle-message :pointer-update - [cfg wsp {:keys [file-id] :as message}] - (let [msgbus (::mbus/msgbus cfg) - profile-id (::profile-id @wsp) - session-id (::session-id @wsp) - subs (::file-subscription @wsp) - message (-> message - (assoc :subs-id file-id) - (assoc :profile-id profile-id) - (assoc :session-id session-id))] - (a/go - ;; Only allow receive pointer updates when active subscription - (when subs - (a/ message + (assoc :subs-id file-id) + (assoc :profile-id profile-id) + (assoc :session-id session-id))] + (mbus/pub! msgbus :topic file-id :message message)))) (defmethod handle-message :default - [_ wsp message] - (let [conn-id (::ws/id @wsp)] - (l/warn :hint "received unexpected message" - :message message - :conn-id conn-id) - (a/go :none))) + [_ {:keys [::ws/id]} message] + (l/warn :hint "received unexpected message" + :message message + :conn-id id)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HTTP HANDLER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(defn- on-connect + [{:keys [::mtx/metrics]} {:keys [::ws/id] :as wsp}] + (let [created-at (dt/now)] + (l/trace :fn "on-connect" :conn-id id) + (swap! state assoc id wsp) + (mtx/run! metrics + :id :websocket-active-connections + :inc 1) + + (assoc wsp ::ws/on-disconnect + (fn [] + (l/trace :fn "on-disconnect" :conn-id id) + (swap! state dissoc id) + (mtx/run! metrics :id :websocket-active-connections :dec 1) + (mtx/run! metrics + :id :websocket-session-timing + :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)))))) + +(defn- on-rcv-message + [{:keys [::mtx/metrics ::profile-id ::session-id]} message] + (mtx/run! metrics + :id :websocket-messages-total + :labels recv-labels + :inc 1) + (assoc message :profile-id profile-id :session-id session-id)) + +(defn- on-snd-message + [{:keys [::mtx/metrics]} message] + (mtx/run! metrics + :id :websocket-messages-total + :labels send-labels + :inc 1) + message) + + (s/def ::session-id ::us/uuid) (s/def ::handler-params (s/keys :req-un [::session-id])) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 9d5ae4ad0..b2cbce8f7 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -195,9 +195,8 @@ ::mtx/metrics (ig/ref ::mtx/metrics)} ::mbus/msgbus - {:backend (cf/get :msgbus-backend :redis) - :executor (ig/ref ::wrk/executor) - :redis (ig/ref ::rds/redis)} + {::wrk/executor (ig/ref ::wrk/executor) + ::rds/redis (ig/ref ::rds/redis)} :app.storage.tmp/cleaner {::wrk/executor (ig/ref ::wrk/executor)} diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index f0e4e28b4..cdf9af501 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -8,20 +8,18 @@ "The msgbus abstraction implemented using redis as underlying backend." (:require [app.common.data :as d] - [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] [app.common.transit :as t] [app.config :as cfg] - [app.redis :as redis] - [app.util.async :as aa] + [app.redis :as rds] [app.util.time :as dt] [app.worker :as wrk] - [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] - [promesa.exec :as px])) + [promesa.exec :as px] + [promesa.exec.csp :as sp])) (set! *warn-on-reflection* true) @@ -34,132 +32,116 @@ (def ^:private xform-prefix-topic (map (fn [obj] (update obj :topic prefix-topic)))) -(declare ^:private redis-connect) -(declare ^:private redis-disconnect) -(declare ^:private redis-pub) -(declare ^:private redis-sub) -(declare ^:private redis-unsub) +(declare ^:private redis-pub!) +(declare ^:private redis-sub!) +(declare ^:private redis-unsub!) (declare ^:private start-io-loop!) (declare ^:private subscribe-to-topics) (declare ^:private unsubscribe-channels) -(defmethod ig/prep-key ::msgbus - [_ cfg] - (merge {:buffer-size 128 - :timeout (dt/duration {:seconds 30})} - (d/without-nils cfg))) - -(s/def ::cmd-ch ::aa/channel) -(s/def ::rcv-ch ::aa/channel) -(s/def ::pub-ch ::aa/channel) +(s/def ::cmd-ch sp/chan?) +(s/def ::rcv-ch sp/chan?) +(s/def ::pub-ch sp/chan?) (s/def ::state ::us/agent) -(s/def ::pconn ::redis/connection-holder) -(s/def ::sconn ::redis/connection-holder) +(s/def ::pconn ::rds/connection-holder) +(s/def ::sconn ::rds/connection-holder) (s/def ::msgbus (s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor])) -(s/def ::buffer-size ::us/integer) - (defmethod ig/pre-init-spec ::msgbus [_] - (s/keys :req-un [::buffer-size ::redis/timeout ::redis/redis ::wrk/executor])) + (s/keys :req [::rds/redis ::wrk/executor])) + +(defmethod ig/prep-key ::msgbus + [_ cfg] + (-> cfg + (assoc ::buffer-size 128) + (assoc ::timeout (dt/duration {:seconds 30})))) (defmethod ig/init-key ::msgbus - [_ {:keys [buffer-size executor] :as cfg}] + [_ {:keys [::buffer-size ::wrk/executor ::timeout ::rds/redis] :as cfg}] (l/info :hint "initialize msgbus" :buffer-size buffer-size) - (let [cmd-ch (a/chan buffer-size) - rcv-ch (a/chan (a/dropping-buffer buffer-size)) - pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic) + (let [cmd-ch (sp/chan :buf buffer-size) + rcv-ch (sp/chan :buf (sp/dropping-buffer buffer-size)) + pub-ch (sp/chan :buf (sp/dropping-buffer buffer-size) + :xf xform-prefix-topic) state (agent {}) - msgbus (-> (redis-connect cfg) + + pconn (rds/connect redis :timeout timeout) + sconn (rds/connect redis :type :pubsub :timeout timeout) + msgbus (-> cfg + (assoc ::pconn pconn) + (assoc ::sconn sconn) (assoc ::cmd-ch cmd-ch) (assoc ::rcv-ch rcv-ch) (assoc ::pub-ch pub-ch) (assoc ::state state) (assoc ::wrk/executor executor))] - (us/verify! ::msgbus msgbus) - (set-error-handler! state #(l/error :cause % :hint "unexpected error on agent" ::l/sync? true)) (set-error-mode! state :continue) - (start-io-loop! msgbus) - msgbus)) - -(defn sub! - [{:keys [::state ::wrk/executor] :as cfg} & {:keys [topic topics chan]}] - (let [done-ch (a/chan) - topics (into [] (map prefix-topic) (if topic [topic] topics))] - (l/debug :hint "subscribe" :topics topics) - (send-via executor state subscribe-to-topics cfg topics chan done-ch) - done-ch)) - -(defn pub! - [{::keys [pub-ch]} & {:as params}] - (a/go - (a/>! pub-ch params))) - -(defn purge! - [{:keys [::state ::wrk/executor] :as msgbus} chans] - (l/trace :hint "purge" :chans (count chans)) - (let [done-ch (a/chan)] - (send-via executor state unsubscribe-channels msgbus chans done-ch) - done-ch)) + (assoc msgbus ::io-thr (start-io-loop! msgbus)))) (defmethod ig/halt-key! ::msgbus [_ msgbus] - (redis-disconnect msgbus) - (a/close! (::cmd-ch msgbus)) - (a/close! (::rcv-ch msgbus)) - (a/close! (::pub-ch msgbus))) + (px/interrupt! (::io-thr msgbus)) + (sp/close! (::cmd-ch msgbus)) + (sp/close! (::rcv-ch msgbus)) + (sp/close! (::pub-ch msgbus)) + (d/close! (::pconn msgbus)) + (d/close! (::sconn msgbus))) + +(defn sub! + [{:keys [::state ::wrk/executor] :as cfg} & {:keys [topic topics chan]}] + (let [topics (into [] (map prefix-topic) (if topic [topic] topics))] + (l/debug :hint "subscribe" :topics topics :chan (hash chan)) + (send-via executor state subscribe-to-topics cfg topics chan) + nil)) + +(defn pub! + [{::keys [pub-ch]} & {:as params}] + (sp/put! pub-ch params)) + +(defn purge! + [{:keys [::state ::wrk/executor] :as msgbus} chans] + (l/debug :hint "purge" :chans (count chans)) + (send-via executor state unsubscribe-channels msgbus chans) + nil) ;; --- IMPL -(defn- redis-connect - [{:keys [timeout redis] :as cfg}] - (let [pconn (redis/connect redis :timeout timeout) - sconn (redis/connect redis :type :pubsub :timeout timeout)] - {::pconn pconn - ::sconn sconn})) - -(defn- redis-disconnect - [{:keys [::pconn ::sconn] :as cfg}] - (d/close! pconn) - (d/close! sconn)) - (defn- conj-subscription "A low level function that is responsible to create on-demand subscriptions on redis. It reuses the same subscription if it is - already established. Intended to be executed in agent." + already established." [nsubs cfg topic chan] (let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))] (when (= 1 (count nsubs)) (l/trace :hint "open subscription" :topic topic ::l/sync? true) - (redis-sub cfg topic)) + (redis-sub! cfg topic)) nsubs)) (defn- disj-subscription "A low level function responsible on removing subscriptions. The subscription is truly removed from redis once no single local - subscription is look for it. Intended to be executed in agent." + subscription is look for it." [nsubs cfg topic chan] (let [nsubs (disj nsubs chan)] (when (empty? nsubs) (l/trace :hint "close subscription" :topic topic ::l/sync? true) - (redis-unsub cfg topic)) + (redis-unsub! cfg topic)) nsubs)) (defn- subscribe-to-topics - "Function responsible to attach local subscription to the - state. Intended to be used in agent." - [state cfg topics chan done-ch] - (aa/with-closing done-ch - (let [state (update state :chans assoc chan topics)] - (reduce (fn [state topic] - (update-in state [:topics topic] conj-subscription cfg topic chan)) - state - topics)))) + "Function responsible to attach local subscription to the state." + [state cfg topics chan] + (let [state (update state :chans assoc chan topics)] + (reduce (fn [state topic] + (update-in state [:topics topic] conj-subscription cfg topic chan)) + state + topics))) -(defn- unsubscribe-single-channel +(defn- unsubscribe-channel "Auxiliary function responsible on removing a single local subscription from the state." [state cfg chan] @@ -174,87 +156,113 @@ "Function responsible from detach from state a seq of channels, useful when client disconnects or in-bulk unsubscribe operations. Intended to be executed in agent." - [state cfg channels done-ch] - (aa/with-closing done-ch - (reduce #(unsubscribe-single-channel %1 cfg %2) state channels))) + [state cfg channels] + (reduce #(unsubscribe-channel %1 cfg %2) state channels)) (defn- create-listener [rcv-ch] - (redis/pubsub-listener + (rds/pubsub-listener :on-message (fn [_ topic message] ;; There are no back pressure, so we use a slidding ;; buffer for cases when the pubsub broker sends ;; more messages that we can process. (let [val {:topic topic :message (t/decode message)}] - (when-not (a/offer! rcv-ch val) + (when-not (sp/offer! rcv-ch val) (l/warn :msg "dropping message on subscription loop")))))) +(defn- process-input! + [{:keys [::state ::wrk/executor] :as cfg} topic message] + (let [chans (get-in @state [:topics topic])] + (when-let [closed (loop [chans (seq chans) + closed #{}] + (if-let [ch (first chans)] + (if (sp/put! ch message) + (recur (rest chans) closed) + (recur (rest chans) (conj closed ch))) + (seq closed)))] + (send-via executor state unsubscribe-channels cfg closed)))) + + (defn start-io-loop! [{:keys [::sconn ::rcv-ch ::pub-ch ::state ::wrk/executor] :as cfg}] - (redis/add-listener! sconn (create-listener rcv-ch)) - (letfn [(send-to-topic [topic message] - (a/go-loop [chans (seq (get-in @state [:topics topic])) - closed #{}] - (if-let [ch (first chans)] - (if (a/>! ch message) - (recur (rest chans) closed) - (recur (rest chans) (conj closed ch))) - (seq closed)))) + (rds/add-listener! sconn (create-listener rcv-ch)) - (process-incoming [{:keys [topic message]}] - (a/go - (when-let [closed (a/> (vals state) - (mapcat identity) - (filter some?) - (run! a/close!)) - nil))) - - (= port rcv-ch) - (do - (a/> (:chans @state) + (map key) + (filter sp/closed?))] + (when (seq closed) + (send-via executor state unsubscribe-channels cfg closed) + (l/debug :hint "proactively purge channels" :count (count closed))) (recur)) - (= port pub-ch) - (let [result (a/ (redis/publish! pconn topic message) - (p/finally (fn [_ cause] - (when (and cause (redis/open? pconn)) - (a/offer! res cause)) - (a/close! res)))) - res)) + (try + (p/await! (rds/publish! pconn topic (t/encode message))) + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (l/error :hint "unexpected error on publishing" + :message message + :cause cause)))) -(defn redis-sub +(defn- redis-sub! "Create redis subscription. Blocking operation, intended to be used inside an agent." [{:keys [::sconn] :as cfg} topic] - (redis/subscribe! sconn topic)) + (try + (rds/subscribe! sconn topic) + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (l/trace :hint "exception on subscribing" :topic topic :cause cause)))) -(defn redis-unsub +(defn- redis-unsub! "Removes redis subscription. Blocking operation, intended to be used inside an agent." [{:keys [::sconn] :as cfg} topic] - (redis/unsubscribe! sconn topic)) + (try + (rds/unsubscribe! sconn topic) + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (l/trace :hint "exception on unsubscribing" :topic topic :cause cause)))) + diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj index b00d51c7c..4f7a8ed5a 100644 --- a/backend/src/app/redis.clj +++ b/backend/src/app/redis.clj @@ -18,7 +18,8 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] - [promesa.core :as p]) + [promesa.core :as p] + [promesa.exec :as px]) (:import clojure.lang.IDeref clojure.lang.MapEntry @@ -99,11 +100,11 @@ (defmethod ig/prep-key ::redis [_ cfg] - (let [runtime (Runtime/getRuntime) - cpus (.availableProcessors ^Runtime runtime)] + (let [cpus (px/get-available-processors) + threads (max 1 (int (* cpus 0.2)))] (merge {::timeout (dt/duration "10s") - ::io-threads (max 3 cpus) - ::worker-threads (max 3 cpus)} + ::io-threads (max 3 threads) + ::worker-threads (max 3 threads)} (d/without-nils cfg)))) (defmethod ig/pre-init-spec ::redis [_] diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index d48d609e6..d767499a0 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -277,7 +277,6 @@ (let [lchanges (filter library-change? changes) msgbus (::mbus/msgbus cfg)] - ;; Asynchronously publish message to the msgbus (mbus/pub! msgbus :topic (:id file) :message {:type :file-change @@ -290,7 +289,6 @@ (when (and (:is-shared file) (seq lchanges)) (let [team-id (or (:team-id file) (files/get-team-id conn (:project-id file)))] - ;; Asynchronously publish message to the msgbus (mbus/pub! msgbus :topic team-id :message {:type :library-change diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index 5f8ec55c5..1b8e16560 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -5,7 +5,7 @@ ;; Copyright (c) KALEIDOS INC (ns app.util.websocket - "A general protocol implementation on top of websockets." + "A general protocol implementation on top of websockets using vthreads." (:require [app.common.exceptions :as ex] [app.common.logging :as l] @@ -13,22 +13,42 @@ [app.common.uuid :as uuid] [app.loggers.audit :refer [parse-client-ip]] [app.util.time :as dt] - [clojure.core.async :as a] + [promesa.exec :as px] + [promesa.exec.csp :as sp] [yetti.request :as yr] [yetti.util :as yu] [yetti.websocket :as yws]) (:import java.nio.ByteBuffer)) -(declare decode-beat) -(declare encode-beat) -(declare start-io-loop) -(declare ws-ping!) -(declare ws-send!) -(declare filter-options) - (def noop (constantly nil)) (def identity-3 (fn [_ _ o] o)) +(def max-missed-heartbeats 3) +(def heartbeat-interval 5000) + +(defn- encode-beat + [n] + (doto (ByteBuffer/allocate 8) + (.putLong n) + (.rewind))) + +(defn- decode-beat + [^ByteBuffer buffer] + (when (= 8 (.capacity buffer)) + (.rewind buffer) + (.getLong buffer))) + +(defn- wrap-handler + [handler] + (fn [wsp message] + (try + (handler wsp message) + (catch Throwable cause + (if (ex/error? cause) + {:type :error :error (ex-data cause)} + {:type :error :error {:message (ex-message cause)}}))))) + +(declare start-io-loop!) (defn handler "A WebSocket upgrade handler factory. Returns a handler that can be @@ -46,12 +66,11 @@ ::on-connect ::input-buff-size ::output-buff-size - ::handler ::idle-timeout] :or {input-buff-size 64 output-buff-size 64 idle-timeout 60000 - on-connect noop + on-connect identity on-snd-message identity-3 on-rcv-message identity-3} :as options}] @@ -61,91 +80,65 @@ (assert (fn? on-connect) "'on-connect' should be a function") (fn [{:keys [::yws/channel] :as request}] - (let [input-ch (a/chan input-buff-size) - output-ch (a/chan output-buff-size) - hbeat-ch (a/chan (a/sliding-buffer 6)) - close-ch (a/chan) - stop-ch (a/chan) + (let [input-ch (sp/chan :buf input-buff-size) + output-ch (sp/chan :buf output-buff-size) + hbeat-ch (sp/chan :buf (sp/sliding-buffer 6)) + close-ch (sp/chan) ip-addr (parse-client-ip request) uagent (yr/get-header request "user-agent") id (uuid/next) + state (atom {}) + beats (atom #{}) - options (-> (filter-options options) - (merge {::id id - ::created-at (dt/now) - ::input-ch input-ch - ::heartbeat-ch hbeat-ch - ::output-ch output-ch - ::close-ch close-ch - ::stop-ch stop-ch - ::channel channel - ::remote-addr ip-addr - ::user-agent uagent}) - (atom)) - - ;; call the on-connect hook and memoize the on-terminate instance - on-terminate (on-connect options) + options (-> options + (update ::handler wrap-handler) + (assoc ::id id) + (assoc ::state state) + (assoc ::beats beats) + (assoc ::created-at (dt/now)) + (assoc ::input-ch input-ch) + (assoc ::heartbeat-ch hbeat-ch) + (assoc ::output-ch output-ch) + (assoc ::close-ch close-ch) + (assoc ::channel channel) + (assoc ::remote-addr ip-addr) + (assoc ::user-agent uagent) + (on-connect)) on-ws-open (fn [channel] (l/trace :fn "on-ws-open" :conn-id id) - (yws/idle-timeout! channel (dt/duration idle-timeout))) + (let [timeout (dt/duration idle-timeout) + name (str "penpot/websocket/io-loop/" id)] + (yws/idle-timeout! channel timeout) + (px/fn->thread (partial start-io-loop! options) + {:name name :virtual true}))) on-ws-terminate (fn [_ code reason] - (l/trace :fn "on-ws-terminate" :conn-id id :code code :reason reason) - (a/close! close-ch)) + (l/trace :fn "on-ws-terminate" + :conn-id id + :code code + :reason reason) + (sp/close! close-ch)) on-ws-error - (fn [_ error] - (when-not (or (instance? java.nio.channels.ClosedChannelException error) - (instance? java.net.SocketException error) - (instance? java.io.IOException error)) - (l/error :fn "on-ws-error" :conn-id id - :hint (ex-message error) - :cause error)) - (on-ws-terminate nil 8801 "close after error")) + (fn [_ cause] + (sp/close! close-ch cause)) on-ws-message (fn [_ message] - (try - (let [message (on-rcv-message options message) - message (t/decode-str message)] - (a/offer! input-ch message) - (swap! options assoc ::last-activity-at (dt/now))) - (catch Throwable e - (l/warn :hint "error on decoding incoming message from websocket" - :wsmsg (pr-str message) - :cause e) - (a/>! close-ch [8802 "decode error"]) - (a/close! close-ch)))) + (sp/offer! input-ch message) + (swap! state assoc ::last-activity-at (dt/now))) on-ws-pong (fn [_ buffers] - (a/>!! hbeat-ch (yu/copy-many buffers)))] + ;; (l/trace :fn "on-ws-pong" :buffers (pr-str buffers)) + (sp/put! hbeat-ch (yu/copy-many buffers)))] - ;; Wait a close signal - (a/go - (let [[code reason] (a/= (count issued) max-missed-heartbeats)))) + +(defn- start-io-loop! + [{:keys [::id ::close-ch ::input-ch ::output-ch ::heartbeat-ch ::channel ::handler ::beats ::on-rcv-message ::on-snd-message] :as wsp}] + (px/thread + {:name (str "penpot/websocket/io-loop/" id) + :virtual true} (try - (yws/send! channel s (fn [e] - (when e (a/offer! ch e)) - (a/close! ch))) + (handler wsp {:type :open}) + (loop [i 0] + (let [ping-ch (sp/timeout-chan heartbeat-interval) + [msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])] + (when (yws/connected? channel) + (cond + (identical? p ping-ch) + (if (handle-ping! wsp i) + (recur (inc i)) + (yws/close! channel 8802 "missing to many pings")) + + (or (identical? p close-ch) (nil? msg)) + (do :nothing) + + (identical? p heartbeat-ch) + (let [beat (decode-beat msg)] + ;; (l/trace :hint "pong" :beat beat :conn-id id) + (swap! beats disj beat) + (recur i)) + + (identical? p input-ch) + (let [message (t/decode-str msg) + message (on-rcv-message message) + {:keys [request-id] :as response} (handler wsp message)] + (when (map? response) + (sp/put! output-ch + (cond-> response + (some? request-id) + (assoc :request-id request-id)))) + (recur i)) + + (identical? p output-ch) + (let [message (on-snd-message msg) + message (t/encode-str message {:type :json-verbose})] + ;; (l/trace :hint "writing message to output" :message msg) + (yws/send! channel message) + (recur i)))))) + + (catch java.nio.channels.ClosedChannelException _) + (catch java.net.SocketException _) + (catch java.io.IOException _) + + (catch InterruptedException _ + (l/debug :hint "websocket thread interrumpted" :conn-id id)) + (catch Throwable cause - (a/offer! ch cause) - (a/close! ch))) - ch)) + (l/error :hint "unhandled exception on websocket thread" + :conn-id id + :cause cause)) -(defn- ws-ping! - [channel s] - (let [ch (a/chan 1)] - (try - (yws/ping! channel s (fn [e] - (when e (a/offer! ch e)) - (a/close! ch))) - (catch Throwable cause - (a/offer! ch cause) - (a/close! ch))) - ch)) + (finally + (handler wsp {:type :close}) -(defn- encode-beat - [n] - (doto (ByteBuffer/allocate 8) - (.putLong n) - (.rewind))) + (when (yws/connected? channel) + ;; NOTE: we need to ignore all exceptions here because + ;; there can be a race condition that first returns that + ;; channel is connected but on closing, will raise that + ;; channel is already closed. + (ex/ignoring + (yws/close! channel 8899 "terminated"))) -(defn- decode-beat - [^ByteBuffer buffer] - (when (= 8 (.capacity buffer)) - (.rewind buffer) - (.getLong buffer))) + (when-let [on-disconnect (::on-disconnect wsp)] + (on-disconnect)) -(defn- wrap-handler - [handler] - (fn [wsp message] - (locking wsp - (handler wsp message)))) - -(def max-missed-heartbeats 3) -(def heartbeat-interval 5000) - -(defn- start-io-loop - [wsp handler on-snd-message on-ws-terminate] - (let [input-ch (::input-ch @wsp) - output-ch (::output-ch @wsp) - stop-ch (::stop-ch @wsp) - hbeat-pong-ch (::heartbeat-ch @wsp) - channel (::channel @wsp) - conn-id (::id @wsp) - handler (wrap-handler handler) - beats (atom #{}) - choices [stop-ch - input-ch - output-ch - hbeat-pong-ch]] - - ;; Start IO loop - (a/go - (a/= (count issued) max-missed-heartbeats) - (on-ws-terminate nil 8802 "heartbeat: timeout") - (recur (inc i))))) - - (= p hbeat-pong-ch) - (let [beat (decode-beat v)] - (l/trace :hint "pong" :beat beat :conn-id conn-id) - (swap! beats disj beat) - (recur i)) - - (= p input-ch) - (let [result (a/! output-ch {:type :error :error (ex-data result)}) - - (ex/exception? result) - (a/>! output-ch {:type :error :error {:message (ex-message result)}}) - - (map? result) - (a/>! output-ch (cond-> result (:request-id v) (assoc :request-id (:request-id v))))) - (recur i)) - - (= p output-ch) - (let [v (on-snd-message wsp v)] - ;; (l/trace :hint "writing message to output" :message v) - (a/