Skip to content

Commit dfe3737

Browse files
authored
Create Span for the codec MergeResponse method (#5075)
* Create Span for the codec MergeResponse method Signed-off-by: Alan Protasio <[email protected]> * Changelog Signed-off-by: Alan Protasio <[email protected]> Signed-off-by: Alan Protasio <[email protected]>
1 parent 71206d3 commit dfe3737

File tree

10 files changed

+19
-11
lines changed

10 files changed

+19
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* [FEATURE] Ingester: Enable snapshotting of In-memory TSDB on disk during shutdown via `-blocks-storage.tsdb.memory-snapshot-on-shutdown`. #5011
1818
* [FEATURE] Query Frontend/Scheduler: Add a new counter metric `cortex_request_queue_requests_total` for total requests going to queue. #5030
1919
* [FEATURE] Build ARM docker images. #5041
20+
* [FEATURE] Query-frontend/Querier: Create spans to measure time to merge promql responses. #5041
2021
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
2122
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
2223
* [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,11 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
243243
return &resp, nil
244244
}
245245

246-
func (instantQueryCodec) MergeResponse(responses ...tripperware.Response) (tripperware.Response, error) {
246+
func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) {
247+
sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse")
248+
sp.SetTag("response_count", len(responses))
249+
defer sp.Finish()
250+
247251
if len(responses) == 0 {
248252
return NewEmptyPrometheusInstantQueryResponse(), nil
249253
} else if len(responses) == 1 {

pkg/querier/tripperware/instantquery/instant_query_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func TestMergeResponse(t *testing.T) {
308308
require.NoError(t, err)
309309
resps = append(resps, dr)
310310
}
311-
resp, err := InstantQueryCodec.MergeResponse(resps...)
311+
resp, err := InstantQueryCodec.MergeResponse(context.Background(), resps...)
312312
assert.Equal(t, err, tc.expectedErr)
313313
if err != nil {
314314
return

pkg/querier/tripperware/query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type Codec interface {
4040
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
4141
type Merger interface {
4242
// MergeResponse merges responses from multiple requests into a single Response
43-
MergeResponse(...Response) (Response, error)
43+
MergeResponse(context.Context, ...Response) (Response, error)
4444
}
4545

4646
// Response represents a query range response.

pkg/querier/tripperware/queryrange/query_range.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ func NewEmptyPrometheusResponse() *PrometheusResponse {
127127
}
128128
}
129129

130-
func (c prometheusCodec) MergeResponse(responses ...tripperware.Response) (tripperware.Response, error) {
130+
func (c prometheusCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) {
131+
sp, _ := opentracing.StartSpanFromContext(ctx, "QueryRangeResponse.MergeResponse")
132+
sp.SetTag("response_count", len(responses))
133+
defer sp.Finish()
131134
if len(responses) == 0 {
132135
return NewEmptyPrometheusResponse(), nil
133136
}

pkg/querier/tripperware/queryrange/query_range_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ func TestMergeAPIResponses(t *testing.T) {
652652
},
653653
}} {
654654
t.Run(tc.name, func(t *testing.T) {
655-
output, err := PrometheusCodec.MergeResponse(tc.input...)
655+
output, err := PrometheusCodec.MergeResponse(context.Background(), tc.input...)
656656
require.NoError(t, err)
657657
require.Equal(t, tc.expected, output)
658658
})

pkg/querier/tripperware/queryrange/results_cache.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
407407
return nil, nil, err
408408
}
409409
if len(requests) == 0 {
410-
response, err := s.merger.MergeResponse(responses...)
410+
response, err := s.merger.MergeResponse(context.Background(), responses...)
411411
// No downstream requests so no need to write back to the cache.
412412
return response, nil, err
413413
}
@@ -469,7 +469,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
469469
if err != nil {
470470
return nil, nil, err
471471
}
472-
merged, err := s.merger.MergeResponse(accumulator.Response, currentRes)
472+
merged, err := s.merger.MergeResponse(ctx, accumulator.Response, currentRes)
473473
if err != nil {
474474
return nil, nil, err
475475
}
@@ -481,7 +481,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
481481
return nil, nil, err
482482
}
483483

484-
response, err := s.merger.MergeResponse(responses...)
484+
response, err := s.merger.MergeResponse(ctx, responses...)
485485
return response, mergedExtents, err
486486
}
487487

pkg/querier/tripperware/queryrange/split_by_interval.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripper
6161
resps = append(resps, reqResp.Response)
6262
}
6363

64-
response, err := s.merger.MergeResponse(resps...)
64+
response, err := s.merger.MergeResponse(ctx, resps...)
6565
if err != nil {
6666
return nil, err
6767
}

pkg/querier/tripperware/queryrange/split_by_interval_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func TestSplitQuery(t *testing.T) {
266266
}
267267

268268
func TestSplitByDay(t *testing.T) {
269-
mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse)
269+
mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), parsedResponse, parsedResponse)
270270
require.NoError(t, err)
271271

272272
mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse)

pkg/querier/tripperware/shard_by.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
7878
resps = append(resps, reqResp.Response)
7979
}
8080

81-
return s.merger.MergeResponse(resps...)
81+
return s.merger.MergeResponse(ctx, resps...)
8282
}
8383

8484
func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis querysharding.QueryAnalysis) []Request {

0 commit comments

Comments
 (0)