
Commit 1072572

fix instant query bypassing limit middleware due to vertical sharding (#5832)
* fix instant query bypassing QFE instant query limit middleware
* integration test for max query length
* fix lint
* update doc

Signed-off-by: Ben Ye <[email protected]>
1 parent 498635f commit 1072572

File tree (5 files changed: +112 −39 lines)

* docs/configuration/config-file-reference.md
* integration/query_frontend_test.go
* pkg/querier/tripperware/roundtrip.go
* pkg/querier/tripperware/roundtrip_test.go
* pkg/util/validation/limits.go

docs/configuration/config-file-reference.md

Lines changed: 3 additions & 3 deletions
@@ -3187,9 +3187,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -querier.max-query-lookback
 [max_query_lookback: <duration> | default = 0s]
 
-# Limit the query time range (end - start time). This limit is enforced in the
-# query-frontend (on the received query) and in the querier (on the query
-# possibly split by the query-frontend). 0 to disable.
+# Limit the query time range (end - start time of range query parameter and max
+# - min of data fetched time range). This limit is enforced in the
+# query-frontend and ruler (on the received query). 0 to disable.
 # CLI flag: -store.max-query-length
 [max_query_length: <duration> | default = 0s]

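The updated wording distinguishes the range-query case (end - start of the request parameters) from the data an instant query actually fetches (for example, the 90d window selected by `rate(test[90d])` in the integration test below). As a rough illustration of that second notion, the sketch below walks a PromQL expression with the upstream Prometheus parser and reports its widest matrix-selector range; `longestSelectorRange` is a hypothetical helper for illustration only, not Cortex's implementation of the limit.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql/parser"
)

// longestSelectorRange returns the widest matrix-selector range in a PromQL
// expression, e.g. 90d for `rate(test[90d])`. Illustrative only.
func longestSelectorRange(query string) (time.Duration, error) {
	expr, err := parser.ParseExpr(query)
	if err != nil {
		return 0, err
	}
	var longest time.Duration
	parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
		if ms, ok := node.(*parser.MatrixSelector); ok && ms.Range > longest {
			longest = ms.Range
		}
		return nil
	})
	return longest, nil
}

func main() {
	d, err := longestSelectorRange(`rate(test[90d])`)
	if err != nil {
		panic(err)
	}
	fmt.Println(d) // 2160h0m0s, well past a 30d max_query_length
}
```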
integration/query_frontend_test.go

Lines changed: 67 additions & 0 deletions
@@ -530,3 +530,70 @@ func TestQueryFrontendNoRetryChunkPool(t *testing.T) {
 	// We shouldn't be able to see any retries.
 	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_query_frontend_retries"}, e2e.WaitMissingMetrics))
 }
+
+func TestQueryFrontendMaxQueryLengthLimits(t *testing.T) {
+	const blockRangePeriod = 5 * time.Second
+
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Configure the blocks storage to frequently compact TSDB head
+	// and ship blocks to the storage.
+	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
+		"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
+		"-blocks-storage.tsdb.ship-interval": "1s",
+		"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
+		"-blocks-storage.bucket-store.max-chunk-pool-bytes": "1",
+		"-store.max-query-length": "30d",
+	})
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Start Cortex components for the write path.
+	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester))
+
+	// Wait until the distributor has updated the ring.
+	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+
+	queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
+	queryFrontendWithSharding := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend-sharding", "", mergeFlags(flags, map[string]string{
+		"-frontend.query-vertical-shard-size": "2",
+	}), "")
+	require.NoError(t, s.Start(queryFrontend, queryFrontendWithSharding))
+
+	c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
+	cSharding, err := e2ecortex.NewClient("", queryFrontendWithSharding.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
+
+	now := time.Now()
+	// We expect request to hit max query length limit.
+	resp, body, err := c.QueryRangeRaw(`rate(test[1m])`, now.Add(-90*time.Hour*24), now, 10*time.Hour)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
+	require.Contains(t, string(body), "the query time range exceeds the limit")
+
+	// We expect request to hit max query length limit.
+	resp, body, err = cSharding.QueryRangeRaw(`rate(test[1m])`, now.Add(-90*time.Hour*24), now, 10*time.Hour)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
+	require.Contains(t, string(body), "the query time range exceeds the limit")
+
+	// We expect request to hit max query length limit.
+	resp, body, err = c.QueryRaw(`rate(test[90d])`, now)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
+	require.Contains(t, string(body), "the query time range exceeds the limit")
+
+	// We expect request to hit max query length limit.
+	resp, body, err = cSharding.QueryRaw(`rate(test[90d])`, now)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
+	require.Contains(t, string(body), "the query time range exceeds the limit")
+}

pkg/querier/tripperware/roundtrip.go

Lines changed: 0 additions & 13 deletions
@@ -36,7 +36,6 @@ import (
 	"github.com/cortexproject/cortex/pkg/tenant"
 	"github.com/cortexproject/cortex/pkg/util"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
-	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
 // HandlerFunc is like http.HandlerFunc, but for Handler.
@@ -180,18 +179,6 @@ func NewQueryTripperware(
 			if isQueryRange {
 				return queryrange.RoundTrip(r)
 			} else if isQuery {
-				// If the given query is not shardable, use downstream roundtripper.
-				query := r.FormValue("query")
-
-				// If vertical sharding is not enabled for the tenant, use downstream roundtripper.
-				numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
-				if numShards <= 1 {
-					return next.RoundTrip(r)
-				}
-				analysis, err := queryAnalyzer.Analyze(query)
-				if err != nil || !analysis.IsShardable() {
-					return next.RoundTrip(r)
-				}
 				return instantQuery.RoundTrip(r)
 			}
 			return next.RoundTrip(r)

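The deleted branch is the source of the bug in the commit title: when the tenant had vertical sharding disabled (numShards <= 1) or the instant query was not shardable, the frontend handed the request straight to the downstream round tripper, so the instant-query middleware chain — including the max query length check — never ran. After this change every /api/v1/query request takes the instantQuery path. Below is a minimal sketch of the resulting routing, with hypothetical parameter names standing in for the values the real NewQueryTripperware closure captures; it is an illustration, not the actual code.

```go
package tripperwaresketch

import "net/http"

// route sketches the post-fix decision only; the real logic lives inside
// NewQueryTripperware and takes these values from its closure.
func route(r *http.Request, isQueryRange, isQuery bool,
	queryRange, instantQuery, next http.RoundTripper) (*http.Response, error) {
	if isQueryRange {
		return queryRange.RoundTrip(r) // range-query middleware chain
	} else if isQuery {
		// Before this commit, non-shardable or non-vertically-sharded instant
		// queries returned next.RoundTrip(r) here, bypassing the instant-query
		// middlewares and the -store.max-query-length check they apply.
		return instantQuery.RoundTrip(r)
	}
	return next.RoundTrip(r) // other endpoints go straight to the downstream
}
```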
pkg/querier/tripperware/roundtrip_test.go

Lines changed: 41 additions & 22 deletions
@@ -7,10 +7,12 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/thanos/pkg/querysharding"
 	"github.com/weaveworks/common/httpgrpc"
@@ -22,12 +24,15 @@ import (
 
 const (
 	queryRange = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120"
-	query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680"
-	queryNonShardable = "/api/v1/query?time=1536716898&query=container_memory_rss&start=1536673680"
+	query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29"
+	queryNonShardable = "/api/v1/query?time=1536716898&query=container_memory_rss"
 	queryExemplar = "/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'"
 	querySubqueryStepSizeTooSmall = "/api/v1/query?query=up%5B30d%3A%5D"
+	queryExceedsMaxQueryLength = "/api/v1/query?query=up%5B90d%5D"
+	seriesQuery = "/api/v1/series?match[]"
 
-	responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}`
+	responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}`
+	instantResponseBody = `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}`
 )
 
 type mockRequest struct {
@@ -45,9 +50,12 @@ type mockCodec struct {
 }
 
 func (c mockCodec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (Request, error) {
-	if r.URL.String() == query || r.URL.String() == queryRange {
+	if strings.HasPrefix(r.URL.String(), "/api/v1/query_range") {
 		return &mockRequest{resp: responseBody}, nil
 	}
+	if strings.HasPrefix(r.URL.String(), "/api/v1/query") {
+		return &mockRequest{resp: instantResponseBody}, nil
+	}
 	return mockRequest{}, nil
 }
 
@@ -91,13 +99,20 @@ func TestRoundTrip(t *testing.T) {
 		next: http.DefaultTransport,
 	}
 
-	middlewares := []Middleware{
+	instantMiddlewares := []Middleware{
+		MiddlewareFunc(func(next Handler) Handler {
+			return mockMiddleware{}
+		}),
+	}
+	rangeMiddlewares := []Middleware{
 		MiddlewareFunc(func(next Handler) Handler {
 			return mockMiddleware{}
 		}),
 	}
 
-	limits := validation.Limits{}
+	limits := validation.Limits{
+		MaxQueryLength: model.Duration(time.Hour * 24 * 60),
+	}
 	flagext.DefaultValues(&limits)
 	defaultOverrides, err := validation.NewOverrides(limits, nil)
 	require.NoError(t, err)
@@ -124,46 +139,46 @@ func TestRoundTrip(t *testing.T) {
 			maxSubQuerySteps: 11000,
 		},
 		{
-			path: queryRange,
-			expectedBody: responseBody,
+			path: seriesQuery,
+			expectedBody: "bar",
 			limits: defaultOverrides,
 			maxSubQuerySteps: 11000,
 		},
 		{
-			path: query,
-			expectedBody: "bar",
+			path: queryRange,
+			expectedBody: responseBody,
 			limits: defaultOverrides,
 			maxSubQuerySteps: 11000,
 		},
 		{
-			path: queryNonShardable,
-			expectedBody: "bar",
+			path: query,
+			expectedBody: instantResponseBody,
 			limits: defaultOverrides,
 			maxSubQuerySteps: 11000,
 		},
 		{
 			path: queryNonShardable,
-			expectedBody: "bar",
-			limits: shardingOverrides,
+			expectedBody: instantResponseBody,
+			limits: defaultOverrides,
 			maxSubQuerySteps: 11000,
 		},
 		{
 			path: query,
-			expectedBody: responseBody,
+			expectedBody: instantResponseBody,
 			limits: shardingOverrides,
 			maxSubQuerySteps: 11000,
 		},
 		// Shouldn't hit subquery step limit because max steps is set to 0 so this check is disabled.
 		{
 			path: querySubqueryStepSizeTooSmall,
-			expectedBody: "bar",
+			expectedBody: instantResponseBody,
 			limits: defaultOverrides,
 			maxSubQuerySteps: 0,
 		},
 		// Shouldn't hit subquery step limit because max steps is higher, which is 100K.
 		{
 			path: querySubqueryStepSizeTooSmall,
-			expectedBody: "bar",
+			expectedBody: instantResponseBody,
 			limits: defaultOverrides,
 			maxSubQuerySteps: 100000,
 		},
@@ -173,11 +188,15 @@ func TestRoundTrip(t *testing.T) {
 			limits: defaultOverrides,
 			maxSubQuerySteps: 11000,
 		},
+		{
+			// The query should go to instant query middlewares rather than forwarding to next.
+			path: queryExceedsMaxQueryLength,
+			expectedBody: instantResponseBody,
+			limits: defaultOverrides,
+			maxSubQuerySteps: 11000,
+		},
 	} {
 		t.Run(tc.path, func(t *testing.T) {
-			if tc.path != querySubqueryStepSizeTooSmall {
-				return
-			}
 			//parallel testing causes data race
 			req, err := http.NewRequest("GET", tc.path, http.NoBody)
 			require.NoError(t, err)
@@ -193,8 +212,8 @@ func TestRoundTrip(t *testing.T) {
 			tw := NewQueryTripperware(log.NewNopLogger(),
 				nil,
 				nil,
-				middlewares,
-				middlewares,
+				rangeMiddlewares,
+				instantMiddlewares,
 				mockCodec{},
 				mockCodec{},
 				tc.limits,

pkg/util/validation/limits.go

Lines changed: 1 addition & 1 deletion
@@ -209,7 +209,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable")
 	f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
 	f.IntVar(&l.MaxFetchedDataBytesPerQuery, "querier.max-fetched-data-bytes-per-query", 0, "The maximum combined size of all data that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler for `query`, `query_range` and `series` APIs. 0 to disable.")
-	f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query) and in the querier (on the query possibly split by the query-frontend). 0 to disable.")
+	f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time of range query parameter and max - min of data fetched time range). This limit is enforced in the query-frontend and ruler (on the received query). 0 to disable.")
 	f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until <lookback> duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
 	f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.")
 	_ = l.MaxCacheFreshness.Set("1m")

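The revised flag help matches what the integration test above asserts: when the received query spans more than -store.max-query-length, the query-frontend answers with HTTP 400 and a message containing "the query time range exceeds the limit". Below is a hedged sketch of that kind of check; the names are illustrative and the real enforcement lives in Cortex's query-frontend limits middleware, not in this file.

```go
package limitssketch

import (
	"net/http"
	"time"

	"github.com/weaveworks/common/httpgrpc"
)

// checkMaxQueryLength is an illustrative stand-in for the query-frontend's
// range-query length check: end - start must not exceed the per-tenant limit.
func checkMaxQueryLength(start, end time.Time, maxQueryLength time.Duration) error {
	if maxQueryLength <= 0 {
		return nil // 0 disables the limit
	}
	if length := end.Sub(start); length > maxQueryLength {
		return httpgrpc.Errorf(http.StatusBadRequest,
			"the query time range exceeds the limit (query length: %v, limit: %v)", length, maxQueryLength)
	}
	return nil
}
```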