diff --git a/caddyhttp/proxy/proxy.go b/caddyhttp/proxy/proxy.go index 0f48a61f..c0c2bb4b 100644 --- a/caddyhttp/proxy/proxy.go +++ b/caddyhttp/proxy/proxy.go @@ -39,6 +39,9 @@ type Upstream interface { // Gets how long to wait between selecting upstream // hosts in the case of cascading failures. GetTryInterval() time.Duration + + // Gets the number of upstream hosts. + GetHostCount() int } // UpstreamHostDownFunc can be used to customize how Down behaves. @@ -94,13 +97,26 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { // outreq is the request that makes a roundtrip to the backend outreq := createUpstreamRequest(r) - // record and replace outreq body - body, err := newBufferedBody(outreq.Body) - if err != nil { - return http.StatusBadRequest, errors.New("failed to read downstream request body") - } - if body != nil { - outreq.Body = body + // If we have more than one upstream host defined and if retrying is enabled + // by setting try_duration to a non-zero value, caddy will try to + // retry the request at a different host if the first one failed. + // + // This requires us to possibly rewind and replay the request body though, + // which in turn requires us to buffer the request body first. + // + // An unbuffered request is usually preferrable, because it reduces latency + // as well as memory usage. Furthermore it enables different kinds of + // HTTP streaming applications like gRPC for instance. + requiresBuffering := upstream.GetHostCount() > 1 && upstream.GetTryDuration() != 0 + + if requiresBuffering { + body, err := newBufferedBody(outreq.Body) + if err != nil { + return http.StatusBadRequest, errors.New("failed to read downstream request body") + } + if body != nil { + outreq.Body = body + } } // The keepRetrying function will return true if we should @@ -173,15 +189,25 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer) } - // rewind request body to its beginning - if err := body.rewind(); err != nil { - return http.StatusInternalServerError, errors.New("unable to rewind downstream request body") + // Before we retry the request we have to make sure + // that the body is rewound to it's beginning. + if bb, ok := outreq.Body.(*bufferedBody); ok { + if err := bb.rewind(); err != nil { + return http.StatusInternalServerError, errors.New("unable to rewind downstream request body") + } } // tell the proxy to serve the request - atomic.AddInt64(&host.Conns, 1) - backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn) - atomic.AddInt64(&host.Conns, -1) + // + // NOTE: + // The call to proxy.ServeHTTP can theoretically panic. + // To prevent host.Conns from getting out-of-sync we thus have to + // make sure that it's _always_ correctly decremented afterwards. + func() { + atomic.AddInt64(&host.Conns, 1) + defer atomic.AddInt64(&host.Conns, -1) + backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn) + }() // if no errors, we're done here if backendErr == nil { diff --git a/caddyhttp/proxy/upstream.go b/caddyhttp/proxy/upstream.go index 0309e242..5742eff0 100644 --- a/caddyhttp/proxy/upstream.go +++ b/caddyhttp/proxy/upstream.go @@ -423,6 +423,10 @@ func (u *staticUpstream) GetTryInterval() time.Duration { return u.TryInterval } +func (u *staticUpstream) GetHostCount() int { + return len(u.Hosts) +} + // RegisterPolicy adds a custom policy to the proxy. func RegisterPolicy(name string, policy func() Policy) { supportedPolicies[name] = policy