Skip to content

Commit bee823e

Browse files
YoEightw1am
authored andcommitted
feat: add multi stream append support (#330)
1 parent ba0f028 commit bee823e

30 files changed

+1195
-15
lines changed

.github/workflows/tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
strategy:
2121
fail-fast: false
2222
matrix:
23-
test: [Streams, PersistentSubscriptions, Telemetry]
23+
test: [Streams, PersistentSubscriptions, Telemetry, MultiStreamAppend]
2424

2525
runs-on: ubuntu-latest
2626
steps:
@@ -102,7 +102,7 @@ jobs:
102102
strategy:
103103
fail-fast: false
104104
matrix:
105-
test: [Streams, PersistentSubscriptions]
105+
test: [Streams, PersistentSubscriptions, MultiStreamAppend]
106106

107107
runs-on: ubuntu-latest
108108
steps:
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.kurrent.dbclient;
2+
3+
public class AppendStreamFailure {
4+
private final io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner;
5+
6+
AppendStreamFailure(io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner) {
7+
this.inner = inner;
8+
}
9+
10+
public String getStreamName() {
11+
return this.inner.getStream();
12+
}
13+
14+
public void visit(MultiAppendStreamErrorVisitor visitor) {
15+
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_REVISION_CONFLICT) {
16+
visitor.onWrongExpectedRevision(this.inner.getStreamRevisionConflict().getStreamRevision());
17+
return;
18+
}
19+
20+
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) {
21+
visitor.onAccessDenied(this.inner.getAccessDenied());
22+
}
23+
24+
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) {
25+
visitor.onStreamDeleted();
26+
return;
27+
}
28+
29+
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) {
30+
visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize());
31+
return;
32+
}
33+
34+
throw new IllegalArgumentException("Append failure does not match any known error type");
35+
}
36+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.kurrent.dbclient;
2+
3+
import java.util.Iterator;
4+
5+
public class AppendStreamRequest {
6+
private final String streamName;
7+
private final Iterator<EventData> events;
8+
private final StreamState expectedState;
9+
10+
public AppendStreamRequest(String streamName, Iterator<EventData> events, StreamState expectedState) {
11+
this.streamName = streamName;
12+
this.events = events;
13+
this.expectedState = expectedState;
14+
}
15+
16+
public String getStreamName() {
17+
return streamName;
18+
}
19+
20+
public Iterator<EventData> getEvents() {
21+
return events;
22+
}
23+
24+
public StreamState getExpectedState() {
25+
return expectedState;
26+
}
27+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.kurrent.dbclient;
2+
3+
public class AppendStreamSuccess {
4+
private final io.kurrentdb.protocol.streams.v2.AppendStreamSuccess inner;
5+
6+
AppendStreamSuccess(io.kurrentdb.protocol.streams.v2.AppendStreamSuccess inner) {
7+
this.inner = inner;
8+
}
9+
10+
public String getStreamName() {
11+
return this.inner.getStream();
12+
}
13+
14+
public long getStreamRevision() {
15+
return this.inner.getStreamRevision();
16+
}
17+
18+
public long getPosition() {
19+
return this.inner.getPosition();
20+
}
21+
}

src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
public class ConnectionSettingsBuilder {
1818
private static final Logger logger = LoggerFactory.getLogger(ConnectionSettingsBuilder.class);
1919
private static final Set<String> SUPPORTED_PROTOCOLS = new HashSet<>(Arrays.asList(
20-
"esdb", "esdb+discover", "kurrent", "kurrent+discover", "kdb", "kdb+discover", "kurrentdb", "kurrentdb+discover"
20+
"esdb", "esdb+discover", "kurrentdb", "kurrent+discover", "kdb", "kdb+discover", "kurrentdb", "kurrentdb+discover"
2121
));
2222

2323
private boolean _dnsDiscover = false;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.kurrent.dbclient;
2+
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
public class ContentTypeMapper {
8+
private static final Map<String, String> CONTENT_TYPE_MAP;
9+
10+
static {
11+
Map<String, String> map = new HashMap<>();
12+
map.put("application/json", "Json");
13+
map.put("application/octet-stream", "Binary");
14+
CONTENT_TYPE_MAP = Collections.unmodifiableMap(map);
15+
}
16+
17+
public static String toSchemaDataFormat(String contentType) {
18+
return CONTENT_TYPE_MAP.getOrDefault(contentType, contentType);
19+
}
20+
}

src/main/java/io/kurrent/dbclient/FeatureFlags.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ class FeatureFlags {
77
public final static int PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM = 8;
88
public final static int PERSISTENT_SUBSCRIPTION_GET_INFO = 16;
99
public final static int PERSISTENT_SUBSCRIPTION_TO_ALL = 32;
10+
public final static int MULTI_STREAM_APPEND = 64;
1011
public final static int PERSISTENT_SUBSCRIPTION_MANAGEMENT = PERSISTENT_SUBSCRIPTION_LIST | PERSISTENT_SUBSCRIPTION_REPLAY | PERSISTENT_SUBSCRIPTION_GET_INFO | PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM;
1112
}

src/main/java/io/kurrent/dbclient/GossipClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class GossipClient {
2121

2222
public GossipClient(KurrentDBClientSettings settings, ManagedChannel channel) {
2323
_channel = channel;
24-
_stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), (long)settings.getGossipTimeout());
24+
_stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), settings.getGossipTimeout());
2525
}
2626

2727
public void shutdown() {

src/main/java/io/kurrent/dbclient/GrpcUtils.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,14 @@ static public StreamsOuterClass.ReadReq.Options.StreamOptions toStreamOptions(St
113113
}
114114

115115
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase<O> options) {
116-
return configureStub(stub, settings, options, null);
116+
return configureStub(stub, settings, options, null, true);
117117
}
118118

119-
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase<O> options, Long forceDeadlineInMs) {
119+
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase<O> options, long forceDeadlineInMs) {
120+
return configureStub(stub, settings, options, forceDeadlineInMs, true);
121+
}
122+
123+
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase<O> options, Long forceDeadlineInMs, boolean forwardRequiresLeader) {
120124
S finalStub = stub;
121125
ConnectionMetadata metadata = new ConnectionMetadata();
122126

@@ -146,7 +150,7 @@ static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, Kurren
146150
metadata.authenticated(credentials);
147151
}
148152

149-
if (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER) {
153+
if (forwardRequiresLeader && (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER)) {
150154
metadata.requiresLeader();
151155
}
152156

src/main/java/io/kurrent/dbclient/KurrentDBClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ public CompletableFuture<WriteResult> appendToStream(String streamName, AppendTo
7575
return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute();
7676
}
7777

78+
public CompletableFuture<MultiAppendWriteResult> multiAppend(AppendToStreamOptions options, Iterator<AppendStreamRequest> requests) {
79+
return new MultiStreamAppend(this.getGrpcClient(), requests).execute();
80+
}
81+
7882
/**
7983
* Sets a stream's metadata.
8084
* @param streamName stream's name.

0 commit comments

Comments
 (0)