diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java b/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java index 3b0886f9..922d17cb 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java @@ -113,7 +113,7 @@ public void onNext(@NotNull StreamsOuterClass.ReadResp readResp) { channel, client.getSettings(), options.getCredentials(), - resolvedEvent.getOriginalEvent()); + resolvedEvent.getEvent()); } catch (Exception e) { onError(e); } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/AbstractSubscribePersistentSubscription.java b/db-client-java/src/main/java/com/eventstore/dbclient/AbstractSubscribePersistentSubscription.java index 8ae44500..9419bf88 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/AbstractSubscribePersistentSubscription.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/AbstractSubscribePersistentSubscription.java @@ -98,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) { args.getChannel(), client.getSettings(), options.getCredentials(), - resolvedEvent.getOriginalEvent()); + resolvedEvent.getEvent()); } catch (Exception e) { onError(e); } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ClientTelemetry.java b/db-client-java/src/main/java/com/eventstore/dbclient/ClientTelemetry.java index a894b8a6..4d05c8c6 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/ClientTelemetry.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ClientTelemetry.java @@ -77,14 +77,18 @@ private static SpanContext tryExtractTracingContext(byte[] userMetadataBytes) { if (!TraceId.isValid(traceId) || !SpanId.isValid(spanId)) return null; - return SpanContext.createFromRemoteParent(traceId, spanId, TraceFlags.getSampled(), TraceState.getDefault()); + return SpanContext.createFromRemoteParent(traceId, spanId, TraceFlags.getSampled(), + TraceState.getDefault()); } catch (Throwable t) { return null; } } - static CompletableFuture traceAppend(BiFunction, CompletableFuture> appendOperation, ManagedChannel channel, - List events, String streamId, EventStoreDBClientSettings settings, UserCredentials optionalCallCredentials) { + static CompletableFuture traceAppend( + BiFunction, CompletableFuture> appendOperation, + ManagedChannel channel, + List events, String streamId, EventStoreDBClientSettings settings, + UserCredentials optionalCallCredentials) { Span span = createSpan( ClientTelemetryConstants.Operations.APPEND, SpanKind.CLIENT, @@ -115,9 +119,16 @@ static CompletableFuture traceAppend(BiFunction resolvedEvent = readResult.getEvents(); + Assertions.assertEquals(2, resolvedEvent.size()); // Assert unchanged - Assertions.assertArrayEquals(userMetadata, resolvedEvent.getEvent().getUserMetadata()); + Assertions.assertArrayEquals(userMetadata, resolvedEvent.get(1).getEvent().getUserMetadata()); + + CountDownLatch subscribeSpansLatch = new CountDownLatch(1); + onOperationSpanEnded(ClientTelemetryConstants.Operations.SUBSCRIBE, span -> subscribeSpansLatch.countDown()); + + Subscription subscription = client.subscribeToStream( + streamName, + new SubscriptionListener() { + } + ).get(); + + List appendSpans = this.getSpansForOperation(ClientTelemetryConstants.Operations.APPEND); + Assertions.assertEquals(1, appendSpans.size()); + + subscribeSpansLatch.await(); + subscription.stop(); + + List subscribeSpans = this.getSpansForOperation(ClientTelemetryConstants.Operations.SUBSCRIBE); + + Assertions.assertEquals(1, subscribeSpans.size()); + + assertSubscriptionActivityHasExpectedAttributes( + subscribeSpans.get(0), + streamName, + subscription.getSubscriptionId(), + eventWithValidMetadata.getEventId().toString(), + eventWithValidMetadata.getEventType()); } @Test