From bb5a4c0fa545f104f387304bb9b5305cd46e392c Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 16 Nov 2023 11:02:25 +0100 Subject: [PATCH] :sparkles: Update yetti and adapt for ring-2.0 --- backend/deps.edn | 4 +- backend/resources/log4j2-devenv.xml | 2 +- backend/src/app/auth/oidc.clj | 10 +- backend/src/app/http.clj | 46 ++-- backend/src/app/http/access_token.clj | 11 +- backend/src/app/http/assets.clj | 14 +- backend/src/app/http/awsns.clj | 8 +- backend/src/app/http/debug.clj | 138 +++++------ backend/src/app/http/errors.clj | 154 ++++++------ backend/src/app/http/middleware.clj | 113 ++++----- backend/src/app/http/session.clj | 8 +- backend/src/app/http/websocket.clj | 35 +-- backend/src/app/loggers/audit.clj | 8 +- backend/src/app/main.clj | 1 - backend/src/app/rpc.clj | 14 +- backend/src/app/rpc/commands/binfile.clj | 12 +- backend/src/app/rpc/cond.clj | 4 +- backend/src/app/rpc/doc.clj | 24 +- backend/src/app/rpc/helpers.clj | 4 +- backend/src/app/util/websocket.clj | 227 +++++++++--------- .../http_middleware_access_token_test.clj | 6 +- backend/test/backend_tests/rpc_audit_test.clj | 2 +- .../rpc_cond_middleware_test.clj | 2 +- 23 files changed, 407 insertions(+), 440 deletions(-) diff --git a/backend/deps.edn b/backend/deps.edn index d8ff1d16e..b3cacd663 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -21,8 +21,8 @@ java-http-clj/java-http-clj {:mvn/version "0.4.3"} funcool/yetti - {:git/tag "v9.16" - :git/sha "7df3e08" + {:git/tag "v10.0" + :git/sha "520613f" :git/url "https://github.com/funcool/yetti.git" :exclusions [org.slf4j/slf4j-api]} diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml index 70e54ba76..4fd93925c 100644 --- a/backend/resources/log4j2-devenv.xml +++ b/backend/resources/log4j2-devenv.xml @@ -31,7 +31,7 @@ - + diff --git a/backend/src/app/auth/oidc.clj b/backend/src/app/auth/oidc.clj index 733665151..206ec3915 100644 --- a/backend/src/app/auth/oidc.clj +++ b/backend/src/app/auth/oidc.clj @@ -31,7 +31,7 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] - [yetti.response :as-alias yrs])) + [ring.response :as-alias rres])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HELPERS @@ -479,8 +479,8 @@ (defn- redirect-response [uri] - {::yrs/status 302 - ::yrs/headers {"location" (str uri)}}) + {::rres/status 302 + ::rres/headers {"location" (str uri)}}) (defn- generate-error-redirect [_ cause] @@ -557,8 +557,8 @@ :props props :exp (dt/in-future "4h")}) uri (build-auth-uri cfg state)] - {::yrs/status 200 - ::yrs/body {:redirect-uri uri}})) + {::rres/status 200 + ::rres/body {:redirect-uri uri}})) (defn- callback-handler [cfg request] diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 59ba33861..599225827 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -23,15 +23,14 @@ [app.metrics :as mtx] [app.rpc :as-alias rpc] [app.rpc.doc :as-alias rpc.doc] - [app.worker :as wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.exec :as px] [reitit.core :as r] [reitit.middleware :as rr] - [yetti.adapter :as yt] - [yetti.request :as yrq] - [yetti.response :as-alias yrs])) + [ring.request :as rreq] + [ring.response :as-alias rres] + [yetti.adapter :as yt])) (declare router-handler) @@ -63,8 +62,7 @@ ::max-multipart-body-size ::router ::handler - ::io-threads - ::wrk/executor])) + ::io-threads])) (defmethod ig/init-key ::server [_ {:keys [::handler ::router ::host ::port] :as cfg}] @@ -75,11 +73,9 @@ :http/max-multipart-body-size (::max-multipart-body-size cfg) :xnio/io-threads (or (::io-threads cfg) (max 3 (px/get-available-processors))) - :xnio/worker-threads (or (::worker-threads cfg) - (max 6 (px/get-available-processors))) - :xnio/dispatch true - :socket/backlog 4069 - :ring/async true} + :xnio/dispatch :virtual + :ring/compat :ring2 + :socket/backlog 4069} handler (cond (some? router) @@ -102,13 +98,13 @@ (yt/stop! server)) (defn- not-found-handler - [_ respond _] - (respond {::yrs/status 404})) + [_] + {::rres/status 404}) (defn- router-handler [router] (letfn [(resolve-handler [request] - (if-let [match (r/match-by-path router (yrq/path request))] + (if-let [match (r/match-by-path router (rreq/path request))] (let [params (:path-params match) result (:result match) handler (or (:handler result) not-found-handler) @@ -120,18 +116,15 @@ (let [{:keys [body] :as response} (errors/handle cause request)] (cond-> response (map? body) - (-> (update ::yrs/headers assoc "content-type" "application/transit+json") - (assoc ::yrs/body (t/encode-str body {:type :json-verbose}))))))] + (-> (update ::rres/headers assoc "content-type" "application/transit+json") + (assoc ::rres/body (t/encode-str body {:type :json-verbose}))))))] - (fn [request respond _] - (let [handler (resolve-handler request) - exchange (yrq/exchange request)] - (handler - (fn [response] - (yt/dispatch! exchange (partial respond response))) - (fn [cause] - (let [response (on-error cause request)] - (yt/dispatch! exchange (partial respond response))))))))) + (fn [request] + (let [handler (resolve-handler request)] + (try + (handler) + (catch Throwable cause + (on-error cause request))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HTTP ROUTER @@ -160,8 +153,7 @@ [session/soft-auth cfg] [actoken/soft-auth cfg] [mw/errors errors/handle] - [mw/restrict-methods] - [mw/with-dispatch :vthread]]} + [mw/restrict-methods]]} (::mtx/routes cfg) (::assets/routes cfg) diff --git a/backend/src/app/http/access_token.clj b/backend/src/app/http/access_token.clj index 3f39e4121..bfddbb42d 100644 --- a/backend/src/app/http/access_token.clj +++ b/backend/src/app/http/access_token.clj @@ -11,13 +11,13 @@ [app.db :as db] [app.main :as-alias main] [app.tokens :as tokens] - [yetti.request :as yrq])) + [ring.request :as rreq])) (def header-re #"^Token\s+(.*)") (defn- get-token [request] - (some->> (yrq/get-header request "authorization") + (some->> (rreq/get-header request "authorization") (re-matches header-re) (second))) @@ -30,7 +30,7 @@ "SELECT perms, profile_id, expires_at FROM access_token WHERE id = ? - AND (expires_at IS NULL + AND (expires_at IS NULL OR (expires_at > now()));") (defn- get-token-data @@ -54,9 +54,8 @@ (l/trace :hint "exception on decoding malformed token" :cause cause) request)))] - (fn [request respond raise] - (let [request (handle-request request)] - (handler request respond raise))))) + (fn [request] + (handler (handle-request request))))) (defn- wrap-authz "Authorization middleware, will be executed synchronously on vthread." diff --git a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index efd494249..286cef655 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -16,7 +16,7 @@ [app.util.time :as dt] [clojure.spec.alpha :as s] [integrant.core :as ig] - [yetti.response :as-alias yrs])) + [ring.response :as-alias rres])) (def ^:private cache-max-age (dt/duration {:hours 24})) @@ -37,8 +37,8 @@ (defn- serve-object-from-s3 [{:keys [::sto/storage] :as cfg} obj] (let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})] - {::yrs/status 307 - ::yrs/headers {"location" (str url) + {::rres/status 307 + ::rres/headers {"location" (str url) "x-host" (cond-> host port (str ":" port)) "x-mtype" (-> obj meta :content-type) "cache-control" (str "max-age=" (inst-ms cache-max-age))}})) @@ -51,8 +51,8 @@ headers {"x-accel-redirect" (:path purl) "content-type" (:content-type mdata) "cache-control" (str "max-age=" (inst-ms cache-max-age))}] - {::yrs/status 204 - ::yrs/headers headers})) + {::rres/status 204 + ::rres/headers headers})) (defn- serve-object "Helper function that returns the appropriate response depending on @@ -70,7 +70,7 @@ obj (sto/get-object storage id)] (if obj (serve-object cfg obj) - {::yrs/status 404}))) + {::rres/status 404}))) (defn- generic-handler "A generic handler helper/common code for file-media based handlers." @@ -81,7 +81,7 @@ sobj (sto/get-object storage (kf mobj))] (if sobj (serve-object cfg sobj) - {::yrs/status 404}))) + {::rres/status 404}))) (defn file-objects-handler "Handler that serves storage objects by file media id." diff --git a/backend/src/app/http/awsns.clj b/backend/src/app/http/awsns.clj index 681e7045f..f9e9179b1 100644 --- a/backend/src/app/http/awsns.clj +++ b/backend/src/app/http/awsns.clj @@ -20,8 +20,8 @@ [integrant.core :as ig] [jsonista.core :as j] [promesa.exec :as px] - [yetti.request :as yrq] - [yetti.response :as-alias yrs])) + [ring.request :as rreq] + [ring.response :as-alias rres])) (declare parse-json) (declare handle-request) @@ -37,9 +37,9 @@ (defmethod ig/init-key ::routes [_ {:keys [::wrk/executor] :as cfg}] (letfn [(handler [request] - (let [data (-> request yrq/body slurp)] + (let [data (-> request rreq/body slurp)] (px/run! executor #(handle-request cfg data))) - {::yrs/status 200})] + {::rres/status 200})] ["/sns" {:handler handler :allowed-methods #{:post}}])) diff --git a/backend/src/app/http/debug.clj b/backend/src/app/http/debug.clj index a61017edf..1e18b8517 100644 --- a/backend/src/app/http/debug.clj +++ b/backend/src/app/http/debug.clj @@ -32,8 +32,8 @@ [integrant.core :as ig] [markdown.core :as md] [markdown.transformers :as mdt] - [yetti.request :as yrq] - [yetti.response :as yrs])) + [ring.request :as rreq] + [ring.response :as rres])) ;; (selmer.parser/cache-off!) @@ -43,10 +43,10 @@ (defn index-handler [_cfg _request] - {::yrs/status 200 - ::yrs/headers {"content-type" "text/html"} - ::yrs/body (-> (io/resource "app/templates/debug.tmpl") - (tmpl/render {}))}) + {::rres/status 200 + ::rres/headers {"content-type" "text/html"} + ::rres/body (-> (io/resource "app/templates/debug.tmpl") + (tmpl/render {}))}) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; FILE CHANGES @@ -55,17 +55,17 @@ (defn prepare-response [body] (let [headers {"content-type" "application/transit+json"}] - {::yrs/status 200 - ::yrs/body body - ::yrs/headers headers})) + {::rres/status 200 + ::rres/body body + ::rres/headers headers})) (defn prepare-download-response [body filename] (let [headers {"content-disposition" (str "attachment; filename=" filename) "content-type" "application/octet-stream"}] - {::yrs/status 200 - ::yrs/body body - ::yrs/headers headers})) + {::rres/status 200 + ::rres/body body + ::rres/headers headers})) (def sql:retrieve-range-of-changes "select revn, changes from file_change where file_id=? and revn >= ? and revn <= ? order by revn") @@ -107,8 +107,8 @@ (db/update! conn :file {:data data} {:id file-id}) - {::yrs/status 201 - ::yrs/body "OK CREATED"}))) + {::rres/status 201 + ::rres/body "OK CREATED"}))) :else (prepare-response (blob/decode data)))))) @@ -137,8 +137,8 @@ {:data data :deleted-at nil} {:id file-id}) - {::yrs/status 200 - ::yrs/body "OK UPDATED"}) + {::rres/status 200 + ::rres/body "OK UPDATED"}) (db/run! pool (fn [{:keys [::db/conn]}] (create-file conn {:id file-id @@ -148,15 +148,15 @@ (db/update! conn :file {:data data} {:id file-id}) - {::yrs/status 201 - ::yrs/body "OK CREATED"})))) + {::rres/status 201 + ::rres/body "OK CREATED"})))) - {::yrs/status 500 - ::yrs/body "ERROR"}))) + {::rres/status 500 + ::rres/body "ERROR"}))) (defn file-data-handler [cfg request] - (case (yrq/method request) + (case (rreq/method request) :get (retrieve-file-data cfg request) :post (upload-file-data cfg request) (ex/raise :type :http @@ -238,12 +238,12 @@ 1 (render-template-v1 report) 2 (render-template-v2 report) 3 (render-template-v3 report))] - {::yrs/status 200 - ::yrs/body result - ::yrs/headers {"content-type" "text/html; charset=utf-8" - "x-robots-tag" "noindex"}}) - {::yrs/status 404 - ::yrs/body "not found"}))) + {::rres/status 200 + ::rres/body result + ::rres/headers {"content-type" "text/html; charset=utf-8" + "x-robots-tag" "noindex"}}) + {::rres/status 404 + ::rres/body "not found"}))) (def sql:error-reports "SELECT id, created_at, @@ -256,11 +256,11 @@ [{:keys [::db/pool]} _request] (let [items (->> (db/exec! pool [sql:error-reports]) (map #(update % :created-at dt/format-instant :rfc1123)))] - {::yrs/status 200 - ::yrs/body (-> (io/resource "app/templates/error-list.tmpl") - (tmpl/render {:items items})) - ::yrs/headers {"content-type" "text/html; charset=utf-8" - "x-robots-tag" "noindex"}})) + {::rres/status 200 + ::rres/body (-> (io/resource "app/templates/error-list.tmpl") + (tmpl/render {:items items})) + ::rres/headers {"content-type" "text/html; charset=utf-8" + "x-robots-tag" "noindex"}})) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; EXPORT/IMPORT @@ -296,14 +296,14 @@ ::binf/profile-id profile-id ::binf/project-id project-id)) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body "OK CLONED"}) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body "OK CLONED"}) - {::yrs/status 200 - ::yrs/body (io/input-stream path) - ::yrs/headers {"content-type" "application/octet-stream" - "content-disposition" (str "attachmen; filename=" (first file-ids) ".penpot")}})))) + {::rres/status 200 + ::rres/body (io/input-stream path) + ::rres/headers {"content-type" "application/octet-stream" + "content-disposition" (str "attachmen; filename=" (first file-ids) ".penpot")}})))) @@ -334,9 +334,9 @@ ::binf/profile-id profile-id ::binf/project-id project-id)) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body "OK"})) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body "OK"})) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; ACTIONS @@ -363,34 +363,34 @@ (db/update! pool :profile {:is-blocked true} {:id (:id profile)}) (db/delete! pool :http-session {:profile-id (:id profile)}) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body (str/ffmt "PROFILE '%' BLOCKED" (:email profile))}) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body (str/ffmt "PROFILE '%' BLOCKED" (:email profile))}) (contains? params :unblock) (do (db/update! pool :profile {:is-blocked false} {:id (:id profile)}) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body (str/ffmt "PROFILE '%' UNBLOCKED" (:email profile))}) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body (str/ffmt "PROFILE '%' UNBLOCKED" (:email profile))}) (contains? params :resend) (if (:is-blocked profile) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body "PROFILE ALREADY BLOCKED"} + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body "PROFILE ALREADY BLOCKED"} (do (auth/send-email-verification! pool props profile) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body (str/ffmt "RESENDED FOR '%'" (:email profile))})) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body (str/ffmt "RESENDED FOR '%'" (:email profile))})) :else (do (db/update! pool :profile {:is-active true} {:id (:id profile)}) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body (str/ffmt "PROFILE '%' ACTIVATED" (:email profile))})))) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body (str/ffmt "PROFILE '%' ACTIVATED" (:email profile))})))) (defn- reset-file-data-version @@ -420,9 +420,9 @@ :migrate? false :inc-revn? false :save? true) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body "OK"})) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body "OK"})) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -434,13 +434,13 @@ [{:keys [::db/pool]} _] (try (db/exec-one! pool ["select count(*) as count from server_prop;"]) - {::yrs/status 200 - ::yrs/body "OK"} + {::rres/status 200 + ::rres/body "OK"} (catch Throwable cause (l/warn :hint "unable to execute query on health handler" :cause cause) - {::yrs/status 503 - ::yrs/body "KO"}))) + {::rres/status 503 + ::rres/body "KO"}))) (defn changelog-handler [_ _] @@ -449,11 +449,11 @@ (md->html [text] (md/md-to-html-string text :replacement-transformers (into [transform-emoji] mdt/transformer-vector)))] (if-let [clog (io/resource "changelog.md")] - {::yrs/status 200 - ::yrs/headers {"content-type" "text/html; charset=utf-8"} - ::yrs/body (-> clog slurp md->html)} - {::yrs/status 404 - ::yrs/body "NOT FOUND"}))) + {::rres/status 200 + ::rres/headers {"content-type" "text/html; charset=utf-8"} + ::rres/body (-> clog slurp md->html)} + {::rres/status 404 + ::rres/body "NOT FOUND"}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; INIT diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj index ce233b142..5bb14cc37 100644 --- a/backend/src/app/http/errors.clj +++ b/backend/src/app/http/errors.clj @@ -16,14 +16,14 @@ [app.http.session :as-alias session] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [yetti.request :as yrq] - [yetti.response :as yrs])) + [ring.request :as rreq] + [ring.response :as rres])) (defn- parse-client-ip [request] - (or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) - (yrq/get-header request "x-real-ip") - (yrq/remote-addr request))) + (or (some-> (rreq/get-header request "x-forwarded-for") (str/split ",") first) + (rreq/get-header request "x-real-ip") + (rreq/remote-addr request))) (defn request->context "Extracts error report relevant context data from request." @@ -34,10 +34,10 @@ {:request/path (:path request) :request/method (:method request) :request/params (:params request) - :request/user-agent (yrq/get-header request "user-agent") + :request/user-agent (rreq/get-header request "user-agent") :request/ip-addr (parse-client-ip request) :request/profile-id (:uid claims) - :version/frontend (or (yrq/get-header request "x-frontend-version") "unknown") + :version/frontend (or (rreq/get-header request "x-frontend-version") "unknown") :version/backend (:full cf/version)})) (defmulti handle-error @@ -50,30 +50,30 @@ (defmethod handle-error :authentication [err _ _] - {::yrs/status 401 - ::yrs/body (ex-data err)}) + {::rres/status 401 + ::rres/body (ex-data err)}) (defmethod handle-error :authorization [err _ _] - {::yrs/status 403 - ::yrs/body (ex-data err)}) + {::rres/status 403 + ::rres/body (ex-data err)}) (defmethod handle-error :restriction [err _ _] - {::yrs/status 400 - ::yrs/body (ex-data err)}) + {::rres/status 400 + ::rres/body (ex-data err)}) (defmethod handle-error :rate-limit [err _ _] (let [headers (-> err ex-data ::http/headers)] - {::yrs/status 429 - ::yrs/headers headers})) + {::rres/status 429 + ::rres/headers headers})) (defmethod handle-error :concurrency-limit [err _ _] (let [headers (-> err ex-data ::http/headers)] - {::yrs/status 429 - ::yrs/headers headers})) + {::rres/status 429 + ::rres/headers headers})) (defmethod handle-error :validation [err request parent-cause] @@ -81,38 +81,38 @@ (cond (= code :spec-validation) (let [explain (ex/explain data)] - {::yrs/status 400 - ::yrs/body (-> data - (dissoc ::s/problems ::s/value ::s/spec) - (cond-> explain (assoc :explain explain)))}) + {::rres/status 400 + ::rres/body (-> data + (dissoc ::s/problems ::s/value ::s/spec) + (cond-> explain (assoc :explain explain)))}) (= code :params-validation) (let [explain (::sm/explain data) explain (sm/humanize-data explain)] - {::yrs/status 400 - ::yrs/body (-> data - (dissoc ::sm/explain) - (assoc :explain explain))}) + {::rres/status 400 + ::rres/body (-> data + (dissoc ::sm/explain) + (assoc :explain explain))}) (= code :data-validation) (let [explain (::sm/explain data) explain (sm/humanize-data explain)] - {::yrs/status 400 - ::yrs/body (-> data - (dissoc ::sm/explain) - (assoc :explain explain))}) + {::rres/status 400 + ::rres/body (-> data + (dissoc ::sm/explain) + (assoc :explain explain))}) (= code :request-body-too-large) - {::yrs/status 413 ::yrs/body data} + {::rres/status 413 ::rres/body data} (= code :invalid-image) (binding [l/*context* (request->context request)] (let [cause (or parent-cause err)] (l/error :hint "unexpected error on processing image" :cause cause) - {::yrs/status 400 ::yrs/body data})) + {::rres/status 400 ::rres/body data})) :else - {::yrs/status 400 ::yrs/body data}))) + {::rres/status 400 ::rres/body data}))) (defmethod handle-error :assertion [error request parent-cause] @@ -123,46 +123,46 @@ (= code :data-validation) (let [explain (ex/explain data)] (l/error :hint "data assertion error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :assertion - :data (-> data - (dissoc ::sm/explain) - (cond-> explain (assoc :explain explain)))}}) + {::rres/status 500 + ::rres/body {:type :server-error + :code :assertion + :data (-> data + (dissoc ::sm/explain) + (cond-> explain (assoc :explain explain)))}}) (= code :spec-validation) (let [explain (ex/explain data)] (l/error :hint "spec assertion error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :assertion - :data (-> data - (dissoc ::s/problems ::s/value ::s/spec) - (cond-> explain (assoc :explain explain)))}}) + {::rres/status 500 + ::rres/body {:type :server-error + :code :assertion + :data (-> data + (dissoc ::s/problems ::s/value ::s/spec) + (cond-> explain (assoc :explain explain)))}}) :else (do (l/error :hint "assertion error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :assertion - :data data}}))))) + {::rres/status 500 + ::rres/body {:type :server-error + :code :assertion + :data data}}))))) (defmethod handle-error :not-found [err _ _] - {::yrs/status 404 - ::yrs/body (ex-data err)}) + {::rres/status 404 + ::rres/body (ex-data err)}) (defmethod handle-error :internal [error request parent-cause] (binding [l/*context* (request->context request)] (let [cause (or parent-cause error)] (l/error :hint "internal error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :unhandled - :hint (ex-message error) - :data (ex-data error)}}))) + {::rres/status 500 + ::rres/body {:type :server-error + :code :unhandled + :hint (ex-message error) + :data (ex-data error)}}))) (defmethod handle-error :default [error request parent-cause] @@ -186,23 +186,23 @@ :cause cause) (cond (= state "57014") - {::yrs/status 504 - ::yrs/body {:type :server-error - :code :statement-timeout - :hint (ex-message error)}} + {::rres/status 504 + ::rres/body {:type :server-error + :code :statement-timeout + :hint (ex-message error)}} (= state "25P03") - {::yrs/status 504 - ::yrs/body {:type :server-error - :code :idle-in-transaction-timeout - :hint (ex-message error)}} + {::rres/status 504 + ::rres/body {:type :server-error + :code :idle-in-transaction-timeout + :hint (ex-message error)}} :else - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :unexpected - :hint (ex-message error) - :state state}})))) + {::rres/status 500 + ::rres/body {:type :server-error + :code :unexpected + :hint (ex-message error) + :state state}})))) (defmethod handle-exception :default [error request parent-cause] @@ -213,19 +213,19 @@ (nil? edata) (binding [l/*context* (request->context request)] (l/error :hint "unexpected error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :unexpected - :hint (ex-message error)}}) + {::rres/status 500 + ::rres/body {:type :server-error + :code :unexpected + :hint (ex-message error)}}) :else (binding [l/*context* (request->context request)] (l/error :hint "unhandled error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :unhandled - :hint (ex-message error) - :data edata}})))) + {::rres/status 500 + ::rres/body {:type :server-error + :code :unhandled + :hint (ex-message error) + :data edata}})))) (defmethod handle-exception java.util.concurrent.CompletionException [cause request _] diff --git a/backend/src/app/http/middleware.clj b/backend/src/app/http/middleware.clj index f71f9da95..4ea815f07 100644 --- a/backend/src/app/http/middleware.clj +++ b/backend/src/app/http/middleware.clj @@ -12,13 +12,10 @@ [app.config :as cf] [app.util.json :as json] [cuerdas.core :as str] - [promesa.core :as p] - [promesa.exec :as px] - [promesa.util :as pu] + [ring.request :as rreq] + [ring.response :as rres] [yetti.adapter :as yt] - [yetti.middleware :as ymw] - [yetti.request :as yrq] - [yetti.response :as yrs]) + [yetti.middleware :as ymw]) (:import com.fasterxml.jackson.core.JsonParseException com.fasterxml.jackson.core.io.JsonEOFException @@ -46,17 +43,17 @@ (defn wrap-parse-request [handler] (letfn [(process-request [request] - (let [header (yrq/get-header request "content-type")] + (let [header (rreq/get-header request "content-type")] (cond (str/starts-with? header "application/transit+json") - (with-open [^InputStream is (yrq/body request)] + (with-open [^InputStream is (rreq/body request)] (let [params (t/read! (t/reader is))] (-> request (assoc :body-params params) (update :params merge params)))) (str/starts-with? header "application/json") - (with-open [^InputStream is (yrq/body request)] + (with-open [^InputStream is (rreq/body request)] (let [params (json/decode is json-mapper)] (-> request (assoc :body-params params) @@ -65,37 +62,36 @@ :else request))) - (handle-error [raise cause] + (handle-error [cause] (cond (instance? RuntimeException cause) (if-let [cause (ex-cause cause)] - (handle-error raise cause) - (raise cause)) + (handle-error cause) + (throw cause)) (instance? RequestTooBigException cause) - (raise (ex/error :type :validation - :code :request-body-too-large - :hint (ex-message cause))) - + (ex/raise :type :validation + :code :request-body-too-large + :hint (ex-message cause)) (or (instance? JsonEOFException cause) (instance? JsonParseException cause) (instance? MismatchedInputException cause)) - (raise (ex/error :type :validation - :code :malformed-json - :hint (ex-message cause) - :cause cause)) + (ex/raise :type :validation + :code :malformed-json + :hint (ex-message cause) + :cause cause) :else - (raise cause)))] + (throw cause)))] - (fn [request respond raise] - (if (= (yrq/method request) :post) + (fn [request] + (if (= (rreq/method request) :post) (let [request (ex/try! (process-request request))] (if (ex/exception? request) - (handle-error raise request) - (handler request respond raise))) - (handler request respond raise))))) + (handle-error request) + (handler request))) + (handler request))))) (def parse-request {:name ::parse-request @@ -113,7 +109,7 @@ (defn wrap-format-response [handler] (letfn [(transit-streamable-body [data opts] - (reify yrs/StreamableResponseBody + (reify rres/StreamableResponseBody (-write-body-to-stream [_ _ output-stream] (try (with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)] @@ -128,7 +124,7 @@ (.close ^OutputStream output-stream)))))) (json-streamable-body [data] - (reify yrs/StreamableResponseBody + (reify rres/StreamableResponseBody (-write-body-to-stream [_ _ output-stream] (try (with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)] @@ -143,24 +139,24 @@ (.close ^OutputStream output-stream)))))) (format-response-with-json [response _] - (let [body (::yrs/body response)] + (let [body (::rres/body response)] (if (or (boolean? body) (coll? body)) (-> response - (update ::yrs/headers assoc "content-type" "application/json") - (assoc ::yrs/body (json-streamable-body body))) + (update ::rres/headers assoc "content-type" "application/json") + (assoc ::rres/body (json-streamable-body body))) response))) (format-response-with-transit [response request] - (let [body (::yrs/body response)] + (let [body (::rres/body response)] (if (or (boolean? body) (coll? body)) - (let [qs (yrq/query request) + (let [qs (rreq/query request) opts (if (or (contains? cf/flags :transit-readable-response) (str/includes? qs "transit_verbose")) {:type :json-verbose} {:type :json})] (-> response - (update ::yrs/headers assoc "content-type" "application/transit+json") - (assoc ::yrs/body (transit-streamable-body body opts)))) + (update ::rres/headers assoc "content-type" "application/transit+json") + (assoc ::rres/body (transit-streamable-body body opts)))) response))) (format-from-params [{:keys [query-params] :as request}] @@ -169,7 +165,7 @@ (format-response [response request] (let [accept (or (format-from-params request) - (yrq/get-header request "accept"))] + (rreq/get-header request "accept"))] (cond (or (= accept "application/transit+json") (str/includes? accept "application/transit+json")) @@ -186,11 +182,9 @@ (cond-> response (map? response) (format-response request)))] - (fn [request respond raise] - (handler request - (fn [response] - (respond (process-response response request))) - raise)))) + (fn [request] + (let [response (handler request)] + (process-response response request))))) (def format-response {:name ::format-response @@ -198,12 +192,11 @@ (defn wrap-errors [handler on-error] - (fn [request respond raise] - (handler request respond (fn [cause] - (try - (respond (on-error cause request)) - (catch Throwable cause - (raise cause))))))) + (fn [request] + (try + (handler request) + (catch Throwable cause + (on-error cause request))))) (def errors {:name ::errors @@ -221,11 +214,11 @@ (defn wrap-cors [handler] (fn [request] - (let [response (if (= (yrq/method request) :options) - {::yrs/status 200} + (let [response (if (= (rreq/method request) :options) + {::rres/status 200} (handler request)) - origin (yrq/get-header request "origin")] - (update response ::yrs/headers with-cors-headers origin)))) + origin (rreq/get-header request "origin")] + (update response ::rres/headers with-cors-headers origin)))) (def cors {:name ::cors @@ -239,18 +232,8 @@ (fn [data _] (when-let [allowed (:allowed-methods data)] (fn [handler] - (fn [request respond raise] - (let [method (yrq/method request)] + (fn [request] + (let [method (rreq/method request)] (if (contains? allowed method) - (handler request respond raise) - (respond {::yrs/status 405})))))))}) - -(def with-dispatch - {:name ::with-dispatch - :compile - (fn [& _] - (fn [handler executor] - (let [executor (px/resolve-executor executor)] - (fn [request respond raise] - (->> (px/submit! executor (partial handler request)) - (p/fnly (pu/handler respond raise)))))))}) + (handler request) + {::rres/status 405}))))))}) diff --git a/backend/src/app/http/session.clj b/backend/src/app/http/session.clj index 5c1081632..696cc6a3a 100644 --- a/backend/src/app/http/session.clj +++ b/backend/src/app/http/session.clj @@ -20,6 +20,7 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] + [ring.request :as rreq] [yetti.request :as yrq])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -142,7 +143,7 @@ (us/assert! ::us/uuid profile-id) (fn [request response] - (let [uagent (yrq/get-header request "user-agent") + (let [uagent (rreq/get-header request "user-agent") params {:profile-id profile-id :user-agent uagent :created-at (dt/now)} @@ -209,9 +210,8 @@ (l/trace :hint "exception on decoding malformed token" :cause cause) request)))] - (fn [request respond raise] - (let [request (handle-request request)] - (handler request respond raise))))) + (fn [request] + (handler (handle-request request))))) (defn- wrap-authz [handler {:keys [::manager]}] diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index bb29839a1..70a9ad97c 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -10,7 +10,7 @@ [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.pprint :as pp] - [app.common.spec :as us] + [app.common.schema :as sm] [app.common.uuid :as uuid] [app.db :as db] [app.http.session :as session] @@ -21,6 +21,7 @@ [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.exec.csp :as sp] + [ring.websocket :as rws] [yetti.websocket :as yws])) (def recv-labels @@ -277,19 +278,23 @@ :inc 1) message) - -(s/def ::session-id ::us/uuid) -(s/def ::handler-params - (s/keys :req-un [::session-id])) +(def ^:private schema:params + (sm/define + [:map {:title "params"} + [:session-id ::sm/uuid]])) (defn- http-handler [cfg {:keys [params ::session/profile-id] :as request}] - (let [{:keys [session-id]} (us/conform ::handler-params params)] + (let [{:keys [session-id]} (sm/conform! schema:params params)] (cond (not profile-id) (ex/raise :type :authentication :hint "Authentication required.") + ;; WORKAROUND: we use the adapter specific predicate for + ;; performance reasons; for now, the ring default impl for + ;; `upgrade-request?` parses all requests headers before perform + ;; any checking. (not (yws/upgrade-request? request)) (ex/raise :type :validation :code :websocket-request-expected @@ -298,14 +303,13 @@ :else (do (l/trace :hint "websocket request" :profile-id profile-id :session-id session-id) - (->> (ws/handler - ::ws/on-rcv-message (partial on-rcv-message cfg) - ::ws/on-snd-message (partial on-snd-message cfg) - ::ws/on-connect (partial on-connect cfg) - ::ws/handler (partial handle-message cfg) - ::profile-id profile-id - ::session-id session-id) - (yws/upgrade request)))))) + {::rws/listener (ws/listener request + ::ws/on-rcv-message (partial on-rcv-message cfg) + ::ws/on-snd-message (partial on-snd-message cfg) + ::ws/on-connect (partial on-connect cfg) + ::ws/handler (partial handle-message cfg) + ::profile-id profile-id + ::session-id session-id)})))) (defmethod ig/pre-init-spec ::routes [_] (s/keys :req [::mbus/msgbus @@ -318,5 +322,4 @@ (defmethod ig/init-key ::routes [_ cfg] ["/ws/notifications" {:middleware [[session/authz cfg]] - :handler (partial http-handler cfg) - :allowed-methods #{:get}}]) + :handler (partial http-handler cfg)}]) diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index df1ad5bf0..4171f52ab 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -33,7 +33,7 @@ [integrant.core :as ig] [lambdaisland.uri :as u] [promesa.exec :as px] - [yetti.request :as yrq])) + [ring.request :as rreq])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HELPERS @@ -41,9 +41,9 @@ (defn parse-client-ip [request] - (or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) - (yrq/get-header request "x-real-ip") - (some-> (yrq/remote-addr request) str))) + (or (some-> (rreq/get-header request "x-forwarded-for") (str/split ",") first) + (rreq/get-header request "x-real-ip") + (some-> (rreq/remote-addr request) str))) (defn extract-utm-params "Extracts additional data from params and namespace them under diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index b8d9d8702..22c5109de 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -235,7 +235,6 @@ {::http/port (cf/get :http-server-port) ::http/host (cf/get :http-server-host) ::http/router (ig/ref ::http/router) - ::wrk/executor (ig/ref ::wrk/executor) ::http/io-threads (cf/get :http-server-io-threads) ::http/max-body-size (cf/get :http-server-max-body-size) ::http/max-multipart-body-size (cf/get :http-server-max-multipart-body-size)} diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 201e83062..2ede5681f 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -34,8 +34,8 @@ [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] - [yetti.request :as yrq] - [yetti.response :as yrs])) + [ring.request :as rreq] + [ring.response :as rres])) (s/def ::profile-id ::us/uuid) @@ -61,9 +61,9 @@ (if (fn? result) (result request) (let [mdata (meta result)] - (-> {::yrs/status (::http/status mdata 200) - ::yrs/headers (::http/headers mdata {}) - ::yrs/body (rph/unwrap result)} + (-> {::rres/status (::http/status mdata 200) + ::rres/headers (::http/headers mdata {}) + ::rres/body (rph/unwrap result)} (handle-response-transformation request mdata) (handle-before-comple-hook mdata))))) @@ -72,7 +72,7 @@ internal async flow into ring async flow." [methods {:keys [params path-params] :as request}] (let [type (keyword (:type path-params)) - etag (yrq/get-header request "if-none-match") + etag (rreq/get-header request "if-none-match") profile-id (or (::session/profile-id request) (::actoken/profile-id request)) @@ -138,6 +138,8 @@ (f cfg (us/conform spec params))) f))) +;; TODO: integrate with sm/define + (defn- wrap-params-validation [_ f mdata] (if-let [schema (::sm/params mdata)] diff --git a/backend/src/app/rpc/commands/binfile.clj b/backend/src/app/rpc/commands/binfile.clj index 7e0165ef8..8d0fdc9ea 100644 --- a/backend/src/app/rpc/commands/binfile.clj +++ b/backend/src/app/rpc/commands/binfile.clj @@ -44,8 +44,8 @@ [cuerdas.core :as str] [datoteka.io :as io] [promesa.util :as pu] - [yetti.adapter :as yt] - [yetti.response :as yrs]) + [ring.response :as rres] + [yetti.adapter :as yt]) (:import com.github.luben.zstd.ZstdInputStream com.github.luben.zstd.ZstdOutputStream @@ -1071,7 +1071,7 @@ ::webhooks/event? true} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}] (files/check-read-permissions! pool profile-id file-id) - (let [body (reify yrs/StreamableResponseBody + (let [body (reify rres/StreamableResponseBody (-write-body-to-stream [_ _ output-stream] (-> cfg (assoc ::file-ids [file-id]) @@ -1080,9 +1080,9 @@ (export! output-stream))))] (fn [_] - {::yrs/status 200 - ::yrs/body body - ::yrs/headers {"content-type" "application/octet-stream"}}))) + {::rres/status 200 + ::rres/body body + ::rres/headers {"content-type" "application/octet-stream"}}))) (s/def ::file ::media/upload) (s/def ::import-binfile diff --git a/backend/src/app/rpc/cond.clj b/backend/src/app/rpc/cond.clj index b683ded13..a7db513b8 100644 --- a/backend/src/app/rpc/cond.clj +++ b/backend/src/app/rpc/cond.clj @@ -29,7 +29,7 @@ [app.util.services :as-alias sv] [buddy.core.codecs :as bc] [buddy.core.hash :as bh] - [yetti.response :as yrs])) + [ring.response :as-alias rres])) (def ^{:dynamic true @@ -57,7 +57,7 @@ (let [key' (when (or key reuse-key?) (some->> (get-object cfg params) (key-fn params) (fmt-key)))] (if (and (some? key) (= key key')) - (fn [_] {::yrs/status 304}) + (fn [_] {::rres/status 304}) (let [result (f cfg params) etag (or (and reuse-key? key') (some-> result meta ::key fmt-key) diff --git a/backend/src/app/rpc/doc.clj b/backend/src/app/rpc/doc.clj index 24451e553..326cd3727 100644 --- a/backend/src/app/rpc/doc.clj +++ b/backend/src/app/rpc/doc.clj @@ -27,7 +27,7 @@ [integrant.core :as ig] [malli.transform :as mt] [pretty-spec.core :as ps] - [yetti.response :as yrs])) + [ring.response :as-alias rres])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; DOC (human readable) @@ -86,11 +86,11 @@ (let [params (:query-params request) pstyle (:type params "js") context (assoc context :param-style pstyle)] - {::yrs/status 200 - ::yrs/body (-> (io/resource "app/templates/api-doc.tmpl") + {::rres/status 200 + ::rres/body (-> (io/resource "app/templates/api-doc.tmpl") (tmpl/render context))})) (fn [_] - {::yrs/status 404}))) + {::rres/status 404}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; OPENAPI / SWAGGER (v3.1) @@ -173,12 +173,12 @@ [context] (if (contains? cf/flags :backend-openapi-doc) (fn [_] - {::yrs/status 200 - ::yrs/headers {"content-type" "application/json; charset=utf-8"} - ::yrs/body (json/encode context)}) + {::rres/status 200 + ::rres/headers {"content-type" "application/json; charset=utf-8"} + ::rres/body (json/encode context)}) (fn [_] - {::yrs/status 404}))) + {::rres/status 404}))) (defn openapi-handler [] @@ -189,12 +189,12 @@ context {:public-uri (cf/get :public-uri) :swagger-js swagger-js :swagger-css swagger-cs}] - {::yrs/status 200 - ::yrs/headers {"content-type" "text/html"} - ::yrs/body (-> (io/resource "app/templates/openapi.tmpl") + {::rres/status 200 + ::rres/headers {"content-type" "text/html"} + ::rres/body (-> (io/resource "app/templates/openapi.tmpl") (tmpl/render context))})) (fn [_] - {::yrs/status 404}))) + {::rres/status 404}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MODULE INIT diff --git a/backend/src/app/rpc/helpers.clj b/backend/src/app/rpc/helpers.clj index 69d1a2d71..87b91f545 100644 --- a/backend/src/app/rpc/helpers.clj +++ b/backend/src/app/rpc/helpers.clj @@ -11,7 +11,7 @@ [app.common.data.macros :as dm] [app.http :as-alias http] [app.rpc :as-alias rpc] - [yetti.response :as-alias yrs])) + [ring.response :as-alias rres])) ;; A utilty wrapper object for wrap service responses that does not ;; implements the IObj interface that make possible attach metadata to @@ -77,4 +77,4 @@ (fn [_ response] (let [exp (if (integer? max-age) max-age (inst-ms max-age)) val (dm/fmt "max-age=%" (int (/ exp 1000.0)))] - (update response ::yrs/headers assoc "cache-control" val))))) + (update response ::rres/headers assoc "cache-control" val))))) diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index 1b8e16560..284ca4602 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -15,8 +15,8 @@ [app.util.time :as dt] [promesa.exec :as px] [promesa.exec.csp :as sp] - [yetti.request :as yr] - [yetti.util :as yu] + [ring.request :as rreq] + [ring.websocket :as rws] [yetti.websocket :as yws]) (:import java.nio.ByteBuffer)) @@ -50,7 +50,7 @@ (declare start-io-loop!) -(defn handler +(defn listener "A WebSocket upgrade handler factory. Returns a handler that can be used to upgrade to websocket connection. This handler implements the basic custom protocol on top of websocket connection with all the @@ -61,37 +61,34 @@ It also accepts some options that allows you parametrize the protocol behavior. The options map will be used as-as for the initial data of the `ws` data structure" - [& {:keys [::on-rcv-message - ::on-snd-message - ::on-connect - ::input-buff-size - ::output-buff-size - ::idle-timeout] - :or {input-buff-size 64 - output-buff-size 64 - idle-timeout 60000 - on-connect identity - on-snd-message identity-3 - on-rcv-message identity-3} - :as options}] + [request & {:keys [::on-rcv-message + ::on-snd-message + ::on-connect + ::input-buff-size + ::output-buff-size + ::idle-timeout] + :or {input-buff-size 64 + output-buff-size 64 + idle-timeout 60000 + on-connect identity + on-snd-message identity-3 + on-rcv-message identity-3} + :as options}] (assert (fn? on-rcv-message) "'on-rcv-message' should be a function") (assert (fn? on-snd-message) "'on-snd-message' should be a function") (assert (fn? on-connect) "'on-connect' should be a function") - (fn [{:keys [::yws/channel] :as request}] - (let [input-ch (sp/chan :buf input-buff-size) - output-ch (sp/chan :buf output-buff-size) - hbeat-ch (sp/chan :buf (sp/sliding-buffer 6)) - close-ch (sp/chan) - - ip-addr (parse-client-ip request) - uagent (yr/get-header request "user-agent") - id (uuid/next) - state (atom {}) - beats (atom #{}) - - options (-> options + (let [input-ch (sp/chan :buf input-buff-size) + output-ch (sp/chan :buf output-buff-size) + hbeat-ch (sp/chan :buf (sp/sliding-buffer 6)) + close-ch (sp/chan) + ip-addr (parse-client-ip request) + uagent (rreq/get-header request "user-agent") + id (uuid/next) + state (atom {}) + beats (atom #{}) + options (-> options (update ::handler wrap-handler) (assoc ::id id) (assoc ::state state) @@ -101,126 +98,118 @@ (assoc ::heartbeat-ch hbeat-ch) (assoc ::output-ch output-ch) (assoc ::close-ch close-ch) - (assoc ::channel channel) (assoc ::remote-addr ip-addr) - (assoc ::user-agent uagent) + (assoc ::user-agent uagent))] + + {:on-open + (fn on-open [channel] + (l/trace :fn "on-open" :conn-id id :channel channel) + (let [options (-> options + (assoc ::channel channel) (on-connect)) + timeout (dt/duration idle-timeout)] - on-ws-open - (fn [channel] - (l/trace :fn "on-ws-open" :conn-id id) - (let [timeout (dt/duration idle-timeout) - name (str "penpot/websocket/io-loop/" id)] - (yws/idle-timeout! channel timeout) - (px/fn->thread (partial start-io-loop! options) - {:name name :virtual true}))) + (yws/set-idle-timeout! channel timeout) + (px/submit! :vthread (partial start-io-loop! options)))) - on-ws-terminate - (fn [_ code reason] - (l/trace :fn "on-ws-terminate" - :conn-id id - :code code - :reason reason) - (sp/close! close-ch)) + :on-close + (fn on-close [_channel code reason] + (l/info :fn "on-ws-terminate" + :conn-id id + :code code + :reason reason) + (sp/close! close-ch)) - on-ws-error - (fn [_ cause] - (sp/close! close-ch cause)) + :on-error + (fn on-error [_channel cause] + (sp/close! close-ch cause)) - on-ws-message - (fn [_ message] - (sp/offer! input-ch message) - (swap! state assoc ::last-activity-at (dt/now))) + :on-message + (fn on-message [_channel message] + (when (string? message) + (sp/offer! input-ch message) + (swap! state assoc ::last-activity-at (dt/now)))) - on-ws-pong - (fn [_ buffers] - ;; (l/trace :fn "on-ws-pong" :buffers (pr-str buffers)) - (sp/put! hbeat-ch (yu/copy-many buffers)))] - - (yws/on-close! channel (fn [_] - (sp/close! close-ch))) - - {:on-open on-ws-open - :on-error on-ws-error - :on-close on-ws-terminate - :on-text on-ws-message - :on-pong on-ws-pong}))) + :on-pong + (fn on-pong [_channel data] + (l/trace :fn "on-pong" :data data) + (sp/put! hbeat-ch data))})) (defn- handle-ping! [{:keys [::id ::beats ::channel] :as wsp} beat-id] - (l/trace :hint "ping" :beat beat-id :conn-id id) - (yws/ping! channel (encode-beat beat-id)) + (l/trace :hint "send ping" :beat beat-id :conn-id id) + (rws/ping channel (encode-beat beat-id)) (let [issued (swap! beats conj (long beat-id))] (not (>= (count issued) max-missed-heartbeats)))) (defn- start-io-loop! [{:keys [::id ::close-ch ::input-ch ::output-ch ::heartbeat-ch ::channel ::handler ::beats ::on-rcv-message ::on-snd-message] :as wsp}] - (px/thread - {:name (str "penpot/websocket/io-loop/" id) - :virtual true} - (try - (handler wsp {:type :open}) - (loop [i 0] - (let [ping-ch (sp/timeout-chan heartbeat-interval) - [msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])] - (when (yws/connected? channel) - (cond - (identical? p ping-ch) - (if (handle-ping! wsp i) - (recur (inc i)) - (yws/close! channel 8802 "missing to many pings")) + (try + (handler wsp {:type :open}) + (loop [i 0] + (let [ping-ch (sp/timeout-chan heartbeat-interval) + [msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])] + (when (rws/open? channel) + (cond + (identical? p ping-ch) + (if (handle-ping! wsp i) + (recur (inc i)) + (rws/close channel 8802 "missing to many pings")) - (or (identical? p close-ch) (nil? msg)) - (do :nothing) + (or (identical? p close-ch) (nil? msg)) + (do :nothing) - (identical? p heartbeat-ch) - (let [beat (decode-beat msg)] - ;; (l/trace :hint "pong" :beat beat :conn-id id) - (swap! beats disj beat) - (recur i)) + (identical? p heartbeat-ch) + (let [beat (decode-beat msg)] + ;; (l/trace :hint "pong" :beat beat :conn-id id) + (swap! beats disj beat) + (recur i)) - (identical? p input-ch) - (let [message (t/decode-str msg) - message (on-rcv-message message) - {:keys [request-id] :as response} (handler wsp message)] - (when (map? response) - (sp/put! output-ch - (cond-> response - (some? request-id) - (assoc :request-id request-id)))) - (recur i)) + (identical? p input-ch) + (let [message (t/decode-str msg) + message (on-rcv-message message) + {:keys [request-id] :as response} (handler wsp message)] + (when (map? response) + (sp/put! output-ch + (cond-> response + (some? request-id) + (assoc :request-id request-id)))) + (recur i)) - (identical? p output-ch) - (let [message (on-snd-message msg) - message (t/encode-str message {:type :json-verbose})] - ;; (l/trace :hint "writing message to output" :message msg) - (yws/send! channel message) - (recur i)))))) + (identical? p output-ch) + (let [message (on-snd-message msg) + message (t/encode-str message {:type :json-verbose})] + ;; (l/trace :hint "writing message to output" :message msg) + (rws/send channel message) + (recur i)))))) - (catch java.nio.channels.ClosedChannelException _) - (catch java.net.SocketException _) - (catch java.io.IOException _) + (catch java.nio.channels.ClosedChannelException _) + (catch java.net.SocketException _) + (catch java.io.IOException _) - (catch InterruptedException _ - (l/debug :hint "websocket thread interrumpted" :conn-id id)) + (catch InterruptedException _cause + (l/debug :hint "websocket thread interrumpted" :conn-id id)) - (catch Throwable cause - (l/error :hint "unhandled exception on websocket thread" - :conn-id id - :cause cause)) - - (finally + (catch Throwable cause + (l/error :hint "unhandled exception on websocket thread" + :conn-id id + :cause cause)) + (finally + (try (handler wsp {:type :close}) - (when (yws/connected? channel) + (when (rws/open? channel) ;; NOTE: we need to ignore all exceptions here because ;; there can be a race condition that first returns that ;; channel is connected but on closing, will raise that ;; channel is already closed. (ex/ignoring - (yws/close! channel 8899 "terminated"))) + (rws/close channel 8899 "terminated"))) (when-let [on-disconnect (::on-disconnect wsp)] (on-disconnect)) - (l/trace :hint "websocket thread terminated" :conn-id id))))) + (catch Throwable cause + (throw cause))) + + (l/trace :hint "websocket thread terminated" :conn-id id)))) diff --git a/backend/test/backend_tests/http_middleware_access_token_test.clj b/backend/test/backend_tests/http_middleware_access_token_test.clj index ddc170355..0b658d853 100644 --- a/backend/test/backend_tests/http_middleware_access_token_test.clj +++ b/backend/test/backend_tests/http_middleware_access_token_test.clj @@ -31,17 +31,17 @@ request (volatile! nil) handler (#'app.http.access-token/wrap-soft-auth - (fn [req & _] (vreset! request req)) + (fn [req] (vreset! request req)) system)] (with-mocks [m1 {:target 'app.http.access-token/get-token :return nil}] - (handler {} nil nil) + (handler {}) (t/is (= {} @request))) (with-mocks [m1 {:target 'app.http.access-token/get-token :return (:token token)}] - (handler {} nil nil) + (handler {}) (let [token-id (get @request :app.http.access-token/id)] (t/is (= token-id (:id token)))))))) diff --git a/backend/test/backend_tests/rpc_audit_test.clj b/backend/test/backend_tests/rpc_audit_test.clj index 233728dac..7a7fc6bae 100644 --- a/backend/test/backend_tests/rpc_audit_test.clj +++ b/backend/test/backend_tests/rpc_audit_test.clj @@ -25,7 +25,7 @@ (def http-request (reify - yetti.request/Request + ring.request/Request (get-header [_ name] (case name "x-forwarded-for" "127.0.0.44")))) diff --git a/backend/test/backend_tests/rpc_cond_middleware_test.clj b/backend/test/backend_tests/rpc_cond_middleware_test.clj index c2ab68ad0..a19d85a0c 100644 --- a/backend/test/backend_tests/rpc_cond_middleware_test.clj +++ b/backend/test/backend_tests/rpc_cond_middleware_test.clj @@ -46,6 +46,6 @@ {:keys [error result]} (th/command! (assoc params ::cond/key etag))] (t/is (nil? error)) (t/is (fn? result)) - (t/is (= 304 (-> (result nil) :yetti.response/status)))) + (t/is (= 304 (-> (result nil) :ring.response/status)))) ))))