diff --git a/caddyhttp/proxy/proxy.go b/caddyhttp/proxy/proxy.go index 36544b2e..82a63354 100644 --- a/caddyhttp/proxy/proxy.go +++ b/caddyhttp/proxy/proxy.go @@ -19,15 +19,26 @@ type Proxy struct { Upstreams []Upstream } -// Upstream manages a pool of proxy upstream hosts. Select should return a -// suitable upstream host, or nil if no such hosts are available. +// Upstream manages a pool of proxy upstream hosts. type Upstream interface { // The path this upstream host should be routed on From() string - // Selects an upstream host to be routed to. + + // Selects an upstream host to be routed to. It + // should return a suitable upstream host, or nil + // if no such hosts are available. Select(*http.Request) *UpstreamHost + // Checks if subpath is not an ignored path AllowedPath(string) bool + + // Gets how long to try selecting upstream hosts + // in the case of cascading failures. + GetTryDuration() time.Duration + + // Gets how long to wait between selecting upstream + // hosts in the case of cascading failures. + GetTryInterval() time.Duration } // UpstreamHostDownFunc can be used to customize how Down behaves. @@ -91,7 +102,7 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { // hosts until timeout (or until we get a nil host). start := time.Now() var backendErr error - for time.Now().Sub(start) < tryDuration { + for { host := upstream.Select(r) if host == nil { if backendErr == nil { @@ -146,26 +157,35 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn) atomic.AddInt64(&host.Conns, -1) - // if no errors, we're done here; otherwise failover + // if no errors, we're done here if backendErr == nil { return 0, nil } + + // failover; remember this failure for some time if + // request failure counting is enabled timeout := host.FailTimeout - if timeout == 0 { - timeout = 10 * time.Second + if timeout > 0 { + atomic.AddInt32(&host.Fails, 1) + go func(host *UpstreamHost, timeout time.Duration) { + time.Sleep(timeout) + atomic.AddInt32(&host.Fails, -1) + }(host, timeout) } - atomic.AddInt32(&host.Fails, 1) - go func(host *UpstreamHost, timeout time.Duration) { - time.Sleep(timeout) - atomic.AddInt32(&host.Fails, -1) - }(host, timeout) + + // if we've tried long enough, break + if time.Now().Sub(start) >= upstream.GetTryDuration() { + break + } + + // otherwise, wait and try the next available host + time.Sleep(upstream.GetTryInterval()) } return http.StatusBadGateway, backendErr } -// match finds the best match for a proxy config based -// on r. +// match finds the best match for a proxy config based on r. func (p Proxy) match(r *http.Request) Upstream { var u Upstream var longestMatch int diff --git a/caddyhttp/proxy/proxy_test.go b/caddyhttp/proxy/proxy_test.go index c7db7a4b..91e2631e 100644 --- a/caddyhttp/proxy/proxy_test.go +++ b/caddyhttp/proxy/proxy_test.go @@ -766,9 +766,9 @@ func (u *fakeUpstream) Select(r *http.Request) *UpstreamHost { return u.host } -func (u *fakeUpstream) AllowedPath(requestPath string) bool { - return true -} +func (u *fakeUpstream) AllowedPath(requestPath string) bool { return true } +func (u *fakeUpstream) GetTryDuration() time.Duration { return 1 * time.Second } +func (u *fakeUpstream) GetTryInterval() time.Duration { return 250 * time.Millisecond } // newWebSocketTestProxy returns a test proxy that will // redirect to the specified backendAddr. The function @@ -808,9 +808,9 @@ func (u *fakeWsUpstream) Select(r *http.Request) *UpstreamHost { } } -func (u *fakeWsUpstream) AllowedPath(requestPath string) bool { - return true -} +func (u *fakeWsUpstream) AllowedPath(requestPath string) bool { return true } +func (u *fakeWsUpstream) GetTryDuration() time.Duration { return 1 * time.Second } +func (u *fakeWsUpstream) GetTryInterval() time.Duration { return 250 * time.Millisecond } // recorderHijacker is a ResponseRecorder that can // be hijacked. diff --git a/caddyhttp/proxy/upstream.go b/caddyhttp/proxy/upstream.go index 2a0f1a77..47162ea2 100644 --- a/caddyhttp/proxy/upstream.go +++ b/caddyhttp/proxy/upstream.go @@ -31,6 +31,8 @@ type staticUpstream struct { FailTimeout time.Duration MaxFails int32 + TryDuration time.Duration + TryInterval time.Duration MaxConns int64 HealthCheck struct { Client http.Client @@ -53,8 +55,8 @@ func NewStaticUpstreams(c caddyfile.Dispenser) ([]Upstream, error) { downstreamHeaders: make(http.Header), Hosts: nil, Policy: &Random{}, - FailTimeout: 10 * time.Second, MaxFails: 1, + TryInterval: 1 * time.Second, MaxConns: 0, KeepAlive: http.DefaultMaxIdleConnsPerHost, } @@ -114,11 +116,6 @@ func NewStaticUpstreams(c caddyfile.Dispenser) ([]Upstream, error) { return upstreams, nil } -// RegisterPolicy adds a custom policy to the proxy. -func RegisterPolicy(name string, policy func() Policy) { - supportedPolicies[name] = policy -} - func (u *staticUpstream) From() string { return u.from } @@ -141,8 +138,7 @@ func (u *staticUpstream) NewHost(host string) (*UpstreamHost, error) { if uh.Unhealthy { return true } - if uh.Fails >= u.MaxFails && - u.MaxFails != 0 { + if uh.Fails >= u.MaxFails { return true } return false @@ -237,7 +233,28 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { if err != nil { return err } + if n < 1 { + return c.Err("max_fails must be at least 1") + } u.MaxFails = int32(n) + case "try_duration": + if !c.NextArg() { + return c.ArgErr() + } + dur, err := time.ParseDuration(c.Val()) + if err != nil { + return err + } + u.TryDuration = dur + case "try_interval": + if !c.NextArg() { + return c.ArgErr() + } + interval, err := time.ParseDuration(c.Val()) + if err != nil { + return err + } + u.TryInterval = interval case "max_conns": if !c.NextArg() { return c.ArgErr() @@ -397,3 +414,18 @@ func (u *staticUpstream) AllowedPath(requestPath string) bool { } return true } + +// GetTryDuration returns u.TryDuration. +func (u *staticUpstream) GetTryDuration() time.Duration { + return u.TryDuration +} + +// GetTryInterval returns u.TryInterval. +func (u *staticUpstream) GetTryInterval() time.Duration { + return u.TryInterval +} + +// RegisterPolicy adds a custom policy to the proxy. +func RegisterPolicy(name string, policy func() Policy) { + supportedPolicies[name] = policy +}