4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -48,7 +48,8 @@
* [CHANGE] Renamed `-<prefix>.redis.enable-tls` CLI flag to `-<prefix>.redis.tls-enabled`, and its respective YAML config option from `enable_tls` to `tls_enabled`. #3298
* [CHANGE] Increased default `-<prefix>.redis.timeout` from `100ms` to `500ms`. #3301
* [CHANGE] `cortex_alertmanager_config_invalid` has been removed in favor of `cortex_alertmanager_config_last_reload_successful`. #3289
* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using the per-tenant limit `max_queriers_per_tenant`), each tenant's requests will be handled by a different set of queriers. #3113 #3257
* [FEATURE] Shuffle sharding: added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using the per-tenant limit `max_queriers_per_tenant`), each tenant's requests will be handled by a different set of queriers. #3113 #3257
* [FEATURE] Shuffle sharding: added support for shuffle-sharding ingesters on the read path. When ingesters shuffle-sharding is enabled and `-querier.shuffle-sharding-ingesters-lookback-period` is set, queriers will fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. #3252
* [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
* [ENHANCEMENT] Allow specifying multiple comma-separated Cortex services in the `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with the compactor as well. #3275
* [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flags are listed below: #3244
@@ -92,6 +93,7 @@
* [BUGFIX] Use a valid grpc header when logging IP addresses. #3307
* [BUGFIX] Fixed the metric `cortex_prometheus_rule_group_duration_seconds` in the Ruler, which wouldn't report any values. #3310
* [BUGFIX] Fixed gRPC connections leaking in rulers when ruler sharding is enabled and APIs are called. #3314
* [BUGFIX] Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added. #3299

## 1.4.0 / 2020-10-02

9 changes: 9 additions & 0 deletions docs/blocks-storage/querier.md
@@ -173,6 +173,15 @@ querier:
# Default value 0 means secondary store is always queried.
# CLI flag: -querier.use-second-store-before-time
[use_second_store_before_time: <time> | default = 0]

# When the distributor's sharding strategy is shuffle-sharding and this setting
# is > 0, queriers fetch in-memory series from the minimum set of required
# ingesters, selecting only ingesters which may have received series since
# 'now - lookback period'. The lookback period should be greater than or equal
# to the configured 'query store after'. If this setting is 0, queriers always
# query all ingesters (ingesters shuffle sharding on read path is disabled).
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
```

### `blocks_storage_config`
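For readers who want to try the new setting, here is a minimal configuration sketch (not part of this PR). It assumes the YAML keys that correspond to CLI flags referenced elsewhere in this change (`-distributor.sharding-strategy`, `-distributor.ingestion-tenant-shard-size`, `-querier.query-store-after`); double-check them against the config reference and treat the values as examples only.

```yaml
# Sketch only: shuffle sharding is enabled on the write path and made effective
# on the read path by setting a lookback period >= query_store_after, as the
# documentation above recommends.
distributor:
  sharding_strategy: shuffle-sharding   # assumed key for -distributor.sharding-strategy

limits:
  ingestion_tenant_shard_size: 3        # assumed key for -distributor.ingestion-tenant-shard-size

querier:
  query_store_after: 12h                # assumed key for -querier.query-store-after
  shuffle_sharding_ingesters_lookback_period: 13h   # 0s (the default) disables it
```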
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -740,6 +740,15 @@ store_gateway_client:
# Default value 0 means secondary store is always queried.
# CLI flag: -querier.use-second-store-before-time
[use_second_store_before_time: <time> | default = 0]

# When the distributor's sharding strategy is shuffle-sharding and this setting
# is > 0, queriers fetch in-memory series from the minimum set of required
# ingesters, selecting only ingesters which may have received series since
# 'now - lookback period'. The lookback period should be greater than or equal
# to the configured 'query store after'. If this setting is 0, queriers always
# query all ingesters (ingesters shuffle sharding on read path is disabled).
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
```

### `query_frontend_config`
6 changes: 6 additions & 0 deletions integration/e2e/metrics_options.go
@@ -23,6 +23,7 @@ type MetricsOptions struct {
GetValue GetMetricValueFunc
LabelMatchers []*labels.Matcher
WaitMissingMetrics bool
SkipMissingMetrics bool
}

// WithMetricCount is an option to get the histogram/summary count as metric value.
@@ -43,6 +44,11 @@ func WaitMissingMetrics(opts *MetricsOptions) {
opts.WaitMissingMetrics = true
}

// SkipMissingMetrics is an option to skip/ignore an expected metric whenever it is missing.
func SkipMissingMetrics(opts *MetricsOptions) {
opts.SkipMissingMetrics = true
}

func buildMetricsOptions(opts []MetricsOption) MetricsOptions {
result := DefaultMetricsOptions
for _, opt := range opts {
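As a usage illustration only (this sketch is not code from the PR; it mirrors how the integration test further down consumes the option, and the package, helper, and metric names are just examples):

```go
package integration

import (
	"github.com/cortexproject/cortex/integration/e2e"
)

// sumRequestCount sums the sample counts of cortex_request_duration_seconds
// exposed by the given service. Thanks to SkipMissingMetrics (added above),
// SumMetrics skips the metric instead of returning errMissingMetric when the
// service has not exposed it (yet).
func sumRequestCount(svc *e2e.HTTPService) (float64, error) {
	values, err := svc.SumMetrics(
		[]string{"cortex_request_duration_seconds"},
		e2e.WithMetricCount,    // use the histogram count as the value
		e2e.SkipMissingMetrics, // tolerate a missing metric
	)
	if err != nil {
		return 0, err
	}

	total := 0.0
	for _, v := range values {
		total += v
	}
	return total, nil
}
```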
8 changes: 8 additions & 0 deletions integration/e2e/service.go
@@ -577,12 +577,20 @@ func (s *HTTPService) SumMetrics(metricNames []string, opts ...MetricsOption) ([
// Get the metric family.
mf, ok := families[m]
if !ok {
if options.SkipMissingMetrics {
continue
}

return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name)
}

// Filter metrics.
metrics := filterMetrics(mf.GetMetric(), options)
if len(metrics) == 0 {
if options.SkipMissingMetrics {
continue
}

return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name)
}

41 changes: 39 additions & 2 deletions integration/ingester_sharding_test.go
@@ -8,7 +8,9 @@ import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
@@ -43,9 +45,16 @@ func TestIngesterSharding(t *testing.T) {
defer s.Close()

flags := BlocksStorageFlags
flags["-distributor.shard-by-all-labels"] = "true"
Contributor: Does ingester shuffle sharding on the read path depend on shard-by-all-labels being set?

Contributor Author: No, it doesn't. I enabled it in this test just because it's how we (and, to my knowledge, most users) run Cortex, so the integration test better reflects real setups.

flags["-distributor.sharding-strategy"] = testData.shardingStrategy
flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)

if testData.shardingStrategy == "shuffle-sharding" {
// Enable shuffle sharding on the read path with a negligible lookback period,
// otherwise all ingesters would be queried because they have just registered.
flags["-querier.shuffle-sharding-ingesters-lookback-period"] = "1ns"
}

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
@@ -56,6 +65,7 @@
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags, "")
ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags, "")
ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags, "")
ingesters := e2ecortex.NewCompositeCortexService(ingester1, ingester2, ingester3)
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))

@@ -70,15 +80,19 @@

// Push series.
now := time.Now()
expectedVectors := map[string]model.Vector{}

client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

for i := 1; i <= numSeriesToPush; i++ {
series, _ := generateSeries(fmt.Sprintf("series_%d", i), now)
metricName := fmt.Sprintf("series_%d", i)
series, expectedVector := generateSeries(metricName, now)
res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

expectedVectors[metricName] = expectedVector
}

// Extract metrics from ingesters.
@@ -99,6 +113,29 @@
require.Equal(t, testData.expectedIngestersWithSeries, numIngestersWithSeries)
require.Equal(t, numSeriesToPush, totalIngestedSeries)

// Query back series.
for metricName, expectedVector := range expectedVectors {
result, err := client.Query(metricName, now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}

// We expect that only ingesters belonging to the tenant's shard have been queried if
// shuffle sharding is enabled.
expectedIngesters := ingesters.NumInstances()
if testData.shardingStrategy == "shuffle-sharding" {
expectedIngesters = testData.tenantShardSize
}

expectedCalls := expectedIngesters * len(expectedVectors)
require.NoError(t, ingesters.WaitSumMetricsWithOptions(
e2e.Equals(float64(expectedCalls)),
[]string{"cortex_request_duration_seconds"},
e2e.WithMetricCount,
e2e.SkipMissingMetrics, // Some ingesters may have received no request at all.
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/QueryStream"))))

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
assertServiceMetricsPrefixes(t, Ingester, ingester1)
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -169,6 +169,7 @@ func (t *Cortex) initOverrides() (serv services.Service, err error) {

func (t *Cortex) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod

// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
55 changes: 39 additions & 16 deletions pkg/distributor/distributor.go
@@ -170,6 +170,9 @@ type Config struct {
// When true, the distributor does not validate the label name. Cortex doesn't directly use
// this (and should never use it), but this feature is used by other projects built on top of it.
SkipLabelNameValidation bool `yaml:"-"`

// This config is dynamically injected because it is defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -622,16 +625,8 @@ func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, time
return err
}

// ForAllIngesters runs f, in parallel, for all ingesters
func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f func(context.Context, client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
replicationSet, err := d.ingestersRing.GetAll(ring.Read)
if err != nil {
return nil, err
}
if reallyAll {
replicationSet.MaxErrors = 0
Contributor: What was this reallyAll branch that's being removed used for?

Contributor Author: Good question. reallyAll was used only by UserStats() to make sure the request fails if querying any ingester fails (by default we allow 1 failure, due to quorum). The logic has not changed: I've just moved replicationSet.MaxErrors = 0 to UserStats().

}

// ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.IngesterDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
@@ -644,10 +639,15 @@ func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f fun

// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) ([]string, error) {
replicationSet, err := d.getIngestersForMetadata(ctx)
if err != nil {
return nil, err
}

req := &client.LabelValuesRequest{
LabelName: string(labelName),
}
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.LabelValues(ctx, req)
})
if err != nil {
@@ -670,8 +670,13 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod

// LabelNames returns all of the label names.
func (d *Distributor) LabelNames(ctx context.Context) ([]string, error) {
replicationSet, err := d.getIngestersForMetadata(ctx)
if err != nil {
return nil, err
}

req := &client.LabelNamesRequest{}
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.LabelNames(ctx, req)
})
if err != nil {
@@ -698,12 +703,17 @@ func (d *Distributor) LabelNames(ctx context.Context) ([]string, error) {

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
replicationSet, err := d.getIngestersForMetadata(ctx)
if err != nil {
return nil, err
}

req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
if err != nil {
return nil, err
}

resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.MetricsForLabelMatchers(ctx, req)
})
if err != nil {
@@ -729,9 +739,14 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

// MetricsMetadata returns all metric metadata of a user.
func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
replicationSet, err := d.getIngestersForMetadata(ctx)
if err != nil {
return nil, err
}

req := &ingester_client.MetricsMetadataRequest{}
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
@@ -764,8 +779,16 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad

// UserStats returns statistics about the current user.
func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
replicationSet, err := d.getIngestersForMetadata(ctx)
if err != nil {
return nil, err
}

// Make sure we get a successful response from all of them.
replicationSet.MaxErrors = 0

req := &client.UserStatsRequest{}
resps, err := d.ForAllIngesters(ctx, true, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
return client.UserStats(ctx, req)
})
if err != nil {
@@ -801,7 +824,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {

req := &client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
// Not using d.ForReplicationSet(), so we can fail after the first error.
// Not using d.ForReplicationSet(), so we can fail after first error.
replicationSet, err := d.ingestersRing.GetAll(ring.Read)
if err != nil {
return nil, err