From 6332f6552385c654c921ccb9568e9074ebc3aa7b Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Tue, 6 Feb 2018 16:09:45 -0800 Subject: [PATCH 1/8] Adding command line arg support for setting the max message size in the ingester client. --- pkg/distributor/distributor.go | 7 ++++--- pkg/ingester/client/client.go | 15 +++++++++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 4fb3ddda91e..5164a072829 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -70,6 +70,7 @@ type Distributor struct { type Config struct { EnableBilling bool BillingConfig billing.Config + IngesterClientConfig ingester_client.Config ReplicationFactor int RemoteTimeout time.Duration @@ -79,14 +80,14 @@ type Config struct { CompressToIngester bool // for testing - ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error) + ingesterClientFactory func(addr string, withCompression bool,cfg ingester_client.Config) (client.IngesterClient, error) } // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.") cfg.BillingConfig.RegisterFlags(f) - + cfg.IngesterClientConfig.RegisterFlags(f) flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.") @@ -224,7 +225,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.Ingester return client, nil } - client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.CompressToIngester) + client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.RemoteTimeout, d.cfg.CompressToIngester,d.cfg.IngesterClientConfig) if err != nil { return nil, err } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 9dc8d3fbd54..ac0ad671875 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -8,15 +8,17 @@ import ( _ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered "github.com/weaveworks/common/middleware" + "flag" ) type closableIngesterClient struct { IngesterClient conn *grpc.ClientConn + cfg Config } // MakeIngesterClient makes a new IngesterClient -func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, error) { +func MakeIngesterClient(addr string, withCompression bool,cfg Config) (IngesterClient, error) { opts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( @@ -24,7 +26,7 @@ func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, erro middleware.ClientUserHeaderInterceptor, )), // We have seen 20MB returns from queries - add a bit of headroom - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64 * 1024 * 1024)), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize( cfg.MaxRecvMsgSize)), } if withCompression { opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) @@ -42,3 +44,12 @@ func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, erro func (c *closableIngesterClient) Close() error { return c.conn.Close() } + + +type Config struct { + MaxRecvMsgSize int +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.MaxRecvMsgSize,"ingester.client.max-recv-message-size",64*1024*1024,"Maximum message size, in bytes, this client will receive") +} \ No newline at end of file From 7da1f75d7918567e591716d0ed87891ca3bac4e9 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Tue, 6 Feb 2018 17:30:15 -0800 Subject: [PATCH 2/8] Updating ingester config lifecycle to include initialization of ingester client config --- pkg/ingester/ingester.go | 6 +++--- pkg/ingester/ingester_lifecycle.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 54e1a7384b5..7595f825ba1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -64,7 +64,7 @@ var ( type Config struct { RingConfig ring.Config userStatesConfig UserStatesConfig - + clientConfig client.Config // Config for the ingester lifecycle control ListenPort *int NumTokens int @@ -90,7 +90,7 @@ type Config struct { infName string id string skipUnregister bool - ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error) + ingesterClientFactory func(addr string, withCompression bool, cfg client.Config) (client.IngesterClient, error) KVClient ring.KVClient } @@ -98,7 +98,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.RingConfig.RegisterFlags(f) cfg.userStatesConfig.RegisterFlags(f) - + cfg.clientConfig.RegisterFlags(f) f.IntVar(&cfg.NumTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.") f.DurationVar(&cfg.HeartbeatPeriod, "ingester.heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.") f.DurationVar(&cfg.JoinAfter, "ingester.join-after", 0*time.Second, "Period to wait for a claim from another ingester; will join automatically after this.") diff --git a/pkg/ingester/ingester_lifecycle.go b/pkg/ingester/ingester_lifecycle.go index da6bf0dde7d..3d025874d66 100644 --- a/pkg/ingester/ingester_lifecycle.go +++ b/pkg/ingester/ingester_lifecycle.go @@ -359,7 +359,7 @@ func (i *Ingester) transferChunks() error { } level.Info(util.Logger).Log("msg", "sending chunks to ingester", "ingester", targetIngester.Addr) - c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, false) + c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, false, i.cfg.clientConfig) if err != nil { return err } From 37579df66f08bc1787f0a388efc35d3bf1cef30a Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Thu, 15 Feb 2018 11:26:34 -0800 Subject: [PATCH 3/8] minor reformatting adjustments --- pkg/distributor/distributor.go | 8 ++++---- pkg/ingester/client/client.go | 11 +++++------ pkg/ingester/ingester.go | 2 +- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 5164a072829..6a09db722de 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -68,8 +68,8 @@ type Distributor struct { // Config contains the configuration require to // create a Distributor type Config struct { - EnableBilling bool - BillingConfig billing.Config + EnableBilling bool + BillingConfig billing.Config IngesterClientConfig ingester_client.Config ReplicationFactor int @@ -80,7 +80,7 @@ type Config struct { CompressToIngester bool // for testing - ingesterClientFactory func(addr string, withCompression bool,cfg ingester_client.Config) (client.IngesterClient, error) + ingesterClientFactory func(addr string, withCompression bool, cfg ingester_client.Config) (client.IngesterClient, error) } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -225,7 +225,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.Ingester return client, nil } - client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.RemoteTimeout, d.cfg.CompressToIngester,d.cfg.IngesterClientConfig) + client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.CompressToIngester, d.cfg.IngesterClientConfig) if err != nil { return nil, err } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index ac0ad671875..8a9b3ce88b1 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -14,11 +14,11 @@ import ( type closableIngesterClient struct { IngesterClient conn *grpc.ClientConn - cfg Config + cfg Config } // MakeIngesterClient makes a new IngesterClient -func MakeIngesterClient(addr string, withCompression bool,cfg Config) (IngesterClient, error) { +func MakeIngesterClient(addr string, withCompression bool, cfg Config) (IngesterClient, error) { opts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( @@ -26,7 +26,7 @@ func MakeIngesterClient(addr string, withCompression bool,cfg Config) (IngesterC middleware.ClientUserHeaderInterceptor, )), // We have seen 20MB returns from queries - add a bit of headroom - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize( cfg.MaxRecvMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)), } if withCompression { opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) @@ -45,11 +45,10 @@ func (c *closableIngesterClient) Close() error { return c.conn.Close() } - type Config struct { MaxRecvMsgSize int } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.IntVar(&cfg.MaxRecvMsgSize,"ingester.client.max-recv-message-size",64*1024*1024,"Maximum message size, in bytes, this client will receive") -} \ No newline at end of file + f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive") +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7595f825ba1..fa6b20874f0 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -64,7 +64,7 @@ var ( type Config struct { RingConfig ring.Config userStatesConfig UserStatesConfig - clientConfig client.Config + clientConfig client.Config // Config for the ingester lifecycle control ListenPort *int NumTokens int From 4e32b729ab07b0ce6070db074819bdf26b03b617 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Thu, 15 Feb 2018 14:43:30 -0800 Subject: [PATCH 4/8] moving comment to where the default message size is set --- pkg/ingester/client/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 8a9b3ce88b1..b00f513d470 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -25,7 +25,6 @@ func MakeIngesterClient(addr string, withCompression bool, cfg Config) (Ingester otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, )), - // We have seen 20MB returns from queries - add a bit of headroom grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)), } if withCompression { @@ -50,5 +49,6 @@ type Config struct { } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive") + // We have seen 20MB returns from queries - add a bit of headroom + f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.") } From e954800d2a1fdc7007ef41bb33eb8cef7ec19ea3 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Fri, 16 Feb 2018 08:15:06 -0800 Subject: [PATCH 5/8] moving CompressToIngester setting to be contained within the ingesterClient config. --- pkg/distributor/distributor.go | 6 ++---- pkg/ingester/client/client.go | 7 +++++-- pkg/ingester/ingester.go | 2 +- pkg/ingester/ingester_lifecycle.go | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6a09db722de..0998d62efac 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -77,10 +77,9 @@ type Config struct { ClientCleanupPeriod time.Duration IngestionRateLimit float64 IngestionBurstSize int - CompressToIngester bool // for testing - ingesterClientFactory func(addr string, withCompression bool, cfg ingester_client.Config) (client.IngesterClient, error) + ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error) } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -93,7 +92,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.") flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") - flag.BoolVar(&cfg.CompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters.") } // New constructs a new Distributor @@ -225,7 +223,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.Ingester return client, nil } - client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.CompressToIngester, d.cfg.IngesterClientConfig) + client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.IngesterClientConfig) if err != nil { return nil, err } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index b00f513d470..1b02fe4ee41 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -18,7 +18,7 @@ type closableIngesterClient struct { } // MakeIngesterClient makes a new IngesterClient -func MakeIngesterClient(addr string, withCompression bool, cfg Config) (IngesterClient, error) { +func MakeIngesterClient(addr string, cfg Config) (IngesterClient, error) { opts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( @@ -27,7 +27,7 @@ func MakeIngesterClient(addr string, withCompression bool, cfg Config) (Ingester )), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)), } - if withCompression { + if cfg.CompressToIngester { opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) } conn, err := grpc.Dial(addr, opts...) @@ -46,9 +46,12 @@ func (c *closableIngesterClient) Close() error { type Config struct { MaxRecvMsgSize int + CompressToIngester bool } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // We have seen 20MB returns from queries - add a bit of headroom f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.") + // moved from distributor pkg, but flag prefix left as-is so existing users do not break. + flag.BoolVar(&cfg.CompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters.") } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index fa6b20874f0..b3007aa61d8 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -90,7 +90,7 @@ type Config struct { infName string id string skipUnregister bool - ingesterClientFactory func(addr string, withCompression bool, cfg client.Config) (client.IngesterClient, error) + ingesterClientFactory func(addr string, cfg client.Config) (client.IngesterClient, error) KVClient ring.KVClient } diff --git a/pkg/ingester/ingester_lifecycle.go b/pkg/ingester/ingester_lifecycle.go index 3d025874d66..e82e77c43e2 100644 --- a/pkg/ingester/ingester_lifecycle.go +++ b/pkg/ingester/ingester_lifecycle.go @@ -359,7 +359,7 @@ func (i *Ingester) transferChunks() error { } level.Info(util.Logger).Log("msg", "sending chunks to ingester", "ingester", targetIngester.Addr) - c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, false, i.cfg.clientConfig) + c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, i.cfg.clientConfig) if err != nil { return err } From 581d341f050e8dfbf3aa077955baf3679366ec6d Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Sat, 24 Feb 2018 11:34:36 -0800 Subject: [PATCH 6/8] Updating ingester client flags to have the a compress flag under the ingester.client namespace, but also leaving a fallback flag under distributor for back compat --- pkg/ingester/client/client.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 1b02fe4ee41..d323e02e15f 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -27,6 +27,9 @@ func MakeIngesterClient(addr string, cfg Config) (IngesterClient, error) { )), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)), } + if cfg.legacyCompressToIngester { + cfg.CompressToIngester = true + } if cfg.CompressToIngester { opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) } @@ -47,11 +50,13 @@ func (c *closableIngesterClient) Close() error { type Config struct { MaxRecvMsgSize int CompressToIngester bool + legacyCompressToIngester bool } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // We have seen 20MB returns from queries - add a bit of headroom f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.") - // moved from distributor pkg, but flag prefix left as-is so existing users do not break. - flag.BoolVar(&cfg.CompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters.") + flag.BoolVar(&cfg.CompressToIngester, "ingester.client.compress-to-ingester", false, "Compress data in calls to ingesters.") + // moved from distributor pkg, but flag prefix left as back compat fallback for existing users. + flag.BoolVar(&cfg.legacyCompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters.") } From 41f9cc600774bfd63a87265c41c0c21fa4180039 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Sat, 24 Feb 2018 12:53:06 -0800 Subject: [PATCH 7/8] Adding a help output comment to the back-combat flag option that it is deprecated --- pkg/ingester/client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index d323e02e15f..e743c0356b8 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -58,5 +58,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.") flag.BoolVar(&cfg.CompressToIngester, "ingester.client.compress-to-ingester", false, "Compress data in calls to ingesters.") // moved from distributor pkg, but flag prefix left as back compat fallback for existing users. - flag.BoolVar(&cfg.legacyCompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters.") + flag.BoolVar(&cfg.legacyCompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead") } From 4935065a09d49e57d400157b515224361ca1c53a Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Tue, 27 Feb 2018 08:25:40 -0800 Subject: [PATCH 8/8] Updating missed tests & formatting --- pkg/distributor/distributor_test.go | 4 ++-- pkg/ingester/client/client.go | 8 +++++--- pkg/ingester/ingester_lifecycle_test.go | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2db97ba023f..437ee88c912 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -165,7 +165,7 @@ func TestDistributorPush(t *testing.T) { IngestionRateLimit: 10000, IngestionBurstSize: 10000, - ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) { + ingesterClientFactory: func(addr string, _ client.Config) (client.IngesterClient, error) { return ingesters[addr], nil }, }, ring) @@ -305,7 +305,7 @@ func TestDistributorQuery(t *testing.T) { IngestionRateLimit: 10000, IngestionBurstSize: 10000, - ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) { + ingesterClientFactory: func(addr string, _ client.Config) (client.IngesterClient, error) { return ingesters[addr], nil }, }, ring) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index e743c0356b8..808fb613059 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -7,8 +7,8 @@ import ( "google.golang.org/grpc" _ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered - "github.com/weaveworks/common/middleware" "flag" + "github.com/weaveworks/common/middleware" ) type closableIngesterClient struct { @@ -47,12 +47,14 @@ func (c *closableIngesterClient) Close() error { return c.conn.Close() } +// Config is the configuration struct for the ingester client type Config struct { - MaxRecvMsgSize int - CompressToIngester bool + MaxRecvMsgSize int + CompressToIngester bool legacyCompressToIngester bool } +// RegisterFlags registers configuration settings used by the ingester client config func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // We have seen 20MB returns from queries - add a bit of headroom f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.") diff --git a/pkg/ingester/ingester_lifecycle_test.go b/pkg/ingester/ingester_lifecycle_test.go index 473b642c357..58ffbf25b2b 100644 --- a/pkg/ingester/ingester_lifecycle_test.go +++ b/pkg/ingester/ingester_lifecycle_test.go @@ -118,7 +118,7 @@ func TestIngesterTransfer(t *testing.T) { require.NoError(t, err) // Let ing2 send chunks to ing1 - ing1.cfg.ingesterClientFactory = func(addr string, _ bool) (client.IngesterClient, error) { + ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.IngesterClient, error) { return ingesterClientAdapater{ ingester: ing2, }, nil