From 491c4c6b979ef178b56348362f9a2f9878c53523 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 10 Apr 2023 19:16:01 -0700 Subject: [PATCH 1/3] fix remote read error in query frontend Signed-off-by: Ben Ye --- CHANGELOG.md | 2 +- integration/e2ecortex/client.go | 63 ++++++++++++++++++++++++++++++ integration/query_frontend_test.go | 26 ++++++++++++ pkg/frontend/transport/handler.go | 20 ++++++++-- 4 files changed, 107 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c3f77ae42a..0b64b0da013 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,9 @@ ## master / unreleased * [CHANGE] Store gateways summary metrics have been converted to histograms `cortex_bucket_store_series_blocks_queried`, `cortex_bucket_store_series_data_fetched`, `cortex_bucket_store_series_data_size_touched_bytes`, `cortex_bucket_store_series_data_size_fetched_bytes`, `cortex_bucket_store_series_data_touched`, `cortex_bucket_store_series_result_series` #5239 - * [ENHANCEMENT] Querier: Batch Iterator optimization to prevent transversing it multiple times query ranges steps does not overlap. #5237 * [BUGFIX] Catch context error in the s3 bucket client. #5240 +* [BUGFIX] Fix query frontend remote read empty body. #5257 ## 1.15.0 in progress diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index feb643c37bb..5ed269c433c 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -19,8 +19,11 @@ import ( promapi "github.com/prometheus/client_golang/api" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" yaml "gopkg.in/yaml.v3" "github.com/cortexproject/cortex/pkg/ruler" @@ -153,6 +156,66 @@ func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) { return c.query(addr) } +// RemoteRead runs a remote read query. +func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, step time.Duration) (*prompb.ReadResponse, error) { + startMs := start.UnixMilli() + endMs := end.UnixMilli() + stepMs := step.Milliseconds() + + q, err := remote.ToQuery(startMs, endMs, matchers, &storage.SelectHints{ + Step: stepMs, + Start: startMs, + End: endMs, + }) + + req := &prompb.ReadRequest{ + Queries: []*prompb.Query{q}, + AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, err + } + compressed := snappy.Encode(nil, data) + + // Call the remote read API endpoint with a timeout. + httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+c.querierAddress+"/prometheus/api/v1/read", bytes.NewReader(compressed)) + httpReq.Header.Set("X-Scope-OrgID", "user-1") + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Add("Accept-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("User-Agent", "Prometheus/1.8.2") + httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") + + httpResp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + if httpResp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %d", httpResp.StatusCode) + } + + compressed, err = io.ReadAll(httpResp.Body) + if err != nil { + return nil, err + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var resp prompb.ReadResponse + if err = proto.Unmarshal(uncompressed, &resp); err != nil { + return nil, err + } + return &resp, nil +} + func (c *Client) query(addr string) (*http.Response, []byte, error) { ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index a0a0a702529..c8c8d42ee1f 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -31,6 +31,7 @@ type queryFrontendTestConfig struct { testMissingMetricName bool querySchedulerEnabled bool queryStatsEnabled bool + remoteReadEnabled bool setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) } @@ -194,6 +195,19 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) { }) } +func TestQueryFrontendRemoteRead(t *testing.T) { + runQueryFrontendTest(t, queryFrontendTestConfig{ + remoteReadEnabled: true, + setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) + + minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + return cortexConfigFile, flags + }, + }) +} + func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { const numUsers = 10 const numQueriesPerUser = 10 @@ -307,6 +321,18 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0]) } + // No need to repeat the test on remote read for each user. + if userID == 0 && cfg.remoteReadEnabled { + start := time.Unix(1595846748, 806*1e6) + end := time.Unix(1595846750, 806*1e6) + res, err := c.RemoteRead([]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "series_1")}, start, end, time.Second) + require.NoError(t, err) + require.True(t, len(res.Results) > 0) + require.True(t, len(res.Results[0].Timeseries) > 0) + require.True(t, len(res.Results[0].Timeseries[0].Samples) > 0) + require.True(t, len(res.Results[0].Timeseries[0].Labels) > 0) + } + // In this test we do ensure that the /series start/end time is ignored and Cortex // always returns series in ingesters memory. No need to repeat it for each user. if userID == 0 { diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index a5174477b00..5882b23ee6e 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -22,6 +22,7 @@ import ( "github.com/weaveworks/common/httpgrpc/server" "google.golang.org/grpc/status" + "github.com/cortexproject/cortex/pkg/ingester/client" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" @@ -33,6 +34,9 @@ const ( // StatusClientClosedRequest is the status code for when a client request cancellation of an http request StatusClientClosedRequest = 499 ServiceTimingHeaderName = "Server-Timing" + + // Queries are a set of matchers with time ranges - should not get into megabytes + maxRemoteReadQuerySize = 1024 * 1024 ) var ( @@ -139,10 +143,20 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(io.TeeReader(r.Body, &buf)) // We parse form here so that we can use buf as body, in order to // prevent https://github.com/cortexproject/cortex/issues/5201. - if err := r.ParseForm(); err != nil { - writeError(w, err) - return + if strings.Contains(r.URL.Path, "api/v1/read") { + var req client.ReadRequest + if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil { + level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "failed to parse proto", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } else { + if err := r.ParseForm(); err != nil { + writeError(w, err) + return + } } + r.Body = io.NopCloser(&buf) startTime := time.Now() From d0aa4d28b1a85bfd75f461cee964703b17408ed1 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 10 Apr 2023 21:16:24 -0700 Subject: [PATCH 2/3] fix integration test Signed-off-by: Ben Ye --- integration/e2ecortex/client.go | 6 ++++++ integration/query_frontend_test.go | 6 +++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 5ed269c433c..adfc99faf4c 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -167,6 +167,9 @@ func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, st Start: startMs, End: endMs, }) + if err != nil { + return nil, err + } req := &prompb.ReadRequest{ Queries: []*prompb.Query{q}, @@ -184,6 +187,9 @@ func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, st defer cancel() httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+c.querierAddress+"/prometheus/api/v1/read", bytes.NewReader(compressed)) + if err != nil { + return nil, err + } httpReq.Header.Set("X-Scope-OrgID", "user-1") httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Add("Accept-Encoding", "snappy") diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index c8c8d42ee1f..5c2a7ebf304 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -323,9 +323,9 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on remote read for each user. if userID == 0 && cfg.remoteReadEnabled { - start := time.Unix(1595846748, 806*1e6) - end := time.Unix(1595846750, 806*1e6) - res, err := c.RemoteRead([]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "series_1")}, start, end, time.Second) + start := now.Add(-1 * time.Hour) + end := now.Add(1 * time.Hour) + res, err := c.RemoteRead([]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "series_1")}, start, end, time.Second) require.NoError(t, err) require.True(t, len(res.Results) > 0) require.True(t, len(res.Results[0].Timeseries) > 0) From bc032b857204e4ba3cf10d0c35c358a356ee8cb2 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 11 Apr 2023 09:29:43 -0700 Subject: [PATCH 3/3] add extra one query Signed-off-by: Ben Ye --- integration/query_frontend_test.go | 4 ++++ pkg/frontend/transport/handler.go | 17 +++-------------- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 5c2a7ebf304..6df5d101015 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -368,6 +368,10 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { extra++ } + if cfg.remoteReadEnabled { + extra++ + } + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total")) // The number of received request is greater than the query requests because include diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 5882b23ee6e..fdf8ae27c03 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -22,7 +22,6 @@ import ( "github.com/weaveworks/common/httpgrpc/server" "google.golang.org/grpc/status" - "github.com/cortexproject/cortex/pkg/ingester/client" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" @@ -34,9 +33,6 @@ const ( // StatusClientClosedRequest is the status code for when a client request cancellation of an http request StatusClientClosedRequest = 499 ServiceTimingHeaderName = "Server-Timing" - - // Queries are a set of matchers with time ranges - should not get into megabytes - maxRemoteReadQuerySize = 1024 * 1024 ) var ( @@ -143,22 +139,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(io.TeeReader(r.Body, &buf)) // We parse form here so that we can use buf as body, in order to // prevent https://github.com/cortexproject/cortex/issues/5201. - if strings.Contains(r.URL.Path, "api/v1/read") { - var req client.ReadRequest - if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil { - level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "failed to parse proto", "err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - } else { + // Exclude remote read here as we don't have to buffer its body. + if !strings.Contains(r.URL.Path, "api/v1/read") { if err := r.ParseForm(); err != nil { writeError(w, err) return } + r.Body = io.NopCloser(&buf) } - r.Body = io.NopCloser(&buf) - startTime := time.Now() resp, err := f.roundTripper.RoundTrip(r) queryResponseTime := time.Since(startTime)