Skip to content

Commit 98d1fc9

Browse files
[DEV-76] Fix ClientTelemetry injection logic forcing JSON content type (#281)
* Fix ClientTelemetry injection logic forcing JSON content type * Revert temp change to speed up testing * Improve name of const * Revert whitespace changes
1 parent a715899 commit 98d1fc9

File tree

7 files changed

+102
-14
lines changed

7 files changed

+102
-14
lines changed

db-client-java/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ protobuf {
179179
// options. Note the braces cannot be omitted, otherwise the
180180
// plugin will not be added. This is because of the implicit way
181181
// NamedDomainObjectContainer binds the methods.
182-
grpc { }
182+
grpc {}
183183
}
184184
}
185185
}

db-client-java/src/main/java/com/eventstore/dbclient/ClientTelemetry.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.ArrayList;
1313
import java.util.List;
1414
import java.util.Map;
15+
import java.util.Objects;
1516
import java.util.concurrent.CompletableFuture;
1617
import java.util.concurrent.CompletionException;
1718
import java.util.function.BiFunction;
@@ -29,11 +30,14 @@ private static Tracer getTracer() {
2930

3031
private static List<EventData> tryInjectTracingContext(Span span, List<EventData> events) {
3132
List<EventData> injectedEvents = new ArrayList<>();
32-
for (EventData event : events)
33+
for (EventData event : events) {
34+
boolean isJsonEvent = Objects.equals(event.getContentType(), ContentType.JSON);
35+
3336
injectedEvents.add(EventDataBuilder
34-
.json(event.getEventId(), event.getEventType(), event.getEventData())
37+
.binary(event.getEventId(), event.getEventType(), event.getEventData(), isJsonEvent)
3538
.metadataAsBytes(tryInjectTracingContext(span, event.getUserMetadata()))
3639
.build());
40+
}
3741
return injectedEvents;
3842
}
3943

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.eventstore.dbclient;
2+
3+
class ContentType {
4+
public static final String JSON = "application/json";
5+
public static final String BYTES = "application/octet-stream";
6+
}

db-client-java/src/main/java/com/eventstore/dbclient/EventDataBuilder.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public static <A> EventDataBuilder json(UUID id, String eventType, A eventData)
5555
public static EventDataBuilder json(String eventType, byte[] eventData) {
5656
return json(null, eventType, eventData);
5757
}
58+
5859
/**
5960
* Configures an event data builder to host a JSON payload.
6061
* @param id event's id.
@@ -63,13 +64,7 @@ public static EventDataBuilder json(String eventType, byte[] eventData) {
6364
* @return an event data builder.
6465
*/
6566
public static EventDataBuilder json(UUID id, String eventType, byte[] eventData) {
66-
EventDataBuilder self = new EventDataBuilder();
67-
self.eventData = eventData;
68-
self.eventType = eventType;
69-
self.isJson = true;
70-
self.id = id;
71-
72-
return self;
67+
return binary(id, eventType, eventData, true);
7368
}
7469

7570
/**
@@ -90,11 +85,23 @@ public static EventDataBuilder binary(String eventType, byte[] eventData) {
9085
* @return an event data builder.
9186
*/
9287
public static EventDataBuilder binary(UUID id, String eventType, byte[] eventData) {
88+
return binary(id, eventType, eventData, false);
89+
}
90+
91+
/**
92+
* Configures an event data builder to host a binary payload.
93+
* @param id event's id.
94+
* @param eventType event's type.
95+
* @param eventData event's payload.
96+
* @param isJson whether the payload is JSON or not.
97+
* @return an event data builder.
98+
*/
99+
public static EventDataBuilder binary(UUID id, String eventType, byte[] eventData, boolean isJson) {
93100
EventDataBuilder self = new EventDataBuilder();
94101

95102
self.eventData = eventData;
96103
self.eventType = eventType;
97-
self.isJson = false;
104+
self.isJson = isJson;
98105
self.id = id;
99106

100107
return self;
@@ -134,11 +141,12 @@ public EventDataBuilder metadataAsBytes(byte[] value) {
134141

135142
/**
136143
* Builds an event ready to be sent to EventStoreDB.
144+
*
137145
* @see EventData
138146
*/
139147
public EventData build() {
140148
UUID eventId = this.id == null ? UUID.randomUUID() : this.id;
141-
String contentType = this.isJson ? "application/json" : "application/octet-stream";
149+
String contentType = this.isJson ? ContentType.JSON : ContentType.BYTES;
142150
return new EventData(eventId, this.eventType, contentType, this.eventData, this.metadata);
143151
}
144152
}

db-client-java/src/main/java/com/eventstore/dbclient/WorkItemArgs.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public boolean supportFeature(int feature) {
4949
public <A> HttpURLConnection getHttpConnection(OptionsBase<A> options, EventStoreDBClientSettings settings, String path) {
5050
try {
5151
HttpURLConnection conn = (HttpURLConnection) getURL(settings.isTls(), this.endpoint, path).openConnection();
52-
conn.setRequestProperty("Accept", "application/json");
52+
conn.setRequestProperty("Accept", ContentType.JSON);
5353
String creds = options.getHttpCredentialString();
5454

5555
if (creds == null && settings.getDefaultCredentials() != null) {

db-client-java/src/test/java/com/eventstore/dbclient/TelemetryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.eventstore.dbclient.telemetry.PersistentSubscriptionsTracingInstrumentationTests;
44
import com.eventstore.dbclient.telemetry.SpanProcessorSpy;
55
import com.eventstore.dbclient.telemetry.StreamsTracingInstrumentationTests;
6+
import com.eventstore.dbclient.telemetry.TracingContextInjectionTests;
67
import io.opentelemetry.api.GlobalOpenTelemetry;
78
import io.opentelemetry.api.common.AttributeKey;
89
import io.opentelemetry.sdk.OpenTelemetrySdk;
@@ -20,7 +21,7 @@
2021
import java.util.function.Consumer;
2122
import java.util.stream.Collectors;
2223

23-
public class TelemetryTests implements StreamsTracingInstrumentationTests, PersistentSubscriptionsTracingInstrumentationTests {
24+
public class TelemetryTests implements StreamsTracingInstrumentationTests, PersistentSubscriptionsTracingInstrumentationTests, TracingContextInjectionTests {
2425
static private Database database;
2526
static private Logger logger;
2627

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.eventstore.dbclient.telemetry;
2+
3+
import com.eventstore.dbclient.*;
4+
import org.junit.Assert;
5+
import org.junit.jupiter.api.Assertions;
6+
import org.junit.jupiter.api.Test;
7+
import org.junit.jupiter.api.Timeout;
8+
9+
import java.util.ArrayList;
10+
import java.util.Arrays;
11+
import java.util.UUID;
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
14+
15+
public interface TracingContextInjectionTests extends TelemetryAware {
16+
@Test
17+
@Timeout(value = 2, unit = TimeUnit.MINUTES)
18+
default void testTracingContextInjectionDoesNotAffectEventBody() throws Throwable {
19+
EventStoreDBClient streamsClient = getDefaultClient();
20+
EventStoreDBPersistentSubscriptionsClient psClient = getDefaultPersistentSubscriptionClient();
21+
22+
String streamName = generateName();
23+
String groupName = "aGroup";
24+
25+
EventData[] events = {
26+
EventData.builderAsJson("JsonEvent", mapper.writeValueAsBytes(new Foo()))
27+
.eventId(UUID.randomUUID())
28+
.build(),
29+
EventData.builderAsBinary("ProtoEvent", mapper.writeValueAsBytes(new Foo()))
30+
.eventId(UUID.randomUUID())
31+
.build()
32+
};
33+
34+
Exceptions exceptions = new Exceptions().registerGoAwayError();
35+
flaky(10, exceptions, () -> psClient.createToStream(streamName, groupName).get());
36+
37+
streamsClient.appendToStream(streamName, events).get();
38+
39+
CountDownLatch subscribeSpansLatch = new CountDownLatch(events.length);
40+
onOperationSpanEnded(ClientTelemetryConstants.Operations.SUBSCRIBE, span -> subscribeSpansLatch.countDown());
41+
42+
ArrayList<RecordedEvent> receivedEvents = new ArrayList<>();
43+
PersistentSubscription subscription = psClient.subscribeToStream(
44+
streamName,
45+
groupName,
46+
SubscribePersistentSubscriptionOptions.get().bufferSize(32),
47+
new PersistentSubscriptionListener() {
48+
@Override
49+
public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
50+
receivedEvents.add(event.getEvent());
51+
}
52+
}
53+
).get();
54+
55+
subscribeSpansLatch.await();
56+
subscription.stop();
57+
58+
for (RecordedEvent receivedEvent : receivedEvents) {
59+
EventData sentEvent = Arrays.stream(events)
60+
.filter(e -> e.getEventId().equals(receivedEvent.getEventId()))
61+
.findFirst()
62+
.orElse(null);
63+
64+
Assertions.assertNotNull(sentEvent);
65+
Assertions.assertArrayEquals(sentEvent.getEventData(), receivedEvent.getEventData());
66+
Assertions.assertEquals(sentEvent.getContentType(), receivedEvent.getContentType());
67+
}
68+
}
69+
}

0 commit comments

Comments
 (0)