Skip to content

Commit ff089fc

Browse files
committed
Take control of the metadata (#337)
1 parent bee823e commit ff089fc

File tree

6 files changed

+327
-27
lines changed

6 files changed

+327
-27
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.kurrent.dbclient;
2+
3+
import com.fasterxml.jackson.core.type.TypeReference;
4+
import com.fasterxml.jackson.databind.json.JsonMapper;
5+
import com.google.protobuf.ByteString;
6+
import com.google.protobuf.Timestamp;
7+
import com.google.protobuf.Duration;
8+
import io.kurrentdb.protocol.DynamicValue;
9+
10+
import java.time.Instant;
11+
import java.time.LocalDateTime;
12+
import java.time.ZonedDateTime;
13+
import java.util.Collections;
14+
import java.util.Map;
15+
import java.util.stream.Collectors;
16+
17+
/**
18+
* Utility class for converting Java objects to DynamicValue protobuf messages.
19+
*/
20+
public class DynamicValueMapper {
21+
private static final JsonMapper objectMapper = new JsonMapper();
22+
23+
/**
24+
* Converts JSON byte array metadata to a Map of DynamicValue objects.
25+
*
26+
* @param jsonMetadata the source metadata as JSON bytes
27+
* @return a map with DynamicValue objects
28+
*/
29+
public static Map<String, DynamicValue> mapJsonToDynamicValueMap(byte[] jsonMetadata) {
30+
if (jsonMetadata == null || jsonMetadata.length == 0)
31+
return Collections.emptyMap();
32+
33+
try {
34+
Map<String, Object> metadata = objectMapper.readValue(jsonMetadata, new TypeReference<Map<String, Object>>() {
35+
});
36+
return mapToDynamicValueMap(metadata);
37+
} catch (Exception e) {
38+
return Collections.emptyMap();
39+
}
40+
}
41+
42+
/**
43+
* Converts a Map of metadata to a Map of DynamicValue objects.
44+
*
45+
* @param metadata the source metadata map
46+
* @return a map with DynamicValue objects
47+
*/
48+
public static Map<String, DynamicValue> mapToDynamicValueMap(Map<String, Object> metadata) {
49+
if (metadata == null) {
50+
return Collections.emptyMap();
51+
}
52+
53+
return metadata.entrySet().stream()
54+
.collect(Collectors.toMap(
55+
Map.Entry::getKey,
56+
entry -> mapToDynamicValue(entry.getValue())
57+
));
58+
}
59+
60+
/**
61+
* Converts a Java object to a DynamicValue protobuf message.
62+
*
63+
* @param source the source object
64+
* @return the corresponding DynamicValue
65+
*/
66+
public static DynamicValue mapToDynamicValue(Object source) {
67+
if (source == null) {
68+
return DynamicValue.newBuilder()
69+
.setNullValue(com.google.protobuf.NullValue.NULL_VALUE)
70+
.build();
71+
}
72+
73+
DynamicValue.Builder builder = DynamicValue.newBuilder();
74+
75+
if (source instanceof String) {
76+
return builder.setStringValue((String) source).build();
77+
} else if (source instanceof Boolean) {
78+
return builder.setBooleanValue((Boolean) source).build();
79+
} else if (source instanceof Integer) {
80+
return builder.setInt32Value((Integer) source).build();
81+
} else if (source instanceof Long) {
82+
return builder.setInt64Value((Long) source).build();
83+
} else if (source instanceof Float) {
84+
return builder.setFloatValue((Float) source).build();
85+
} else if (source instanceof Double) {
86+
return builder.setDoubleValue((Double) source).build();
87+
} else if (source instanceof Instant) {
88+
Instant instant = (Instant) source;
89+
return builder.setTimestampValue(Timestamp.newBuilder()
90+
.setSeconds(instant.getEpochSecond())
91+
.setNanos(instant.getNano())
92+
.build()).build();
93+
} else if (source instanceof LocalDateTime) {
94+
LocalDateTime localDateTime = (LocalDateTime) source;
95+
Instant instant = localDateTime.atZone(java.time.ZoneOffset.UTC).toInstant();
96+
return builder.setTimestampValue(Timestamp.newBuilder()
97+
.setSeconds(instant.getEpochSecond())
98+
.setNanos(instant.getNano())
99+
.build()).build();
100+
} else if (source instanceof ZonedDateTime) {
101+
ZonedDateTime zonedDateTime = (ZonedDateTime) source;
102+
Instant instant = zonedDateTime.toInstant();
103+
return builder.setTimestampValue(Timestamp.newBuilder()
104+
.setSeconds(instant.getEpochSecond())
105+
.setNanos(instant.getNano())
106+
.build()).build();
107+
} else if (source instanceof java.time.Duration) {
108+
java.time.Duration duration = (java.time.Duration) source;
109+
return builder.setDurationValue(Duration.newBuilder()
110+
.setSeconds(duration.getSeconds())
111+
.setNanos(duration.getNano())
112+
.build()).build();
113+
} else if (source instanceof byte[]) {
114+
return builder.setBytesValue(ByteString.copyFrom((byte[]) source)).build();
115+
} else {
116+
// For any other type, convert to string
117+
return builder.setStringValue(source.toString()).build();
118+
}
119+
}
120+
}

src/main/java/io/kurrent/dbclient/EventData.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,4 @@ public static EventDataBuilder builderAsBinary(UUID eventId, String eventType, b
9898
}
9999
}
100100

101+

src/main/java/io/kurrent/dbclient/EventDataBuilder.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.kurrent.dbclient;
22

3-
import com.fasterxml.jackson.core.JsonProcessingException;
4-
import com.fasterxml.jackson.databind.json.JsonMapper;
5-
63
import java.util.UUID;
74

85
/**

src/main/java/io/kurrent/dbclient/KurrentDBClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public CompletableFuture<WriteResult> appendToStream(String streamName, AppendTo
7575
return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute();
7676
}
7777

78-
public CompletableFuture<MultiAppendWriteResult> multiAppend(AppendToStreamOptions options, Iterator<AppendStreamRequest> requests) {
78+
public CompletableFuture<MultiAppendWriteResult> multiStreamAppend(Iterator<AppendStreamRequest> requests) {
7979
return new MultiStreamAppend(this.getGrpcClient(), requests).execute();
8080
}
8181

src/main/java/io/kurrent/dbclient/MultiStreamAppend.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.ArrayList;
1313
import java.util.Iterator;
1414
import java.util.List;
15+
import java.util.Map;
1516
import java.util.concurrent.CompletableFuture;
1617

1718
class MultiStreamAppend {
@@ -42,22 +43,29 @@ private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args) {
4243
while (this.requests.hasNext()) {
4344
AppendStreamRequest request = this.requests.next();
4445
io.kurrentdb.protocol.streams.v2.AppendStreamRequest.Builder builder = io.kurrentdb.protocol.streams.v2.AppendStreamRequest.newBuilder()
46+
.setExpectedRevision(request.getExpectedState().toRawLong())
4547
.setStream(request.getStreamName());
4648

4749
while (request.getEvents().hasNext()) {
4850
EventData event = request.getEvents().next();
49-
builder.addRecords(AppendRecord.newBuilder()
51+
AppendRecord.Builder recordBuilder = AppendRecord.newBuilder()
5052
.setData(ByteString.copyFrom(event.getEventData()))
51-
.setRecordId(event.getEventId().toString())
52-
.putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValue
53-
.newBuilder()
54-
.setStringValue(ContentTypeMapper.toSchemaDataFormat(event.getContentType()))
55-
.build())
56-
.putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValue
57-
.newBuilder()
58-
.setStringValue(event.getEventType())
59-
.build())
53+
.setRecordId(event.getEventId().toString())
54+
.putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValue
55+
.newBuilder()
56+
.setStringValue(ContentTypeMapper.toSchemaDataFormat(event.getContentType()))
57+
.build())
58+
.putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValue
59+
.newBuilder()
60+
.setStringValue(event.getEventType())
6061
.build());
62+
63+
if (event.getUserMetadata() != null) {
64+
Map<String, DynamicValue> userMetadataProperties = DynamicValueMapper.mapJsonToDynamicValueMap(event.getUserMetadata());
65+
recordBuilder.putAllProperties(userMetadataProperties);
66+
}
67+
68+
builder.addRecords(recordBuilder.build());
6169
}
6270

6371
requestStream.onNext(builder.build());

0 commit comments

Comments
 (0)