0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-10 14:51:37 -05:00

♻️ Refactor internal websocket connection stage management

This commit is contained in:
Andrey Antukh 2022-10-02 10:18:22 +02:00
parent 7dcd362abd
commit 8fae7f7aa6
4 changed files with 97 additions and 78 deletions

View file

@ -11,17 +11,22 @@
io.prometheus/simpleclient {:mvn/version "0.16.0"} io.prometheus/simpleclient {:mvn/version "0.16.0"}
io.prometheus/simpleclient_hotspot {:mvn/version "0.16.0"} io.prometheus/simpleclient_hotspot {:mvn/version "0.16.0"}
io.prometheus/simpleclient_jetty {:mvn/version "0.16.0" io.prometheus/simpleclient_jetty
:exclusions [org.eclipse.jetty/jetty-server {:mvn/version "0.16.0"
org.eclipse.jetty/jetty-servlet]} :exclusions [org.eclipse.jetty/jetty-server
org.eclipse.jetty/jetty-servlet]}
io.prometheus/simpleclient_httpserver {:mvn/version "0.16.0"} io.prometheus/simpleclient_httpserver {:mvn/version "0.16.0"}
io.lettuce/lettuce-core {:mvn/version "6.2.0.RELEASE"} io.lettuce/lettuce-core {:mvn/version "6.2.0.RELEASE"}
java-http-clj/java-http-clj {:mvn/version "0.4.3"} java-http-clj/java-http-clj {:mvn/version "0.4.3"}
funcool/yetti {:git/tag "v9.8" :git/sha "fbe1d7d" funcool/yetti
:git/url "https://github.com/funcool/yetti.git" {:git/tag "v9.9"
:exclusions [org.slf4j/slf4j-api]} :git/sha "f0a455d"
:git/url "https://github.com/funcool/yetti.git"
:exclusions [org.slf4j/slf4j-api]}
com.github.seancorfield/next.jdbc {:mvn/version "1.3.828"} com.github.seancorfield/next.jdbc {:mvn/version "1.3.828"}
metosin/reitit-core {:mvn/version "0.5.18"} metosin/reitit-core {:mvn/version "0.5.18"}
@ -34,8 +39,10 @@
buddy/buddy-sign {:mvn/version "3.4.333"} buddy/buddy-sign {:mvn/version "3.4.333"}
org.jsoup/jsoup {:mvn/version "1.15.1"} org.jsoup/jsoup {:mvn/version "1.15.1"}
org.im4java/im4java {:git/tag "1.4.0-penpot-2" :git/sha "e2b3e16" org.im4java/im4java
:git/url "https://github.com/penpot/im4java"} {:git/tag "1.4.0-penpot-2"
:git/sha "e2b3e16"
:git/url "https://github.com/penpot/im4java"}
org.lz4/lz4-java {:mvn/version "1.8.0"} org.lz4/lz4-java {:mvn/version "1.8.0"}

View file

@ -89,7 +89,7 @@
[id] [id]
(when-let [wsp (get @state id)] (when-let [wsp (get @state id)]
{:id id {:id id
:created-at (dt/instant id) :created-at (::created-at @wsp)
:profile-id (::profile-id @wsp) :profile-id (::profile-id @wsp)
:session-id (::session-id @wsp) :session-id (::session-id @wsp)
:user-agent (::ws/user-agent @wsp) :user-agent (::ws/user-agent @wsp)

View file

@ -10,6 +10,7 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.transit :as t] [app.common.transit :as t]
[app.common.uuid :as uuid]
[app.loggers.audit :refer [parse-client-ip]] [app.loggers.audit :refer [parse-client-ip]]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.core.async :as a] [clojure.core.async :as a]
@ -21,9 +22,7 @@
(declare decode-beat) (declare decode-beat)
(declare encode-beat) (declare encode-beat)
(declare process-heartbeat) (declare start-io-loop)
(declare process-input)
(declare process-output)
(declare ws-ping!) (declare ws-ping!)
(declare ws-send!) (declare ws-send!)
(declare filter-options) (declare filter-options)
@ -51,7 +50,7 @@
::idle-timeout] ::idle-timeout]
:or {input-buff-size 64 :or {input-buff-size 64
output-buff-size 64 output-buff-size 64
idle-timeout 30000 idle-timeout 60000
on-connect noop on-connect noop
on-snd-message identity-3 on-snd-message identity-3
on-rcv-message identity-3} on-rcv-message identity-3}
@ -64,17 +63,19 @@
(fn [{:keys [::yws/channel session-id] :as request}] (fn [{:keys [::yws/channel session-id] :as request}]
(let [input-ch (a/chan input-buff-size) (let [input-ch (a/chan input-buff-size)
output-ch (a/chan output-buff-size) output-ch (a/chan output-buff-size)
pong-ch (a/chan (a/sliding-buffer 6)) hbeat-ch (a/chan (a/sliding-buffer 6))
close-ch (a/chan) close-ch (a/chan)
stop-ch (a/chan) stop-ch (a/chan)
ip-addr (parse-client-ip request) ip-addr (parse-client-ip request)
uagent (yr/get-header request "user-agent") uagent (yr/get-header request "user-agent")
id (inst-ms (dt/now)) id (uuid/next)
options (-> (filter-options options) options (-> (filter-options options)
(merge {::id id (merge {::id id
::created-at (dt/now)
::input-ch input-ch ::input-ch input-ch
::heartbeat-ch hbeat-ch
::output-ch output-ch ::output-ch output-ch
::close-ch close-ch ::close-ch close-ch
::stop-ch stop-ch ::stop-ch stop-ch
@ -99,11 +100,13 @@
on-ws-error on-ws-error
(fn [_ error] (fn [_ error]
(a/close! close-ch)
(when-not (or (instance? java.nio.channels.ClosedChannelException error) (when-not (or (instance? java.nio.channels.ClosedChannelException error)
(instance? java.net.SocketException error) (instance? java.net.SocketException error)
(instance? java.io.IOException error)) (instance? java.io.IOException error))
(l/error :hint (ex-message error) :cause error))) (l/error :fn "on-ws-error" :conn-id id
:hint (ex-message error)
:cause error))
(on-ws-terminate nil 8801 "close after error"))
on-ws-message on-ws-message
(fn [_ message] (fn [_ message]
@ -116,23 +119,18 @@
(l/warn :hint "error on decoding incoming message from websocket" (l/warn :hint "error on decoding incoming message from websocket"
:wsmsg (pr-str message) :wsmsg (pr-str message)
:cause e) :cause e)
(a/>! close-ch [8801 "decode error"]) (a/>! close-ch [8802 "decode error"])
(a/close! close-ch)))) (a/close! close-ch))))
on-ws-pong on-ws-pong
(fn [_ buffers] (fn [_ buffers]
(a/>!! pong-ch (yu/copy-many buffers)))] (a/>!! hbeat-ch (yu/copy-many buffers)))]
;; Launch heartbeat process
(-> @options
(assoc ::pong-ch pong-ch)
(process-heartbeat))
;; Wait a close signal ;; Wait a close signal
(a/go (a/go
(let [[code reason] (a/<! close-ch)] (let [[code reason] (a/<! close-ch)]
(a/close! stop-ch) (a/close! stop-ch)
(a/close! pong-ch) (a/close! hbeat-ch)
(a/close! output-ch) (a/close! output-ch)
(a/close! input-ch) (a/close! input-ch)
@ -141,19 +139,14 @@
(yws/close! channel code reason)) (yws/close! channel code reason))
(when (fn? on-terminate) (when (fn? on-terminate)
(on-terminate)))) (on-terminate))
;; Forward all messages from output-ch to the websocket (l/trace :hint "connection terminated")))
;; connection
(a/go-loop []
(when-let [val (a/<! output-ch)]
(let [val (on-snd-message options val)]
(a/<! (ws-send! channel (t/encode-str val)))
(recur))))
;; React on messages received from the client ;; React on messages received from the client
(a/go
(process-input options handler) (a/<! (start-io-loop options handler on-snd-message on-ws-terminate))
(l/trace :hint "io loop terminated"))
{:on-open on-ws-open {:on-open on-ws-open
:on-error on-ws-error :on-error on-ws-error
@ -168,7 +161,7 @@
(yws/send! channel s (fn [e] (yws/send! channel s (fn [e]
(when e (a/offer! ch e)) (when e (a/offer! ch e))
(a/close! ch))) (a/close! ch)))
(catch java.io.IOException cause (catch Throwable cause
(a/offer! ch cause) (a/offer! ch cause)
(a/close! ch))) (a/close! ch)))
ch)) ch))
@ -178,9 +171,9 @@
(let [ch (a/chan 1)] (let [ch (a/chan 1)]
(try (try
(yws/ping! channel s (fn [e] (yws/ping! channel s (fn [e]
(when e (a/offer! ch e)) (when e (a/offer! ch e))
(a/close! ch))) (a/close! ch)))
(catch java.io.IOException cause (catch Throwable cause
(a/offer! ch cause) (a/offer! ch cause)
(a/close! ch))) (a/close! ch)))
ch)) ch))
@ -203,51 +196,71 @@
(locking wsp (locking wsp
(handler wsp message)))) (handler wsp message))))
(defn- process-input (def max-missed-heartbeats 3)
[wsp handler] (def heartbeat-interval 5000)
(let [{:keys [::input-ch ::output-ch ::stop-ch]} @wsp
handler (wrap-handler handler)] (defn- start-io-loop
[wsp handler on-snd-message on-ws-terminate]
(let [input-ch (::input-ch @wsp)
output-ch (::output-ch @wsp)
stop-ch (::stop-ch @wsp)
hbeat-pong-ch (::heartbeat-ch @wsp)
channel (::channel @wsp)
conn-id (::id @wsp)
handler (wrap-handler handler)
beats (atom #{})
choices [stop-ch
input-ch
output-ch
hbeat-pong-ch]]
;; Start IO loop
(a/go (a/go
(a/<! (handler wsp {:type :connect})) (a/<! (handler wsp {:type :connect}))
(a/<! (a/go-loop [] (a/<! (a/go-loop [i 0]
(when-let [message (a/<! input-ch)] (let [hbeat-ping-ch (a/timeout heartbeat-interval)
(let [[val port] (a/alts! [stop-ch (handler wsp message)] :priority true)] [v p] (a/alts! (conj choices hbeat-ping-ch))]
(when-not (= port stop-ch) (cond
(not (yws/connected? channel))
(on-ws-terminate nil 8800 "channel disconnected")
(= p hbeat-ping-ch)
(do
(l/trace :hint "ping" :beat i :conn-id conn-id)
(a/<! (ws-ping! channel (encode-beat i)))
(let [issued (swap! beats conj (long i))]
(if (>= (count issued) max-missed-heartbeats)
(on-ws-terminate nil 8802 "heartbeat: timeout")
(recur (inc i)))))
(= p hbeat-pong-ch)
(let [beat (decode-beat v)]
(l/trace :hint "pong" :beat beat :conn-id conn-id)
(swap! beats disj beat)
(recur i))
(= p input-ch)
(let [result (a/<! (handler wsp v))]
;; (l/trace :hint "message received" :message v)
(cond (cond
(ex/ex-info? val) (ex/ex-info? result)
(a/>! output-ch {:type :error :error (ex-data val)}) (a/>! output-ch {:type :error :error (ex-data result)})
(ex/exception? val) (ex/exception? result)
(a/>! output-ch {:type :error :error {:message (ex-message val)}}) (a/>! output-ch {:type :error :error {:message (ex-message result)}})
(map? result)
(a/>! output-ch (cond-> result (:request-id v) (assoc :request-id (:request-id v)))))
(recur i))
(= p output-ch)
(let [v (on-snd-message wsp v)]
;; (l/trace :hint "writing message to output" :message v)
(a/<! (ws-send! channel (t/encode-str v)))
(recur i))))))
(map? val)
(a/>! output-ch (cond-> val (:request-id message) (assoc :request-id (:request-id message)))))
(recur))))))
(a/<! (handler wsp {:type :disconnect}))))) (a/<! (handler wsp {:type :disconnect})))))
(defn- process-heartbeat
[{:keys [::channel ::stop-ch ::close-ch ::pong-ch
::heartbeat-interval ::max-missed-heartbeats]
:or {heartbeat-interval 2000
max-missed-heartbeats 4}}]
(let [beats (atom #{})]
(a/go-loop [i 0]
(let [[_ port] (a/alts! [stop-ch (a/timeout heartbeat-interval)] :priority true)]
(when (and (yws/connected? channel)
(not= port stop-ch))
(a/<! (ws-ping! channel (encode-beat i)))
(let [issued (swap! beats conj (long i))]
(if (>= (count issued) max-missed-heartbeats)
(do
(a/>! close-ch [8802 "heart-beat timeout"])
(a/close! close-ch))
(recur (inc i)))))))
(a/go-loop []
(when-let [buffer (a/<! pong-ch)]
(swap! beats disj (decode-beat buffer))
(recur)))))
(defn- filter-options (defn- filter-options
"Remove from options all namespace qualified keys that matches the "Remove from options all namespace qualified keys that matches the
current namespace." current namespace."

View file

@ -142,7 +142,6 @@ RUN set -ex; \
apt-get -qqy install postgresql-client-14; \ apt-get -qqy install postgresql-client-14; \
rm -rf /var/lib/apt/lists/*; rm -rf /var/lib/apt/lists/*;
RUN set -eux; \ RUN set -eux; \
ARCH="$(dpkg --print-architecture)"; \ ARCH="$(dpkg --print-architecture)"; \
case "${ARCH}" in \ case "${ARCH}" in \