From 8cbf0911faa13591c6628da86e5da5d08c9e6489 Mon Sep 17 00:00:00 2001 From: William Chong Date: Tue, 5 Aug 2025 15:17:23 +0400 Subject: [PATCH 1/4] Add tracing for multi stream append --- build.gradle | 4 +- .../kurrent/dbclient/AppendStreamFailure.java | 10 +- .../io/kurrent/dbclient/ClientTelemetry.java | 104 +++++++++++++++- .../dbclient/ClientTelemetryConstants.java | 1 + .../kurrent/dbclient/MultiStreamAppend.java | 12 +- .../io/kurrent/dbclient/TelemetryTests.java | 15 +++ .../StreamsTracingInstrumentationTests.java | 115 ++++++++++++++++++ 7 files changed, 247 insertions(+), 14 deletions(-) diff --git a/build.gradle b/build.gradle index 5b51f211..a84cd8b5 100644 --- a/build.gradle +++ b/build.gradle @@ -84,8 +84,8 @@ dependencies { testImplementation 'org.slf4j:slf4j-simple:2.0.17' testImplementation "io.opentelemetry:opentelemetry-sdk" testImplementation "io.opentelemetry:opentelemetry-sdk-testing" - testImplementation 'io.opentelemetry:opentelemetry-exporter-logging:1.38.0' - testImplementation 'io.opentelemetry:opentelemetry-exporter-otlp-trace:1.14.0' + testImplementation "io.opentelemetry:opentelemetry-exporter-logging" + testImplementation "io.opentelemetry:opentelemetry-exporter-otlp" } tasks.withType(Test).configureEach { diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java index 28218bef..887ec5b4 100644 --- a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java +++ b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java @@ -1,5 +1,7 @@ package io.kurrent.dbclient; +import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase; + public class AppendStreamFailure { private final io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner; @@ -12,21 +14,21 @@ public String getStreamName() { } public void visit(MultiAppendStreamErrorVisitor visitor) { - if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_REVISION_CONFLICT) { + if (this.inner.getErrorCase() == ErrorCase.STREAM_REVISION_CONFLICT) { visitor.onWrongExpectedRevision(this.inner.getStreamRevisionConflict().getStreamRevision()); return; } - if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) { + if (this.inner.getErrorCase() == ErrorCase.ACCESS_DENIED) { visitor.onAccessDenied(this.inner.getAccessDenied()); } - if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) { + if (this.inner.getErrorCase() == ErrorCase.STREAM_DELETED) { visitor.onStreamDeleted(); return; } - if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { + if (this.inner.getErrorCase() == ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize()); return; } diff --git a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java index e435bc93..c4f1e8a0 100644 --- a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java +++ b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java @@ -4,15 +4,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import io.grpc.ManagedChannel; +import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.*; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.BiFunction; @@ -122,6 +120,104 @@ static CompletableFuture traceAppend( } } + static CompletableFuture traceMultiStreamAppend( + BiFunction, CompletableFuture> multiAppendOperation, + WorkItemArgs args, + Iterator requests, KurrentDBClientSettings settings) { + + List requestsWithTracing = new ArrayList<>(); + Map spanMap = new HashMap<>(); + + while (requests.hasNext()) { + AppendStreamRequest request = requests.next(); + + Span span = createSpan( + ClientTelemetryConstants.Operations.APPEND, + SpanKind.CLIENT, + null, + ClientTelemetryTags.builder() + .withRequiredTag(ClientTelemetryAttributes.KurrentDB.STREAM, request.getStreamName()) + .withServerTagsFromGrpcChannel(args.getChannel()) + .withServerTagsFromClientSettings(settings) + .withOptionalDatabaseUserTag(settings.getDefaultCredentials()) + .build()); + + spanMap.put(request.getStreamName(), span); + + List eventsWithTracing = new ArrayList<>(); + while (request.getEvents().hasNext()) + eventsWithTracing.add(request.getEvents().next()); + + List tracedEvents = tryInjectTracingContext(span, eventsWithTracing); + + requestsWithTracing.add(new AppendStreamRequest( + request.getStreamName(), + tracedEvents.iterator(), + request.getExpectedState() + )); + } + + return multiAppendOperation.apply(args, requestsWithTracing.iterator()) + .handle((result, throwable) -> { + if (throwable != null) { + for (Span span : spanMap.values()) { + span.setStatus(StatusCode.ERROR); + span.recordException(throwable); + span.end(); + } + throw new CompletionException(throwable); + } else { + if (result.getFailures().isPresent()) { + Set processedStreams = new HashSet<>(); + + for (AppendStreamFailure failure : result.getFailures().get()) { + Span span = spanMap.get(failure.getStreamName()); + if (span != null) { + StringBuilder statusDescription = new StringBuilder(); + failure.visit(new MultiAppendStreamErrorVisitor() { + @Override + public void onWrongExpectedRevision(long streamRevision) { + statusDescription.append(ErrorCase.STREAM_REVISION_CONFLICT); + } + + @Override + public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) { + statusDescription.append(ErrorCase.ACCESS_DENIED); + } + + @Override + public void onStreamDeleted() { + statusDescription.append(ErrorCase.STREAM_DELETED); + } + + @Override + public void onTransactionMaxSizeExceeded(int maxSize) { + statusDescription.append(ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED); + } + }); + + span.setStatus(StatusCode.ERROR, statusDescription.toString()); + span.end(); + processedStreams.add(failure.getStreamName()); + } + } + + spanMap.entrySet().stream().filter(entry -> !processedStreams.contains(entry.getKey())).forEach(entry -> { + entry.getValue().setStatus(StatusCode.ERROR); + entry.getValue().end(); + }); + } else if (result.getSuccesses().isPresent()) { + result.getSuccesses().get().stream().map(success -> spanMap.get(success.getStreamName())).filter(Objects::nonNull).forEach(span -> { + span.setStatus(StatusCode.OK); + span.end(); + }); + } + + return result; + } + }); + } + static void traceSubscribe(Runnable tracedOperation, String subscriptionId, ManagedChannel channel, KurrentDBClientSettings settings, UserCredentials optionalCallCredentials, RecordedEvent event) { diff --git a/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java b/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java index 5b5f359c..902262bf 100644 --- a/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java +++ b/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java @@ -10,6 +10,7 @@ public static class Metadata { public static class Operations { public static final String APPEND = "streams.append"; + public static final String MULTI_STREAM_APPEND = "streams.multi_stream_append"; public static final String SUBSCRIBE = "streams.subscribe"; } } diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index 21195be3..0210d9b3 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -25,10 +25,14 @@ public MultiStreamAppend(GrpcClient client, Iterator reques } public CompletableFuture execute() { - return this.client.runWithArgs(this::append); + return this.client.runWithArgs(args -> ClientTelemetry.traceMultiStreamAppend( + this::append, + args, + this.requests, + this.client.getSettings())); } - private CompletableFuture append(WorkItemArgs args) { + private CompletableFuture append(WorkItemArgs args, Iterator requests) { CompletableFuture result = new CompletableFuture<>(); if (!args.supportFeature(FeatureFlags.MULTI_STREAM_APPEND)) { @@ -40,8 +44,8 @@ private CompletableFuture append(WorkItemArgs args) { StreamObserver requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse)); try { - while (this.requests.hasNext()) { - AppendStreamRequest request = this.requests.next(); + while (requests.hasNext()) { + AppendStreamRequest request = requests.next(); io.kurrentdb.protocol.streams.v2.AppendStreamRequest.Builder builder = io.kurrentdb.protocol.streams.v2.AppendStreamRequest.newBuilder() .setExpectedRevision(request.getExpectedState().toRawLong()) .setStream(request.getStreamName()); diff --git a/src/test/java/io/kurrent/dbclient/TelemetryTests.java b/src/test/java/io/kurrent/dbclient/TelemetryTests.java index 2ee48799..5bfc0151 100644 --- a/src/test/java/io/kurrent/dbclient/TelemetryTests.java +++ b/src/test/java/io/kurrent/dbclient/TelemetryTests.java @@ -6,9 +6,12 @@ import io.kurrent.dbclient.telemetry.TracingContextInjectionTests; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -21,6 +24,8 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static io.opentelemetry.semconv.ServiceAttributes.SERVICE_NAME; + public class TelemetryTests implements StreamsTracingInstrumentationTests, PersistentSubscriptionsTracingInstrumentationTests, TracingContextInjectionTests { static private Database database; static private Logger logger; @@ -39,10 +44,20 @@ public void beforeEach() { GlobalOpenTelemetry.resetForTest(); spanEndedHooks.add(recordedSpans::add); + OtlpGrpcSpanExporter otlpExporter = OtlpGrpcSpanExporter.builder() + .setEndpoint("http://localhost:4317") + .build(); + + Resource resource = Resource.getDefault().toBuilder() + .put(SERVICE_NAME, "kurrentdb") + .build(); + OpenTelemetrySdk.builder() .setTracerProvider(SdkTracerProvider .builder() .addSpanProcessor(new SpanProcessorSpy(spanEndedHooks)) + .addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)) + .setResource(resource) .build()) .buildAndRegisterGlobal(); } diff --git a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java index 892814b7..97b09281 100644 --- a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java +++ b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java @@ -3,12 +3,18 @@ import io.kurrent.dbclient.*; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.trace.ReadableSpan; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -289,4 +295,113 @@ public void onEvent(Subscription subscription, ResolvedEvent event) { List subscribeSpans = getSpansForOperation(ClientTelemetryConstants.Operations.SUBSCRIBE); Assertions.assertTrue(subscribeSpans.isEmpty(), "No spans should be recorded for deleted events"); } + + @Test + default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws Throwable { + KurrentDBClient client = getDefaultClient(); + String streamName1 = generateName(); + String streamName2 = generateName(); + + EventData event1 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + + EventData event2 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + + AppendStreamRequest request1 = new AppendStreamRequest( + streamName1, + Collections.singletonList(event1).iterator(), + StreamState.noStream() + ); + + AppendStreamRequest request2 = new AppendStreamRequest( + streamName2, + Collections.singletonList(event2).iterator(), + StreamState.noStream() + ); + + MultiAppendWriteResult result = client.multiStreamAppend( + Arrays.asList(request1, request2).iterator() + ).get(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getSuccesses().isPresent()); + + List spans = getSpansForOperation(ClientTelemetryConstants.Operations.APPEND); + Assertions.assertEquals(2, spans.size()); + + ReadableSpan span1 = spans.stream() + .filter(span -> streamName1.equals(span.getAttribute(AttributeKey.stringKey(ClientTelemetryAttributes.KurrentDB.STREAM)))) + .findFirst() + .orElse(null); + + ReadableSpan span2 = spans.stream() + .filter(span -> streamName2.equals(span.getAttribute(AttributeKey.stringKey(ClientTelemetryAttributes.KurrentDB.STREAM)))) + .findFirst() + .orElse(null); + + Assertions.assertNotNull(span1); + Assertions.assertNotNull(span2); + + assertAppendSpanHasExpectedAttributes(span1, streamName1); + assertAppendSpanHasExpectedAttributes(span2, streamName2); + } + + @Test + default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable { + KurrentDBClient client = getDefaultClient(); + String streamName1 = generateName(); + String streamName2 = generateName(); + + EventData event1 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + + EventData event2 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + + AppendStreamRequest request1 = new AppendStreamRequest( + streamName1, + Collections.singletonList(event1).iterator(), + StreamState.noStream() + ); + + AppendStreamRequest request2 = new AppendStreamRequest( + streamName2, + Collections.singletonList(event2).iterator(), + StreamState.streamExists() + ); + + MultiAppendWriteResult result = client.multiStreamAppend( + Arrays.asList(request1, request2).iterator() + ).get(); + + Assertions.assertNotNull(result); + Assertions.assertFalse(result.getSuccesses().isPresent()); + + List spans = getSpansForOperation(ClientTelemetryConstants.Operations.APPEND); + Assertions.assertEquals(2, spans.size()); + + ReadableSpan span1 = spans.stream() + .filter(span -> streamName1.equals(span.getAttribute(AttributeKey.stringKey(ClientTelemetryAttributes.KurrentDB.STREAM)))) + .findFirst() + .orElse(null); + + ReadableSpan span2 = spans.stream() + .filter(span -> streamName2.equals(span.getAttribute(AttributeKey.stringKey(ClientTelemetryAttributes.KurrentDB.STREAM)))) + .findFirst() + .orElse(null); + + Assertions.assertNotNull(span1); + Assertions.assertNotNull(span2); + + Assertions.assertEquals(StatusCode.ERROR, span1.toSpanData().getStatus().getStatusCode()); + Assertions.assertEquals("", span1.toSpanData().getStatus().getDescription()); + + Assertions.assertEquals(StatusCode.ERROR, span2.toSpanData().getStatus().getStatusCode()); + Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(), span2.toSpanData().getStatus().getDescription()); + } } From 7825bb388fad9e6dacd8fb65d465032f90ef690a Mon Sep 17 00:00:00 2001 From: William Chong Date: Tue, 5 Aug 2025 15:22:10 +0400 Subject: [PATCH 2/4] Only run tests on 25.0 and above --- .../StreamsTracingInstrumentationTests.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java index 97b09281..3321636a 100644 --- a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java +++ b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java @@ -10,13 +10,11 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.trace.ReadableSpan; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -299,6 +297,14 @@ public void onEvent(Subscription subscription, ResolvedEvent event) { @Test default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws Throwable { KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); + String streamName1 = generateName(); String streamName2 = generateName(); @@ -352,6 +358,14 @@ default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws T @Test default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable { KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); + String streamName1 = generateName(); String streamName2 = generateName(); From 207502ed6652d0967ac0f6142654ca7a39ebbec2 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 13 Aug 2025 16:32:08 +0400 Subject: [PATCH 3/4] Fixup --- .../io/kurrent/dbclient/ClientTelemetry.java | 52 ++++++++++--------- .../StreamsTracingInstrumentationTests.java | 26 +++++++++- 2 files changed, 51 insertions(+), 27 deletions(-) diff --git a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java index c4f1e8a0..764c12e8 100644 --- a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java +++ b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java @@ -4,8 +4,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import io.grpc.ManagedChannel; -import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.*; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -15,6 +16,8 @@ import java.util.concurrent.CompletionException; import java.util.function.BiFunction; +import static io.kurrentdb.protocol.streams.v2.AppendStreamFailure.*; + class ClientTelemetry { private static final ClientTelemetryTags DEFAULT_ATTRIBUTES = new ClientTelemetryTags() {{ put(ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME); @@ -126,7 +129,7 @@ static CompletableFuture traceMultiStreamAppend( Iterator requests, KurrentDBClientSettings settings) { List requestsWithTracing = new ArrayList<>(); - Map spanMap = new HashMap<>(); + List spans = new ArrayList<>(); while (requests.hasNext()) { AppendStreamRequest request = requests.next(); @@ -142,7 +145,7 @@ static CompletableFuture traceMultiStreamAppend( .withOptionalDatabaseUserTag(settings.getDefaultCredentials()) .build()); - spanMap.put(request.getStreamName(), span); + spans.add(span); List eventsWithTracing = new ArrayList<>(); while (request.getEvents().hasNext()) @@ -160,7 +163,7 @@ static CompletableFuture traceMultiStreamAppend( return multiAppendOperation.apply(args, requestsWithTracing.iterator()) .handle((result, throwable) -> { if (throwable != null) { - for (Span span : spanMap.values()) { + for (Span span : spans) { span.setStatus(StatusCode.ERROR); span.recordException(throwable); span.end(); @@ -168,49 +171,48 @@ static CompletableFuture traceMultiStreamAppend( throw new CompletionException(throwable); } else { if (result.getFailures().isPresent()) { - Set processedStreams = new HashSet<>(); - - for (AppendStreamFailure failure : result.getFailures().get()) { - Span span = spanMap.get(failure.getStreamName()); - if (span != null) { - StringBuilder statusDescription = new StringBuilder(); + for (Span span : spans) { + for (AppendStreamFailure failure : result.getFailures().get()) { failure.visit(new MultiAppendStreamErrorVisitor() { @Override public void onWrongExpectedRevision(long streamRevision) { - statusDescription.append(ErrorCase.STREAM_REVISION_CONFLICT); + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(), + AttributeKey.longKey("exception.revision"), streamRevision + )); } @Override public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) { - statusDescription.append(ErrorCase.ACCESS_DENIED); + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString() + )); } @Override public void onStreamDeleted() { - statusDescription.append(ErrorCase.STREAM_DELETED); + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString() + )); } @Override public void onTransactionMaxSizeExceeded(int maxSize) { - statusDescription.append(ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED); + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(), + AttributeKey.longKey("exception.maxSize"), (long) maxSize + )); } }); - - span.setStatus(StatusCode.ERROR, statusDescription.toString()); - span.end(); - processedStreams.add(failure.getStreamName()); } + span.setStatus(StatusCode.ERROR); + span.end(); } - - spanMap.entrySet().stream().filter(entry -> !processedStreams.contains(entry.getKey())).forEach(entry -> { - entry.getValue().setStatus(StatusCode.ERROR); - entry.getValue().end(); - }); } else if (result.getSuccesses().isPresent()) { - result.getSuccesses().get().stream().map(success -> spanMap.get(success.getStreamName())).filter(Objects::nonNull).forEach(span -> { + for (Span span : spans) { span.setStatus(StatusCode.OK); span.end(); - }); + } } return result; diff --git a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java index 3321636a..fb579ace 100644 --- a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java +++ b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java @@ -395,6 +395,7 @@ default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable Assertions.assertNotNull(result); Assertions.assertFalse(result.getSuccesses().isPresent()); + Assertions.assertTrue(result.getFailures().isPresent()); List spans = getSpansForOperation(ClientTelemetryConstants.Operations.APPEND); Assertions.assertEquals(2, spans.size()); @@ -414,8 +415,29 @@ default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable Assertions.assertEquals(StatusCode.ERROR, span1.toSpanData().getStatus().getStatusCode()); Assertions.assertEquals("", span1.toSpanData().getStatus().getDescription()); - Assertions.assertEquals(StatusCode.ERROR, span2.toSpanData().getStatus().getStatusCode()); - Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(), span2.toSpanData().getStatus().getDescription()); + Assertions.assertEquals("", span2.toSpanData().getStatus().getDescription()); + + List events1 = span1.toSpanData().getEvents(); + List events2 = span2.toSpanData().getEvents(); + + Assertions.assertEquals(1, events1.size()); + Assertions.assertEquals(1, events2.size()); + + io.opentelemetry.sdk.trace.data.EventData failureEvent1 = events1.get(0); + io.opentelemetry.sdk.trace.data.EventData failureEvent2 = events2.get(0); + + Assertions.assertEquals("exception", failureEvent1.getName()); + Assertions.assertEquals("exception", failureEvent2.getName()); + + Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(), + failureEvent1.getAttributes().get(AttributeKey.stringKey("exception.type"))); + Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(), + failureEvent2.getAttributes().get(AttributeKey.stringKey("exception.type"))); + + Assertions.assertNotNull(failureEvent1.getAttributes().get(AttributeKey.longKey("exception.revision"))); + Assertions.assertNotNull(failureEvent2.getAttributes().get(AttributeKey.longKey("exception.revision"))); + Assertions.assertEquals(-1L, failureEvent1.getAttributes().get(AttributeKey.longKey("exception.revision"))); + Assertions.assertEquals(-1L, failureEvent2.getAttributes().get(AttributeKey.longKey("exception.revision"))); } } From d49ecd544cfb579889efc631c9c27cf694f22900 Mon Sep 17 00:00:00 2001 From: William Chong Date: Mon, 25 Aug 2025 15:55:41 +0400 Subject: [PATCH 4/4] Change trace layout --- .../io/kurrent/dbclient/ClientTelemetry.java | 108 ++++++++---------- .../dbclient/ClientTelemetryConstants.java | 2 +- .../databases/DockerContainerDatabase.java | 2 +- .../StreamsTracingInstrumentationTests.java | 69 +++-------- 4 files changed, 70 insertions(+), 111 deletions(-) diff --git a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java index 764c12e8..0fe2f866 100644 --- a/src/main/java/io/kurrent/dbclient/ClientTelemetry.java +++ b/src/main/java/io/kurrent/dbclient/ClientTelemetry.java @@ -129,24 +129,20 @@ static CompletableFuture traceMultiStreamAppend( Iterator requests, KurrentDBClientSettings settings) { List requestsWithTracing = new ArrayList<>(); - List spans = new ArrayList<>(); + + Span span = createSpan( + ClientTelemetryConstants.Operations.MULTI_APPEND, + SpanKind.CLIENT, + null, + ClientTelemetryTags.builder() + .withServerTagsFromGrpcChannel(args.getChannel()) + .withServerTagsFromClientSettings(settings) + .withOptionalDatabaseUserTag(settings.getDefaultCredentials()) + .build()); while (requests.hasNext()) { AppendStreamRequest request = requests.next(); - Span span = createSpan( - ClientTelemetryConstants.Operations.APPEND, - SpanKind.CLIENT, - null, - ClientTelemetryTags.builder() - .withRequiredTag(ClientTelemetryAttributes.KurrentDB.STREAM, request.getStreamName()) - .withServerTagsFromGrpcChannel(args.getChannel()) - .withServerTagsFromClientSettings(settings) - .withOptionalDatabaseUserTag(settings.getDefaultCredentials()) - .build()); - - spans.add(span); - List eventsWithTracing = new ArrayList<>(); while (request.getEvents().hasNext()) eventsWithTracing.add(request.getEvents().next()); @@ -163,56 +159,50 @@ static CompletableFuture traceMultiStreamAppend( return multiAppendOperation.apply(args, requestsWithTracing.iterator()) .handle((result, throwable) -> { if (throwable != null) { - for (Span span : spans) { - span.setStatus(StatusCode.ERROR); - span.recordException(throwable); - span.end(); - } + span.setStatus(StatusCode.ERROR); + span.recordException(throwable); + span.end(); throw new CompletionException(throwable); } else { if (result.getFailures().isPresent()) { - for (Span span : spans) { - for (AppendStreamFailure failure : result.getFailures().get()) { - failure.visit(new MultiAppendStreamErrorVisitor() { - @Override - public void onWrongExpectedRevision(long streamRevision) { - span.addEvent("exception", Attributes.of( - AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(), - AttributeKey.longKey("exception.revision"), streamRevision - )); - } - - @Override - public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) { - span.addEvent("exception", Attributes.of( - AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString() - )); - } - - @Override - public void onStreamDeleted() { - span.addEvent("exception", Attributes.of( - AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString() - )); - } - - @Override - public void onTransactionMaxSizeExceeded(int maxSize) { - span.addEvent("exception", Attributes.of( - AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(), - AttributeKey.longKey("exception.maxSize"), (long) maxSize - )); - } - }); - } - span.setStatus(StatusCode.ERROR); - span.end(); + for (AppendStreamFailure failure : result.getFailures().get()) { + failure.visit(new MultiAppendStreamErrorVisitor() { + @Override + public void onWrongExpectedRevision(long streamRevision) { + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(), + AttributeKey.longKey("exception.revision"), streamRevision + )); + } + + @Override + public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) { + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString() + )); + } + + @Override + public void onStreamDeleted() { + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString() + )); + } + + @Override + public void onTransactionMaxSizeExceeded(int maxSize) { + span.addEvent("exception", Attributes.of( + AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(), + AttributeKey.longKey("exception.maxSize"), (long) maxSize + )); + } + }); } + span.setStatus(StatusCode.ERROR); + span.end(); } else if (result.getSuccesses().isPresent()) { - for (Span span : spans) { - span.setStatus(StatusCode.OK); - span.end(); - } + span.setStatus(StatusCode.OK); + span.end(); } return result; diff --git a/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java b/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java index 902262bf..f4d77dc5 100644 --- a/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java +++ b/src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java @@ -10,7 +10,7 @@ public static class Metadata { public static class Operations { public static final String APPEND = "streams.append"; - public static final String MULTI_STREAM_APPEND = "streams.multi_stream_append"; + public static final String MULTI_APPEND = "streams.multi-append"; public static final String SUBSCRIBE = "streams.subscribe"; } } diff --git a/src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java b/src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java index 8435e70e..656634fb 100644 --- a/src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java +++ b/src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java @@ -12,7 +12,7 @@ import java.util.Map; public class DockerContainerDatabase extends GenericContainer implements Database { - public static final String DEFAULT_IMAGE = "docker.kurrent.io/eventstore/eventstoredb-ee:lts"; + public static final String DEFAULT_IMAGE = "docker.cloudsmith.io/eventstore/kurrent-staging/kurrentdb:ci"; public static class Builder { String image; diff --git a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java index fb579ace..542b4e1d 100644 --- a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java +++ b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java @@ -335,24 +335,14 @@ default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws T Assertions.assertNotNull(result); Assertions.assertTrue(result.getSuccesses().isPresent()); - List spans = getSpansForOperation(ClientTelemetryConstants.Operations.APPEND); - Assertions.assertEquals(2, spans.size()); - - ReadableSpan span1 = spans.stream() - .filter(span -> streamName1.equals(span.getAttribute(AttributeKey.stringKey(ClientTelemetryAttributes.KurrentDB.STREAM)))) - .findFirst() - .orElse(null); - - ReadableSpan span2 = spans.stream() - .filter(span -> streamName2.equals(span.getAttribute(AttributeKey.stringKey(ClientTelemetryAttributes.KurrentDB.STREAM)))) - .findFirst() - .orElse(null); - - Assertions.assertNotNull(span1); - Assertions.assertNotNull(span2); + List spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND); + Assertions.assertEquals(1, spans.size()); - assertAppendSpanHasExpectedAttributes(span1, streamName1); - assertAppendSpanHasExpectedAttributes(span2, streamName2); + assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME); + assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.OPERATION, ClientTelemetryConstants.Operations.MULTI_APPEND); + assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.USER, "admin"); + Assertions.assertEquals(StatusCode.OK, spans.get(0).toSpanData().getStatus().getStatusCode()); + Assertions.assertEquals(SpanKind.CLIENT, spans.get(0).getKind()); } @Test @@ -397,47 +387,26 @@ default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable Assertions.assertFalse(result.getSuccesses().isPresent()); Assertions.assertTrue(result.getFailures().isPresent()); - List spans = getSpansForOperation(ClientTelemetryConstants.Operations.APPEND); - Assertions.assertEquals(2, spans.size()); - - ReadableSpan span1 = spans.stream() - .filter(span -> streamName1.equals(span.getAttribute(AttributeKey.stringKey(ClientTelemetryAttributes.KurrentDB.STREAM)))) - .findFirst() - .orElse(null); - - ReadableSpan span2 = spans.stream() - .filter(span -> streamName2.equals(span.getAttribute(AttributeKey.stringKey(ClientTelemetryAttributes.KurrentDB.STREAM)))) - .findFirst() - .orElse(null); + List spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND); + Assertions.assertEquals(1, spans.size()); - Assertions.assertNotNull(span1); - Assertions.assertNotNull(span2); + ReadableSpan span = spans.get(0); - Assertions.assertEquals(StatusCode.ERROR, span1.toSpanData().getStatus().getStatusCode()); - Assertions.assertEquals("", span1.toSpanData().getStatus().getDescription()); - Assertions.assertEquals(StatusCode.ERROR, span2.toSpanData().getStatus().getStatusCode()); - Assertions.assertEquals("", span2.toSpanData().getStatus().getDescription()); + Assertions.assertEquals(StatusCode.ERROR, span.toSpanData().getStatus().getStatusCode()); + Assertions.assertEquals("", span.toSpanData().getStatus().getDescription()); - List events1 = span1.toSpanData().getEvents(); - List events2 = span2.toSpanData().getEvents(); + List events = span.toSpanData().getEvents(); - Assertions.assertEquals(1, events1.size()); - Assertions.assertEquals(1, events2.size()); + Assertions.assertEquals(1, events.size()); - io.opentelemetry.sdk.trace.data.EventData failureEvent1 = events1.get(0); - io.opentelemetry.sdk.trace.data.EventData failureEvent2 = events2.get(0); + io.opentelemetry.sdk.trace.data.EventData failureEvent = events.get(0); - Assertions.assertEquals("exception", failureEvent1.getName()); - Assertions.assertEquals("exception", failureEvent2.getName()); + Assertions.assertEquals("exception", failureEvent.getName()); Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(), - failureEvent1.getAttributes().get(AttributeKey.stringKey("exception.type"))); - Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(), - failureEvent2.getAttributes().get(AttributeKey.stringKey("exception.type"))); + failureEvent.getAttributes().get(AttributeKey.stringKey("exception.type"))); - Assertions.assertNotNull(failureEvent1.getAttributes().get(AttributeKey.longKey("exception.revision"))); - Assertions.assertNotNull(failureEvent2.getAttributes().get(AttributeKey.longKey("exception.revision"))); - Assertions.assertEquals(-1L, failureEvent1.getAttributes().get(AttributeKey.longKey("exception.revision"))); - Assertions.assertEquals(-1L, failureEvent2.getAttributes().get(AttributeKey.longKey("exception.revision"))); + Assertions.assertNotNull(failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision"))); + Assertions.assertEquals(-1L, failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision"))); } }