From 7afb3e2c6dda3f266ae99b3a08d47c7be750f2a0 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 17 Jan 2022 23:39:26 +0100 Subject: [PATCH] :sparkles: Stream transit encoding to the response output-stream. Instead of in-memmory encoding. This will prevent many OOM errors. --- backend/src/app/http.clj | 2 +- backend/src/app/http/middleware.clj | 64 ++++++++++++++++++----------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index a0d872e35..d55028199 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -141,11 +141,11 @@ :get ws}] ["/api" {:middleware [[middleware/cors] - [middleware/etag] [middleware/params] [middleware/multipart-params] [middleware/keyword-params] [middleware/format-response-body] + [middleware/etag] [middleware/parse-request-body] [middleware/errors errors/handle] [middleware/cookies]]} diff --git a/backend/src/app/http/middleware.clj b/backend/src/app/http/middleware.clj index 265c3198e..1aa4b2766 100644 --- a/backend/src/app/http/middleware.clj +++ b/backend/src/app/http/middleware.clj @@ -12,10 +12,12 @@ [app.util.json :as json] [buddy.core.codecs :as bc] [buddy.core.hash :as bh] + [ring.core.protocols :as rp] [ring.middleware.cookies :refer [wrap-cookies]] [ring.middleware.keyword-params :refer [wrap-keyword-params]] [ring.middleware.multipart-params :refer [wrap-multipart-params]] - [ring.middleware.params :refer [wrap-params]])) + [ring.middleware.params :refer [wrap-params]] + [yetti.adapter :as yt])) (defn wrap-server-timing [handler] @@ -65,11 +67,33 @@ {:name ::parse-request-body :compile (constantly wrap-parse-request-body)}) +(defn buffered-output-stream + "Returns a buffered output stream that ignores flush calls. This is + needed because transit-java calls flush very aggresivelly on each + object write." + [^java.io.OutputStream os ^long chunk-size] + (proxy [java.io.BufferedOutputStream] [os (int chunk-size)] + ;; Explicitly do not forward flush + (flush []) + (close [] + (proxy-super flush) + (proxy-super close)))) + +(def ^:const buffer-size (:http/output-buffer-size yt/base-defaults)) + +(defn- transit-streamable-body + [data opts] + (reify rp/StreamableResponseBody + (write-body-to-stream [_ _ output-stream] + ;; Use the same buffer as jetty output buffer size + (with-open [bos (buffered-output-stream output-stream buffer-size)] + (let [tw (t/writer bos opts)] + (t/write! tw data)))))) + (defn- impl-format-response-body - [response request] + [response {:keys [query-params] :as request}] (let [body (:body response) - params (:query-params request) - opts {:type (if (contains? params "transit_verbose") :json-verbose :json)}] + opts {:type (if (contains? query-params "transit_verbose") :json-verbose :json)}] (cond (:ws response) @@ -78,7 +102,7 @@ (coll? body) (-> response (update :headers assoc "content-type" "application/transit+json") - (assoc :body (t/encode body opts))) + (assoc :body (transit-streamable-body body opts))) (nil? body) (assoc response :status 204 :body "") @@ -131,24 +155,18 @@ (defn wrap-etag [handler] - (letfn [(generate-etag [{:keys [body] :as response}] - (str "W/\"" (-> body bh/blake2b-128 bc/bytes->hex) "\"")) - (get-match [{:keys [headers] :as request}] - (get headers "if-none-match"))] - (fn [request] - (let [response (handler request)] - (if (= :get (:request-method request)) - (let [etag (generate-etag response) - match (get-match request) - response (update response :headers #(assoc % "ETag" etag))] - (cond-> response - (and (string? match) - (= :get (:request-method request)) - (= etag match)) - (-> response - (assoc :body "") - (assoc :status 304)))) - response))))) + (letfn [(encode [data] + (when (string? data) + (str "W/\"" (-> data bh/blake2b-128 bc/bytes->hex) "\"")))] + (fn [{method :request-method headers :headers :as request}] + (cond-> (handler request) + (= :get method) + (as-> $ (if-let [etag (-> $ :body meta :etag encode)] + (cond-> (update $ :headers assoc "etag" etag) + (= etag (get headers "if-none-match")) + (-> (assoc :body "") + (assoc :status 304))) + $)))))) (def etag {:name ::etag