Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ dependencies {
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "com.google.api.grpc:grpc-google-common-protos:2.61.3"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation "org.slf4j:slf4j-api:2.0.17"
implementation "org.bouncycastle:bcprov-jdk18on:1.80"
Expand Down
99 changes: 3 additions & 96 deletions docs/api/appending-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,23 +255,12 @@ This feature is only available in KurrentDB 25.1 and later.

You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.

The `multiStreamAppend` method accepts a collection of `AppendStreamRequest` objects and returns a `MultiAppendWriteResult`. Each `AppendStreamRequest` contains:

- **streamName** - The name of the stream
- **expectedState** - The expected state of the stream for optimistic concurrency control
- **events** - A collection of `EventData` objects to append

The operation returns a `MultiAppendWriteResult` that contains either:
- A list of `AppendStreamSuccess` objects if all streams were successfully updated
- A list of `AppendStreamFailure` objects if any streams failed to update

::: warning
Event metadata in `EventData` must be valid JSON objects. This requirement will
be removed in a future major release.
Currently, metadata must be valid JSON. Binary metadata will not be supported in
this version. This limitation ensures compatibility with KurrentDB's metadata
handling and will be removed in the next major release.
:::

Here's a basic example of appending events to multiple streams:

```java
JsonMapper mapper = new JsonMapper();

Expand Down Expand Up @@ -306,87 +295,5 @@ List<AppendStreamRequest> requests = Arrays.asList(
);

MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get();

if (result.getSuccesses().isPresent())
result.getSuccesses().get().forEach(success -> {
System.out.println(success.getStreamName() + " updated at " + success.getPosition());
});
```

If the operation doesn't succeed, you can handle the failures as follows:

```java
if (result.getFailures().isPresent()) {
MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor();
result.getFailures().get().forEach(failure -> {
failure.visit(visitor);

if (visitor.wasWrongExpectedRevisionVisited()) {
System.out.println("Wrong revision for stream: " + failure.getStreamName());
} else if (visitor.wasStreamDeletedVisited()) {
System.out.println("Stream deleted: " + failure.getStreamName());
} else if (visitor.wasAccessDenied()) {
System.out.println("Access denied: " + failure.getStreamName());
} else if (visitor.wasTransactionMaxSizeExceeded()) {
System.out.println("Transaction too large: " + failure.getStreamName());
} else {
System.out.println("Unknown error: " + failure.getStreamName());
}
});
}
```

::: details Click here to see the implementaton of `MultiAppendErrorVisitor`

```java
class MultiAppendErrorVisitor implements MultiAppendStreamErrorVisitor {
private boolean wrongExpectedRevisionVisited = false;
private boolean streamDeletedVisited = false;
private boolean transactionMaxSizeExceeded = false;
private boolean accessDenied = false;
private long actualRevision = -1;

@Override
public void onAccessDenied(ErrorDetails.AccessDenied detail) {
this.accessDenied = true;
}

@Override
public void onWrongExpectedRevision(long streamRevision) {
this.wrongExpectedRevisionVisited = true;
this.actualRevision = streamRevision;
}

@Override
public void onStreamDeleted() {
this.streamDeletedVisited = true;
}

@Override
public void onTransactionMaxSizeExceeded(int maxSize) {
this.transactionMaxSizeExceeded = true;
}

public boolean wasWrongExpectedRevisionVisited() {
return wrongExpectedRevisionVisited;
}

public boolean wasStreamDeletedVisited() {
return streamDeletedVisited;
}

public boolean wasAccessDenied() {
return accessDenied;
}

public boolean wasTransactionMaxSizeExceeded() {
return transactionMaxSizeExceeded;
}

public long getActualRevision() {
return actualRevision;
}
}
```

:::
19 changes: 19 additions & 0 deletions src/main/java/io/kurrent/dbclient/AppendResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.kurrent.dbclient;

public class AppendResponse {
private final String stream;
private final long streamRevision;

public AppendResponse(String stream, long streamRevision) {
this.stream = stream;
this.streamRevision = streamRevision;
}

public String getStream() {
return stream;
}

public long getStreamRevision() {
return streamRevision;
}
}
38 changes: 0 additions & 38 deletions src/main/java/io/kurrent/dbclient/AppendStreamFailure.java

This file was deleted.

21 changes: 0 additions & 21 deletions src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java

This file was deleted.

12 changes: 0 additions & 12 deletions src/main/java/io/kurrent/dbclient/AppendToStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.ArrayList;
Expand Down Expand Up @@ -112,16 +110,6 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
.build());
}
requestStream.onCompleted();
} catch (StatusRuntimeException e) {
String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
result.completeExceptionally(reason);
} else {
result.completeExceptionally(e);
}
} catch (RuntimeException e) {
result.completeExceptionally(e);
}
Expand Down
51 changes: 4 additions & 47 deletions src/main/java/io/kurrent/dbclient/ClientTelemetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.grpc.ManagedChannel;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.*;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
Expand All @@ -16,8 +14,6 @@
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;

import static io.kurrentdb.protocol.streams.v2.AppendStreamFailure.*;

class ClientTelemetry {
private static final ClientTelemetryTags DEFAULT_ATTRIBUTES = new ClientTelemetryTags() {{
put(ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME);
Expand Down Expand Up @@ -123,8 +119,8 @@ static CompletableFuture<WriteResult> traceAppend(
}
}

static CompletableFuture<MultiAppendWriteResult> traceMultiStreamAppend(
BiFunction<WorkItemArgs, Iterator<AppendStreamRequest>, CompletableFuture<MultiAppendWriteResult>> multiAppendOperation,
static CompletableFuture<MultiStreamAppendResponse> traceMultiStreamAppend(
BiFunction<WorkItemArgs, Iterator<AppendStreamRequest>, CompletableFuture<MultiStreamAppendResponse>> multiAppendOperation,
WorkItemArgs args,
Iterator<AppendStreamRequest> requests, KurrentDBClientSettings settings) {

Expand Down Expand Up @@ -164,47 +160,8 @@ static CompletableFuture<MultiAppendWriteResult> traceMultiStreamAppend(
span.end();
throw new CompletionException(throwable);
} else {
if (result.getFailures().isPresent()) {
for (AppendStreamFailure failure : result.getFailures().get()) {
failure.visit(new MultiAppendStreamErrorVisitor() {
@Override
public void onWrongExpectedRevision(long streamRevision) {
span.addEvent("exception", Attributes.of(
AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(),
AttributeKey.longKey("exception.revision"), streamRevision
));
}

@Override
public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) {
span.addEvent("exception", Attributes.of(
AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString()
));
}

@Override
public void onStreamDeleted() {
span.addEvent("exception", Attributes.of(
AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString()
));
}

@Override
public void onTransactionMaxSizeExceeded(int maxSize) {
span.addEvent("exception", Attributes.of(
AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(),
AttributeKey.longKey("exception.maxSize"), (long) maxSize
));
}
});
}
span.setStatus(StatusCode.ERROR);
span.end();
} else if (result.getSuccesses().isPresent()) {
span.setStatus(StatusCode.OK);
span.end();
}

span.setStatus(StatusCode.OK);
span.end();
return result;
}
});
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/io/kurrent/dbclient/ContentTypeMapper.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package io.kurrent.dbclient;

import io.kurrentdb.protocol.v2.streams.SchemaFormat;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ContentTypeMapper {
private static final Map<String, String> CONTENT_TYPE_MAP;
private static final Map<String, SchemaFormat> CONTENT_TYPE_MAP;

static {
Map<String, String> map = new HashMap<>();
map.put("application/json", "Json");
map.put("application/octet-stream", "Binary");
Map<String, SchemaFormat> map = new HashMap<>();
map.put("application/json", SchemaFormat.SCHEMA_FORMAT_JSON);
map.put("application/octet-stream", SchemaFormat.SCHEMA_FORMAT_BYTES);
CONTENT_TYPE_MAP = Collections.unmodifiableMap(map);
}

public static String toSchemaDataFormat(String contentType) {
return CONTENT_TYPE_MAP.getOrDefault(contentType, contentType);
public static SchemaFormat toSchemaDataFormat(String contentType) {
return CONTENT_TYPE_MAP.getOrDefault(contentType, SchemaFormat.SCHEMA_FORMAT_UNSPECIFIED);
}
}
Loading
Loading