Skip to content

Commit 2abac33

Browse files
Julien Pivottopracuccipstibrany
authored
Support metric_relabel_configs in distributor (#3329)
* Support metric_relabel_configs in distributor Signed-off-by: Julien Pivotto <[email protected]> * Add unmarshal test Signed-off-by: Julien Pivotto <[email protected]> * Mark as experimental Signed-off-by: Julien Pivotto <[email protected]> * Update docs/configuration/config-file-reference.template Co-authored-by: Marco Pracucci <[email protected]> Signed-off-by: Peter Štibraný <[email protected]> * Fix docs. Signed-off-by: Peter Štibraný <[email protected]> Co-authored-by: Marco Pracucci <[email protected]> Co-authored-by: Peter Štibraný <[email protected]> Co-authored-by: Peter Štibraný <[email protected]>
1 parent f841c52 commit 2abac33

File tree

9 files changed

+143
-1
lines changed

9 files changed

+143
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
* [FEATURE] Shuffle sharding: added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenants's requests will be handled by different set of queriers. #3113 #3257
5252
* [FEATURE] Shuffle sharding: added support for shuffle-sharding ingesters on the read path. When ingesters shuffle-sharding is enabled and `-querier.shuffle-sharding-ingesters-lookback-period` is set, queriers will fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. #3252
5353
* [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
54+
* [FEATURE] Added support for applying Prometheus relabel configs on series received by the distributor. A `metric_relabel_configs` field has been added to the per-tenant limits configuration. #3329
5455
* [ENHANCEMENT] Ruler: Introduces two new limits `-ruler.max-rules-per-rule-group` and `-ruler.max-rule-groups-per-tenant` to control the number of rules per rule group and the total number of rule groups for a given user. They are disabled by default. #3366
5556
* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3275
5657
* [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flag are listed below: #3244

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ To specify which configuration file to load, pass the `-config.file` flag at the
2424
* `<string>`: a regular string
2525
* `<url>`: an URL
2626
* `<prefix>`: a CLI flag prefix based on the context (look at the parent configuration block to see which CLI flags prefix should be used)
27+
* `<relabel_config>`: a [Prometheus relabeling configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config).
2728
* `<time>`: a timestamp, with available formats: `2006-01-20` (midnight, local timezone), `2006-01-20T15:04` (local timezone), and RFC 3339 formats: `2006-01-20T15:04:05Z` (UTC) or `2006-01-20T15:04:05+07:00` (explicit timezone)
2829

2930
### Use environment variables in the configuration
@@ -2799,6 +2800,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
27992800
# CLI flag: -distributor.ingestion-tenant-shard-size
28002801
[ingestion_tenant_shard_size: <int> | default = 0]
28012802
2803+
# List of metric relabel configurations. Note that in most situations, it is
2804+
# more effective to use metrics relabeling directly in the Prometheus server,
2805+
# e.g. remote_write.write_relabel_configs.
2806+
[metric_relabel_configs: <relabel_config...> | default = ]
2807+
28022808
# The maximum number of series for which a query can fetch samples from each
28032809
# ingester. This limit is enforced only in the ingesters (when querying samples
28042810
# not flushed to the storage yet) and it's a per-instance limit. This limit is

docs/configuration/config-file-reference.template

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ To specify which configuration file to load, pass the `-config.file` flag at the
2424
* `<string>`: a regular string
2525
* `<url>`: an URL
2626
* `<prefix>`: a CLI flag prefix based on the context (look at the parent configuration block to see which CLI flags prefix should be used)
27+
* `<relabel_config>`: a [Prometheus relabeling configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config).
2728
* `<time>`: a timestamp, with available formats: `2006-01-20` (midnight, local timezone), `2006-01-20T15:04` (local timezone), and RFC 3339 formats: `2006-01-20T15:04:05Z` (UTC) or `2006-01-20T15:04:05+07:00` (explicit timezone)
2829

2930
### Use environment variables in the configuration

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,4 @@ Currently experimental features are:
5151
- TLS configuration in gRPC and HTTP clients.
5252
- TLS configuration in Etcd client.
5353
- Blocksconvert tools
54+
- Metric relabeling in the distributor.

pkg/distributor/distributor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/prometheus/client_golang/prometheus/promauto"
1616
"github.com/prometheus/common/model"
1717
"github.com/prometheus/prometheus/pkg/labels"
18+
"github.com/prometheus/prometheus/pkg/relabel"
1819
"github.com/prometheus/prometheus/scrape"
1920
"github.com/weaveworks/common/httpgrpc"
2021
"github.com/weaveworks/common/instrument"
@@ -446,6 +447,11 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
446447
latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
447448
}
448449

450+
if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
451+
l := relabel.Process(client.FromLabelAdaptersToLabels(ts.Labels), mrc...)
452+
ts.Labels = client.FromLabelsToLabelAdapters(l)
453+
}
454+
449455
// If we found both the cluster and replica labels, we only want to include the cluster label when
450456
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
451457
// series we're trying to dedupe when HA tracking moves over to a different replica.

pkg/distributor/distributor_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/prometheus/client_golang/prometheus/testutil"
1919
"github.com/prometheus/common/model"
2020
"github.com/prometheus/prometheus/pkg/labels"
21+
"github.com/prometheus/prometheus/pkg/relabel"
2122
"github.com/stretchr/testify/assert"
2223
"github.com/stretchr/testify/require"
2324
"github.com/weaveworks/common/httpgrpc"
@@ -1720,6 +1721,80 @@ func TestSortLabels(t *testing.T) {
17201721
})
17211722
}
17221723

1724+
func TestDistributor_Push_Relabel(t *testing.T) {
1725+
ctx = user.InjectOrgID(context.Background(), "user")
1726+
1727+
type testcase struct {
1728+
inputSeries labels.Labels
1729+
expectedSeries labels.Labels
1730+
metricRelabelConfigs []*relabel.Config
1731+
}
1732+
1733+
cases := []testcase{
1734+
// No relabel config.
1735+
{
1736+
inputSeries: labels.Labels{
1737+
{Name: "__name__", Value: "foo"},
1738+
{Name: "cluster", Value: "one"},
1739+
},
1740+
expectedSeries: labels.Labels{
1741+
{Name: "__name__", Value: "foo"},
1742+
{Name: "cluster", Value: "one"},
1743+
},
1744+
},
1745+
{
1746+
inputSeries: labels.Labels{
1747+
{Name: "__name__", Value: "foo"},
1748+
{Name: "cluster", Value: "one"},
1749+
},
1750+
expectedSeries: labels.Labels{
1751+
{Name: "__name__", Value: "foo"},
1752+
{Name: "cluster", Value: "two"},
1753+
},
1754+
metricRelabelConfigs: []*relabel.Config{
1755+
{
1756+
SourceLabels: []model.LabelName{"cluster"},
1757+
Action: relabel.DefaultRelabelConfig.Action,
1758+
Regex: relabel.DefaultRelabelConfig.Regex,
1759+
TargetLabel: "cluster",
1760+
Replacement: "two",
1761+
},
1762+
},
1763+
},
1764+
}
1765+
1766+
for _, tc := range cases {
1767+
var err error
1768+
var limits validation.Limits
1769+
flagext.DefaultValues(&limits)
1770+
limits.MetricRelabelConfigs = tc.metricRelabelConfigs
1771+
1772+
ds, ingesters, r := prepare(t, prepConfig{
1773+
numIngesters: 2,
1774+
happyIngesters: 2,
1775+
numDistributors: 1,
1776+
shardByAllLabels: true,
1777+
limits: &limits,
1778+
})
1779+
defer stopAll(ds, r)
1780+
1781+
// Push the series to the distributor
1782+
req := mockWriteRequest(tc.inputSeries, 1, 1)
1783+
_, err = ds[0].Push(ctx, req)
1784+
require.NoError(t, err)
1785+
1786+
// Since each test pushes only 1 series, we do expect the ingester
1787+
// to have received exactly 1 series
1788+
for i := range ingesters {
1789+
timeseries := ingesters[i].series()
1790+
assert.Equal(t, 1, len(timeseries))
1791+
for _, v := range timeseries {
1792+
assert.Equal(t, tc.expectedSeries, client.FromLabelAdaptersToLabels(v.Labels))
1793+
}
1794+
}
1795+
}
1796+
}
1797+
17231798
func countMockIngestersCalls(ingesters []mockIngester, name string) int {
17241799
count := 0
17251800
for i := 0; i < len(ingesters); i++ {

pkg/util/validation/limits.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"flag"
66
"time"
77

8+
"github.com/prometheus/prometheus/pkg/relabel"
9+
810
"github.com/cortexproject/cortex/pkg/util/flagext"
911
)
1012

@@ -46,6 +48,7 @@ type Limits struct {
4648
EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name"`
4749
EnforceMetricName bool `yaml:"enforce_metric_name"`
4850
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size"`
51+
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."`
4952

5053
// Ingester enforced limits.
5154
// Series
@@ -376,6 +379,11 @@ func (o *Overrides) EvaluationDelay(userID string) time.Duration {
376379
return o.getOverridesForUser(userID).RulerEvaluationDelay
377380
}
378381

382+
// MetricRelabelConfigs returns the metric relabel configs for a given user.
383+
func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config {
384+
return o.getOverridesForUser(userID).MetricRelabelConfigs
385+
}
386+
379387
// RulerTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy.
380388
func (o *Overrides) RulerTenantShardSize(userID string) int {
381389
return o.getOverridesForUser(userID).RulerTenantShardSize

pkg/util/validation/limits_test.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package validation
33
import (
44
"testing"
55

6+
"github.com/prometheus/common/model"
7+
"github.com/prometheus/prometheus/pkg/relabel"
68
"github.com/stretchr/testify/assert"
79
"github.com/stretchr/testify/require"
810
"gopkg.in/yaml.v2"
@@ -82,9 +84,32 @@ func TestLimitsLoadingFromYaml(t *testing.T) {
8284
inp := `ingestion_rate: 0.5`
8385

8486
l := Limits{}
85-
err := yaml.Unmarshal([]byte(inp), &l)
87+
err := yaml.UnmarshalStrict([]byte(inp), &l)
8688
require.NoError(t, err)
8789

8890
assert.Equal(t, 0.5, l.IngestionRate, "from yaml")
8991
assert.Equal(t, 100, l.MaxLabelNameLength, "from defaults")
9092
}
93+
94+
func TestMetricRelabelConfigLimitsLoadingFromYaml(t *testing.T) {
95+
SetDefaultLimitsForYAMLUnmarshalling(Limits{})
96+
97+
inp := `
98+
metric_relabel_configs:
99+
- action: drop
100+
source_labels: [le]
101+
regex: .+
102+
`
103+
exp := relabel.DefaultRelabelConfig
104+
exp.Action = relabel.Drop
105+
regex, err := relabel.NewRegexp(".+")
106+
require.NoError(t, err)
107+
exp.Regex = regex
108+
exp.SourceLabels = model.LabelNames([]model.LabelName{"le"})
109+
110+
l := Limits{}
111+
err = yaml.UnmarshalStrict([]byte(inp), &l)
112+
require.NoError(t, err)
113+
114+
assert.Equal(t, []*relabel.Config{&exp}, l.MetricRelabelConfigs)
115+
}

tools/doc-generator/parser.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,16 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl
193193
if err != nil {
194194
return nil, errors.Wrapf(err, "config=%s.%s", t.PkgPath(), t.Name())
195195
}
196+
if fieldFlag == nil {
197+
block.Add(&configEntry{
198+
kind: "field",
199+
name: fieldName,
200+
required: isFieldRequired(field),
201+
fieldDesc: getFieldDescription(field, ""),
202+
fieldType: fieldType,
203+
})
204+
continue
205+
}
196206

197207
block.Add(&configEntry{
198208
kind: "field",
@@ -243,6 +253,8 @@ func getFieldType(t reflect.Type) (string, error) {
243253
return "string", nil
244254
case "flagext.StringSliceCSV":
245255
return "string", nil
256+
case "[]*relabel.Config":
257+
return "relabel_config...", nil
246258
}
247259

248260
// Fallback to auto-detection of built-in data types
@@ -297,6 +309,9 @@ func getFieldType(t reflect.Type) (string, error) {
297309
}
298310

299311
func getFieldFlag(field reflect.StructField, fieldValue reflect.Value, flags map[uintptr]*flag.Flag) (*flag.Flag, error) {
312+
if isAbsentInCLI(field) {
313+
return nil, nil
314+
}
300315
fieldPtr := fieldValue.Addr().Pointer()
301316
fieldFlag, ok := flags[fieldPtr]
302317
if !ok {
@@ -395,6 +410,10 @@ func isFieldHidden(f reflect.StructField) bool {
395410
return getDocTagFlag(f, "hidden")
396411
}
397412

413+
func isAbsentInCLI(f reflect.StructField) bool {
414+
return getDocTagFlag(f, "nocli")
415+
}
416+
398417
func isFieldRequired(f reflect.StructField) bool {
399418
return getDocTagFlag(f, "required")
400419
}

0 commit comments

Comments
 (0)