0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-18 21:06:11 -05:00

♻️ Reimplement websockets using streams.

This commit is contained in:
Andrey Antukh 2020-02-08 15:39:26 +01:00
parent 044f8487e2
commit 0cc5c7f7bb
6 changed files with 169 additions and 67 deletions

View file

@ -23,6 +23,7 @@
[vertx.util :as vu] [vertx.util :as vu]
[vertx.timers :as vt] [vertx.timers :as vt]
[vertx.web :as vw] [vertx.web :as vw]
[vertx.stream :as vs]
[vertx.web.websockets :as ws]) [vertx.web.websockets :as ws])
(:import (:import
java.lang.AutoCloseable java.lang.AutoCloseable
@ -40,9 +41,10 @@
(atom {})) (atom {}))
(defn send! (defn send!
[ws message] [{:keys [output] :as ws} message]
(ws/send! ws (-> (t/encode message) (let [msg (-> (t/encode message)
(t/bytes->str)))) (t/bytes->str))]
(vs/put! output msg)))
(defmulti handle-message (defmulti handle-message
(fn [ws message] (:type message))) (fn [ws message] (:type message)))
@ -52,14 +54,14 @@
(let [local (swap! state assoc-in [file-id user-id] ws) (let [local (swap! state assoc-in [file-id user-id] ws)
sessions (get local file-id) sessions (get local file-id)
message {:type :who :users (set (keys sessions))}] message {:type :who :users (set (keys sessions))}]
(run! #(send! % message) (vals sessions)))) (p/run! #(send! % message) (vals sessions))))
(defmethod handle-message :disconnect (defmethod handle-message :disconnect
[{:keys [user-id] :as ws} {:keys [file-id] :as message}] [{:keys [user-id] :as ws} {:keys [file-id] :as message}]
(let [local (swap! state update file-id dissoc user-id) (let [local (swap! state update file-id dissoc user-id)
sessions (get local file-id) sessions (get local file-id)
message {:type :who :users (set (keys sessions))}] message {:type :who :users (set (keys sessions))}]
(run! #(send! % message) (vals sessions)))) (p/run! #(send! % message) (vals sessions))))
(defmethod handle-message :who (defmethod handle-message :who
[{:keys [file-id] :as ws} message] [{:keys [file-id] :as ws} message]
@ -71,7 +73,7 @@
(let [sessions (->> (vals (get @state file-id)) (let [sessions (->> (vals (get @state file-id))
(remove #(= user-id (:user-id %)))) (remove #(= user-id (:user-id %))))
message (assoc message :user-id user-id)] message (assoc message :user-id user-id)]
(run! #(send! % message) sessions))) (p/run! #(send! % message) sessions)))
(defn- on-eventbus-message (defn- on-eventbus-message
[{:keys [file-id user-id] :as ws} {:keys [body] :as message}] [{:keys [file-id user-id] :as ws} {:keys [body] :as message}]
@ -85,7 +87,7 @@
;; --- Handler ;; --- Handler
(defn- on-init (defn- on-init
[req ws] [ws req]
(let [ctx (vu/current-context) (let [ctx (vu/current-context)
file-id (get-in req [:path-params :file-id]) file-id (get-in req [:path-params :file-id])
user-id (:user req) user-id (:user req)
@ -94,11 +96,11 @@
:file-id file-id) :file-id file-id)
send-ping #(send! ws {:type :ping}) send-ping #(send! ws {:type :ping})
sem1 (start-eventbus-consumer! ctx ws file-id) sem1 (start-eventbus-consumer! ctx ws file-id)
sem2 (vt/schedule-periodic! ctx 30000 send-ping)] sem2 (vt/schedule-periodic! ctx 5000 send-ping)]
(handle-message ws {:type :connect}) (handle-message ws {:type :connect})
(assoc ws ::sem1 sem1 ::sem2 sem2))) (p/resolved (assoc ws ::sem1 sem1 ::sem2 sem2))))
(defn- on-text-message (defn- on-message
[ws message] [ws message]
(->> (t/str->bytes message) (->> (t/str->bytes message)
(t/decode) (t/decode)
@ -109,13 +111,38 @@
(let [file-id (:file-id ws)] (let [file-id (:file-id ws)]
(handle-message ws {:type :disconnect (handle-message ws {:type :disconnect
:file-id file-id}) :file-id file-id})
(.close ^AutoCloseable (::sem1 ws)) (when-let [sem1 (::sem1 ws)]
(.close ^AutoCloseable (::sem2 ws)))) (.close ^AutoCloseable sem1))
(when-let [sem2 (::sem2 ws)]
(.close ^AutoCloseable sem2))))
(defn- rcv-loop
[{:keys [input] :as ws}]
(vs/loop []
(-> (vs/take! input)
(p/then (fn [message]
(when message
(p/do! (on-message ws message)
(p/recur))))))))
(defn- log-error
[err]
(log/error "Unexpected exception on websocket handler:\n"
(with-out-str
(.printStackTrace err (java.io.PrintWriter. *out*)))))
(defn websocket-handler
[req ws]
(p/let [ws (on-init ws req)]
(-> (rcv-loop ws)
(p/finally (fn [_ error]
(.close ^AutoCloseable ws)
(on-close ws)
(when error
(log-error error)))))))
(defn handler (defn handler
[{:keys [user] :as req}] [{:keys [user] :as req}]
(ws/websocket :on-init (partial on-init req) (ws/websocket :handler (partial websocket-handler req)
:on-text-message on-text-message
;; :on-error on-error ;; :on-error on-error
:on-close on-close)) ))

View file

@ -3,6 +3,7 @@
funcool/promesa {:mvn/version "5.0.0"} funcool/promesa {:mvn/version "5.0.0"}
metosin/reitit-core {:mvn/version "0.3.10"} metosin/reitit-core {:mvn/version "0.3.10"}
metosin/sieppari {:mvn/version "0.0.0-alpha8"} metosin/sieppari {:mvn/version "0.0.0-alpha8"}
org.clojure/core.async {:mvn/version "0.7.559"}
io.vertx/vertx-core {:mvn/version "4.0.0-milestone4"} io.vertx/vertx-core {:mvn/version "4.0.0-milestone4"}
io.vertx/vertx-web {:mvn/version "4.0.0-milestone4"} io.vertx/vertx-web {:mvn/version "4.0.0-milestone4"}
io.vertx/vertx-web-client {:mvn/version "4.0.0-milestone4"}} io.vertx/vertx-web-client {:mvn/version "4.0.0-milestone4"}}

View file

@ -0,0 +1,70 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019-2020 Andrey Antukh <niwi@niwi.nz>
(ns vertx.stream
"A stream abstraction on top of core.async with awareness of vertx
execution context."
(:refer-clojure :exclude [loop])
(:require
[clojure.spec.alpha :as s]
[clojure.core.async :as a]
[clojure.core :as c]
[promesa.core :as p]
[vertx.impl :as impl]
[vertx.util :as vu]))
;; --- Streams
(defmacro loop
[& args]
`(let [ctx# (vu/current-context)]
(binding [p/*loop-run-fn* #(vu/run-on-context! ctx# %)]
(p/loop ~@args))))
(defn stream
([] (a/chan))
([b] (a/chan b))
([b c] (a/chan b c))
([b c e] (a/chan b c e)))
(defn take!
[c]
(let [d (p/deferred)
ctx (vu/current-context)]
(a/take! c (fn [res]
(vu/run-on-context! ctx #(p/resolve! d res))))
d))
(defn poll!
[c]
(a/poll! c))
(defn put!
[c v]
(let [d (p/deferred)
ctx (vu/current-context)]
(a/put! c v (fn [res]
(vu/run-on-context! ctx #(p/resolve! d res))))
d))
(defn offer!
[c v]
(a/offer! c v))
(defn alts!
([ports] (alts! ports {}))
([ports opts]
(let [d (p/deferred)
ctx (vu/current-context)
deliver #(vu/run-on-context! ctx (fn [] (p/resolve! d %)))
ret (a/do-alts deliver ports opts)]
(if ret
(p/resolved @ret)
d))))
(defn close!
[c]
(a/close! c))

View file

@ -103,6 +103,7 @@
(handle [_ v'] (handle [_ v']
(f))))) (f)))))
(defmacro loop (defmacro loop
[& args] [& args]
`(let [ctx# (current-context)] `(let [ctx# (current-context)]

View file

@ -13,6 +13,7 @@
[vertx.web :as vw] [vertx.web :as vw]
[vertx.impl :as vi] [vertx.impl :as vi]
[vertx.util :as vu] [vertx.util :as vu]
[vertx.stream :as vs]
[vertx.eventbus :as ve]) [vertx.eventbus :as ve])
(:import (:import
java.lang.AutoCloseable java.lang.AutoCloseable
@ -25,31 +26,29 @@
io.vertx.core.http.HttpServerResponse io.vertx.core.http.HttpServerResponse
io.vertx.core.http.ServerWebSocket)) io.vertx.core.http.ServerWebSocket))
(defprotocol IWebSocket (defrecord WebSocket [conn input output]
(send! [it message]))
(defrecord WebSocket [conn]
AutoCloseable AutoCloseable
(close [it] (close [it]
(.close ^ServerWebSocket conn)) (vs/close! input)
(vs/close! output)))
IWebSocket (defn- write-to-websocket
(send! [it message] [conn message]
(let [d (p/deferred)] (let [d (p/deferred)]
(cond (cond
(string? message) (string? message)
(.writeTextMessage ^ServerWebSocket conn (.writeTextMessage ^ServerWebSocket conn
^String message ^String message
^Handler (vi/deferred->handler d))
(instance? Buffer message)
(.writeBinaryMessage ^ServerWebSocket conn
^Buffer message
^Handler (vi/deferred->handler d)) ^Handler (vi/deferred->handler d))
(instance? Buffer message) :else
(.writeBinaryMessage ^ServerWebSocket conn (p/reject! (ex-info "invalid message type" {:message message})))
^Buffer message d))
^Handler (vi/deferred->handler d))
:else
(p/reject! (ex-info "invalid message type" {:message message})))
d)))
(defn- default-on-error (defn- default-on-error
[ws err] [ws err]
@ -58,46 +57,50 @@
(.printStackTrace err (java.io.PrintWriter. *out*)))) (.printStackTrace err (java.io.PrintWriter. *out*))))
(.close ^AutoCloseable ws)) (.close ^AutoCloseable ws))
(defrecord WebSocketResponse [on-init on-text-message on-error on-close] (defrecord WebSocketResponse [handler on-error]
vh/IAsyncResponse vh/IAsyncResponse
(-handle-response [it ctx] (-handle-response [it request]
(let [^HttpServerRequest req (::vh/request ctx) (let [^HttpServerRequest req (::vh/request request)
^ServerWebSocket conn (.upgrade req) ^ServerWebSocket conn (.upgrade req)
wsref (volatile! (->WebSocket conn)) inp-s (vs/stream 64)
out-s (vs/stream 64)
impl-on-error (fn [e] (on-error @wsref e)) ctx (vu/current-context)
impl-on-close (fn [_] (on-close @wsref)) ws (->WebSocket conn inp-s out-s)
impl-on-error
(fn [e] (on-error ws e))
impl-on-close
(fn [_]
(vs/close! inp-s)
(vs/close! out-s))
impl-on-message impl-on-message
(fn [message] (fn [message]
(-> (p/do! (on-text-message @wsref message)) (when-not (vs/offer! inp-s message)
(p/finally (fn [res err] (.pause conn)
(if err (prn "BUFF")
(impl-on-error err) (-> (vs/put! inp-s message)
(do (p/then' (fn [res]
(.fetch conn 1) (when-not (false? res)
(when (instance? WebSocket res) (.resume conn)))))))]
(vreset! wsref res))))))))]
(-> (p/do! (on-init @wsref)) (.exceptionHandler conn (vi/fn->handler impl-on-error))
(p/finally (fn [data error] (.textMessageHandler conn (vi/fn->handler impl-on-message))
(cond (.closeHandler conn (vi/fn->handler impl-on-close))
(not (nil? error))
(impl-on-error error)
(instance? WebSocket data) (vs/loop []
(do (p/let [msg (vs/take! out-s)]
(vreset! wsref data) (when-not (nil? msg)
(.exceptionHandler conn (vi/fn->handler impl-on-error)) (p/do!
(.textMessageHandler conn (vi/fn->handler impl-on-message)) (write-to-websocket conn msg)
(.closeHandler conn (vi/fn->handler impl-on-close))) (p/recur)))))
:else (vu/run-on-context! ctx #(handler ws)))))
(.reject conn)))))
nil)))
(defn websocket (defn websocket
[& {:keys [on-init on-text-message on-error on-close] [& {:keys [handler on-error]
:or {on-error default-on-error}}] :or {on-error default-on-error}}]
(->WebSocketResponse on-init on-text-message on-error on-close)) (->WebSocketResponse handler on-error))

View file

@ -98,7 +98,7 @@
(->> stream (->> stream
(rx/filter ms/pointer-event?) (rx/filter ms/pointer-event?)
(rx/sample 150) (rx/sample 50)
(rx/map #(handle-pointer-send file-id (:pt %))))) (rx/map #(handle-pointer-send file-id (:pt %)))))
(rx/take-until stoper)))))) (rx/take-until stoper))))))