Skip to content

Commit 7e37e92

Browse files
authored
log query response size in QFE and querier (#5288)
* log final query response size in QFE Signed-off-by: Ben Ye <[email protected]> enable response size message in processor Signed-off-by: Ben Ye <[email protected]> * fix import Signed-off-by: Ben Ye <[email protected]> * remove the response size counter metric Signed-off-by: Ben Ye <[email protected]> * update content encoding log Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent 2587e15 commit 7e37e92

File tree

5 files changed

+66
-4
lines changed

5 files changed

+66
-4
lines changed

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
346346
}
347347

348348
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
349+
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
349350
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
350351
}
351352

pkg/frontend/transport/handler.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
177177
}
178178
}
179179

180-
f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode)
180+
f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode, resp)
181181
}
182182

183183
if err != nil {
@@ -232,7 +232,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
232232
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
233233
}
234234

235-
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int) {
235+
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
236236
tenantIDs, err := tenant.TenantIDs(r.Context())
237237
if err != nil {
238238
return
@@ -252,6 +252,15 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
252252
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
253253
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
254254

255+
var (
256+
contentLength int64
257+
encoding string
258+
)
259+
if resp != nil {
260+
contentLength = resp.ContentLength
261+
encoding = resp.Header.Get("Content-Encoding")
262+
}
263+
255264
// Log stats.
256265
logMessage := append([]interface{}{
257266
"msg", "query stats",
@@ -266,6 +275,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
266275
"fetched_chunks_bytes", numChunkBytes,
267276
"fetched_data_bytes", numDataBytes,
268277
"status_code", statusCode,
278+
"response_size", contentLength,
269279
}, stats.LoadExtraFields()...)
270280

271281
logMessage = append(logMessage, formatQueryString(queryString)...)
@@ -274,6 +284,10 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
274284
logMessage = append(logMessage, grafanaFields...)
275285
}
276286

287+
if len(encoding) > 0 {
288+
logMessage = append(logMessage, "content_encoding", encoding)
289+
}
290+
277291
if error != nil {
278292
s, ok := status.FromError(error)
279293
if !ok {

pkg/querier/worker/frontend_processor.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"net/textproto"
78
"time"
89

910
"github.com/go-kit/log"
1011
"github.com/go-kit/log/level"
1112
"github.com/weaveworks/common/httpgrpc"
13+
"github.com/weaveworks/common/user"
1214
"google.golang.org/grpc"
1315

1416
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
1517
"github.com/cortexproject/cortex/pkg/querier/stats"
1618
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
1719
"github.com/cortexproject/cortex/pkg/util/backoff"
20+
util_log "github.com/cortexproject/cortex/pkg/util/log"
1821
)
1922

2023
var (
@@ -30,6 +33,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) pr
3033
handler: handler,
3134
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
3235
querierID: cfg.QuerierID,
36+
targetHeaders: cfg.TargetHeaders,
3337
}
3438
}
3539

@@ -40,6 +44,8 @@ type frontendProcessor struct {
4044
querierID string
4145

4246
log log.Logger
47+
48+
targetHeaders []string
4349
}
4450

4551
// notifyShutdown implements processor.
@@ -120,6 +126,24 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H
120126
stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
121127
}
122128

129+
headers := make(map[string]string, 0)
130+
for _, h := range request.Headers {
131+
headers[h.Key] = h.Values[0]
132+
}
133+
headerMap := make(map[string]string, 0)
134+
// Remove non-existent header.
135+
for _, header := range fp.targetHeaders {
136+
if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok {
137+
headerMap[header] = v
138+
}
139+
}
140+
orgID, ok := headers[textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName)]
141+
if ok {
142+
ctx = user.InjectOrgID(ctx, orgID)
143+
}
144+
ctx = util_log.ContextWithHeaderMap(ctx, headerMap)
145+
logger := util_log.WithContext(ctx, fp.log)
146+
123147
response, err := fp.handler.Handle(ctx, request)
124148
if err != nil {
125149
var ok bool
@@ -131,6 +155,9 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H
131155
}
132156
}
133157
}
158+
if statsEnabled {
159+
level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody()))
160+
}
134161

135162
// Ensure responses that are too big are not retried.
136163
if len(response.Body) >= fp.maxMessageSize {

pkg/querier/worker/scheduler_processor.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"net/textproto"
78
"time"
89

910
"github.com/go-kit/log"
@@ -37,7 +38,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
3738
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
3839
querierID: cfg.QuerierID,
3940
grpcConfig: cfg.GRPCClientConfig,
40-
41+
targetHeaders: cfg.TargetHeaders,
4142
frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
4243
Name: "cortex_querier_query_frontend_request_duration_seconds",
4344
Help: "Time spend doing requests to frontend.",
@@ -70,6 +71,8 @@ type schedulerProcessor struct {
7071

7172
frontendPool *client.Pool
7273
frontendClientRequestDuration *prometheus.HistogramVec
74+
75+
targetHeaders []string
7376
}
7477

7578
// notifyShutdown implements processor.
@@ -130,6 +133,19 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
130133
// We need to inject user into context for sending response back.
131134
ctx := user.InjectOrgID(ctx, request.UserID)
132135

136+
headers := make(map[string]string, 0)
137+
for _, h := range request.HttpRequest.Headers {
138+
headers[h.Key] = h.Values[0]
139+
}
140+
headerMap := make(map[string]string, 0)
141+
// Remove non-existent header.
142+
for _, header := range sp.targetHeaders {
143+
if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok {
144+
headerMap[header] = v
145+
}
146+
}
147+
ctx = util_log.ContextWithHeaderMap(ctx, headerMap)
148+
133149
tracer := opentracing.GlobalTracer()
134150
// Ignore errors here. If we cannot get parent span, we just don't create new one.
135151
parentSpanContext, _ := httpgrpcutil.GetParentSpanForRequest(tracer, request.HttpRequest)
@@ -140,7 +156,6 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
140156
ctx = spanCtx
141157
}
142158
logger := util_log.WithContext(ctx, sp.log)
143-
144159
sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)
145160

146161
// Report back to scheduler that processing of the query has finished.
@@ -168,6 +183,9 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger,
168183
}
169184
}
170185
}
186+
if statsEnabled {
187+
level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody()))
188+
}
171189

172190
// Ensure responses that are too big are not retried.
173191
if len(response.Body) >= sp.maxMessageSize {

pkg/querier/worker/worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type Config struct {
3131
QuerierID string `yaml:"id"`
3232

3333
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
34+
35+
TargetHeaders []string `yaml:"-"` // Propagated by config.
3436
}
3537

3638
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

0 commit comments

Comments
 (0)