Skip to content

Commit 0555759

Browse files
committed
fix: subscription error handling during server shutdown
1 parent 59971ab commit 0555759

File tree

2 files changed

+32
-23
lines changed

2 files changed

+32
-23
lines changed

db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,42 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure
3232
protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();
3333

3434
public CompletableFuture<Subscription> execute() {
35-
CompletableFuture<Subscription> future = new CompletableFuture<>();
36-
37-
this.client.getWorkItemArgs().whenComplete((args, error) -> {
38-
if (error != null) {
39-
future.completeExceptionally(error);
40-
return;
41-
}
42-
43-
ReadResponseObserver observer = createObserver(args, future);
44-
observer.onConnected(args);
35+
return this.client.run(channel -> {
36+
CompletableFuture<Subscription> future = new CompletableFuture<>();
4537

4638
StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder()
4739
.setOptions(createOptions())
4840
.build();
4941

50-
StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
42+
StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(
43+
StreamsGrpc.newStub(channel),
44+
this.client.getSettings(),
45+
this.options
46+
);
47+
48+
ReadResponseObserver observer = createObserver(future);
5149
streamsClient.read(readReq, observer);
52-
});
5350

54-
return future;
51+
return future;
52+
});
5553
}
5654

57-
private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture<Subscription> future) {
58-
StreamConsumer consumer = new SubscriptionStreamConsumer(this.listener, this.checkpointer, future, (subscriptionId, event, action) -> {
59-
ClientTelemetry.traceSubscribe(
60-
action,
61-
subscriptionId,
62-
args.getChannel(),
63-
client.getSettings(),
64-
options.getCredentials(),
65-
event);
66-
});
55+
private ReadResponseObserver createObserver(CompletableFuture<Subscription> future) {
56+
StreamConsumer consumer = new SubscriptionStreamConsumer(
57+
this.listener,
58+
this.checkpointer,
59+
future,
60+
(subscriptionId, event, action) -> {
61+
ClientTelemetry.traceSubscribe(
62+
action,
63+
subscriptionId,
64+
null, // Channel will be provided elsewhere
65+
client.getSettings(),
66+
options.getCredentials(),
67+
event
68+
);
69+
}
70+
);
6771

6872
return new ReadResponseObserver(this.options, consumer);
6973
}

db-client-java/src/main/java/com/eventstore/dbclient/SubscriptionStreamConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public void onFellBehind() {
6767

6868
@Override
6969
public void onCancelled(Throwable exception) {
70+
// error occurred before subscription was confirmed
71+
if (this.subscription == null) {
72+
this.future.completeExceptionally(exception);
73+
}
74+
7075
this.listener.onCancelled(this.subscription, exception);
7176
}
7277

0 commit comments

Comments
 (0)