From 83ece3ba61c9c9f8839a92e22db4258afafb5233 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Sun, 17 Jan 2016 02:13:09 +0100 Subject: [PATCH 01/10] Dynamic Proxy Hosts: basic prototype with etcd backend. --- middleware/proxy/provider/etcd/etcd.go | 115 +++++++++++++++++++++++++ middleware/proxy/provider/provider.go | 44 ++++++++++ 2 files changed, 159 insertions(+) create mode 100644 middleware/proxy/provider/etcd/etcd.go create mode 100644 middleware/proxy/provider/provider.go diff --git a/middleware/proxy/provider/etcd/etcd.go b/middleware/proxy/provider/etcd/etcd.go new file mode 100644 index 000000000..2cf62c138 --- /dev/null +++ b/middleware/proxy/provider/etcd/etcd.go @@ -0,0 +1,115 @@ +package etcd + +import ( + "fmt" + "strings" + + "github.com/coreos/etcd/client" + "github.com/mholt/caddy/middleware/proxy/provider" + "github.com/syndtr/goleveldb/leveldb/errors" + "golang.org/x/net/context" +) + +const ( + Scheme = "etcd://" + + DefaultDirectory = "/CADDY_PROXY_HOSTS" +) + +var ( + ErrInvalidScheme = errors.New("invalid Etcd scheme") + ErrNotDirectory = errors.New("not an Etcd directory") + ErrNotKey = errors.New("not an Etcd key") + ErrNotInDirectory = errors.New("not in expected directory") +) + +type Provider struct { + endpoints []string + directory string + client.KeysAPI +} + +// New creates a new Etcd DynamicProvider +func New(addr string) (provider.DynamicProvider, error) { + store, err := parseAddr(addr) + if err != nil { + return nil, err + } + + cfg := client.Config{ + Endpoints: store.endpoints, + Transport: client.DefaultTransport, + } + c, err := client.New(cfg) + if err != nil { + return nil, err + } + + store.KeysAPI = client.NewKeysAPI(c) + return &store +} + +func (p *Provider) Hosts([]string, error) { + var hosts []string + resp, err := p.Get(context.Background(), p.directory, nil) + if err != nil { + return nil, err + } + if !resp.Node.Dir { + return nil, fmt.Errorf("%s is %v", p.directory, ErrNotDirectory) + } + for _, node := range resp.Node.Nodes { + if node.Dir { + return nil, fmt.Errorf("%s is %v", node.Key, ErrNotKey) + } + hosts = append(hosts, node.Value) + } + return hosts, nil +} + +func (p *Provider) Watch() provider.Watcher { + w := p.Watcher(p.directory, client.WatcherOptions{Recursive: true}) + return watcher{ + next: func() (string, error) { + resp, err := w.Next(context.Background()) + if err != nil { + return "", err + } + if resp.Node.Dir { + return "", fmt.Errorf("%s is %v", resp.Node.Key, ErrNotKey) + } + if len(strings.Split(strings.TrimPrefix(resp.Node.Key, p.directory), "/")) != 1 { + return "", fmt.Errorf("%s is %v '%v", resp.Node.Key, ErrNotInDirectory, p.directory) + } + return resp.Node.Value, nil + }, + } +} + +type watcher struct { + next func() (host string, err error) +} + +func (w watcher) Next() (string, error) { + return w.next() +} + +// URL format +// etcd://,/ +func parseAddr(addr string) (Provider, error) { + store := Provider{} + if !strings.HasPrefix(addr, Scheme) { + return ErrInvalidScheme + } + addr = strings.TrimPrefix(addr, Scheme) + s := strings.SplitN(addr, "/", 2) + for _, v := range strings.Split(s, ",") { + store.endpoints = append(store.endpoints, "http://"+v) + } + if len(s) == 2 { + store.directory = "/" + s[1] + } else { + store.directory = DefaultDirectory + } + return store, nil +} diff --git a/middleware/proxy/provider/provider.go b/middleware/proxy/provider/provider.go new file mode 100644 index 000000000..c3213666d --- /dev/null +++ b/middleware/proxy/provider/provider.go @@ -0,0 +1,44 @@ +package provider + +import "github.com/mholt/caddy/middleware/proxy/provider/etcd" + +type Provider interface { + Hosts() ([]string, error) +} + +type staticProvider string + +func (s staticProvider) Hosts([]string, error) { + return []string{s}, nil +} + +func newStaticProvider(host string) (Provider, error) { + return staticProvider(host) +} + +var providers = make(map[string]NewFunc) + +type NewFunc func(string) (Provider, error) + +func RegisterProvider(scheme string, initFunc NewFunc) { + providers[scheme] = initFunc +} + +// DynamicProvider represents a dynamic hosts provider. +type DynamicProvider interface { + Provider + Watch() Watcher +} + +// Watcher watches for changes in the store. +// Next blocks until a new host is available. +type Watcher interface { + Next() (host string, err error) +} + +func init() { + // register all providers + RegisterProvider("http://", newStaticProvider) + RegisterProvider("https://", newStaticProvider) + RegisterProvider("etcd://", etcd.New) +} From fe5fc118ec6a31b2ebbbf8c1a812c0df51d1fa36 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Sun, 17 Jan 2016 03:26:14 +0100 Subject: [PATCH 02/10] integrate provider backend with proxy. --- middleware/proxy/provider/etcd/etcd.go | 62 ++++++++++---- middleware/proxy/provider/provider.go | 56 ++++++++++--- middleware/proxy/upstream.go | 107 ++++++++++++++++++------- 3 files changed, 167 insertions(+), 58 deletions(-) diff --git a/middleware/proxy/provider/etcd/etcd.go b/middleware/proxy/provider/etcd/etcd.go index 2cf62c138..2e4348bc6 100644 --- a/middleware/proxy/provider/etcd/etcd.go +++ b/middleware/proxy/provider/etcd/etcd.go @@ -3,6 +3,7 @@ package etcd import ( "fmt" "strings" + "sync" "github.com/coreos/etcd/client" "github.com/mholt/caddy/middleware/proxy/provider" @@ -27,6 +28,8 @@ type Provider struct { endpoints []string directory string client.KeysAPI + watching bool + sync.Mutex } // New creates a new Etcd DynamicProvider @@ -70,30 +73,56 @@ func (p *Provider) Hosts([]string, error) { func (p *Provider) Watch() provider.Watcher { w := p.Watcher(p.directory, client.WatcherOptions{Recursive: true}) return watcher{ - next: func() (string, error) { - resp, err := w.Next(context.Background()) - if err != nil { - return "", err - } - if resp.Node.Dir { - return "", fmt.Errorf("%s is %v", resp.Node.Key, ErrNotKey) - } - if len(strings.Split(strings.TrimPrefix(resp.Node.Key, p.directory), "/")) != 1 { - return "", fmt.Errorf("%s is %v '%v", resp.Node.Key, ErrNotInDirectory, p.directory) - } - return resp.Node.Value, nil + next: func() <-chan provider.Config { + ch := make(<-chan provider.Config) + go func() { + p.Lock() + p.watching = true + p.Unlock() + + for { + resp, err := w.Next(context.Background()) + if err != nil { + return "", err + } + if resp.Node.Dir { + return "", fmt.Errorf("%s is %v", resp.Node.Key, ErrNotKey) + } + if len(strings.Split(strings.TrimPrefix(resp.Node.Key, p.directory), "/")) != 1 { + return "", fmt.Errorf("%s is %v '%v", resp.Node.Key, ErrNotInDirectory, p.directory) + } + ch <- provider.Config{resp.Node.Value, nil} + p.Lock() + if !p.watching { + p.Unlock() + break + } + p.Unlock() + } + }() + return ch + }, + stop: func() { + p.Lock() + p.watching = false + p.Unlock() }, } } type watcher struct { - next func() (host string, err error) + next func() <-chan provider.Config + stop func() } -func (w watcher) Next() (string, error) { +func (w watcher) Next() <-chan provider.Config { return w.next() } +func (w watcher) Stop() { + w.stop() +} + // URL format // etcd://,/ func parseAddr(addr string) (Provider, error) { @@ -113,3 +142,8 @@ func parseAddr(addr string) (Provider, error) { } return store, nil } + +func init() { + // register + provider.Register("etcd://", New) +} diff --git a/middleware/proxy/provider/provider.go b/middleware/proxy/provider/provider.go index c3213666d..eb51f9d91 100644 --- a/middleware/proxy/provider/provider.go +++ b/middleware/proxy/provider/provider.go @@ -1,6 +1,16 @@ package provider -import "github.com/mholt/caddy/middleware/proxy/provider/etcd" +import ( + "errors" + "fmt" + "strings" +) + +var ( + providers = make(map[string]NewFunc) + + ErrUnsupportedScheme = errors.New("scheme is not supported.") +) type Provider interface { Hosts() ([]string, error) @@ -8,37 +18,57 @@ type Provider interface { type staticProvider string -func (s staticProvider) Hosts([]string, error) { - return []string{s}, nil +func (s staticProvider) Hosts() ([]string, error) { + return []string{string(s)}, nil } -func newStaticProvider(host string) (Provider, error) { - return staticProvider(host) +func newStatic(host string) (Provider, error) { + if !strings.HasPrefix(host, "http") { + host = "http://" + host + } + return staticProvider(host), nil } -var providers = make(map[string]NewFunc) - type NewFunc func(string) (Provider, error) -func RegisterProvider(scheme string, initFunc NewFunc) { +func Register(scheme string, initFunc NewFunc) { providers[scheme] = initFunc } +func Get(addr string) (Provider, error) { + scheme := "" + s := strings.SplitN(addr, "://", 2) + if len(s) > 1 { + scheme = s[0] + } + if f, ok := providers[scheme]; ok { + return f(addr) + } + return nil, fmt.Errorf("%s %v", scheme, ErrUnsupportedScheme) +} + // DynamicProvider represents a dynamic hosts provider. type DynamicProvider interface { Provider Watch() Watcher } +// Config +type Config struct { + Host string + Err error +} + // Watcher watches for changes in the store. // Next blocks until a new host is available. type Watcher interface { - Next() (host string, err error) + Next() <-chan Config + Stop() } func init() { - // register all providers - RegisterProvider("http://", newStaticProvider) - RegisterProvider("https://", newStaticProvider) - RegisterProvider("etcd://", etcd.New) + // register provider + Register("http://", newStatic) + Register("https://", newStatic) + Register("", newStatic) } diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 28dbb665a..67aedb663 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -3,15 +3,17 @@ package proxy import ( "io" "io/ioutil" + "log" "net/http" "net/url" "path" "strconv" - "strings" + "sync" "time" "github.com/mholt/caddy/caddy/parse" "github.com/mholt/caddy/middleware" + "github.com/mholt/caddy/middleware/proxy/provider" ) var ( @@ -32,6 +34,8 @@ type staticUpstream struct { } WithoutPathPrefix string IgnoredSubPaths []string + + sync.Mutex } // NewStaticUpstreams parses the configuration input and sets up @@ -62,38 +66,27 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) { } } - upstream.Hosts = make([]*UpstreamHost, len(to)) - for i, host := range to { - if !strings.HasPrefix(host, "http") { - host = "http://" + host - } - uh := &UpstreamHost{ - Name: host, - Conns: 0, - Fails: 0, - FailTimeout: upstream.FailTimeout, - Unhealthy: false, - ExtraHeaders: upstream.proxyHeaders, - CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { - return func(uh *UpstreamHost) bool { - if uh.Unhealthy { - return true - } - if uh.Fails >= upstream.MaxFails && - upstream.MaxFails != 0 { - return true - } - return false - } - }(upstream), - WithoutPathPrefix: upstream.WithoutPathPrefix, - } - if baseURL, err := url.Parse(uh.Name); err == nil { - uh.ReverseProxy = NewSingleHostReverseProxy(baseURL, uh.WithoutPathPrefix) - } else { + for _, addr := range to { + p, err := provider.Get(addr) + if err != nil { return upstreams, err } - upstream.Hosts[i] = uh + hosts, err := p.Hosts() + if err != nil { + return upstreams, err + } + for _, host := range hosts { + err := addToUpstream(upstream, host) + if err != nil { + return upstreams, err + } + // if provider is dynamic + // watch for changes + if dp, ok := p.(provider.DynamicProvider); ok { + watcher := dp.Watch() + go providerWorker(upstream, watcher, nil) + } + } } if upstream.HealthCheck.Path != "" { @@ -104,6 +97,58 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) { return upstreams, nil } +func addToUpstream(upstream *staticUpstream, host string) error { + uh := &UpstreamHost{ + Name: host, + Conns: 0, + Fails: 0, + FailTimeout: upstream.FailTimeout, + Unhealthy: false, + ExtraHeaders: upstream.proxyHeaders, + CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { + return func(uh *UpstreamHost) bool { + if uh.Unhealthy { + return true + } + if uh.Fails >= upstream.MaxFails && + upstream.MaxFails != 0 { + return true + } + return false + } + }(upstream), + WithoutPathPrefix: upstream.WithoutPathPrefix, + } + if baseURL, err := url.Parse(uh.Name); err == nil { + uh.ReverseProxy = NewSingleHostReverseProxy(baseURL, uh.WithoutPathPrefix) + } else { + return err + } + upstream.Lock() + upstream.Hosts = append(upstream.Hosts, uh) + upstream.Unlock() + return nil +} + +func providerWorker(upstream *staticUpstream, watcher provider.Watcher, stop chan struct{}) { + for { + select { + case c := <-watcher.Next(): + if c.Err != nil { + log.Println(c.Err) + } else { + if err := addToUpstream(upstream, c.Host); err != nil { + log.Println(err) + } else { + log.Println("New host added to ") + } + } + case <-stop: + watcher.Stop() + } + } +} + // RegisterPolicy adds a custom policy to the proxy. func RegisterPolicy(name string, policy func() Policy) { supportedPolicies[name] = policy From b81526519422ee7facf36de0980f8a4dfde2dee4 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Sun, 17 Jan 2016 13:30:57 +0100 Subject: [PATCH 03/10] End to end functionality. Add user/pass auth to Etcd. --- middleware/proxy/dynamic.go | 11 ++ middleware/proxy/provider/etcd/etcd.go | 124 ++++++++-------- middleware/proxy/provider/provider.go | 26 ++-- middleware/proxy/upstream.go | 190 ++++++++++++++++--------- 4 files changed, 213 insertions(+), 138 deletions(-) create mode 100644 middleware/proxy/dynamic.go diff --git a/middleware/proxy/dynamic.go b/middleware/proxy/dynamic.go new file mode 100644 index 000000000..d6f40c387 --- /dev/null +++ b/middleware/proxy/dynamic.go @@ -0,0 +1,11 @@ +package proxy + +import ( + "github.com/mholt/caddy/middleware/proxy/provider" + "github.com/mholt/caddy/middleware/proxy/provider/etcd" +) + +func init() { + // register dynamic providers + provider.Register("etcd", etcd.New) +} diff --git a/middleware/proxy/provider/etcd/etcd.go b/middleware/proxy/provider/etcd/etcd.go index 2e4348bc6..38cf5fa92 100644 --- a/middleware/proxy/provider/etcd/etcd.go +++ b/middleware/proxy/provider/etcd/etcd.go @@ -14,7 +14,7 @@ import ( const ( Scheme = "etcd://" - DefaultDirectory = "/CADDY_PROXY_HOSTS" + DefaultDirectory = "/CADDY_PROXY_HOSTS/" ) var ( @@ -27,32 +27,35 @@ var ( type Provider struct { endpoints []string directory string + username string + password string client.KeysAPI - watching bool sync.Mutex } // New creates a new Etcd DynamicProvider -func New(addr string) (provider.DynamicProvider, error) { - store, err := parseAddr(addr) +func New(addr string) (provider.Provider, error) { + p, err := parseAddr(addr) if err != nil { return nil, err } cfg := client.Config{ - Endpoints: store.endpoints, + Endpoints: p.endpoints, Transport: client.DefaultTransport, + Username: p.username, + Password: p.password, } c, err := client.New(cfg) if err != nil { return nil, err } - store.KeysAPI = client.NewKeysAPI(c) - return &store + p.KeysAPI = client.NewKeysAPI(c) + return &p, nil } -func (p *Provider) Hosts([]string, error) { +func (p *Provider) Hosts() ([]string, error) { var hosts []string resp, err := p.Get(context.Background(), p.directory, nil) if err != nil { @@ -71,79 +74,80 @@ func (p *Provider) Hosts([]string, error) { } func (p *Provider) Watch() provider.Watcher { - w := p.Watcher(p.directory, client.WatcherOptions{Recursive: true}) - return watcher{ - next: func() <-chan provider.Config { - ch := make(<-chan provider.Config) - go func() { - p.Lock() - p.watching = true - p.Unlock() + w := p.Watcher(p.directory, &client.WatcherOptions{Recursive: true}) + return &watcher{ + next: func() (msg provider.WatcherMsg, err error) { + var resp *client.Response + if resp, err = w.Next(context.Background()); err != nil { + return + } + if resp.Node.Dir { + err = fmt.Errorf("%s is %v", resp.Node.Key, ErrNotKey) + return + } + if len(strings.Split(strings.TrimPrefix(resp.Node.Key, p.directory), "/")) != 1 { + err = fmt.Errorf("%s is %v '%v", resp.Node.Key, ErrNotInDirectory, p.directory) + return + } - for { - resp, err := w.Next(context.Background()) - if err != nil { - return "", err - } - if resp.Node.Dir { - return "", fmt.Errorf("%s is %v", resp.Node.Key, ErrNotKey) - } - if len(strings.Split(strings.TrimPrefix(resp.Node.Key, p.directory), "/")) != 1 { - return "", fmt.Errorf("%s is %v '%v", resp.Node.Key, ErrNotInDirectory, p.directory) - } - ch <- provider.Config{resp.Node.Value, nil} - p.Lock() - if !p.watching { - p.Unlock() - break - } - p.Unlock() + // TODO Nonfunctional code block + // Etcd client not reporting delete events. + // Relying on proxy health checker to bring inactive hosts down. + if resp.Node.Value == "" { + if resp.PrevNode != nil { + return provider.WatcherMsg{Host: resp.PrevNode.Value, Remove: true}, nil } - }() - return ch - }, - stop: func() { - p.Lock() - p.watching = false - p.Unlock() + // should not happen + err = errors.New("Node is previously empty") + } + + return provider.WatcherMsg{Host: resp.Node.Value, Remove: false}, err }, } } type watcher struct { - next func() <-chan provider.Config - stop func() + next func() (provider.WatcherMsg, error) } -func (w watcher) Next() <-chan provider.Config { +func (w *watcher) Next() (provider.WatcherMsg, error) { return w.next() } -func (w watcher) Stop() { - w.stop() -} - // URL format -// etcd://,/ +// etcd://username:password@,/ func parseAddr(addr string) (Provider, error) { - store := Provider{} + p := Provider{} if !strings.HasPrefix(addr, Scheme) { - return ErrInvalidScheme + return p, ErrInvalidScheme } - addr = strings.TrimPrefix(addr, Scheme) + p.username, p.password, addr = extractUserPass(strings.TrimPrefix(addr, Scheme)) s := strings.SplitN(addr, "/", 2) - for _, v := range strings.Split(s, ",") { - store.endpoints = append(store.endpoints, "http://"+v) + for _, v := range strings.Split(s[0], ",") { + p.endpoints = append(p.endpoints, "http://"+v) } if len(s) == 2 { - store.directory = "/" + s[1] + p.directory = "/" + s[1] + if !strings.HasSuffix(s[1], "/") { + p.directory += "/" + } } else { - store.directory = DefaultDirectory + p.directory = DefaultDirectory } - return store, nil + return p, nil } -func init() { - // register - provider.Register("etcd://", New) +func extractUserPass(addr string) (username, password, remaining string) { + s := strings.SplitN(addr, "@", 2) + if len(s) == 1 { + remaining = addr + return + } + userPass := strings.Split(s[0], "@") + username = userPass[0] + if len(userPass) > 1 { + password = userPass[1] + } + remaining = s[1] + return } diff --git a/middleware/proxy/provider/provider.go b/middleware/proxy/provider/provider.go index eb51f9d91..0f5ae6cad 100644 --- a/middleware/proxy/provider/provider.go +++ b/middleware/proxy/provider/provider.go @@ -23,16 +23,14 @@ func (s staticProvider) Hosts() ([]string, error) { } func newStatic(host string) (Provider, error) { - if !strings.HasPrefix(host, "http") { - host = "http://" + host - } return staticProvider(host), nil } -type NewFunc func(string) (Provider, error) +// NewFunc creates a new Provider. +type NewFunc func(host string) (Provider, error) -func Register(scheme string, initFunc NewFunc) { - providers[scheme] = initFunc +func Register(scheme string, newFunc NewFunc) { + providers[scheme] = newFunc } func Get(addr string) (Provider, error) { @@ -53,22 +51,22 @@ type DynamicProvider interface { Watch() Watcher } -// Config -type Config struct { +type WatcherMsg struct { + // Host is the affected host Host string - Err error + // Remove is true if the host should be removed instead. + Remove bool } // Watcher watches for changes in the store. // Next blocks until a new host is available. type Watcher interface { - Next() <-chan Config - Stop() + Next() (msg WatcherMsg, err error) } func init() { - // register provider - Register("http://", newStatic) - Register("https://", newStatic) + // register static provider + Register("http", newStatic) + Register("https", newStatic) Register("", newStatic) } diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 67aedb663..97bbbe589 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -8,6 +8,7 @@ import ( "net/url" "path" "strconv" + "strings" "sync" "time" @@ -35,7 +36,7 @@ type staticUpstream struct { WithoutPathPrefix string IgnoredSubPaths []string - sync.Mutex + sync.RWMutex } // NewStaticUpstreams parses the configuration input and sets up @@ -67,25 +68,29 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) { } for _, addr := range to { - p, err := provider.Get(addr) - if err != nil { + var p provider.Provider + var err error + var hosts []string + // fetch provider + if p, err = provider.Get(addr); err != nil { return upstreams, err } - hosts, err := p.Hosts() - if err != nil { + // fetch hosts from provider + if hosts, err = p.Hosts(); err != nil { return upstreams, err } + // add hosts to upstream for _, host := range hosts { - err := addToUpstream(upstream, host) + err := upstream.AddHost(host) if err != nil { return upstreams, err } - // if provider is dynamic - // watch for changes - if dp, ok := p.(provider.DynamicProvider); ok { - watcher := dp.Watch() - go providerWorker(upstream, watcher, nil) - } + } + // if provider is dynamic + // watch for upcoming changes + if dp, ok := p.(provider.DynamicProvider); ok { + watcher := dp.Watch() + go upstream.ProviderWorker(watcher, nil) } } @@ -97,58 +102,6 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) { return upstreams, nil } -func addToUpstream(upstream *staticUpstream, host string) error { - uh := &UpstreamHost{ - Name: host, - Conns: 0, - Fails: 0, - FailTimeout: upstream.FailTimeout, - Unhealthy: false, - ExtraHeaders: upstream.proxyHeaders, - CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { - return func(uh *UpstreamHost) bool { - if uh.Unhealthy { - return true - } - if uh.Fails >= upstream.MaxFails && - upstream.MaxFails != 0 { - return true - } - return false - } - }(upstream), - WithoutPathPrefix: upstream.WithoutPathPrefix, - } - if baseURL, err := url.Parse(uh.Name); err == nil { - uh.ReverseProxy = NewSingleHostReverseProxy(baseURL, uh.WithoutPathPrefix) - } else { - return err - } - upstream.Lock() - upstream.Hosts = append(upstream.Hosts, uh) - upstream.Unlock() - return nil -} - -func providerWorker(upstream *staticUpstream, watcher provider.Watcher, stop chan struct{}) { - for { - select { - case c := <-watcher.Next(): - if c.Err != nil { - log.Println(c.Err) - } else { - if err := addToUpstream(upstream, c.Host); err != nil { - log.Println(err) - } else { - log.Println("New host added to ") - } - } - case <-stop: - watcher.Stop() - } - } -} - // RegisterPolicy adds a custom policy to the proxy. func RegisterPolicy(name string, policy func() Policy) { supportedPolicies[name] = policy @@ -254,7 +207,49 @@ func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) { } } +func (upstream *staticUpstream) ProviderWorker(watcher provider.Watcher, stop <-chan struct{}) { + type msg struct { + msg provider.WatcherMsg + err error + } + resp := make(chan msg) + go func() { + // blocks until there's a message from Watcher + m, err := watcher.Next() + resp <- msg{m, err} + }() + +worker: + for { + select { + case m := <-resp: + if m.err != nil { + log.Println(m.err) + continue worker + } + if m.msg.Remove { + // remove from upstream + upstream.RemoveHost(m.msg.Host) + log.Printf("Host %v removed to upstream\n", m.msg.Host) + } else { + // add host to upstream + if err := upstream.AddHost(m.msg.Host); err != nil { + log.Println(err) + continue worker + } + log.Printf("New host %v added to upstream\n", m.msg.Host) + } + case <-stop: + break worker + } + + } +} + func (u *staticUpstream) Select() *UpstreamHost { + u.RLock() + defer u.RUnlock() + pool := u.Hosts if len(pool) == 1 { if pool[0].Down() { @@ -287,3 +282,70 @@ func (u *staticUpstream) IsAllowedPath(requestPath string) bool { } return true } + +func (upstream *staticUpstream) AddHost(host string) error { + host = hostName(host).String() + uh := &UpstreamHost{ + Name: host, + Conns: 0, + Fails: 0, + FailTimeout: upstream.FailTimeout, + Unhealthy: false, + ExtraHeaders: upstream.proxyHeaders, + CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { + return func(uh *UpstreamHost) bool { + if uh.Unhealthy { + return true + } + if uh.Fails >= upstream.MaxFails && + upstream.MaxFails != 0 { + return true + } + return false + } + }(upstream), + WithoutPathPrefix: upstream.WithoutPathPrefix, + } + if baseURL, err := url.Parse(uh.Name); err == nil { + uh.ReverseProxy = NewSingleHostReverseProxy(baseURL, uh.WithoutPathPrefix) + } else { + return err + } + upstream.Lock() + upstream.Hosts = append(upstream.Hosts, uh) + upstream.Unlock() + return nil +} + +func (upstream *staticUpstream) RemoveHost(host string) { + upstream.Lock() + defer upstream.Unlock() + idx := -1 + for i, h := range upstream.Hosts { + if hostName(host).equals(h.Name) { + idx = i + break + } + } + if idx == -1 { + return + } + if idx == len(upstream.Hosts)-1 { + upstream.Hosts = upstream.Hosts[:idx] + return + } + upstream.Hosts = append(upstream.Hosts[:idx], upstream.Hosts[idx+1:]...) +} + +type hostName string + +func (h hostName) equals(host string) bool { + return h.String() == hostName(host).String() +} + +func (h hostName) String() string { + if !strings.HasPrefix(string(h), "http") { + return "http://" + string(h) + } + return string(h) +} From af7d98eedf8dc2d55f7b9ce2e0191ed11022b3cc Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Sun, 17 Jan 2016 14:01:53 +0100 Subject: [PATCH 04/10] Oops. Dynamic removal actually also works. --- middleware/proxy/provider/etcd/etcd.go | 7 +++---- middleware/proxy/upstream.go | 23 ++++++++++++++++------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/middleware/proxy/provider/etcd/etcd.go b/middleware/proxy/provider/etcd/etcd.go index 38cf5fa92..210d47a43 100644 --- a/middleware/proxy/provider/etcd/etcd.go +++ b/middleware/proxy/provider/etcd/etcd.go @@ -90,9 +90,7 @@ func (p *Provider) Watch() provider.Watcher { return } - // TODO Nonfunctional code block - // Etcd client not reporting delete events. - // Relying on proxy health checker to bring inactive hosts down. + // Remove host if resp.Node.Value == "" { if resp.PrevNode != nil { return provider.WatcherMsg{Host: resp.PrevNode.Value, Remove: true}, nil @@ -101,6 +99,7 @@ func (p *Provider) Watch() provider.Watcher { err = errors.New("Node is previously empty") } + // Add host return provider.WatcherMsg{Host: resp.Node.Value, Remove: false}, err }, } @@ -143,7 +142,7 @@ func extractUserPass(addr string) (username, password, remaining string) { remaining = addr return } - userPass := strings.Split(s[0], "@") + userPass := strings.Split(s[0], ":") username = userPass[0] if len(userPass) > 1 { password = userPass[1] diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 97bbbe589..ae41c025d 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -214,15 +214,26 @@ func (upstream *staticUpstream) ProviderWorker(watcher provider.Watcher, stop <- } resp := make(chan msg) go func() { - // blocks until there's a message from Watcher - m, err := watcher.Next() - resp <- msg{m, err} + outer: + for { + // blocks until there's a message from Watcher + m, err := watcher.Next() + select { + case resp <- msg{m, err}: + case <-stop: + close(resp) + break outer + } + } }() worker: for { select { - case m := <-resp: + case m, ok := <-resp: + if !ok { + break worker + } if m.err != nil { log.Println(m.err) continue worker @@ -230,7 +241,7 @@ worker: if m.msg.Remove { // remove from upstream upstream.RemoveHost(m.msg.Host) - log.Printf("Host %v removed to upstream\n", m.msg.Host) + log.Printf("Host %v removed from upstream\n", m.msg.Host) } else { // add host to upstream if err := upstream.AddHost(m.msg.Host); err != nil { @@ -239,8 +250,6 @@ worker: } log.Printf("New host %v added to upstream\n", m.msg.Host) } - case <-stop: - break worker } } From de1a34b0b2ab5cf1b6ebdd01d1147b537a917083 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Sun, 17 Jan 2016 15:02:19 +0100 Subject: [PATCH 05/10] Code documentaion. --- middleware/proxy/provider/etcd/etcd.go | 39 ++++++++++++++++++-------- middleware/proxy/provider/provider.go | 22 +++++++++++---- middleware/proxy/upstream.go | 33 +++++++++++++++++++--- 3 files changed, 72 insertions(+), 22 deletions(-) diff --git a/middleware/proxy/provider/etcd/etcd.go b/middleware/proxy/provider/etcd/etcd.go index 210d47a43..ca4b8f2e8 100644 --- a/middleware/proxy/provider/etcd/etcd.go +++ b/middleware/proxy/provider/etcd/etcd.go @@ -1,3 +1,4 @@ +// Package etcd is an Etcd backed provider. package etcd import ( @@ -12,18 +13,21 @@ import ( ) const ( + // Scheme is Ectd url scheme. Scheme = "etcd://" + // DefaultDirectory is the default Etcd config directory. DefaultDirectory = "/CADDY_PROXY_HOSTS/" ) var ( - ErrInvalidScheme = errors.New("invalid Etcd scheme") - ErrNotDirectory = errors.New("not an Etcd directory") - ErrNotKey = errors.New("not an Etcd key") - ErrNotInDirectory = errors.New("not in expected directory") + errInvalidScheme = errors.New("invalid Etcd scheme") + errNotDirectory = errors.New("not an Etcd directory") + errNotKey = errors.New("not an Etcd key") + errNotInDirectory = errors.New("not in expected directory") ) +// Provider is Etcd provider. type Provider struct { endpoints []string directory string @@ -33,7 +37,7 @@ type Provider struct { sync.Mutex } -// New creates a new Etcd DynamicProvider +// New creates a new Etcd Provider func New(addr string) (provider.Provider, error) { p, err := parseAddr(addr) if err != nil { @@ -55,6 +59,7 @@ func New(addr string) (provider.Provider, error) { return &p, nil } +// Hosts satisfies provider.Provider interface. func (p *Provider) Hosts() ([]string, error) { var hosts []string resp, err := p.Get(context.Background(), p.directory, nil) @@ -62,17 +67,18 @@ func (p *Provider) Hosts() ([]string, error) { return nil, err } if !resp.Node.Dir { - return nil, fmt.Errorf("%s is %v", p.directory, ErrNotDirectory) + return nil, fmt.Errorf("%s is %v", p.directory, errNotDirectory) } for _, node := range resp.Node.Nodes { if node.Dir { - return nil, fmt.Errorf("%s is %v", node.Key, ErrNotKey) + return nil, fmt.Errorf("%s is %v", node.Key, errNotKey) } hosts = append(hosts, node.Value) } return hosts, nil } +// Watch satisfies provider.DynamicProvider interface. func (p *Provider) Watch() provider.Watcher { w := p.Watcher(p.directory, &client.WatcherOptions{Recursive: true}) return &watcher{ @@ -82,11 +88,11 @@ func (p *Provider) Watch() provider.Watcher { return } if resp.Node.Dir { - err = fmt.Errorf("%s is %v", resp.Node.Key, ErrNotKey) + err = fmt.Errorf("%s is %v", resp.Node.Key, errNotKey) return } if len(strings.Split(strings.TrimPrefix(resp.Node.Key, p.directory), "/")) != 1 { - err = fmt.Errorf("%s is %v '%v", resp.Node.Key, ErrNotInDirectory, p.directory) + err = fmt.Errorf("%s is %v '%v", resp.Node.Key, errNotInDirectory, p.directory) return } @@ -113,18 +119,25 @@ func (w *watcher) Next() (provider.WatcherMsg, error) { return w.next() } -// URL format -// etcd://username:password@,/ +// parseAddr passes addr into a new configured Provider. +// URL format: etcd://username:password@,/ func parseAddr(addr string) (Provider, error) { p := Provider{} + // validate scheme. if !strings.HasPrefix(addr, Scheme) { - return p, ErrInvalidScheme + return p, errInvalidScheme } + + // extract username and password if present. p.username, p.password, addr = extractUserPass(strings.TrimPrefix(addr, Scheme)) + + // extract endpoints. s := strings.SplitN(addr, "/", 2) for _, v := range strings.Split(s[0], ",") { p.endpoints = append(p.endpoints, "http://"+v) } + + // extract directory if len(s) == 2 { p.directory = "/" + s[1] if !strings.HasSuffix(s[1], "/") { @@ -133,9 +146,11 @@ func parseAddr(addr string) (Provider, error) { } else { p.directory = DefaultDirectory } + return p, nil } +// extractUserPass extracts username and password from addr. func extractUserPass(addr string) (username, password, remaining string) { s := strings.SplitN(addr, "@", 2) if len(s) == 1 { diff --git a/middleware/proxy/provider/provider.go b/middleware/proxy/provider/provider.go index 0f5ae6cad..c5ee21ea0 100644 --- a/middleware/proxy/provider/provider.go +++ b/middleware/proxy/provider/provider.go @@ -7,32 +7,39 @@ import ( ) var ( - providers = make(map[string]NewFunc) + providers = make(map[string]NewProviderFunc) - ErrUnsupportedScheme = errors.New("scheme is not supported.") + errUnsupportedScheme = errors.New("scheme is not supported.") ) +// Provider is hosts provider. type Provider interface { + // Hosts returns all hosts provided by this provider. Hosts() ([]string, error) } +// staticProvider cater for static hardcoded hosts. type staticProvider string +// Hosts satisfies Provider interface. func (s staticProvider) Hosts() ([]string, error) { return []string{string(s)}, nil } +// newStatic creates a new static host provider. func newStatic(host string) (Provider, error) { return staticProvider(host), nil } -// NewFunc creates a new Provider. -type NewFunc func(host string) (Provider, error) +// NewProviderFunc creates a new Provider. +type NewProviderFunc func(host string) (Provider, error) -func Register(scheme string, newFunc NewFunc) { +// Register registers a url scheme against a new provider function. +func Register(scheme string, newFunc NewProviderFunc) { providers[scheme] = newFunc } +// Get fetches a provider using the scheme of the provided address. func Get(addr string) (Provider, error) { scheme := "" s := strings.SplitN(addr, "://", 2) @@ -42,15 +49,18 @@ func Get(addr string) (Provider, error) { if f, ok := providers[scheme]; ok { return f(addr) } - return nil, fmt.Errorf("%s %v", scheme, ErrUnsupportedScheme) + return nil, fmt.Errorf("%s %v", scheme, errUnsupportedScheme) } // DynamicProvider represents a dynamic hosts provider. type DynamicProvider interface { Provider + // Watch creates a new Watcher. Watch() Watcher } +// WatcherMsg is the message sent by Watcher when there is a +// change to a host. type WatcherMsg struct { // Host is the affected host Host string diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index ae41c025d..ed7b5ca36 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -36,6 +36,9 @@ type staticUpstream struct { WithoutPathPrefix string IgnoredSubPaths []string + // Ensure distinct hosts + hostSet map[string]struct{} + sync.RWMutex } @@ -51,6 +54,7 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) { Policy: &Random{}, FailTimeout: 10 * time.Second, MaxFails: 1, + hostSet: make(map[string]struct{}), } if !c.Args(&upstream.from) { @@ -241,14 +245,14 @@ worker: if m.msg.Remove { // remove from upstream upstream.RemoveHost(m.msg.Host) - log.Printf("Host %v removed from upstream\n", m.msg.Host) + log.Printf("Host %v removed from upstream", m.msg.Host) } else { // add host to upstream if err := upstream.AddHost(m.msg.Host); err != nil { log.Println(err) continue worker } - log.Printf("New host %v added to upstream\n", m.msg.Host) + log.Printf("New host %v added to upstream", m.msg.Host) } } @@ -292,8 +296,18 @@ func (u *staticUpstream) IsAllowedPath(requestPath string) bool { return true } +// AddHost adds host to upstream hosts. func (upstream *staticUpstream) AddHost(host string) error { + upstream.Lock() + defer upstream.Unlock() + host = hostName(host).String() + + // If its previously added, ignore + if _, ok := upstream.hostSet[host]; ok { + return nil + } + uh := &UpstreamHost{ Name: host, Conns: 0, @@ -320,15 +334,23 @@ func (upstream *staticUpstream) AddHost(host string) error { } else { return err } - upstream.Lock() upstream.Hosts = append(upstream.Hosts, uh) - upstream.Unlock() + upstream.hostSet[host] = struct{}{} return nil } +// RemoveHost removes host from upstream hosts. func (upstream *staticUpstream) RemoveHost(host string) { upstream.Lock() defer upstream.Unlock() + + host = hostName(host).String() + + // If it does not exist, ignore + if _, ok := upstream.hostSet[host]; !ok { + return + } + idx := -1 for i, h := range upstream.Hosts { if hostName(host).equals(h.Name) { @@ -339,6 +361,9 @@ func (upstream *staticUpstream) RemoveHost(host string) { if idx == -1 { return } + + delete(upstream.hostSet, host) + if idx == len(upstream.Hosts)-1 { upstream.Hosts = upstream.Hosts[:idx] return From a8e3e02efa6e8e54e6196248daca3808fcfc6f23 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Sun, 17 Jan 2016 17:13:15 +0100 Subject: [PATCH 06/10] Added tests. --- middleware/proxy/provider/etcd/etcd.go | 16 +-- middleware/proxy/provider/etcd/etcd_test.go | 141 ++++++++++++++++++++ middleware/proxy/provider/provider_test.go | 113 ++++++++++++++++ middleware/proxy/upstream.go | 40 +++--- middleware/proxy/upstream_test.go | 45 +++++++ 5 files changed, 325 insertions(+), 30 deletions(-) create mode 100644 middleware/proxy/provider/etcd/etcd_test.go create mode 100644 middleware/proxy/provider/provider_test.go diff --git a/middleware/proxy/provider/etcd/etcd.go b/middleware/proxy/provider/etcd/etcd.go index ca4b8f2e8..c682c65a8 100644 --- a/middleware/proxy/provider/etcd/etcd.go +++ b/middleware/proxy/provider/etcd/etcd.go @@ -2,13 +2,12 @@ package etcd import ( + "errors" "fmt" "strings" - "sync" "github.com/coreos/etcd/client" "github.com/mholt/caddy/middleware/proxy/provider" - "github.com/syndtr/goleveldb/leveldb/errors" "golang.org/x/net/context" ) @@ -17,7 +16,7 @@ const ( Scheme = "etcd://" // DefaultDirectory is the default Etcd config directory. - DefaultDirectory = "/CADDY_PROXY_HOSTS/" + DefaultDirectory = "/caddy_proxy_hosts/" ) var ( @@ -34,7 +33,6 @@ type Provider struct { username string password string client.KeysAPI - sync.Mutex } // New creates a new Etcd Provider @@ -56,7 +54,7 @@ func New(addr string) (provider.Provider, error) { } p.KeysAPI = client.NewKeysAPI(c) - return &p, nil + return p, nil } // Hosts satisfies provider.Provider interface. @@ -121,11 +119,11 @@ func (w *watcher) Next() (provider.WatcherMsg, error) { // parseAddr passes addr into a new configured Provider. // URL format: etcd://username:password@,/ -func parseAddr(addr string) (Provider, error) { - p := Provider{} +func parseAddr(addr string) (*Provider, error) { + p := &Provider{} // validate scheme. if !strings.HasPrefix(addr, Scheme) { - return p, errInvalidScheme + return nil, errInvalidScheme } // extract username and password if present. @@ -157,7 +155,7 @@ func extractUserPass(addr string) (username, password, remaining string) { remaining = addr return } - userPass := strings.Split(s[0], ":") + userPass := strings.SplitN(s[0], ":", 2) username = userPass[0] if len(userPass) > 1 { password = userPass[1] diff --git a/middleware/proxy/provider/etcd/etcd_test.go b/middleware/proxy/provider/etcd/etcd_test.go new file mode 100644 index 000000000..836de2782 --- /dev/null +++ b/middleware/proxy/provider/etcd/etcd_test.go @@ -0,0 +1,141 @@ +package etcd + +import ( + "fmt" + "testing" + + "github.com/mholt/caddy/middleware/proxy/provider" +) + +func TestParseAddr(t *testing.T) { + tests := []struct { + address string + expected *Provider + }{ + { + "etcd://localhost:2015", + &Provider{ + endpoints: []string{"http://localhost:2015"}, + }, + }, + { + "etcdlocalhost:2015", nil, + }, + { + "user:pass@localhost:2015", nil, + }, + { + "etcd://user:pass@localhost:2015", + &Provider{ + endpoints: []string{"http://localhost:2015"}, + username: "user", + password: "pass", + }, + }, + { + "etcd://localhost:2015,localhost:2016", + &Provider{ + endpoints: []string{"http://localhost:2015", "http://localhost:2016"}, + }, + }, + { + "etcd://user:pass@localhost:2015,localhost:2016", + &Provider{ + endpoints: []string{"http://localhost:2015", "http://localhost:2016"}, + username: "user", + password: "pass", + }, + }, + { + "etcd://user:pass@localhost:2015/directory", + &Provider{ + endpoints: []string{"http://localhost:2015"}, + username: "user", + password: "pass", + directory: "/directory/", + }, + }, + { + "etcd://user:pass@2015/directory", + &Provider{ + endpoints: []string{"http://2015"}, + username: "user", + password: "pass", + directory: "/directory/", + }, + }, + } + + for i, test := range tests { + pr, _ := parseAddr(test.address) + if !equalProvider(test.expected, pr, t) { + t.Errorf("Test %d: failed", i) + } + } + +} + +func equalProvider(expected, value *Provider, t *testing.T) bool { + if expected == nil && value != nil { + return false + } else if expected == nil { + return true + } + + if expected.directory == "" { + expected.directory = DefaultDirectory + } + + if expected.directory != value.directory { + t.Errorf("Expected directory %v, found %v", expected.directory, value.directory) + return false + } + + if expected.username != value.username { + t.Errorf("Expected username %v, found %v", expected.username, value.password) + return false + } + if expected.endpoints != nil && fmt.Sprint(expected.endpoints) != fmt.Sprint(value.endpoints) { + t.Errorf("Expected endpoints %v, found %v", expected.endpoints, value.endpoints) + return false + } + if expected.password != value.password { + t.Errorf("Expected password %v, found %v", expected.password, value.password) + return false + } + return true +} + +func TestGetProvider(t *testing.T) { + tests := []struct { + addr string + valid bool + }{ + {"etc://localhost", false}, + {"http://localhost", false}, + {"ftp://localhost", false}, + {"https://localhost", false}, + {"etcd://localhost", true}, + {"etcde://localhost", false}, + {"etcd:/localhost", false}, + {"etcd:localhost", false}, + } + + provider.Register("etcd", New) + + for i, test := range tests { + pr, _ := provider.Get(test.addr) + if test.valid { + if _, ok := pr.(*Provider); !ok { + t.Errorf("Test %d: expecting provider to be etcd.Provider", i) + } + if _, ok := pr.(provider.DynamicProvider); !ok { + t.Errorf("Test %d: expecting provider to be dynamic provider", i) + } + } else { + if _, ok := pr.(*Provider); ok { + t.Errorf("Test %d: not expecting etcd.Provider", i) + } + } + } +} diff --git a/middleware/proxy/provider/provider_test.go b/middleware/proxy/provider/provider_test.go new file mode 100644 index 000000000..a005632ca --- /dev/null +++ b/middleware/proxy/provider/provider_test.go @@ -0,0 +1,113 @@ +package provider + +import ( + "testing" + "time" +) + +func TestProvider(t *testing.T) { + tests := []struct { + addr string + valid bool + }{ + {"etc://localhost", false}, + {"http://localhost", false}, + {"ftp://localhost", false}, + {"https://localhost", false}, + {"fake://localhost", true}, + {"faked://localhost", false}, + } + + Register("fake", func(s string) (Provider, error) { + return fakeProvider(s), nil + }) + + for i, test := range tests { + pr, _ := Get(test.addr) + if test.valid { + if _, ok := pr.(fakeProvider); !ok { + t.Errorf("Test %d: expecting provider to be fakeProvider", i) + } + if _, ok := pr.(Provider); !ok { + t.Errorf("Test %d: expecting provider to be provider", i) + } + } else { + if _, ok := pr.(fakeProvider); ok { + t.Errorf("Test %d: not expecting fakeProvider", i) + } + } + } + + Register("fake", func(s string) (Provider, error) { + return fakeDynamic{fakeProvider(s)}, nil + }) + + for i, test := range tests { + pr, _ := Get(test.addr) + if test.valid { + if _, ok := pr.(fakeDynamic); !ok { + t.Errorf("Test %d: expecting provider to be fakeDynamic", i) + } + if _, ok := pr.(DynamicProvider); !ok { + t.Errorf("Test %d: expecting provider to be dynamic provider", i) + } + } else { + if _, ok := pr.(fakeDynamic); ok { + t.Errorf("Test %d: not expecting fakeDynamic", i) + } + } + } + + tests = []struct { + addr string + valid bool + }{ + {"etc://localhost", false}, + {"http://localhost", true}, + {"ftp://localhost", false}, + {"https://localhost", true}, + {"localhost", true}, + {"http://localhost", true}, + {"faked://localhost", false}, + } + + Register("http", newStatic) + Register("https", newStatic) + Register("", newStatic) + + for i, test := range tests { + pr, _ := Get(test.addr) + if test.valid { + if _, ok := pr.(staticProvider); !ok { + t.Errorf("Test %d: expecting provider to be staticProvider", i) + } + if _, ok := pr.(Provider); !ok { + t.Errorf("Test %d: expecting provider to be provider", i) + } + } else { + if _, ok := pr.(staticProvider); ok { + t.Errorf("Test %d: not expecting staticProvider", i) + } + } + } + +} + +type fakeProvider string + +func (f fakeProvider) Hosts() ([]string, error) { + return []string{string(f)}, nil +} + +type fakeDynamic struct { + fakeProvider +} + +func (f fakeDynamic) Watch() Watcher { + return f +} + +func (f fakeDynamic) Next() (msg WatcherMsg, err error) { + time.Sleep(100) + return WatcherMsg{}, nil +} diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index ed7b5ca36..423b19680 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -99,6 +99,7 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) { } if upstream.HealthCheck.Path != "" { + // TODO provide stop channel go upstream.HealthCheckWorker(nil) } upstreams = append(upstreams, upstream) @@ -231,29 +232,26 @@ func (upstream *staticUpstream) ProviderWorker(watcher provider.Watcher, stop <- } }() -worker: for { - select { - case m, ok := <-resp: - if !ok { - break worker - } - if m.err != nil { - log.Println(m.err) - continue worker - } - if m.msg.Remove { - // remove from upstream - upstream.RemoveHost(m.msg.Host) - log.Printf("Host %v removed from upstream", m.msg.Host) - } else { - // add host to upstream - if err := upstream.AddHost(m.msg.Host); err != nil { - log.Println(err) - continue worker - } - log.Printf("New host %v added to upstream", m.msg.Host) + m, ok := <-resp + if !ok { + break + } + if m.err != nil { + log.Println(m.err) + continue + } + if m.msg.Remove { + // remove from upstream + upstream.RemoveHost(m.msg.Host) + log.Printf("Host %v removed from upstream", m.msg.Host) + } else { + // add host to upstream + if err := upstream.AddHost(m.msg.Host); err != nil { + log.Println(err) + continue } + log.Printf("New host %v added to upstream", m.msg.Host) } } diff --git a/middleware/proxy/upstream_test.go b/middleware/proxy/upstream_test.go index 5b2fdb1da..e22c15a56 100644 --- a/middleware/proxy/upstream_test.go +++ b/middleware/proxy/upstream_test.go @@ -49,7 +49,52 @@ func TestRegisterPolicy(t *testing.T) { if _, ok := supportedPolicies[name]; !ok { t.Error("Expected supportedPolicies to have a custom policy.") } +} +func TestAddRemove(t *testing.T) { + upstream := &staticUpstream{ + from: "", + Hosts: nil, + Policy: &Random{}, + FailTimeout: 10 * time.Second, + MaxFails: 1, + hostSet: make(map[string]struct{}), + } + upstream.AddHost("localhost") + if len(upstream.Hosts) != 1 { + t.Errorf("Expecting %v found %v", 1, len(upstream.Hosts)) + } + if upstream.Hosts[0].Name != "http://localhost" { + t.Errorf("Expecting %v found %v", "http://localhost", upstream.Hosts[0]) + } + upstream.AddHost("localhost") + if len(upstream.Hosts) != 1 { + t.Errorf("Expecting %v found %v", 1, len(upstream.Hosts)) + } + if upstream.Hosts[0].Name != "http://localhost" { + t.Errorf("Expecting %v found %v", "http://localhost", upstream.Hosts[0]) + } + upstream.AddHost("localhost1") + upstream.AddHost("localhost2") + if len(upstream.Hosts) != 3 { + t.Errorf("Expecting %v found %v", 3, len(upstream.Hosts)) + } + upstream.RemoveHost("localhost4") + if len(upstream.Hosts) != 3 { + t.Errorf("Expecting %v found %v", 3, len(upstream.Hosts)) + } + upstream.RemoveHost("localhost2") + if len(upstream.Hosts) != 2 { + t.Errorf("Expecting %v found %v", 2, len(upstream.Hosts)) + } + if upstream.Hosts[1].Name != "http://localhost1" { + t.Errorf("Expecting %v found %v", "http://localhost1", upstream.Hosts[1]) + } + upstream.RemoveHost("localhost1") + upstream.RemoveHost("localhost") + if len(upstream.Hosts) != 0 { + t.Errorf("Expecting %v found %v", 0, len(upstream.Hosts)) + } } func TestAllowedPaths(t *testing.T) { From 27838d03490ec98edaf1fc4d4e85636c08864db4 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Thu, 21 Jan 2016 01:15:23 +0100 Subject: [PATCH 07/10] delay before retrying on failure when waiting for etcd updates. --- middleware/proxy/upstream.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 423b19680..a92353a22 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -232,6 +232,17 @@ func (upstream *staticUpstream) ProviderWorker(watcher provider.Watcher, stop <- } }() + interval := 5 + delay := func() { + log.Printf("Delaying %d seconds before next attempt.", interval) + time.Sleep(time.Second * time.Duration(interval)) + + // delay at most 60 seconds. + interval += 5 + if interval > 60 { + interval = 60 + } + } for { m, ok := <-resp if !ok { @@ -239,7 +250,11 @@ func (upstream *staticUpstream) ProviderWorker(watcher provider.Watcher, stop <- } if m.err != nil { log.Println(m.err) + delay() continue + } else { + // no errors, reset delay interval. + interval = 0 } if m.msg.Remove { // remove from upstream From cc7167521d8223b2a06a33b554165fb468051dcb Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Sat, 30 Jan 2016 05:15:29 +0100 Subject: [PATCH 08/10] minor fixes --- middleware/proxy/dynamic.go | 2 +- middleware/proxy/provider/dynamic_provider.go | 23 ++++++++ middleware/proxy/provider/etcd/etcd.go | 56 +++++++------------ middleware/proxy/provider/etcd/etcd_test.go | 7 ++- middleware/proxy/provider/provider.go | 22 -------- middleware/proxy/upstream.go | 4 +- 6 files changed, 48 insertions(+), 66 deletions(-) create mode 100644 middleware/proxy/provider/dynamic_provider.go diff --git a/middleware/proxy/dynamic.go b/middleware/proxy/dynamic.go index d6f40c387..282ed025b 100644 --- a/middleware/proxy/dynamic.go +++ b/middleware/proxy/dynamic.go @@ -7,5 +7,5 @@ import ( func init() { // register dynamic providers - provider.Register("etcd", etcd.New) + provider.Register(etcd.Scheme, etcd.New) } diff --git a/middleware/proxy/provider/dynamic_provider.go b/middleware/proxy/provider/dynamic_provider.go new file mode 100644 index 000000000..6b837ddbc --- /dev/null +++ b/middleware/proxy/provider/dynamic_provider.go @@ -0,0 +1,23 @@ +package provider + +// DynamicProvider represents a dynamic hosts provider. +type DynamicProvider interface { + Provider + // Watch creates a new Watcher. + Watch() Watcher +} + +// WatcherMsg is the message sent by Watcher when there is a +// change to a host. +type WatcherMsg struct { + // Host is the affected host + Host string + // Remove is true if the host should be removed instead. + Remove bool +} + +// Watcher watches for changes in the store. +// Next blocks until a new host is available. +type Watcher interface { + Next() (msg WatcherMsg, err error) +} diff --git a/middleware/proxy/provider/etcd/etcd.go b/middleware/proxy/provider/etcd/etcd.go index c682c65a8..1621a1af7 100644 --- a/middleware/proxy/provider/etcd/etcd.go +++ b/middleware/proxy/provider/etcd/etcd.go @@ -4,6 +4,7 @@ package etcd import ( "errors" "fmt" + "net/url" "strings" "github.com/coreos/etcd/client" @@ -13,10 +14,10 @@ import ( const ( // Scheme is Ectd url scheme. - Scheme = "etcd://" + Scheme = "etcd" // DefaultDirectory is the default Etcd config directory. - DefaultDirectory = "/caddy_proxy_hosts/" + DefaultDirectory = "/caddyserver.com/proxy/default/hosts/" ) var ( @@ -121,45 +122,26 @@ func (w *watcher) Next() (provider.WatcherMsg, error) { // URL format: etcd://username:password@,/ func parseAddr(addr string) (*Provider, error) { p := &Provider{} - // validate scheme. - if !strings.HasPrefix(addr, Scheme) { + + u, err := url.Parse(addr) + if err != nil { + return p, err + } + + if u.Scheme != Scheme { return nil, errInvalidScheme } - // extract username and password if present. - p.username, p.password, addr = extractUserPass(strings.TrimPrefix(addr, Scheme)) - - // extract endpoints. - s := strings.SplitN(addr, "/", 2) - for _, v := range strings.Split(s[0], ",") { - p.endpoints = append(p.endpoints, "http://"+v) + p.directory = DefaultDirectory + if u.Path != "" && u.Path != "/" { + p.directory = u.Path } - - // extract directory - if len(s) == 2 { - p.directory = "/" + s[1] - if !strings.HasSuffix(s[1], "/") { - p.directory += "/" - } - } else { - p.directory = DefaultDirectory + for _, endpoint := range strings.Split(u.Host, ",") { + p.endpoints = append(p.endpoints, "http://"+endpoint) + } + if u.User != nil { + p.username = u.User.Username() + p.password, _ = u.User.Password() } - return p, nil } - -// extractUserPass extracts username and password from addr. -func extractUserPass(addr string) (username, password, remaining string) { - s := strings.SplitN(addr, "@", 2) - if len(s) == 1 { - remaining = addr - return - } - userPass := strings.SplitN(s[0], ":", 2) - username = userPass[0] - if len(userPass) > 1 { - password = userPass[1] - } - remaining = s[1] - return -} diff --git a/middleware/proxy/provider/etcd/etcd_test.go b/middleware/proxy/provider/etcd/etcd_test.go index 836de2782..58b0d1b81 100644 --- a/middleware/proxy/provider/etcd/etcd_test.go +++ b/middleware/proxy/provider/etcd/etcd_test.go @@ -13,9 +13,10 @@ func TestParseAddr(t *testing.T) { expected *Provider }{ { - "etcd://localhost:2015", + "etcd://localhost:2015/", &Provider{ endpoints: []string{"http://localhost:2015"}, + directory: "", }, }, { @@ -52,7 +53,7 @@ func TestParseAddr(t *testing.T) { endpoints: []string{"http://localhost:2015"}, username: "user", password: "pass", - directory: "/directory/", + directory: "/directory", }, }, { @@ -61,7 +62,7 @@ func TestParseAddr(t *testing.T) { endpoints: []string{"http://2015"}, username: "user", password: "pass", - directory: "/directory/", + directory: "/directory", }, }, } diff --git a/middleware/proxy/provider/provider.go b/middleware/proxy/provider/provider.go index c5ee21ea0..016ede50e 100644 --- a/middleware/proxy/provider/provider.go +++ b/middleware/proxy/provider/provider.go @@ -52,28 +52,6 @@ func Get(addr string) (Provider, error) { return nil, fmt.Errorf("%s %v", scheme, errUnsupportedScheme) } -// DynamicProvider represents a dynamic hosts provider. -type DynamicProvider interface { - Provider - // Watch creates a new Watcher. - Watch() Watcher -} - -// WatcherMsg is the message sent by Watcher when there is a -// change to a host. -type WatcherMsg struct { - // Host is the affected host - Host string - // Remove is true if the host should be removed instead. - Remove bool -} - -// Watcher watches for changes in the store. -// Next blocks until a new host is available. -type Watcher interface { - Next() (msg WatcherMsg, err error) -} - func init() { // register static provider Register("http", newStatic) diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index eec1913a8..310f75dc3 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -222,15 +222,13 @@ func (upstream *staticUpstream) ProviderWorker(watcher provider.Watcher, stop <- } resp := make(chan msg) go func() { - outer: for { // blocks until there's a message from Watcher m, err := watcher.Next() select { case resp <- msg{m, err}: case <-stop: - close(resp) - break outer + return } } }() From 1512c99c7c8340030dba7ac28999afeb161cad57 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Tue, 2 Feb 2016 05:58:25 +0100 Subject: [PATCH 09/10] Use key value library. Support etcd, consul, zk. --- kvstore/kvstore.go | 63 ++++++++ kvstore/kvstore_test.go | 54 +++++++ middleware/proxy/dynamic.go | 11 -- middleware/proxy/provider/dynamic_provider.go | 112 ++++++++++++- middleware/proxy/provider/etcd/etcd.go | 147 ------------------ middleware/proxy/provider/etcd/etcd_test.go | 142 ----------------- middleware/proxy/provider/provider.go | 7 - middleware/proxy/provider/provider_test.go | 4 +- middleware/proxy/provider/register.go | 13 ++ middleware/proxy/upstream.go | 42 ++--- 10 files changed, 266 insertions(+), 329 deletions(-) create mode 100644 kvstore/kvstore.go create mode 100644 kvstore/kvstore_test.go delete mode 100644 middleware/proxy/dynamic.go delete mode 100644 middleware/proxy/provider/etcd/etcd.go delete mode 100644 middleware/proxy/provider/etcd/etcd_test.go create mode 100644 middleware/proxy/provider/register.go diff --git a/kvstore/kvstore.go b/kvstore/kvstore.go new file mode 100644 index 000000000..ac7a57d94 --- /dev/null +++ b/kvstore/kvstore.go @@ -0,0 +1,63 @@ +package kvstore + +import ( + "net/url" + "path" + "strings" + "time" + + "github.com/docker/libkv" + "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" + "github.com/docker/libkv/store/etcd" + "github.com/docker/libkv/store/zookeeper" +) + +const ( + Timeout = 10 * time.Second + DefaultDirectory = "caddyserver.com/" +) + +func init() { + // Register backends + consul.Register() + etcd.Register() + zookeeper.Register() +} + +type Store struct { + store.Store + Type string // Backend + BaseDir string // Base directory +} + +func NewStore(addr string) (*Store, error) { + s := new(Store) + u, err := url.Parse(addr) + if err != nil { + return s, err + } + + config := &store.Config{ + ConnectionTimeout: Timeout, + } + + s.BaseDir = DefaultDirectory + s.Type = u.Scheme + if u.Path != "" && u.Path != "/" { + s.BaseDir = u.Path[1:] + } + + var endpoints []string + for _, endpoint := range strings.Split(u.Host, ",") { + endpoints = append(endpoints, endpoint) + } + + s.Store, err = libkv.NewStore(store.Backend(u.Scheme), endpoints, config) + + return s, err +} + +func (s Store) Key(key string) string { + return path.Join(s.BaseDir, key) +} diff --git a/kvstore/kvstore_test.go b/kvstore/kvstore_test.go new file mode 100644 index 000000000..68139e8e3 --- /dev/null +++ b/kvstore/kvstore_test.go @@ -0,0 +1,54 @@ +package kvstore + +import ( + "testing" +) + +func TestKV(t *testing.T) { + tests := []struct { + url string + expectedType string + expectedDir string + }{ + { + "etcd://localhost:1000/dir", + "etcd", + "dir", + }, + { + "etcd://localhost:1000/", + "etcd", + DefaultDirectory, + }, + { + "consul://localhost:1000/dir", + "consul", + "dir", + }, + { + "consul://localhost:1000/", + "consul", + DefaultDirectory, + }, + { + "zk://localhost:1000/dir", + "zk", + "dir", + }, + { + "zk://localhost:1000/", + "zk", + DefaultDirectory, + }, + } + + for i, test := range tests { + s, _ := NewStore(test.url) + if s.BaseDir != test.expectedDir { + t.Errorf("Test %d: expected %s found %s", i, test.expectedDir, s.BaseDir) + } + if s.Type != test.expectedType { + t.Errorf("Test %d: expected %s found %s", i, test.expectedType, s.Type) + } + } +} diff --git a/middleware/proxy/dynamic.go b/middleware/proxy/dynamic.go deleted file mode 100644 index 282ed025b..000000000 --- a/middleware/proxy/dynamic.go +++ /dev/null @@ -1,11 +0,0 @@ -package proxy - -import ( - "github.com/mholt/caddy/middleware/proxy/provider" - "github.com/mholt/caddy/middleware/proxy/provider/etcd" -) - -func init() { - // register dynamic providers - provider.Register(etcd.Scheme, etcd.New) -} diff --git a/middleware/proxy/provider/dynamic_provider.go b/middleware/proxy/provider/dynamic_provider.go index 6b837ddbc..5dd5c5912 100644 --- a/middleware/proxy/provider/dynamic_provider.go +++ b/middleware/proxy/provider/dynamic_provider.go @@ -1,5 +1,17 @@ package provider +import ( + "errors" + "fmt" + "log" + "sync" + + "github.com/docker/libkv/store" + "github.com/mholt/caddy/kvstore" +) + +var ErrWatchFailure = errors.New("WatchTree failed for") + // DynamicProvider represents a dynamic hosts provider. type DynamicProvider interface { Provider @@ -19,5 +31,103 @@ type WatcherMsg struct { // Watcher watches for changes in the store. // Next blocks until a new host is available. type Watcher interface { - Next() (msg WatcherMsg, err error) + Next() (msgs []WatcherMsg, err error) +} + +func newDynamic(addr string) (Provider, error) { + store, err := kvstore.NewStore(addr) + if err != nil { + return nil, err + } + return &dynamicProvider{ + Store: store, + hosts: make(map[string]struct{}), + }, nil +} + +type dynamicProvider struct { + hosts map[string]struct{} + *kvstore.Store +} + +func (d *dynamicProvider) Hosts() ([]string, error) { + var hosts []string + + // create directory if not exists + if ok, _ := d.Exists(d.BaseDir); !ok { + err := d.Put(d.Key(""), nil, &store.WriteOptions{IsDir: true}) + if err != nil { + return hosts, err + } + } + + kvs, err := d.List(d.BaseDir) + if err != nil { + return hosts, err + } + for _, kv := range kvs { + host := string(kv.Value) + hosts = append(hosts, host) + d.hosts[host] = struct{}{} + } + return hosts, nil +} + +func (d *dynamicProvider) Watch() Watcher { + keysChan, err := d.WatchTree(d.BaseDir, nil) + if err != nil { + log.Println(err) + return nil + } + + var m sync.Mutex + return &watcher{ + next: func() ([]WatcherMsg, error) { + m.Lock() + defer m.Unlock() + + var msgs []WatcherMsg + + keys, ok := <-keysChan + if !ok { + // attempt to resurrect the closed channel ahead of next retry. + if keysChan, err = d.WatchTree(d.BaseDir, nil); err != nil { + // return watcher's error message + return msgs, err + } + // return generic error message, next try will be successful. + return msgs, fmt.Errorf("%v %s", ErrWatchFailure, d.Type) + } + + // comparison set + hosts := make(map[string]struct{}) + + // additions. hosts not in existing provider set. + for _, key := range keys { + host := string(key.Value) + if _, ok := d.hosts[host]; !ok { + msgs = append(msgs, WatcherMsg{Host: host}) + d.hosts[host] = struct{}{} + } + // populate comparison set + hosts[host] = struct{}{} + } + + // removals, hosts not in new comparison set but in provider set. + for host, _ := range d.hosts { + if _, ok := hosts[host]; !ok { + msgs = append(msgs, WatcherMsg{Host: host, Remove: true}) + } + } + return msgs, nil + }, + } +} + +type watcher struct { + next func() ([]WatcherMsg, error) +} + +func (w *watcher) Next() ([]WatcherMsg, error) { + return w.next() } diff --git a/middleware/proxy/provider/etcd/etcd.go b/middleware/proxy/provider/etcd/etcd.go deleted file mode 100644 index 1621a1af7..000000000 --- a/middleware/proxy/provider/etcd/etcd.go +++ /dev/null @@ -1,147 +0,0 @@ -// Package etcd is an Etcd backed provider. -package etcd - -import ( - "errors" - "fmt" - "net/url" - "strings" - - "github.com/coreos/etcd/client" - "github.com/mholt/caddy/middleware/proxy/provider" - "golang.org/x/net/context" -) - -const ( - // Scheme is Ectd url scheme. - Scheme = "etcd" - - // DefaultDirectory is the default Etcd config directory. - DefaultDirectory = "/caddyserver.com/proxy/default/hosts/" -) - -var ( - errInvalidScheme = errors.New("invalid Etcd scheme") - errNotDirectory = errors.New("not an Etcd directory") - errNotKey = errors.New("not an Etcd key") - errNotInDirectory = errors.New("not in expected directory") -) - -// Provider is Etcd provider. -type Provider struct { - endpoints []string - directory string - username string - password string - client.KeysAPI -} - -// New creates a new Etcd Provider -func New(addr string) (provider.Provider, error) { - p, err := parseAddr(addr) - if err != nil { - return nil, err - } - - cfg := client.Config{ - Endpoints: p.endpoints, - Transport: client.DefaultTransport, - Username: p.username, - Password: p.password, - } - c, err := client.New(cfg) - if err != nil { - return nil, err - } - - p.KeysAPI = client.NewKeysAPI(c) - return p, nil -} - -// Hosts satisfies provider.Provider interface. -func (p *Provider) Hosts() ([]string, error) { - var hosts []string - resp, err := p.Get(context.Background(), p.directory, nil) - if err != nil { - return nil, err - } - if !resp.Node.Dir { - return nil, fmt.Errorf("%s is %v", p.directory, errNotDirectory) - } - for _, node := range resp.Node.Nodes { - if node.Dir { - return nil, fmt.Errorf("%s is %v", node.Key, errNotKey) - } - hosts = append(hosts, node.Value) - } - return hosts, nil -} - -// Watch satisfies provider.DynamicProvider interface. -func (p *Provider) Watch() provider.Watcher { - w := p.Watcher(p.directory, &client.WatcherOptions{Recursive: true}) - return &watcher{ - next: func() (msg provider.WatcherMsg, err error) { - var resp *client.Response - if resp, err = w.Next(context.Background()); err != nil { - return - } - if resp.Node.Dir { - err = fmt.Errorf("%s is %v", resp.Node.Key, errNotKey) - return - } - if len(strings.Split(strings.TrimPrefix(resp.Node.Key, p.directory), "/")) != 1 { - err = fmt.Errorf("%s is %v '%v", resp.Node.Key, errNotInDirectory, p.directory) - return - } - - // Remove host - if resp.Node.Value == "" { - if resp.PrevNode != nil { - return provider.WatcherMsg{Host: resp.PrevNode.Value, Remove: true}, nil - } - // should not happen - err = errors.New("Node is previously empty") - } - - // Add host - return provider.WatcherMsg{Host: resp.Node.Value, Remove: false}, err - }, - } -} - -type watcher struct { - next func() (provider.WatcherMsg, error) -} - -func (w *watcher) Next() (provider.WatcherMsg, error) { - return w.next() -} - -// parseAddr passes addr into a new configured Provider. -// URL format: etcd://username:password@,/ -func parseAddr(addr string) (*Provider, error) { - p := &Provider{} - - u, err := url.Parse(addr) - if err != nil { - return p, err - } - - if u.Scheme != Scheme { - return nil, errInvalidScheme - } - - p.directory = DefaultDirectory - if u.Path != "" && u.Path != "/" { - p.directory = u.Path - } - for _, endpoint := range strings.Split(u.Host, ",") { - p.endpoints = append(p.endpoints, "http://"+endpoint) - } - if u.User != nil { - p.username = u.User.Username() - p.password, _ = u.User.Password() - } - return p, nil -} diff --git a/middleware/proxy/provider/etcd/etcd_test.go b/middleware/proxy/provider/etcd/etcd_test.go deleted file mode 100644 index 58b0d1b81..000000000 --- a/middleware/proxy/provider/etcd/etcd_test.go +++ /dev/null @@ -1,142 +0,0 @@ -package etcd - -import ( - "fmt" - "testing" - - "github.com/mholt/caddy/middleware/proxy/provider" -) - -func TestParseAddr(t *testing.T) { - tests := []struct { - address string - expected *Provider - }{ - { - "etcd://localhost:2015/", - &Provider{ - endpoints: []string{"http://localhost:2015"}, - directory: "", - }, - }, - { - "etcdlocalhost:2015", nil, - }, - { - "user:pass@localhost:2015", nil, - }, - { - "etcd://user:pass@localhost:2015", - &Provider{ - endpoints: []string{"http://localhost:2015"}, - username: "user", - password: "pass", - }, - }, - { - "etcd://localhost:2015,localhost:2016", - &Provider{ - endpoints: []string{"http://localhost:2015", "http://localhost:2016"}, - }, - }, - { - "etcd://user:pass@localhost:2015,localhost:2016", - &Provider{ - endpoints: []string{"http://localhost:2015", "http://localhost:2016"}, - username: "user", - password: "pass", - }, - }, - { - "etcd://user:pass@localhost:2015/directory", - &Provider{ - endpoints: []string{"http://localhost:2015"}, - username: "user", - password: "pass", - directory: "/directory", - }, - }, - { - "etcd://user:pass@2015/directory", - &Provider{ - endpoints: []string{"http://2015"}, - username: "user", - password: "pass", - directory: "/directory", - }, - }, - } - - for i, test := range tests { - pr, _ := parseAddr(test.address) - if !equalProvider(test.expected, pr, t) { - t.Errorf("Test %d: failed", i) - } - } - -} - -func equalProvider(expected, value *Provider, t *testing.T) bool { - if expected == nil && value != nil { - return false - } else if expected == nil { - return true - } - - if expected.directory == "" { - expected.directory = DefaultDirectory - } - - if expected.directory != value.directory { - t.Errorf("Expected directory %v, found %v", expected.directory, value.directory) - return false - } - - if expected.username != value.username { - t.Errorf("Expected username %v, found %v", expected.username, value.password) - return false - } - if expected.endpoints != nil && fmt.Sprint(expected.endpoints) != fmt.Sprint(value.endpoints) { - t.Errorf("Expected endpoints %v, found %v", expected.endpoints, value.endpoints) - return false - } - if expected.password != value.password { - t.Errorf("Expected password %v, found %v", expected.password, value.password) - return false - } - return true -} - -func TestGetProvider(t *testing.T) { - tests := []struct { - addr string - valid bool - }{ - {"etc://localhost", false}, - {"http://localhost", false}, - {"ftp://localhost", false}, - {"https://localhost", false}, - {"etcd://localhost", true}, - {"etcde://localhost", false}, - {"etcd:/localhost", false}, - {"etcd:localhost", false}, - } - - provider.Register("etcd", New) - - for i, test := range tests { - pr, _ := provider.Get(test.addr) - if test.valid { - if _, ok := pr.(*Provider); !ok { - t.Errorf("Test %d: expecting provider to be etcd.Provider", i) - } - if _, ok := pr.(provider.DynamicProvider); !ok { - t.Errorf("Test %d: expecting provider to be dynamic provider", i) - } - } else { - if _, ok := pr.(*Provider); ok { - t.Errorf("Test %d: not expecting etcd.Provider", i) - } - } - } -} diff --git a/middleware/proxy/provider/provider.go b/middleware/proxy/provider/provider.go index 016ede50e..d2892865d 100644 --- a/middleware/proxy/provider/provider.go +++ b/middleware/proxy/provider/provider.go @@ -51,10 +51,3 @@ func Get(addr string) (Provider, error) { } return nil, fmt.Errorf("%s %v", scheme, errUnsupportedScheme) } - -func init() { - // register static provider - Register("http", newStatic) - Register("https", newStatic) - Register("", newStatic) -} diff --git a/middleware/proxy/provider/provider_test.go b/middleware/proxy/provider/provider_test.go index a005632ca..c149da1a2 100644 --- a/middleware/proxy/provider/provider_test.go +++ b/middleware/proxy/provider/provider_test.go @@ -107,7 +107,7 @@ func (f fakeDynamic) Watch() Watcher { return f } -func (f fakeDynamic) Next() (msg WatcherMsg, err error) { +func (f fakeDynamic) Next() (msgs []WatcherMsg, err error) { time.Sleep(100) - return WatcherMsg{}, nil + return []WatcherMsg{}, nil } diff --git a/middleware/proxy/provider/register.go b/middleware/proxy/provider/register.go new file mode 100644 index 000000000..4b200b928 --- /dev/null +++ b/middleware/proxy/provider/register.go @@ -0,0 +1,13 @@ +package provider + +func init() { + // static + Register("http", newStatic) + Register("https", newStatic) + Register("", newStatic) + + // dynamic + Register("etcd", newDynamic) + Register("consul", newDynamic) + Register("zk", newDynamic) +} diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 310f75dc3..ecd140ac8 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -216,17 +216,17 @@ func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) { } func (upstream *staticUpstream) ProviderWorker(watcher provider.Watcher, stop <-chan struct{}) { - type msg struct { - msg provider.WatcherMsg - err error + type message struct { + msgs []provider.WatcherMsg + err error } - resp := make(chan msg) + resp := make(chan message) go func() { for { // blocks until there's a message from Watcher m, err := watcher.Next() select { - case resp <- msg{m, err}: + case resp <- message{m, err}: case <-stop: return } @@ -257,17 +257,20 @@ func (upstream *staticUpstream) ProviderWorker(watcher provider.Watcher, stop <- // no errors, reset delay interval. interval = 0 } - if m.msg.Remove { - // remove from upstream - upstream.RemoveHost(m.msg.Host) - log.Printf("Host %v removed from upstream", m.msg.Host) - } else { - // add host to upstream - if err := upstream.AddHost(m.msg.Host); err != nil { - log.Println(err) - continue + for _, msg := range m.msgs { + if msg.Remove { + // remove from upstream + if upstream.RemoveHost(msg.Host) { + log.Printf("Host %v removed from upstream", msg.Host) + } + } else { + // add host to upstream + if err := upstream.AddHost(msg.Host); err != nil { + log.Println(err) + continue + } + log.Printf("New host %v added to upstream", msg.Host) } - log.Printf("New host %v added to upstream", m.msg.Host) } } @@ -357,7 +360,7 @@ func (upstream *staticUpstream) AddHost(host string) error { } // RemoveHost removes host from upstream hosts. -func (upstream *staticUpstream) RemoveHost(host string) { +func (upstream *staticUpstream) RemoveHost(host string) bool { upstream.Lock() defer upstream.Unlock() @@ -365,7 +368,7 @@ func (upstream *staticUpstream) RemoveHost(host string) { // If it does not exist, ignore if _, ok := upstream.hostSet[host]; !ok { - return + return false } idx := -1 @@ -376,16 +379,17 @@ func (upstream *staticUpstream) RemoveHost(host string) { } } if idx == -1 { - return + return false } delete(upstream.hostSet, host) if idx == len(upstream.Hosts)-1 { upstream.Hosts = upstream.Hosts[:idx] - return + return true } upstream.Hosts = append(upstream.Hosts[:idx], upstream.Hosts[idx+1:]...) + return true } type hostName string From d8d6481eb417ec7bd9e32000fff1457c07414dc4 Mon Sep 17 00:00:00 2001 From: Abiola Ibrahim Date: Tue, 2 Feb 2016 10:14:53 +0100 Subject: [PATCH 10/10] Update docs. --- kvstore/kvstore.go | 13 +++++++++++-- .../provider/{dynamic_provider.go => dynamic.go} | 11 +++++++---- middleware/proxy/provider/provider.go | 13 ------------- middleware/proxy/provider/provider_test.go | 6 +++--- middleware/proxy/provider/register.go | 12 ++++++------ middleware/proxy/provider/static.go | 14 ++++++++++++++ 6 files changed, 41 insertions(+), 28 deletions(-) rename middleware/proxy/provider/{dynamic_provider.go => dynamic.go} (89%) create mode 100644 middleware/proxy/provider/static.go diff --git a/kvstore/kvstore.go b/kvstore/kvstore.go index ac7a57d94..8c391d761 100644 --- a/kvstore/kvstore.go +++ b/kvstore/kvstore.go @@ -14,23 +14,31 @@ import ( ) const ( - Timeout = 10 * time.Second + // Default connection timeout + Timeout = 10 * time.Second + + // Default key-value directory. DefaultDirectory = "caddyserver.com/" ) func init() { - // Register backends + // Register backends. consul.Register() etcd.Register() zookeeper.Register() } +// Store is a key value store. type Store struct { store.Store Type string // Backend BaseDir string // Base directory } +// NewStore creates a new Store. +// The store is detected from addr scheme and a new Store with +// corresponding backend is returned. +// e.g. etcd://127.0.0.1:2349 returns Store with Etcd backend. func NewStore(addr string) (*Store, error) { s := new(Store) u, err := url.Parse(addr) @@ -58,6 +66,7 @@ func NewStore(addr string) (*Store, error) { return s, err } +// Key prefixes key with base directory of the Store s. func (s Store) Key(key string) string { return path.Join(s.BaseDir, key) } diff --git a/middleware/proxy/provider/dynamic_provider.go b/middleware/proxy/provider/dynamic.go similarity index 89% rename from middleware/proxy/provider/dynamic_provider.go rename to middleware/proxy/provider/dynamic.go index 5dd5c5912..cb937702e 100644 --- a/middleware/proxy/provider/dynamic_provider.go +++ b/middleware/proxy/provider/dynamic.go @@ -10,9 +10,9 @@ import ( "github.com/mholt/caddy/kvstore" ) -var ErrWatchFailure = errors.New("WatchTree failed for") +var errWatchFailure = errors.New("WatchTree failed for") -// DynamicProvider represents a dynamic hosts provider. +// DynamicProvider is a dynamic hosts provider. type DynamicProvider interface { Provider // Watch creates a new Watcher. @@ -34,7 +34,8 @@ type Watcher interface { Next() (msgs []WatcherMsg, err error) } -func newDynamic(addr string) (Provider, error) { +// dynamic creates a new dynamic host provider. +func dynamic(addr string) (Provider, error) { store, err := kvstore.NewStore(addr) if err != nil { return nil, err @@ -50,6 +51,7 @@ type dynamicProvider struct { *kvstore.Store } +// Hosts satisfy Provider. func (d *dynamicProvider) Hosts() ([]string, error) { var hosts []string @@ -73,6 +75,7 @@ func (d *dynamicProvider) Hosts() ([]string, error) { return hosts, nil } +// Watch satisfies DynamicProvider. func (d *dynamicProvider) Watch() Watcher { keysChan, err := d.WatchTree(d.BaseDir, nil) if err != nil { @@ -96,7 +99,7 @@ func (d *dynamicProvider) Watch() Watcher { return msgs, err } // return generic error message, next try will be successful. - return msgs, fmt.Errorf("%v %s", ErrWatchFailure, d.Type) + return msgs, fmt.Errorf("%v %s", errWatchFailure, d.Type) } // comparison set diff --git a/middleware/proxy/provider/provider.go b/middleware/proxy/provider/provider.go index d2892865d..71361db23 100644 --- a/middleware/proxy/provider/provider.go +++ b/middleware/proxy/provider/provider.go @@ -18,19 +18,6 @@ type Provider interface { Hosts() ([]string, error) } -// staticProvider cater for static hardcoded hosts. -type staticProvider string - -// Hosts satisfies Provider interface. -func (s staticProvider) Hosts() ([]string, error) { - return []string{string(s)}, nil -} - -// newStatic creates a new static host provider. -func newStatic(host string) (Provider, error) { - return staticProvider(host), nil -} - // NewProviderFunc creates a new Provider. type NewProviderFunc func(host string) (Provider, error) diff --git a/middleware/proxy/provider/provider_test.go b/middleware/proxy/provider/provider_test.go index c149da1a2..fed864acc 100644 --- a/middleware/proxy/provider/provider_test.go +++ b/middleware/proxy/provider/provider_test.go @@ -71,9 +71,9 @@ func TestProvider(t *testing.T) { {"faked://localhost", false}, } - Register("http", newStatic) - Register("https", newStatic) - Register("", newStatic) + Register("http", static) + Register("https", static) + Register("", static) for i, test := range tests { pr, _ := Get(test.addr) diff --git a/middleware/proxy/provider/register.go b/middleware/proxy/provider/register.go index 4b200b928..0a65e5521 100644 --- a/middleware/proxy/provider/register.go +++ b/middleware/proxy/provider/register.go @@ -2,12 +2,12 @@ package provider func init() { // static - Register("http", newStatic) - Register("https", newStatic) - Register("", newStatic) + Register("http", static) + Register("https", static) + Register("", static) // dynamic - Register("etcd", newDynamic) - Register("consul", newDynamic) - Register("zk", newDynamic) + Register("etcd", dynamic) + Register("consul", dynamic) + Register("zk", dynamic) } diff --git a/middleware/proxy/provider/static.go b/middleware/proxy/provider/static.go new file mode 100644 index 000000000..9829d1bd6 --- /dev/null +++ b/middleware/proxy/provider/static.go @@ -0,0 +1,14 @@ +package provider + +// staticProvider cater for static hardcoded hosts. +type staticProvider string + +// Hosts satisfies Provider interface. +func (s staticProvider) Hosts() ([]string, error) { + return []string{string(s)}, nil +} + +// static creates a new static host provider. +func static(host string) (Provider, error) { + return staticProvider(host), nil +}