diff --git a/build.gradle b/build.gradle index 5b51f211..a84cd8b5 100644 --- a/build.gradle +++ b/build.gradle @@ -84,8 +84,8 @@ dependencies { testImplementation 'org.slf4j:slf4j-simple:2.0.17' testImplementation "io.opentelemetry:opentelemetry-sdk" testImplementation "io.opentelemetry:opentelemetry-sdk-testing" - testImplementation 'io.opentelemetry:opentelemetry-exporter-logging:1.38.0' - testImplementation 'io.opentelemetry:opentelemetry-exporter-otlp-trace:1.14.0' + testImplementation "io.opentelemetry:opentelemetry-exporter-logging" + testImplementation "io.opentelemetry:opentelemetry-exporter-otlp" } tasks.withType(Test).configureEach { diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java index 28218bef..887ec5b4 100644 --- a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java +++ b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java @@ -1,5 +1,7 @@ package io.kurrent.dbclient; +import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase; + public class AppendStreamFailure { private final io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner; @@ -12,21 +14,21 @@ public String getStreamName() { } public void visit(MultiAppendStreamErrorVisitor visitor) { - if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_REVISION_CONFLICT) { + if (this.inner.getErrorCase() == ErrorCase.STREAM_REVISION_CONFLICT) { visitor.onWrongExpectedRevision(this.inner.getStreamRevisionConflict().getStreamRevision()); return; } - if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) { + if (this.inner.getErrorCase() == ErrorCase.ACCESS_DENIED) { visitor.onAccessDenied(this.inner.getAccessDenied()); } - if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) { + if (this.inner.getErrorCase() == ErrorCase.STREAM_DELETED) { visitor.onStreamDeleted(); return; } - if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { + if (this.inner.getErrorCase() == ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize()); return; } diff --git a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java index e435bc93..0fe2f866 100644 --- a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java +++ b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java @@ -5,18 +5,19 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.grpc.ManagedChannel; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.*; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.BiFunction; +import static io.kurrentdb.protocol.streams.v2.AppendStreamFailure.*; + class ClientTelemetry { private static final ClientTelemetryTags DEFAULT_ATTRIBUTES = new ClientTelemetryTags() {{ put(ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME); @@ -122,6 +123,93 @@ static CompletableFuture traceAppend( } } + static CompletableFuture traceMultiStreamAppend( + BiFunction, CompletableFuture> multiAppendOperation, + WorkItemArgs args, + Iterator requests, KurrentDBClientSettings settings) { + + List requestsWithTracing = new ArrayList<>(); + + Span span = createSpan( + ClientTelemetryConstants.Operations.MULTI_APPEND, + SpanKind.CLIENT, + null, + ClientTelemetryTags.builder() + .withServerTagsFromGrpcChannel(args.getChannel()) + .withServerTagsFromClientSettings(settings) + .withOptionalDatabaseUserTag(settings.getDefaultCredentials()) + .build()); + + while (requests.hasNext()) { + AppendStreamRequest request = requests.next(); + + List eventsWithTracing = new ArrayList<>(); + while (request.getEvents().hasNext()) + eventsWithTracing.add(request.getEvents().next()); + + List tracedEvents = tryInjectTracingContext(span, eventsWithTracing); + + requestsWithTracing.add(new AppendStreamRequest( + request.getStreamName(), + tracedEvents.iterator(), + request.getExpectedState() + )); + } + + return multiAppendOperation.apply(args, requestsWithTracing.iterator()) + .handle((result, throwable) -> { + if (throwable != null) { + span.setStatus(StatusCode.ERROR); + span.recordException(throwable); + span.end(); + throw new CompletionException(throwable); + } else { + if (result.getFailures().isPresent()) { + for (AppendStreamFailure failure : result.getFailures().get()) { + failure.visit(new MultiAppendStreamErrorVisitor() { + @Override + public void onWrongExpectedRevision(long streamRevision) { + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(), + AttributeKey.longKey("exception.revision"), streamRevision + )); + } + + @Override + public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) { + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString() + )); + } + + @Override + public void onStreamDeleted() { + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString() + )); + } + + @Override + public void onTransactionMaxSizeExceeded(int maxSize) { + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(), + AttributeKey.longKey("exception.maxSize"), (long) maxSize + )); + } + }); + } + span.setStatus(StatusCode.ERROR); + span.end(); + } else if (result.getSuccesses().isPresent()) { + span.setStatus(StatusCode.OK); + span.end(); + } + + return result; + } + }); + } + static void traceSubscribe(Runnable tracedOperation, String subscriptionId, ManagedChannel channel, KurrentDBClientSettings settings, UserCredentials optionalCallCredentials, RecordedEvent event) { diff --git a/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java b/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java index 5b5f359c..f4d77dc5 100644 --- a/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java +++ b/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java @@ -10,6 +10,7 @@ public static class Metadata { public static class Operations { public static final String APPEND = "streams.append"; + public static final String MULTI_APPEND = "streams.multi-append"; public static final String SUBSCRIBE = "streams.subscribe"; } } diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index 21195be3..0210d9b3 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -25,10 +25,14 @@ public MultiStreamAppend(GrpcClient client, Iterator reques } public CompletableFuture execute() { - return this.client.runWithArgs(this::append); + return this.client.runWithArgs(args -> ClientTelemetry.traceMultiStreamAppend( + this::append, + args, + this.requests, + this.client.getSettings())); } - private CompletableFuture append(WorkItemArgs args) { + private CompletableFuture append(WorkItemArgs args, Iterator requests) { CompletableFuture result = new CompletableFuture<>(); if (!args.supportFeature(FeatureFlags.MULTI_STREAM_APPEND)) { @@ -40,8 +44,8 @@ private CompletableFuture append(WorkItemArgs args) { StreamObserver requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse)); try { - while (this.requests.hasNext()) { - AppendStreamRequest request = this.requests.next(); + while (requests.hasNext()) { + AppendStreamRequest request = requests.next(); io.kurrentdb.protocol.streams.v2.AppendStreamRequest.Builder builder = io.kurrentdb.protocol.streams.v2.AppendStreamRequest.newBuilder() .setExpectedRevision(request.getExpectedState().toRawLong()) .setStream(request.getStreamName()); diff --git a/src/test/java/io/kurrent/dbclient/TelemetryTests.java b/src/test/java/io/kurrent/dbclient/TelemetryTests.java index 2ee48799..5bfc0151 100644 --- a/src/test/java/io/kurrent/dbclient/TelemetryTests.java +++ b/src/test/java/io/kurrent/dbclient/TelemetryTests.java @@ -6,9 +6,12 @@ import io.kurrent.dbclient.telemetry.TracingContextInjectionTests; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -21,6 +24,8 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static io.opentelemetry.semconv.ServiceAttributes.SERVICE_NAME; + public class TelemetryTests implements StreamsTracingInstrumentationTests, PersistentSubscriptionsTracingInstrumentationTests, TracingContextInjectionTests { static private Database database; static private Logger logger; @@ -39,10 +44,20 @@ public void beforeEach() { GlobalOpenTelemetry.resetForTest(); spanEndedHooks.add(recordedSpans::add); + OtlpGrpcSpanExporter otlpExporter = OtlpGrpcSpanExporter.builder() + .setEndpoint("http://localhost:4317") + .build(); + + Resource resource = Resource.getDefault().toBuilder() + .put(SERVICE_NAME, "kurrentdb") + .build(); + OpenTelemetrySdk.builder() .setTracerProvider(SdkTracerProvider .builder() .addSpanProcessor(new SpanProcessorSpy(spanEndedHooks)) + .addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)) + .setResource(resource) .build()) .buildAndRegisterGlobal(); } diff --git a/src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java b/src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java index 8435e70e..656634fb 100644 --- a/src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java +++ b/src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java @@ -12,7 +12,7 @@ import java.util.Map; public class DockerContainerDatabase extends GenericContainer implements Database { - public static final String DEFAULT_IMAGE = "docker.kurrent.io/eventstore/eventstoredb-ee:lts"; + public static final String DEFAULT_IMAGE = "docker.cloudsmith.io/eventstore/kurrent-staging/kurrentdb:ci"; public static class Builder { String image; diff --git a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java index 892814b7..542b4e1d 100644 --- a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java +++ b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java @@ -3,14 +3,18 @@ import io.kurrent.dbclient.*; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.trace.ReadableSpan; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -289,4 +293,120 @@ public void onEvent(Subscription subscription, ResolvedEvent event) { List subscribeSpans = getSpansForOperation(ClientTelemetryConstants.Operations.SUBSCRIBE); Assertions.assertTrue(subscribeSpans.isEmpty(), "No spans should be recorded for deleted events"); } + + @Test + default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws Throwable { + KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); + + String streamName1 = generateName(); + String streamName2 = generateName(); + + EventData event1 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + + EventData event2 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + + AppendStreamRequest request1 = new AppendStreamRequest( + streamName1, + Collections.singletonList(event1).iterator(), + StreamState.noStream() + ); + + AppendStreamRequest request2 = new AppendStreamRequest( + streamName2, + Collections.singletonList(event2).iterator(), + StreamState.noStream() + ); + + MultiAppendWriteResult result = client.multiStreamAppend( + Arrays.asList(request1, request2).iterator() + ).get(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getSuccesses().isPresent()); + + List spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND); + Assertions.assertEquals(1, spans.size()); + + assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME); + assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.OPERATION, ClientTelemetryConstants.Operations.MULTI_APPEND); + assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.USER, "admin"); + Assertions.assertEquals(StatusCode.OK, spans.get(0).toSpanData().getStatus().getStatusCode()); + Assertions.assertEquals(SpanKind.CLIENT, spans.get(0).getKind()); + } + + @Test + default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable { + KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); + + String streamName1 = generateName(); + String streamName2 = generateName(); + + EventData event1 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + + EventData event2 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + + AppendStreamRequest request1 = new AppendStreamRequest( + streamName1, + Collections.singletonList(event1).iterator(), + StreamState.noStream() + ); + + AppendStreamRequest request2 = new AppendStreamRequest( + streamName2, + Collections.singletonList(event2).iterator(), + StreamState.streamExists() + ); + + MultiAppendWriteResult result = client.multiStreamAppend( + Arrays.asList(request1, request2).iterator() + ).get(); + + Assertions.assertNotNull(result); + Assertions.assertFalse(result.getSuccesses().isPresent()); + Assertions.assertTrue(result.getFailures().isPresent()); + + List spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND); + Assertions.assertEquals(1, spans.size()); + + ReadableSpan span = spans.get(0); + + Assertions.assertEquals(StatusCode.ERROR, span.toSpanData().getStatus().getStatusCode()); + Assertions.assertEquals("", span.toSpanData().getStatus().getDescription()); + + List events = span.toSpanData().getEvents(); + + Assertions.assertEquals(1, events.size()); + + io.opentelemetry.sdk.trace.data.EventData failureEvent = events.get(0); + + Assertions.assertEquals("exception", failureEvent.getName()); + + Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(), + failureEvent.getAttributes().get(AttributeKey.stringKey("exception.type"))); + + Assertions.assertNotNull(failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision"))); + Assertions.assertEquals(-1L, failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision"))); + } }