Skip to content

Commit 2c0f775

Browse files
committed
Take control of the metadata
1 parent b2052b1 commit 2c0f775

File tree

6 files changed

+198
-27
lines changed

6 files changed

+198
-27
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.kurrent.dbclient;
2+
3+
import com.fasterxml.jackson.core.type.TypeReference;
4+
import com.fasterxml.jackson.databind.json.JsonMapper;
5+
import com.google.protobuf.ByteString;
6+
import com.google.protobuf.Timestamp;
7+
import com.google.protobuf.Duration;
8+
import io.kurrentdb.protocol.DynamicValue;
9+
10+
import java.time.Instant;
11+
import java.time.LocalDateTime;
12+
import java.time.ZonedDateTime;
13+
import java.util.Collections;
14+
import java.util.Map;
15+
import java.util.stream.Collectors;
16+
17+
/**
18+
* Utility class for converting Java objects to DynamicValue protobuf messages.
19+
*/
20+
public class DynamicValueMapper {
21+
private static final JsonMapper objectMapper = new JsonMapper();
22+
23+
/**
24+
* Converts JSON byte array metadata to a Map of DynamicValue objects.
25+
*
26+
* @param jsonMetadata the source metadata as JSON bytes
27+
* @return a map with DynamicValue objects
28+
*/
29+
public static Map<String, DynamicValue> mapJsonToDynamicValueMap(byte[] jsonMetadata) {
30+
if (jsonMetadata == null || jsonMetadata.length == 0)
31+
return Collections.emptyMap();
32+
33+
try {
34+
Map<String, Object> metadata = objectMapper.readValue(jsonMetadata, new TypeReference<Map<String, Object>>() {
35+
});
36+
return mapToDynamicValueMap(metadata);
37+
} catch (Exception e) {
38+
return Collections.emptyMap();
39+
}
40+
}
41+
42+
/**
43+
* Converts a Map of metadata to a Map of DynamicValue objects.
44+
*
45+
* @param metadata the source metadata map
46+
* @return a map with DynamicValue objects
47+
*/
48+
public static Map<String, DynamicValue> mapToDynamicValueMap(Map<String, Object> metadata) {
49+
if (metadata == null) {
50+
return Collections.emptyMap();
51+
}
52+
53+
return metadata.entrySet().stream()
54+
.collect(Collectors.toMap(
55+
Map.Entry::getKey,
56+
entry -> mapToDynamicValue(entry.getValue())
57+
));
58+
}
59+
60+
/**
61+
* Converts a Java object to a DynamicValue protobuf message.
62+
*
63+
* @param source the source object
64+
* @return the corresponding DynamicValue
65+
*/
66+
public static DynamicValue mapToDynamicValue(Object source) {
67+
if (source == null) {
68+
return DynamicValue.newBuilder()
69+
.setNullValue(com.google.protobuf.NullValue.NULL_VALUE)
70+
.build();
71+
}
72+
73+
DynamicValue.Builder builder = DynamicValue.newBuilder();
74+
75+
if (source instanceof String) {
76+
return builder.setStringValue((String) source).build();
77+
} else if (source instanceof Boolean) {
78+
return builder.setBooleanValue((Boolean) source).build();
79+
} else if (source instanceof Integer) {
80+
return builder.setInt32Value((Integer) source).build();
81+
} else if (source instanceof Long) {
82+
return builder.setInt64Value((Long) source).build();
83+
} else if (source instanceof Float) {
84+
return builder.setFloatValue((Float) source).build();
85+
} else if (source instanceof Double) {
86+
return builder.setDoubleValue((Double) source).build();
87+
} else if (source instanceof Instant) {
88+
Instant instant = (Instant) source;
89+
return builder.setTimestampValue(Timestamp.newBuilder()
90+
.setSeconds(instant.getEpochSecond())
91+
.setNanos(instant.getNano())
92+
.build()).build();
93+
} else if (source instanceof LocalDateTime) {
94+
LocalDateTime localDateTime = (LocalDateTime) source;
95+
Instant instant = localDateTime.atZone(java.time.ZoneOffset.UTC).toInstant();
96+
return builder.setTimestampValue(Timestamp.newBuilder()
97+
.setSeconds(instant.getEpochSecond())
98+
.setNanos(instant.getNano())
99+
.build()).build();
100+
} else if (source instanceof ZonedDateTime) {
101+
ZonedDateTime zonedDateTime = (ZonedDateTime) source;
102+
Instant instant = zonedDateTime.toInstant();
103+
return builder.setTimestampValue(Timestamp.newBuilder()
104+
.setSeconds(instant.getEpochSecond())
105+
.setNanos(instant.getNano())
106+
.build()).build();
107+
} else if (source instanceof java.time.Duration) {
108+
java.time.Duration duration = (java.time.Duration) source;
109+
return builder.setDurationValue(Duration.newBuilder()
110+
.setSeconds(duration.getSeconds())
111+
.setNanos(duration.getNano())
112+
.build()).build();
113+
} else if (source instanceof byte[]) {
114+
return builder.setBytesValue(ByteString.copyFrom((byte[]) source)).build();
115+
} else {
116+
// For any other type, convert to string
117+
return builder.setStringValue(source.toString()).build();
118+
}
119+
}
120+
}

src/main/java/io/kurrent/dbclient/EventData.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,4 @@ public static EventDataBuilder builderAsBinary(UUID eventId, String eventType, b
9898
}
9999
}
100100

101+

src/main/java/io/kurrent/dbclient/EventDataBuilder.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.kurrent.dbclient;
22

3-
import com.fasterxml.jackson.core.JsonProcessingException;
4-
import com.fasterxml.jackson.databind.json.JsonMapper;
5-
63
import java.util.UUID;
74

85
/**

src/main/java/io/kurrent/dbclient/KurrentDBClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public CompletableFuture<WriteResult> appendToStream(String streamName, AppendTo
7575
return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute();
7676
}
7777

78-
public CompletableFuture<MultiAppendWriteResult> multiAppend(AppendToStreamOptions options, Iterator<AppendStreamRequest> requests) {
78+
public CompletableFuture<MultiAppendWriteResult> multiStreamAppend(AppendToStreamOptions options, Iterator<AppendStreamRequest> requests) {
7979
return new MultiStreamAppend(this.getGrpcClient(), requests).execute();
8080
}
8181

src/main/java/io/kurrent/dbclient/MultiStreamAppend.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.ArrayList;
1313
import java.util.Iterator;
1414
import java.util.List;
15+
import java.util.Map;
1516
import java.util.concurrent.CompletableFuture;
1617

1718
class MultiStreamAppend {
@@ -46,18 +47,24 @@ private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args) {
4647

4748
while (request.getEvents().hasNext()) {
4849
EventData event = request.getEvents().next();
49-
builder.addRecords(AppendRecord.newBuilder()
50+
AppendRecord.Builder recordBuilder = AppendRecord.newBuilder()
5051
.setData(ByteString.copyFrom(event.getEventData()))
51-
.setRecordId(event.getEventId().toString())
52-
.putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValue
53-
.newBuilder()
54-
.setStringValue(ContentTypeMapper.toSchemaDataFormat(event.getContentType()))
55-
.build())
56-
.putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValue
57-
.newBuilder()
58-
.setStringValue(event.getEventType())
59-
.build())
52+
.setRecordId(event.getEventId().toString())
53+
.putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValue
54+
.newBuilder()
55+
.setStringValue(ContentTypeMapper.toSchemaDataFormat(event.getContentType()))
56+
.build())
57+
.putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValue
58+
.newBuilder()
59+
.setStringValue(event.getEventType())
6060
.build());
61+
62+
if (event.getUserMetadata() != null) {
63+
Map<String, DynamicValue> userMetadataProperties = DynamicValueMapper.mapJsonToDynamicValueMap(event.getUserMetadata());
64+
recordBuilder.putAllProperties(userMetadataProperties);
65+
}
66+
67+
builder.addRecords(recordBuilder.build());
6168
}
6269

6370
requestStream.onNext(builder.build());

src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66

7-
import java.util.ArrayList;
8-
import java.util.List;
9-
import java.util.Optional;
7+
import java.io.IOException;
8+
import java.time.Instant;
9+
import java.util.*;
1010
import java.util.concurrent.ExecutionException;
1111

12+
@SuppressWarnings("rawtypes")
1213
public class MultiStreamAppendTests implements ConnectionAware {
1314
static private Database database;
1415
static private Logger logger;
@@ -35,7 +36,7 @@ public static void cleanup() {
3536
}
3637

3738
@Test
38-
public void testMultiStreamAppend() throws ExecutionException, InterruptedException {
39+
public void testMultiStreamAppend() throws ExecutionException, InterruptedException, IOException {
3940
KurrentDBClient client = getDefaultClient();
4041

4142
Optional<ServerVersion> version = client.getServerVersion().get();
@@ -45,18 +46,64 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept
4546
"Multi-stream append is not supported server versions below 25.0.0"
4647
);
4748

48-
List<AppendStreamRequest> requests = new ArrayList<>();
49+
// Arrange
50+
String streamName1 = generateName();
51+
String streamName2 = generateName();
4952

50-
List<EventData> events = new ArrayList<>();
51-
for (int i = 0; i < 10; i++)
52-
events.add(EventData.builderAsBinary("created", new byte[0]).build());
53+
Map<String, Object> metadata = new HashMap<>();
54+
metadata.put("stringProperty", "hello world");
55+
metadata.put("intProperty", 42);
56+
metadata.put("longProperty", 9876543210L);
57+
metadata.put("booleanProperty", true);
58+
metadata.put("doubleProperty", 3.14159);
59+
metadata.put("nullProperty", null);
60+
metadata.put("timestampProperty", Instant.now().toString());
5361

54-
requests.add(new AppendStreamRequest("foobar", events.iterator(), StreamState.any()));
55-
requests.add(new AppendStreamRequest("baz", events.iterator(), StreamState.any()));
62+
byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
5663

57-
MultiAppendWriteResult result = client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get();
64+
EventData event1 = EventData.builderAsJson("event-a", "{\"data\":\"test1\"}".getBytes())
65+
.metadataAsBytes(metadataBytes)
66+
.build();
5867

68+
EventData event2 = EventData.builderAsBinary("event-b", new byte[0]).build();
69+
70+
List<EventData> events1 = Collections.singletonList(event1);
71+
List<EventData> events2 = Collections.singletonList(event2);
72+
73+
List<AppendStreamRequest> requests = Arrays.asList(
74+
new AppendStreamRequest(streamName1, events1.iterator(), StreamState.noStream()),
75+
new AppendStreamRequest(streamName2, events2.iterator(), StreamState.noStream())
76+
);
77+
78+
// Act
79+
MultiAppendWriteResult result = client.multiStreamAppend(AppendToStreamOptions.get(), requests.iterator()).get();
80+
81+
// Assert
5982
Assertions.assertTrue(result.getSuccesses().isPresent());
83+
Assertions.assertFalse(result.getSuccesses().get().isEmpty());
84+
85+
List<ResolvedEvent> readEvents1 = client.readStream(streamName1, ReadStreamOptions.get()).get().getEvents();
86+
Assertions.assertEquals(1, readEvents1.size());
87+
88+
ResolvedEvent readEvent1 = readEvents1.get(0);
89+
Assertions.assertEquals(event1.getEventType(), readEvent1.getEvent().getEventType());
90+
91+
byte[] readMetadata = readEvent1.getEvent().getUserMetadata();
92+
Assertions.assertNotNull(readMetadata);
93+
Assertions.assertTrue(readMetadata.length > 0);
94+
95+
Map deserializedMetadata = mapper.readValue(readMetadata, Map.class);
96+
Assertions.assertEquals(metadata.get("stringProperty"), deserializedMetadata.get("stringProperty"));
97+
Assertions.assertEquals(metadata.get("intProperty"), deserializedMetadata.get("intProperty"));
98+
Assertions.assertEquals(metadata.get("longProperty"), ((Number) deserializedMetadata.get("longProperty")).longValue());
99+
Assertions.assertEquals(metadata.get("booleanProperty"), deserializedMetadata.get("booleanProperty"));
100+
Assertions.assertEquals((Double) metadata.get("doubleProperty"), ((Number) deserializedMetadata.get("doubleProperty")).doubleValue(), 0.00001);
101+
Assertions.assertEquals(metadata.get("timestampProperty"), deserializedMetadata.get("timestampProperty"));
102+
Assertions.assertNull(deserializedMetadata.get("nullProperty"));
103+
104+
List<ResolvedEvent> readEvents2 = client.readStream(streamName2, ReadStreamOptions.get()).get().getEvents();
105+
Assertions.assertEquals(1, readEvents2.size());
106+
Assertions.assertEquals(event2.getEventType(), readEvents2.get(0).getEvent().getEventType());
60107
}
61108

62109
@Test
@@ -80,9 +127,8 @@ public void testMultiStreamAppendWhenUnsupported() throws ExecutionException, In
80127

81128
ExecutionException e = Assertions.assertThrows(
82129
ExecutionException.class,
83-
() -> client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get());
130+
() -> client.multiStreamAppend(AppendToStreamOptions.get(), requests.iterator()).get());
84131

85132
Assertions.assertInstanceOf(UnsupportedOperationException.class, e.getCause());
86133
}
87134
}
88-

0 commit comments

Comments
 (0)