Skip to content

Commit 2987c87

Browse files
committed
Refactor multi stream append to use v2 protocol (#347)
* Refactor multi stream append to use v2 protocol
1 parent 5366a91 commit 2987c87

29 files changed

+942
-1244
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ dependencies {
6060
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
6161
implementation "io.grpc:grpc-stub:${grpcVersion}"
6262
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
63+
implementation "com.google.api.grpc:grpc-google-common-protos:2.61.3"
6364
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
6465
implementation "org.slf4j:slf4j-api:2.0.17"
6566
implementation "org.bouncycastle:bcprov-jdk18on:1.80"

docs/api/appending-events.md

Lines changed: 3 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -255,23 +255,12 @@ This feature is only available in KurrentDB 25.1 and later.
255255

256256
You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.
257257

258-
The `multiStreamAppend` method accepts a collection of `AppendStreamRequest` objects and returns a `MultiAppendWriteResult`. Each `AppendStreamRequest` contains:
259-
260-
- **streamName** - The name of the stream
261-
- **expectedState** - The expected state of the stream for optimistic concurrency control
262-
- **events** - A collection of `EventData` objects to append
263-
264-
The operation returns a `MultiAppendWriteResult` that contains either:
265-
- A list of `AppendStreamSuccess` objects if all streams were successfully updated
266-
- A list of `AppendStreamFailure` objects if any streams failed to update
267-
268258
::: warning
269-
Event metadata in `EventData` must be valid JSON objects. This requirement will
270-
be removed in a future major release.
259+
Currently, metadata must be valid JSON. Binary metadata will not be supported in
260+
this version. This limitation ensures compatibility with KurrentDB's metadata
261+
handling and will be removed in the next major release.
271262
:::
272263

273-
Here's a basic example of appending events to multiple streams:
274-
275264
```java
276265
JsonMapper mapper = new JsonMapper();
277266

@@ -306,87 +295,5 @@ List<AppendStreamRequest> requests = Arrays.asList(
306295
);
307296

308297
MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get();
309-
310-
if (result.getSuccesses().isPresent())
311-
result.getSuccesses().get().forEach(success -> {
312-
System.out.println(success.getStreamName() + " updated at " + success.getPosition());
313-
});
314298
```
315299

316-
If the operation doesn't succeed, you can handle the failures as follows:
317-
318-
```java
319-
if (result.getFailures().isPresent()) {
320-
MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor();
321-
result.getFailures().get().forEach(failure -> {
322-
failure.visit(visitor);
323-
324-
if (visitor.wasWrongExpectedRevisionVisited()) {
325-
System.out.println("Wrong revision for stream: " + failure.getStreamName());
326-
} else if (visitor.wasStreamDeletedVisited()) {
327-
System.out.println("Stream deleted: " + failure.getStreamName());
328-
} else if (visitor.wasAccessDenied()) {
329-
System.out.println("Access denied: " + failure.getStreamName());
330-
} else if (visitor.wasTransactionMaxSizeExceeded()) {
331-
System.out.println("Transaction too large: " + failure.getStreamName());
332-
} else {
333-
System.out.println("Unknown error: " + failure.getStreamName());
334-
}
335-
});
336-
}
337-
```
338-
339-
::: details Click here to see the implementaton of `MultiAppendErrorVisitor`
340-
341-
```java
342-
class MultiAppendErrorVisitor implements MultiAppendStreamErrorVisitor {
343-
private boolean wrongExpectedRevisionVisited = false;
344-
private boolean streamDeletedVisited = false;
345-
private boolean transactionMaxSizeExceeded = false;
346-
private boolean accessDenied = false;
347-
private long actualRevision = -1;
348-
349-
@Override
350-
public void onAccessDenied(ErrorDetails.AccessDenied detail) {
351-
this.accessDenied = true;
352-
}
353-
354-
@Override
355-
public void onWrongExpectedRevision(long streamRevision) {
356-
this.wrongExpectedRevisionVisited = true;
357-
this.actualRevision = streamRevision;
358-
}
359-
360-
@Override
361-
public void onStreamDeleted() {
362-
this.streamDeletedVisited = true;
363-
}
364-
365-
@Override
366-
public void onTransactionMaxSizeExceeded(int maxSize) {
367-
this.transactionMaxSizeExceeded = true;
368-
}
369-
370-
public boolean wasWrongExpectedRevisionVisited() {
371-
return wrongExpectedRevisionVisited;
372-
}
373-
374-
public boolean wasStreamDeletedVisited() {
375-
return streamDeletedVisited;
376-
}
377-
378-
public boolean wasAccessDenied() {
379-
return accessDenied;
380-
}
381-
382-
public boolean wasTransactionMaxSizeExceeded() {
383-
return transactionMaxSizeExceeded;
384-
}
385-
386-
public long getActualRevision() {
387-
return actualRevision;
388-
}
389-
}
390-
```
391-
392-
:::
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.kurrent.dbclient;
2+
3+
public class AppendResponse {
4+
private final String stream;
5+
private final long streamRevision;
6+
7+
public AppendResponse(String stream, long streamRevision) {
8+
this.stream = stream;
9+
this.streamRevision = streamRevision;
10+
}
11+
12+
public String getStream() {
13+
return stream;
14+
}
15+
16+
public long getStreamRevision() {
17+
return streamRevision;
18+
}
19+
}

src/main/java/io/kurrent/dbclient/AppendStreamFailure.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/main/java/io/kurrent/dbclient/AppendToStream.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
66
import com.google.protobuf.ByteString;
77
import io.grpc.ManagedChannel;
8-
import io.grpc.Metadata;
9-
import io.grpc.StatusRuntimeException;
108
import io.grpc.stub.StreamObserver;
119

1210
import java.util.ArrayList;
@@ -112,16 +110,6 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
112110
.build());
113111
}
114112
requestStream.onCompleted();
115-
} catch (StatusRuntimeException e) {
116-
String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
117-
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));
118-
119-
if (leaderHost != null && leaderPort != null) {
120-
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
121-
result.completeExceptionally(reason);
122-
} else {
123-
result.completeExceptionally(e);
124-
}
125113
} catch (RuntimeException e) {
126114
result.completeExceptionally(e);
127115
}

src/main/java/io/kurrent/dbclient/ClientTelemetry.java

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import com.fasterxml.jackson.databind.node.ObjectNode;
66
import io.grpc.ManagedChannel;
77
import io.opentelemetry.api.GlobalOpenTelemetry;
8-
import io.opentelemetry.api.common.AttributeKey;
9-
import io.opentelemetry.api.common.Attributes;
108
import io.opentelemetry.api.trace.*;
119
import io.opentelemetry.context.Context;
1210
import io.opentelemetry.context.Scope;
@@ -16,8 +14,6 @@
1614
import java.util.concurrent.CompletionException;
1715
import java.util.function.BiFunction;
1816

19-
import static io.kurrentdb.protocol.streams.v2.AppendStreamFailure.*;
20-
2117
class ClientTelemetry {
2218
private static final ClientTelemetryTags DEFAULT_ATTRIBUTES = new ClientTelemetryTags() {{
2319
put(ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME);
@@ -123,8 +119,8 @@ static CompletableFuture<WriteResult> traceAppend(
123119
}
124120
}
125121

126-
static CompletableFuture<MultiAppendWriteResult> traceMultiStreamAppend(
127-
BiFunction<WorkItemArgs, Iterator<AppendStreamRequest>, CompletableFuture<MultiAppendWriteResult>> multiAppendOperation,
122+
static CompletableFuture<MultiStreamAppendResponse> traceMultiStreamAppend(
123+
BiFunction<WorkItemArgs, Iterator<AppendStreamRequest>, CompletableFuture<MultiStreamAppendResponse>> multiAppendOperation,
128124
WorkItemArgs args,
129125
Iterator<AppendStreamRequest> requests, KurrentDBClientSettings settings) {
130126

@@ -164,47 +160,8 @@ static CompletableFuture<MultiAppendWriteResult> traceMultiStreamAppend(
164160
span.end();
165161
throw new CompletionException(throwable);
166162
} else {
167-
if (result.getFailures().isPresent()) {
168-
for (AppendStreamFailure failure : result.getFailures().get()) {
169-
failure.visit(new MultiAppendStreamErrorVisitor() {
170-
@Override
171-
public void onWrongExpectedRevision(long streamRevision) {
172-
span.addEvent("exception", Attributes.of(
173-
AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(),
174-
AttributeKey.longKey("exception.revision"), streamRevision
175-
));
176-
}
177-
178-
@Override
179-
public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) {
180-
span.addEvent("exception", Attributes.of(
181-
AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString()
182-
));
183-
}
184-
185-
@Override
186-
public void onStreamDeleted() {
187-
span.addEvent("exception", Attributes.of(
188-
AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString()
189-
));
190-
}
191-
192-
@Override
193-
public void onTransactionMaxSizeExceeded(int maxSize) {
194-
span.addEvent("exception", Attributes.of(
195-
AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(),
196-
AttributeKey.longKey("exception.maxSize"), (long) maxSize
197-
));
198-
}
199-
});
200-
}
201-
span.setStatus(StatusCode.ERROR);
202-
span.end();
203-
} else if (result.getSuccesses().isPresent()) {
204-
span.setStatus(StatusCode.OK);
205-
span.end();
206-
}
207-
163+
span.setStatus(StatusCode.OK);
164+
span.end();
208165
return result;
209166
}
210167
});
Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
package io.kurrent.dbclient;
22

3+
import io.kurrentdb.protocol.v2.streams.SchemaFormat;
4+
35
import java.util.Collections;
46
import java.util.HashMap;
57
import java.util.Map;
68

79
public class ContentTypeMapper {
8-
private static final Map<String, String> CONTENT_TYPE_MAP;
10+
private static final Map<String, SchemaFormat> CONTENT_TYPE_MAP;
911

1012
static {
11-
Map<String, String> map = new HashMap<>();
12-
map.put("application/json", "Json");
13-
map.put("application/octet-stream", "Binary");
13+
Map<String, SchemaFormat> map = new HashMap<>();
14+
map.put("application/json", SchemaFormat.SCHEMA_FORMAT_JSON);
15+
map.put("application/octet-stream", SchemaFormat.SCHEMA_FORMAT_BYTES);
1416
CONTENT_TYPE_MAP = Collections.unmodifiableMap(map);
1517
}
1618

17-
public static String toSchemaDataFormat(String contentType) {
18-
return CONTENT_TYPE_MAP.getOrDefault(contentType, contentType);
19+
public static SchemaFormat toSchemaDataFormat(String contentType) {
20+
return CONTENT_TYPE_MAP.getOrDefault(contentType, SchemaFormat.SCHEMA_FORMAT_UNSPECIFIED);
1921
}
2022
}

0 commit comments

Comments
 (0)