Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tracing.SpanId;

import java.security.AccessController;
import java.security.PrivilegedAction;
Expand All @@ -59,7 +60,7 @@ public class APMTracer extends AbstractLifecycleComponent implements org.elastic
private static final Logger logger = LogManager.getLogger(APMTracer.class);

/** Holds in-flight span information. */
private final Map<String, Context> spans = ConcurrentCollections.newConcurrentMap();
private final Map<SpanId, Context> spans = ConcurrentCollections.newConcurrentMap();

private volatile boolean enabled;
private volatile APMServices services;
Expand Down Expand Up @@ -158,7 +159,7 @@ private void destroyApmServices() {
}

@Override
public void startTrace(ThreadContext threadContext, String spanId, String spanName, @Nullable Map<String, Object> attributes) {
public void startTrace(ThreadContext threadContext, SpanId spanId, String spanName, @Nullable Map<String, Object> attributes) {
assert threadContext != null;
assert spanId != null;
assert spanName != null;
Expand Down Expand Up @@ -273,7 +274,7 @@ private Context getParentContext(ThreadContext threadContext) {
* @return a method to close the scope when you are finished with it.
*/
@Override
public Releasable withScope(String spanId) {
public Releasable withScope(SpanId spanId) {
final Context context = spans.get(spanId);
if (context != null) {
var scope = context.makeCurrent();
Expand Down Expand Up @@ -330,47 +331,47 @@ private void setSpanAttributes(ThreadContext threadContext, @Nullable Map<String
}

@Override
public void addError(String spanId, Throwable throwable) {
public void addError(SpanId spanId, Throwable throwable) {
final var span = Span.fromContextOrNull(spans.get(spanId));
if (span != null) {
span.recordException(throwable);
}
}

@Override
public void setAttribute(String spanId, String key, boolean value) {
public void setAttribute(SpanId spanId, String key, boolean value) {
final var span = Span.fromContextOrNull(spans.get(spanId));
if (span != null) {
span.setAttribute(key, value);
}
}

@Override
public void setAttribute(String spanId, String key, double value) {
public void setAttribute(SpanId spanId, String key, double value) {
final var span = Span.fromContextOrNull(spans.get(spanId));
if (span != null) {
span.setAttribute(key, value);
}
}

@Override
public void setAttribute(String spanId, String key, long value) {
public void setAttribute(SpanId spanId, String key, long value) {
final var span = Span.fromContextOrNull(spans.get(spanId));
if (span != null) {
span.setAttribute(key, value);
}
}

@Override
public void setAttribute(String spanId, String key, String value) {
public void setAttribute(SpanId spanId, String key, String value) {
final var span = Span.fromContextOrNull(spans.get(spanId));
if (span != null) {
span.setAttribute(key, value);
}
}

@Override
public void stopTrace(String spanId) {
public void stopTrace(SpanId spanId) {
final var span = Span.fromContextOrNull(spans.remove(spanId));
if (span != null) {
logger.trace("Finishing trace [{}]", spanId);
Expand All @@ -387,7 +388,7 @@ public void stopTrace() {
}

@Override
public void addEvent(String spanId, String eventName) {
public void addEvent(SpanId spanId, String eventName) {
final var span = Span.fromContextOrNull(spans.get(spanId));
if (span != null) {
span.addEvent(eventName);
Expand All @@ -412,7 +413,7 @@ private static boolean isSupportedContextKey(String key) {
}

// VisibleForTesting
Map<String, Context> getSpans() {
Map<SpanId, Context> getSpans() {
return spans;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.tracing.SpanId;

import java.util.List;
import java.util.stream.Stream;
Expand All @@ -28,14 +29,18 @@

public class APMTracerTests extends ESTestCase {

private static final SpanId SPAN_ID1 = SpanId.forBareString("id1");
private static final SpanId SPAN_ID2 = SpanId.forBareString("id2");
private static final SpanId SPAN_ID3 = SpanId.forBareString("id3");

/**
* Check that the tracer doesn't create spans when tracing is disabled.
*/
public void test_onTraceStarted_withTracingDisabled_doesNotStartTrace() {
Settings settings = Settings.builder().put(APM_ENABLED_SETTING.getKey(), false).build();
APMTracer apmTracer = buildTracer(settings);

apmTracer.startTrace(new ThreadContext(settings), "id1", "name1", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID1, "name1", null);

assertThat(apmTracer.getSpans(), anEmptyMap());
}
Expand All @@ -50,7 +55,7 @@ public void test_onTraceStarted_withSpanNameOmitted_doesNotStartTrace() {
.build();
APMTracer apmTracer = buildTracer(settings);

apmTracer.startTrace(new ThreadContext(settings), "id1", "name1", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID1, "name1", null);

assertThat(apmTracer.getSpans(), anEmptyMap());
}
Expand All @@ -62,10 +67,10 @@ public void test_onTraceStarted_startsTrace() {
Settings settings = Settings.builder().put(APM_ENABLED_SETTING.getKey(), true).build();
APMTracer apmTracer = buildTracer(settings);

apmTracer.startTrace(new ThreadContext(settings), "id1", "name1", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID1, "name1", null);

assertThat(apmTracer.getSpans(), aMapWithSize(1));
assertThat(apmTracer.getSpans(), hasKey("id1"));
assertThat(apmTracer.getSpans(), hasKey(SPAN_ID1));
}

/**
Expand All @@ -75,8 +80,8 @@ public void test_onTraceStopped_stopsTrace() {
Settings settings = Settings.builder().put(APM_ENABLED_SETTING.getKey(), true).build();
APMTracer apmTracer = buildTracer(settings);

apmTracer.startTrace(new ThreadContext(settings), "id1", "name1", null);
apmTracer.stopTrace("id1");
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID1, "name1", null);
apmTracer.stopTrace(SPAN_ID1);

assertThat(apmTracer.getSpans(), anEmptyMap());
}
Expand All @@ -93,7 +98,7 @@ public void test_whenTraceStarted_threadContextIsPopulated() {
APMTracer apmTracer = buildTracer(settings);

ThreadContext threadContext = new ThreadContext(settings);
apmTracer.startTrace(threadContext, "id1", "name1", null);
apmTracer.startTrace(threadContext, SPAN_ID1, "name1", null);
assertThat(threadContext.getTransient(Task.APM_TRACE_CONTEXT), notNullValue());
}

Expand All @@ -114,13 +119,13 @@ public void test_whenTraceStarted_andSpanNameIncluded_thenSpanIsStarted() {
.build();
APMTracer apmTracer = buildTracer(settings);

apmTracer.startTrace(new ThreadContext(settings), "id1", "name-aaa", null);
apmTracer.startTrace(new ThreadContext(settings), "id2", "name-bbb", null);
apmTracer.startTrace(new ThreadContext(settings), "id3", "name-ccc", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID1, "name-aaa", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID2, "name-bbb", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID3, "name-ccc", null);

assertThat(apmTracer.getSpans(), hasKey("id1"));
assertThat(apmTracer.getSpans(), hasKey("id2"));
assertThat(apmTracer.getSpans(), not(hasKey("id3")));
assertThat(apmTracer.getSpans(), hasKey(SPAN_ID1));
assertThat(apmTracer.getSpans(), hasKey(SPAN_ID2));
assertThat(apmTracer.getSpans(), not(hasKey(SPAN_ID3)));
}

/**
Expand All @@ -137,7 +142,7 @@ public void test_whenTraceStarted_andSpanNameIncludedAndExcluded_thenSpanIsNotSt
.build();
APMTracer apmTracer = buildTracer(settings);

apmTracer.startTrace(new ThreadContext(settings), "id1", "name-aaa", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID1, "name-aaa", null);

assertThat(apmTracer.getSpans(), not(hasKey("id1")));
}
Expand All @@ -159,13 +164,13 @@ public void test_whenTraceStarted_andSpanNameExcluded_thenSpanIsNotStarted() {
.build();
APMTracer apmTracer = buildTracer(settings);

apmTracer.startTrace(new ThreadContext(settings), "id1", "name-aaa", null);
apmTracer.startTrace(new ThreadContext(settings), "id2", "name-bbb", null);
apmTracer.startTrace(new ThreadContext(settings), "id3", "name-ccc", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID1, "name-aaa", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID2, "name-bbb", null);
apmTracer.startTrace(new ThreadContext(settings), SPAN_ID3, "name-ccc", null);

assertThat(apmTracer.getSpans(), not(hasKey("id1")));
assertThat(apmTracer.getSpans(), not(hasKey("id2")));
assertThat(apmTracer.getSpans(), hasKey("id3"));
assertThat(apmTracer.getSpans(), not(hasKey(SPAN_ID1)));
assertThat(apmTracer.getSpans(), not(hasKey(SPAN_ID2)));
assertThat(apmTracer.getSpans(), hasKey(SPAN_ID3));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tracing.SpanId;
import org.elasticsearch.tracing.Tracer;

import java.util.ArrayList;
Expand Down Expand Up @@ -91,13 +92,13 @@ public void sendResponse(RestResponse restResponse) {
// We're sending a response so we know we won't be needing the request content again and release it
httpRequest.release();

final String traceId = "rest-" + this.request.getRequestId();
final SpanId spanId = SpanId.forRestRequest(request);

final ArrayList<Releasable> toClose = new ArrayList<>(4);
if (HttpUtils.shouldCloseConnection(httpRequest)) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}
toClose.add(() -> tracer.stopTrace(traceId));
toClose.add(() -> tracer.stopTrace(request));

boolean success = false;
String opaque = null;
Expand Down Expand Up @@ -171,9 +172,9 @@ public void sendResponse(RestResponse restResponse) {

addCookies(httpResponse);

tracer.setAttribute(traceId, "http.status_code", restResponse.status().getStatus());
tracer.setAttribute(spanId, "http.status_code", restResponse.status().getStatus());
restResponse.getHeaders()
.forEach((key, values) -> tracer.setAttribute(traceId, "http.response.headers." + key, String.join("; ", values)));
.forEach((key, values) -> tracer.setAttribute(spanId, "http.response.headers." + key, String.join("; ", values)));

ActionListener<Void> listener = ActionListener.releasing(Releasables.wrap(toClose));
if (httpLogger != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,11 @@ private void startTrace(ThreadContext threadContext, RestChannel channel, String
case HTTP_1_1 -> attributes.put("http.flavour", "1.1");
}

tracer.startTrace(threadContext, "rest-" + channel.request().getRequestId(), name, attributes);
tracer.startTrace(threadContext, channel.request(), name, attributes);
}

private void traceException(RestChannel channel, Throwable e) {
this.tracer.addError("rest-" + channel.request().getRequestId(), e);
this.tracer.addError(channel.request(), e);
}

private static void sendContentTypeErrorMessage(@Nullable List<String> contentTypeHeader, RestChannel channel) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, Ac
private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException {
ReaderContext readerContext = createOrGetReaderContext(request);
try (
Releasable scope = tracer.withScope("task-" + task.getId());
Releasable scope = tracer.withScope(task);
Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
SearchContext context = createContext(readerContext, request, task, true)
) {
Expand Down Expand Up @@ -626,7 +626,7 @@ private static <T> void runAsync(Executor executor, CheckedSupplier<T, Exception
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {
final ReaderContext readerContext = createOrGetReaderContext(request);
try (
Releasable scope = tracer.withScope("task-" + task.getId());
Releasable scope = tracer.withScope(task);
Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
SearchContext context = createContext(readerContext, request, task, true)
) {
Expand Down Expand Up @@ -668,7 +668,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh

private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
try (
Releasable scope = tracer.withScope("task-" + context.getTask().getId());
Releasable scope = tracer.withScope(context.getTask());
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)
) {
shortcutDocIdsToLoad(context);
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ void startTrace(ThreadContext threadContext, Task task) {
Tracer.AttributeKeys.PARENT_TASK_ID,
parentTask.toString()
);
tracer.startTrace(threadContext, "task-" + task.getId(), task.getAction(), attributes);
tracer.startTrace(threadContext, task, task.getAction(), attributes);
}

public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(
Expand Down Expand Up @@ -288,7 +288,7 @@ public Task unregister(Task task) {
return removedTask;
}
} finally {
tracer.stopTrace("task-" + task.getId());
tracer.stopTrace(task);
for (RemovedTaskListener listener : removedTaskListeners) {
listener.onRemoved(task);
}
Expand Down
56 changes: 56 additions & 0 deletions server/src/main/java/org/elasticsearch/tracing/SpanId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.tracing;

import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.tasks.Task;

import java.util.Objects;

public class SpanId {
private final String rawId;

private SpanId(String rawId) {
this.rawId = Objects.requireNonNull(rawId);
}

public String getRawId() {
return rawId;
}

@Override
public String toString() {
return "SpanId[" + rawId + "]";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SpanId spanId = (SpanId) o;
return rawId.equals(spanId.rawId);
}

@Override
public int hashCode() {
return Objects.hash(rawId);
}

public static SpanId forTask(Task task) {
return new SpanId("task-" + task.getId());
}

public static SpanId forRestRequest(RestRequest restRequest) {
return new SpanId("rest-" + restRequest.getRequestId());
}

public static SpanId forBareString(String rawId) {
return new SpanId(rawId);
}
}
Loading