
Commit f1c3f89

committed
Implement max_fetched_data_bytes_per_query limit
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 48bc900 commit f1c3f89

File tree

17 files changed: +363 −52 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -55,6 +55,7 @@
 * [FEATURE] Added `-api.http-request-headers-to-log` allowing for the addition of HTTP Headers to logs #4803
 * [FEATURE] Distributor: Added a new limit `-validation.max-labels-size-bytes` allowing to limit the combined size of labels for each timeseries. #4848
 * [FEATURE] Storage/Bucket: Added `-*.s3.bucket-lookup-type` allowing to configure the s3 bucket lookup type. #4794
+* [FEATURE] Querier: Added a new limit `-querier.max-fetched-data-bytes-per-query` allowing to limit the maximum size of all data in bytes that a query can fetch from each ingester and storage. #4854
 * [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804
 * [BUGFIX] Ruler: Fix /ruler/rule_groups returns YAML with extra fields. #4767

docs/configuration/config-file-reference.md

Lines changed: 10 additions & 3 deletions

@@ -2673,12 +2673,19 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -querier.max-fetched-series-per-query
 [max_fetched_series_per_query: <int> | default = 0]

-# The maximum size of all chunks in bytes that a query can fetch from each
-# ingester and storage. This limit is enforced in the querier and ruler only
-# when running Cortex with blocks storage. 0 to disable.
+# Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size
+# of all chunks in bytes that a query can fetch from each ingester and storage.
+# This limit is enforced in the querier and ruler only when running Cortex with
+# blocks storage. 0 to disable.
 # CLI flag: -querier.max-fetched-chunk-bytes-per-query
 [max_fetched_chunk_bytes_per_query: <int> | default = 0]

+# The maximum combined size of all data that a query can fetch from each
+# ingester and storage. This limit is enforced in the querier and ruler only
+# when running Cortex with blocks storage. 0 to disable.
+# CLI flag: -querier.max-fetched-data-bytes-per-query
+[max_fetched_data_bytes_per_query: <int> | default = 0]
+
 # Limit how long back data (series and metadata) can be queried, up until
 # <lookback> duration ago. This limit is enforced in the query-frontend, querier
 # and ruler. If the requested time range is outside the allowed range, the
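
For illustration, a minimal sketch of how the new budget is attached to a query context, following the four-argument NewQueryLimiter signature (series, chunk bytes, chunks, data bytes) used in the test changes further below. The 10 MiB value, the helper name withDataBytesLimit, and the import path are assumptions for this sketch, not part of the commit.

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util/limiter"
)

// withDataBytesLimit returns a context whose query limiter enforces a
// per-query data budget; all other limits are left disabled (0).
func withDataBytesLimit(ctx context.Context) context.Context {
	const maxFetchedDataBytes = 10 * 1024 * 1024 // hypothetical 10 MiB tenant setting
	ql := limiter.NewQueryLimiter(0, 0, 0, maxFetchedDataBytes)
	return limiter.AddQueryLimiterToContext(ctx, ql)
}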

pkg/distributor/distributor.go

Lines changed: 6 additions & 0 deletions

@@ -1029,6 +1029,9 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
 	}
 	ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
 	for _, m := range ms {
+		if err := queryLimiter.AddDataBytes(resp.LabelsSize()); err != nil {
+			return nil, err
+		}
 		if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil {
 			return nil, err
 		}
@@ -1065,6 +1068,9 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
 	for _, metric := range resp.Metric {
 		m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels)

+		if err := queryLimiter.AddDataBytes(resp.LabelsSize()); err != nil {
+			return nil, err
+		}
 		if err := queryLimiter.AddSeries(metric.Labels); err != nil {
 			return nil, err
 		}
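
The calls above charge the response's label bytes against the per-query budget. The limiter itself is not part of this diff; below is a minimal sketch, assuming it keeps an atomic running total compared against the configured ceiling. The type name, field names, and error wording are illustrative, not the real pkg/util/limiter code.

package example

import (
	"fmt"

	"go.uber.org/atomic"
)

// queryLimiter is a stand-in for the limiter used above.
type queryLimiter struct {
	dataBytesCount atomic.Int64 // running total of label and chunk bytes fetched so far
	maxDataBytes   int          // -querier.max-fetched-data-bytes-per-query; 0 disables the check
}

// AddDataBytes records n fetched bytes and fails once the running total
// exceeds the ceiling, so the query is aborted early instead of fetching
// unbounded data.
func (ql *queryLimiter) AddDataBytes(n int) error {
	if ql.maxDataBytes == 0 {
		return nil
	}
	if ql.dataBytesCount.Add(int64(n)) > int64(ql.maxDataBytes) {
		return fmt.Errorf("the query exceeded the aggregated data size limit of %d bytes", ql.maxDataBytes)
	}
	return nil
}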

pkg/distributor/distributor_test.go

Lines changed: 92 additions & 12 deletions

@@ -1032,7 +1032,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
 		limits: limits,
 	})

-	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit))
+	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit, 0))

 	// Push a number of series below the max chunks limit. Each series has 1 sample,
 	// so expect 1 chunk per series when querying back.
@@ -1077,7 +1077,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
 	ctx := user.InjectOrgID(context.Background(), "user")
 	limits := &validation.Limits{}
 	flagext.DefaultValues(limits)
-	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0))
+	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0))

 	// Prepare distributors.
 	ds, _, _, _ := prepare(t, prepConfig{
@@ -1161,7 +1161,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
 	var maxBytesLimit = (seriesToAdd) * responseChunkSize

 	// Update the limiter with the calculated limits.
-	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0))
+	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0, 0))

 	// Push a number of series below the max chunk bytes limit. Subtract one for the series added above.
 	writeReq = makeWriteRequest(0, seriesToAdd-1, 0)
@@ -1192,6 +1192,75 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
 	assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, maxBytesLimit)))
 }

+func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsReached(t *testing.T) {
+	const seriesToAdd = 10
+
+	ctx := user.InjectOrgID(context.Background(), "user")
+	limits := &validation.Limits{}
+	flagext.DefaultValues(limits)
+
+	// Prepare distributors.
+	// Use replication factor of 2 to always read all the chunks from both ingesters,
+	// this guarantees us to always read the same chunks and have a stable test.
+	ds, _, _, _ := prepare(t, prepConfig{
+		numIngesters:      2,
+		happyIngesters:    2,
+		numDistributors:   1,
+		shardByAllLabels:  true,
+		limits:            limits,
+		replicationFactor: 2,
+	})
+
+	allSeriesMatchers := []*labels.Matcher{
+		labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
+	}
+	// Push a single series to allow us to calculate the label size to calculate the limit for the test.
+	writeReq := &cortexpb.WriteRequest{}
+	writeReq.Timeseries = append(writeReq.Timeseries,
+		makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0),
+	)
+	writeRes, err := ds[0].Push(ctx, writeReq)
+	assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
+	assert.Nil(t, err)
+	labelSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
+	require.NoError(t, err)
+
+	// Use the resulting label size to calculate the limit as (series to add + our test series) * the response label size.
+	var responseLabelSize = labelSizeResponse.LabelsSize()
+	var maxBytesLimit = (seriesToAdd) * responseLabelSize * 2 // Multiplying by RF because the limit is applied before de-duping.
+
+	// Update the limiter with the calculated limits.
+	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, maxBytesLimit))
+
+	// Push a number of series below the max data bytes limit. Subtract one for the series added above.
+	writeReq = makeWriteRequest(0, seriesToAdd-1, 0)
+	writeRes, err = ds[0].Push(ctx, writeReq)
+	assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
+	assert.Nil(t, err)
+
+	// Since the number of data bytes is equal to the limit (but doesn't
+	// exceed it), we expect a query running on all series to succeed.
+	queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
+	require.NoError(t, err)
+	assert.Len(t, queryRes.Chunkseries, seriesToAdd)
+
+	// Push another series to exceed the data bytes limit once we'll query back all series.
+	writeReq = &cortexpb.WriteRequest{}
+	writeReq.Timeseries = append(writeReq.Timeseries,
+		makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series_1"}}, 0, 0),
+	)
+
+	writeRes, err = ds[0].Push(ctx, writeReq)
+	assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
+	assert.Nil(t, err)
+
+	// Since the aggregated data size is exceeding the limit, we expect
+	// a query running on all series to fail.
+	_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
+	require.Error(t, err)
+	assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, maxBytesLimit)))
+}
+
 func TestDistributor_Push_LabelRemoval(t *testing.T) {
 	ctx := user.InjectOrgID(context.Background(), "user")

@@ -1930,7 +1999,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 			},
 			expectedResult:    []metric.Metric{},
 			expectedIngesters: numIngesters,
-			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0, 0),
 			expectedErr:       nil,
 		},
 		"should filter metrics by single matcher": {
@@ -1942,7 +2011,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 				{Metric: util.LabelsToMetric(fixtures[1].lbls)},
 			},
 			expectedIngesters: numIngesters,
-			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0, 0),
 			expectedErr:       nil,
 		},
 		"should filter metrics by multiple matchers": {
@@ -1954,7 +2023,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 				{Metric: util.LabelsToMetric(fixtures[0].lbls)},
 			},
 			expectedIngesters: numIngesters,
-			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0, 0),
 			expectedErr:       nil,
 		},
 		"should return all matching metrics even if their FastFingerprint collide": {
@@ -1966,7 +2035,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 				{Metric: util.LabelsToMetric(fixtures[4].lbls)},
 			},
 			expectedIngesters: numIngesters,
-			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0, 0),
 			expectedErr:       nil,
 		},
 		"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
@@ -1980,7 +2049,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 				{Metric: util.LabelsToMetric(fixtures[1].lbls)},
 			},
 			expectedIngesters: 3,
-			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0, 0),
 			expectedErr:       nil,
 		},
 		"should query all ingesters if shuffle sharding is enabled but shard size is 0": {
@@ -1994,7 +2063,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 				{Metric: util.LabelsToMetric(fixtures[1].lbls)},
 			},
 			expectedIngesters: numIngesters,
-			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0, 0),
 			expectedErr:       nil,
 		},
 		"should return err if series limit is exhausted": {
@@ -2005,9 +2074,20 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 			},
 			expectedResult:    nil,
 			expectedIngesters: numIngesters,
-			queryLimiter:      limiter.NewQueryLimiter(1, 0, 0),
+			queryLimiter:      limiter.NewQueryLimiter(1, 0, 0, 0),
 			expectedErr:       validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
 		},
+		"should return err if data bytes limit is exhausted": {
+			shuffleShardEnabled: true,
+			shuffleShardSize:    0,
+			matchers: []*labels.Matcher{
+				mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
+			},
+			expectedResult:    nil,
+			expectedIngesters: numIngesters,
+			queryLimiter:      limiter.NewQueryLimiter(0, 0, 0, 1),
+			expectedErr:       validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, 1)),
+		},
 		"should not exhaust series limit when only one series is fetched": {
 			matchers: []*labels.Matcher{
 				mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2"),
@@ -2016,7 +2096,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 				{Metric: util.LabelsToMetric(fixtures[2].lbls)},
 			},
 			expectedIngesters: numIngesters,
-			queryLimiter:      limiter.NewQueryLimiter(1, 0, 0),
+			queryLimiter:      limiter.NewQueryLimiter(1, 0, 0, 0),
 			expectedErr:       nil,
 		},
 	}
@@ -2116,7 +2196,7 @@ func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
 			matchers: []*labels.Matcher{
 				mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "foo.+"),
 			},
-			queryLimiter: limiter.NewQueryLimiter(100, 0, 0),
+			queryLimiter: limiter.NewQueryLimiter(100, 0, 0, 0),
 			expectedErr:  nil,
 		},
 	}

pkg/distributor/query.go

Lines changed: 5 additions & 0 deletions

@@ -335,6 +335,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 			return nil, validation.LimitError(chunkBytesLimitErr.Error())
 		}

+		if dataBytesLimitErr := queryLimiter.AddDataBytes(resp.ChunksSize() + resp.LabelsSize()); dataBytesLimitErr != nil {
+			return nil, validation.LimitError(dataBytesLimitErr.Error())
+		}
+
 		for _, series := range resp.Timeseries {
 			if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
 				return nil, validation.LimitError(limitErr.Error())
@@ -392,6 +396,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri

 		reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
 		reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize()))
+		reqStats.AddFetchedDataBytes(uint64(resp.LabelsSize() + resp.ChunksSize()))

 		return resp, nil
 	}
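
Besides enforcing the limit, the new reqStats.AddFetchedDataBytes call feeds the per-query statistics that the query-frontend reads back (see the handler change below). A minimal sketch of that accumulator, assuming it mirrors the existing fetched-chunk-bytes counter in pkg/querier/stats; the field name and package layout here are assumptions.

package example

import "sync/atomic"

// Stats is a stand-in for the per-query stats object shared by the goroutines
// that stream results from ingesters.
type Stats struct {
	fetchedDataBytes uint64 // combined label + chunk bytes fetched by this query
}

// AddFetchedDataBytes is safe for concurrent use by per-ingester goroutines.
func (s *Stats) AddFetchedDataBytes(n uint64) {
	atomic.AddUint64(&s.fetchedDataBytes, n)
}

// LoadFetchedDataBytes is what the frontend handler reads when it increments
// cortex_query_fetched_data_bytes_total and writes the query stats log line.
func (s *Stats) LoadFetchedDataBytes() uint64 {
	return atomic.LoadUint64(&s.fetchedDataBytes)
}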

pkg/frontend/transport/handler.go

Lines changed: 14 additions & 4 deletions

@@ -60,10 +60,11 @@ type Handler struct {
 	roundTripper http.RoundTripper

 	// Metrics.
-	querySeconds *prometheus.CounterVec
-	querySeries  *prometheus.CounterVec
-	queryBytes   *prometheus.CounterVec
-	activeUsers  *util.ActiveUsersCleanupService
+	querySeconds   *prometheus.CounterVec
+	querySeries    *prometheus.CounterVec
+	queryBytes     *prometheus.CounterVec
+	queryDataBytes *prometheus.CounterVec
+	activeUsers    *util.ActiveUsersCleanupService
 }

 // NewHandler creates a new frontend handler.
@@ -90,10 +91,16 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
 		Help: "Size of all chunks fetched to execute a query in bytes.",
 	}, []string{"user"})

+	h.queryDataBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		Name: "cortex_query_fetched_data_bytes_total",
+		Help: "Size of all data fetched to execute a query in bytes.",
+	}, []string{"user"})
+
 	h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
 		h.querySeconds.DeleteLabelValues(user)
 		h.querySeries.DeleteLabelValues(user)
 		h.queryBytes.DeleteLabelValues(user)
+		h.queryDataBytes.DeleteLabelValues(user)
 	})
 	// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
 	_ = h.activeUsers.StartAsync(context.Background())
@@ -186,11 +193,13 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
 	wallTime := stats.LoadWallTime()
 	numSeries := stats.LoadFetchedSeries()
 	numBytes := stats.LoadFetchedChunkBytes()
+	numDataBytes := stats.LoadFetchedDataBytes()

 	// Track stats.
 	f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
 	f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
 	f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
+	f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
 	f.activeUsers.UpdateUserTimestamp(userID, time.Now())

 	// Log stats.
@@ -203,6 +212,7 @@
 		"query_wall_time_seconds", wallTime.Seconds(),
 		"fetched_series_count", numSeries,
 		"fetched_chunks_bytes", numBytes,
+		"fetched_data_bytes", numDataBytes,
 	}, formatQueryString(queryString)...)

 	level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)

pkg/ingester/client/custom.go

Lines changed: 51 additions & 0 deletions

@@ -27,3 +27,54 @@ func (m *QueryStreamResponse) ChunksSize() int {
 	}
 	return size
 }
+
+// LabelsSize returns the size of all labels in the response.
+func (m *QueryStreamResponse) LabelsSize() int {
+	if len(m.Timeseries) == 0 && len(m.Chunkseries) == 0 {
+		return 0
+	}
+
+	size := 0
+	for _, entry := range m.Chunkseries {
+		for _, label := range entry.Labels {
+			size += label.Size()
+		}
+	}
+
+	for _, entry := range m.Timeseries {
+		for _, label := range entry.Labels {
+			size += label.Size()
+		}
+	}
+	return size
+}
+
+// LabelsSize returns the size of all labels in the response.
+func (r *MetricsForLabelMatchersStreamResponse) LabelsSize() int {
+	if len(r.Metric) == 0 {
+		return 0
+	}
+
+	size := 0
+	for _, entry := range r.Metric {
+		for _, label := range entry.Labels {
+			size += label.Size()
+		}
+	}
+	return size
+}
+
+// LabelsSize returns the size of all labels in the response.
+func (r *MetricsForLabelMatchersResponse) LabelsSize() int {
+	if len(r.Metric) == 0 {
+		return 0
+	}
+
+	size := 0
+	for _, entry := range r.Metric {
+		for _, label := range entry.Labels {
+			size += label.Size()
+		}
+	}
+	return size
+}
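
Taken together with ChunksSize, these helpers give the per-response byte count that both the limiter and the per-query stats consume. A hedged usage sketch mirroring the call sites added in pkg/distributor/query.go above; the import paths and the helper name accountResponse are assumptions based on the package names appearing in this diff.

package example

import (
	ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/cortexproject/cortex/pkg/querier/stats"
	"github.com/cortexproject/cortex/pkg/util/limiter"
	"github.com/cortexproject/cortex/pkg/util/validation"
)

// accountResponse charges one streamed ingester response against the
// per-query data budget and records it in the per-query stats.
func accountResponse(resp *ingester_client.QueryStreamResponse, ql *limiter.QueryLimiter, reqStats *stats.Stats) error {
	fetched := resp.ChunksSize() + resp.LabelsSize()
	if err := ql.AddDataBytes(fetched); err != nil {
		return validation.LimitError(err.Error())
	}
	reqStats.AddFetchedDataBytes(uint64(fetched))
	return nil
}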
