mirror of
synced 2025-03-23 05:01:23 -05:00
✨ Update yetti and adapt for ring-2.0
This commit is contained in:
23 changed files with 407 additions and 440 deletions
@ -21,8 +21,8 @@
java-http-clj/java-http-clj {:mvn/version "0.4.3"}
{:git/tag "v9.16"
:git/sha "7df3e08"
{:git/tag "v10.0"
:git/sha "520613f"
:git/url "https://github.com/funcool/yetti.git"
:exclusions [org.slf4j/slf4j-api]}
@ -31,7 +31,7 @@
<Logger name="app.rpc.rlimit" level="info" />
<Logger name="app.rpc.climit" level="info" />
<Logger name="app.rpc.mutations.files" level="info" />
<Logger name="app.common.files.migrations" level="debug" />
<Logger name="app.common.files.migrations" level="info" />
<Logger name="app.loggers" level="debug" additivity="false">
<AppenderRef ref="main" level="debug" />
@ -31,7 +31,7 @@
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[yetti.response :as-alias yrs]))
[ring.response :as-alias rres]))
@ -479,8 +479,8 @@
(defn- redirect-response
{::yrs/status 302
::yrs/headers {"location" (str uri)}})
{::rres/status 302
::rres/headers {"location" (str uri)}})
(defn- generate-error-redirect
[_ cause]
@ -557,8 +557,8 @@
:props props
:exp (dt/in-future "4h")})
uri (build-auth-uri cfg state)]
{::yrs/status 200
::yrs/body {:redirect-uri uri}}))
{::rres/status 200
::rres/body {:redirect-uri uri}}))
(defn- callback-handler
[cfg request]
@ -23,15 +23,14 @@
[app.metrics :as mtx]
[app.rpc :as-alias rpc]
[app.rpc.doc :as-alias rpc.doc]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.exec :as px]
[reitit.core :as r]
[reitit.middleware :as rr]
[yetti.adapter :as yt]
[yetti.request :as yrq]
[yetti.response :as-alias yrs]))
[ring.request :as rreq]
[ring.response :as-alias rres]
[yetti.adapter :as yt]))
(declare router-handler)
@ -63,8 +62,7 @@
(defmethod ig/init-key ::server
[_ {:keys [::handler ::router ::host ::port] :as cfg}]
@ -75,11 +73,9 @@
:http/max-multipart-body-size (::max-multipart-body-size cfg)
:xnio/io-threads (or (::io-threads cfg)
(max 3 (px/get-available-processors)))
:xnio/worker-threads (or (::worker-threads cfg)
(max 6 (px/get-available-processors)))
:xnio/dispatch true
:socket/backlog 4069
:ring/async true}
:xnio/dispatch :virtual
:ring/compat :ring2
:socket/backlog 4069}
handler (cond
(some? router)
@ -102,13 +98,13 @@
(yt/stop! server))
(defn- not-found-handler
[_ respond _]
(respond {::yrs/status 404}))
{::rres/status 404})
(defn- router-handler
(letfn [(resolve-handler [request]
(if-let [match (r/match-by-path router (yrq/path request))]
(if-let [match (r/match-by-path router (rreq/path request))]
(let [params (:path-params match)
result (:result match)
handler (or (:handler result) not-found-handler)
@ -120,18 +116,15 @@
(let [{:keys [body] :as response} (errors/handle cause request)]
(cond-> response
(map? body)
(-> (update ::yrs/headers assoc "content-type" "application/transit+json")
(assoc ::yrs/body (t/encode-str body {:type :json-verbose}))))))]
(-> (update ::rres/headers assoc "content-type" "application/transit+json")
(assoc ::rres/body (t/encode-str body {:type :json-verbose}))))))]
(fn [request respond _]
(let [handler (resolve-handler request)
exchange (yrq/exchange request)]
(fn [response]
(yt/dispatch! exchange (partial respond response)))
(fn [cause]
(let [response (on-error cause request)]
(yt/dispatch! exchange (partial respond response)))))))))
(fn [request]
(let [handler (resolve-handler request)]
(catch Throwable cause
(on-error cause request)))))))
@ -160,8 +153,7 @@
[session/soft-auth cfg]
[actoken/soft-auth cfg]
[mw/errors errors/handle]
[mw/with-dispatch :vthread]]}
(::mtx/routes cfg)
(::assets/routes cfg)
@ -11,13 +11,13 @@
[app.db :as db]
[app.main :as-alias main]
[app.tokens :as tokens]
[yetti.request :as yrq]))
[ring.request :as rreq]))
(def header-re #"^Token\s+(.*)")
(defn- get-token
(some->> (yrq/get-header request "authorization")
(some->> (rreq/get-header request "authorization")
(re-matches header-re)
@ -30,7 +30,7 @@
"SELECT perms, profile_id, expires_at
FROM access_token
WHERE id = ?
AND (expires_at IS NULL
AND (expires_at IS NULL
OR (expires_at > now()));")
(defn- get-token-data
@ -54,9 +54,8 @@
(l/trace :hint "exception on decoding malformed token" :cause cause)
(fn [request respond raise]
(let [request (handle-request request)]
(handler request respond raise)))))
(fn [request]
(handler (handle-request request)))))
(defn- wrap-authz
"Authorization middleware, will be executed synchronously on vthread."
@ -16,7 +16,7 @@
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[yetti.response :as-alias yrs]))
[ring.response :as-alias rres]))
(def ^:private cache-max-age
(dt/duration {:hours 24}))
@ -37,8 +37,8 @@
(defn- serve-object-from-s3
[{:keys [::sto/storage] :as cfg} obj]
(let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})]
{::yrs/status 307
::yrs/headers {"location" (str url)
{::rres/status 307
::rres/headers {"location" (str url)
"x-host" (cond-> host port (str ":" port))
"x-mtype" (-> obj meta :content-type)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}}))
@ -51,8 +51,8 @@
headers {"x-accel-redirect" (:path purl)
"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}]
{::yrs/status 204
::yrs/headers headers}))
{::rres/status 204
::rres/headers headers}))
(defn- serve-object
"Helper function that returns the appropriate response depending on
@ -70,7 +70,7 @@
obj (sto/get-object storage id)]
(if obj
(serve-object cfg obj)
{::yrs/status 404})))
{::rres/status 404})))
(defn- generic-handler
"A generic handler helper/common code for file-media based handlers."
@ -81,7 +81,7 @@
sobj (sto/get-object storage (kf mobj))]
(if sobj
(serve-object cfg sobj)
{::yrs/status 404})))
{::rres/status 404})))
(defn file-objects-handler
"Handler that serves storage objects by file media id."
@ -20,8 +20,8 @@
[integrant.core :as ig]
[jsonista.core :as j]
[promesa.exec :as px]
[yetti.request :as yrq]
[yetti.response :as-alias yrs]))
[ring.request :as rreq]
[ring.response :as-alias rres]))
(declare parse-json)
(declare handle-request)
@ -37,9 +37,9 @@
(defmethod ig/init-key ::routes
[_ {:keys [::wrk/executor] :as cfg}]
(letfn [(handler [request]
(let [data (-> request yrq/body slurp)]
(let [data (-> request rreq/body slurp)]
(px/run! executor #(handle-request cfg data)))
{::yrs/status 200})]
{::rres/status 200})]
["/sns" {:handler handler
:allowed-methods #{:post}}]))
@ -32,8 +32,8 @@
[integrant.core :as ig]
[markdown.core :as md]
[markdown.transformers :as mdt]
[yetti.request :as yrq]
[yetti.response :as yrs]))
[ring.request :as rreq]
[ring.response :as rres]))
;; (selmer.parser/cache-off!)
@ -43,10 +43,10 @@
(defn index-handler
[_cfg _request]
{::yrs/status 200
::yrs/headers {"content-type" "text/html"}
::yrs/body (-> (io/resource "app/templates/debug.tmpl")
(tmpl/render {}))})
{::rres/status 200
::rres/headers {"content-type" "text/html"}
::rres/body (-> (io/resource "app/templates/debug.tmpl")
(tmpl/render {}))})
@ -55,17 +55,17 @@
(defn prepare-response
(let [headers {"content-type" "application/transit+json"}]
{::yrs/status 200
::yrs/body body
::yrs/headers headers}))
{::rres/status 200
::rres/body body
::rres/headers headers}))
(defn prepare-download-response
[body filename]
(let [headers {"content-disposition" (str "attachment; filename=" filename)
"content-type" "application/octet-stream"}]
{::yrs/status 200
::yrs/body body
::yrs/headers headers}))
{::rres/status 200
::rres/body body
::rres/headers headers}))
(def sql:retrieve-range-of-changes
"select revn, changes from file_change where file_id=? and revn >= ? and revn <= ? order by revn")
@ -107,8 +107,8 @@
(db/update! conn :file
{:data data}
{:id file-id})
{::yrs/status 201
::yrs/body "OK CREATED"})))
{::rres/status 201
::rres/body "OK CREATED"})))
(prepare-response (blob/decode data))))))
@ -137,8 +137,8 @@
{:data data
:deleted-at nil}
{:id file-id})
{::yrs/status 200
::yrs/body "OK UPDATED"})
{::rres/status 200
::rres/body "OK UPDATED"})
(db/run! pool (fn [{:keys [::db/conn]}]
(create-file conn {:id file-id
@ -148,15 +148,15 @@
(db/update! conn :file
{:data data}
{:id file-id})
{::yrs/status 201
::yrs/body "OK CREATED"}))))
{::rres/status 201
::rres/body "OK CREATED"}))))
{::yrs/status 500
::yrs/body "ERROR"})))
{::rres/status 500
::rres/body "ERROR"})))
(defn file-data-handler
[cfg request]
(case (yrq/method request)
(case (rreq/method request)
:get (retrieve-file-data cfg request)
:post (upload-file-data cfg request)
(ex/raise :type :http
@ -238,12 +238,12 @@
1 (render-template-v1 report)
2 (render-template-v2 report)
3 (render-template-v3 report))]
{::yrs/status 200
::yrs/body result
::yrs/headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"}})
{::yrs/status 404
::yrs/body "not found"})))
{::rres/status 200
::rres/body result
::rres/headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"}})
{::rres/status 404
::rres/body "not found"})))
(def sql:error-reports
"SELECT id, created_at,
@ -256,11 +256,11 @@
[{:keys [::db/pool]} _request]
(let [items (->> (db/exec! pool [sql:error-reports])
(map #(update % :created-at dt/format-instant :rfc1123)))]
{::yrs/status 200
::yrs/body (-> (io/resource "app/templates/error-list.tmpl")
(tmpl/render {:items items}))
::yrs/headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"}}))
{::rres/status 200
::rres/body (-> (io/resource "app/templates/error-list.tmpl")
(tmpl/render {:items items}))
::rres/headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"}}))
@ -296,14 +296,14 @@
::binf/profile-id profile-id
::binf/project-id project-id))
{::yrs/status 200
::yrs/headers {"content-type" "text/plain"}
::yrs/body "OK CLONED"})
{::rres/status 200
::rres/headers {"content-type" "text/plain"}
::rres/body "OK CLONED"})
{::yrs/status 200
::yrs/body (io/input-stream path)
::yrs/headers {"content-type" "application/octet-stream"
"content-disposition" (str "attachmen; filename=" (first file-ids) ".penpot")}}))))
{::rres/status 200
::rres/body (io/input-stream path)
::rres/headers {"content-type" "application/octet-stream"
"content-disposition" (str "attachmen; filename=" (first file-ids) ".penpot")}}))))
@ -334,9 +334,9 @@
::binf/profile-id profile-id
::binf/project-id project-id))
{::yrs/status 200
::yrs/headers {"content-type" "text/plain"}
::yrs/body "OK"}))
{::rres/status 200
::rres/headers {"content-type" "text/plain"}
::rres/body "OK"}))
@ -363,34 +363,34 @@
(db/update! pool :profile {:is-blocked true} {:id (:id profile)})
(db/delete! pool :http-session {:profile-id (:id profile)})
{::yrs/status 200
::yrs/headers {"content-type" "text/plain"}
::yrs/body (str/ffmt "PROFILE '%' BLOCKED" (:email profile))})
{::rres/status 200
::rres/headers {"content-type" "text/plain"}
::rres/body (str/ffmt "PROFILE '%' BLOCKED" (:email profile))})
(contains? params :unblock)
(db/update! pool :profile {:is-blocked false} {:id (:id profile)})
{::yrs/status 200
::yrs/headers {"content-type" "text/plain"}
::yrs/body (str/ffmt "PROFILE '%' UNBLOCKED" (:email profile))})
{::rres/status 200
::rres/headers {"content-type" "text/plain"}
::rres/body (str/ffmt "PROFILE '%' UNBLOCKED" (:email profile))})
(contains? params :resend)
(if (:is-blocked profile)
{::yrs/status 200
::yrs/headers {"content-type" "text/plain"}
{::rres/status 200
::rres/headers {"content-type" "text/plain"}
(auth/send-email-verification! pool props profile)
{::yrs/status 200
::yrs/headers {"content-type" "text/plain"}
::yrs/body (str/ffmt "RESENDED FOR '%'" (:email profile))}))
{::rres/status 200
::rres/headers {"content-type" "text/plain"}
::rres/body (str/ffmt "RESENDED FOR '%'" (:email profile))}))
(db/update! pool :profile {:is-active true} {:id (:id profile)})
{::yrs/status 200
::yrs/headers {"content-type" "text/plain"}
::yrs/body (str/ffmt "PROFILE '%' ACTIVATED" (:email profile))}))))
{::rres/status 200
::rres/headers {"content-type" "text/plain"}
::rres/body (str/ffmt "PROFILE '%' ACTIVATED" (:email profile))}))))
(defn- reset-file-data-version
@ -420,9 +420,9 @@
:migrate? false
:inc-revn? false
:save? true)
{::yrs/status 200
::yrs/headers {"content-type" "text/plain"}
::yrs/body "OK"}))
{::rres/status 200
::rres/headers {"content-type" "text/plain"}
::rres/body "OK"}))
@ -434,13 +434,13 @@
[{:keys [::db/pool]} _]
(db/exec-one! pool ["select count(*) as count from server_prop;"])
{::yrs/status 200
::yrs/body "OK"}
{::rres/status 200
::rres/body "OK"}
(catch Throwable cause
(l/warn :hint "unable to execute query on health handler"
:cause cause)
{::yrs/status 503
::yrs/body "KO"})))
{::rres/status 503
::rres/body "KO"})))
(defn changelog-handler
[_ _]
@ -449,11 +449,11 @@
(md->html [text]
(md/md-to-html-string text :replacement-transformers (into [transform-emoji] mdt/transformer-vector)))]
(if-let [clog (io/resource "changelog.md")]
{::yrs/status 200
::yrs/headers {"content-type" "text/html; charset=utf-8"}
::yrs/body (-> clog slurp md->html)}
{::yrs/status 404
::yrs/body "NOT FOUND"})))
{::rres/status 200
::rres/headers {"content-type" "text/html; charset=utf-8"}
::rres/body (-> clog slurp md->html)}
{::rres/status 404
::rres/body "NOT FOUND"})))
@ -16,14 +16,14 @@
[app.http.session :as-alias session]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[yetti.request :as yrq]
[yetti.response :as yrs]))
[ring.request :as rreq]
[ring.response :as rres]))
(defn- parse-client-ip
(or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first)
(yrq/get-header request "x-real-ip")
(yrq/remote-addr request)))
(or (some-> (rreq/get-header request "x-forwarded-for") (str/split ",") first)
(rreq/get-header request "x-real-ip")
(rreq/remote-addr request)))
(defn request->context
"Extracts error report relevant context data from request."
@ -34,10 +34,10 @@
{:request/path (:path request)
:request/method (:method request)
:request/params (:params request)
:request/user-agent (yrq/get-header request "user-agent")
:request/user-agent (rreq/get-header request "user-agent")
:request/ip-addr (parse-client-ip request)
:request/profile-id (:uid claims)
:version/frontend (or (yrq/get-header request "x-frontend-version") "unknown")
:version/frontend (or (rreq/get-header request "x-frontend-version") "unknown")
:version/backend (:full cf/version)}))
(defmulti handle-error
@ -50,30 +50,30 @@
(defmethod handle-error :authentication
[err _ _]
{::yrs/status 401
::yrs/body (ex-data err)})
{::rres/status 401
::rres/body (ex-data err)})
(defmethod handle-error :authorization
[err _ _]
{::yrs/status 403
::yrs/body (ex-data err)})
{::rres/status 403
::rres/body (ex-data err)})
(defmethod handle-error :restriction
[err _ _]
{::yrs/status 400
::yrs/body (ex-data err)})
{::rres/status 400
::rres/body (ex-data err)})
(defmethod handle-error :rate-limit
[err _ _]
(let [headers (-> err ex-data ::http/headers)]
{::yrs/status 429
::yrs/headers headers}))
{::rres/status 429
::rres/headers headers}))
(defmethod handle-error :concurrency-limit
[err _ _]
(let [headers (-> err ex-data ::http/headers)]
{::yrs/status 429
::yrs/headers headers}))
{::rres/status 429
::rres/headers headers}))
(defmethod handle-error :validation
[err request parent-cause]
@ -81,38 +81,38 @@
(= code :spec-validation)
(let [explain (ex/explain data)]
{::yrs/status 400
::yrs/body (-> data
(dissoc ::s/problems ::s/value ::s/spec)
(cond-> explain (assoc :explain explain)))})
{::rres/status 400
::rres/body (-> data
(dissoc ::s/problems ::s/value ::s/spec)
(cond-> explain (assoc :explain explain)))})
(= code :params-validation)
(let [explain (::sm/explain data)
explain (sm/humanize-data explain)]
{::yrs/status 400
::yrs/body (-> data
(dissoc ::sm/explain)
(assoc :explain explain))})
{::rres/status 400
::rres/body (-> data
(dissoc ::sm/explain)
(assoc :explain explain))})
(= code :data-validation)
(let [explain (::sm/explain data)
explain (sm/humanize-data explain)]
{::yrs/status 400
::yrs/body (-> data
(dissoc ::sm/explain)
(assoc :explain explain))})
{::rres/status 400
::rres/body (-> data
(dissoc ::sm/explain)
(assoc :explain explain))})
(= code :request-body-too-large)
{::yrs/status 413 ::yrs/body data}
{::rres/status 413 ::rres/body data}
(= code :invalid-image)
(binding [l/*context* (request->context request)]
(let [cause (or parent-cause err)]
(l/error :hint "unexpected error on processing image" :cause cause)
{::yrs/status 400 ::yrs/body data}))
{::rres/status 400 ::rres/body data}))
{::yrs/status 400 ::yrs/body data})))
{::rres/status 400 ::rres/body data})))
(defmethod handle-error :assertion
[error request parent-cause]
@ -123,46 +123,46 @@
(= code :data-validation)
(let [explain (ex/explain data)]
(l/error :hint "data assertion error" :cause cause)
{::yrs/status 500
::yrs/body {:type :server-error
:code :assertion
:data (-> data
(dissoc ::sm/explain)
(cond-> explain (assoc :explain explain)))}})
{::rres/status 500
::rres/body {:type :server-error
:code :assertion
:data (-> data
(dissoc ::sm/explain)
(cond-> explain (assoc :explain explain)))}})
(= code :spec-validation)
(let [explain (ex/explain data)]
(l/error :hint "spec assertion error" :cause cause)
{::yrs/status 500
::yrs/body {:type :server-error
:code :assertion
:data (-> data
(dissoc ::s/problems ::s/value ::s/spec)
(cond-> explain (assoc :explain explain)))}})
{::rres/status 500
::rres/body {:type :server-error
:code :assertion
:data (-> data
(dissoc ::s/problems ::s/value ::s/spec)
(cond-> explain (assoc :explain explain)))}})
(l/error :hint "assertion error" :cause cause)
{::yrs/status 500
::yrs/body {:type :server-error
:code :assertion
:data data}})))))
{::rres/status 500
::rres/body {:type :server-error
:code :assertion
:data data}})))))
(defmethod handle-error :not-found
[err _ _]
{::yrs/status 404
::yrs/body (ex-data err)})
{::rres/status 404
::rres/body (ex-data err)})
(defmethod handle-error :internal
[error request parent-cause]
(binding [l/*context* (request->context request)]
(let [cause (or parent-cause error)]
(l/error :hint "internal error" :cause cause)
{::yrs/status 500
::yrs/body {:type :server-error
:code :unhandled
:hint (ex-message error)
:data (ex-data error)}})))
{::rres/status 500
::rres/body {:type :server-error
:code :unhandled
:hint (ex-message error)
:data (ex-data error)}})))
(defmethod handle-error :default
[error request parent-cause]
@ -186,23 +186,23 @@
:cause cause)
(= state "57014")
{::yrs/status 504
::yrs/body {:type :server-error
:code :statement-timeout
:hint (ex-message error)}}
{::rres/status 504
::rres/body {:type :server-error
:code :statement-timeout
:hint (ex-message error)}}
(= state "25P03")
{::yrs/status 504
::yrs/body {:type :server-error
:code :idle-in-transaction-timeout
:hint (ex-message error)}}
{::rres/status 504
::rres/body {:type :server-error
:code :idle-in-transaction-timeout
:hint (ex-message error)}}
{::yrs/status 500
::yrs/body {:type :server-error
:code :unexpected
:hint (ex-message error)
:state state}}))))
{::rres/status 500
::rres/body {:type :server-error
:code :unexpected
:hint (ex-message error)
:state state}}))))
(defmethod handle-exception :default
[error request parent-cause]
@ -213,19 +213,19 @@
(nil? edata)
(binding [l/*context* (request->context request)]
(l/error :hint "unexpected error" :cause cause)
{::yrs/status 500
::yrs/body {:type :server-error
:code :unexpected
:hint (ex-message error)}})
{::rres/status 500
::rres/body {:type :server-error
:code :unexpected
:hint (ex-message error)}})
(binding [l/*context* (request->context request)]
(l/error :hint "unhandled error" :cause cause)
{::yrs/status 500
::yrs/body {:type :server-error
:code :unhandled
:hint (ex-message error)
:data edata}}))))
{::rres/status 500
::rres/body {:type :server-error
:code :unhandled
:hint (ex-message error)
:data edata}}))))
(defmethod handle-exception java.util.concurrent.CompletionException
[cause request _]
@ -12,13 +12,10 @@
[app.config :as cf]
[app.util.json :as json]
[cuerdas.core :as str]
[promesa.core :as p]
[promesa.exec :as px]
[promesa.util :as pu]
[ring.request :as rreq]
[ring.response :as rres]
[yetti.adapter :as yt]
[yetti.middleware :as ymw]
[yetti.request :as yrq]
[yetti.response :as yrs])
[yetti.middleware :as ymw])
@ -46,17 +43,17 @@
(defn wrap-parse-request
(letfn [(process-request [request]
(let [header (yrq/get-header request "content-type")]
(let [header (rreq/get-header request "content-type")]
(str/starts-with? header "application/transit+json")
(with-open [^InputStream is (yrq/body request)]
(with-open [^InputStream is (rreq/body request)]
(let [params (t/read! (t/reader is))]
(-> request
(assoc :body-params params)
(update :params merge params))))
(str/starts-with? header "application/json")
(with-open [^InputStream is (yrq/body request)]
(with-open [^InputStream is (rreq/body request)]
(let [params (json/decode is json-mapper)]
(-> request
(assoc :body-params params)
@ -65,37 +62,36 @@
(handle-error [raise cause]
(handle-error [cause]
(instance? RuntimeException cause)
(if-let [cause (ex-cause cause)]
(handle-error raise cause)
(raise cause))
(handle-error cause)
(throw cause))
(instance? RequestTooBigException cause)
(raise (ex/error :type :validation
:code :request-body-too-large
:hint (ex-message cause)))
(ex/raise :type :validation
:code :request-body-too-large
:hint (ex-message cause))
(or (instance? JsonEOFException cause)
(instance? JsonParseException cause)
(instance? MismatchedInputException cause))
(raise (ex/error :type :validation
:code :malformed-json
:hint (ex-message cause)
:cause cause))
(ex/raise :type :validation
:code :malformed-json
:hint (ex-message cause)
:cause cause)
(raise cause)))]
(throw cause)))]
(fn [request respond raise]
(if (= (yrq/method request) :post)
(fn [request]
(if (= (rreq/method request) :post)
(let [request (ex/try! (process-request request))]
(if (ex/exception? request)
(handle-error raise request)
(handler request respond raise)))
(handler request respond raise)))))
(handle-error request)
(handler request)))
(handler request)))))
(def parse-request
{:name ::parse-request
@ -113,7 +109,7 @@
(defn wrap-format-response
(letfn [(transit-streamable-body [data opts]
(reify yrs/StreamableResponseBody
(reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream]
(with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)]
@ -128,7 +124,7 @@
(.close ^OutputStream output-stream))))))
(json-streamable-body [data]
(reify yrs/StreamableResponseBody
(reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream]
(with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)]
@ -143,24 +139,24 @@
(.close ^OutputStream output-stream))))))
(format-response-with-json [response _]
(let [body (::yrs/body response)]
(let [body (::rres/body response)]
(if (or (boolean? body) (coll? body))
(-> response
(update ::yrs/headers assoc "content-type" "application/json")
(assoc ::yrs/body (json-streamable-body body)))
(update ::rres/headers assoc "content-type" "application/json")
(assoc ::rres/body (json-streamable-body body)))
(format-response-with-transit [response request]
(let [body (::yrs/body response)]
(let [body (::rres/body response)]
(if (or (boolean? body) (coll? body))
(let [qs (yrq/query request)
(let [qs (rreq/query request)
opts (if (or (contains? cf/flags :transit-readable-response)
(str/includes? qs "transit_verbose"))
{:type :json-verbose}
{:type :json})]
(-> response
(update ::yrs/headers assoc "content-type" "application/transit+json")
(assoc ::yrs/body (transit-streamable-body body opts))))
(update ::rres/headers assoc "content-type" "application/transit+json")
(assoc ::rres/body (transit-streamable-body body opts))))
(format-from-params [{:keys [query-params] :as request}]
@ -169,7 +165,7 @@
(format-response [response request]
(let [accept (or (format-from-params request)
(yrq/get-header request "accept"))]
(rreq/get-header request "accept"))]
(or (= accept "application/transit+json")
(str/includes? accept "application/transit+json"))
@ -186,11 +182,9 @@
(cond-> response
(map? response) (format-response request)))]
(fn [request respond raise]
(handler request
(fn [response]
(respond (process-response response request)))
(fn [request]
(let [response (handler request)]
(process-response response request)))))
(def format-response
{:name ::format-response
@ -198,12 +192,11 @@
(defn wrap-errors
[handler on-error]
(fn [request respond raise]
(handler request respond (fn [cause]
(respond (on-error cause request))
(catch Throwable cause
(raise cause)))))))
(fn [request]
(handler request)
(catch Throwable cause
(on-error cause request)))))
(def errors
{:name ::errors
@ -221,11 +214,11 @@
(defn wrap-cors
(fn [request]
(let [response (if (= (yrq/method request) :options)
{::yrs/status 200}
(let [response (if (= (rreq/method request) :options)
{::rres/status 200}
(handler request))
origin (yrq/get-header request "origin")]
(update response ::yrs/headers with-cors-headers origin))))
origin (rreq/get-header request "origin")]
(update response ::rres/headers with-cors-headers origin))))
(def cors
{:name ::cors
@ -239,18 +232,8 @@
(fn [data _]
(when-let [allowed (:allowed-methods data)]
(fn [handler]
(fn [request respond raise]
(let [method (yrq/method request)]
(fn [request]
(let [method (rreq/method request)]
(if (contains? allowed method)
(handler request respond raise)
(respond {::yrs/status 405})))))))})
(def with-dispatch
{:name ::with-dispatch
(fn [& _]
(fn [handler executor]
(let [executor (px/resolve-executor executor)]
(fn [request respond raise]
(->> (px/submit! executor (partial handler request))
(p/fnly (pu/handler respond raise)))))))})
(handler request)
{::rres/status 405}))))))})
@ -20,6 +20,7 @@
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[ring.request :as rreq]
[yetti.request :as yrq]))
@ -142,7 +143,7 @@
(us/assert! ::us/uuid profile-id)
(fn [request response]
(let [uagent (yrq/get-header request "user-agent")
(let [uagent (rreq/get-header request "user-agent")
params {:profile-id profile-id
:user-agent uagent
:created-at (dt/now)}
@ -209,9 +210,8 @@
(l/trace :hint "exception on decoding malformed token" :cause cause)
(fn [request respond raise]
(let [request (handle-request request)]
(handler request respond raise)))))
(fn [request]
(handler (handle-request request)))))
(defn- wrap-authz
[handler {:keys [::manager]}]
@ -10,7 +10,7 @@
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.pprint :as pp]
[app.common.spec :as us]
[app.common.schema :as sm]
[app.common.uuid :as uuid]
[app.db :as db]
[app.http.session :as session]
@ -21,6 +21,7 @@
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.exec.csp :as sp]
[ring.websocket :as rws]
[yetti.websocket :as yws]))
(def recv-labels
@ -277,19 +278,23 @@
:inc 1)
(s/def ::session-id ::us/uuid)
(s/def ::handler-params
(s/keys :req-un [::session-id]))
(def ^:private schema:params
[:map {:title "params"}
[:session-id ::sm/uuid]]))
(defn- http-handler
[cfg {:keys [params ::session/profile-id] :as request}]
(let [{:keys [session-id]} (us/conform ::handler-params params)]
(let [{:keys [session-id]} (sm/conform! schema:params params)]
(not profile-id)
(ex/raise :type :authentication
:hint "Authentication required.")
;; WORKAROUND: we use the adapter specific predicate for
;; performance reasons; for now, the ring default impl for
;; `upgrade-request?` parses all requests headers before perform
;; any checking.
(not (yws/upgrade-request? request))
(ex/raise :type :validation
:code :websocket-request-expected
@ -298,14 +303,13 @@
(l/trace :hint "websocket request" :profile-id profile-id :session-id session-id)
(->> (ws/handler
::ws/on-rcv-message (partial on-rcv-message cfg)
::ws/on-snd-message (partial on-snd-message cfg)
::ws/on-connect (partial on-connect cfg)
::ws/handler (partial handle-message cfg)
::profile-id profile-id
::session-id session-id)
(yws/upgrade request))))))
{::rws/listener (ws/listener request
::ws/on-rcv-message (partial on-rcv-message cfg)
::ws/on-snd-message (partial on-snd-message cfg)
::ws/on-connect (partial on-connect cfg)
::ws/handler (partial handle-message cfg)
::profile-id profile-id
::session-id session-id)}))))
(defmethod ig/pre-init-spec ::routes [_]
(s/keys :req [::mbus/msgbus
@ -318,5 +322,4 @@
(defmethod ig/init-key ::routes
[_ cfg]
["/ws/notifications" {:middleware [[session/authz cfg]]
:handler (partial http-handler cfg)
:allowed-methods #{:get}}])
:handler (partial http-handler cfg)}])
@ -33,7 +33,7 @@
[integrant.core :as ig]
[lambdaisland.uri :as u]
[promesa.exec :as px]
[yetti.request :as yrq]))
[ring.request :as rreq]))
@ -41,9 +41,9 @@
(defn parse-client-ip
(or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first)
(yrq/get-header request "x-real-ip")
(some-> (yrq/remote-addr request) str)))
(or (some-> (rreq/get-header request "x-forwarded-for") (str/split ",") first)
(rreq/get-header request "x-real-ip")
(some-> (rreq/remote-addr request) str)))
(defn extract-utm-params
"Extracts additional data from params and namespace them under
@ -235,7 +235,6 @@
{::http/port (cf/get :http-server-port)
::http/host (cf/get :http-server-host)
::http/router (ig/ref ::http/router)
::wrk/executor (ig/ref ::wrk/executor)
::http/io-threads (cf/get :http-server-io-threads)
::http/max-body-size (cf/get :http-server-max-body-size)
::http/max-multipart-body-size (cf/get :http-server-max-multipart-body-size)}
@ -34,8 +34,8 @@
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]
[yetti.request :as yrq]
[yetti.response :as yrs]))
[ring.request :as rreq]
[ring.response :as rres]))
(s/def ::profile-id ::us/uuid)
@ -61,9 +61,9 @@
(if (fn? result)
(result request)
(let [mdata (meta result)]
(-> {::yrs/status (::http/status mdata 200)
::yrs/headers (::http/headers mdata {})
::yrs/body (rph/unwrap result)}
(-> {::rres/status (::http/status mdata 200)
::rres/headers (::http/headers mdata {})
::rres/body (rph/unwrap result)}
(handle-response-transformation request mdata)
(handle-before-comple-hook mdata)))))
@ -72,7 +72,7 @@
internal async flow into ring async flow."
[methods {:keys [params path-params] :as request}]
(let [type (keyword (:type path-params))
etag (yrq/get-header request "if-none-match")
etag (rreq/get-header request "if-none-match")
profile-id (or (::session/profile-id request)
(::actoken/profile-id request))
@ -138,6 +138,8 @@
(f cfg (us/conform spec params)))
;; TODO: integrate with sm/define
(defn- wrap-params-validation
[_ f mdata]
(if-let [schema (::sm/params mdata)]
@ -44,8 +44,8 @@
[cuerdas.core :as str]
[datoteka.io :as io]
[promesa.util :as pu]
[yetti.adapter :as yt]
[yetti.response :as yrs])
[ring.response :as rres]
[yetti.adapter :as yt])
@ -1071,7 +1071,7 @@
::webhooks/event? true}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}]
(files/check-read-permissions! pool profile-id file-id)
(let [body (reify yrs/StreamableResponseBody
(let [body (reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream]
(-> cfg
(assoc ::file-ids [file-id])
@ -1080,9 +1080,9 @@
(export! output-stream))))]
(fn [_]
{::yrs/status 200
::yrs/body body
::yrs/headers {"content-type" "application/octet-stream"}})))
{::rres/status 200
::rres/body body
::rres/headers {"content-type" "application/octet-stream"}})))
(s/def ::file ::media/upload)
(s/def ::import-binfile
@ -29,7 +29,7 @@
[app.util.services :as-alias sv]
[buddy.core.codecs :as bc]
[buddy.core.hash :as bh]
[yetti.response :as yrs]))
[ring.response :as-alias rres]))
^{:dynamic true
@ -57,7 +57,7 @@
(let [key' (when (or key reuse-key?)
(some->> (get-object cfg params) (key-fn params) (fmt-key)))]
(if (and (some? key) (= key key'))
(fn [_] {::yrs/status 304})
(fn [_] {::rres/status 304})
(let [result (f cfg params)
etag (or (and reuse-key? key')
(some-> result meta ::key fmt-key)
@ -27,7 +27,7 @@
[integrant.core :as ig]
[malli.transform :as mt]
[pretty-spec.core :as ps]
[yetti.response :as yrs]))
[ring.response :as-alias rres]))
;; DOC (human readable)
@ -86,11 +86,11 @@
(let [params (:query-params request)
pstyle (:type params "js")
context (assoc context :param-style pstyle)]
{::yrs/status 200
::yrs/body (-> (io/resource "app/templates/api-doc.tmpl")
{::rres/status 200
::rres/body (-> (io/resource "app/templates/api-doc.tmpl")
(tmpl/render context))}))
(fn [_]
{::yrs/status 404})))
{::rres/status 404})))
@ -173,12 +173,12 @@
(if (contains? cf/flags :backend-openapi-doc)
(fn [_]
{::yrs/status 200
::yrs/headers {"content-type" "application/json; charset=utf-8"}
::yrs/body (json/encode context)})
{::rres/status 200
::rres/headers {"content-type" "application/json; charset=utf-8"}
::rres/body (json/encode context)})
(fn [_]
{::yrs/status 404})))
{::rres/status 404})))
(defn openapi-handler
@ -189,12 +189,12 @@
context {:public-uri (cf/get :public-uri)
:swagger-js swagger-js
:swagger-css swagger-cs}]
{::yrs/status 200
::yrs/headers {"content-type" "text/html"}
::yrs/body (-> (io/resource "app/templates/openapi.tmpl")
{::rres/status 200
::rres/headers {"content-type" "text/html"}
::rres/body (-> (io/resource "app/templates/openapi.tmpl")
(tmpl/render context))}))
(fn [_]
{::yrs/status 404})))
{::rres/status 404})))
@ -11,7 +11,7 @@
[app.common.data.macros :as dm]
[app.http :as-alias http]
[app.rpc :as-alias rpc]
[yetti.response :as-alias yrs]))
[ring.response :as-alias rres]))
;; A utilty wrapper object for wrap service responses that does not
;; implements the IObj interface that make possible attach metadata to
@ -77,4 +77,4 @@
(fn [_ response]
(let [exp (if (integer? max-age) max-age (inst-ms max-age))
val (dm/fmt "max-age=%" (int (/ exp 1000.0)))]
(update response ::yrs/headers assoc "cache-control" val)))))
(update response ::rres/headers assoc "cache-control" val)))))
@ -15,8 +15,8 @@
[app.util.time :as dt]
[promesa.exec :as px]
[promesa.exec.csp :as sp]
[yetti.request :as yr]
[yetti.util :as yu]
[ring.request :as rreq]
[ring.websocket :as rws]
[yetti.websocket :as yws])
@ -50,7 +50,7 @@
(declare start-io-loop!)
(defn handler
(defn listener
"A WebSocket upgrade handler factory. Returns a handler that can be
used to upgrade to websocket connection. This handler implements the
basic custom protocol on top of websocket connection with all the
@ -61,37 +61,34 @@
It also accepts some options that allows you parametrize the
protocol behavior. The options map will be used as-as for the
initial data of the `ws` data structure"
[& {:keys [::on-rcv-message
:or {input-buff-size 64
output-buff-size 64
idle-timeout 60000
on-connect identity
on-snd-message identity-3
on-rcv-message identity-3}
:as options}]
[request & {:keys [::on-rcv-message
:or {input-buff-size 64
output-buff-size 64
idle-timeout 60000
on-connect identity
on-snd-message identity-3
on-rcv-message identity-3}
:as options}]
(assert (fn? on-rcv-message) "'on-rcv-message' should be a function")
(assert (fn? on-snd-message) "'on-snd-message' should be a function")
(assert (fn? on-connect) "'on-connect' should be a function")
(fn [{:keys [::yws/channel] :as request}]
(let [input-ch (sp/chan :buf input-buff-size)
output-ch (sp/chan :buf output-buff-size)
hbeat-ch (sp/chan :buf (sp/sliding-buffer 6))
close-ch (sp/chan)
ip-addr (parse-client-ip request)
uagent (yr/get-header request "user-agent")
id (uuid/next)
state (atom {})
beats (atom #{})
options (-> options
(let [input-ch (sp/chan :buf input-buff-size)
output-ch (sp/chan :buf output-buff-size)
hbeat-ch (sp/chan :buf (sp/sliding-buffer 6))
close-ch (sp/chan)
ip-addr (parse-client-ip request)
uagent (rreq/get-header request "user-agent")
id (uuid/next)
state (atom {})
beats (atom #{})
options (-> options
(update ::handler wrap-handler)
(assoc ::id id)
(assoc ::state state)
@ -101,126 +98,118 @@
(assoc ::heartbeat-ch hbeat-ch)
(assoc ::output-ch output-ch)
(assoc ::close-ch close-ch)
(assoc ::channel channel)
(assoc ::remote-addr ip-addr)
(assoc ::user-agent uagent)
(assoc ::user-agent uagent))]
(fn on-open [channel]
(l/trace :fn "on-open" :conn-id id :channel channel)
(let [options (-> options
(assoc ::channel channel)
timeout (dt/duration idle-timeout)]
(fn [channel]
(l/trace :fn "on-ws-open" :conn-id id)
(let [timeout (dt/duration idle-timeout)
name (str "penpot/websocket/io-loop/" id)]
(yws/idle-timeout! channel timeout)
(px/fn->thread (partial start-io-loop! options)
{:name name :virtual true})))
(yws/set-idle-timeout! channel timeout)
(px/submit! :vthread (partial start-io-loop! options))))
(fn [_ code reason]
(l/trace :fn "on-ws-terminate"
:conn-id id
:code code
:reason reason)
(sp/close! close-ch))
(fn on-close [_channel code reason]
(l/info :fn "on-ws-terminate"
:conn-id id
:code code
:reason reason)
(sp/close! close-ch))
(fn [_ cause]
(sp/close! close-ch cause))
(fn on-error [_channel cause]
(sp/close! close-ch cause))
(fn [_ message]
(sp/offer! input-ch message)
(swap! state assoc ::last-activity-at (dt/now)))
(fn on-message [_channel message]
(when (string? message)
(sp/offer! input-ch message)
(swap! state assoc ::last-activity-at (dt/now))))
(fn [_ buffers]
;; (l/trace :fn "on-ws-pong" :buffers (pr-str buffers))
(sp/put! hbeat-ch (yu/copy-many buffers)))]
(yws/on-close! channel (fn [_]
(sp/close! close-ch)))
{:on-open on-ws-open
:on-error on-ws-error
:on-close on-ws-terminate
:on-text on-ws-message
:on-pong on-ws-pong})))
(fn on-pong [_channel data]
(l/trace :fn "on-pong" :data data)
(sp/put! hbeat-ch data))}))
(defn- handle-ping!
[{:keys [::id ::beats ::channel] :as wsp} beat-id]
(l/trace :hint "ping" :beat beat-id :conn-id id)
(yws/ping! channel (encode-beat beat-id))
(l/trace :hint "send ping" :beat beat-id :conn-id id)
(rws/ping channel (encode-beat beat-id))
(let [issued (swap! beats conj (long beat-id))]
(not (>= (count issued) max-missed-heartbeats))))
(defn- start-io-loop!
[{:keys [::id ::close-ch ::input-ch ::output-ch ::heartbeat-ch ::channel ::handler ::beats ::on-rcv-message ::on-snd-message] :as wsp}]
{:name (str "penpot/websocket/io-loop/" id)
:virtual true}
(handler wsp {:type :open})
(loop [i 0]
(let [ping-ch (sp/timeout-chan heartbeat-interval)
[msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])]
(when (yws/connected? channel)
(identical? p ping-ch)
(if (handle-ping! wsp i)
(recur (inc i))
(yws/close! channel 8802 "missing to many pings"))
(handler wsp {:type :open})
(loop [i 0]
(let [ping-ch (sp/timeout-chan heartbeat-interval)
[msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])]
(when (rws/open? channel)
(identical? p ping-ch)
(if (handle-ping! wsp i)
(recur (inc i))
(rws/close channel 8802 "missing to many pings"))
(or (identical? p close-ch) (nil? msg))
(do :nothing)
(or (identical? p close-ch) (nil? msg))
(do :nothing)
(identical? p heartbeat-ch)
(let [beat (decode-beat msg)]
;; (l/trace :hint "pong" :beat beat :conn-id id)
(swap! beats disj beat)
(recur i))
(identical? p heartbeat-ch)
(let [beat (decode-beat msg)]
;; (l/trace :hint "pong" :beat beat :conn-id id)
(swap! beats disj beat)
(recur i))
(identical? p input-ch)
(let [message (t/decode-str msg)
message (on-rcv-message message)
{:keys [request-id] :as response} (handler wsp message)]
(when (map? response)
(sp/put! output-ch
(cond-> response
(some? request-id)
(assoc :request-id request-id))))
(recur i))
(identical? p input-ch)
(let [message (t/decode-str msg)
message (on-rcv-message message)
{:keys [request-id] :as response} (handler wsp message)]
(when (map? response)
(sp/put! output-ch
(cond-> response
(some? request-id)
(assoc :request-id request-id))))
(recur i))
(identical? p output-ch)
(let [message (on-snd-message msg)
message (t/encode-str message {:type :json-verbose})]
;; (l/trace :hint "writing message to output" :message msg)
(yws/send! channel message)
(recur i))))))
(identical? p output-ch)
(let [message (on-snd-message msg)
message (t/encode-str message {:type :json-verbose})]
;; (l/trace :hint "writing message to output" :message msg)
(rws/send channel message)
(recur i))))))
(catch java.nio.channels.ClosedChannelException _)
(catch java.net.SocketException _)
(catch java.io.IOException _)
(catch java.nio.channels.ClosedChannelException _)
(catch java.net.SocketException _)
(catch java.io.IOException _)
(catch InterruptedException _
(l/debug :hint "websocket thread interrumpted" :conn-id id))
(catch InterruptedException _cause
(l/debug :hint "websocket thread interrumpted" :conn-id id))
(catch Throwable cause
(l/error :hint "unhandled exception on websocket thread"
:conn-id id
:cause cause))
(catch Throwable cause
(l/error :hint "unhandled exception on websocket thread"
:conn-id id
:cause cause))
(handler wsp {:type :close})
(when (yws/connected? channel)
(when (rws/open? channel)
;; NOTE: we need to ignore all exceptions here because
;; there can be a race condition that first returns that
;; channel is connected but on closing, will raise that
;; channel is already closed.
(yws/close! channel 8899 "terminated")))
(rws/close channel 8899 "terminated")))
(when-let [on-disconnect (::on-disconnect wsp)]
(l/trace :hint "websocket thread terminated" :conn-id id)))))
(catch Throwable cause
(throw cause)))
(l/trace :hint "websocket thread terminated" :conn-id id))))
@ -31,17 +31,17 @@
request (volatile! nil)
handler (#'app.http.access-token/wrap-soft-auth
(fn [req & _] (vreset! request req))
(fn [req] (vreset! request req))
(with-mocks [m1 {:target 'app.http.access-token/get-token
:return nil}]
(handler {} nil nil)
(handler {})
(t/is (= {} @request)))
(with-mocks [m1 {:target 'app.http.access-token/get-token
:return (:token token)}]
(handler {} nil nil)
(handler {})
(let [token-id (get @request :app.http.access-token/id)]
(t/is (= token-id (:id token))))))))
@ -25,7 +25,7 @@
(def http-request
(get-header [_ name]
(case name
"x-forwarded-for" ""))))
@ -46,6 +46,6 @@
{:keys [error result]} (th/command! (assoc params ::cond/key etag))]
(t/is (nil? error))
(t/is (fn? result))
(t/is (= 304 (-> (result nil) :yetti.response/status))))
(t/is (= 304 (-> (result nil) :ring.response/status))))
Add table
Reference in a new issue