From 8fae7f7aa673b6a58b53bf0fcdfd20a996efa5f4 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Sun, 2 Oct 2022 10:18:22 +0200 Subject: [PATCH] :recycle: Refactor internal websocket connection stage management --- backend/deps.edn | 23 +++-- backend/src/app/http/websocket.clj | 2 +- backend/src/app/util/websocket.clj | 149 ++++++++++++++++------------- docker/devenv/Dockerfile | 1 - 4 files changed, 97 insertions(+), 78 deletions(-) diff --git a/backend/deps.edn b/backend/deps.edn index e08b9e422..b18dfd124 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -11,17 +11,22 @@ io.prometheus/simpleclient {:mvn/version "0.16.0"} io.prometheus/simpleclient_hotspot {:mvn/version "0.16.0"} - io.prometheus/simpleclient_jetty {:mvn/version "0.16.0" - :exclusions [org.eclipse.jetty/jetty-server - org.eclipse.jetty/jetty-servlet]} + io.prometheus/simpleclient_jetty + {:mvn/version "0.16.0" + :exclusions [org.eclipse.jetty/jetty-server + org.eclipse.jetty/jetty-servlet]} + + io.prometheus/simpleclient_httpserver {:mvn/version "0.16.0"} io.lettuce/lettuce-core {:mvn/version "6.2.0.RELEASE"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - funcool/yetti {:git/tag "v9.8" :git/sha "fbe1d7d" - :git/url "https://github.com/funcool/yetti.git" - :exclusions [org.slf4j/slf4j-api]} + funcool/yetti + {:git/tag "v9.9" + :git/sha "f0a455d" + :git/url "https://github.com/funcool/yetti.git" + :exclusions [org.slf4j/slf4j-api]} com.github.seancorfield/next.jdbc {:mvn/version "1.3.828"} metosin/reitit-core {:mvn/version "0.5.18"} @@ -34,8 +39,10 @@ buddy/buddy-sign {:mvn/version "3.4.333"} org.jsoup/jsoup {:mvn/version "1.15.1"} - org.im4java/im4java {:git/tag "1.4.0-penpot-2" :git/sha "e2b3e16" - :git/url "https://github.com/penpot/im4java"} + org.im4java/im4java + {:git/tag "1.4.0-penpot-2" + :git/sha "e2b3e16" + :git/url "https://github.com/penpot/im4java"} org.lz4/lz4-java {:mvn/version "1.8.0"} diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index 154319378..7e16d74f6 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -89,7 +89,7 @@ [id] (when-let [wsp (get @state id)] {:id id - :created-at (dt/instant id) + :created-at (::created-at @wsp) :profile-id (::profile-id @wsp) :session-id (::session-id @wsp) :user-agent (::ws/user-agent @wsp) diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index f16b4eddc..07dc5a52a 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -10,6 +10,7 @@ [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.transit :as t] + [app.common.uuid :as uuid] [app.loggers.audit :refer [parse-client-ip]] [app.util.time :as dt] [clojure.core.async :as a] @@ -21,9 +22,7 @@ (declare decode-beat) (declare encode-beat) -(declare process-heartbeat) -(declare process-input) -(declare process-output) +(declare start-io-loop) (declare ws-ping!) (declare ws-send!) (declare filter-options) @@ -51,7 +50,7 @@ ::idle-timeout] :or {input-buff-size 64 output-buff-size 64 - idle-timeout 30000 + idle-timeout 60000 on-connect noop on-snd-message identity-3 on-rcv-message identity-3} @@ -64,17 +63,19 @@ (fn [{:keys [::yws/channel session-id] :as request}] (let [input-ch (a/chan input-buff-size) output-ch (a/chan output-buff-size) - pong-ch (a/chan (a/sliding-buffer 6)) + hbeat-ch (a/chan (a/sliding-buffer 6)) close-ch (a/chan) stop-ch (a/chan) ip-addr (parse-client-ip request) uagent (yr/get-header request "user-agent") - id (inst-ms (dt/now)) + id (uuid/next) options (-> (filter-options options) (merge {::id id + ::created-at (dt/now) ::input-ch input-ch + ::heartbeat-ch hbeat-ch ::output-ch output-ch ::close-ch close-ch ::stop-ch stop-ch @@ -99,11 +100,13 @@ on-ws-error (fn [_ error] - (a/close! close-ch) (when-not (or (instance? java.nio.channels.ClosedChannelException error) (instance? java.net.SocketException error) (instance? java.io.IOException error)) - (l/error :hint (ex-message error) :cause error))) + (l/error :fn "on-ws-error" :conn-id id + :hint (ex-message error) + :cause error)) + (on-ws-terminate nil 8801 "close after error")) on-ws-message (fn [_ message] @@ -116,23 +119,18 @@ (l/warn :hint "error on decoding incoming message from websocket" :wsmsg (pr-str message) :cause e) - (a/>! close-ch [8801 "decode error"]) + (a/>! close-ch [8802 "decode error"]) (a/close! close-ch)))) on-ws-pong (fn [_ buffers] - (a/>!! pong-ch (yu/copy-many buffers)))] - - ;; Launch heartbeat process - (-> @options - (assoc ::pong-ch pong-ch) - (process-heartbeat)) + (a/>!! hbeat-ch (yu/copy-many buffers)))] ;; Wait a close signal (a/go (let [[code reason] (a/= (count issued) max-missed-heartbeats) + (on-ws-terminate nil 8802 "heartbeat: timeout") + (recur (inc i))))) + + (= p hbeat-pong-ch) + (let [beat (decode-beat v)] + (l/trace :hint "pong" :beat beat :conn-id conn-id) + (swap! beats disj beat) + (recur i)) + + (= p input-ch) + (let [result (a/! output-ch {:type :error :error (ex-data val)}) + (ex/ex-info? result) + (a/>! output-ch {:type :error :error (ex-data result)}) - (ex/exception? val) - (a/>! output-ch {:type :error :error {:message (ex-message val)}}) + (ex/exception? result) + (a/>! output-ch {:type :error :error {:message (ex-message result)}}) + + (map? result) + (a/>! output-ch (cond-> result (:request-id v) (assoc :request-id (:request-id v))))) + (recur i)) + + (= p output-ch) + (let [v (on-snd-message wsp v)] + ;; (l/trace :hint "writing message to output" :message v) + (a/! output-ch (cond-> val (:request-id message) (assoc :request-id (:request-id message))))) - (recur)))))) (a/= (count issued) max-missed-heartbeats) - (do - (a/>! close-ch [8802 "heart-beat timeout"]) - (a/close! close-ch)) - (recur (inc i))))))) - - (a/go-loop [] - (when-let [buffer (a/