From ccd0104916257307c7bea88addf7e888d88fe4b9 Mon Sep 17 00:00:00 2001 From: William Chong Date: Tue, 7 Oct 2025 14:08:28 +0400 Subject: [PATCH 1/2] Refactor multi stream append to use v2 protocol --- build.gradle | 1 + docs/api/appending-events.md | 99 +--- .../io/kurrent/dbclient/AppendResponse.java | 19 + .../kurrent/dbclient/AppendStreamFailure.java | 38 -- .../kurrent/dbclient/AppendStreamSuccess.java | 21 - .../io/kurrent/dbclient/AppendToStream.java | 12 - .../io/kurrent/dbclient/ClientTelemetry.java | 51 +- .../kurrent/dbclient/ContentTypeMapper.java | 14 +- .../kurrent/dbclient/DynamicValueMapper.java | 68 ++- .../java/io/kurrent/dbclient/GrpcUtils.java | 27 +- .../io/kurrent/dbclient/KurrentDBClient.java | 2 +- .../MultiAppendStreamErrorVisitor.java | 10 - .../dbclient/MultiAppendWriteResult.java | 22 - .../kurrent/dbclient/MultiStreamAppend.java | 187 ++++--- .../dbclient/MultiStreamAppendResponse.java | 21 + .../dbclient/RecordSizeExceededException.java | 50 ++ .../io/kurrent/dbclient/ServerFeatures.java | 2 +- .../dbclient/StreamTombstonedException.java | 18 + .../TransactionMaxSizeExceededException.java | 30 + .../proto/kurrentdb/protocol/v1/status.proto | 15 +- .../proto/kurrentdb/protocol/v2/core.proto | 82 --- .../proto/kurrentdb/protocol/v2/errors.proto | 150 +++++ .../protocol/v2/features/service.proto | 144 ----- .../proto/kurrentdb/protocol/v2/rpc.proto | 94 ++++ .../protocol/v2/streams/errors.proto | 130 +++++ .../protocol/v2/streams/shared.proto | 118 ---- .../protocol/v2/streams/streams.proto | 512 +++--------------- .../dbclient/MultiStreamAppendTests.java | 89 +-- .../StreamsTracingInstrumentationTests.java | 30 +- 29 files changed, 819 insertions(+), 1237 deletions(-) create mode 100644 src/main/java/io/kurrent/dbclient/AppendResponse.java delete mode 100644 src/main/java/io/kurrent/dbclient/AppendStreamFailure.java delete mode 100644 src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java delete mode 100644 src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java delete mode 100644 src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java create mode 100644 src/main/java/io/kurrent/dbclient/MultiStreamAppendResponse.java create mode 100644 src/main/java/io/kurrent/dbclient/RecordSizeExceededException.java create mode 100644 src/main/java/io/kurrent/dbclient/StreamTombstonedException.java create mode 100644 src/main/java/io/kurrent/dbclient/TransactionMaxSizeExceededException.java delete mode 100644 src/main/proto/kurrentdb/protocol/v2/core.proto create mode 100644 src/main/proto/kurrentdb/protocol/v2/errors.proto delete mode 100644 src/main/proto/kurrentdb/protocol/v2/features/service.proto create mode 100644 src/main/proto/kurrentdb/protocol/v2/rpc.proto create mode 100644 src/main/proto/kurrentdb/protocol/v2/streams/errors.proto delete mode 100644 src/main/proto/kurrentdb/protocol/v2/streams/shared.proto diff --git a/build.gradle b/build.gradle index a84cd8b5..4220a7dc 100644 --- a/build.gradle +++ b/build.gradle @@ -60,6 +60,7 @@ dependencies { implementation "io.grpc:grpc-netty-shaded:${grpcVersion}" implementation "io.grpc:grpc-stub:${grpcVersion}" implementation "io.grpc:grpc-protobuf:${grpcVersion}" + implementation "com.google.api.grpc:grpc-google-common-protos:2.61.3" implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" implementation "org.slf4j:slf4j-api:2.0.17" implementation "org.bouncycastle:bcprov-jdk18on:1.80" diff --git a/docs/api/appending-events.md b/docs/api/appending-events.md index 6b00046a..652209f8 100644 --- a/docs/api/appending-events.md +++ b/docs/api/appending-events.md @@ -255,23 +255,12 @@ This feature is only available in KurrentDB 25.1 and later. You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails. -The `multiStreamAppend` method accepts a collection of `AppendStreamRequest` objects and returns a `MultiAppendWriteResult`. Each `AppendStreamRequest` contains: - -- **streamName** - The name of the stream -- **expectedState** - The expected state of the stream for optimistic concurrency control -- **events** - A collection of `EventData` objects to append - -The operation returns a `MultiAppendWriteResult` that contains either: -- A list of `AppendStreamSuccess` objects if all streams were successfully updated -- A list of `AppendStreamFailure` objects if any streams failed to update - ::: warning -Event metadata in `EventData` must be valid JSON objects. This requirement will -be removed in a future major release. +Currently, metadata must be valid JSON. Binary metadata will not be supported in +this version. This limitation ensures compatibility with KurrentDB's metadata +handling and will be removed in the next major release. ::: -Here's a basic example of appending events to multiple streams: - ```java JsonMapper mapper = new JsonMapper(); @@ -306,87 +295,5 @@ List requests = Arrays.asList( ); MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get(); - -if (result.getSuccesses().isPresent()) - result.getSuccesses().get().forEach(success -> { - System.out.println(success.getStreamName() + " updated at " + success.getPosition()); - }); ``` -If the operation doesn't succeed, you can handle the failures as follows: - -```java -if (result.getFailures().isPresent()) { - MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor(); - result.getFailures().get().forEach(failure -> { - failure.visit(visitor); - - if (visitor.wasWrongExpectedRevisionVisited()) { - System.out.println("Wrong revision for stream: " + failure.getStreamName()); - } else if (visitor.wasStreamDeletedVisited()) { - System.out.println("Stream deleted: " + failure.getStreamName()); - } else if (visitor.wasAccessDenied()) { - System.out.println("Access denied: " + failure.getStreamName()); - } else if (visitor.wasTransactionMaxSizeExceeded()) { - System.out.println("Transaction too large: " + failure.getStreamName()); - } else { - System.out.println("Unknown error: " + failure.getStreamName()); - } - }); -} -``` - -::: details Click here to see the implementaton of `MultiAppendErrorVisitor` - -```java -class MultiAppendErrorVisitor implements MultiAppendStreamErrorVisitor { - private boolean wrongExpectedRevisionVisited = false; - private boolean streamDeletedVisited = false; - private boolean transactionMaxSizeExceeded = false; - private boolean accessDenied = false; - private long actualRevision = -1; - - @Override - public void onAccessDenied(ErrorDetails.AccessDenied detail) { - this.accessDenied = true; - } - - @Override - public void onWrongExpectedRevision(long streamRevision) { - this.wrongExpectedRevisionVisited = true; - this.actualRevision = streamRevision; - } - - @Override - public void onStreamDeleted() { - this.streamDeletedVisited = true; - } - - @Override - public void onTransactionMaxSizeExceeded(int maxSize) { - this.transactionMaxSizeExceeded = true; - } - - public boolean wasWrongExpectedRevisionVisited() { - return wrongExpectedRevisionVisited; - } - - public boolean wasStreamDeletedVisited() { - return streamDeletedVisited; - } - - public boolean wasAccessDenied() { - return accessDenied; - } - - public boolean wasTransactionMaxSizeExceeded() { - return transactionMaxSizeExceeded; - } - - public long getActualRevision() { - return actualRevision; - } -} -``` - -::: diff --git a/src/main/java/io/kurrent/dbclient/AppendResponse.java b/src/main/java/io/kurrent/dbclient/AppendResponse.java new file mode 100644 index 00000000..0f702be2 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/AppendResponse.java @@ -0,0 +1,19 @@ +package io.kurrent.dbclient; + +public class AppendResponse { + private final String stream; + private final long streamRevision; + + public AppendResponse(String stream, long streamRevision) { + this.stream = stream; + this.streamRevision = streamRevision; + } + + public String getStream() { + return stream; + } + + public long getStreamRevision() { + return streamRevision; + } +} diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java deleted file mode 100644 index 887ec5b4..00000000 --- a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.kurrent.dbclient; - -import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase; - -public class AppendStreamFailure { - private final io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner; - - AppendStreamFailure(io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner) { - this.inner = inner; - } - - public String getStreamName() { - return this.inner.getStream(); - } - - public void visit(MultiAppendStreamErrorVisitor visitor) { - if (this.inner.getErrorCase() == ErrorCase.STREAM_REVISION_CONFLICT) { - visitor.onWrongExpectedRevision(this.inner.getStreamRevisionConflict().getStreamRevision()); - return; - } - - if (this.inner.getErrorCase() == ErrorCase.ACCESS_DENIED) { - visitor.onAccessDenied(this.inner.getAccessDenied()); - } - - if (this.inner.getErrorCase() == ErrorCase.STREAM_DELETED) { - visitor.onStreamDeleted(); - return; - } - - if (this.inner.getErrorCase() == ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { - visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize()); - return; - } - - throw new IllegalArgumentException("Append failure does not match any known error type"); - } -} diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java b/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java deleted file mode 100644 index be55e3ba..00000000 --- a/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.kurrent.dbclient; - -public class AppendStreamSuccess { - private final io.kurrentdb.protocol.streams.v2.AppendStreamSuccess inner; - - AppendStreamSuccess(io.kurrentdb.protocol.streams.v2.AppendStreamSuccess inner) { - this.inner = inner; - } - - public String getStreamName() { - return this.inner.getStream(); - } - - public long getStreamRevision() { - return this.inner.getStreamRevision(); - } - - public long getPosition() { - return this.inner.getPosition(); - } -} diff --git a/src/main/java/io/kurrent/dbclient/AppendToStream.java b/src/main/java/io/kurrent/dbclient/AppendToStream.java index 11ef0060..4302420e 100644 --- a/src/main/java/io/kurrent/dbclient/AppendToStream.java +++ b/src/main/java/io/kurrent/dbclient/AppendToStream.java @@ -5,8 +5,6 @@ import io.kurrent.dbclient.proto.streams.StreamsOuterClass; import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.ArrayList; @@ -112,16 +110,6 @@ private CompletableFuture append(ManagedChannel channel, List traceAppend( } } - static CompletableFuture traceMultiStreamAppend( - BiFunction, CompletableFuture> multiAppendOperation, + static CompletableFuture traceMultiStreamAppend( + BiFunction, CompletableFuture> multiAppendOperation, WorkItemArgs args, Iterator requests, KurrentDBClientSettings settings) { @@ -164,47 +160,8 @@ static CompletableFuture traceMultiStreamAppend( span.end(); throw new CompletionException(throwable); } else { - if (result.getFailures().isPresent()) { - for (AppendStreamFailure failure : result.getFailures().get()) { - failure.visit(new MultiAppendStreamErrorVisitor() { - @Override - public void onWrongExpectedRevision(long streamRevision) { - span.addEvent("exception", Attributes.of( - AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(), - AttributeKey.longKey("exception.revision"), streamRevision - )); - } - - @Override - public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) { - span.addEvent("exception", Attributes.of( - AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString() - )); - } - - @Override - public void onStreamDeleted() { - span.addEvent("exception", Attributes.of( - AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString() - )); - } - - @Override - public void onTransactionMaxSizeExceeded(int maxSize) { - span.addEvent("exception", Attributes.of( - AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(), - AttributeKey.longKey("exception.maxSize"), (long) maxSize - )); - } - }); - } - span.setStatus(StatusCode.ERROR); - span.end(); - } else if (result.getSuccesses().isPresent()) { - span.setStatus(StatusCode.OK); - span.end(); - } - + span.setStatus(StatusCode.OK); + span.end(); return result; } }); diff --git a/src/main/java/io/kurrent/dbclient/ContentTypeMapper.java b/src/main/java/io/kurrent/dbclient/ContentTypeMapper.java index a74bb316..cd4b2db3 100644 --- a/src/main/java/io/kurrent/dbclient/ContentTypeMapper.java +++ b/src/main/java/io/kurrent/dbclient/ContentTypeMapper.java @@ -1,20 +1,22 @@ package io.kurrent.dbclient; +import io.kurrentdb.protocol.v2.streams.SchemaFormat; + import java.util.Collections; import java.util.HashMap; import java.util.Map; public class ContentTypeMapper { - private static final Map CONTENT_TYPE_MAP; + private static final Map CONTENT_TYPE_MAP; static { - Map map = new HashMap<>(); - map.put("application/json", "Json"); - map.put("application/octet-stream", "Binary"); + Map map = new HashMap<>(); + map.put("application/json", SchemaFormat.SCHEMA_FORMAT_JSON); + map.put("application/octet-stream", SchemaFormat.SCHEMA_FORMAT_BYTES); CONTENT_TYPE_MAP = Collections.unmodifiableMap(map); } - public static String toSchemaDataFormat(String contentType) { - return CONTENT_TYPE_MAP.getOrDefault(contentType, contentType); + public static SchemaFormat toSchemaDataFormat(String contentType) { + return CONTENT_TYPE_MAP.getOrDefault(contentType, SchemaFormat.SCHEMA_FORMAT_UNSPECIFIED); } } diff --git a/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java b/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java index af4415bf..ab84d552 100644 --- a/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java +++ b/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java @@ -5,7 +5,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.protobuf.Duration; -import io.kurrentdb.protocol.DynamicValue; +import com.google.protobuf.Value; import java.time.Instant; import java.time.LocalDateTime; @@ -26,14 +26,14 @@ public class DynamicValueMapper { * @param jsonMetadata the source metadata as JSON bytes * @return a map with DynamicValue objects */ - public static Map mapJsonToDynamicValueMap(byte[] jsonMetadata) { + public static Map mapJsonToValueMap(byte[] jsonMetadata) { if (jsonMetadata == null || jsonMetadata.length == 0) return Collections.emptyMap(); try { Map metadata = objectMapper.readValue(jsonMetadata, new TypeReference>() { }); - return mapToDynamicValueMap(metadata); + return mapToValueMap(metadata); } catch (Exception e) { return Collections.emptyMap(); } @@ -45,7 +45,7 @@ public static Map mapJsonToDynamicValueMap(byte[] jsonMeta * @param metadata the source metadata map * @return a map with DynamicValue objects */ - public static Map mapToDynamicValueMap(Map metadata) { + public static Map mapToValueMap(Map metadata) { if (metadata == null) { return Collections.emptyMap(); } @@ -53,7 +53,7 @@ public static Map mapToDynamicValueMap(Map return metadata.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, - entry -> mapToDynamicValue(entry.getValue()) + entry -> mapToValue(entry.getValue()) )); } @@ -63,55 +63,63 @@ public static Map mapToDynamicValueMap(Map * @param source the source object * @return the corresponding DynamicValue */ - public static DynamicValue mapToDynamicValue(Object source) { + public static Value mapToValue(Object source) { if (source == null) { - return DynamicValue.newBuilder() + return Value.newBuilder() .setNullValue(com.google.protobuf.NullValue.NULL_VALUE) .build(); } - DynamicValue.Builder builder = DynamicValue.newBuilder(); + Value.Builder builder = Value.newBuilder(); if (source instanceof String) { return builder.setStringValue((String) source).build(); } else if (source instanceof Boolean) { - return builder.setBooleanValue((Boolean) source).build(); + return builder.setBoolValue((Boolean) source).build(); } else if (source instanceof Integer) { - return builder.setInt32Value((Integer) source).build(); + return builder.setNumberValue((Integer) source).build(); } else if (source instanceof Long) { - return builder.setInt64Value((Long) source).build(); + return builder.setNumberValue((Long) source).build(); } else if (source instanceof Float) { - return builder.setFloatValue((Float) source).build(); + return builder.setNumberValue((Float) source).build(); } else if (source instanceof Double) { - return builder.setDoubleValue((Double) source).build(); + return builder.setNumberValue((Double) source).build(); } else if (source instanceof Instant) { Instant instant = (Instant) source; - return builder.setTimestampValue(Timestamp.newBuilder() - .setSeconds(instant.getEpochSecond()) - .setNanos(instant.getNano()) - .build()).build(); + return builder.setStringValue( + Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build().toString() + ).build(); } else if (source instanceof LocalDateTime) { LocalDateTime localDateTime = (LocalDateTime) source; Instant instant = localDateTime.atZone(java.time.ZoneOffset.UTC).toInstant(); - return builder.setTimestampValue(Timestamp.newBuilder() - .setSeconds(instant.getEpochSecond()) - .setNanos(instant.getNano()) - .build()).build(); + return builder.setStringValue( + Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build().toString() + ).build(); } else if (source instanceof ZonedDateTime) { ZonedDateTime zonedDateTime = (ZonedDateTime) source; Instant instant = zonedDateTime.toInstant(); - return builder.setTimestampValue(Timestamp.newBuilder() - .setSeconds(instant.getEpochSecond()) - .setNanos(instant.getNano()) - .build()).build(); + return builder.setStringValue( + Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build().toString() + ).build(); } else if (source instanceof java.time.Duration) { java.time.Duration duration = (java.time.Duration) source; - return builder.setDurationValue(Duration.newBuilder() - .setSeconds(duration.getSeconds()) - .setNanos(duration.getNano()) - .build()).build(); + return builder.setStringValue( + Duration.newBuilder() + .setSeconds(duration.getSeconds()) + .setNanos(duration.getNano()) + .build().toString() + ).build(); } else if (source instanceof byte[]) { - return builder.setBytesValue(ByteString.copyFrom((byte[]) source)).build(); + return builder.setStringValue(ByteString.copyFrom((byte[]) source).toStringUtf8()).build(); } else { // For any other type, convert to string return builder.setStringValue(source.toString()).build(); diff --git a/src/main/java/io/kurrent/dbclient/GrpcUtils.java b/src/main/java/io/kurrent/dbclient/GrpcUtils.java index 2779f9b2..60f5b8c8 100644 --- a/src/main/java/io/kurrent/dbclient/GrpcUtils.java +++ b/src/main/java/io/kurrent/dbclient/GrpcUtils.java @@ -12,6 +12,26 @@ import java.util.concurrent.TimeUnit; final class GrpcUtils { + static boolean handleNotLeaderError(Throwable t, CompletableFuture dest) { + if (t instanceof StatusRuntimeException) { + StatusRuntimeException e = (StatusRuntimeException) t; + Metadata trailers = e.getTrailers(); + + if (trailers != null) { + String leaderHost = trailers.get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = trailers.get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + NotLeaderException reason = new NotLeaderException(leaderHost, Integer.parseInt(leaderPort)); + dest.completeExceptionally(reason); + return true; + } + } + } + + return false; + } + static public ClientResponseObserver convertSingleResponse( CompletableFuture dest) { @@ -73,12 +93,7 @@ public void onError(Throwable t) { } } - String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); - String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); - - if (leaderHost != null && leaderPort != null) { - NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); - dest.completeExceptionally(reason); + if (handleNotLeaderError(t, dest)) { return; } } diff --git a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java index 2dc0c679..d3d1b557 100644 --- a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java +++ b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java @@ -75,7 +75,7 @@ public CompletableFuture appendToStream(String streamName, AppendTo return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute(); } - public CompletableFuture multiStreamAppend(Iterator requests) { + public CompletableFuture multiStreamAppend(Iterator requests) { return new MultiStreamAppend(this.getGrpcClient(), requests).execute(); } diff --git a/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java b/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java deleted file mode 100644 index 97b360ff..00000000 --- a/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java +++ /dev/null @@ -1,10 +0,0 @@ -package io.kurrent.dbclient; - -import io.kurrentdb.protocol.streams.v2.ErrorDetails; - -public interface MultiAppendStreamErrorVisitor { - default void onWrongExpectedRevision(long streamRevision) {} - default void onAccessDenied(ErrorDetails.AccessDenied detail) {} - default void onStreamDeleted() {} - default void onTransactionMaxSizeExceeded(int maxSize) {} -} diff --git a/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java b/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java deleted file mode 100644 index ad9f1dad..00000000 --- a/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.kurrent.dbclient; - -import java.util.List; -import java.util.Optional; - -public class MultiAppendWriteResult { - private final List successes; - private final List failures; - - public MultiAppendWriteResult(List successes, List failures) { - this.successes = successes; - this.failures = failures; - } - - public Optional> getSuccesses() { - return Optional.ofNullable(successes); - } - - public Optional> getFailures() { - return Optional.ofNullable(failures); - } -} diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index 0210d9b3..e544eaba 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -1,20 +1,27 @@ package io.kurrent.dbclient; import com.google.protobuf.ByteString; -import io.grpc.Metadata; +import com.google.protobuf.Value; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import io.kurrentdb.protocol.DynamicValue; -import io.kurrentdb.protocol.streams.v2.AppendRecord; -import io.kurrentdb.protocol.streams.v2.MultiStreamAppendResponse; -import io.kurrentdb.protocol.streams.v2.StreamsServiceGrpc; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.ClientCallStreamObserver; +import io.kurrentdb.protocol.v2.streams.AppendRecord; +import io.kurrentdb.protocol.v2.streams.AppendRequest; +import io.kurrentdb.protocol.v2.streams.AppendSessionResponse; +import io.kurrentdb.protocol.v2.streams.SchemaInfo; +import io.kurrentdb.protocol.v2.streams.StreamsServiceGrpc; + +import io.kurrentdb.protocol.v2.streams.errors.*; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import com.google.protobuf.Any; +import io.grpc.protobuf.StatusProto; + class MultiStreamAppend { private final GrpcClient client; private final Iterator requests; @@ -24,7 +31,7 @@ public MultiStreamAppend(GrpcClient client, Iterator reques this.requests = requests; } - public CompletableFuture execute() { + public CompletableFuture execute() { return this.client.runWithArgs(args -> ClientTelemetry.traceMultiStreamAppend( this::append, args, @@ -32,88 +39,132 @@ public CompletableFuture execute() { this.client.getSettings())); } - private CompletableFuture append(WorkItemArgs args, Iterator requests) { - CompletableFuture result = new CompletableFuture<>(); + private CompletableFuture append(WorkItemArgs args, Iterator requests) { + CompletableFuture result = new CompletableFuture<>(); if (!args.supportFeature(FeatureFlags.MULTI_STREAM_APPEND)) { result.completeExceptionally(new UnsupportedOperationException("Multi-stream append is not supported by the server")); return result; } - StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(args.getChannel()), this.client.getSettings(), new OptionsBase<>(), null, false); - StreamObserver requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse)); + StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(args.getChannel()), this.client.getSettings(), new OptionsBase<>(), null, false); + StreamObserver requestStream = client.appendSession(new MultiStreamAppendObserver(result)); - try { - while (requests.hasNext()) { + try { + while (requests.hasNext()) { AppendStreamRequest request = requests.next(); - io.kurrentdb.protocol.streams.v2.AppendStreamRequest.Builder builder = io.kurrentdb.protocol.streams.v2.AppendStreamRequest.newBuilder() + AppendRequest.Builder builder = AppendRequest.newBuilder() .setExpectedRevision(request.getExpectedState().toRawLong()) .setStream(request.getStreamName()); - while (request.getEvents().hasNext()) { - EventData event = request.getEvents().next(); - AppendRecord.Builder recordBuilder = AppendRecord.newBuilder() - .setData(ByteString.copyFrom(event.getEventData())) - .setRecordId(event.getEventId().toString()) - .putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValue - .newBuilder() - .setStringValue(ContentTypeMapper.toSchemaDataFormat(event.getContentType())) - .build()) - .putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValue - .newBuilder() - .setStringValue(event.getEventType()) - .build()); - - if (event.getUserMetadata() != null) { - Map userMetadataProperties = DynamicValueMapper.mapJsonToDynamicValueMap(event.getUserMetadata()); - recordBuilder.putAllProperties(userMetadataProperties); - } - - builder.addRecords(recordBuilder.build()); - } - - requestStream.onNext(builder.build()); - } - - requestStream.onCompleted(); - } catch (StatusRuntimeException e) { - String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); - String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); - - if (leaderHost != null && leaderPort != null) { - NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); - requestStream.onError(reason); - result.completeExceptionally(reason); - } else { - requestStream.onError(e); - result.completeExceptionally(e); - } - } catch (RuntimeException e) { - requestStream.onError(e); - result.completeExceptionally(e); - } + while (request.getEvents().hasNext()) { + EventData event = request.getEvents().next(); + AppendRecord.Builder recordBuilder = AppendRecord.newBuilder() + .setData(ByteString.copyFrom(event.getEventData())) + .setRecordId(event.getEventId().toString()) + .setSchema( + SchemaInfo.newBuilder() + .setFormat(ContentTypeMapper.toSchemaDataFormat(event.getContentType())) + .setName(event.getEventType()) + ); + + if (event.getUserMetadata() != null) { + Map userMetadataProperties = DynamicValueMapper.mapJsonToValueMap(event.getUserMetadata()); + recordBuilder.putAllProperties(userMetadataProperties); + } + builder.addRecords(recordBuilder.build()); + } + + requestStream.onNext(builder.build()); + } + + requestStream.onCompleted(); + } catch (RuntimeException e) { + result.completeExceptionally(e); + } return result; } - public MultiAppendWriteResult onResponse(MultiStreamAppendResponse response) { - List failures = null; - List successes = null; + private MultiStreamAppendResponse onResponse(AppendSessionResponse response) { + List results = new java.util.ArrayList<>(response.getOutputCount()); + + for (io.kurrentdb.protocol.v2.streams.AppendResponse output : response.getOutputList()) { + results.add(new AppendResponse(output.getStream(), output.getStreamRevision())); + } + + return new MultiStreamAppendResponse(response.getPosition(), results); + } + + private class MultiStreamAppendObserver implements ClientResponseObserver { + private final CompletableFuture result; + + public MultiStreamAppendObserver(CompletableFuture result) { + this.result = result; + } - if (response.hasFailure()) { - failures = new ArrayList<>(response.getFailure().getOutputCount()); + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + } - for (io.kurrentdb.protocol.streams.v2.AppendStreamFailure failure : response.getFailure().getOutputList()) { - failures.add(new AppendStreamFailure(failure)); + @Override + public void onNext(AppendSessionResponse response) { + try { + MultiStreamAppendResponse converted = onResponse(response); + result.complete(converted); + } catch (Throwable e) { + result.completeExceptionally(e); } - } else { - successes = new ArrayList<>(response.getSuccess().getOutputCount()); + } - for (io.kurrentdb.protocol.streams.v2.AppendStreamSuccess success : response.getSuccess().getOutputList()) { - successes.add(new AppendStreamSuccess(success)); + @Override + public void onError(Throwable t) { + if (GrpcUtils.handleNotLeaderError(t, result)) return; + + if (t instanceof StatusRuntimeException) { + StatusRuntimeException e = (StatusRuntimeException) t; + com.google.rpc.Status status = StatusProto.fromThrowable(e); + + if (status != null && status.getDetailsCount() > 0) { + for (Any d : status.getDetailsList()) { + try { + if (d.is(StreamRevisionConflictErrorDetails.class)) { + StreamRevisionConflictErrorDetails details = d.unpack(StreamRevisionConflictErrorDetails.class); + StreamState expected = StreamState.fromRawLong(details.getExpectedRevision()); + StreamState actual = StreamState.fromRawLong(details.getActualRevision()); + String stream = details.getStream(); + result.completeExceptionally(new WrongExpectedVersionException(stream, expected, actual)); + return; + } else if (d.is(StreamDeletedErrorDetails.class)) { + StreamDeletedErrorDetails details = d.unpack(StreamDeletedErrorDetails.class); + result.completeExceptionally(new StreamDeletedException(details.getStream())); + return; + } else if (d.is(StreamTombstonedErrorDetails.class)) { + StreamTombstonedErrorDetails details = d.unpack(StreamTombstonedErrorDetails.class); + result.completeExceptionally(new StreamTombstonedException(details.getStream())); + return; + } else if (d.is(AppendRecordSizeExceededErrorDetails.class)) { + AppendRecordSizeExceededErrorDetails details = d.unpack(AppendRecordSizeExceededErrorDetails.class); + result.completeExceptionally(new RecordSizeExceededException(details.getStream(), details.getRecordId(), details.getSize(), details.getMaxSize())); + return; + } else if (d.is(AppendTransactionSizeExceededErrorDetails.class)) { + AppendTransactionSizeExceededErrorDetails details = d.unpack(AppendTransactionSizeExceededErrorDetails.class); + result.completeExceptionally(new TransactionMaxSizeExceededException(details.getSize(), details.getMaxSize())); + return; + } + } catch (com.google.protobuf.InvalidProtocolBufferException ex) { + result.completeExceptionally(ex); + return; + } + } + } } + + result.completeExceptionally(t); } - return new MultiAppendWriteResult(successes, failures); + @Override + public void onCompleted() { + } } } diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppendResponse.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppendResponse.java new file mode 100644 index 00000000..b4254eef --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppendResponse.java @@ -0,0 +1,21 @@ +package io.kurrent.dbclient; + +import java.util.List; + +public class MultiStreamAppendResponse { + private final long position; + private final List results; + + public MultiStreamAppendResponse(long position, List results) { + this.position = position; + this.results = results; + } + + public long getPosition() { + return position; + } + + public List getResults() { + return results; + } +} diff --git a/src/main/java/io/kurrent/dbclient/RecordSizeExceededException.java b/src/main/java/io/kurrent/dbclient/RecordSizeExceededException.java new file mode 100644 index 00000000..d4ea887c --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/RecordSizeExceededException.java @@ -0,0 +1,50 @@ +package io.kurrent.dbclient; + +/** + * Thrown when an append record exceeds the maximum allowed size. + */ +public class RecordSizeExceededException extends RuntimeException { + /** + * The name of the stream where the append was attempted. + */ + private final String stream; + + /** + * The identifier of the offending and oversized record. + */ + private final String recordId; + + /** + * The size of the huge record in bytes. + */ + private final int size; + + /** + * The maximum allowed size of a single record that can be appended in bytes. + */ + private final int maxSize; + + public RecordSizeExceededException(String stream, String recordId, int size, int maxSize) { + super(String.format("The size of record %s (%d bytes) exceeds the maximum allowed size of %d bytes by %d bytes", recordId, size, maxSize, size - maxSize)); + this.stream = stream; + this.recordId = recordId; + this.size = size; + this.maxSize = maxSize; + } + + public String getStream() { + return stream; + } + + public String getRecordId() { + return recordId; + } + + public int getSize() { + return size; + } + + public int getMaxSize() { + return maxSize; + } +} diff --git a/src/main/java/io/kurrent/dbclient/ServerFeatures.java b/src/main/java/io/kurrent/dbclient/ServerFeatures.java index 4943c187..20f6ef11 100644 --- a/src/main/java/io/kurrent/dbclient/ServerFeatures.java +++ b/src/main/java/io/kurrent/dbclient/ServerFeatures.java @@ -95,7 +95,7 @@ private static CompletableFuture getSupportedFeaturesInternal(Server default: break; } - } else if (method.getMethodName().equals("multistreamappendsession")) { + } else if (method.getMethodName().equals("appendsession")) { features |= FeatureFlags.MULTI_STREAM_APPEND; } } diff --git a/src/main/java/io/kurrent/dbclient/StreamTombstonedException.java b/src/main/java/io/kurrent/dbclient/StreamTombstonedException.java new file mode 100644 index 00000000..ecc93e9f --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/StreamTombstonedException.java @@ -0,0 +1,18 @@ +package io.kurrent.dbclient; + +/** + * When a read or write operation was performed on a deleted stream. + */ +final public class StreamTombstonedException extends RuntimeException { + private final String streamName; + + StreamTombstonedException(String streamName) { + super(String.format("Stream '%s' is deleted", streamName)); + + this.streamName = streamName; + } + + public String getStreamName() { + return streamName; + } +} diff --git a/src/main/java/io/kurrent/dbclient/TransactionMaxSizeExceededException.java b/src/main/java/io/kurrent/dbclient/TransactionMaxSizeExceededException.java new file mode 100644 index 00000000..53bf1bea --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/TransactionMaxSizeExceededException.java @@ -0,0 +1,30 @@ +package io.kurrent.dbclient; + +/** + * Thrown when an append transaction exceeds the maximum allowed size. + */ +public class TransactionMaxSizeExceededException extends RuntimeException { + /** + * The size of the transaction in bytes. + */ + private final int size; + + /** + * The maximum allowed size of the append transaction in bytes. + */ + private final int maxSize; + + public TransactionMaxSizeExceededException(int size, int maxSize) { + super(String.format("The total size of the append transaction (%d bytes) exceeds the maximum allowed size of %d bytes by %d bytes", size, maxSize, size - maxSize)); + this.size = size; + this.maxSize = maxSize; + } + + public int getSize() { + return size; + } + + public int getMaxSize() { + return maxSize; + } +} diff --git a/src/main/proto/kurrentdb/protocol/v1/status.proto b/src/main/proto/kurrentdb/protocol/v1/status.proto index 2875e016..90b70ddf 100644 --- a/src/main/proto/kurrentdb/protocol/v1/status.proto +++ b/src/main/proto/kurrentdb/protocol/v1/status.proto @@ -1,4 +1,4 @@ -// Copyright 2020 Google LLC +// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ syntax = "proto3"; package google.rpc; import "google/protobuf/any.proto"; -import "kurrentdb/protocol/v1/code.proto"; option cc_enable_arenas = true; option go_package = "google.golang.org/genproto/googleapis/rpc/status;status"; @@ -34,15 +33,17 @@ option objc_class_prefix = "RPC"; // You can find out more about this error model and how to work with it in the // [API Design Guide](https://cloud.google.com/apis/design/errors). message Status { - // The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code]. - google.rpc.Code code = 1; + // The status code, which should be an enum value of + // [google.rpc.Code][google.rpc.Code]. + int32 code = 1; // A developer-facing error message, which should be in English. Any // user-facing error message should be localized and sent in the - // [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client. + // [google.rpc.Status.details][google.rpc.Status.details] field, or localized + // by the client. string message = 2; // A list of messages that carry the error details. There is a common set of // message types for APIs to use. - google.protobuf.Any details = 3; -} \ No newline at end of file + repeated google.protobuf.Any details = 3; +} diff --git a/src/main/proto/kurrentdb/protocol/v2/core.proto b/src/main/proto/kurrentdb/protocol/v2/core.proto deleted file mode 100644 index b4626652..00000000 --- a/src/main/proto/kurrentdb/protocol/v2/core.proto +++ /dev/null @@ -1,82 +0,0 @@ -syntax = "proto3"; - -package kurrentdb.protocol; - -option csharp_namespace = "KurrentDB.Protocol"; -option java_package = "io.kurrentdb.protocol"; -option java_multiple_files = true; - -import "google/protobuf/timestamp.proto"; -import "google/protobuf/duration.proto"; -import "google/protobuf/struct.proto"; -import "google/protobuf/descriptor.proto"; - -//=================================================================== -// Error Annotations -//=================================================================== - -message ErrorAnnotations { - // Identifies the error condition. - string code = 1; - - // Severity of the error. - Severity severity = 2; - - // Human-readable message that describes the error condition. - optional string message = 3; - - enum Severity { - // The error is recoverable, the operation failed but the session can continue. - RECOVERABLE = 0; - // The error is fatal and the session should be terminated. - FATAL = 1; - } -} - -// Extend the MessageOptions to include error information. -extend google.protobuf.MessageOptions { - // Provides additional information about the error condition. - optional ErrorAnnotations error_info = 50000; -} - -//=================================================================== -// Dynamic values map -//=================================================================== - -// Represents a list of dynamically typed values. -message DynamicValueList { - // Repeated property of dynamically typed values. - repeated DynamicValue values = 1; -} - -// Represents a map of dynamically typed values. -message DynamicValueMap { - // A map of string keys to dynamically typed values. - map values = 1; -} - -// Represents a dynamic value -message DynamicValue { - oneof kind { - // Represents a null value. - google.protobuf.NullValue null_value = 1; - // Represents a 32-bit signed integer value. - sint32 int32_value = 2; - // Represents a 64-bit signed integer value. - sint64 int64_value = 3; - // Represents a byte array value. - bytes bytes_value = 4; - // Represents a 64-bit double-precision floating-point value. - double double_value = 5; - // Represents a 32-bit single-precision floating-point value - float float_value = 6; - // Represents a string value. - string string_value = 7; - // Represents a boolean value. - bool boolean_value = 8; - // Represents a timestamp value. - google.protobuf.Timestamp timestamp_value = 9; - // Represents a duration value. - google.protobuf.Duration duration_value = 10; - } -} diff --git a/src/main/proto/kurrentdb/protocol/v2/errors.proto b/src/main/proto/kurrentdb/protocol/v2/errors.proto new file mode 100644 index 00000000..f7d1e734 --- /dev/null +++ b/src/main/proto/kurrentdb/protocol/v2/errors.proto @@ -0,0 +1,150 @@ +// ****************************************************************************************** +// This protocol is UNSTABLE in the sense of being subject to change. +// ****************************************************************************************** + +syntax = "proto3"; + +package kurrentdb.protocol.v2.common.errors; + +option go_package = "github.com/kurrent-io/KurrentDB-Client-Go/protos/kurrentdb/protocols/v2/common/errors"; +option csharp_namespace = "KurrentDB.Protocol.V2.Common.Errors"; +option java_package = "io.kurrentdb.protocol.v2.common.errors"; +option java_multiple_files = true; + +import "kurrentdb/protocol/v2/rpc.proto"; + +enum CommonError { + // Default value. This value is not used. + // An error code MUST always be set to a non-zero value. + // If an error code is not explicitly set, it MUST be treated as + // an internal server error (INTERNAL). + UNSPECIFIED = 0; + + COMMON_ERROR_ACCESS_DENIED = 1 [(kurrent.rpc.error) = { + status_code: PERMISSION_DENIED, + has_details: true + }]; + + COMMON_ERROR_INVALID_REQUEST = 2 [(kurrent.rpc.error) = { + status_code: INVALID_ARGUMENT, + has_details: true + }]; + + COMMON_ERROR_NOT_LEADER_NODE = 5 [(kurrent.rpc.error) = { + status_code: FAILED_PRECONDITION, + has_details: true + }]; + + COMMON_ERROR_OPERATION_TIMEOUT = 6 [(kurrent.rpc.error) = { + status_code: DEADLINE_EXCEEDED + }]; + + COMMON_ERROR_SERVER_NOT_READY = 7 [(kurrent.rpc.error) = { + status_code: UNAVAILABLE + }]; + + COMMON_ERROR_SERVER_OVERLOADED = 8 [(kurrent.rpc.error) = { + status_code: UNAVAILABLE + }]; + + COMMON_ERROR_SERVER_MALFUNCTION = 9 [(kurrent.rpc.error) = { + status_code: INTERNAL + }]; + +// // The operation was aborted, typically due to a concurrency issue such as a +// // sequencer conflict or transaction abort. +// // This error will only be used when there is no intention to create a dedicated +// // error code for the specific issue, perhaps because the issue is too generic +// // or too transient or temporary in terms of handling. +// OPERATION_ABORTED = 10 [(kurrent.rpc.error) = { +// status_code: ABORTED +// }]; +} + +message AccessDeniedErrorDetails { + // The scope in which access was denied. + // It could represent a resource, a domain, a permission type + // or a "path" that is a combination of these. + // (e.g., "stream:orders", "db:customers:read", etc.) + optional string scope = 1; + + // The username of the user who was denied access. + optional string username = 2; +} + +message InvalidRequestErrorDetails { + // Detailed information about each invalid argument. + repeated FieldViolation violations = 1; + + // Describes a single field violation. + message FieldViolation { + // A path that leads to a field in the request body. The value will be a + // sequence of dot-separated identifiers that identify a protocol buffer + // field. + // + // Consider the following: + // + // message CreateContactRequest { + // message EmailAddress { + // enum Type { + // TYPE_UNSPECIFIED = 0; + // HOME = 1; + // WORK = 2; + // } + // + // optional string email = 1; + // repeated EmailType type = 2; + // } + // + // string full_name = 1; + // repeated EmailAddress email_addresses = 2; + // } + // + // In this example, in proto `field` could take one of the following values: + // + // * `full_name` for a violation in the `full_name` value + // * `email_addresses[1].email` for a violation in the `email` field of the + // first `email_addresses` message + // * `email_addresses[3].type[2]` for a violation in the second `type` + // value in the third `email_addresses` message. + // + // In JSON, the same values are represented as: + // + // * `fullName` for a violation in the `fullName` value + // * `emailAddresses[1].email` for a violation in the `email` field of the + // first `emailAddresses` message + // * `emailAddresses[3].type[2]` for a violation in the second `type` + // value in the third `emailAddresses` message. + string field = 1; + + // A description of why the request element is bad. + string description = 2; + } +} + +message NotLeaderNodeErrorDetails { + // The host of the current leader node + string host = 1; + + // The port of the current leader node + int32 port = 2; + + // The instance ID of the current leader node + optional string node_id = 3; +} + +message RetryInfoErrorDetails { + // The duration in milliseconds after which the client can retry the operation. + int32 retry_delay_ms = 1; +} + +message NodeInfoErrorDetails { + // The host of the node + string host = 1; + + // The port of the node + int32 port = 2; + + // The instance ID of the node + optional string node_id = 3; +} diff --git a/src/main/proto/kurrentdb/protocol/v2/features/service.proto b/src/main/proto/kurrentdb/protocol/v2/features/service.proto deleted file mode 100644 index 4db99a80..00000000 --- a/src/main/proto/kurrentdb/protocol/v2/features/service.proto +++ /dev/null @@ -1,144 +0,0 @@ -syntax = "proto3"; - -/** - * KurrentDB Server Features Protocol - * - * This protocol defines services and messages for discovering server features - * in a KurrentDB environment. It enables clients to adapt their behavior based - * on server features, their enablement status, and requirements. - */ -package kurrentdb.protocol.v2; - -option csharp_namespace = "KurrentDB.Protocol.Features.V2"; -option java_package = "io.kurrentdb.protocol.features.v2"; -option java_multiple_files = true; - -import "google/protobuf/timestamp.proto"; -import "kurrentdb/protocol/v2/core.proto"; - -/** - * Service for retrieving information about the server, including features - * and metadata. - */ -service ServerInfoService { - // Retrieves server information and available features - rpc GetServerInfo(ServerInfoRequest) returns (ServerInfoResponse) {} -} - -/** - * Contains server version information, build details, and compatibility requirements. - */ -message ServerMetadata { - // Semantic version of the server software - string version = 1; - - // Build identifier or hash - string build = 2; - - // Minimum client version required for compatibility - string min_compatible_client_version = 3; - - // Unique identifier for this server node - string node_id = 4; -} - -/** - * Request message for retrieving server information. - */ -message ServerInfoRequest { - // Client version making the request - string client_version = 1; - - // Unique client identifier - string client_id = 2; -} - -/** - * Response containing server information. - */ -message ServerInfoResponse { - // Server information and features - ServerInfo info = 1; -} - -/** - * Top-level server information container including metadata - * and available features. - */ -message ServerInfo { - // Server metadata (version, build info, etc.) - ServerMetadata metadata = 1; - - // Features organized by namespace - map features = 2; -} - -/** - * Container for features within a specific namespace. - */ -message FeaturesList { - // Features in this namespace - repeated Feature features = 1; -} - -/** - * Defines a specific server feature with its enablement status, - * requirements, and metadata. - */ -message Feature { - // Unique identifier for this feature - string name = 1; - - // Human-readable description of the feature - optional string description = 2; - - // Whether this feature is currently enabled - bool enabled = 3; - - // Whether this feature is deprecated and may be removed in future versions - bool deprecated = 4; - - // Requirements associated with this feature that clients must satisfy - repeated FeatureRequirement requirements = 5; - - // Whether clients can request changes to this feature's enabled status - bool client_configurable = 6; - - // For temporary features, indicates when the feature will no longer be available - optional google.protobuf.Timestamp available_until = 7; -} - -/** - * Defines a requirement that must be satisfied to use a feature. - * Requirements can be optional, required, or prohibited. - */ -message FeatureRequirement { - // Unique identifier for this requirement - string name = 1; - - // The value of this requirement, which can contain various data types - DynamicValue value = 2; - - // Enforcement level for this requirement - PolicyStatus policy_status = 3; - - // Human-readable description of the requirement - optional string description = 4; - - // Message shown when the requirement is violated - optional string violation_message = 5; -} - -/** - * Defines how requirements are enforced. - */ -enum PolicyStatus { - // Feature is optional with no warnings - OPTIONAL = 0; - - // Feature must be enabled; operations rejected if disabled - REQUIRED = 3; - - // Feature must be disabled; operations rejected if enabled - PROHIBITED = 4; -} diff --git a/src/main/proto/kurrentdb/protocol/v2/rpc.proto b/src/main/proto/kurrentdb/protocol/v2/rpc.proto new file mode 100644 index 00000000..696e9d90 --- /dev/null +++ b/src/main/proto/kurrentdb/protocol/v2/rpc.proto @@ -0,0 +1,94 @@ +// ****************************************************************************************** +// This protocol is UNSTABLE in the sense of being subject to change. +// ****************************************************************************************** + +syntax = "proto3"; + +package kurrent.rpc; + +option go_package = "github.com/kurrent-io/KurrentDB-Client-Go/protos/rpc"; +option csharp_namespace = "Kurrent.Rpc"; +option java_package = "io.kurrent.rpc"; +option java_multiple_files = true; + +import "google/protobuf/descriptor.proto"; +import "kurrentdb/protocol/v1/code.proto"; + +// ErrorMetadata provides actionable information for error enum values to enable automated +// code generation, documentation, and consistent error handling across the Kurrent platform. +// +// It was modeled to support a single details type per error code to simplify code generation and +// validation. If multiple detail types are needed for a single error code, consider defining +// separate error codes for each detail type. Or, use a union type (oneof) in the detail message +// to encapsulate multiple detail variants within a single detail message. +// +// More however DebugInfo and RetryInfo can and should be added to any error regardless of +// this setting, when applicable. +// +// This annotation is applied to enum values using the google.protobuf.EnumValueOptions +// extension mechanism. It enables: +// - Automatic gRPC status code mapping +// - Code generation for error handling utilities +// - Documentation generation +// - Type-safe error detail validation +// +// Usage Example: +// enum StreamErrorCode { +// REVISION_CONFLICT = 5 [(kurrent.rpc.error) = { +// status_code: FAILED_PRECONDITION, +// has_details: true +// }]; +// } +// +// See individual field documentation for conventions and defaults. +message ErrorMetadata { + // Maps the error to a standard gRPC status code for transport-level compatibility. + // This field is REQUIRED for every error annotation. + // + // Use standard gRPC status codes from `google.rpc.code`. + // + // Code generators use this to: + // - Map errors to gRPC status codes automatically + // - Generate HTTP status code mappings + // - Create transport-agnostic error handling + google.rpc.Code status_code = 1; + + // Indicates whether this error supports rich, typed detail messages. + // Defaults to false (simple message string only). + // The message type name must be derived from the enum name by convention. + // Mask: {EnumValue}ErrorDetails + // + // Examples: + // ACCESS_DENIED -> "AccessDeniedErrorDetails" + // SERVER_NOT_READY -> "ServerNotReadyErrorDetails" + // + // Code generators use the message type name to: + // - Validate that the detail message matches the expected type + // - Generate type-safe error handling code + // - Create accurate documentation + bool has_details = 2; +} + +// Extend EnumValueOptions to include error information for enum values +extend google.protobuf.EnumValueOptions { + // Provides additional information about error conditions for automated + // code generation and documentation. + optional ErrorMetadata error = 50000; +} + +// The top-level error message that must be returned by any service or operation +// in the Kurrent platform. +message RequestErrorInfo { + // The code must match one of the defined enum error codes from the module + // where the error originated from. + // A machine-readable error code that indicates the specific error condition. + // This should be at most 63 characters and match a regular expression of + // `[A-Z][A-Z0-9_]+[A-Z0-9]`, which represents UPPER_SNAKE_CASE. + // By convention, it will be generated from the enum value name if not + // explicitly specified. + // Conventions: + // - Prefix with the service name or domain to avoid collisions + // - Use UPPER_SNAKE_CASE with only letters, numbers, and underscores + // - Avoid redundant information (e.g., do not include "ERROR" suffix) + string code = 1; +} diff --git a/src/main/proto/kurrentdb/protocol/v2/streams/errors.proto b/src/main/proto/kurrentdb/protocol/v2/streams/errors.proto new file mode 100644 index 00000000..fde515f6 --- /dev/null +++ b/src/main/proto/kurrentdb/protocol/v2/streams/errors.proto @@ -0,0 +1,130 @@ +// ****************************************************************************************** +// This protocol is UNSTABLE in the sense of being subject to change. +// ****************************************************************************************** + +syntax = "proto3"; + +package kurrentdb.protocol.v2.streams.errors; + +option go_package = "github.com/kurrent-io/KurrentDB-Client-Go/protos/kurrentdb/protocols/v2/streams"; +option csharp_namespace = "KurrentDB.Protocol.V2.Streams.Errors"; +option java_package = "io.kurrentdb.protocol.v2.streams.errors"; +option java_multiple_files = true; + +import "kurrentdb/protocol/v2/rpc.proto"; + +enum StreamsError { + // Default value. This value is not used. + // An error code MUST always be set to a non-zero value. + // If an error code is not explicitly set, it MUST be treated as + // an internal server error (INTERNAL). + STREAMS_ERROR_UNSPECIFIED = 0; + + // The stream was not found. + // This is recoverable by the client by creating the stream first. + STREAMS_ERROR_STREAM_NOT_FOUND = 1 [(kurrent.rpc.error) = { + status_code: NOT_FOUND, + has_details: true, + }]; + + // The stream already exists. + // This is recoverable by the client by using the existing stream. + STREAMS_ERROR_STREAM_ALREADY_EXISTS = 2 [(kurrent.rpc.error) = { + status_code: ALREADY_EXISTS, + has_details: true + }]; + + // The stream has been soft deleted. + // It will not be visible in the stream list, until it is restored by appending to it again. + STREAMS_ERROR_STREAM_DELETED = 3 [(kurrent.rpc.error) = { + status_code: FAILED_PRECONDITION, + has_details: true + }]; + + // The stream has been tombstoned. + // It has been permanently removed from the system and cannot be restored. + STREAMS_ERROR_STREAM_TOMBSTONED = 4 [(kurrent.rpc.error) = { + status_code: FAILED_PRECONDITION, + has_details: true + }]; + + // The expected revision of the stream does not match the actual revision. + // This is recoverable by the client by fetching the current revision and retrying. + STREAMS_ERROR_STREAM_REVISION_CONFLICT = 5 [(kurrent.rpc.error) = { + status_code: FAILED_PRECONDITION, + has_details: true + }]; + + // The size of a record being appended exceeds the maximum allowed size. + // It is recoverable by the client by sending a smaller record. + STREAMS_ERROR_APPEND_RECORD_SIZE_EXCEEDED = 6 [(kurrent.rpc.error) = { + status_code: INVALID_ARGUMENT, + has_details: true + }]; + + // When the transaction exceeds the maximum size allowed (max chunk size). + // It is recoverable by the client by sending a smaller transaction. + STREAMS_ERROR_APPEND_TRANSACTION_SIZE_EXCEEDED = 7 [(kurrent.rpc.error) = { + status_code: ABORTED, + has_details: true + }]; + + // The stream is already in an append session. + // Appending to the same stream multiple times is currently not supported. + STREAMS_ERROR_STREAM_ALREADY_IN_APPEND_SESSION = 8 [(kurrent.rpc.error) = { + status_code: ABORTED, + has_details: true + }]; +} + +message StreamNotFoundErrorDetails { + // The name of the stream that was not found. + string stream = 1; +} + +message StreamAlreadyExistsErrorDetails { + // The name of the stream that already exists. + string stream = 1; +} + +message StreamDeletedErrorDetails { + // The name of the stream that was deleted. + string stream = 1; +} + +message StreamTombstonedErrorDetails { + // The name of the stream that was tombstoned. + string stream = 1; +} + +message StreamRevisionConflictErrorDetails { + // The name of the stream that had a revision conflict. + string stream = 1; + // The actual revision of the stream. + int64 expected_revision = 2 [jstype = JS_STRING]; + // The actual revision of the stream. + int64 actual_revision = 3 [jstype = JS_STRING]; +} + +message AppendRecordSizeExceededErrorDetails { + // The name of the stream where the append was attempted. + string stream = 1; + // The identifier of the offending and oversized record. + string record_id = 2; + // The size of the huge record in bytes. + int32 size = 3; + // The maximum allowed size of a single record that can be appended in bytes. + int32 max_size = 4; +} + +message AppendTransactionSizeExceededErrorDetails { + // The size of the huge transaction in bytes. + int32 size = 1; + // The maximum allowed size of the append transaction in bytes. + int32 max_size = 2; +} + +message StreamAlreadyInAppendSessionErrorDetails { + // The name of the stream that is already in an append session. + string stream = 1; +} diff --git a/src/main/proto/kurrentdb/protocol/v2/streams/shared.proto b/src/main/proto/kurrentdb/protocol/v2/streams/shared.proto deleted file mode 100644 index e70ba804..00000000 --- a/src/main/proto/kurrentdb/protocol/v2/streams/shared.proto +++ /dev/null @@ -1,118 +0,0 @@ -syntax = "proto3"; - -package kurrentdb.protocol.v2; - -option csharp_namespace = "KurrentDB.Protocol.Streams.V2"; -option java_package = "io.kurrentdb.protocol.streams.v2"; -option java_multiple_files = true; - -import "google/protobuf/timestamp.proto"; -import "google/protobuf/descriptor.proto"; -import "kurrentdb/protocol/v2/core.proto"; - -// ErrorDetails provides detailed information about specific error conditions. -message ErrorDetails { - // When the user does not have sufficient permissions to perform the - // operation. - message AccessDenied { - option (error_info) = { - code : "ACCESS_DENIED", - severity : RECOVERABLE, - message : "The user does not have sufficient permissions to perform the operation." - }; - } - - // When the stream has been deleted. - message StreamDeleted { - option (error_info) = { - code : "STREAM_DELETED", - severity : RECOVERABLE, - message : "The stream has been soft deleted. It will not be visible in the stream list, until it is restored by appending to it again." - }; - - // The name of the stream that was deleted. - optional string stream = 1; - - // The time when the stream was deleted. - google.protobuf.Timestamp deleted_at = 2; - } - - // When the stream has been tombstoned. - message StreamTombstoned { - option (error_info) = { - code : "STREAM_TOMBSTONED", - severity : RECOVERABLE, - message : "The stream has been tombstoned and cannot be used anymore." - }; - - // The name of the stream that was tombstoned. - optional string stream = 1; - - // The time when the stream was tombstoned. - google.protobuf.Timestamp tombstoned_at = 2; - } - - // When the stream is not found. - message StreamNotFound { - option (error_info) = { - code : "STREAM_NOT_FOUND", - severity : RECOVERABLE, - message : "The specified stream was not found." - }; - - // The name of the stream that was not found. - optional string stream = 1; - } - - // When the expected revision of the stream does not match the actual - // revision. - message StreamRevisionConflict { - option (error_info) = { - code : "REVISION_CONFLICT", - severity : RECOVERABLE, - message : "The actual stream revision does not match the expected revision." - }; - - // The actual revision of the stream. - int64 stream_revision = 1 [jstype = JS_STRING]; - } - - // When the transaction exceeds the maximum size allowed - // (its bigger than the configured chunk size). - message TransactionMaxSizeExceeded { - option (error_info) = { - code : "TRANSACTION_MAX_SIZE_EXCEEDED", - severity : FATAL, - message : "The transaction exceeds the maximum size allowed." - }; - - // The maximum allowed size of the transaction. - uint32 max_size = 1; - } - - // When the user is not found. - message UserNotFound { - option (error_info) = { - code : "USER_NOT_FOUND", - severity : RECOVERABLE, - message : "The specified user was not found." - }; - } - - // When the user is not authenticated. - message NotAuthenticated { - option (error_info) = { - code : "NOT_AUTHENTICATED", - severity : RECOVERABLE, - message : "The user is not authenticated." - }; - } - - message LogPositionNotFound { - option (error_info) = { - code : "LOG_POSITION_NOT_FOUND", - severity : RECOVERABLE, - message : "The specified log position was not found." - }; - } -} diff --git a/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto b/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto index 7fe57056..499fd969 100644 --- a/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto +++ b/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto @@ -1,91 +1,36 @@ +// ****************************************************************************************** +// This protocol is UNSTABLE in the sense of being subject to change. +// ****************************************************************************************** + syntax = "proto3"; -package kurrentdb.protocol.v2; +package kurrentdb.protocol.v2.streams; +option go_package = "github.com/kurrent-io/KurrentDB-Client-Go/protos/kurrentdb/protocols/v2/streams"; option csharp_namespace = "KurrentDB.Protocol.Streams.V2"; -option java_package = "io.kurrentdb.protocol.streams.v2"; +option java_package = "io.kurrentdb.protocol.v2.streams"; option java_multiple_files = true; -import "google/protobuf/timestamp.proto"; -import "google/protobuf/duration.proto"; -import "google/protobuf/descriptor.proto"; - -import "kurrentdb/protocol/v2/streams/shared.proto"; -import "kurrentdb/protocol/v2/core.proto"; +import "google/protobuf/struct.proto"; service StreamsService { // Executes an atomic operation to append records to multiple streams. // This transactional method ensures that all appends either succeed - // completely, or are entirely rolled back, thereby maintaining strict data - // consistency across all involved streams. - rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse); + // completely, or are entirely rolled back, thereby maintaining strict + // data consistency across all involved streams. + rpc Append(AppendRequest) returns (AppendResponse); - // Streaming version of MultiStreamAppend that allows clients to send multiple - // append requests over a single connection. When the stream completes, all - // records are appended transactionally (all succeed or fail together). + // Streaming version of Append that allows clients to send multiple + // append requests continuously. Once completed, all records are + // appended transactionally (all succeed or fail together). // Provides improved efficiency for high-throughput scenarios while // maintaining the same transactional guarantees. - rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse); - -// // Appends records to a specific stream. -// rpc AppendStream(AppendStreamRequest) returns (AppendStreamResponse); - -// // Append batches of records to a stream continuously, while guaranteeing pipelined -// // requests are processed in order. If any request fails, the session is terminated. -// rpc AppendStreamSession(stream AppendStreamRequest) returns (stream AppendStreamResponse); - -// // Retrieve a batch of records -// rpc ReadStream(ReadRequest) returns (ReadResponse); - - // Retrieve batches of records continuously. - rpc ReadSession(ReadRequest) returns (stream ReadResponse); + rpc AppendSession(stream AppendRequest) returns (AppendSessionResponse); } -//=================================================================== -// Append Operations -//=================================================================== - -// Record to be appended to a stream. -message AppendRecord { - // Universally Unique identifier for the record. - // If not provided, the server will generate a new one. - optional string record_id = 1; - -// // The name of the stream to append the record to. -// optional string stream = 6; -// -// // The name of the schema in the registry that defines the structure of the record. -// string schema_name = 4; -// -// // The format of the data in the record. -// SchemaDataFormat data_format = 5; - - // A collection of properties providing additional information about the - // record. This can include user-defined metadata or system properties. - // System properties are prefixed with "$." to avoid conflicts with user-defined properties. - // For example, "$schema.name" or "$schema.data-format". - map properties = 2; - - // The actual data payload of the record, stored as bytes. - bytes data = 3; -} - -// Constants that match the expected state of a stream during an -// append operation. It can be used to specify whether the stream should exist, -// not exist, or can be in any state. -enum ExpectedRevisionConstants { - // The stream should exist and the expected revision should match the current - EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; - // It is not important whether the stream exists or not. - EXPECTED_REVISION_CONSTANTS_ANY = -2; - // The stream should not exist. If it does, the append will fail. - EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; - // The stream should exist - EXPECTED_REVISION_CONSTANTS_EXISTS = -4; -} // Represents the input for appending records to a specific stream. -message AppendStreamRequest { +message AppendRequest { // The name of the stream to append records to. string stream = 1; // The records to append to the stream. @@ -98,384 +43,75 @@ message AppendStreamRequest { optional sint64 expected_revision = 3 [jstype = JS_STRING]; } -// Success represents the successful outcome of an append operation. -message AppendStreamSuccess { +// Represents the outcome of an append operation. +message AppendResponse { // The name of the stream to which records were appended. string stream = 1; - // The position of the last appended record in the stream. - int64 position = 2 [jstype = JS_STRING]; // The expected revision of the stream after the append operation. - int64 stream_revision = 3 [jstype = JS_STRING]; -} - -// Failure represents the detailed error information when an append operation fails. -message AppendStreamFailure { - // The name of the stream to which records were appended. - string stream = 1; - - // The error details - oneof error { - // Failed because the actual stream revision didn't match the expected revision. - ErrorDetails.StreamRevisionConflict stream_revision_conflict = 2; - // Failed because the client lacks sufficient permissions. - ErrorDetails.AccessDenied access_denied = 3; - // Failed because the target stream has been deleted. - ErrorDetails.StreamDeleted stream_deleted = 4; - // Failed because the stream was not found. - ErrorDetails.StreamNotFound stream_not_found = 5; - // Failed because the transaction exceeded the maximum size allowed - ErrorDetails.TransactionMaxSizeExceeded transaction_max_size_exceeded = 6; - } -} - -// AppendStreamResponse represents the output of appending records to a specific -// stream. -message AppendStreamResponse { - // The result of the append operation. - oneof result { - // Success represents the successful outcome of an append operation. - AppendStreamSuccess success = 1; - // Failure represents the details of a failed append operation. - AppendStreamFailure failure = 2; - } -} - -// MultiStreamAppendRequest represents a request to append records to multiple streams. -message MultiStreamAppendRequest { - // A list of AppendStreamInput messages, each representing a stream to which records should be appended. - repeated AppendStreamRequest input = 1; -} - -// Response from the MultiStreamAppend operation. -message MultiStreamAppendResponse { - oneof result { - // Success represents the successful outcome of a multi-stream append operation. - Success success = 1; - // Failure represents the details of a failed multi-stream append operation. - Failure failure = 2; - } - - message Success { - repeated AppendStreamSuccess output = 1; - } - - message Failure { - repeated AppendStreamFailure output = 1; - } -} - -//=================================================================== -// Read Operations -//=================================================================== - -// The scope of the read filter determines where the filter will be applied. -enum ReadFilterScope { - READ_FILTER_SCOPE_UNSPECIFIED = 0; - // The filter will be applied to the record stream name - READ_FILTER_SCOPE_STREAM = 1; - // The filter will be applied to the record schema name - READ_FILTER_SCOPE_SCHEMA_NAME = 2; - // The filter will be applied to the properties of the record - READ_FILTER_SCOPE_PROPERTIES = 3; - // The filter will be applied to all the record properties - // including the stream and schema name - READ_FILTER_SCOPE_RECORD = 4; -} - -// The filter to apply when reading records from the database -// The combination of stream scope and literal expression indicates a direct stream name match, -// while a regex expression indicates a pattern match across multiple streams. -message ReadFilter { - // The scope of the filter. - ReadFilterScope scope = 1; - // The expression can be a regular expression or a literal value. - // If it starts with "~" it will be considered a regex. - string expression = 2; - - // // The optional name of the record property to filter on. - // optional string property_name = 3; - - // The optional property names to filter on. - repeated string property_names = 4; -} - -// Record retrieved from the database. -message Record { - // The unique identifier of the record in the database. - string record_id = 1; - // The position of the record in the database. - int64 position = 5 [jstype = JS_STRING]; - // The actual data payload of the record, stored as bytes. - bytes data = 2; - // Additional information about the record. - map properties = 3; - // When the record was created. - google.protobuf.Timestamp timestamp = 4; - // The stream to which the record belongs. - optional string stream = 6; - // The revision of the stream created when the record was appended. - optional int64 stream_revision = 7 [jstype = JS_STRING]; -} - -// The direction in which to read records from the database (forwards or backwards). -enum ReadDirection { - READ_DIRECTION_FORWARDS = 0; - READ_DIRECTION_BACKWARDS = 1; -} - -// The position from which to start reading records. -// This can be either the earliest or latest position in the stream. -enum ReadPositionConstants { - READ_POSITION_CONSTANTS_UNSPECIFIED = 0; - READ_POSITION_CONSTANTS_EARLIEST = 1; - READ_POSITION_CONSTANTS_LATEST = 2; -} - -// Represents the successful outcome of a read operation. -message ReadSuccess { - repeated Record records = 1; -} - -// Represents the detailed error information when a read operation fails. -message ReadFailure { - // The error details - oneof error { - // Failed because the client lacks sufficient permissions. - ErrorDetails.AccessDenied access_denied = 1; - // Failed because the target stream has been deleted. - ErrorDetails.StreamDeleted stream_deleted = 2; - // Failed because the expected stream revision did not match the actual revision. - ErrorDetails.StreamNotFound stream_not_found = 3; - } + int64 stream_revision = 2 [jstype = JS_STRING]; + // The position of the last appended record in the stream. + optional int64 position = 4 [jstype = JS_STRING]; } -message ReadRequest { - // The filter to apply when reading records. - optional ReadFilter filter = 1; - // The starting position of the log from which to read records. - optional int64 start_position = 2 [jstype = JS_STRING]; - // Limit how many records can be returned. - // This will get capped at the default limit, - // which is up to 1000 records. - optional int64 limit = 3 [jstype = JS_STRING]; - // The direction in which to read the stream (forwards or backwards). - ReadDirection direction = 4; - // Heartbeats can be enabled to monitor end-to-end session health. - HeartbeatOptions heartbeats = 5; - // The number of records to read in a single batch. - int32 batch_size = 6; +message AppendSessionResponse { + // The results of each append request in the session. + repeated AppendResponse output = 1; + // The position of the last appended record in the session. + int64 position = 2 [jstype = JS_STRING]; } -//message SubscriptionConfirmed { -// // The subscription ID that was confirmed. -// string subscription_id = 1; -// // The position of the last record read by the server. -// optional int64 position = 2 [jstype = JS_STRING]; -// // When the subscription was confirmed. -// google.protobuf.Timestamp timestamp = 3; -//} - -// Read session response. -message ReadResponse { - oneof result { - // Success represents the successful outcome of an read operation. - ReadSuccess success = 1; - // Failure represents the details of a failed read operation. - ReadFailure failure = 2; - // Heartbeat represents the health check of the read operation when - // the server has not found any records matching the filter for the specified - // period of time or records threshold. - // A heartbeat will be sent when the initial switch to real-time tailing happens. - Heartbeat heartbeat = 3; - } +// Represents the data format of the schema. +enum SchemaFormat { + // Default value, should not be used. + SCHEMA_FORMAT_UNSPECIFIED = 0; + SCHEMA_FORMAT_JSON = 1; + SCHEMA_FORMAT_PROTOBUF = 2; + SCHEMA_FORMAT_AVRO = 3; + SCHEMA_FORMAT_BYTES = 4; } -// A health check will be sent when the server has not found any records -// matching the filter for the specified period of time or records threshold. A -// heartbeat will be sent when the initial switch to real-time tailing happens. -message HeartbeatOptions { - bool enable = 1; - optional google.protobuf.Duration period = 2; // 30 seconds - optional int32 records_threshold = 3; // 500 +message SchemaInfo { + // The format of the schema that the record conforms to. + SchemaFormat format = 1; + // The name of the schema that the record conforms to. + string name = 2; + // The identifier of the specific version of the schema that the record payload + // conforms to. This should match a registered schema version in the system. + // Not necessary when not enforcing schema validation. + optional string id = 3; } -enum HeartbeatType { - HEARTBEAT_TYPE_UNSPECIFIED = 0; - HEARTBEAT_TYPE_CHECKPOINT = 1; - HEARTBEAT_TYPE_CAUGHT_UP = 2; - HEARTBEAT_TYPE_FELL_BEHIND = 3; +// Record to be appended to a stream. +message AppendRecord { + // Universally Unique identifier for the record. Must be a guid. + // If not provided, the server will generate a new one. + optional string record_id = 1; + // The timestamp of when the record was created, represented as + // milliseconds since the Unix epoch. This is primarily for + // informational purposes and does not affect the ordering of records + // within the stream, which is determined by the server. + // If not provided, the server will assign it upon receipt. + optional int64 timestamp = 2 [jstype = JS_STRING]; + // A collection of properties providing additional information about the + // record. This can include user-defined metadata or system properties. + // System properties are uniquely identified by the "$." prefix. + map properties = 3; + // Information about the schema that the record payload conforms to. + SchemaInfo schema = 4; + // The actual data payload of the record. + bytes data = 5; } -message Heartbeat { - // This indicates whether the subscription is caught up, fell behind, or - // the filter has not been satisfied after a period of time or records threshold. - HeartbeatType type = 1; - // Checkpoint for resuming reads. - // It will always be populated unless the database is empty. - int64 position = 2 [jstype = JS_STRING]; - // When the heartbeat was sent. - google.protobuf.Timestamp timestamp = 3; +// Constants that match the expected state of a stream during an +// append operation. It can be used to specify whether the stream should exist, +// not exist, or can be in any state. +enum ExpectedRevisionConstants { + // The stream should exist and have a single event. + EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; + // It is not important whether the stream exists or not. + EXPECTED_REVISION_CONSTANTS_ANY = -2; + // The stream should not exist. If it does, the append will fail. + EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; + // The stream should exist + EXPECTED_REVISION_CONSTANTS_EXISTS = -4; } - -////=================================================================== -//// Read Operations -////=================================================================== -// -//enum ConsumeFilterScope { -// CONSUME_FILTER_SCOPE_UNSPECIFIED = 0; -// // The filter will be applied to the stream name -// CONSUME_FILTER_SCOPE_STREAM = 1; -// // The filter will be applied to the record schema name -// CONSUME_FILTER_SCOPE_RECORD = 2; -// // The filter will be applied to the properties of record -// CONSUME_FILTER_SCOPE_PROPERTIES = 3; -// // The filter will be applied to the record data -// CONSUME_FILTER_SCOPE_DATA = 4; -//} -// -//// The filter to apply when reading records from the database -//// It applies to a stream or a record -//message ConsumeFilter { -// // The scope of the filter. -// ConsumeFilterScope scope = 1; -// // The expression can be a regular expression, a jsonpath expression, or a literal value. -// // if it starts with "~" it will be considered a regex and if it starts with "$" it will be considered a jsonpath filter, else its a literal. -// string expression = 2; -// // The name of the record property to filter on. -// optional string property_name = 3; -//} -// -//// Record retrieved from the database. -//message Record { -// // The unique identifier of the record in the database. -// string record_id = 1; -// // The position of the record in the database. -// int64 position = 5 [jstype = JS_STRING]; -// // The actual data payload of the record, stored as bytes. -// bytes data = 2; -// // Additional information about the record. -// map properties = 3; -// // When the record was created. -// google.protobuf.Timestamp timestamp = 4; -// // The stream to which the record belongs. -// optional string stream = 6; -// // The revision of the stream created when the record was appended. -// optional int64 stream_revision = 7 [jstype = JS_STRING]; -//} -// -////// A batch of records. -////message RecordBatch { -//// repeated Record records = 1; -////} -// -//// The direction in which to read records from the database (forwards or backwards). -//enum ReadDirection { -// READ_DIRECTION_FORWARDS = 0; -// READ_DIRECTION_BACKWARDS = 1; -//} -// -//// The position from which to start reading records. -//// This can be either the earliest or latest position in the stream. -//enum ReadPositionConstants { -// READ_POSITION_CONSTANTS_UNSPECIFIED = 0; -// READ_POSITION_CONSTANTS_EARLIEST = 1; -// READ_POSITION_CONSTANTS_LATEST = 2; -//} -// -//message ReadStreamRequest { -// // The filter to apply when reading records. -// optional ConsumeFilter filter = 1; -// // The starting position of the log from which to read records. -// optional int64 start_position = 2 [jstype = JS_STRING]; -// // Limit how many records can be returned. -// // This will get capped at the default limit, -// // which is up to 1000 records. -// optional int64 limit = 3 [jstype = JS_STRING]; -// // The direction in which to read the stream (forwards or backwards). -// ReadDirection direction = 4; -//} -// -//message ReadStreamSuccess { -// repeated Record records = 1; -//} -// -//// Represents the detailed error information when a read operation fails. -//message ReadStreamFailure { -// // The error details -// oneof error { -// // Failed because the client lacks sufficient permissions. -// ErrorDetails.AccessDenied access_denied = 3; -// // Failed because the target stream has been deleted. -// ErrorDetails.StreamDeleted stream_deleted = 4; -// } -//} -//message ReadStreamResponse { -// // The result of the read operation. -// oneof result { -// // Success represents the successful outcome of an read operation. -// ReadStreamSuccess success = 1; -// // Failure represents the details of a failed read operation. -// ReadStreamFailure failure = 2; -// // Heartbeat represents the health check of the read operation when -// // the server has not found any records matching the filter for the specified -// // period of time or records threshold. -// // A heartbeat will be sent when the initial switch to real-time tailing happens. -// Heartbeat heartbeat = 3; -// } -//} -// -//message ReadSessionRequest { -// // The filter to apply when reading records. -// optional ConsumeFilter filter = 1; -// // The starting position of the log from which to read records. -// optional int64 start_position = 2 [jstype = JS_STRING]; -// // Limit how many records can be returned. -// // This will get capped at the default limit, -// // which is up to 1000 records. -// optional int64 limit = 3 [jstype = JS_STRING]; -// // The direction in which to read the stream (forwards or backwards). -// ReadDirection direction = 4; -// // Heartbeats can be enabled to monitor end-to-end session health. -// HeartbeatOptions heartbeats = 5; -//} -// -//// Read session response. -//message ReadSessionResponse { -// oneof result { -// // Success represents the successful outcome of an read operation. -// ReadStreamSuccess success = 1; -// // Failure represents the details of a failed read operation. -// ReadStreamFailure failure = 2; -// // Heartbeat represents the health check of the read operation when -// // the server has not found any records matching the filter for the specified -// // period of time or records threshold. -// // A heartbeat will be sent when the initial switch to real-time tailing happens. -// Heartbeat heartbeat = 3; -// } -//} -// -//// A health check will be sent when the server has not found any records -//// matching the filter for the specified period of time or records threshold. A -//// heartbeat will be sent when the initial switch to real-time tailing happens. -//message HeartbeatOptions { -// bool enable = 1; -// //optional google.protobuf.Duration period = 2; -// optional int32 records_threshold = 3; // 1000 -//} -// -//enum HeartbeatType { -// HEARTBEAT_TYPE_UNSPECIFIED = 0; -// HEARTBEAT_TYPE_CHECKPOINT = 1; -// HEARTBEAT_TYPE_CAUGHT_UP = 2; -//} -// -//message Heartbeat { -// // This indicates whether the subscription is caught up, fell behind, or -// // the filter has not been satisfied after a period of time or records threshold. -// HeartbeatType type = 1; -// // Checkpoint for resuming reads. -// // It will always be populated unless the database is empty. -// int64 position = 2 [jstype = JS_STRING]; -// // When the heartbeat was sent. -// google.protobuf.Timestamp timestamp = 3; -//} diff --git a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java index 8244d34b..6cc2c315 100644 --- a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java +++ b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java @@ -76,11 +76,11 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept ); // Act - MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get(); + MultiStreamAppendResponse result = client.multiStreamAppend(requests.iterator()).get(); // Assert - Assertions.assertTrue(result.getSuccesses().isPresent()); - Assertions.assertFalse(result.getSuccesses().get().isEmpty()); + Assertions.assertFalse(result.getResults().isEmpty()); + Assertions.assertTrue(result.getPosition() > 0); List readEvents1 = client.readStream(streamName1, ReadStreamOptions.get()).get().getEvents(); Assertions.assertEquals(1, readEvents1.size()); @@ -132,6 +132,7 @@ public void testMultiStreamAppendWhenUnsupported() throws ExecutionException, In Assertions.assertInstanceOf(UnsupportedOperationException.class, e.getCause()); } + // @Test public void testMultiStreamAppendStreamRevisionConflict() throws ExecutionException, InterruptedException { KurrentDBClient client = getDefaultClient(); @@ -146,41 +147,30 @@ public void testMultiStreamAppendStreamRevisionConflict() throws ExecutionExcept // Arrange String streamName = generateName(); - EventData event1 = EventData.builderAsJson("event-1", "{}".getBytes()).build(); - EventData event2 = EventData.builderAsJson("event-2", "{}".getBytes()).build(); - EventData event3 = EventData.builderAsJson("event-3", "{}".getBytes()).build(); + EventData event = EventData.builderAsJson("event-1", "{}".getBytes()).build(); client.appendToStream( streamName, AppendToStreamOptions.get().streamState(StreamState.noStream()), - event1, event2, event3 + event ).get(); - ResolvedEvent lastEvent = client.readStream(streamName, ReadStreamOptions.get().maxCount(1).fromEnd().backwards()).get().getEvents().get(0); - List requests = Collections.singletonList( new AppendStreamRequest( streamName, - Collections.singletonList(EventData.builderAsBinary("event-4", "{}".getBytes()).build()).iterator(), + Collections.singletonList(EventData.builderAsBinary("event-2", "{}".getBytes()).build()).iterator(), StreamState.noStream() ) ); - // Act - MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get(); - - // Assert - Assertions.assertTrue(result.getFailures().isPresent()); - Assertions.assertFalse(result.getFailures().get().isEmpty()); - - AppendStreamFailure failure = result.getFailures().get().get(0); - Assertions.assertEquals(streamName, failure.getStreamName()); - - MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor(); - failure.visit(visitor); - - Assertions.assertTrue(visitor.wasWrongExpectedRevisionVisited()); - Assertions.assertEquals(lastEvent.getOriginalEvent().getRevision(), visitor.getActualRevision()); + // Act & Assert + Assertions.assertThrows(WrongExpectedVersionException.class, () -> { + try { + client.multiStreamAppend(requests.iterator()).get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + }); } @Test @@ -216,47 +206,12 @@ public void testMultiStreamAppendStreamDeleted() throws ExecutionException, Inte ); // Act - MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get(); - - // Assert - Assertions.assertTrue(result.getFailures().isPresent()); - Assertions.assertFalse(result.getFailures().get().isEmpty()); - - AppendStreamFailure failure = result.getFailures().get().get(0); - Assertions.assertEquals(streamName, failure.getStreamName()); - - MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor(); - failure.visit(visitor); - - Assertions.assertTrue(visitor.wasStreamDeletedVisited()); - } - - private static class MultiAppendErrorVisitor implements MultiAppendStreamErrorVisitor { - private boolean wrongExpectedRevisionVisited = false; - private boolean streamDeletedVisited = false; - private long actualRevision = -1; - - @Override - public void onWrongExpectedRevision(long streamRevision) { - this.wrongExpectedRevisionVisited = true; - this.actualRevision = streamRevision; - } - - @Override - public void onStreamDeleted() { - this.streamDeletedVisited = true; - } - - public boolean wasWrongExpectedRevisionVisited() { - return wrongExpectedRevisionVisited; - } - - public boolean wasStreamDeletedVisited() { - return streamDeletedVisited; - } - - public long getActualRevision() { - return actualRevision; - } + Assertions.assertThrows(StreamTombstonedException.class, () -> { + try { + client.multiStreamAppend(requests.iterator()).get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + }); } } diff --git a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java index 542b4e1d..2fad7b33 100644 --- a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java +++ b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java @@ -3,8 +3,6 @@ import io.kurrent.dbclient.*; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; @@ -328,12 +326,12 @@ default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws T StreamState.noStream() ); - MultiAppendWriteResult result = client.multiStreamAppend( + MultiStreamAppendResponse result = client.multiStreamAppend( Arrays.asList(request1, request2).iterator() ).get(); - Assertions.assertNotNull(result); - Assertions.assertTrue(result.getSuccesses().isPresent()); + Assertions.assertFalse(result.getResults().isEmpty()); + Assertions.assertTrue(result.getPosition() > 0); List spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND); Assertions.assertEquals(1, spans.size()); @@ -379,13 +377,13 @@ default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable StreamState.streamExists() ); - MultiAppendWriteResult result = client.multiStreamAppend( + MultiStreamAppendResponse result = client.multiStreamAppend( Arrays.asList(request1, request2).iterator() ).get(); Assertions.assertNotNull(result); - Assertions.assertFalse(result.getSuccesses().isPresent()); - Assertions.assertTrue(result.getFailures().isPresent()); + Assertions.assertFalse(result.getResults().isEmpty()); + Assertions.assertTrue(result.getPosition() > 0); List spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND); Assertions.assertEquals(1, spans.size()); @@ -393,20 +391,6 @@ default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable ReadableSpan span = spans.get(0); Assertions.assertEquals(StatusCode.ERROR, span.toSpanData().getStatus().getStatusCode()); - Assertions.assertEquals("", span.toSpanData().getStatus().getDescription()); - - List events = span.toSpanData().getEvents(); - - Assertions.assertEquals(1, events.size()); - - io.opentelemetry.sdk.trace.data.EventData failureEvent = events.get(0); - - Assertions.assertEquals("exception", failureEvent.getName()); - - Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(), - failureEvent.getAttributes().get(AttributeKey.stringKey("exception.type"))); - - Assertions.assertNotNull(failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision"))); - Assertions.assertEquals(-1L, failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision"))); + Assertions.assertEquals(SpanKind.CLIENT, span.getKind()); } } From 87a3d5a27abd1be76358682915b66b2c5a3b9ab1 Mon Sep 17 00:00:00 2001 From: William Chong Date: Mon, 20 Oct 2025 12:57:10 +0400 Subject: [PATCH 2/2] Fixup --- .../kurrent/dbclient/MultiStreamAppend.java | 1 + .../proto/kurrentdb/protocol/v2/errors.proto | 191 +++++++++--------- .../proto/kurrentdb/protocol/v2/rpc.proto | 23 +-- .../protocol/v2/streams/errors.proto | 136 ++++++++++--- .../protocol/v2/streams/streams.proto | 141 ++++++++----- 5 files changed, 304 insertions(+), 188 deletions(-) diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index e544eaba..842b29df 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -2,6 +2,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Value; +import com.google.rpc.ErrorInfo; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.grpc.stub.ClientResponseObserver; diff --git a/src/main/proto/kurrentdb/protocol/v2/errors.proto b/src/main/proto/kurrentdb/protocol/v2/errors.proto index f7d1e734..5c9421c4 100644 --- a/src/main/proto/kurrentdb/protocol/v2/errors.proto +++ b/src/main/proto/kurrentdb/protocol/v2/errors.proto @@ -4,7 +4,7 @@ syntax = "proto3"; -package kurrentdb.protocol.v2.common.errors; +package kurrent.rpc; option go_package = "github.com/kurrent-io/KurrentDB-Client-Go/protos/kurrentdb/protocols/v2/common/errors"; option csharp_namespace = "KurrentDB.Protocol.V2.Common.Errors"; @@ -13,138 +13,143 @@ option java_multiple_files = true; import "kurrentdb/protocol/v2/rpc.proto"; -enum CommonError { +// The canonical server error codes for the Kurrent Platform gRPC APIs. +// These errors represent common failure modes across all Kurrent services. +enum ServerError { // Default value. This value is not used. // An error code MUST always be set to a non-zero value. // If an error code is not explicitly set, it MUST be treated as // an internal server error (INTERNAL). UNSPECIFIED = 0; - COMMON_ERROR_ACCESS_DENIED = 1 [(kurrent.rpc.error) = { + // Authentication or authorization failure. + // The client lacks valid credentials or sufficient permissions to perform the requested operation. + // + // Common causes: + // - Missing or invalid authentication tokens + // - Insufficient permissions for the operation + // - Expired credentials + // + // Client action: Check credentials, verify permissions, and re-authenticate if necessary. + // Not retriable without fixing the underlying authorization issue. + SERVER_ERROR_ACCESS_DENIED = 1 [(kurrent.rpc.error) = { status_code: PERMISSION_DENIED, has_details: true }]; - COMMON_ERROR_INVALID_REQUEST = 2 [(kurrent.rpc.error) = { + // The request is malformed or contains invalid data. + // The server cannot process the request due to client error. + // + // Common causes: + // - Invalid field values (e.g., empty required fields, out-of-range numbers) + // - Malformed data formats + // - Validation failures + // + // Client action: Fix the request data and retry. + // Not retriable without modifying the request. + SERVER_ERROR_BAD_REQUEST = 2 [(kurrent.rpc.error) = { status_code: INVALID_ARGUMENT, has_details: true }]; - COMMON_ERROR_NOT_LEADER_NODE = 5 [(kurrent.rpc.error) = { + // The server is not the cluster leader and cannot process write operations. + // In a clustered deployment, only the leader node can accept write operations. + // + // Common causes: + // - Client connected to a follower node + // - Leader election in progress + // - Network partition + // + // Client action: Redirect the request to the leader node indicated in the error details. + // Retriable after redirecting to the correct leader node. + SERVER_ERROR_NOT_LEADER_NODE = 5 [(kurrent.rpc.error) = { status_code: FAILED_PRECONDITION, has_details: true }]; - COMMON_ERROR_OPERATION_TIMEOUT = 6 [(kurrent.rpc.error) = { + // The operation did not complete within the configured timeout period. + // + // Common causes: + // - Slow disk I/O during writes + // - Cluster consensus delays + // - Network latency + // - Heavy server load + // + // Client action: Retry with exponential backoff. Consider increasing timeout values. + // Retriable - the operation may succeed on retry. + SERVER_ERROR_OPERATION_TIMEOUT = 6 [(kurrent.rpc.error) = { status_code: DEADLINE_EXCEEDED }]; - COMMON_ERROR_SERVER_NOT_READY = 7 [(kurrent.rpc.error) = { + // The server is starting up or shutting down and cannot process requests. + // + // Common causes: + // - Server is initializing (loading indexes, recovering state) + // - Server is performing graceful shutdown + // - Server is performing maintenance operations + // + // Client action: Retry with exponential backoff. Wait for server to become ready. + // Retriable - the server will become available after initialization completes. + SERVER_ERROR_SERVER_NOT_READY = 7 [(kurrent.rpc.error) = { status_code: UNAVAILABLE }]; - COMMON_ERROR_SERVER_OVERLOADED = 8 [(kurrent.rpc.error) = { + // The server is temporarily overloaded and cannot accept more requests. + // This is a backpressure mechanism to prevent server overload. + // + // Common causes: + // - Too many concurrent requests + // - Resource exhaustion (CPU, memory, disk I/O) + // - Rate limiting triggered + // + // Client action: Retry with exponential backoff. Reduce request rate. + // Retriable - the server may accept requests after load decreases. + SERVER_ERROR_SERVER_OVERLOADED = 8 [(kurrent.rpc.error) = { status_code: UNAVAILABLE }]; - COMMON_ERROR_SERVER_MALFUNCTION = 9 [(kurrent.rpc.error) = { + // An internal server error occurred. + // This indicates a bug or unexpected condition in the server. + // + // Common causes: + // - Unhandled exceptions + // - Assertion failures + // - Corrupted internal state + // - Programming errors + // + // Client action: Report to server administrators with request details. + // May be retriable, but likely indicates a server-side issue requiring investigation. + SERVER_ERROR_SERVER_MALFUNCTION = 9 [(kurrent.rpc.error) = { status_code: INTERNAL }]; - -// // The operation was aborted, typically due to a concurrency issue such as a -// // sequencer conflict or transaction abort. -// // This error will only be used when there is no intention to create a dedicated -// // error code for the specific issue, perhaps because the issue is too generic -// // or too transient or temporary in terms of handling. -// OPERATION_ABORTED = 10 [(kurrent.rpc.error) = { -// status_code: ABORTED -// }]; } +// Details for ACCESS_DENIED errors. message AccessDeniedErrorDetails { - // The scope in which access was denied. - // It could represent a resource, a domain, a permission type - // or a "path" that is a combination of these. - // (e.g., "stream:orders", "db:customers:read", etc.) - optional string scope = 1; + // The friendly name of the operation that was denied. + string operation = 1; // The username of the user who was denied access. optional string username = 2; -} -message InvalidRequestErrorDetails { - // Detailed information about each invalid argument. - repeated FieldViolation violations = 1; - - // Describes a single field violation. - message FieldViolation { - // A path that leads to a field in the request body. The value will be a - // sequence of dot-separated identifiers that identify a protocol buffer - // field. - // - // Consider the following: - // - // message CreateContactRequest { - // message EmailAddress { - // enum Type { - // TYPE_UNSPECIFIED = 0; - // HOME = 1; - // WORK = 2; - // } - // - // optional string email = 1; - // repeated EmailType type = 2; - // } - // - // string full_name = 1; - // repeated EmailAddress email_addresses = 2; - // } - // - // In this example, in proto `field` could take one of the following values: - // - // * `full_name` for a violation in the `full_name` value - // * `email_addresses[1].email` for a violation in the `email` field of the - // first `email_addresses` message - // * `email_addresses[3].type[2]` for a violation in the second `type` - // value in the third `email_addresses` message. - // - // In JSON, the same values are represented as: - // - // * `fullName` for a violation in the `fullName` value - // * `emailAddresses[1].email` for a violation in the `email` field of the - // first `emailAddresses` message - // * `emailAddresses[3].type[2]` for a violation in the second `type` - // value in the third `emailAddresses` message. - string field = 1; - - // A description of why the request element is bad. - string description = 2; - } + // The permission that was required for this operation. + optional string permission = 3; } +// Details for NOT_LEADER_NODE errors. message NotLeaderNodeErrorDetails { - // The host of the current leader node - string host = 1; + // Information about the current cluster leader node. + NodeInfo current_leader = 1; - // The port of the current leader node - int32 port = 2; + // Information about a cluster node. + message NodeInfo { + // The hostname or IP address of the node. + string host = 1; - // The instance ID of the current leader node - optional string node_id = 3; -} + // The gRPC port of the node. + int32 port = 2; -message RetryInfoErrorDetails { - // The duration in milliseconds after which the client can retry the operation. - int32 retry_delay_ms = 1; -} - -message NodeInfoErrorDetails { - // The host of the node - string host = 1; - - // The port of the node - int32 port = 2; - - // The instance ID of the node - optional string node_id = 3; + // The unique instance ID of the node. + optional string node_id = 3; + } } diff --git a/src/main/proto/kurrentdb/protocol/v2/rpc.proto b/src/main/proto/kurrentdb/protocol/v2/rpc.proto index 696e9d90..a5e915cb 100644 --- a/src/main/proto/kurrentdb/protocol/v2/rpc.proto +++ b/src/main/proto/kurrentdb/protocol/v2/rpc.proto @@ -56,11 +56,11 @@ message ErrorMetadata { // Indicates whether this error supports rich, typed detail messages. // Defaults to false (simple message string only). // The message type name must be derived from the enum name by convention. - // Mask: {EnumValue}ErrorDetails + // Mask: {EnumValue}ErrorDetails, {EnumValue}Error, {EnumValue} // // Examples: - // ACCESS_DENIED -> "AccessDeniedErrorDetails" - // SERVER_NOT_READY -> "ServerNotReadyErrorDetails" + // ACCESS_DENIED -> "AccessDeniedErrorDetails", "AccessDeniedError" or "AccessDenied" + // SERVER_NOT_READY -> "ServerNotReadyErrorDetails", "ServerNotReadyError" or "ServerNotReady" // // Code generators use the message type name to: // - Validate that the detail message matches the expected type @@ -75,20 +75,3 @@ extend google.protobuf.EnumValueOptions { // code generation and documentation. optional ErrorMetadata error = 50000; } - -// The top-level error message that must be returned by any service or operation -// in the Kurrent platform. -message RequestErrorInfo { - // The code must match one of the defined enum error codes from the module - // where the error originated from. - // A machine-readable error code that indicates the specific error condition. - // This should be at most 63 characters and match a regular expression of - // `[A-Z][A-Z0-9_]+[A-Z0-9]`, which represents UPPER_SNAKE_CASE. - // By convention, it will be generated from the enum value name if not - // explicitly specified. - // Conventions: - // - Prefix with the service name or domain to avoid collisions - // - Use UPPER_SNAKE_CASE with only letters, numbers, and underscores - // - Avoid redundant information (e.g., do not include "ERROR" suffix) - string code = 1; -} diff --git a/src/main/proto/kurrentdb/protocol/v2/streams/errors.proto b/src/main/proto/kurrentdb/protocol/v2/streams/errors.proto index fde515f6..daa3e44a 100644 --- a/src/main/proto/kurrentdb/protocol/v2/streams/errors.proto +++ b/src/main/proto/kurrentdb/protocol/v2/streams/errors.proto @@ -13,6 +13,8 @@ option java_multiple_files = true; import "kurrentdb/protocol/v2/rpc.proto"; +// Error codes specific to the Streams API. +// These errors represent failure modes when working with streams of records. enum StreamsError { // Default value. This value is not used. // An error code MUST always be set to a non-zero value. @@ -20,111 +22,195 @@ enum StreamsError { // an internal server error (INTERNAL). STREAMS_ERROR_UNSPECIFIED = 0; - // The stream was not found. - // This is recoverable by the client by creating the stream first. + // The requested stream does not exist in the database. + // + // Common causes: + // - Stream name typo or incorrect stream identifier + // - Stream was never created (no events appended yet) + // - Stream was deleted and not yet recreated + // + // Client action: Verify the stream name is correct. Create the stream by appending to it. + // Recoverable by creating the stream first (append with NO_STREAM expected revision). STREAMS_ERROR_STREAM_NOT_FOUND = 1 [(kurrent.rpc.error) = { status_code: NOT_FOUND, has_details: true, }]; - // The stream already exists. - // This is recoverable by the client by using the existing stream. + // The stream already exists when an operation expected it not to exist. + // + // Common causes: + // - Attempting to create a stream that already has events + // - Using NO_STREAM expected revision on an existing stream + // - Race condition with concurrent stream creation + // + // Client action: Use the existing stream or use a different expected revision. + // Recoverable by adjusting the expected revision or using the existing stream. STREAMS_ERROR_STREAM_ALREADY_EXISTS = 2 [(kurrent.rpc.error) = { status_code: ALREADY_EXISTS, has_details: true }]; // The stream has been soft deleted. - // It will not be visible in the stream list, until it is restored by appending to it again. + // Soft-deleted streams are hidden from stream lists but can be restored by appending to them. + // + // Common causes: + // - Stream was explicitly soft-deleted via delete operation + // - Attempting to read from a soft-deleted stream + // + // Client action: Restore the stream by appending new events, or accept that the stream is deleted. + // Recoverable by appending to the stream to restore it. STREAMS_ERROR_STREAM_DELETED = 3 [(kurrent.rpc.error) = { status_code: FAILED_PRECONDITION, has_details: true }]; - // The stream has been tombstoned. - // It has been permanently removed from the system and cannot be restored. + // The stream has been tombstoned (permanently deleted). + // Tombstoned streams cannot be restored and will never accept new events. + // + // Common causes: + // - Stream was explicitly tombstoned via tombstone operation + // - Administrative deletion of sensitive data + // - Attempting to write to or read from a tombstoned stream + // + // Client action: Stream is permanently removed. Create a new stream with a different name if needed. + // Not recoverable - the stream cannot be restored. STREAMS_ERROR_STREAM_TOMBSTONED = 4 [(kurrent.rpc.error) = { status_code: FAILED_PRECONDITION, has_details: true }]; - // The expected revision of the stream does not match the actual revision. - // This is recoverable by the client by fetching the current revision and retrying. + // The expected revision does not match the actual stream revision. + // This is an optimistic concurrency control failure. + // + // Common causes: + // - Another client modified the stream concurrently + // - Client has stale state about the stream revision + // - Race condition in distributed system + // + // Client action: Fetch the current stream revision and retry with the correct expected revision. + // Recoverable by reading the current state and retrying with proper optimistic concurrency control. STREAMS_ERROR_STREAM_REVISION_CONFLICT = 5 [(kurrent.rpc.error) = { status_code: FAILED_PRECONDITION, has_details: true }]; - // The size of a record being appended exceeds the maximum allowed size. - // It is recoverable by the client by sending a smaller record. + // A single record being appended exceeds the maximum allowed size. + // + // Common causes: + // - Record payload is too large (exceeds server's max record size configuration) + // - Excessive metadata in properties + // - Large binary data without chunking + // + // Client action: Reduce record size, split large payloads across multiple records, or increase server limits. + // Recoverable by reducing record size or adjusting server configuration. STREAMS_ERROR_APPEND_RECORD_SIZE_EXCEEDED = 6 [(kurrent.rpc.error) = { status_code: INVALID_ARGUMENT, has_details: true }]; - // When the transaction exceeds the maximum size allowed (max chunk size). - // It is recoverable by the client by sending a smaller transaction. + // The total size of all records in a single append session exceeds the maximum allowed transaction size. + // + // Common causes: + // - Too many records in a single append session + // - Combined payload size exceeds server's max transaction size + // - Attempting to write very large batches + // + // Client action: Split the append into multiple smaller transactions. + // Recoverable by reducing the number of records per append session. STREAMS_ERROR_APPEND_TRANSACTION_SIZE_EXCEEDED = 7 [(kurrent.rpc.error) = { status_code: ABORTED, has_details: true }]; - // The stream is already in an append session. - // Appending to the same stream multiple times is currently not supported. + // The same stream appears multiple times in a single append session. + // This is currently not supported to prevent complexity with expected revisions and ordering. + // + // Common causes: + // - Accidentally appending to the same stream twice in one session + // - Application logic error in batch operations + // + // Client action: Remove duplicate streams from the append session or split into multiple sessions. + // Recoverable by restructuring the append session to reference each stream only once. STREAMS_ERROR_STREAM_ALREADY_IN_APPEND_SESSION = 8 [(kurrent.rpc.error) = { status_code: ABORTED, has_details: true }]; + + // An append session was started but no append requests were sent before completing the stream. + // + // Common causes: + // - Client completed the stream without sending any AppendRequest messages + // - Application logic error + // + // Client action: Ensure at least one AppendRequest is sent before completing the stream. + // Recoverable by properly implementing the append session protocol. + STREAMS_ERROR_APPEND_SESSION_NO_REQUESTS = 9 [(kurrent.rpc.error) = { + status_code: FAILED_PRECONDITION + }]; } +// Details for STREAM_NOT_FOUND errors. message StreamNotFoundErrorDetails { // The name of the stream that was not found. string stream = 1; } +// Details for STREAM_ALREADY_EXISTS errors. message StreamAlreadyExistsErrorDetails { // The name of the stream that already exists. string stream = 1; } +// Details for STREAM_DELETED errors. message StreamDeletedErrorDetails { // The name of the stream that was deleted. string stream = 1; } +// Details for STREAM_TOMBSTONED errors. message StreamTombstonedErrorDetails { // The name of the stream that was tombstoned. string stream = 1; } +// Details for STREAM_REVISION_CONFLICT errors. message StreamRevisionConflictErrorDetails { // The name of the stream that had a revision conflict. string stream = 1; - // The actual revision of the stream. - int64 expected_revision = 2 [jstype = JS_STRING]; - // The actual revision of the stream. - int64 actual_revision = 3 [jstype = JS_STRING]; + + // The expected revision that was provided in the append request. + sint64 expected_revision = 2 [jstype = JS_STRING]; + + // The actual current revision of the stream. + sint64 actual_revision = 3 [jstype = JS_STRING]; } +// Details for APPEND_RECORD_SIZE_EXCEEDED errors. message AppendRecordSizeExceededErrorDetails { // The name of the stream where the append was attempted. string stream = 1; - // The identifier of the offending and oversized record. + + // The identifier of the record that exceeded the size limit. string record_id = 2; - // The size of the huge record in bytes. + + // The actual size of the record in bytes. int32 size = 3; - // The maximum allowed size of a single record that can be appended in bytes. + + // The maximum allowed size of a single record in bytes. int32 max_size = 4; } +// Details for APPEND_TRANSACTION_SIZE_EXCEEDED errors. message AppendTransactionSizeExceededErrorDetails { - // The size of the huge transaction in bytes. + // The actual size of the transaction in bytes. int32 size = 1; - // The maximum allowed size of the append transaction in bytes. + + // The maximum allowed size of an append transaction in bytes. int32 max_size = 2; } +// Details for STREAM_ALREADY_IN_APPEND_SESSION errors. message StreamAlreadyInAppendSessionErrorDetails { - // The name of the stream that is already in an append session. + // The name of the stream that appears multiple times. string stream = 1; } diff --git a/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto b/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto index 499fd969..afe71667 100644 --- a/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto +++ b/src/main/proto/kurrentdb/protocol/v2/streams/streams.proto @@ -14,50 +14,64 @@ option java_multiple_files = true; import "google/protobuf/struct.proto"; service StreamsService { - // Executes an atomic operation to append records to multiple streams. - // This transactional method ensures that all appends either succeed - // completely, or are entirely rolled back, thereby maintaining strict - // data consistency across all involved streams. - rpc Append(AppendRequest) returns (AppendResponse); - - // Streaming version of Append that allows clients to send multiple - // append requests continuously. Once completed, all records are - // appended transactionally (all succeed or fail together). - // Provides improved efficiency for high-throughput scenarios while - // maintaining the same transactional guarantees. + // Appends records to multiple streams atomically within a single transaction. + // + // This is a client-streaming RPC where the client sends multiple AppendRequest messages + // (one per stream) and receives a single AppendSessionResponse upon commit. + // + // Guarantees: + // - Atomicity: All writes succeed or all fail together + // - Optimistic Concurrency: Expected revisions are validated for all streams before commit + // - Ordering: Records within each stream maintain send order + // + // Current Limitations: + // - Each stream can only appear once per session (no multiple appends to same stream) + // + // Example flow: + // 1. Client opens stream + // 2. Client sends AppendRequest for stream "orders" with 3 records + // 3. Client sends AppendRequest for stream "inventory" with 2 records + // 4. Client completes the stream + // 5. Server validates, commits, returns AppendSessionResponse with positions rpc AppendSession(stream AppendRequest) returns (AppendSessionResponse); } - // Represents the input for appending records to a specific stream. message AppendRequest { - // The name of the stream to append records to. + // The stream to append records to. string stream = 1; + // The records to append to the stream. repeated AppendRecord records = 2; - // The expected revision of the stream. If the stream's current revision does - // not match, the append will fail. - // The expected revision can also be one of the special values - // from ExpectedRevisionConstants. - // Missing value means no expectation, the same as EXPECTED_REVISION_CONSTANTS_ANY + + // The expected revision for optimistic concurrency control. + // Can be either: + // - A specific revision number (0, 1, 2, ...) - the stream must be at exactly this revision + // - An ExpectedRevisionConstants value (-4, -2, -1) for special semantics + // + // If omitted, defaults to EXPECTED_REVISION_CONSTANTS_ANY (-2). optional sint64 expected_revision = 3 [jstype = JS_STRING]; } // Represents the outcome of an append operation. message AppendResponse { - // The name of the stream to which records were appended. + // The stream to which records were appended. string stream = 1; - // The expected revision of the stream after the append operation. - int64 stream_revision = 2 [jstype = JS_STRING]; - // The position of the last appended record in the stream. - optional int64 position = 4 [jstype = JS_STRING]; + + // The actual/current revision of the stream after the append. + // This is the revision number of the last record written to this stream. + sint64 stream_revision = 2 [jstype = JS_STRING]; + + // The position of the last appended record in the global log. + optional sint64 position = 3 [jstype = JS_STRING]; } message AppendSessionResponse { // The results of each append request in the session. repeated AppendResponse output = 1; - // The position of the last appended record in the session. - int64 position = 2 [jstype = JS_STRING]; + + // The global commit position of the last appended record in the session. + sint64 position = 2 [jstype = JS_STRING]; } // Represents the data format of the schema. @@ -70,11 +84,22 @@ enum SchemaFormat { SCHEMA_FORMAT_BYTES = 4; } +// Schema information for record validation and interpretation. message SchemaInfo { - // The format of the schema that the record conforms to. - SchemaFormat format = 1; - // The name of the schema that the record conforms to. + // The format of the data payload. + // Determines how the bytes in AppendRecord.data should be interpreted. + SchemaFormat format = 1; + + // The schema name (replaces the legacy "event type" concept). + // Identifies what kind of data this record contains. + // + // Common naming formats: + // - Kebab-case: "order-placed", "customer-registered" + // - URN format: "urn:kurrentdb:events:order-placed:v1" + // - Dotted namespace: "Teams.Player.V1", "Orders.OrderPlaced.V2" + // - Reverse domain: "com.acme.orders.placed" string name = 2; + // The identifier of the specific version of the schema that the record payload // conforms to. This should match a registered schema version in the system. // Not necessary when not enforcing schema validation. @@ -83,35 +108,51 @@ message SchemaInfo { // Record to be appended to a stream. message AppendRecord { - // Universally Unique identifier for the record. Must be a guid. + // Unique identifier for this record (must be a valid UUID/GUID). // If not provided, the server will generate a new one. optional string record_id = 1; - // The timestamp of when the record was created, represented as - // milliseconds since the Unix epoch. This is primarily for - // informational purposes and does not affect the ordering of records - // within the stream, which is determined by the server. - // If not provided, the server will assign it upon receipt. - optional int64 timestamp = 2 [jstype = JS_STRING]; + // A collection of properties providing additional information about the - // record. This can include user-defined metadata or system properties. - // System properties are uniquely identified by the "$." prefix. - map properties = 3; - // Information about the schema that the record payload conforms to. - SchemaInfo schema = 4; - // The actual data payload of the record. - bytes data = 5; + // record. Can contain user-defined or system propreties. + // System keys will be prefixed with "$" (e.g., "$timestamp"). + // User-defined keys MUST NOT start with "$". + // + // Common examples: + // User metadata: + // - "user-id": "12345" + // - "tenant": "acme-corp" + // - "source": "mobile-app" + // + // System metadata (with $ prefix): + // - "$trace-id": "4bf92f3577b34da6a3ce929d0e0e4736" // OpenTelemetry trace ID + // - "$span-id": "00f067aa0ba902b7" // OpenTelemetry span ID + // - "$timestamp": "2025-01-15T10:30:00.000Z" // ISO 8601 timestamp + map properties = 2; + + // Schema information for this record. + SchemaInfo schema = 3; + + // The record payload as raw bytes. + // The format specified in SchemaInfo determines how to interpret these bytes. + bytes data = 4; } -// Constants that match the expected state of a stream during an -// append operation. It can be used to specify whether the stream should exist, -// not exist, or can be in any state. +// Constants for expected revision validation in optimistic concurrency control. +// These can be used in the expected_revision field, or you can specify an actual revision number. enum ExpectedRevisionConstants { - // The stream should exist and have a single event. + // The stream must have exactly one event at revision 0. + // Used for scenarios requiring strict single-event semantics. EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; - // It is not important whether the stream exists or not. - EXPECTED_REVISION_CONSTANTS_ANY = -2; - // The stream should not exist. If it does, the append will fail. + + // The stream must not exist yet (first write to the stream). + // Fails if the stream already has events. EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; - // The stream should exist + + // Accept any current state of the stream (no optimistic concurrency check). + // The write will succeed regardless of the stream's current revision. + EXPECTED_REVISION_CONSTANTS_ANY = -2; + + // The stream must exist (have at least one record). + // Fails if the stream doesn't exist yet. EXPECTED_REVISION_CONSTANTS_EXISTS = -4; }