From 5ef76ff3e6e73282deb59df8b1a14bb966de36be Mon Sep 17 00:00:00 2001 From: Matthew Holt Date: Tue, 9 Feb 2021 14:15:04 -0700 Subject: [PATCH] reverseproxy: Response buffering & configurable buffer size Proxy response bodies can now be buffered, and the size of the request body and response body buffer can be limited. Any remaining content that doesn't fit in the buffer will remain on the wire until it can be read; i.e. bodies are not truncated, even if the buffer is not big enough. This fulfills a customer requirement. This was made possible by their sponsorship! --- modules/caddyhttp/reverseproxy/caddyfile.go | 19 +++++ .../caddyhttp/reverseproxy/httptransport.go | 1 + .../caddyhttp/reverseproxy/reverseproxy.go | 69 ++++++++++++++++--- 3 files changed, 80 insertions(+), 9 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index 895bcbb9..71ed21fa 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -499,6 +499,25 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } h.BufferRequests = true + case "buffer_responses": + if d.NextArg() { + return d.ArgErr() + } + h.BufferResponses = true + + case "max_buffer_size": + if !d.NextArg() { + return d.ArgErr() + } + size, err := strconv.Atoi(d.Val()) + if err != nil { + return d.Errf("invalid size (bytes): %s", d.Val()) + } + if d.NextArg() { + return d.ArgErr() + } + h.MaxBufferSize = int64(size) + case "header_up": var err error diff --git a/modules/caddyhttp/reverseproxy/httptransport.go b/modules/caddyhttp/reverseproxy/httptransport.go index 61e90543..fdaf56e1 100644 --- a/modules/caddyhttp/reverseproxy/httptransport.go +++ b/modules/caddyhttp/reverseproxy/httptransport.go @@ -334,6 +334,7 @@ func (t TLSConfig) MakeTLSClientConfig(ctx caddy.Context) (*tls.Config, error) { cfg.Certificates = []tls.Certificate{cert} } if t.ClientCertificateAutomate != "" { + // TODO: use or enable ctx.IdentityCredentials() ... tlsAppIface, err := ctx.App("tls") if err != nil { return nil, fmt.Errorf("getting tls app: %v", err) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 3a2457f6..76506ca8 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net" "net/http" "regexp" @@ -93,9 +92,20 @@ type Handler struct { // If true, the entire request body will be read and buffered // in memory before being proxied to the backend. This should - // be avoided if at all possible for performance reasons. + // be avoided if at all possible for performance reasons, but + // could be useful if the backend is intolerant of read latency. BufferRequests bool `json:"buffer_requests,omitempty"` + // If true, the entire response body will be read and buffered + // in memory before being proxied to the client. This should + // be avoided if at all possible for performance reasons, but + // could be useful if the backend has tighter memory constraints. + BufferResponses bool `json:"buffer_responses,omitempty"` + + // If body buffering is enabled, the maximum size of the buffers + // used for the requests and responses (in bytes). + MaxBufferSize int64 `json:"max_buffer_size,omitempty"` + // List of handlers and their associated matchers to evaluate // after successful roundtrips. The first handler that matches // the response from a backend will be invoked. The response @@ -337,12 +347,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht // required, if read timeouts are set, // and if body size is limited if h.BufferRequests { - buf := bufPool.Get().(*bytes.Buffer) - buf.Reset() - defer bufPool.Put(buf) - _, _ = io.Copy(buf, r.Body) - r.Body.Close() - r.Body = ioutil.NopCloser(buf) + r.Body = h.bufferedBody(r.Body) } // prepare the request for proxying; this is needed only once @@ -563,6 +568,11 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, di Dia } } + // if enabled, buffer the response body + if h.BufferResponses { + res.Body = h.bufferedBody(res.Body) + } + // see if any response handler is configured for this response from the backend for i, rh := range h.HandleResponse { if rh.Match != nil && !rh.Match.Match(res.StatusCode, res.Header) { @@ -599,7 +609,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, di Dia } } - // Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc) + // deal with 101 Switching Protocols responses: (WebSocket, h2c, etc) if res.StatusCode == http.StatusSwitchingProtocols { h.handleUpgradeResponse(rw, req, res) return nil @@ -735,6 +745,30 @@ func (h Handler) directRequest(req *http.Request, di DialInfo) { req.URL.Host = reqHost } +// bufferedBody reads originalBody into a buffer, then returns a reader for the buffer. +// Always close the return value when done with it, just like if it was the original body! +func (h Handler) bufferedBody(originalBody io.ReadCloser) io.ReadCloser { + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + if h.MaxBufferSize > 0 { + n, err := io.CopyN(buf, originalBody, h.MaxBufferSize) + if err != nil || n == h.MaxBufferSize { + return bodyReadCloser{ + Reader: io.MultiReader(buf, originalBody), + buf: buf, + body: originalBody, + } + } + } else { + _, _ = io.Copy(buf, originalBody) + } + originalBody.Close() // no point in keeping it open + return bodyReadCloser{ + Reader: buf, + buf: buf, + } +} + func copyHeader(dst, src http.Header) { for k, vv := range src { for _, v := range vv { @@ -858,6 +892,23 @@ type TLSTransport interface { // roundtrip succeeded, but an error occurred after-the-fact. type roundtripSucceeded struct{ error } +// bodyReadCloser is a reader that, upon closing, will return +// its buffer to the pool and close the underlying body reader. +type bodyReadCloser struct { + io.Reader + buf *bytes.Buffer + body io.ReadCloser +} + +func (brc bodyReadCloser) Close() error { + bufPool.Put(brc.buf) + if brc.body != nil { + return brc.body.Close() + } + return nil +} + +// bufPool is used for buffering requests and responses. var bufPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer)