Skip to content

Commit 6191d41

Browse files
khainesbboreham
authored andcommitted
Updating IngesterClient MaxCallRecvMsgSize to be configurable vs fixed size (#712)
* Adding command line arg support for setting the max message size in the ingester client. * Updating ingester config lifecycle to include initialization of ingester client config * moving CompressToIngester setting to be contained within the ingesterClient config. * Updating ingester client flags to have the a compress flag under the ingester.client namespace, but also leaving a (deprecated) fallback flag under distributor for back compat
1 parent d81150d commit 6191d41

File tree

6 files changed

+37
-18
lines changed

6 files changed

+37
-18
lines changed

pkg/distributor/distributor.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,31 +68,30 @@ type Distributor struct {
6868
// Config contains the configuration require to
6969
// create a Distributor
7070
type Config struct {
71-
EnableBilling bool
72-
BillingConfig billing.Config
71+
EnableBilling bool
72+
BillingConfig billing.Config
73+
IngesterClientConfig ingester_client.Config
7374

7475
ReplicationFactor int
7576
RemoteTimeout time.Duration
7677
ClientCleanupPeriod time.Duration
7778
IngestionRateLimit float64
7879
IngestionBurstSize int
79-
CompressToIngester bool
8080

8181
// for testing
82-
ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
82+
ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error)
8383
}
8484

8585
// RegisterFlags adds the flags required to config this to the given FlagSet
8686
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8787
flag.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
8888
cfg.BillingConfig.RegisterFlags(f)
89-
89+
cfg.IngesterClientConfig.RegisterFlags(f)
9090
flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
9191
flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
9292
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
9393
flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
9494
flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
95-
flag.BoolVar(&cfg.CompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters.")
9695
}
9796

9897
// New constructs a new Distributor
@@ -224,7 +223,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.Ingester
224223
return client, nil
225224
}
226225

227-
client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.CompressToIngester)
226+
client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.IngesterClientConfig)
228227
if err != nil {
229228
return nil, err
230229
}

pkg/distributor/distributor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func TestDistributorPush(t *testing.T) {
165165
IngestionRateLimit: 10000,
166166
IngestionBurstSize: 10000,
167167

168-
ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
168+
ingesterClientFactory: func(addr string, _ client.Config) (client.IngesterClient, error) {
169169
return ingesters[addr], nil
170170
},
171171
}, ring)
@@ -305,7 +305,7 @@ func TestDistributorQuery(t *testing.T) {
305305
IngestionRateLimit: 10000,
306306
IngestionBurstSize: 10000,
307307

308-
ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
308+
ingesterClientFactory: func(addr string, _ client.Config) (client.IngesterClient, error) {
309309
return ingesters[addr], nil
310310
},
311311
}, ring)

pkg/ingester/client/client.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,30 @@ import (
77
"google.golang.org/grpc"
88
_ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered
99

10+
"flag"
1011
"github.com/weaveworks/common/middleware"
1112
)
1213

1314
type closableIngesterClient struct {
1415
IngesterClient
1516
conn *grpc.ClientConn
17+
cfg Config
1618
}
1719

1820
// MakeIngesterClient makes a new IngesterClient
19-
func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, error) {
21+
func MakeIngesterClient(addr string, cfg Config) (IngesterClient, error) {
2022
opts := []grpc.DialOption{
2123
grpc.WithInsecure(),
2224
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
2325
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
2426
middleware.ClientUserHeaderInterceptor,
2527
)),
26-
// We have seen 20MB returns from queries - add a bit of headroom
27-
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64 * 1024 * 1024)),
28+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)),
2829
}
29-
if withCompression {
30+
if cfg.legacyCompressToIngester {
31+
cfg.CompressToIngester = true
32+
}
33+
if cfg.CompressToIngester {
3034
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
3135
}
3236
conn, err := grpc.Dial(addr, opts...)
@@ -42,3 +46,19 @@ func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, erro
4246
func (c *closableIngesterClient) Close() error {
4347
return c.conn.Close()
4448
}
49+
50+
// Config is the configuration struct for the ingester client
51+
type Config struct {
52+
MaxRecvMsgSize int
53+
CompressToIngester bool
54+
legacyCompressToIngester bool
55+
}
56+
57+
// RegisterFlags registers configuration settings used by the ingester client config
58+
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
59+
// We have seen 20MB returns from queries - add a bit of headroom
60+
f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.")
61+
flag.BoolVar(&cfg.CompressToIngester, "ingester.client.compress-to-ingester", false, "Compress data in calls to ingesters.")
62+
// moved from distributor pkg, but flag prefix left as back compat fallback for existing users.
63+
flag.BoolVar(&cfg.legacyCompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead")
64+
}

pkg/ingester/ingester.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ var (
6464
type Config struct {
6565
RingConfig ring.Config
6666
userStatesConfig UserStatesConfig
67-
67+
clientConfig client.Config
6868
// Config for the ingester lifecycle control
6969
ListenPort *int
7070
NumTokens int
@@ -90,15 +90,15 @@ type Config struct {
9090
infName string
9191
id string
9292
skipUnregister bool
93-
ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
93+
ingesterClientFactory func(addr string, cfg client.Config) (client.IngesterClient, error)
9494
KVClient ring.KVClient
9595
}
9696

9797
// RegisterFlags adds the flags required to config this to the given FlagSet
9898
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
9999
cfg.RingConfig.RegisterFlags(f)
100100
cfg.userStatesConfig.RegisterFlags(f)
101-
101+
cfg.clientConfig.RegisterFlags(f)
102102
f.IntVar(&cfg.NumTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
103103
f.DurationVar(&cfg.HeartbeatPeriod, "ingester.heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.")
104104
f.DurationVar(&cfg.JoinAfter, "ingester.join-after", 0*time.Second, "Period to wait for a claim from another ingester; will join automatically after this.")

pkg/ingester/ingester_lifecycle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func (i *Ingester) transferChunks() error {
359359
}
360360

361361
level.Info(util.Logger).Log("msg", "sending chunks to ingester", "ingester", targetIngester.Addr)
362-
c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, false)
362+
c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, i.cfg.clientConfig)
363363
if err != nil {
364364
return err
365365
}

pkg/ingester/ingester_lifecycle_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func TestIngesterTransfer(t *testing.T) {
118118
require.NoError(t, err)
119119

120120
// Let ing2 send chunks to ing1
121-
ing1.cfg.ingesterClientFactory = func(addr string, _ bool) (client.IngesterClient, error) {
121+
ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.IngesterClient, error) {
122122
return ingesterClientAdapater{
123123
ingester: ing2,
124124
}, nil

0 commit comments

Comments
 (0)