Skip to content

Commit 12a1d4c

Browse files
authored
Merge pull request #284 from EventStore/DEVEX-77-Java-Client-Don-t-create-span-for-events-with-non-json-metadata
Extract tracing metadata from Event not OriginalEvent
2 parents c0a5a42 + 8af0a89 commit 12a1d4c

File tree

4 files changed

+62
-15
lines changed

4 files changed

+62
-15
lines changed

db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void onNext(@NotNull StreamsOuterClass.ReadResp readResp) {
113113
channel,
114114
client.getSettings(),
115115
options.getCredentials(),
116-
resolvedEvent.getOriginalEvent());
116+
resolvedEvent.getEvent());
117117
} catch (Exception e) {
118118
onError(e);
119119
}

db-client-java/src/main/java/com/eventstore/dbclient/AbstractSubscribePersistentSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) {
9898
args.getChannel(),
9999
client.getSettings(),
100100
options.getCredentials(),
101-
resolvedEvent.getOriginalEvent());
101+
resolvedEvent.getEvent());
102102
} catch (Exception e) {
103103
onError(e);
104104
}

db-client-java/src/main/java/com/eventstore/dbclient/ClientTelemetry.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,18 @@ private static SpanContext tryExtractTracingContext(byte[] userMetadataBytes) {
7777
if (!TraceId.isValid(traceId) || !SpanId.isValid(spanId))
7878
return null;
7979

80-
return SpanContext.createFromRemoteParent(traceId, spanId, TraceFlags.getSampled(), TraceState.getDefault());
80+
return SpanContext.createFromRemoteParent(traceId, spanId, TraceFlags.getSampled(),
81+
TraceState.getDefault());
8182
} catch (Throwable t) {
8283
return null;
8384
}
8485
}
8586

86-
static CompletableFuture<WriteResult> traceAppend(BiFunction<ManagedChannel, List<EventData>, CompletableFuture<WriteResult>> appendOperation, ManagedChannel channel,
87-
List<EventData> events, String streamId, EventStoreDBClientSettings settings, UserCredentials optionalCallCredentials) {
87+
static CompletableFuture<WriteResult> traceAppend(
88+
BiFunction<ManagedChannel, List<EventData>, CompletableFuture<WriteResult>> appendOperation,
89+
ManagedChannel channel,
90+
List<EventData> events, String streamId, EventStoreDBClientSettings settings,
91+
UserCredentials optionalCallCredentials) {
8892
Span span = createSpan(
8993
ClientTelemetryConstants.Operations.APPEND,
9094
SpanKind.CLIENT,
@@ -115,9 +119,16 @@ static CompletableFuture<WriteResult> traceAppend(BiFunction<ManagedChannel, Lis
115119
}
116120
}
117121

118-
static void traceSubscribe(Runnable tracedOperation, String subscriptionId, ManagedChannel channel, EventStoreDBClientSettings settings,
122+
static void traceSubscribe(Runnable tracedOperation, String subscriptionId, ManagedChannel channel,
123+
EventStoreDBClientSettings settings,
119124
UserCredentials optionalCallCredentials, RecordedEvent event) {
120125
SpanContext remoteParentContext = tryExtractTracingContext(event.getUserMetadata());
126+
127+
if (remoteParentContext == null) {
128+
tracedOperation.run();
129+
return;
130+
}
131+
121132
Span span = createSpan(
122133
ClientTelemetryConstants.Operations.SUBSCRIBE,
123134
SpanKind.CONSUMER,
@@ -145,7 +156,8 @@ static void traceSubscribe(Runnable tracedOperation, String subscriptionId, Mana
145156
}
146157
}
147158

148-
static Span createSpan(String operationName, SpanKind spanKind, SpanContext parentContext, ClientTelemetryTags customAttributes) {
159+
static Span createSpan(String operationName, SpanKind spanKind, SpanContext parentContext,
160+
ClientTelemetryTags customAttributes) {
149161
SpanBuilder spanBuilder = getTracer().spanBuilder(operationName).setSpanKind(spanKind);
150162

151163
if (parentContext != null)

db-client-java/src/test/java/com/eventstore/dbclient/telemetry/StreamsTracingInstrumentationTests.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,27 +64,62 @@ default void testTracingContextIsInjectedAsExpectedWhenUserMetadataIsJsonObject(
6464
}
6565

6666
@Test
67-
default void testTracingContextInjectionIsIgnoredAsExpectedWhenUserMetadataIsNonNullAndNotAJsonObject() throws Throwable {
67+
@Timeout(value = 2, unit = TimeUnit.MINUTES)
68+
default void testTracingContextInjectionIsIgnoredAsExpectedWhenUserMetadataIsNonNullAndNotAJsonObject()
69+
throws Throwable {
6870
EventStoreDBClient client = getDefaultClient();
6971
String streamName = generateName();
7072
byte[] userMetadata = mapper.writeValueAsBytes("clearlynotvalidjson");
7173

74+
EventData eventWithValidMetadata = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
75+
.eventId(UUID.randomUUID())
76+
.build();
77+
78+
EventData eventWithInvalidMetadata = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
79+
.metadataAsBytes(userMetadata)
80+
.eventId(UUID.randomUUID())
81+
.build();
82+
7283
client.appendToStream(
7384
streamName,
7485
AppendToStreamOptions.get().expectedRevision(ExpectedRevision.noStream()),
75-
EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
76-
.metadataAsBytes(userMetadata)
77-
.eventId(UUID.randomUUID())
78-
.build())
86+
eventWithValidMetadata,
87+
eventWithInvalidMetadata)
7988
.get();
8089

8190
ReadResult readResult = client.readStream(streamName, ReadStreamOptions.get()).get();
8291

83-
ResolvedEvent resolvedEvent = readResult.getEvents().get(0);
84-
Assertions.assertNotNull(resolvedEvent);
92+
List<ResolvedEvent> resolvedEvent = readResult.getEvents();
93+
Assertions.assertEquals(2, resolvedEvent.size());
8594

8695
// Assert unchanged
87-
Assertions.assertArrayEquals(userMetadata, resolvedEvent.getEvent().getUserMetadata());
96+
Assertions.assertArrayEquals(userMetadata, resolvedEvent.get(1).getEvent().getUserMetadata());
97+
98+
CountDownLatch subscribeSpansLatch = new CountDownLatch(1);
99+
onOperationSpanEnded(ClientTelemetryConstants.Operations.SUBSCRIBE, span -> subscribeSpansLatch.countDown());
100+
101+
Subscription subscription = client.subscribeToStream(
102+
streamName,
103+
new SubscriptionListener() {
104+
}
105+
).get();
106+
107+
List<ReadableSpan> appendSpans = this.getSpansForOperation(ClientTelemetryConstants.Operations.APPEND);
108+
Assertions.assertEquals(1, appendSpans.size());
109+
110+
subscribeSpansLatch.await();
111+
subscription.stop();
112+
113+
List<ReadableSpan> subscribeSpans = this.getSpansForOperation(ClientTelemetryConstants.Operations.SUBSCRIBE);
114+
115+
Assertions.assertEquals(1, subscribeSpans.size());
116+
117+
assertSubscriptionActivityHasExpectedAttributes(
118+
subscribeSpans.get(0),
119+
streamName,
120+
subscription.getSubscriptionId(),
121+
eventWithValidMetadata.getEventId().toString(),
122+
eventWithValidMetadata.getEventType());
88123
}
89124

90125
@Test

0 commit comments

Comments
 (0)