Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ dependencies {
testImplementation 'org.slf4j:slf4j-simple:2.0.17'
testImplementation "io.opentelemetry:opentelemetry-sdk"
testImplementation "io.opentelemetry:opentelemetry-sdk-testing"
testImplementation 'io.opentelemetry:opentelemetry-exporter-logging:1.38.0'
testImplementation 'io.opentelemetry:opentelemetry-exporter-otlp-trace:1.14.0'
testImplementation "io.opentelemetry:opentelemetry-exporter-logging"
testImplementation "io.opentelemetry:opentelemetry-exporter-otlp"
}

tasks.withType(Test).configureEach {
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/io/kurrent/dbclient/AppendStreamFailure.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kurrent.dbclient;

import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase;

public class AppendStreamFailure {
private final io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner;

Expand All @@ -12,21 +14,21 @@ public String getStreamName() {
}

public void visit(MultiAppendStreamErrorVisitor visitor) {
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_REVISION_CONFLICT) {
if (this.inner.getErrorCase() == ErrorCase.STREAM_REVISION_CONFLICT) {
visitor.onWrongExpectedRevision(this.inner.getStreamRevisionConflict().getStreamRevision());
return;
}

if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) {
if (this.inner.getErrorCase() == ErrorCase.ACCESS_DENIED) {
visitor.onAccessDenied(this.inner.getAccessDenied());
}

if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) {
if (this.inner.getErrorCase() == ErrorCase.STREAM_DELETED) {
visitor.onStreamDeleted();
return;
}

if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) {
if (this.inner.getErrorCase() == ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) {
visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize());
return;
}
Expand Down
96 changes: 92 additions & 4 deletions src/main/java/io/kurrent/dbclient/ClientTelemetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.grpc.ManagedChannel;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.*;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;

import static io.kurrentdb.protocol.streams.v2.AppendStreamFailure.*;

class ClientTelemetry {
private static final ClientTelemetryTags DEFAULT_ATTRIBUTES = new ClientTelemetryTags() {{
put(ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME);
Expand Down Expand Up @@ -122,6 +123,93 @@ static CompletableFuture<WriteResult> traceAppend(
}
}

static CompletableFuture<MultiAppendWriteResult> traceMultiStreamAppend(
BiFunction<WorkItemArgs, Iterator<AppendStreamRequest>, CompletableFuture<MultiAppendWriteResult>> multiAppendOperation,
WorkItemArgs args,
Iterator<AppendStreamRequest> requests, KurrentDBClientSettings settings) {

List<AppendStreamRequest> requestsWithTracing = new ArrayList<>();

Span span = createSpan(
ClientTelemetryConstants.Operations.MULTI_APPEND,
SpanKind.CLIENT,
null,
ClientTelemetryTags.builder()
.withServerTagsFromGrpcChannel(args.getChannel())
.withServerTagsFromClientSettings(settings)
.withOptionalDatabaseUserTag(settings.getDefaultCredentials())
.build());

while (requests.hasNext()) {
AppendStreamRequest request = requests.next();

List<EventData> eventsWithTracing = new ArrayList<>();
while (request.getEvents().hasNext())
eventsWithTracing.add(request.getEvents().next());

List<EventData> tracedEvents = tryInjectTracingContext(span, eventsWithTracing);

requestsWithTracing.add(new AppendStreamRequest(
request.getStreamName(),
tracedEvents.iterator(),
request.getExpectedState()
));
}

return multiAppendOperation.apply(args, requestsWithTracing.iterator())
.handle((result, throwable) -> {
if (throwable != null) {
span.setStatus(StatusCode.ERROR);
span.recordException(throwable);
span.end();
throw new CompletionException(throwable);
} else {
if (result.getFailures().isPresent()) {
for (AppendStreamFailure failure : result.getFailures().get()) {
failure.visit(new MultiAppendStreamErrorVisitor() {
@Override
public void onWrongExpectedRevision(long streamRevision) {
span.addEvent("exception", Attributes.of(
AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(),
AttributeKey.longKey("exception.revision"), streamRevision
));
}

@Override
public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) {
span.addEvent("exception", Attributes.of(
AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString()
));
}

@Override
public void onStreamDeleted() {
span.addEvent("exception", Attributes.of(
AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString()
));
}

@Override
public void onTransactionMaxSizeExceeded(int maxSize) {
span.addEvent("exception", Attributes.of(
AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(),
AttributeKey.longKey("exception.maxSize"), (long) maxSize
));
}
});
}
span.setStatus(StatusCode.ERROR);
span.end();
} else if (result.getSuccesses().isPresent()) {
span.setStatus(StatusCode.OK);
span.end();
}

return result;
}
});
}

static void traceSubscribe(Runnable tracedOperation, String subscriptionId, ManagedChannel channel,
KurrentDBClientSettings settings,
UserCredentials optionalCallCredentials, RecordedEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public static class Metadata {

public static class Operations {
public static final String APPEND = "streams.append";
public static final String MULTI_APPEND = "streams.multi-append";
public static final String SUBSCRIBE = "streams.subscribe";
}
}
12 changes: 8 additions & 4 deletions src/main/java/io/kurrent/dbclient/MultiStreamAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ public MultiStreamAppend(GrpcClient client, Iterator<AppendStreamRequest> reques
}

public CompletableFuture<MultiAppendWriteResult> execute() {
return this.client.runWithArgs(this::append);
return this.client.runWithArgs(args -> ClientTelemetry.traceMultiStreamAppend(
this::append,
args,
this.requests,
this.client.getSettings()));
}

private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args) {
private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args, Iterator<AppendStreamRequest> requests) {
CompletableFuture<MultiAppendWriteResult> result = new CompletableFuture<>();

if (!args.supportFeature(FeatureFlags.MULTI_STREAM_APPEND)) {
Expand All @@ -40,8 +44,8 @@ private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args) {
StreamObserver<io.kurrentdb.protocol.streams.v2.AppendStreamRequest> requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse));

try {
while (this.requests.hasNext()) {
AppendStreamRequest request = this.requests.next();
while (requests.hasNext()) {
AppendStreamRequest request = requests.next();
io.kurrentdb.protocol.streams.v2.AppendStreamRequest.Builder builder = io.kurrentdb.protocol.streams.v2.AppendStreamRequest.newBuilder()
.setExpectedRevision(request.getExpectedState().toRawLong())
.setStream(request.getStreamName());
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/io/kurrent/dbclient/TelemetryTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import io.kurrent.dbclient.telemetry.TracingContextInjectionTests;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -21,6 +24,8 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static io.opentelemetry.semconv.ServiceAttributes.SERVICE_NAME;

public class TelemetryTests implements StreamsTracingInstrumentationTests, PersistentSubscriptionsTracingInstrumentationTests, TracingContextInjectionTests {
static private Database database;
static private Logger logger;
Expand All @@ -39,10 +44,20 @@ public void beforeEach() {
GlobalOpenTelemetry.resetForTest();
spanEndedHooks.add(recordedSpans::add);

OtlpGrpcSpanExporter otlpExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint("http://localhost:4317")
.build();

Resource resource = Resource.getDefault().toBuilder()
.put(SERVICE_NAME, "kurrentdb")
.build();

OpenTelemetrySdk.builder()
.setTracerProvider(SdkTracerProvider
.builder()
.addSpanProcessor(new SpanProcessorSpy(spanEndedHooks))
.addSpanProcessor(SimpleSpanProcessor.create(otlpExporter))
.setResource(resource)
.build())
.buildAndRegisterGlobal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.Map;

public class DockerContainerDatabase extends GenericContainer<DockerContainerDatabase> implements Database {
public static final String DEFAULT_IMAGE = "docker.kurrent.io/eventstore/eventstoredb-ee:lts";
public static final String DEFAULT_IMAGE = "docker.cloudsmith.io/eventstore/kurrent-staging/kurrentdb:ci";

public static class Builder {
String image;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
import io.kurrent.dbclient.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -289,4 +293,120 @@ public void onEvent(Subscription subscription, ResolvedEvent event) {
List<ReadableSpan> subscribeSpans = getSpansForOperation(ClientTelemetryConstants.Operations.SUBSCRIBE);
Assertions.assertTrue(subscribeSpans.isEmpty(), "No spans should be recorded for deleted events");
}

@Test
default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws Throwable {
KurrentDBClient client = getDefaultClient();

Optional<ServerVersion> version = client.getServerVersion().get();

Assumptions.assumeTrue(
version.isPresent() && version.get().isGreaterOrEqualThan(25, 0),
"Multi-stream append is not supported server versions below 25.0.0"
);

String streamName1 = generateName();
String streamName2 = generateName();

EventData event1 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
.eventId(UUID.randomUUID())
.build();

EventData event2 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
.eventId(UUID.randomUUID())
.build();

AppendStreamRequest request1 = new AppendStreamRequest(
streamName1,
Collections.singletonList(event1).iterator(),
StreamState.noStream()
);

AppendStreamRequest request2 = new AppendStreamRequest(
streamName2,
Collections.singletonList(event2).iterator(),
StreamState.noStream()
);

MultiAppendWriteResult result = client.multiStreamAppend(
Arrays.asList(request1, request2).iterator()
).get();

Assertions.assertNotNull(result);
Assertions.assertTrue(result.getSuccesses().isPresent());

List<ReadableSpan> spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND);
Assertions.assertEquals(1, spans.size());

assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME);
assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.OPERATION, ClientTelemetryConstants.Operations.MULTI_APPEND);
assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.USER, "admin");
Assertions.assertEquals(StatusCode.OK, spans.get(0).toSpanData().getStatus().getStatusCode());
Assertions.assertEquals(SpanKind.CLIENT, spans.get(0).getKind());
}

@Test
default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable {
KurrentDBClient client = getDefaultClient();

Optional<ServerVersion> version = client.getServerVersion().get();

Assumptions.assumeTrue(
version.isPresent() && version.get().isGreaterOrEqualThan(25, 0),
"Multi-stream append is not supported server versions below 25.0.0"
);

String streamName1 = generateName();
String streamName2 = generateName();

EventData event1 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
.eventId(UUID.randomUUID())
.build();

EventData event2 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
.eventId(UUID.randomUUID())
.build();

AppendStreamRequest request1 = new AppendStreamRequest(
streamName1,
Collections.singletonList(event1).iterator(),
StreamState.noStream()
);

AppendStreamRequest request2 = new AppendStreamRequest(
streamName2,
Collections.singletonList(event2).iterator(),
StreamState.streamExists()
);

MultiAppendWriteResult result = client.multiStreamAppend(
Arrays.asList(request1, request2).iterator()
).get();

Assertions.assertNotNull(result);
Assertions.assertFalse(result.getSuccesses().isPresent());
Assertions.assertTrue(result.getFailures().isPresent());

List<ReadableSpan> spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND);
Assertions.assertEquals(1, spans.size());

ReadableSpan span = spans.get(0);

Assertions.assertEquals(StatusCode.ERROR, span.toSpanData().getStatus().getStatusCode());
Assertions.assertEquals("", span.toSpanData().getStatus().getDescription());

List<io.opentelemetry.sdk.trace.data.EventData> events = span.toSpanData().getEvents();

Assertions.assertEquals(1, events.size());

io.opentelemetry.sdk.trace.data.EventData failureEvent = events.get(0);

Assertions.assertEquals("exception", failureEvent.getName());

Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(),
failureEvent.getAttributes().get(AttributeKey.stringKey("exception.type")));

Assertions.assertNotNull(failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision")));
Assertions.assertEquals(-1L, failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision")));
}
}
Loading