Skip to content
13 changes: 6 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,30 @@ type Distributor struct {
// Config contains the configuration require to
// create a Distributor
type Config struct {
EnableBilling bool
BillingConfig billing.Config
EnableBilling bool
BillingConfig billing.Config
IngesterClientConfig ingester_client.Config

ReplicationFactor int
RemoteTimeout time.Duration
ClientCleanupPeriod time.Duration
IngestionRateLimit float64
IngestionBurstSize int
CompressToIngester bool

// for testing
ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error)
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
cfg.BillingConfig.RegisterFlags(f)

cfg.IngesterClientConfig.RegisterFlags(f)
flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
flag.BoolVar(&cfg.CompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters.")
}

// New constructs a new Distributor
Expand Down Expand Up @@ -224,7 +223,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.Ingester
return client, nil
}

client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.CompressToIngester)
client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.IngesterClientConfig)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestDistributorPush(t *testing.T) {
IngestionRateLimit: 10000,
IngestionBurstSize: 10000,

ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
ingesterClientFactory: func(addr string, _ client.Config) (client.IngesterClient, error) {
return ingesters[addr], nil
},
}, ring)
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestDistributorQuery(t *testing.T) {
IngestionRateLimit: 10000,
IngestionBurstSize: 10000,

ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
ingesterClientFactory: func(addr string, _ client.Config) (client.IngesterClient, error) {
return ingesters[addr], nil
},
}, ring)
Expand Down
28 changes: 24 additions & 4 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,30 @@ import (
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered

"flag"
"github.com/weaveworks/common/middleware"
)

type closableIngesterClient struct {
IngesterClient
conn *grpc.ClientConn
cfg Config
}

// MakeIngesterClient makes a new IngesterClient
func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, error) {
func MakeIngesterClient(addr string, cfg Config) (IngesterClient, error) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
)),
// We have seen 20MB returns from queries - add a bit of headroom
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64 * 1024 * 1024)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)),
}
if withCompression {
if cfg.legacyCompressToIngester {
cfg.CompressToIngester = true
}
if cfg.CompressToIngester {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
}
conn, err := grpc.Dial(addr, opts...)
Expand All @@ -42,3 +46,19 @@ func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, erro
func (c *closableIngesterClient) Close() error {
return c.conn.Close()
}

// Config is the configuration struct for the ingester client
type Config struct {
MaxRecvMsgSize int
CompressToIngester bool
legacyCompressToIngester bool
}

// RegisterFlags registers configuration settings used by the ingester client config
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// We have seen 20MB returns from queries - add a bit of headroom
f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.")
flag.BoolVar(&cfg.CompressToIngester, "ingester.client.compress-to-ingester", false, "Compress data in calls to ingesters.")
// moved from distributor pkg, but flag prefix left as back compat fallback for existing users.
flag.BoolVar(&cfg.legacyCompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead")
}
6 changes: 3 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ var (
type Config struct {
RingConfig ring.Config
userStatesConfig UserStatesConfig

clientConfig client.Config
// Config for the ingester lifecycle control
ListenPort *int
NumTokens int
Expand All @@ -90,15 +90,15 @@ type Config struct {
infName string
id string
skipUnregister bool
ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
ingesterClientFactory func(addr string, cfg client.Config) (client.IngesterClient, error)
KVClient ring.KVClient
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RingConfig.RegisterFlags(f)
cfg.userStatesConfig.RegisterFlags(f)

cfg.clientConfig.RegisterFlags(f)
f.IntVar(&cfg.NumTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
f.DurationVar(&cfg.HeartbeatPeriod, "ingester.heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.")
f.DurationVar(&cfg.JoinAfter, "ingester.join-after", 0*time.Second, "Period to wait for a claim from another ingester; will join automatically after this.")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (i *Ingester) transferChunks() error {
}

level.Info(util.Logger).Log("msg", "sending chunks to ingester", "ingester", targetIngester.Addr)
c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, false)
c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, i.cfg.clientConfig)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestIngesterTransfer(t *testing.T) {
require.NoError(t, err)

// Let ing2 send chunks to ing1
ing1.cfg.ingesterClientFactory = func(addr string, _ bool) (client.IngesterClient, error) {
ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.IngesterClient, error) {
return ingesterClientAdapater{
ingester: ing2,
}, nil
Expand Down