@@ -1032,7 +1032,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
10321032 limits : limits ,
10331033 })
10341034
1035- ctx = limiter .AddQueryLimiterToContext (ctx , limiter .NewQueryLimiter (0 , 0 , maxChunksLimit ))
1035+ ctx = limiter .AddQueryLimiterToContext (ctx , limiter .NewQueryLimiter (0 , 0 , maxChunksLimit , 0 ))
10361036
10371037 // Push a number of series below the max chunks limit. Each series has 1 sample,
10381038 // so expect 1 chunk per series when querying back.
@@ -1077,7 +1077,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
10771077 ctx := user .InjectOrgID (context .Background (), "user" )
10781078 limits := & validation.Limits {}
10791079 flagext .DefaultValues (limits )
1080- ctx = limiter .AddQueryLimiterToContext (ctx , limiter .NewQueryLimiter (maxSeriesLimit , 0 , 0 ))
1080+ ctx = limiter .AddQueryLimiterToContext (ctx , limiter .NewQueryLimiter (maxSeriesLimit , 0 , 0 , 0 ))
10811081
10821082 // Prepare distributors.
10831083 ds , _ , _ , _ := prepare (t , prepConfig {
@@ -1161,7 +1161,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
11611161 var maxBytesLimit = (seriesToAdd ) * responseChunkSize
11621162
11631163 // Update the limiter with the calculated limits.
1164- ctx = limiter .AddQueryLimiterToContext (ctx , limiter .NewQueryLimiter (0 , maxBytesLimit , 0 ))
1164+ ctx = limiter .AddQueryLimiterToContext (ctx , limiter .NewQueryLimiter (0 , maxBytesLimit , 0 , 0 ))
11651165
11661166 // Push a number of series below the max chunk bytes limit. Subtract one for the series added above.
11671167 writeReq = makeWriteRequest (0 , seriesToAdd - 1 , 0 )
@@ -1192,6 +1192,75 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
11921192 assert .Equal (t , err , validation .LimitError (fmt .Sprintf (limiter .ErrMaxChunkBytesHit , maxBytesLimit )))
11931193}
11941194
1195+ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsReached (t * testing.T ) {
1196+ const seriesToAdd = 10
1197+
1198+ ctx := user .InjectOrgID (context .Background (), "user" )
1199+ limits := & validation.Limits {}
1200+ flagext .DefaultValues (limits )
1201+
1202+ // Prepare distributors.
1203+ // Use replication factor of 2 to always read all the chunks from both ingesters,
1204+ // this guarantees us to always read the same chunks and have a stable test.
1205+ ds , _ , _ , _ := prepare (t , prepConfig {
1206+ numIngesters : 2 ,
1207+ happyIngesters : 2 ,
1208+ numDistributors : 1 ,
1209+ shardByAllLabels : true ,
1210+ limits : limits ,
1211+ replicationFactor : 2 ,
1212+ })
1213+
1214+ allSeriesMatchers := []* labels.Matcher {
1215+ labels .MustNewMatcher (labels .MatchRegexp , model .MetricNameLabel , ".+" ),
1216+ }
1217+ // Push a single series to allow us to calculate the label size to calculate the limit for the test.
1218+ writeReq := & cortexpb.WriteRequest {}
1219+ writeReq .Timeseries = append (writeReq .Timeseries ,
1220+ makeWriteRequestTimeseries ([]cortexpb.LabelAdapter {{Name : model .MetricNameLabel , Value : "another_series" }}, 0 , 0 ),
1221+ )
1222+ writeRes , err := ds [0 ].Push (ctx , writeReq )
1223+ assert .Equal (t , & cortexpb.WriteResponse {}, writeRes )
1224+ assert .Nil (t , err )
1225+ dataSizeResponse , err := ds [0 ].QueryStream (ctx , math .MinInt32 , math .MaxInt32 , allSeriesMatchers ... )
1226+ require .NoError (t , err )
1227+
1228+ // Use the resulting chunks size to calculate the limit as (series to add + our test series) * the response chunk size.
1229+ var dataSize = dataSizeResponse .Size ()
1230+ var maxBytesLimit = (seriesToAdd ) * dataSize * 2 // Multiplying by RF because the limit is applied before de-duping.
1231+
1232+ // Update the limiter with the calculated limits.
1233+ ctx = limiter .AddQueryLimiterToContext (ctx , limiter .NewQueryLimiter (0 , 0 , 0 , maxBytesLimit ))
1234+
1235+ // Push a number of series below the max chunk bytes limit. Subtract one for the series added above.
1236+ writeReq = makeWriteRequest (0 , seriesToAdd - 1 , 0 )
1237+ writeRes , err = ds [0 ].Push (ctx , writeReq )
1238+ assert .Equal (t , & cortexpb.WriteResponse {}, writeRes )
1239+ assert .Nil (t , err )
1240+
1241+ // Since the number of chunk bytes is equal to the limit (but doesn't
1242+ // exceed it), we expect a query running on all series to succeed.
1243+ queryRes , err := ds [0 ].QueryStream (ctx , math .MinInt32 , math .MaxInt32 , allSeriesMatchers ... )
1244+ require .NoError (t , err )
1245+ assert .Len (t , queryRes .Chunkseries , seriesToAdd )
1246+
1247+ // Push another series to exceed the chunk bytes limit once we'll query back all series.
1248+ writeReq = & cortexpb.WriteRequest {}
1249+ writeReq .Timeseries = append (writeReq .Timeseries ,
1250+ makeWriteRequestTimeseries ([]cortexpb.LabelAdapter {{Name : model .MetricNameLabel , Value : "another_series_1" }}, 0 , 0 ),
1251+ )
1252+
1253+ writeRes , err = ds [0 ].Push (ctx , writeReq )
1254+ assert .Equal (t , & cortexpb.WriteResponse {}, writeRes )
1255+ assert .Nil (t , err )
1256+
1257+ // Since the aggregated chunk size is exceeding the limit, we expect
1258+ // a query running on all series to fail.
1259+ _ , err = ds [0 ].QueryStream (ctx , math .MinInt32 , math .MaxInt32 , allSeriesMatchers ... )
1260+ require .Error (t , err )
1261+ assert .Equal (t , err , validation .LimitError (fmt .Sprintf (limiter .ErrMaxDataBytesHit , maxBytesLimit )))
1262+ }
1263+
11951264func TestDistributor_Push_LabelRemoval (t * testing.T ) {
11961265 ctx := user .InjectOrgID (context .Background (), "user" )
11971266
@@ -1930,7 +1999,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19301999 },
19312000 expectedResult : []metric.Metric {},
19322001 expectedIngesters : numIngesters ,
1933- queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 ),
2002+ queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 , 0 ),
19342003 expectedErr : nil ,
19352004 },
19362005 "should filter metrics by single matcher" : {
@@ -1942,7 +2011,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19422011 {Metric : util .LabelsToMetric (fixtures [1 ].lbls )},
19432012 },
19442013 expectedIngesters : numIngesters ,
1945- queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 ),
2014+ queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 , 0 ),
19462015 expectedErr : nil ,
19472016 },
19482017 "should filter metrics by multiple matchers" : {
@@ -1954,7 +2023,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19542023 {Metric : util .LabelsToMetric (fixtures [0 ].lbls )},
19552024 },
19562025 expectedIngesters : numIngesters ,
1957- queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 ),
2026+ queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 , 0 ),
19582027 expectedErr : nil ,
19592028 },
19602029 "should return all matching metrics even if their FastFingerprint collide" : {
@@ -1966,7 +2035,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19662035 {Metric : util .LabelsToMetric (fixtures [4 ].lbls )},
19672036 },
19682037 expectedIngesters : numIngesters ,
1969- queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 ),
2038+ queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 , 0 ),
19702039 expectedErr : nil ,
19712040 },
19722041 "should query only ingesters belonging to tenant's subring if shuffle sharding is enabled" : {
@@ -1980,7 +2049,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19802049 {Metric : util .LabelsToMetric (fixtures [1 ].lbls )},
19812050 },
19822051 expectedIngesters : 3 ,
1983- queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 ),
2052+ queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 , 0 ),
19842053 expectedErr : nil ,
19852054 },
19862055 "should query all ingesters if shuffle sharding is enabled but shard size is 0" : {
@@ -1994,7 +2063,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19942063 {Metric : util .LabelsToMetric (fixtures [1 ].lbls )},
19952064 },
19962065 expectedIngesters : numIngesters ,
1997- queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 ),
2066+ queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 , 0 ),
19982067 expectedErr : nil ,
19992068 },
20002069 "should return err if series limit is exhausted" : {
@@ -2005,9 +2074,20 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
20052074 },
20062075 expectedResult : nil ,
20072076 expectedIngesters : numIngesters ,
2008- queryLimiter : limiter .NewQueryLimiter (1 , 0 , 0 ),
2077+ queryLimiter : limiter .NewQueryLimiter (1 , 0 , 0 , 0 ),
20092078 expectedErr : validation .LimitError (fmt .Sprintf (limiter .ErrMaxSeriesHit , 1 )),
20102079 },
2080+ "should return err if data bytes limit is exhausted" : {
2081+ shuffleShardEnabled : true ,
2082+ shuffleShardSize : 0 ,
2083+ matchers : []* labels.Matcher {
2084+ mustNewMatcher (labels .MatchEqual , model .MetricNameLabel , "test_1" ),
2085+ },
2086+ expectedResult : nil ,
2087+ expectedIngesters : numIngesters ,
2088+ queryLimiter : limiter .NewQueryLimiter (0 , 0 , 0 , 1 ),
2089+ expectedErr : validation .LimitError (fmt .Sprintf (limiter .ErrMaxDataBytesHit , 1 )),
2090+ },
20112091 "should not exhaust series limit when only one series is fetched" : {
20122092 matchers : []* labels.Matcher {
20132093 mustNewMatcher (labels .MatchEqual , model .MetricNameLabel , "test_2" ),
@@ -2016,7 +2096,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
20162096 {Metric : util .LabelsToMetric (fixtures [2 ].lbls )},
20172097 },
20182098 expectedIngesters : numIngesters ,
2019- queryLimiter : limiter .NewQueryLimiter (1 , 0 , 0 ),
2099+ queryLimiter : limiter .NewQueryLimiter (1 , 0 , 0 , 0 ),
20202100 expectedErr : nil ,
20212101 },
20222102 }
@@ -2116,7 +2196,7 @@ func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
21162196 matchers : []* labels.Matcher {
21172197 mustNewMatcher (labels .MatchRegexp , model .MetricNameLabel , "foo.+" ),
21182198 },
2119- queryLimiter : limiter .NewQueryLimiter (100 , 0 , 0 ),
2199+ queryLimiter : limiter .NewQueryLimiter (100 , 0 , 0 , 0 ),
21202200 expectedErr : nil ,
21212201 },
21222202 }
0 commit comments