Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/x509"
"fmt"
"reflect"
"time"
)

type Config struct {
Expand All @@ -19,6 +20,8 @@ type Upstream struct {
LocalBindAddress string
LocalBindPort int
Protocol string
ConnectTimeout time.Duration
ReadTimeout time.Duration

TLS

Expand Down Expand Up @@ -46,11 +49,14 @@ func (n UpstreamNode) Equal(o UpstreamNode) bool {
}

type Downstream struct {
LocalBindAddress string
LocalBindPort int
Protocol string
TargetAddress string
TargetPort int
LocalBindAddress string
LocalBindPort int
Protocol string
TargetAddress string
TargetPort int
ConnectTimeout time.Duration
ReadTimeout time.Duration

EnableForwardFor bool
AppNameHeaderName string

Expand Down
80 changes: 63 additions & 17 deletions consul/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/connect/proxy"
log "github.com/sirupsen/logrus"
)

const (
defaultDownstreamBindAddr = "0.0.0.0"
defaultUpstreamBindAddr = "127.0.0.1"
DefaultDownstreamBindAddr = "0.0.0.0"
DefaultUpstreamBindAddr = "127.0.0.1"
DefaultReadTimeout = 60 * time.Second
DefaultConnectTimeout = 30 * time.Second

errorWaitTime = 5 * time.Second
preparedQueryPollInterval = 30 * time.Second
Expand All @@ -26,6 +29,8 @@ type upstream struct {
Datacenter string
Protocol string
Nodes []*api.ServiceEntry
ReadTimeout time.Duration
ConnectTimeout time.Duration

done bool
}
Expand All @@ -38,6 +43,8 @@ type downstream struct {
TargetPort int
EnableForwardFor bool
AppNameHeaderName string
ReadTimeout time.Duration
ConnectTimeout time.Duration
}

type certLeaf struct {
Expand Down Expand Up @@ -115,9 +122,11 @@ func (w *Watcher) Run() error {
}

func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) {
w.downstream.LocalBindAddress = defaultDownstreamBindAddr
w.downstream.LocalBindAddress = DefaultDownstreamBindAddr
w.downstream.LocalBindPort = srv.Port
w.downstream.TargetAddress = defaultUpstreamBindAddr
w.downstream.TargetAddress = DefaultUpstreamBindAddr
w.downstream.ReadTimeout = DefaultReadTimeout
w.downstream.ConnectTimeout = DefaultConnectTimeout

if srv.Proxy != nil && srv.Proxy.Config != nil {
if c, ok := srv.Proxy.Config["protocol"].(string); ok {
Expand All @@ -135,6 +144,22 @@ func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) {
if a, ok := srv.Proxy.Config["appname_header"].(string); ok {
w.downstream.AppNameHeaderName = a
}
if a, ok := srv.Proxy.Config["connect_timeout"].(string); ok {
to, err := time.ParseDuration(a)
if err != nil {
log.Errorf("bad connect_timeout value in config: %s. Using default: %s", err, DefaultConnectTimeout)
} else {
w.downstream.ConnectTimeout = to
}
}
if a, ok := srv.Proxy.Config["read_timeout"].(string); ok {
to, err := time.ParseDuration(a)
if err != nil {
log.Errorf("bad read_timeout value in config: %s. Using default: %s", err, DefaultReadTimeout)
} else {
w.downstream.ReadTimeout = to
}
}
}

keep := make(map[string]bool)
Expand Down Expand Up @@ -168,20 +193,46 @@ func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) {
}
}

func (w *Watcher) startUpstreamService(up api.Upstream, name string) {
w.log.Infof("consul: watching upstream for service %s", up.DestinationName)

func (w *Watcher) buildUpstream(up api.Upstream, name string) *upstream {
u := &upstream{
LocalBindAddress: up.LocalBindAddress,
LocalBindPort: up.LocalBindPort,
Name: name,
Datacenter: up.Datacenter,
ReadTimeout: DefaultReadTimeout,
ConnectTimeout: DefaultConnectTimeout,
}

if p, ok := up.Config["protocol"].(string); ok {
u.Protocol = p
}

if a, ok := up.Config["read_timeout"].(string); ok {
to, err := time.ParseDuration(a)
if err != nil {
log.Errorf("upstream %s: bad read_timeout value in config: %s. Using default: %s", name, err, DefaultReadTimeout)
} else {
u.ReadTimeout = to
}
}

if a, ok := up.Config["connect_timeout"].(string); ok {
to, err := time.ParseDuration(a)
if err != nil {
log.Errorf("upstream %s: bad connect_timeout value in config: %s. Using default: %s", name, err, DefaultConnectTimeout)
} else {
u.ConnectTimeout = to
}
}

return u
}

func (w *Watcher) startUpstreamService(up api.Upstream, name string) {
w.log.Infof("consul: watching upstream for service %s", up.DestinationName)

u := w.buildUpstream(up, name)

w.lock.Lock()
w.upstreams[name] = u
w.lock.Unlock()
Expand Down Expand Up @@ -219,16 +270,7 @@ func (w *Watcher) startUpstreamService(up api.Upstream, name string) {
func (w *Watcher) startUpstreamPreparedQuery(up api.Upstream, name string) {
w.log.Infof("consul: watching upstream for prepared_query %s", up.DestinationName)

u := &upstream{
LocalBindAddress: up.LocalBindAddress,
LocalBindPort: up.LocalBindPort,
Name: name,
Datacenter: up.Datacenter,
}

if p, ok := up.Config["protocol"].(string); ok {
u.Protocol = p
}
u := w.buildUpstream(up, name)

interval := preparedQueryPollInterval
if p, ok := up.Config["poll_interval"].(string); ok {
Expand Down Expand Up @@ -429,6 +471,8 @@ func (w *Watcher) genCfg() Config {
TargetAddress: w.downstream.TargetAddress,
TargetPort: w.downstream.TargetPort,
Protocol: w.downstream.Protocol,
ConnectTimeout: w.downstream.ConnectTimeout,
ReadTimeout: w.downstream.ReadTimeout,
EnableForwardFor: w.downstream.EnableForwardFor,
AppNameHeaderName: w.downstream.AppNameHeaderName,

Expand All @@ -446,6 +490,8 @@ func (w *Watcher) genCfg() Config {
LocalBindAddress: up.LocalBindAddress,
LocalBindPort: up.LocalBindPort,
Protocol: up.Protocol,
ConnectTimeout: up.ConnectTimeout,
ReadTimeout: up.ConnectTimeout,
TLS: TLS{
CAs: w.certCAs,
Cert: w.leaf.Cert,
Expand Down
6 changes: 3 additions & 3 deletions haproxy/state/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func generateDownstream(opts Options, certStore CertificateStore, cfg consul.Dow
Frontend: models.Frontend{
Name: feName,
DefaultBackend: beName,
ClientTimeout: &clientTimeout,
ClientTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())),
Mode: feMode,
Httplog: opts.LogRequests,
},
Expand Down Expand Up @@ -85,8 +85,8 @@ func generateDownstream(opts Options, certStore CertificateStore, cfg consul.Dow
be := Backend{
Backend: models.Backend{
Name: beName,
ServerTimeout: &serverTimeout,
ConnectTimeout: &connectTimeout,
ServerTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())),
ConnectTimeout: int64p(int(cfg.ConnectTimeout.Milliseconds())),
Mode: beMode,
Forwardfor: forwardFor,
},
Expand Down
20 changes: 12 additions & 8 deletions haproxy/state/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ func GetTestConsulConfig() consul.Config {
TargetAddress: "128.0.0.5",
TargetPort: 8888,
AppNameHeaderName: "X-App",
ConnectTimeout: consul.DefaultConnectTimeout,
ReadTimeout: consul.DefaultReadTimeout,
},
Upstreams: []consul.Upstream{
consul.Upstream{
Name: "service_1",
LocalBindAddress: "127.0.0.1",
LocalBindPort: 10000,
ConnectTimeout: consul.DefaultConnectTimeout,
ReadTimeout: consul.DefaultReadTimeout,
Nodes: []consul.UpstreamNode{
consul.UpstreamNode{
Host: "1.2.3.4",
Expand All @@ -50,7 +54,7 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Frontend: models.Frontend{
Name: "front_downstream",
DefaultBackend: "back_downstream",
ClientTimeout: &clientTimeout,
ClientTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())),
Mode: models.FrontendModeHTTP,
Httplog: true,
},
Expand Down Expand Up @@ -91,7 +95,7 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Frontend: models.Frontend{
Name: "front_service_1",
DefaultBackend: "back_service_1",
ClientTimeout: &clientTimeout,
ClientTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())),
Mode: models.FrontendModeHTTP,
Httplog: true,
},
Expand All @@ -115,8 +119,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Backend{
Backend: models.Backend{
Name: "back_downstream",
ServerTimeout: &serverTimeout,
ConnectTimeout: &connectTimeout,
ServerTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())),
ConnectTimeout: int64p(int(consul.DefaultConnectTimeout.Milliseconds())),
Mode: models.BackendModeHTTP,
},
Servers: []models.Server{
Expand Down Expand Up @@ -146,8 +150,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Backend{
Backend: models.Backend{
Name: "back_service_1",
ServerTimeout: &serverTimeout,
ConnectTimeout: &connectTimeout,
ServerTimeout: int64p(int(consul.DefaultReadTimeout.Milliseconds())),
ConnectTimeout: int64p(int(consul.DefaultConnectTimeout.Milliseconds())),
Mode: models.BackendModeHTTP,
Balance: &models.Balance{
Algorithm: models.BalanceAlgorithmLeastconn,
Expand Down Expand Up @@ -189,8 +193,8 @@ func GetTestHAConfig(baseCfg string, certVersion string) State {
Backend{
Backend: models.Backend{
Name: "spoe_back",
ServerTimeout: &spoeTimeout,
ConnectTimeout: &spoeTimeout,
ServerTimeout: int64p(int(spoeTimeout.Milliseconds())),
ConnectTimeout: int64p(int(spoeTimeout.Milliseconds())),
Mode: models.BackendModeTCP,
},
Servers: []models.Server{
Expand Down
9 changes: 7 additions & 2 deletions haproxy/state/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ package state
import (
"fmt"
"sort"
"time"

"github.com/haproxytech/haproxy-consul-connect/consul"
"github.com/haproxytech/models"
)

const (
spoeTimeout = 30 * time.Second
)

type Options struct {
EnableIntentions bool
LogRequests bool
Expand Down Expand Up @@ -44,8 +49,8 @@ func Generate(opts Options, certStore CertificateStore, oldState State, cfg cons
newState.Backends = append(newState.Backends, Backend{
Backend: models.Backend{
Name: "spoe_back",
ServerTimeout: int64p(30000),
ConnectTimeout: int64p(30000),
ServerTimeout: int64p(int(spoeTimeout.Milliseconds())),
ConnectTimeout: int64p(int(spoeTimeout.Milliseconds())),
Mode: models.BackendModeTCP,
},
Servers: []models.Server{
Expand Down
10 changes: 0 additions & 10 deletions haproxy/state/timeouts.go

This file was deleted.

6 changes: 3 additions & 3 deletions haproxy/state/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func generateUpstream(opts Options, certStore CertificateStore, cfg consul.Upstr
Frontend: models.Frontend{
Name: feName,
DefaultBackend: beName,
ClientTimeout: &clientTimeout,
ClientTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())),
Mode: feMode,
Httplog: opts.LogRequests,
},
Expand All @@ -48,8 +48,8 @@ func generateUpstream(opts Options, certStore CertificateStore, cfg consul.Upstr
be := Backend{
Backend: models.Backend{
Name: beName,
ServerTimeout: &serverTimeout,
ConnectTimeout: &connectTimeout,
ServerTimeout: int64p(int(cfg.ReadTimeout.Milliseconds())),
ConnectTimeout: int64p(int(cfg.ConnectTimeout.Milliseconds())),
Balance: &models.Balance{
Algorithm: models.BalanceAlgorithmLeastconn,
},
Expand Down