♻️ Refactor websocket layer.
This commit replaces rj9a with funcool/yetti ring adapter. Cleans the websocket api and makes it fully asynchronous. Also a common websocket protocol abstraction that will allow more easy path for creating new websocket based services.
This commit is contained in:
11 changed files with 478 additions and 403 deletions
@ -22,8 +22,10 @@
io.lettuce/lettuce-core {:mvn/version "6.1.5.RELEASE"}
java-http-clj/java-http-clj {:mvn/version "0.4.3"}
info.sunng/ring-jetty9-adapter {:mvn/version "0.15.2"
:exclusions [org.slf4j/slf4j-api]}
funcool/yetti {:git/tag "v4.0" :git/sha "59ed2a7"
:git/url "https://github.com/funcool/yetti.git"
:exclusions [org.slf4j/slf4j-api]}
com.github.seancorfield/next.jdbc {:mvn/version "1.2.761"}
metosin/reitit-ring {:mvn/version "0.5.15"}
org.postgresql/postgresql {:mvn/version "42.3.1"}
@ -6,7 +6,7 @@ export OPTIONS="
-A:jmx-remote:dev \
-J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager \
-J-Dlog4j2.configurationFile=log4j2-devenv.xml \
-J-XX:+UseShenandoahGC \
-J-XX:+UseZGC \
-J-XX:-OmitStackTraceInFastThrow \
-J-Xms50m -J-Xmx512m";
@ -42,6 +42,7 @@
(def defaults
{:http-server-port 6060
:http-server-host "localhost"
:host "devenv"
:tenant "dev"
:database-uri "postgresql://postgres/penpot"
@ -125,6 +126,7 @@
(s/def ::oidc-roles-attr ::us/keyword)
(s/def ::host ::us/string)
(s/def ::http-server-port ::us/integer)
(s/def ::http-server-host ::us/string)
(s/def ::http-session-idle-max-age ::dt/duration)
(s/def ::http-session-updater-batch-max-age ::dt/duration)
(s/def ::http-session-updater-batch-max-size ::us/integer)
@ -211,6 +213,7 @@
@ -17,75 +17,61 @@
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[reitit.ring :as rr]
[ring.adapter.jetty9 :as jetty])
[yetti.adapter :as yt])
(declare router-handler)
(declare wrap-router)
(s/def ::handler fn?)
(s/def ::router some?)
(s/def ::ws (s/map-of ::us/string fn?))
(s/def ::port ::us/integer)
(s/def ::host ::us/string)
(s/def ::name ::us/string)
(defmethod ig/pre-init-spec ::server [_]
(s/keys :req-un [::port]
:opt-un [::ws ::name ::mtx/metrics ::router ::handler]))
:opt-un [::name ::mtx/metrics ::router ::handler ::host]))
(defmethod ig/prep-key ::server
[_ cfg]
(merge {:name "http"} (d/without-nils cfg)))
(defn- instrument-metrics
[^Server server metrics]
(let [stats (doto (StatisticsHandler.)
(.setHandler (.getHandler server)))]
(.setHandler server stats)
(mtx/instrument-jetty! (:registry metrics) stats)
(defmethod ig/init-key ::server
[_ {:keys [handler router ws port name metrics] :as opts}]
[_ {:keys [handler router port name metrics] :as opts}]
(l/info :msg "starting http server" :port port :name name)
(let [pre-start (fn [^Server server]
(let [handler (doto (ErrorHandler.)
(.setShowStacks true)
(.setServer server))]
(.setErrorHandler server ^ErrorHandler handler)
(when metrics
(let [stats (StatisticsHandler.)]
(.setHandler ^StatisticsHandler stats (.getHandler server))
(.setHandler server stats)
(mtx/instrument-jetty! (:registry metrics) stats)))))
options (merge
{:port port
:h2c? true
:join? false
:allow-null-path-info true
:configurator pre-start}
(when (seq ws)
{:websockets ws}))
handler (cond
(fn? handler) handler
(some? router) (router-handler router)
:else (ex/raise :type :internal
:code :invalid-argument
:hint "Missing `handler` or `router` option."))
server (jetty/run-jetty handler options)]
(assoc opts :server server)))
(let [options {:http/port port}
handler (cond
(fn? handler) handler
(some? router) (wrap-router router)
:else (ex/raise :type :internal
:code :invalid-argument
:hint "Missing `handler` or `router` option."))
server (-> (yt/server handler options)
(cond-> metrics (instrument-metrics metrics)))]
(assoc opts :server (yt/start! server))))
(defmethod ig/halt-key! ::server
[_ {:keys [server name port] :as opts}]
(l/info :msg "stoping http server"
:name name
:port port)
(jetty/stop-server server))
(l/info :msg "stoping http server" :name name :port port)
(yt/stop! server))
(defn- router-handler
(defn- wrap-router
(let [handler (rr/ring-handler router
(rr/create-resource-handler {:path "/"})
{:middleware [middleware/server-timing]})]
(let [default (rr/routes
(rr/create-resource-handler {:path "/"})
options {:middleware [middleware/server-timing]}
handler (rr/ring-handler router default options)]
(fn [request]
(handler request)
@ -95,7 +81,7 @@
{:status 500 :body "internal server error"}))))))
;; Http Main Handler (Router)
;; Http Router
(s/def ::rpc map?)
@ -104,17 +90,18 @@
(s/def ::storage map?)
(s/def ::assets map?)
(s/def ::feedback fn?)
(s/def ::ws fn?)
(s/def ::error-report-handler fn?)
(s/def ::audit-http-handler fn?)
(defmethod ig/pre-init-spec ::router [_]
(s/keys :req-un [::rpc ::session ::mtx/metrics
(s/keys :req-un [::rpc ::session ::mtx/metrics ::ws
::oauth ::storage ::assets ::feedback
(defmethod ig/init-key ::router
[_ {:keys [session rpc oauth metrics assets feedback] :as cfg}]
[_ {:keys [ws session rpc oauth metrics assets feedback] :as cfg}]
[["/metrics" {:get (:handler metrics)}]
["/assets" {:middleware [[middleware/format-response-body]
@ -131,6 +118,15 @@
["/sns" {:post (:sns-webhook cfg)}]]
{:middleware [[middleware/params]
[middleware/errors errors/handle]
[(:middleware session)]]
:get ws}]
["/api" {:middleware [[middleware/cors]
@ -9,7 +9,6 @@
[app.common.logging :as l]
[app.common.transit :as t]
[app.config :as cf]
[app.metrics :as mtx]
[app.util.json :as json]
[buddy.core.codecs :as bc]
[buddy.core.hash :as bh]
@ -78,6 +77,9 @@
params (:query-params request)
opts {:type (if (contains? params "transit_verbose") :json-verbose :json)}]
(:ws response)
(coll? body)
(-> response
(update :headers assoc "content-type" "application/transit+json")
@ -112,11 +114,6 @@
{:name ::errors
:compile (constantly wrap-errors)})
(def metrics
{:name ::metrics
:wrap (fn [handler]
(mtx/wrap-counter handler {:id "http__requests_counter"
:help "Absolute http requests counter."}))})
(def cookies
{:name ::cookies
:compile (constantly wrap-cookies)})
@ -0,0 +1,145 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; Copyright (c) UXBOX Labs SL
(ns app.http.websocket
"A penpot notification service for file cooperative edition."
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.spec :as us]
[app.db :as db]
[app.metrics :as mtx]
[app.util.websocket :as ws]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[yetti.websocket :as yws]))
(declare send-presence!)
(defmulti handle-message
(fn [_wsp message] (:type message)))
(defmethod handle-message :connect
[wsp _]
(let [{:keys [msgbus file-id team-id session-id ::output-ch]} @wsp
sub-ch (a/chan (a/dropping-buffer 32))]
(swap! wsp assoc :sub-ch sub-ch)
;; Start a subscription forwarding goroutine
(a/go-loop []
(when-let [val (a/<! sub-ch)]
(when-not (= (:session-id val) session-id)
;; If we receive a connect message of other user, we need
;; to send an update presence to all participants.
(when (= :connect (:type val))
(a/<! (send-presence! @wsp :presence)))
;; Then, just forward the message
(a/>! output-ch val))
(a/<! (msgbus :sub {:topics [file-id team-id] :chan sub-ch}))
(a/<! (send-presence! @wsp :connect)))))
(defmethod handle-message :disconnect
[wsp _]
(a/close! (:sub-ch @wsp))
(send-presence! @wsp :disconnect))
(defmethod handle-message :keepalive
[_ _]
(a/go :nothing))
(defmethod handle-message :pointer-update
[wsp message]
(let [{:keys [profile-id file-id session-id msgbus]} @wsp]
(msgbus :pub {:topic file-id
:message (assoc message
:profile-id profile-id
:session-id session-id)})))
(defmethod handle-message :default
[_ message]
(l/log :level :warn
:msg "received unexpected message"
:message message)))
;; --- IMPL
(defn- send-presence!
([ws] (send-presence! ws :presence))
([{:keys [msgbus session-id profile-id file-id]} type]
(msgbus :pub {:topic file-id
:message {:type type
:session-id session-id
:profile-id profile-id}})))
(declare retrieve-file)
(s/def ::msgbus fn?)
(s/def ::file-id ::us/uuid)
(s/def ::session-id ::us/uuid)
(s/def ::handler-params
(s/keys :req-un [::file-id ::session-id]))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::msgbus ::db/pool ::mtx/metrics ::wrk/executor]))
(defmethod ig/init-key ::handler
[_ {:keys [metrics pool] :as cfg}]
(let [metrics {:connections (get-in metrics [:definitions :websocket-active-connections])
:messages (get-in metrics [:definitions :websocket-messages-total])
:sessions (get-in metrics [:definitions :websocket-session-timing])}]
(fn [{:keys [profile-id params] :as req}]
(let [params (us/conform ::handler-params params)
file (retrieve-file pool (:file-id params))
cfg (-> (merge cfg params)
(assoc :profile-id profile-id)
(assoc :team-id (:team-id file))
(assoc ::ws/metrics metrics))]
(when-not profile-id
(ex/raise :type :authentication
:hint "Authentication required."))
(when-not file
(ex/raise :type :not-found
:code :object-not-found))
(when-not (yws/upgrade-request? req)
(ex/raise :type :validation
:code :websocket-request-expected
:hint "this endpoint only accepts websocket connections"))
(->> (ws/handler handle-message cfg)
(yws/upgrade req))))))
(def ^:private
"select f.id as id,
p.team_id as team_id
from file as f
join project as p on (p.id = f.project_id)
where f.id = ?")
(defn- retrieve-file
[conn id]
(db/exec-one! conn [sql:retrieve-file id]))
@ -22,33 +22,15 @@
:min-pool-size 0
:max-pool-size 30}
{:name "actions_profile_register_count"
:help "A global counter of user registrations."
:type :counter}
{:name "actions_profile_activation_count"
:help "A global counter of profile activations"
:type :counter}
{:name "rpc_update_file_changes_total"
:help "A total number of changes submitted to update-file."
:type :counter}
{:name "rpc_update_file_bytes_processed_total"
:help "A total number of bytes processed by update-file."
:type :counter}}}
{:main (ig/ref :app.migrations/migrations)}
{:backend (cf/get :msgbus-backend :redis)
@ -91,23 +73,30 @@
{:port (cf/get :http-server-port)
:host (cf/get :http-server-host)
:router (ig/ref :app.http/router)
:metrics (ig/ref :app.metrics/metrics)
:ws {"/ws/notifications" (ig/ref :app.notifications/handler)}}
:metrics (ig/ref :app.metrics/metrics)}
{:rpc (ig/ref :app.rpc/rpc)
:session (ig/ref :app.http.session/session)
:tokens (ig/ref :app.tokens/tokens)
:public-uri (cf/get :public-uri)
:metrics (ig/ref :app.metrics/metrics)
:oauth (ig/ref :app.http.oauth/handler)
:assets (ig/ref :app.http.assets/handlers)
:storage (ig/ref :app.storage/storage)
:sns-webhook (ig/ref :app.http.awsns/handler)
:feedback (ig/ref :app.http.feedback/handler)
{:assets (ig/ref :app.http.assets/handlers)
:feedback (ig/ref :app.http.feedback/handler)
:session (ig/ref :app.http.session/session)
:sns-webhook (ig/ref :app.http.awsns/handler)
:oauth (ig/ref :app.http.oauth/handler)
:ws (ig/ref :app.http.websocket/handler)
:metrics (ig/ref :app.metrics/metrics)
:public-uri (cf/get :public-uri)
:storage (ig/ref :app.storage/storage)
:tokens (ig/ref :app.tokens/tokens)
:audit-http-handler (ig/ref :app.loggers.audit/http-handler)
:error-report-handler (ig/ref :app.loggers.database/handler)}
:error-report-handler (ig/ref :app.loggers.database/handler)
:rpc (ig/ref :app.rpc/rpc)}
{:pool (ig/ref :app.db/pool)
:executor (ig/ref :app.worker/executor)
:metrics (ig/ref :app.metrics/metrics)
:msgbus (ig/ref :app.msgbus/msgbus)}
{:metrics (ig/ref :app.metrics/metrics)
@ -120,12 +109,12 @@
{:pool (ig/ref :app.db/pool)}
{:rpc (ig/ref :app.rpc/rpc)
:session (ig/ref :app.http.session/session)
:pool (ig/ref :app.db/pool)
:tokens (ig/ref :app.tokens/tokens)
:audit (ig/ref :app.loggers.audit/collector)
:public-uri (cf/get :public-uri)}
{:rpc (ig/ref :app.rpc/rpc)
:session (ig/ref :app.http.session/session)
:pool (ig/ref :app.db/pool)
:tokens (ig/ref :app.tokens/tokens)
:audit (ig/ref :app.loggers.audit/collector)
:public-uri (cf/get :public-uri)}
{:pool (ig/ref :app.db/pool)
@ -137,13 +126,6 @@
:public-uri (cf/get :public-uri)
:audit (ig/ref :app.loggers.audit/collector)}
{:msgbus (ig/ref :app.msgbus/msgbus)
:pool (ig/ref :app.db/pool)
:session (ig/ref :app.http.session/session)
:metrics (ig/ref :app.metrics/metrics)
:executor (ig/ref :app.worker/executor)}
{:min-threads 0
:max-threads 256
@ -26,27 +26,57 @@
(declare instrument)
(declare create-registry)
(declare create)
(declare handler)
;; Defaults
(def default-metrics
{:name "actions_profile_register_count"
:help "A global counter of user registrations."
:type :counter}
{:name "actions_profile_activation_count"
:help "A global counter of profile activations"
:type :counter}
{:name "rpc_update_file_changes_total"
:help "A total number of changes submitted to update-file."
:type :counter}
{:name "rpc_update_file_bytes_processed_total"
:help "A total number of bytes processed by update-file."
:type :counter}
{:name "websocket_active_connections"
:help "Active websocket connections gauge"
:type :gauge}
{:name "websocket_message_total"
:help "Counter of processed messages."
:labels ["op"]
:type :counter}
{:name "websocket_session_timing"
:help "Websocket session timing (seconds)."
:quantiles []
:type :summary}})
;; Entry Point
(defn- handler
[registry _request]
(let [samples (.metricFamilySamples ^CollectorRegistry registry)
writer (StringWriter.)]
(TextFormat/write004 writer samples)
{:headers {"content-type" TextFormat/CONTENT_TYPE_004}
:body (.toString writer)}))
(s/def ::definitions
(s/map-of keyword? map?))
(defmethod ig/pre-init-spec ::metrics [_]
(s/keys :opt-un [::definitions]))
(defmethod ig/init-key ::metrics
[_ {:keys [definitions] :as cfg}]
[_ _]
(l/info :action "initialize metrics")
(let [registry (create-registry)
definitions (reduce-kv (fn [res k v]
@ -54,7 +84,7 @@
(assoc res k)))
{:handler (partial handler registry)
:definitions definitions
:registry registry}))
@ -64,6 +94,14 @@
(s/def ::metrics
(s/keys :req-un [::registry ::handler]))
(defn- handler
[registry _request]
(let [samples (.metricFamilySamples ^CollectorRegistry registry)
writer (StringWriter.)]
(TextFormat/write004 writer samples)
{:headers {"content-type" TextFormat/CONTENT_TYPE_004}
:body (.toString writer)}))
;; Implementation
@ -0,0 +1,193 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; Copyright (c) UXBOX Labs SL
(ns app.util.websocket
"A general protocol implementation on top of websockets."
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.transit :as t]
[app.metrics :as mtx]
[app.util.time :as dt]
[clojure.core.async :as a]
[yetti.websocket :as yws])
(declare decode-beat)
(declare encode-beat)
(declare process-heartbeat)
(declare process-input)
(declare process-output)
(declare ws-ping!)
(declare ws-send!)
(defmacro call-mtx
[definitions name & args]
`(when-let [mtx-fn# (some-> ~definitions ~name ::mtx/fn)]
(mtx-fn# ~@args)))
(def noop (constantly nil))
(defn handler
"A WebSocket upgrade handler factory. Returns a handler that can be
used to upgrade to websocket connection. This handler implements the
basic custom protocol on top of websocket connection with all the
borring stuff already handled (lifecycle, heartbeat,...).
The provided function should have the `(fn [ws msg])` signature.
It also accepts some options that allows you parametrize the
protocol behavior. The options map will be used as-as for the
initial data of the `ws` data structure"
([handle-message] (handler handle-message {}))
([handle-message {:keys [::input-buff-size
:or {input-buff-size 32
output-buff-size 32
idle-timeout 30000}
:as options}]
(fn [_]
(let [input-ch (a/chan input-buff-size)
output-ch (a/chan output-buff-size)
pong-ch (a/chan (a/sliding-buffer 6))
close-ch (a/chan)
options (-> options
(assoc ::input-ch input-ch)
(assoc ::output-ch output-ch)
(assoc ::close-ch close-ch)
(dissoc ::metrics))
terminated (atom false)
created-at (dt/now)
(fn [& [_ error]]
(when (ex/exception? error)
(l/warn :hint (ex-message error) :cause error))
(when (compare-and-set! terminated false true)
(call-mtx metrics :connections {:cmd :dec :by 1})
(call-mtx metrics :sessions {:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)})
(a/close! close-ch)
(a/close! pong-ch)
(a/close! output-ch)
(a/close! input-ch)))
(fn [conn]
(call-mtx metrics :connections {:cmd :inc :by 1})
(let [wsp (atom (assoc options ::conn conn))]
;; Handle heartbeat
(yws/idle-timeout! conn (dt/duration idle-timeout))
(-> @wsp
(assoc ::pong-ch pong-ch)
(assoc ::on-close on-terminate)
;; Forward all messages from output-ch to the websocket
;; connection
(a/go-loop []
(when-let [val (a/<! output-ch)]
(call-mtx metrics :messages ["send"])
(a/<! (ws-send! conn (t/encode-str val)))
;; React on messages received from the client
(process-input wsp handle-message)))
(fn [_ message]
(call-mtx metrics :messages {:labels ["recv"]})
(let [message (t/decode-str message)]
(when-not (a/offer! input-ch message)
(l/warn :hint "drop messages"))))
(fn [_ buffer]
(a/>!! pong-ch buffer))]
{:on-connect on-connect
:on-error on-terminate
:on-close on-terminate
:on-text on-message
:on-pong on-pong}))))
(defn- ws-send!
[conn s]
(let [ch (a/chan 1)]
(yws/send! conn s (fn [e]
(when e (a/offer! ch e))
(a/close! ch)))
(defn- ws-ping!
[conn s]
(let [ch (a/chan 1)]
(yws/ping! conn s (fn [e]
(when e (a/offer! ch e))
(a/close! ch)))
(defn- encode-beat
(doto (ByteBuffer/allocate 8)
(.putLong n)
(defn- decode-beat
[^ByteBuffer buffer]
(when (= 8 (.capacity buffer))
(.rewind buffer)
(.getLong buffer)))
(defn- process-input
[wsp handler]
(let [{:keys [::input-ch ::output-ch ::close-ch]} @wsp]
(a/<! (handler wsp {:type :connect}))
(a/<! (a/go-loop []
(when-let [request (a/<! input-ch)]
(let [[val port] (a/alts! [(handler wsp request) close-ch])]
(when-not (= port close-ch)
(ex/ex-info? val)
(a/>! output-ch {:type :error :error (ex-data val)})
(ex/exception? val)
(a/>! output-ch {:type :error :error {:message (ex-message val)}})
(map? val)
(a/>! output-ch (cond-> val (:request-id request) (assoc :request-id (:request-id request)))))
(a/<! (handler wsp {:type :disconnect})))))
(defn- process-heartbeat
[{:keys [::conn ::close-ch ::on-close ::pong-ch
::heartbeat-interval ::max-missed-heartbeats]
:or {heartbeat-interval 2000
max-missed-heartbeats 8}}]
(let [beats (atom #{})]
(a/go-loop [i 0]
(let [[_ port] (a/alts! [close-ch (a/timeout heartbeat-interval)])]
(when (and (yws/connected? conn)
(not= port close-ch))
(a/<! (ws-ping! conn (encode-beat i)))
(let [issued (swap! beats conj (long i))]
(if (>= (count issued) max-missed-heartbeats)
(on-close conn -1 "heartbeat-timeout")
(recur (inc i)))))))
(a/go-loop []
(when-let [buffer (a/<! pong-ch)]
(swap! beats disj (decode-beat buffer))
@ -2,7 +2,7 @@
"name": "app",
"version": "0.1.0",
"description": "The Open-Source prototyping tool",
"author": "UXBOX LABS SL",
"author": "Kaleidos Ventures SL",
"license": "SEE LICENSE IN <LICENSE>",
"repository": {
"type": "git",
