Skip to content

Commit 2587e15

Browse files
authored
Handle grpc code resource exhausted for store gateway (#5286)
* handle grpc code resource exhausted for store gateway
  Signed-off-by: Ben Ye <[email protected]>
* fix lint
  Signed-off-by: Ben Ye <[email protected]>
* update changelog
  Signed-off-by: Ben Ye <[email protected]>
* try fixing test
  Signed-off-by: Ben Ye <[email protected]>
* try to fix E2E test
  Signed-off-by: Ben Ye <[email protected]>
* lint
  Signed-off-by: Ben Ye <[email protected]>
* try again
  Signed-off-by: Ben Ye <[email protected]>
* fix message
  Signed-off-by: Ben Ye <[email protected]>
* remove labels API
  Signed-off-by: Ben Ye <[email protected]>
* remove logic to check string contains
  Signed-off-by: Ben Ye <[email protected]>
* make limiter vars private
  Signed-off-by: Ben Ye <[email protected]>
---------
Signed-off-by: Ben Ye <[email protected]>
1 parent 58b59f4 commit 2587e15

File tree

8 files changed

+189
-15
lines changed

8 files changed

+189
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## master / unreleased
44
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
5+
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
56

67
## 1.15.0 2023-04-19
78

integration/e2ecortex/client.go

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,88 @@ func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Dur
150150
}
151151

152152
// QueryRaw runs a query directly against the querier API.
153-
func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) {
154-
addr := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=%s", c.querierAddress, url.QueryEscape(query))
153+
func (c *Client) QueryRaw(query string, ts time.Time) (*http.Response, []byte, error) {
154+
u := &url.URL{
155+
Scheme: "http",
156+
Path: fmt.Sprintf("%s/api/prom/api/v1/query", c.querierAddress),
157+
}
158+
q := u.Query()
159+
q.Set("query", query)
155160

156-
return c.query(addr)
161+
if !ts.IsZero() {
162+
q.Set("time", FormatTime(ts))
163+
}
164+
u.RawQuery = q.Encode()
165+
return c.query(u.String())
166+
}
167+
168+
// SeriesRaw runs a series request directly against the querier API.
169+
func (c *Client) SeriesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) {
170+
u := &url.URL{
171+
Scheme: "http",
172+
Path: fmt.Sprintf("%s/api/prom/api/v1/series", c.querierAddress),
173+
}
174+
q := u.Query()
175+
176+
for _, m := range matches {
177+
q.Add("match[]", m)
178+
}
179+
180+
if !startTime.IsZero() {
181+
q.Set("start", FormatTime(startTime))
182+
}
183+
if !endTime.IsZero() {
184+
q.Set("end", FormatTime(endTime))
185+
}
186+
187+
u.RawQuery = q.Encode()
188+
return c.query(u.String())
189+
}
190+
191+
// LabelNamesRaw runs a label names request directly against the querier API.
192+
func (c *Client) LabelNamesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) {
193+
u := &url.URL{
194+
Scheme: "http",
195+
Path: fmt.Sprintf("%s/api/prom/api/v1/labels", c.querierAddress),
196+
}
197+
q := u.Query()
198+
199+
for _, m := range matches {
200+
q.Add("match[]", m)
201+
}
202+
203+
if !startTime.IsZero() {
204+
q.Set("start", FormatTime(startTime))
205+
}
206+
if !endTime.IsZero() {
207+
q.Set("end", FormatTime(endTime))
208+
}
209+
210+
u.RawQuery = q.Encode()
211+
return c.query(u.String())
212+
}
213+
214+
// LabelValuesRaw runs a label values request directly against the querier API.
215+
func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) {
216+
u := &url.URL{
217+
Scheme: "http",
218+
Path: fmt.Sprintf("%s/api/prom/api/v1/label/%s/values", c.querierAddress, label),
219+
}
220+
q := u.Query()
221+
222+
for _, m := range matches {
223+
q.Add("match[]", m)
224+
}
225+
226+
if !startTime.IsZero() {
227+
q.Set("start", FormatTime(startTime))
228+
}
229+
if !endTime.IsZero() {
230+
q.Set("end", FormatTime(endTime))
231+
}
232+
233+
u.RawQuery = q.Encode()
234+
return c.query(u.String())
157235
}
158236

159237
// RemoteRead runs a remote read query.
@@ -259,8 +337,8 @@ func (c *Client) LabelValues(label string, start, end time.Time, matches []strin
259337
}
260338

261339
// LabelNames gets label names
262-
func (c *Client) LabelNames(start, end time.Time) ([]string, error) {
263-
result, _, err := c.querierClient.LabelNames(context.Background(), nil, start, end)
340+
func (c *Client) LabelNames(start, end time.Time, matchers ...string) ([]string, error) {
341+
result, _, err := c.querierClient.LabelNames(context.Background(), matchers, start, end)
264342
return result, err
265343
}
266344

integration/querier_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package integration
55

66
import (
77
"fmt"
8+
"net/http"
89
"strconv"
910
"strings"
1011
"testing"
@@ -818,6 +819,90 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) {
818819
assert.Contains(t, err.Error(), "500")
819820
}
820821

822+
func TestQuerierWithBlocksStorageLimits(t *testing.T) {
823+
const blockRangePeriod = 5 * time.Second
824+
825+
s, err := e2e.NewScenario(networkName)
826+
require.NoError(t, err)
827+
defer s.Close()
828+
829+
// Configure the blocks storage to frequently compact TSDB head
830+
// and ship blocks to the storage.
831+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
832+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
833+
"-blocks-storage.tsdb.ship-interval": "1s",
834+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
835+
})
836+
837+
// Start dependencies.
838+
consul := e2edb.NewConsul()
839+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
840+
require.NoError(t, s.StartAndWaitReady(consul, minio))
841+
842+
// Start Cortex components for the write path.
843+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
844+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
845+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
846+
847+
// Wait until the distributor has updated the ring.
848+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
849+
850+
// Push some series to Cortex.
851+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
852+
require.NoError(t, err)
853+
854+
seriesTimestamp := time.Now()
855+
series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
856+
series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"})
857+
series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"})
858+
859+
res, err := c.Push(series1)
860+
require.NoError(t, err)
861+
require.Equal(t, 200, res.StatusCode)
862+
863+
res, err = c.Push(series2)
864+
require.NoError(t, err)
865+
require.Equal(t, 200, res.StatusCode)
866+
867+
// Wait until the TSDB head is compacted and shipped to the storage.
868+
// The shipped block contains the 1st series, while the 2nd series is in the head.
869+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
870+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total"))
871+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))
872+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))
873+
874+
// Start the querier and store-gateway, and configure them to sync blocks frequently enough to trigger the consistency check.
875+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
876+
"-blocks-storage.bucket-store.sync-interval": "5s",
877+
"-querier.max-fetched-series-per-query": "1",
878+
}), "")
879+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
880+
"-blocks-storage.bucket-store.sync-interval": "5s",
881+
"-querier.max-fetched-series-per-query": "1",
882+
}), "")
883+
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
884+
885+
// Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for the consistency check.
886+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
887+
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
888+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))
889+
890+
// Query back the series.
891+
c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
892+
require.NoError(t, err)
893+
894+
// We expect every query to fail with a 422 because the series limit is exceeded.
895+
resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp)
896+
require.NoError(t, err)
897+
require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode)
898+
require.Contains(t, string(body), "max number of series limit")
899+
900+
resp, body, err = c.SeriesRaw([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp)
901+
require.NoError(t, err)
902+
require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode)
903+
require.Contains(t, string(body), "max number of series limit")
904+
}
905+
821906
func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) {
822907
const blockRangePeriod = 5 * time.Second
823908

integration/query_frontend_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
293293

294294
// No need to repeat the test on missing metric name for each user.
295295
if userID == 0 && cfg.testMissingMetricName {
296-
res, body, err := c.QueryRaw("{instance=~\"hello.*\"}")
296+
res, body, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now())
297297
require.NoError(t, err)
298298
require.Equal(t, 422, res.StatusCode)
299299
require.Contains(t, string(body), "query must contain metric name")
@@ -317,7 +317,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
317317

318318
// No need to repeat the test on Server-Timing header for each user.
319319
if userID == 0 && cfg.queryStatsEnabled {
320-
res, _, err := c.QueryRaw("{instance=~\"hello.*\"}")
320+
res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now())
321321
require.NoError(t, err)
322322
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0])
323323
}

integration/query_fuzz_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ func TestVerticalShardingFuzz(t *testing.T) {
133133
opts := []promqlsmith.Option{
134134
promqlsmith.WithEnableOffset(true),
135135
promqlsmith.WithEnableAtModifier(true),
136-
promqlsmith.WithEnableVectorMatching(true),
137136
}
138137
ps := promqlsmith.New(rnd, lbls, opts...)
139138

integration/zone_aware_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestZoneAwareReplication(t *testing.T) {
135135
require.NoError(t, ingester3.Kill())
136136

137137
// Query back any series => fail (either because of a timeout or 500)
138-
result, _, err := client.QueryRaw("series_1")
138+
result, _, err := client.QueryRaw("series_1", time.Now())
139139
if !errors.Is(err, context.DeadlineExceeded) {
140140
require.NoError(t, err)
141141
require.Equal(t, 500, result.StatusCode)

pkg/querier/blocks_store_queryable.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
639639
}
640640

641641
if err != nil {
642+
s, ok := status.FromError(err)
643+
if !ok {
644+
s, ok = status.FromError(errors.Cause(err))
645+
}
646+
647+
if ok {
648+
if s.Code() == codes.ResourceExhausted {
649+
return validation.LimitError(s.Message())
650+
}
651+
}
642652
return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress())
643653
}
644654

@@ -763,10 +773,11 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
763773
namesResp, err := c.LabelNames(gCtx, req)
764774
if err != nil {
765775
if isRetryableError(err) {
766-
level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress()))
776+
level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label names from %s due to retryable error", c.RemoteAddress()))
767777
return nil
768778
}
769-
return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress())
779+
780+
return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress())
770781
}
771782

772783
myQueriedBlocks := []ulid.ULID(nil)
@@ -844,10 +855,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
844855
valuesResp, err := c.LabelValues(gCtx, req)
845856
if err != nil {
846857
if isRetryableError(err) {
847-
level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress()))
858+
level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label values from %s due to retryable error", c.RemoteAddress()))
848859
return nil
849860
}
850-
return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress())
861+
return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress())
851862
}
852863

853864
myQueriedBlocks := []ulid.ULID(nil)

pkg/querier/error_translate_queryable.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ package querier
33
import (
44
"context"
55

6-
"github.com/cortexproject/cortex/pkg/util/validation"
7-
86
"github.com/gogo/status"
97
"github.com/pkg/errors"
108
"github.com/prometheus/prometheus/model/labels"
119
"github.com/prometheus/prometheus/promql"
1210
"github.com/prometheus/prometheus/storage"
11+
12+
"github.com/cortexproject/cortex/pkg/util/validation"
1313
)
1414

1515
// TranslateToPromqlAPIError converts error to one of promql.Errors for consumption in PromQL API.

0 commit comments

Comments
 (0)