Skip to content

Commit 2c4426d

Browse files
committed
Add config for turning on health check behavior
1 parent f641950 commit 2c4426d

File tree

1 file changed

+12
-8
lines changed

1 file changed

+12
-8
lines changed

pkg/distributor/distributor.go

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/prometheus/pkg/labels"
2121
"github.com/prometheus/prometheus/promql"
2222
"github.com/weaveworks/cortex/pkg/prom1/storage/metric"
23+
"google.golang.org/grpc/health/grpc_health_v1"
2324

2425
billing "github.com/weaveworks/billing-client"
2526
"github.com/weaveworks/common/instrument"
@@ -28,7 +29,6 @@ import (
2829
ingester_client "github.com/weaveworks/cortex/pkg/ingester/client"
2930
"github.com/weaveworks/cortex/pkg/ring"
3031
"github.com/weaveworks/cortex/pkg/util"
31-
"google.golang.org/grpc/health/grpc_health_v1"
3232
)
3333

3434
var errIngestionRateLimitExceeded = errors.New("ingestion rate limit exceeded")
@@ -74,11 +74,12 @@ type Config struct {
7474
BillingConfig billing.Config
7575
IngesterClientConfig ingester_client.Config
7676

77-
ReplicationFactor int
78-
RemoteTimeout time.Duration
79-
ClientCleanupPeriod time.Duration
80-
IngestionRateLimit float64
81-
IngestionBurstSize int
77+
ReplicationFactor int
78+
RemoteTimeout time.Duration
79+
ClientCleanupPeriod time.Duration
80+
IngestionRateLimit float64
81+
IngestionBurstSize int
82+
HealthCheckIngesters bool
8283

8384
// for testing
8485
ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error)
@@ -94,6 +95,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
9495
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
9596
flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
9697
flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
98+
flag.BoolVar(&cfg.HealthCheckIngesters, "distributor.health-check-ingesters", false, "Run a health check on each ingester client during the cleanup period.")
9799
}
98100

99101
// New constructs a new Distributor
@@ -171,7 +173,9 @@ func (d *Distributor) Run() {
171173
select {
172174
case <-cleanupClients.C:
173175
d.removeStaleIngesterClients()
174-
d.healthCheckAndRemoveIngesters()
176+
if d.cfg.HealthCheckIngesters {
177+
d.healthCheckAndRemoveIngesters()
178+
}
175179
case <-d.quit:
176180
close(d.done)
177181
return
@@ -223,7 +227,7 @@ func (d *Distributor) healthCheckAndRemoveIngesters() {
223227
func (d *Distributor) healthCheckAndRemoveIngester(ingester *ring.IngesterDesc) {
224228
client, err := d.getClientFor(ingester)
225229
if err != nil {
226-
d.removeClientFor(ingester, err)
230+
return
227231
}
228232

229233
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)

0 commit comments

Comments
 (0)