Skip to content

Commit 491c4c6

Browse files
committed
fix remote read error in query frontend
Signed-off-by: Ben Ye <[email protected]>
1 parent 668fd67 commit 491c4c6

File tree

4 files changed

+107
-4
lines changed

4 files changed

+107
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
## master / unreleased
44
* [CHANGE] Store gateways summary metrics have been converted to histograms `cortex_bucket_store_series_blocks_queried`, `cortex_bucket_store_series_data_fetched`, `cortex_bucket_store_series_data_size_touched_bytes`, `cortex_bucket_store_series_data_size_fetched_bytes`, `cortex_bucket_store_series_data_touched`, `cortex_bucket_store_series_result_series` #5239
5-
65
* [ENHANCEMENT] Querier: Batch Iterator optimization to prevent transversing it multiple times query ranges steps does not overlap. #5237
76
* [BUGFIX] Catch context error in the s3 bucket client. #5240
7+
* [BUGFIX] Fix query frontend remote read empty body. #5257
88

99
## 1.15.0 in progress
1010

integration/e2ecortex/client.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ import (
1919
promapi "github.com/prometheus/client_golang/api"
2020
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
2121
"github.com/prometheus/common/model"
22+
"github.com/prometheus/prometheus/model/labels"
2223
"github.com/prometheus/prometheus/model/rulefmt"
2324
"github.com/prometheus/prometheus/prompb"
25+
"github.com/prometheus/prometheus/storage"
26+
"github.com/prometheus/prometheus/storage/remote"
2427
yaml "gopkg.in/yaml.v3"
2528

2629
"github.com/cortexproject/cortex/pkg/ruler"
@@ -153,6 +156,66 @@ func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) {
153156
return c.query(addr)
154157
}
155158

159+
// RemoteRead runs a remote read query.
160+
func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, step time.Duration) (*prompb.ReadResponse, error) {
161+
startMs := start.UnixMilli()
162+
endMs := end.UnixMilli()
163+
stepMs := step.Milliseconds()
164+
165+
q, err := remote.ToQuery(startMs, endMs, matchers, &storage.SelectHints{
166+
Step: stepMs,
167+
Start: startMs,
168+
End: endMs,
169+
})
170+
171+
req := &prompb.ReadRequest{
172+
Queries: []*prompb.Query{q},
173+
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
174+
}
175+
176+
data, err := proto.Marshal(req)
177+
if err != nil {
178+
return nil, err
179+
}
180+
compressed := snappy.Encode(nil, data)
181+
182+
// Call the remote read API endpoint with a timeout.
183+
httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
184+
defer cancel()
185+
186+
httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+c.querierAddress+"/prometheus/api/v1/read", bytes.NewReader(compressed))
187+
httpReq.Header.Set("X-Scope-OrgID", "user-1")
188+
httpReq.Header.Add("Content-Encoding", "snappy")
189+
httpReq.Header.Add("Accept-Encoding", "snappy")
190+
httpReq.Header.Set("Content-Type", "application/x-protobuf")
191+
httpReq.Header.Set("User-Agent", "Prometheus/1.8.2")
192+
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
193+
194+
httpResp, err := c.httpClient.Do(httpReq)
195+
if err != nil {
196+
return nil, err
197+
}
198+
if httpResp.StatusCode != http.StatusOK {
199+
return nil, fmt.Errorf("unexpected status code %d", httpResp.StatusCode)
200+
}
201+
202+
compressed, err = io.ReadAll(httpResp.Body)
203+
if err != nil {
204+
return nil, err
205+
}
206+
207+
uncompressed, err := snappy.Decode(nil, compressed)
208+
if err != nil {
209+
return nil, err
210+
}
211+
212+
var resp prompb.ReadResponse
213+
if err = proto.Unmarshal(uncompressed, &resp); err != nil {
214+
return nil, err
215+
}
216+
return &resp, nil
217+
}
218+
156219
func (c *Client) query(addr string) (*http.Response, []byte, error) {
157220
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
158221
defer cancel()

integration/query_frontend_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type queryFrontendTestConfig struct {
3131
testMissingMetricName bool
3232
querySchedulerEnabled bool
3333
queryStatsEnabled bool
34+
remoteReadEnabled bool
3435
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
3536
}
3637

@@ -194,6 +195,19 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
194195
})
195196
}
196197

198+
func TestQueryFrontendRemoteRead(t *testing.T) {
199+
runQueryFrontendTest(t, queryFrontendTestConfig{
200+
remoteReadEnabled: true,
201+
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
202+
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
203+
204+
minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
205+
require.NoError(t, s.StartAndWaitReady(minio))
206+
return cortexConfigFile, flags
207+
},
208+
})
209+
}
210+
197211
func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
198212
const numUsers = 10
199213
const numQueriesPerUser = 10
@@ -307,6 +321,18 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
307321
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0])
308322
}
309323

324+
// No need to repeat the test on remote read for each user.
325+
if userID == 0 && cfg.remoteReadEnabled {
326+
start := time.Unix(1595846748, 806*1e6)
327+
end := time.Unix(1595846750, 806*1e6)
328+
res, err := c.RemoteRead([]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "series_1")}, start, end, time.Second)
329+
require.NoError(t, err)
330+
require.True(t, len(res.Results) > 0)
331+
require.True(t, len(res.Results[0].Timeseries) > 0)
332+
require.True(t, len(res.Results[0].Timeseries[0].Samples) > 0)
333+
require.True(t, len(res.Results[0].Timeseries[0].Labels) > 0)
334+
}
335+
310336
// In this test we do ensure that the /series start/end time is ignored and Cortex
311337
// always returns series in ingesters memory. No need to repeat it for each user.
312338
if userID == 0 {

pkg/frontend/transport/handler.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/weaveworks/common/httpgrpc/server"
2323
"google.golang.org/grpc/status"
2424

25+
"github.com/cortexproject/cortex/pkg/ingester/client"
2526
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
2627
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2728
"github.com/cortexproject/cortex/pkg/tenant"
@@ -33,6 +34,9 @@ const (
3334
// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
3435
StatusClientClosedRequest = 499
3536
ServiceTimingHeaderName = "Server-Timing"
37+
38+
// Queries are a set of matchers with time ranges - should not get into megabytes
39+
maxRemoteReadQuerySize = 1024 * 1024
3640
)
3741

3842
var (
@@ -139,10 +143,20 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
139143
r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
140144
// We parse form here so that we can use buf as body, in order to
141145
// prevent https://github.com/cortexproject/cortex/issues/5201.
142-
if err := r.ParseForm(); err != nil {
143-
writeError(w, err)
144-
return
146+
if strings.Contains(r.URL.Path, "api/v1/read") {
147+
var req client.ReadRequest
148+
if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
149+
level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "failed to parse proto", "err", err.Error())
150+
http.Error(w, err.Error(), http.StatusBadRequest)
151+
return
152+
}
153+
} else {
154+
if err := r.ParseForm(); err != nil {
155+
writeError(w, err)
156+
return
157+
}
145158
}
159+
146160
r.Body = io.NopCloser(&buf)
147161

148162
startTime := time.Now()

0 commit comments

Comments
 (0)