Skip to content

Commit c412aae

Browse files
authored
Adds minimal traceparent header support to Elasticsearch (#74210)
This adds just enough support for the traceparent header to be useful in es8. Since Elasticsearch already logs in ECS format extending it with support for transaction.id and trace.id is a quick win. This allows us to surface server/deprecation slow logs from an instrumented application using the Trace Logs feature. Parsing `traceparent` in http layer and populating tasks with `trace_id` which is preserved in thread context.
1 parent 40b1dc0 commit c412aae

File tree

9 files changed

+168
-27
lines changed

9 files changed

+168
-27
lines changed

qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public void testDeprecatedMessageWithoutXOpaqueId() throws IOException {
115115
public void testCompatibleLog() throws Exception {
116116
withThreadContext(threadContext -> {
117117
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
118+
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
118119
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
119120
testLogger.deprecate(DeprecationCategory.OTHER,"someKey", "deprecated message1")
120121
.compatibleApiWarning("compatibleKey","compatible API message");
@@ -143,6 +144,7 @@ public void testCompatibleLog() throws Exception {
143144
hasEntry("message", "deprecated message1"),
144145
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "someKey"),
145146
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
147+
hasEntry(Task.TRACE_ID, "someTraceId"),
146148
hasEntry("elasticsearch.event.category", "other")
147149
),
148150
allOf(
@@ -159,6 +161,7 @@ public void testCompatibleLog() throws Exception {
159161
hasEntry("message", "compatible API message"),
160162
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "compatibleKey"),
161163
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
164+
hasEntry(Task.TRACE_ID, "someTraceId"),
162165
hasEntry("elasticsearch.event.category", "compatible_api")
163166
)
164167
)
@@ -172,6 +175,7 @@ public void testCompatibleLog() throws Exception {
172175
public void testParseFieldEmittingDeprecatedLogs() throws Exception {
173176
withThreadContext(threadContext -> {
174177
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
178+
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
175179

176180
ParseField deprecatedField = new ParseField("new_name", "deprecated_name");
177181
assertTrue(deprecatedField.match("deprecated_name", LoggingDeprecationHandler.INSTANCE));
@@ -208,6 +212,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
208212
hasEntry("message", "Deprecated field [deprecated_name] used, expected [new_name] instead"),
209213
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name"),
210214
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
215+
hasEntry(Task.TRACE_ID, "someTraceId"),
211216
hasEntry("elasticsearch.event.category", "api")
212217
),
213218
// deprecation log for field deprecated_name2 (note it is not being throttled)
@@ -224,6 +229,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
224229
hasEntry("message", "Deprecated field [deprecated_name2] used, expected [new_name] instead"),
225230
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name2"),
226231
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
232+
hasEntry(Task.TRACE_ID, "someTraceId"),
227233
hasEntry("elasticsearch.event.category", "api")
228234
),
229235
// compatible log line
@@ -240,6 +246,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
240246
hasEntry("message", "Deprecated field [compatible_deprecated_name] used, expected [new_name] instead"),
241247
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_compatible_deprecated_name"),
242248
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
249+
hasEntry(Task.TRACE_ID, "someTraceId"),
243250
hasEntry("elasticsearch.event.category", "compatible_api")
244251
)
245252
)
@@ -255,6 +262,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
255262
public void testDeprecatedMessage() throws Exception {
256263
withThreadContext(threadContext -> {
257264
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
265+
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
258266
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
259267
testLogger.deprecate(DeprecationCategory.OTHER, "someKey", "deprecated message1");
260268

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,10 @@ public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpr
434434
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
435435
Set<RestHeaderDefinition> headers = Stream.concat(
436436
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
437-
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
437+
Stream.of(
438+
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
439+
new RestHeaderDefinition(Task.TRACE_PARENT, false)
440+
)
438441
).collect(Collectors.toSet());
439442
UnaryOperator<RestHandler> restWrapper = null;
440443
for (ActionPlugin plugin : actionPlugins) {

server/src/main/java/org/elasticsearch/common/logging/ECSJsonLayout.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,12 @@ public EcsLayout build() {
5555
private KeyValuePair[] additionalFields() {
5656
return new KeyValuePair[] {
5757
new KeyValuePair("event.dataset", dataset),
58+
new KeyValuePair("trace.id", "%trace_id"),
5859
new KeyValuePair("elasticsearch.cluster.uuid", "%cluster_id"),
5960
new KeyValuePair("elasticsearch.node.id", "%node_id"),
6061
new KeyValuePair("elasticsearch.node.name", "%ESnode_name"),
6162
new KeyValuePair("elasticsearch.cluster.name", "${sys:es.logs.cluster_name}"), };
62-
}
63+
}
6364

6465
public String getDataset() {
6566
return dataset;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.common.logging;
10+
11+
import org.apache.logging.log4j.core.LogEvent;
12+
import org.apache.logging.log4j.core.config.plugins.Plugin;
13+
import org.apache.logging.log4j.core.pattern.ConverterKeys;
14+
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter;
15+
import org.apache.logging.log4j.core.pattern.PatternConverter;
16+
import org.elasticsearch.tasks.Task;
17+
18+
import java.util.Objects;
19+
20+
/**
21+
* Pattern converter to format the trace id provided in the traceparent header into JSON fields <code>trace.id</code>.
22+
*/
23+
@Plugin(category = PatternConverter.CATEGORY, name = "TraceIdConverter")
24+
@ConverterKeys({"trace_id"})
25+
public final class TraceIdConverter extends LogEventPatternConverter {
26+
/**
27+
* Called by log4j2 to initialize this converter.
28+
*/
29+
public static TraceIdConverter newInstance(@SuppressWarnings("unused") final String[] options) {
30+
return new TraceIdConverter();
31+
}
32+
33+
public TraceIdConverter() {
34+
super("trace_id", "trace_id");
35+
}
36+
37+
public static String getTraceId() {
38+
return HeaderWarning.THREAD_CONTEXT.stream()
39+
.map(t -> t.<String>getHeader(Task.TRACE_ID))
40+
.filter(Objects::nonNull)
41+
.findFirst()
42+
.orElse(null);
43+
}
44+
45+
/**
46+
* Formats the trace.id into json fields.
47+
*
48+
* @param event - a log event is ignored in this method as it uses the clusterId value
49+
* from <code>NodeAndClusterIdStateListener</code> to format
50+
*/
51+
@Override
52+
public void format(LogEvent event, StringBuilder toAppendTo) {
53+
String traceId = getTraceId();
54+
if (traceId != null) {
55+
toAppendTo.append(traceId);
56+
}
57+
}
58+
59+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,22 @@ public StoredContext stashContext() {
107107
/**
108108
* X-Opaque-ID should be preserved in a threadContext in order to propagate this across threads.
109109
* This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
110+
* The same is applied to Task.TRACE_ID.
110111
* Otherwise when context is stash, it should be empty.
111112
*/
112-
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
113-
ThreadContextStruct threadContextStruct =
114-
DEFAULT_CONTEXT.putHeaders(Map.of(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID)));
113+
114+
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID) || context.requestHeaders.containsKey(Task.TRACE_ID)) {
115+
Map<String, String> map = new HashMap<>(2, 1);
116+
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
117+
map.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID));
118+
}
119+
if (context.requestHeaders.containsKey(Task.TRACE_ID)) {
120+
map.put(Task.TRACE_ID, context.requestHeaders.get(Task.TRACE_ID));
121+
}
122+
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map);
115123
threadLocal.set(threadContextStruct);
116-
} else {
124+
}
125+
else {
117126
threadLocal.set(DEFAULT_CONTEXT);
118127
}
119128
return () -> {

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ protected Node(final Environment initialEnvironment,
574574
final Transport transport = networkModule.getTransportSupplier().get();
575575
Set<String> taskHeaders = Stream.concat(
576576
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
577-
Stream.of(Task.X_OPAQUE_ID)
577+
Stream.of(Task.X_OPAQUE_ID, Task.TRACE_ID)
578578
).collect(Collectors.toSet());
579579
final TransportService transportService = newTransportService(settings, transport, threadPool,
580580
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);

server/src/main/java/org/elasticsearch/rest/RestController.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.http.HttpServerTransport;
3030
import org.elasticsearch.indices.breaker.CircuitBreakerService;
3131
import org.elasticsearch.rest.RestHandler.Route;
32+
import org.elasticsearch.tasks.Task;
3233
import org.elasticsearch.usage.UsageService;
3334

3435
import java.io.ByteArrayOutputStream;
@@ -334,29 +335,15 @@ private void sendContentTypeErrorMessage(@Nullable List<String> contentTypeHeade
334335
}
335336

336337
private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
337-
for (final RestHeaderDefinition restHeader : headersToCopy) {
338-
final String name = restHeader.getName();
339-
final List<String> headerValues = request.getAllHeaderValues(name);
340-
if (headerValues != null && headerValues.isEmpty() == false) {
341-
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
342-
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
343-
channel.sendResponse(
344-
BytesRestResponse.
345-
createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
346-
return;
347-
} else {
348-
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
349-
}
350-
}
351-
}
352-
// error_trace cannot be used when we disable detailed errors
353-
// we consume the error_trace parameter first to ensure that it is always consumed
354-
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
355-
channel.sendResponse(
356-
BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
338+
try {
339+
copyRestHeaders(request, threadContext);
340+
validateErrorTrace(request, channel);
341+
} catch (IllegalArgumentException e) {
342+
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, e.getMessage()));
357343
return;
358344
}
359345

346+
360347
final String rawPath = request.rawPath();
361348
final String uri = request.uri();
362349
final RestRequest.Method requestMethod;
@@ -392,6 +379,34 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel
392379
handleBadRequest(uri, requestMethod, channel);
393380
}
394381

382+
private void validateErrorTrace(RestRequest request, RestChannel channel) {
383+
// error_trace cannot be used when we disable detailed errors
384+
// we consume the error_trace parameter first to ensure that it is always consumed
385+
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
386+
throw new IllegalArgumentException("error traces in responses are disabled.");
387+
}
388+
}
389+
390+
private void copyRestHeaders(RestRequest request, ThreadContext threadContext) throws IOException {
391+
for (final RestHeaderDefinition restHeader : headersToCopy) {
392+
final String name = restHeader.getName();
393+
final List<String> headerValues = request.getAllHeaderValues(name);
394+
if (headerValues != null && headerValues.isEmpty() == false) {
395+
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
396+
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
397+
throw new IllegalArgumentException("multiple values for single-valued header [" + name + "].");
398+
} else if (name.equals(Task.TRACE_PARENT)) {
399+
String traceparent = distinctHeaderValues.get(0);
400+
if (traceparent.length() >= 55) {
401+
threadContext.putHeader(Task.TRACE_ID, traceparent.substring(3, 35));
402+
}
403+
} else {
404+
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
405+
}
406+
}
407+
}
408+
}
409+
395410
Iterator<MethodHandlers> getAllHandlers(@Nullable Map<String, String> requestParamsRef, String rawPath) {
396411
final Supplier<Map<String, String>> paramsSupplier;
397412
if (requestParamsRef == null) {

server/src/main/java/org/elasticsearch/tasks/Task.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,19 @@ public class Task {
2828
*/
2929
public static final String X_OPAQUE_ID = "X-Opaque-Id";
3030

31+
/**
32+
* The request header which is contained in HTTP request. We parse trace.id from it and store it in thread context.
33+
* TRACE_PARENT once parsed in RestController.tryAllHandler is not preserved
34+
* has to be declared as a header copied over from http request.
35+
*/
36+
public static final String TRACE_PARENT = "traceparent";
37+
38+
/**
39+
* Parsed part of traceparent. It is stored in thread context and emitted in logs.
40+
* Has to be declared as a header copied over for tasks.
41+
*/
42+
public static final String TRACE_ID = "trace.id";
43+
3144
private final long id;
3245

3346
private final String type;

server/src/test/java/org/elasticsearch/rest/RestControllerTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.http.HttpStats;
3333
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
3434
import org.elasticsearch.rest.RestHandler.Route;
35+
import org.elasticsearch.tasks.Task;
3536
import org.elasticsearch.test.ESTestCase;
3637
import org.elasticsearch.test.client.NoOpNodeClient;
3738
import org.elasticsearch.test.rest.FakeRestRequest;
@@ -163,6 +164,38 @@ public void testRequestWithDisallowedMultiValuedHeader() {
163164
assertTrue(channel.getSendResponseCalled());
164165
}
165166

167+
public void testTraceParentAndTraceId() throws Exception {
168+
final ThreadContext threadContext = client.threadPool().getThreadContext();
169+
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition(Task.TRACE_PARENT, false)));
170+
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
171+
Map<String, List<String>> restHeaders = new HashMap<>();
172+
restHeaders.put(Task.TRACE_PARENT, Collections.singletonList("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"));
173+
RestRequest fakeRequest = new FakeRestRequest.Builder(xContentRegistry()).withHeaders(restHeaders).build();
174+
final RestController spyRestController = spy(restController);
175+
when(spyRestController.getAllHandlers(null, fakeRequest.rawPath()))
176+
.thenReturn(new Iterator<>() {
177+
@Override
178+
public boolean hasNext() {
179+
return false;
180+
}
181+
182+
@Override
183+
public MethodHandlers next() {
184+
return new MethodHandlers("/")
185+
.addMethod(GET, RestApiVersion.current(), (request, channel, client) -> {
186+
assertEquals("0af7651916cd43dd8448eb211c80319c", threadContext.getHeader(Task.TRACE_ID));
187+
assertNull(threadContext.getHeader(Task.TRACE_PARENT));
188+
});
189+
}
190+
});
191+
AssertingChannel channel = new AssertingChannel(fakeRequest, false, RestStatus.BAD_REQUEST);
192+
restController.dispatchRequest(fakeRequest, channel, threadContext);
193+
// the rest controller relies on the caller to stash the context, so we should expect these values here as we didn't stash the
194+
// context in this test
195+
assertEquals("0af7651916cd43dd8448eb211c80319c", threadContext.getHeader(Task.TRACE_ID));
196+
assertNull(threadContext.getHeader(Task.TRACE_PARENT));
197+
}
198+
166199
public void testRequestWithDisallowedMultiValuedHeaderButSameValues() {
167200
final ThreadContext threadContext = client.threadPool().getThreadContext();
168201
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),

0 commit comments

Comments
 (0)