diff --git a/backend/deps.edn b/backend/deps.edn index 32e20d11e..b928983be 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -22,8 +22,10 @@ io.lettuce/lettuce-core {:mvn/version "6.1.5.RELEASE"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - info.sunng/ring-jetty9-adapter {:mvn/version "0.15.2" - :exclusions [org.slf4j/slf4j-api]} + funcool/yetti {:git/tag "v4.0" :git/sha "59ed2a7" + :git/url "https://github.com/funcool/yetti.git" + :exclusions [org.slf4j/slf4j-api]} + com.github.seancorfield/next.jdbc {:mvn/version "1.2.761"} metosin/reitit-ring {:mvn/version "0.5.15"} org.postgresql/postgresql {:mvn/version "42.3.1"} diff --git a/backend/scripts/repl b/backend/scripts/repl index a578b187a..fca593dbd 100755 --- a/backend/scripts/repl +++ b/backend/scripts/repl @@ -6,7 +6,7 @@ export OPTIONS=" -A:jmx-remote:dev \ -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager \ -J-Dlog4j2.configurationFile=log4j2-devenv.xml \ - -J-XX:+UseShenandoahGC \ + -J-XX:+UseZGC \ -J-XX:-OmitStackTraceInFastThrow \ -J-Xms50m -J-Xmx512m"; diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 98c003e55..ba83d458e 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -42,6 +42,7 @@ (def defaults {:http-server-port 6060 + :http-server-host "localhost" :host "devenv" :tenant "dev" :database-uri "postgresql://postgres/penpot" @@ -125,6 +126,7 @@ (s/def ::oidc-roles-attr ::us/keyword) (s/def ::host ::us/string) (s/def ::http-server-port ::us/integer) +(s/def ::http-server-host ::us/string) (s/def ::http-session-idle-max-age ::dt/duration) (s/def ::http-session-updater-batch-max-age ::dt/duration) (s/def ::http-session-updater-batch-max-size ::us/integer) @@ -211,6 +213,7 @@ ::oidc-roles-attr ::oidc-roles ::host + ::http-server-host ::http-server-port ::http-session-idle-max-age ::http-session-updater-batch-max-age diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index b15a1e3b6..f04a21d90 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -17,75 +17,61 @@ [clojure.spec.alpha :as s] [integrant.core :as ig] [reitit.ring :as rr] - [ring.adapter.jetty9 :as jetty]) + [yetti.adapter :as yt]) (:import org.eclipse.jetty.server.Server - org.eclipse.jetty.server.handler.ErrorHandler org.eclipse.jetty.server.handler.StatisticsHandler)) -(declare router-handler) +(declare wrap-router) (s/def ::handler fn?) (s/def ::router some?) -(s/def ::ws (s/map-of ::us/string fn?)) (s/def ::port ::us/integer) +(s/def ::host ::us/string) (s/def ::name ::us/string) (defmethod ig/pre-init-spec ::server [_] (s/keys :req-un [::port] - :opt-un [::ws ::name ::mtx/metrics ::router ::handler])) + :opt-un [::name ::mtx/metrics ::router ::handler ::host])) (defmethod ig/prep-key ::server [_ cfg] (merge {:name "http"} (d/without-nils cfg))) +(defn- instrument-metrics + [^Server server metrics] + (let [stats (doto (StatisticsHandler.) + (.setHandler (.getHandler server)))] + (.setHandler server stats) + (mtx/instrument-jetty! (:registry metrics) stats) + server)) + (defmethod ig/init-key ::server - [_ {:keys [handler router ws port name metrics] :as opts}] + [_ {:keys [handler router port name metrics] :as opts}] (l/info :msg "starting http server" :port port :name name) - (let [pre-start (fn [^Server server] - (let [handler (doto (ErrorHandler.) - (.setShowStacks true) - (.setServer server))] - (.setErrorHandler server ^ErrorHandler handler) - (when metrics - (let [stats (StatisticsHandler.)] - (.setHandler ^StatisticsHandler stats (.getHandler server)) - (.setHandler server stats) - (mtx/instrument-jetty! (:registry metrics) stats))))) - - options (merge - {:port port - :h2c? true - :join? false - :allow-null-path-info true - :configurator pre-start} - (when (seq ws) - {:websockets ws})) - - handler (cond - (fn? handler) handler - (some? router) (router-handler router) - :else (ex/raise :type :internal - :code :invalid-argument - :hint "Missing `handler` or `router` option.")) - - server (jetty/run-jetty handler options)] - (assoc opts :server server))) + (let [options {:http/port port} + handler (cond + (fn? handler) handler + (some? router) (wrap-router router) + :else (ex/raise :type :internal + :code :invalid-argument + :hint "Missing `handler` or `router` option.")) + server (-> (yt/server handler options) + (cond-> metrics (instrument-metrics metrics)))] + (assoc opts :server (yt/start! server)))) (defmethod ig/halt-key! ::server [_ {:keys [server name port] :as opts}] - (l/info :msg "stoping http server" - :name name - :port port) - (jetty/stop-server server)) + (l/info :msg "stoping http server" :name name :port port) + (yt/stop! server)) -(defn- router-handler +(defn- wrap-router [router] - (let [handler (rr/ring-handler router - (rr/routes - (rr/create-resource-handler {:path "/"}) - (rr/create-default-handler)) - {:middleware [middleware/server-timing]})] + (let [default (rr/routes + (rr/create-resource-handler {:path "/"}) + (rr/create-default-handler)) + options {:middleware [middleware/server-timing]} + handler (rr/ring-handler router default options)] (fn [request] (try (handler request) @@ -95,7 +81,7 @@ {:status 500 :body "internal server error"})))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Http Main Handler (Router) +;; Http Router ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (s/def ::rpc map?) @@ -104,17 +90,18 @@ (s/def ::storage map?) (s/def ::assets map?) (s/def ::feedback fn?) +(s/def ::ws fn?) (s/def ::error-report-handler fn?) (s/def ::audit-http-handler fn?) (defmethod ig/pre-init-spec ::router [_] - (s/keys :req-un [::rpc ::session ::mtx/metrics + (s/keys :req-un [::rpc ::session ::mtx/metrics ::ws ::oauth ::storage ::assets ::feedback ::error-report-handler ::audit-http-handler])) (defmethod ig/init-key ::router - [_ {:keys [session rpc oauth metrics assets feedback] :as cfg}] + [_ {:keys [ws session rpc oauth metrics assets feedback] :as cfg}] (rr/router [["/metrics" {:get (:handler metrics)}] ["/assets" {:middleware [[middleware/format-response-body] @@ -131,6 +118,15 @@ ["/webhooks" ["/sns" {:post (:sns-webhook cfg)}]] + ["/ws/notifications" + {:middleware [[middleware/params] + [middleware/keyword-params] + [middleware/format-response-body] + [middleware/errors errors/handle] + [middleware/cookies] + [(:middleware session)]] + :get ws}] + ["/api" {:middleware [[middleware/cors] [middleware/etag] [middleware/params] diff --git a/backend/src/app/http/middleware.clj b/backend/src/app/http/middleware.clj index 5766286d0..394240d89 100644 --- a/backend/src/app/http/middleware.clj +++ b/backend/src/app/http/middleware.clj @@ -9,7 +9,6 @@ [app.common.logging :as l] [app.common.transit :as t] [app.config :as cf] - [app.metrics :as mtx] [app.util.json :as json] [buddy.core.codecs :as bc] [buddy.core.hash :as bh] @@ -78,6 +77,9 @@ params (:query-params request) opts {:type (if (contains? params "transit_verbose") :json-verbose :json)}] (cond + (:ws response) + response + (coll? body) (-> response (update :headers assoc "content-type" "application/transit+json") @@ -112,11 +114,6 @@ {:name ::errors :compile (constantly wrap-errors)}) -(def metrics - {:name ::metrics - :wrap (fn [handler] - (mtx/wrap-counter handler {:id "http__requests_counter" - :help "Absolute http requests counter."}))}) (def cookies {:name ::cookies :compile (constantly wrap-cookies)}) diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj new file mode 100644 index 000000000..e4e80c178 --- /dev/null +++ b/backend/src/app/http/websocket.clj @@ -0,0 +1,145 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.http.websocket + "A penpot notification service for file cooperative edition." + (:require + [app.common.exceptions :as ex] + [app.common.logging :as l] + [app.common.spec :as us] + [app.db :as db] + [app.metrics :as mtx] + [app.util.websocket :as ws] + [app.worker :as wrk] + [clojure.core.async :as a] + [clojure.spec.alpha :as s] + [integrant.core :as ig] + [yetti.websocket :as yws])) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; WEBSOCKET HANDLER +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(declare send-presence!) + +(defmulti handle-message + (fn [_wsp message] (:type message))) + +(defmethod handle-message :connect + [wsp _] + (let [{:keys [msgbus file-id team-id session-id ::output-ch]} @wsp + sub-ch (a/chan (a/dropping-buffer 32))] + + (swap! wsp assoc :sub-ch sub-ch) + + ;; Start a subscription forwarding goroutine + (a/go-loop [] + (when-let [val (a/! output-ch val)) + (recur))) + + (a/go + (a/ (merge cfg params) + (assoc :profile-id profile-id) + (assoc :team-id (:team-id file)) + (assoc ::ws/metrics metrics))] + + (when-not profile-id + (ex/raise :type :authentication + :hint "Authentication required.")) + + (when-not file + (ex/raise :type :not-found + :code :object-not-found)) + + (when-not (yws/upgrade-request? req) + (ex/raise :type :validation + :code :websocket-request-expected + :hint "this endpoint only accepts websocket connections")) + + (->> (ws/handler handle-message cfg) + (yws/upgrade req)))))) + +(def ^:private + sql:retrieve-file + "select f.id as id, + p.team_id as team_id + from file as f + join project as p on (p.id = f.project_id) + where f.id = ?") + +(defn- retrieve-file + [conn id] + (db/exec-one! conn [sql:retrieve-file id])) + diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 068136336..da25f83df 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -22,33 +22,15 @@ :min-pool-size 0 :max-pool-size 30} + :app.migrations/migrations + {} + :app.metrics/metrics - {:definitions - {:profile-register - {:name "actions_profile_register_count" - :help "A global counter of user registrations." - :type :counter} - - :profile-activation - {:name "actions_profile_activation_count" - :help "A global counter of profile activations" - :type :counter} - - :update-file-changes - {:name "rpc_update_file_changes_total" - :help "A total number of changes submitted to update-file." - :type :counter} - - :update-file-bytes-processed - {:name "rpc_update_file_bytes_processed_total" - :help "A total number of bytes processed by update-file." - :type :counter}}} + {} :app.migrations/all {:main (ig/ref :app.migrations/migrations)} - :app.migrations/migrations - {} :app.msgbus/msgbus {:backend (cf/get :msgbus-backend :redis) @@ -91,23 +73,30 @@ :app.http/server {:port (cf/get :http-server-port) + :host (cf/get :http-server-host) :router (ig/ref :app.http/router) - :metrics (ig/ref :app.metrics/metrics) - :ws {"/ws/notifications" (ig/ref :app.notifications/handler)}} + :metrics (ig/ref :app.metrics/metrics)} :app.http/router - {:rpc (ig/ref :app.rpc/rpc) - :session (ig/ref :app.http.session/session) - :tokens (ig/ref :app.tokens/tokens) - :public-uri (cf/get :public-uri) - :metrics (ig/ref :app.metrics/metrics) - :oauth (ig/ref :app.http.oauth/handler) - :assets (ig/ref :app.http.assets/handlers) - :storage (ig/ref :app.storage/storage) - :sns-webhook (ig/ref :app.http.awsns/handler) - :feedback (ig/ref :app.http.feedback/handler) + {:assets (ig/ref :app.http.assets/handlers) + :feedback (ig/ref :app.http.feedback/handler) + :session (ig/ref :app.http.session/session) + :sns-webhook (ig/ref :app.http.awsns/handler) + :oauth (ig/ref :app.http.oauth/handler) + :ws (ig/ref :app.http.websocket/handler) + :metrics (ig/ref :app.metrics/metrics) + :public-uri (cf/get :public-uri) + :storage (ig/ref :app.storage/storage) + :tokens (ig/ref :app.tokens/tokens) :audit-http-handler (ig/ref :app.loggers.audit/http-handler) - :error-report-handler (ig/ref :app.loggers.database/handler)} + :error-report-handler (ig/ref :app.loggers.database/handler) + :rpc (ig/ref :app.rpc/rpc)} + + :app.http.websocket/handler + {:pool (ig/ref :app.db/pool) + :executor (ig/ref :app.worker/executor) + :metrics (ig/ref :app.metrics/metrics) + :msgbus (ig/ref :app.msgbus/msgbus)} :app.http.assets/handlers {:metrics (ig/ref :app.metrics/metrics) @@ -120,12 +109,12 @@ {:pool (ig/ref :app.db/pool)} :app.http.oauth/handler - {:rpc (ig/ref :app.rpc/rpc) - :session (ig/ref :app.http.session/session) - :pool (ig/ref :app.db/pool) - :tokens (ig/ref :app.tokens/tokens) - :audit (ig/ref :app.loggers.audit/collector) - :public-uri (cf/get :public-uri)} + {:rpc (ig/ref :app.rpc/rpc) + :session (ig/ref :app.http.session/session) + :pool (ig/ref :app.db/pool) + :tokens (ig/ref :app.tokens/tokens) + :audit (ig/ref :app.loggers.audit/collector) + :public-uri (cf/get :public-uri)} :app.rpc/rpc {:pool (ig/ref :app.db/pool) @@ -137,13 +126,6 @@ :public-uri (cf/get :public-uri) :audit (ig/ref :app.loggers.audit/collector)} - :app.notifications/handler - {:msgbus (ig/ref :app.msgbus/msgbus) - :pool (ig/ref :app.db/pool) - :session (ig/ref :app.http.session/session) - :metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref :app.worker/executor)} - :app.worker/executor {:min-threads 0 :max-threads 256 diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj index b1d0033e6..57e1ba531 100644 --- a/backend/src/app/metrics.clj +++ b/backend/src/app/metrics.clj @@ -26,27 +26,57 @@ (declare instrument) (declare create-registry) (declare create) +(declare handler) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Defaults +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + + +(def default-metrics + {:profile-register + {:name "actions_profile_register_count" + :help "A global counter of user registrations." + :type :counter} + + :profile-activation + {:name "actions_profile_activation_count" + :help "A global counter of profile activations" + :type :counter} + + :update-file-changes + {:name "rpc_update_file_changes_total" + :help "A total number of changes submitted to update-file." + :type :counter} + + :update-file-bytes-processed + {:name "rpc_update_file_bytes_processed_total" + :help "A total number of bytes processed by update-file." + :type :counter} + + :websocket-active-connections + {:name "websocket_active_connections" + :help "Active websocket connections gauge" + :type :gauge} + + :websocket-messages-total + {:name "websocket_message_total" + :help "Counter of processed messages." + :labels ["op"] + :type :counter} + + :websocket-session-timing + {:name "websocket_session_timing" + :help "Websocket session timing (seconds)." + :quantiles [] + :type :summary}}) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Entry Point ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn- handler - [registry _request] - (let [samples (.metricFamilySamples ^CollectorRegistry registry) - writer (StringWriter.)] - (TextFormat/write004 writer samples) - {:headers {"content-type" TextFormat/CONTENT_TYPE_004} - :body (.toString writer)})) - -(s/def ::definitions - (s/map-of keyword? map?)) - -(defmethod ig/pre-init-spec ::metrics [_] - (s/keys :opt-un [::definitions])) - (defmethod ig/init-key ::metrics - [_ {:keys [definitions] :as cfg}] + [_ _] (l/info :action "initialize metrics") (let [registry (create-registry) definitions (reduce-kv (fn [res k v] @@ -54,7 +84,7 @@ (create) (assoc res k))) {} - definitions)] + default-metrics)] {:handler (partial handler registry) :definitions definitions :registry registry})) @@ -64,6 +94,14 @@ (s/def ::metrics (s/keys :req-un [::registry ::handler])) +(defn- handler + [registry _request] + (let [samples (.metricFamilySamples ^CollectorRegistry registry) + writer (StringWriter.)] + (TextFormat/write004 writer samples) + {:headers {"content-type" TextFormat/CONTENT_TYPE_004} + :body (.toString writer)})) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Implementation ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/app/notifications.clj b/backend/src/app/notifications.clj deleted file mode 100644 index 62490b212..000000000 --- a/backend/src/app/notifications.clj +++ /dev/null @@ -1,281 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) UXBOX Labs SL - -(ns app.notifications - "A websocket based notifications mechanism." - (:require - [app.common.logging :as l] - [app.common.spec :as us] - [app.common.transit :as t] - [app.db :as db] - [app.metrics :as mtx] - [app.util.async :as aa] - [app.util.time :as dt] - [app.worker :as wrk] - [clojure.core.async :as a] - [clojure.spec.alpha :as s] - [integrant.core :as ig] - [ring.adapter.jetty9 :as jetty] - [ring.middleware.cookies :refer [wrap-cookies]] - [ring.middleware.keyword-params :refer [wrap-keyword-params]] - [ring.middleware.params :refer [wrap-params]])) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Http Handler -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(declare retrieve-file) -(declare websocket) -(declare handler) - -(s/def ::session map?) -(s/def ::msgbus fn?) - -(defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::msgbus ::db/pool ::session ::mtx/metrics ::wrk/executor])) - -(defmethod ig/init-key ::handler - [_ {:keys [session metrics] :as cfg}] - (let [wrap-session (:middleware session) - - mtx-active-connections - (mtx/create - {:name "websocket_active_connections" - :registry (:registry metrics) - :type :gauge - :help "Active websocket connections."}) - - mtx-messages - (mtx/create - {:name "websocket_message_total" - :registry (:registry metrics) - :labels ["op"] - :type :counter - :help "Counter of processed messages."}) - - mtx-sessions - (mtx/create - {:name "websocket_session_timing" - :registry (:registry metrics) - :quantiles [] - :help "Websocket session timing (seconds)." - :type :summary}) - - cfg (assoc cfg - :mtx-active-connections mtx-active-connections - :mtx-messages mtx-messages - :mtx-sessions mtx-sessions - )] - - (-> #(handler cfg %) - (wrap-session) - (wrap-keyword-params) - (wrap-cookies) - (wrap-params)))) - -(s/def ::file-id ::us/uuid) -(s/def ::session-id ::us/uuid) - -(s/def ::websocket-handler-params - (s/keys :req-un [::file-id ::session-id])) - -(defn- handler - [{:keys [pool] :as cfg} {:keys [profile-id params] :as req}] - (let [params (us/conform ::websocket-handler-params params) - file (retrieve-file pool (:file-id params)) - cfg (merge cfg params - {:profile-id profile-id - :team-id (:team-id file)})] - (cond - (not profile-id) - {:error {:code 403 :message "Authentication required"}} - - (not file) - {:error {:code 404 :message "File does not exists"}} - - :else - (websocket cfg)))) - -(def ^:private - sql:retrieve-file - "select f.id as id, - p.team_id as team_id - from file as f - join project as p on (p.id = f.project_id) - where f.id = ?") - -(defn- retrieve-file - [conn id] - (db/exec-one! conn [sql:retrieve-file id])) - - -;; --- WEBSOCKET INIT - -(declare handle-connect) - -(defn- ws-send - [conn data] - (try - (when (jetty/connected? conn) - (jetty/send! conn data) - true) - (catch java.lang.NullPointerException _e - false))) - -(defn websocket - [{:keys [file-id team-id msgbus executor] :as cfg}] - (let [rcv-ch (a/chan 32) - out-ch (a/chan 32) - mtx-aconn (:mtx-active-connections cfg) - mtx-messages (:mtx-messages cfg) - mtx-sessions (:mtx-sessions cfg) - created-at (dt/now) - ws-send (mtx/wrap-counter ws-send mtx-messages ["send"])] - - (letfn [(on-connect [conn] - ((::mtx/fn mtx-aconn) {:cmd :inc :by 1}) - ;; A subscription channel should use a lossy buffer - ;; because we can't penalize normal clients when one - ;; slow client is connected to the room. - (let [sub-ch (a/chan (a/dropping-buffer 128)) - cfg (assoc cfg - :conn conn - :rcv-ch rcv-ch - :out-ch out-ch - :sub-ch sub-ch)] - - (l/trace :event "connect" :session (:session-id cfg)) - - ;; Forward all messages from out-ch to the websocket - ;; connection - (a/go-loop [] - (let [val (a/! out-ch val)) - (recur)) - - ;; When timeout channel is signaled, we need to send a ping - ;; message to the output channel. TODO: we need to make this - ;; more smart. - (= port timeout) - (do - (a/>! out-ch {:type :ping}) - (recur)))))) - -(defn send-presence - ([cfg] (send-presence cfg :presence)) - ([{:keys [msgbus session-id profile-id file-id]} type] - (a/go - (a/ ~definitions ~name ::mtx/fn)] + (mtx-fn# ~@args))) + +(def noop (constantly nil)) + +(defn handler + "A WebSocket upgrade handler factory. Returns a handler that can be + used to upgrade to websocket connection. This handler implements the + basic custom protocol on top of websocket connection with all the + borring stuff already handled (lifecycle, heartbeat,...). + + The provided function should have the `(fn [ws msg])` signature. + + It also accepts some options that allows you parametrize the + protocol behavior. The options map will be used as-as for the + initial data of the `ws` data structure" + ([handle-message] (handler handle-message {})) + ([handle-message {:keys [::input-buff-size + ::output-buff-size + ::idle-timeout + ::metrics] + :or {input-buff-size 32 + output-buff-size 32 + idle-timeout 30000} + :as options}] + (fn [_] + (let [input-ch (a/chan input-buff-size) + output-ch (a/chan output-buff-size) + pong-ch (a/chan (a/sliding-buffer 6)) + close-ch (a/chan) + options (-> options + (assoc ::input-ch input-ch) + (assoc ::output-ch output-ch) + (assoc ::close-ch close-ch) + (dissoc ::metrics)) + + terminated (atom false) + created-at (dt/now) + + on-terminate + (fn [& [_ error]] + (when (ex/exception? error) + (l/warn :hint (ex-message error) :cause error)) + + (when (compare-and-set! terminated false true) + (call-mtx metrics :connections {:cmd :dec :by 1}) + (call-mtx metrics :sessions {:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}) + + (a/close! close-ch) + (a/close! pong-ch) + (a/close! output-ch) + (a/close! input-ch))) + + on-connect + (fn [conn] + (call-mtx metrics :connections {:cmd :inc :by 1}) + + (let [wsp (atom (assoc options ::conn conn))] + ;; Handle heartbeat + (yws/idle-timeout! conn (dt/duration idle-timeout)) + (-> @wsp + (assoc ::pong-ch pong-ch) + (assoc ::on-close on-terminate) + (process-heartbeat)) + + ;; Forward all messages from output-ch to the websocket + ;; connection + (a/go-loop [] + (when-let [val (a/!! pong-ch buffer))] + + {:on-connect on-connect + :on-error on-terminate + :on-close on-terminate + :on-text on-message + :on-pong on-pong})))) + +(defn- ws-send! + [conn s] + (let [ch (a/chan 1)] + (yws/send! conn s (fn [e] + (when e (a/offer! ch e)) + (a/close! ch))) + ch)) + +(defn- ws-ping! + [conn s] + (let [ch (a/chan 1)] + (yws/ping! conn s (fn [e] + (when e (a/offer! ch e)) + (a/close! ch))) + ch)) + +(defn- encode-beat + [n] + (doto (ByteBuffer/allocate 8) + (.putLong n) + (.rewind))) + +(defn- decode-beat + [^ByteBuffer buffer] + (when (= 8 (.capacity buffer)) + (.rewind buffer) + (.getLong buffer))) + +(defn- process-input + [wsp handler] + (let [{:keys [::input-ch ::output-ch ::close-ch]} @wsp] + (a/go + (a/! output-ch {:type :error :error (ex-data val)}) + + (ex/exception? val) + (a/>! output-ch {:type :error :error {:message (ex-message val)}}) + + (map? val) + (a/>! output-ch (cond-> val (:request-id request) (assoc :request-id (:request-id request))))) + + (recur)))))) + (a/= (count issued) max-missed-heartbeats) + (on-close conn -1 "heartbeat-timeout") + (recur (inc i))))))) + + (a/go-loop [] + (when-let [buffer (a/", "repository": { "type": "git",