diff --git a/backend/src/vertx/core.clj b/backend/src/vertx/core.clj index bc5b28610..9a457efe6 100644 --- a/backend/src/vertx/core.clj +++ b/backend/src/vertx/core.clj @@ -208,8 +208,6 @@ (let [opts (DeploymentOptions.)] (when instances (.setInstances opts (int instances))) (when worker (.setWorker opts worker)) - ;; (.setInstances opts 4) - ;; (.setWorkerPoolSize opts 4) opts)) (defn- opts->vertx-options diff --git a/backend/src/vertx/http.clj b/backend/src/vertx/http.clj index d02ffde7d..2f4bf031d 100644 --- a/backend/src/vertx/http.clj +++ b/backend/src/vertx/http.clj @@ -11,11 +11,15 @@ [promesa.core :as p] [vertx.util :as vu]) (:import + java.util.Map$Entry + clojure.lang.MapEntry io.vertx.core.Vertx io.vertx.core.Verticle io.vertx.core.Handler io.vertx.core.Future + io.vertx.core.MultiMap io.vertx.core.Context + io.vertx.core.buffer.Buffer io.vertx.core.http.HttpServer io.vertx.core.http.HttpServerRequest io.vertx.core.http.HttpServerResponse @@ -26,7 +30,38 @@ ;; --- Public Api -(s/def :vertx.http/handler fn?) +(declare -handle-response) +(declare -handle-body) + +(defn ->headers + [^HttpServerRequest request] + (let [headers (.headers request) + it (.iterator ^MultiMap headers)] + (loop [m (transient {})] + (if (.hasNext it) + (let [^Map$Entry me (.next it) + key (.toLowerCase (.getKey me)) + val (.getValue me)] + (recur (assoc! m key val))) + (persistent! m))))) + +(defn- ->request + [^HttpServerRequest request] + {:method (-> request .rawMethod .toLowerCase keyword) + :path (.path request) + :headers (->headers request) + ::request request + ::response (.response request)}) + +(defn handler + [vsm f] + (reify Handler + (handle [this request] + (let [ctx (->request request)] + (-handle-response (f ctx) ctx))))) + +(s/def :vertx.http/handler + (s/or :fn fn? :handler #(instance? Handler %))) (s/def :vertx.http/host string?) (s/def :vertx.http/port pos?) (s/def ::server-options @@ -54,21 +89,62 @@ (let [opts (HttpServerOptions.)] (.setReuseAddress opts true) (.setReusePort opts true) - ;; (.setTcpNoDelay opts true) - ;; (.setTcpFastOpen opts true) + (.setTcpNoDelay opts true) + (.setTcpFastOpen opts true) (when host (.setHost opts host)) (when port (.setPort opts port)) opts)) -(defn- fn->handler - [f] - (reify Handler - (handle [_ request] - (f request)))) - (defn- resolve-handler [handler] (cond - (fn? handler) (fn->handler handler) + (fn? handler) (vu/fn->handler handler) (instance? Handler handler) handler :else (throw (ex-info "invalid handler" {})))) + +(defn- assign-status-and-headers! + [^HttpServerResponse res response] + (let [headers (:headers response) + status (:status response 200)] + (when (map? headers) + (vu/doseq [[key val] headers] + (.putHeader res ^String (name key) ^String (str val)))) + (.setStatusCode res status))) + +(defprotocol IAsyncResponse + (-handle-response [_ _])) + +(defprotocol IAsyncBody + (-handle-body [_ _])) + +(extend-protocol IAsyncResponse + java.util.concurrent.CompletionStage + (-handle-response [data ctx] + (p/then' data #(-handle-response % ctx))) + + clojure.lang.IPersistentMap + (-handle-response [data ctx] + (let [body (:body data) + res (::response ctx)] + (assign-status-and-headers! res data) + (-handle-body body res)))) + +(extend-protocol IAsyncBody + (Class/forName "[B") + (-handle-body [data res] + (.end ^HttpServerResponse res (Buffer/buffer data))) + + Buffer + (-handle-body [data res] + (.end ^HttpServerResponse res ^Buffer data)) + + nil + (-handle-body [data res] + (.putHeader ^HttpServerResponse res "content-length" "0") + (.end ^HttpServerResponse res)) + + String + (-handle-body [data res] + (let [length (count data)] + (.putHeader ^HttpServerResponse res "content-length" (str length)) + (.end ^HttpServerResponse res data)))) diff --git a/backend/src/vertx/timers.clj b/backend/src/vertx/timers.clj index 9484ad9fb..84213c0f3 100644 --- a/backend/src/vertx/timers.clj +++ b/backend/src/vertx/timers.clj @@ -33,23 +33,3 @@ java.lang.AutoCloseable (close [_] (.cancelTimer system timer-id))))) - -(defn schedule-task! - [vsm ms f] - (let [^Vertx system (vu/resolve-system vsm) - tid* (atom nil) - task (fn wrapped-task [] - (-> (p/do! (f)) - (p/then (fn [_] - (let [tid (schedule-task! vsm ms wrapped-task)] - (reset! tid* tid) - nil))))) - tid (schedule-task! vsm ms task)] - (reset! tid* tid) - (reify - java.lang.AutoCloseable - (close [this] - (locking this - (when-let [timer-id (deref tid*)] - (.cancelTimer system timer-id) - (reset! tid* nil))))))) diff --git a/backend/src/vertx/util.clj b/backend/src/vertx/util.clj index fc86b28fc..7e7f09113 100644 --- a/backend/src/vertx/util.clj +++ b/backend/src/vertx/util.clj @@ -5,6 +5,7 @@ ;; Copyright (c) 2019 Andrey Antukh (ns vertx.util + (:refer-clojure :exclude [doseq]) (:require [promesa.core :as p]) (:import io.vertx.core.Vertx io.vertx.core.Handler @@ -38,3 +39,13 @@ (p/reject! d (.cause ar)) (p/resolve! d (.result ar)))))) +(defmacro doseq + "A faster version of doseq." + [[bsym csym] & body] + `(let [it# (.iterator ~csym)] + (loop [] + (when (.hasNext it#) + (let [~bsym (.next it#)] + ~@body + (recur)))))) + diff --git a/backend/src/vertx/util/transit.clj b/backend/src/vertx/util/transit.clj deleted file mode 100644 index d96011133..000000000 --- a/backend/src/vertx/util/transit.clj +++ /dev/null @@ -1,87 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) 2019 Andrey Antukh - -(ns vertx.util.transit - (:require [cognitect.transit :as t] - [clojure.java.io :as io]) - (:import java.io.ByteArrayInputStream - java.io.ByteArrayOutputStream - java.time.Instant)) - -(def ^:private write-handler - (t/write-handler - (constantly "m") - (fn [v] (str (.toEpochMilli v))))) - -(def ^:private read-handler - (t/read-handler - (fn [v] (-> (Long/parseLong v) - (Instant/ofEpochMilli))))) - -(def +read-handlers+ - {"m" read-handler}) - -(def +write-handlers+ - {Instant write-handler}) - -(defmethod print-method Instant - [mv ^java.io.Writer writer] - (.write writer (str "#instant \"" (.toString mv) "\""))) - -(defmethod print-dup Instant [o w] - (print-method o w)) - -;; --- Low-Level Api - -(defn reader - ([istream] - (reader istream nil)) - ([istream {:keys [type] :or {type :msgpack}}] - (t/reader istream type {:handlers +read-handlers+}))) - -(defn read! - "Read value from streamed transit reader." - [reader] - (t/read reader)) - -(defn writer - ([ostream] - (writer ostream nil)) - ([ostream {:keys [type] :or {type :msgpack}}] - (t/writer ostream type {:handlers +write-handlers+}))) - -(defn write! - [writer data] - (t/write writer data)) - -;; --- High-Level Api - -;; TODO: check performance of different options - -(defn decode - ([data] - (decode data nil)) - ([data opts] - (cond - (string? data) - (decode (.getBytes data "UTF-8") opts) - - (bytes? data) - (with-open [input (ByteArrayInputStream. data)] - (read! (reader input opts))) - - :else - (with-open [input (io/input-stream data)] - (read! (reader input opts)))))) - -(defn encode - (^bytes [data] - (encode data nil)) - (^bytes [data opts] - (with-open [out (ByteArrayOutputStream.)] - (let [w (writer out opts)] - (write! w data) - (.toByteArray out))))) diff --git a/backend/src/vertx/web.clj b/backend/src/vertx/web.clj index 321042f77..9a06f49a6 100644 --- a/backend/src/vertx/web.clj +++ b/backend/src/vertx/web.clj @@ -6,36 +6,33 @@ (ns vertx.web "High level api for http servers." - (:require [clojure.spec.alpha :as s] - [promesa.core :as p] - [sieppari.core :as sp] - [reitit.core :as rt] - [vertx.http :as vxh] - [vertx.util :as vu]) + (:require + [clojure.tools.logging :as log] + [clojure.spec.alpha :as s] + [promesa.core :as p] + [sieppari.core :as sp] + [reitit.core :as rt] + [vertx.http :as vh] + [vertx.util :as vu]) (:import - clojure.lang.Keyword clojure.lang.IPersistentMap - io.vertx.core.Vertx - io.vertx.core.Handler + clojure.lang.Keyword io.vertx.core.Future + io.vertx.core.Handler + io.vertx.core.Vertx io.vertx.core.buffer.Buffer io.vertx.core.http.Cookie io.vertx.core.http.HttpServer + io.vertx.core.http.HttpServerOptions io.vertx.core.http.HttpServerRequest io.vertx.core.http.HttpServerResponse - io.vertx.core.http.HttpServerOptions io.vertx.ext.web.Route io.vertx.ext.web.Router io.vertx.ext.web.RoutingContext io.vertx.ext.web.handler.BodyHandler - io.vertx.ext.web.handler.StaticHandler + io.vertx.ext.web.handler.LoggerHandler io.vertx.ext.web.handler.ResponseTimeHandler - io.vertx.ext.web.handler.LoggerHandler)) - -;; --- Constants & Declarations - -(declare -handle-response) -(declare -handle-body) + io.vertx.ext.web.handler.StaticHandler)) ;; --- Public Api @@ -43,16 +40,17 @@ (s/or :fn fn? :vec (s/every fn? :kind vector?))) -(defn- make-ctx +(defn- ->request [^RoutingContext routing-context] (let [^HttpServerRequest request (.request ^RoutingContext routing-context) ^HttpServerResponse response (.response ^RoutingContext routing-context) ^Vertx system (.vertx routing-context)] {:body (.getBody routing-context) :path (.path request) + :headers (vh/->headers request) :method (-> request .rawMethod .toLowerCase keyword) - ::request request - ::response response + ::vh/request request + ::vh/response response ::execution-context (.getContext system) ::routing-context routing-context})) @@ -84,6 +82,12 @@ {:status 405} {:status 404})) +(defn- default-on-error + [err req] + (log/error err) + {:status 500 + :body "Internal server error!\n"}) + (defn- run-chain [ctx chain handler] (let [d (p/deferred)] @@ -106,64 +110,47 @@ ([routes] (router routes {})) ([routes {:keys [delete-uploads? upload-dir + on-error log-requests? time-response?] :or {delete-uploads? true upload-dir "/tmp/vertx.uploads" + on-error default-on-error log-requests? false time-response? true} :as options}] (let [rtr (rt/router routes options) - hdr #(router-handler rtr %)] + f #(router-handler rtr %)] (fn [^Router router] (let [^Route route (.route router)] (when time-response? (.handler route (ResponseTimeHandler/create))) (when log-requests? (.handler route (LoggerHandler/create))) - (.handler route (doto (BodyHandler/create true) - (.setDeleteUploadedFilesOnEnd delete-uploads?) - (.setUploadsDirectory upload-dir))) - (.handler route (reify Handler - (handle [_ context] - (let [ctx (make-ctx context)] - (-> (p/do! (hdr ctx)) - (p/then' #(-handle-response % ctx)) - (p/catch #(do (prn %) (.fail (:context ctx) %))))))))) - router)))) + (doto route + (.failureHandler + (reify Handler + (handle [_ rc] + (let [err (.failure ^RoutingContext rc) + req (.get ^RoutingContext rc "vertx$clj$req")] + (-> (p/do! (on-error err req)) + (vh/-handle-response req)))))) -;; --- Impl + (.handler + (doto (BodyHandler/create true) + (.setDeleteUploadedFilesOnEnd delete-uploads?) + (.setUploadsDirectory upload-dir))) -(defprotocol IAsyncResponse - (-handle-response [_ _])) -(extend-protocol IAsyncResponse - clojure.lang.IPersistentMap - (-handle-response [data ctx] - (let [status (or (:status data) 200) - body (:body data) - res (::response ctx)] - (.setStatusCode ^HttpServerResponse res status) - (-handle-body body res)))) - -(defprotocol IAsyncBody - (-handle-body [_ _])) - -(extend-protocol IAsyncBody - (Class/forName "[B") - (-handle-body [data res] - (.end ^HttpServerResponse res (Buffer/buffer data))) - - Buffer - (-handle-body [data res] - (.end ^HttpServerResponse res ^Buffer data)) - - nil - (-handle-body [data res] - (.putHeader ^HttpServerResponse res "content-length" "0") - (.end ^HttpServerResponse res)) - - String - (-handle-body [data res] - (let [length (count data)] - (.putHeader ^HttpServerResponse res "content-length" (str length)) - (.end ^HttpServerResponse res data)))) + (.handler + (reify Handler + (handle [_ rc] + (let [req (->request rc) + efn (fn [err] + (.put ^RoutingContext rc "vertx$clj$req" req) + (.fail ^RoutingContext rc err))] + (try + (-> (vh/-handle-response (f req) req) + (p/catch' efn)) + (catch Exception err + (efn err))))))))) + router)))) diff --git a/backend/src/vertx/web/interceptors.clj b/backend/src/vertx/web/interceptors.clj index 50d960e64..c2c0b00d2 100644 --- a/backend/src/vertx/web/interceptors.clj +++ b/backend/src/vertx/web/interceptors.clj @@ -11,22 +11,25 @@ [clojure.string :as str] [promesa.core :as p] [reitit.core :as r] + [vertx.http :as vh] [vertx.web :as vw] + [vertx.util :as vu] [sieppari.context :as spx] [sieppari.core :as sp]) (:import clojure.lang.Keyword clojure.lang.MapEntry - java.util.Map - java.util.Map$Entry - io.vertx.core.Vertx - io.vertx.core.Handler io.vertx.core.Future + io.vertx.core.Handler + io.vertx.core.MultiMap + io.vertx.core.Vertx io.vertx.core.http.Cookie io.vertx.core.http.HttpServerRequest io.vertx.core.http.HttpServerResponse io.vertx.ext.web.FileUpload - io.vertx.ext.web.RoutingContext)) + io.vertx.ext.web.RoutingContext + java.util.Map + java.util.Map$Entry)) ;; --- Cookies @@ -40,71 +43,50 @@ (defn cookies [] - {:enter (fn [data] - (let [^HttpServerRequest req (get-in data [:request ::vw/request]) - parse-cookie (fn [^Cookie item] [(.getName item) (.getValue item)]) - cookies (into {} (map parse-cookie) (vals (.cookieMap req)))] - (update data :request assoc :cookies cookies))) - :leave (fn [data] - (let [cookies (get-in data [:response :cookies]) - ^HttpServerResponse res (get-in data [:request ::vw/response])] - (when (map? cookies) - (reduce-kv #(.addCookie res (build-cookie %1 %2)) nil cookies)) - data))}) -;; --- Headers - -(def ^:private lowercase-keys-t - (map (fn [^Map$Entry entry] - (MapEntry. (.toLowerCase (.getKey entry)) (.getValue entry))))) - -(defn- parse-headers - [req] - (let [^HttpServerRequest request (::vw/request req)] - (into {} lowercase-keys-t (.headers request)))) - -(defn headers - [] - {:enter (fn [data] - (update data :request assoc :headers (parse-headers (:request data)))) - :leave (fn [data] - (let [^HttpServerResponse res (get-in data [:request ::vw/response]) - headers (get-in data [:response :headers])] - (run! (fn [[key value]] - (.putHeader ^HttpServerResponse res - ^String (name key) - ^String (str value))) - headers) - data))}) + {:enter + (fn [data] + (let [^HttpServerRequest req (get-in data [:request ::vh/request]) + parse-cookie (fn [^Cookie item] [(.getName item) (.getValue item)]) + cookies (into {} (map parse-cookie) (vals (.cookieMap req)))] + (update data :request assoc :cookies cookies))) + :leave + (fn [data] + (let [cookies (get-in data [:response :cookies]) + ^HttpServerResponse res (get-in data [:request ::vh/response])] + (when (map? cookies) + (vu/doseq [[key val] cookies] + (.addCookie res (build-cookie key val)))) + data))}) ;; --- Params -(defn- parse-param-entry - [acc ^Map$Entry item] - (let [key (keyword (.toLowerCase (.getKey item))) - prv (get acc key ::default)] - (cond - (= prv ::default) - (assoc! acc key (.getValue item)) - - (vector? prv) - (assoc! acc key (conj prv (.getValue item))) - - :else - (assoc! acc key [prv (.getValue item)])))) - (defn- parse-params - [req] - (let [request (::vw/request req)] - (persistent! - (reduce parse-param-entry - (transient {}) - (.params ^HttpServerResponse request))))) + [^HttpServerRequest request] + (let [params (.params request) + it (.iterator ^MultiMap params)] + (loop [m (transient {})] + (if (.hasNext it) + (let [^Map$Entry o (.next it) + key (keyword (.toLowerCase (.getKey o))) + prv (get m key ::default) + val (.getValue o)] + (cond + (= prv ::default) + (recur (assoc! m key val)) + + (vector? prv) + (recur (assoc! m key (conj prv val))) + + :else + (recur (assoc! m key [prv val])))) + (persistent! m))))) (defn params ([] (params nil)) ([{:keys [attr] :or {attr :params}}] {:enter (fn [data] - (let [params (parse-params (:request data))] + (let [request (get-in data [:request ::vh/request]) + params (parse-params request)] (update data :request assoc attr params)))})) ;; --- Uploads