Skip to content

Commit cd2f041

Browse files
committed
fix remote read error in query frontend (cortexproject#5257)
* fix remote read error in query frontend Signed-off-by: Ben Ye <[email protected]> * fix integration test Signed-off-by: Ben Ye <[email protected]> * add extra one query Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent b4c8cf0 commit cd2f041

File tree

2 files changed

+67
-1
lines changed

2 files changed

+67
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
## master / unreleased
44
* [CHANGE] Store gateways summary metrics have been converted to histograms `cortex_bucket_store_series_blocks_queried`, `cortex_bucket_store_series_data_fetched`, `cortex_bucket_store_series_data_size_touched_bytes`, `cortex_bucket_store_series_data_size_fetched_bytes`, `cortex_bucket_store_series_data_touched`, `cortex_bucket_store_series_result_series` #5239
5-
65
* [ENHANCEMENT] Querier: Batch Iterator optimization to prevent transversing it multiple times query ranges steps does not overlap. #5237
76
* [BUGFIX] Catch context error in the s3 bucket client. #5240
7+
* [BUGFIX] Fix query frontend remote read empty body. #5257
88

99
## 1.15.0 2023-04-19
1010

integration/e2ecortex/client.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,72 @@ func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, st
300300
return &resp, nil
301301
}
302302

303+
// RemoteRead runs a remote read query.
304+
func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, step time.Duration) (*prompb.ReadResponse, error) {
305+
startMs := start.UnixMilli()
306+
endMs := end.UnixMilli()
307+
stepMs := step.Milliseconds()
308+
309+
q, err := remote.ToQuery(startMs, endMs, matchers, &storage.SelectHints{
310+
Step: stepMs,
311+
Start: startMs,
312+
End: endMs,
313+
})
314+
if err != nil {
315+
return nil, err
316+
}
317+
318+
req := &prompb.ReadRequest{
319+
Queries: []*prompb.Query{q},
320+
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
321+
}
322+
323+
data, err := proto.Marshal(req)
324+
if err != nil {
325+
return nil, err
326+
}
327+
compressed := snappy.Encode(nil, data)
328+
329+
// Call the remote read API endpoint with a timeout.
330+
httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
331+
defer cancel()
332+
333+
httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+c.querierAddress+"/prometheus/api/v1/read", bytes.NewReader(compressed))
334+
if err != nil {
335+
return nil, err
336+
}
337+
httpReq.Header.Set("X-Scope-OrgID", "user-1")
338+
httpReq.Header.Add("Content-Encoding", "snappy")
339+
httpReq.Header.Add("Accept-Encoding", "snappy")
340+
httpReq.Header.Set("Content-Type", "application/x-protobuf")
341+
httpReq.Header.Set("User-Agent", "Prometheus/1.8.2")
342+
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
343+
344+
httpResp, err := c.httpClient.Do(httpReq)
345+
if err != nil {
346+
return nil, err
347+
}
348+
if httpResp.StatusCode != http.StatusOK {
349+
return nil, fmt.Errorf("unexpected status code %d", httpResp.StatusCode)
350+
}
351+
352+
compressed, err = io.ReadAll(httpResp.Body)
353+
if err != nil {
354+
return nil, err
355+
}
356+
357+
uncompressed, err := snappy.Decode(nil, compressed)
358+
if err != nil {
359+
return nil, err
360+
}
361+
362+
var resp prompb.ReadResponse
363+
if err = proto.Unmarshal(uncompressed, &resp); err != nil {
364+
return nil, err
365+
}
366+
return &resp, nil
367+
}
368+
303369
func (c *Client) query(addr string) (*http.Response, []byte, error) {
304370
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
305371
defer cancel()

0 commit comments

Comments
 (0)