From 9d93b0d3fba35f85c14fdf26d3f812f556271eea Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 18 Dec 2019 18:15:25 +0100 Subject: [PATCH] :construction: Initial work on websocket communication. --- backend/src/uxbox/http.clj | 2 +- backend/src/uxbox/http/interceptors.clj | 6 +- backend/src/uxbox/http/ws.clj | 78 ++++++++++++++----- backend/src/uxbox/util/blob.clj | 34 +------- backend/src/uxbox/util/transit.clj | 42 ++++++---- frontend/src/uxbox/main/data/workspace.cljs | 32 ++++++++ frontend/src/uxbox/main/ui/workspace.cljs | 6 ++ .../src/uxbox/main/ui/workspace/viewport.cljs | 43 +++++++++- frontend/src/uxbox/main/websockets.cljs | 51 ++++++++++++ 9 files changed, 219 insertions(+), 75 deletions(-) create mode 100644 frontend/src/uxbox/main/websockets.cljs diff --git a/backend/src/uxbox/http.clj b/backend/src/uxbox/http.clj index 7c78ecc17..ac80b9cdb 100644 --- a/backend/src/uxbox/http.clj +++ b/backend/src/uxbox/http.clj @@ -44,7 +44,7 @@ interceptors/format-response-body (vxi/errors errors/handle)] - routes [["/sub/:page-id" {:interceptors [(vxi/cookies) + routes [["/sub/:file-id" {:interceptors [(vxi/cookies) (vxi/cors cors-opts) (session/auth)] :get ws/handler}] diff --git a/backend/src/uxbox/http/interceptors.clj b/backend/src/uxbox/http/interceptors.clj index 4d27697eb..45a33aef3 100644 --- a/backend/src/uxbox/http/interceptors.clj +++ b/backend/src/uxbox/http/interceptors.clj @@ -7,7 +7,7 @@ (ns uxbox.http.interceptors (:require [vertx.web :as vw] - [uxbox.util.blob :as blob] + [uxbox.util.transit :as t] [uxbox.util.exceptions :as ex]) (:import io.vertx.ext.web.RoutingContext @@ -20,7 +20,7 @@ mtype (get-in request [:headers "content-type"])] (if (= "application/transit+json" mtype) (try - (let [params (blob/decode-from-json body)] + (let [params (t/decode (t/buffer->bytes body))] (update data :request assoc :body-params params)) (catch Exception e (ex/raise :type :parse @@ -35,7 +35,7 @@ (coll? body) (-> data (assoc-in [:response :body] - (blob/encode-with-json body true)) + (t/bytes->buffer (t/encode body))) (update-in [:response :headers] assoc "content-type" "application/transit+json")) diff --git a/backend/src/uxbox/http/ws.clj b/backend/src/uxbox/http/ws.clj index 321278fa9..4e8e4b55b 100644 --- a/backend/src/uxbox/http/ws.clj +++ b/backend/src/uxbox/http/ws.clj @@ -7,6 +7,7 @@ (ns uxbox.http.ws "Web Socket handlers" (:require + [clojure.tools.logging :as log] [promesa.core :as p] [uxbox.emails :as emails] [uxbox.http.session :as session] @@ -14,6 +15,7 @@ [uxbox.services.mutations :as sm] [uxbox.services.queries :as sq] [uxbox.util.uuid :as uuid] + [uxbox.util.transit :as t] [uxbox.util.blob :as blob] [vertx.http :as vh] [vertx.web :as vw] @@ -31,41 +33,69 @@ (declare ws-websocket) (declare ws-send!) -(declare ws-on-message!) -(declare ws-on-close!) -;; --- Public API +;; --- State Management -(declare on-message) -(declare on-close) -(declare on-eventbus-message) +(defonce state + (atom {})) -(def state (atom {})) +(defn send! + [ws message] + (ws-send! ws (-> (t/encode message) + (t/bytes->str)))) + +(defmulti handle-message + (fn [ws message] (:type message))) + +(defmethod handle-message :connect + [ws {:keys [file-id user-id] :as message}] + (let [local (swap! state assoc-in [file-id user-id] ws) + sessions (get local file-id) + message {:type :who :users (set (keys sessions))}] + (run! #(send! % message) (vals sessions)))) + +(defmethod handle-message :disconnect + [{:keys [user-id] :as ws} {:keys [file-id] :as message}] + (swap! state update file-id dissoc user-id) + nil) + +(defmethod handle-message :who + [{:keys [file-id] :as ws} message] + (let [users (keys (get @state file-id))] + (send! ws {:type :who :users (set users)}))) + +;; --- Handler + +(declare start-eventbus-consumer!) (defn handler [{:keys [user] :as req}] (letfn [(on-init [ws] (let [vsm (::vw/execution-context req) - tpc "test.foobar" - pid (get-in req [:path-params :page-id]) - sem (ve/consumer vsm tpc #(on-eventbus-message ws %2))] - (swap! state update pid (fnil conj #{}) user) - (assoc ws ::sem sem))) + fid (get-in req [:path-params :file-id]) + sem (start-eventbus-consumer! vsm ws fid)] + + (handle-message ws {:type :connect :file-id fid :user-id user}) + (assoc ws + ::sem sem + :user-id user + :file-id fid))) (on-message [ws message] - (let [pid (get-in req [:path-params :page-id])] - (ws-send! ws (str (::counter ws 0))) - (update ws ::counter (fnil inc 0)))) + (try + (->> (t/str->bytes message) + (t/decode) + (handle-message ws)) + (catch Throwable err + (log/error "Unexpected exception:\n" + (with-out-str + (.printStackTrace err (java.io.PrintWriter. *out*))))))) (on-close [ws] - (let [pid (get-in req [:path-params :page-id])] - (swap! state update pid disj user) + (let [fid (get-in req [:path-params :file-id])] + (handle-message ws {:type :disconnect :file-id fid}) (.unregister (::sem ws))))] - ;; (ws-websocket :on-init on-init - ;; :on-message on-message - ;; :on-close on-close))) - (-> (ws-websocket) (assoc :on-init on-init :on-message on-message @@ -73,8 +103,14 @@ (defn- on-eventbus-message [ws {:keys [body] :as message}] + ;; TODO (ws-send! ws body)) +(defn- start-eventbus-consumer! + [vsm ws fid] + (let [topic (str "internal.uxbox.file." fid)] + (ve/consumer vsm topic #(on-eventbus-message ws %2)))) + ;; --- Internal (vertx api) (experimental) (defrecord WebSocket [on-init on-message on-close] diff --git a/backend/src/uxbox/util/blob.clj b/backend/src/uxbox/util/blob.clj index 6d467fdb9..1fcb0f023 100644 --- a/backend/src/uxbox/util/blob.clj +++ b/backend/src/uxbox/util/blob.clj @@ -29,38 +29,6 @@ String (->bytes [data] (.getBytes ^String data "UTF-8"))) -(defn str->bytes - "Convert string to byte array." - ([^String s] - (str->bytes s "UTF-8")) - ([^String s, ^String encoding] - (.getBytes s encoding))) - -(defn bytes->str - "Convert byte array to String." - ([^bytes data] - (bytes->str data "UTF-8")) - ([^bytes data, ^String encoding] - (String. data encoding))) - -(defn buffer - [^bytes data] - (Buffer/buffer data)) - -(defn encode-with-json - "A function used for encode data for transfer it to frontend." - ([data] (encode-with-json data false)) - ([data verbose?] - (let [type (if verbose? :json-verbose :json)] - (-> (t/encode data {:type type}) - (Buffer/buffer))))) - -(defn decode-from-json - "A function used for parse data coming from frontend." - [data] - (-> (->bytes data) - (t/decode {:type :json}))) - (defn encode "A function used for encode data for persist in the database." [data] @@ -73,7 +41,7 @@ (.writeInt dos (int data-len)) (.write dos ^bytes cdata (int 0) (alength cdata)) (-> (.toByteArray baos) - (buffer))))) + (t/bytes->buffer))))) (declare decode-v1) diff --git a/backend/src/uxbox/util/transit.clj b/backend/src/uxbox/util/transit.clj index f97bec799..bc76d4a3d 100644 --- a/backend/src/uxbox/util/transit.clj +++ b/backend/src/uxbox/util/transit.clj @@ -57,20 +57,8 @@ ([data] (decode data nil)) ([data opts] - (cond - (instance? Buffer data) - (decode (.getBytes ^Buffer data) opts) - - (bytes? data) - (with-open [input (ByteArrayInputStream. data)] - (read! (reader input opts))) - - (string? data) - (decode (.getBytes data "UTF-8") opts) - - :else - (with-open [input (io/input-stream data)] - (read! (reader input opts)))))) + (with-open [input (ByteArrayInputStream. ^bytes data)] + (read! (reader input opts))))) (defn encode ([data] @@ -80,3 +68,29 @@ (let [w (writer out opts)] (write! w data) (.toByteArray out))))) + +;; --- Helpers + +(defn str->bytes + "Convert string to byte array." + ([^String s] + (str->bytes s "UTF-8")) + ([^String s, ^String encoding] + (.getBytes s encoding))) + +(defn bytes->str + "Convert byte array to String." + ([^bytes data] + (bytes->str data "UTF-8")) + ([^bytes data, ^String encoding] + (String. data encoding))) + +(defn bytes->buffer + [^bytes data] + (Buffer/buffer data)) + +(defn buffer->bytes + [^Buffer data] + (.getBytes data)) + + diff --git a/frontend/src/uxbox/main/data/workspace.cljs b/frontend/src/uxbox/main/data/workspace.cljs index 14d3654db..04e946b31 100644 --- a/frontend/src/uxbox/main/data/workspace.cljs +++ b/frontend/src/uxbox/main/data/workspace.cljs @@ -12,6 +12,7 @@ [uxbox.config :as cfg] [uxbox.common.data :as d] [uxbox.common.pages :as cp] + [uxbox.main.websockets :as ws] [uxbox.main.constants :as c] [uxbox.main.data.icons :as udi] [uxbox.main.data.pages :as udp] @@ -28,6 +29,7 @@ [uxbox.util.perf :as perf] [uxbox.util.router :as rt] [uxbox.util.spec :as us] + [uxbox.util.transit :as t] [uxbox.util.time :as dt] [uxbox.util.uuid :as uuid])) @@ -179,6 +181,7 @@ (->> (rx/filter (ptk/type? ::initialized) stream) (rx/take 1) (rx/mapcat #(rx/of watch-page-changes))))) + ptk/EffectEvent (effect [_ state stream] ;; Optimistic prefetch of projects if them are not already fetched @@ -200,6 +203,35 @@ :workspace-data data :workspace-page page))))) +;; --- Initialize WebSocket + +(defn initialize-websocket + [file-id] + (ptk/reify ::initialize-websocket + ptk/UpdateEvent + (update [_ state] + (prn "initialize-websocket$update" file-id) + (let [uri (str "ws://localhost:6060/sub/" file-id)] + (assoc-in state [::ws file-id] (ws/open uri)))) + + ptk/WatchEvent + (watch [_ state stream] + (prn "initialize-websocket$watch" file-id) + (->> (ws/-stream (get-in state [::ws file-id])) + (rx/filter #(= :message (:type %))) + (rx/map :payload) + (rx/map t/decode) + (rx/tap #(js/console.log "ws-message" file-id %)) + (rx/ignore))))) + +(defn finalize-websocket + [file-id] + (ptk/reify ::finalize-websocket + ptk/EffectEvent + (effect [_ state stream] + (prn "finalize-websocket" file-id) + (ws/-close (get-in state [::ws file-id]))))) + ;; --- Toggle layout flag (defn toggle-layout-flag diff --git a/frontend/src/uxbox/main/ui/workspace.cljs b/frontend/src/uxbox/main/ui/workspace.cljs index 9cc31a8e9..9eb036cc8 100644 --- a/frontend/src/uxbox/main/ui/workspace.cljs +++ b/frontend/src/uxbox/main/ui/workspace.cljs @@ -103,6 +103,12 @@ (st/emit! (udw/initialize file-id page-id)) #(rx/cancel! sub)))}) + (mf/use-effect + {:deps #js [(str file-id)] + :fn (fn [] + (st/emit! (udw/initialize-websocket file-id)) + #(st/emit! (udw/finalize-websocket file-id)))}) + (let [layout (mf/deref refs/workspace-layout) file (mf/deref refs/workspace-file) page (mf/deref refs/workspace-page) diff --git a/frontend/src/uxbox/main/ui/workspace/viewport.cljs b/frontend/src/uxbox/main/ui/workspace/viewport.cljs index 6e1f96634..65f8f6ed2 100644 --- a/frontend/src/uxbox/main/ui/workspace/viewport.cljs +++ b/frontend/src/uxbox/main/ui/workspace/viewport.cljs @@ -128,6 +128,8 @@ ;; --- Viewport +(declare remote-user-cursor) + (mf/defc canvas-and-shapes {:wrap [mf/wrap-memo]} [props] @@ -276,9 +278,44 @@ (when (contains? flags :ruler) [:& ruler {:zoom zoom :ruler (:ruler local)}]) + ;; -- METER CURSOR MULTIUSUARIO - [:div.multiuser-cursor - [i/infocard] - [:span "USER_NAME"]] + ;;[:& remote-user-cursor] [:& selrect {:data (:selrect local)}]]]))) + + +(mf/defc remote-user-cursor + [props] + [:g.multiuser-cursor #_{:transform "translate(100, 100) scale(2)"} + [:svg {:x "100" + :y "100" + :style {:fill "#000"} + :width "106.824" + :height "20.176" + :viewBox "0 0 28.264 5.338"} + [:path {:d "M5.292 4.027L1.524.26l-.05-.01L0 0l.258 1.524 3.769 3.768zm-.45 0l-.313.314L1.139.95l.314-.314zm-.5.5l-.315.316-3.39-3.39.315-.315 3.39 3.39zM1.192.526l-.668.667L.431.646.64.43l.552.094z" + :font-family "sans-serif"}] + [:g {:transform "translate(0 -291.708)"} + [:rect {:width "21.415" + :height "5.292" + :x "6.849" + :y "291.755" + :fill-opacity ".893" + :paint-order "stroke fill markers" + :rx ".794" + :ry ".794"}] + [:text {:x "9.811" + :y "295.216" + :fill "#fff" + :stroke-width ".265" + :font-family "Open Sans" + :font-size"2.91" + :font-weight "400" + :letter-spacing"0" + :line-height "1.25" + :word-spacing "0" + ;; :style="line-height:1 + } + "User 2"]]]]) + diff --git a/frontend/src/uxbox/main/websockets.cljs b/frontend/src/uxbox/main/websockets.cljs new file mode 100644 index 000000000..ce17975d8 --- /dev/null +++ b/frontend/src/uxbox/main/websockets.cljs @@ -0,0 +1,51 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) 2015-2017 Andrey Antukh + +(ns uxbox.main.websockets + "A interface to webworkers exposed functionality." + (:require + [cljs.spec.alpha :as s] + [goog.events :as ev] + [beicon.core :as rx] + [potok.core :as ptk] + [uxbox.util.spec :as us]) + (:import + goog.net.WebSocket + goog.net.WebSocket.EventType)) + +(defprotocol IWebSocket + (-stream [_] "Retrienve the message stream") + (-send [_ message] "send a message") + (-close [_] "close websocket")) + + +(defn open + [uri] + (let [sb (rx/subject) + ws (WebSocket. #js {:autoReconnect true}) + lk1 (ev/listen ws EventType.MESSAGE + #(rx/push! sb {:type :message :payload (.-message %)})) + lk2 (ev/listen ws EventType.ERROR + #(rx/push! sb {:type :error :payload %})) + lk3 (ev/listen ws EventType.OPENED + #(rx/push! sb {:type :opened :payload %}))] + (.open ws uri) + (reify + cljs.core/IDeref + (-deref [_] ws) + + IWebSocket + (-stream [_] sb) + (-send [_ msg] + (when (.isOpen ws) + (.send ws msg))) + (-close [_] + (.close ws) + (rx/end! sb) + (ev/unlistenByKey lk1) + (ev/unlistenByKey lk2) + (ev/unlistenByKey lk3))))) +