Skip to content

Commit 8e5f873

Browse files
authored
fix: subscription error handling during server shutdown (#311)
1 parent 59971ab commit 8e5f873

File tree

2 files changed

+33
-23
lines changed

2 files changed

+33
-23
lines changed

db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.eventstore.dbclient.proto.shared.Shared;
44
import com.eventstore.dbclient.proto.streams.StreamsGrpc;
55
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
6+
import io.grpc.ManagedChannel;
67

78
import java.util.concurrent.CompletableFuture;
89

@@ -32,38 +33,42 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure
3233
protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();
3334

3435
public CompletableFuture<Subscription> execute() {
35-
CompletableFuture<Subscription> future = new CompletableFuture<>();
36-
37-
this.client.getWorkItemArgs().whenComplete((args, error) -> {
38-
if (error != null) {
39-
future.completeExceptionally(error);
40-
return;
41-
}
42-
43-
ReadResponseObserver observer = createObserver(args, future);
44-
observer.onConnected(args);
36+
return this.client.run(channel -> {
37+
CompletableFuture<Subscription> future = new CompletableFuture<>();
4538

4639
StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder()
4740
.setOptions(createOptions())
4841
.build();
4942

50-
StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
43+
StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(
44+
StreamsGrpc.newStub(channel),
45+
this.client.getSettings(),
46+
this.options
47+
);
48+
49+
ReadResponseObserver observer = createObserver(channel, future);
5150
streamsClient.read(readReq, observer);
52-
});
5351

54-
return future;
52+
return future;
53+
});
5554
}
5655

57-
private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture<Subscription> future) {
58-
StreamConsumer consumer = new SubscriptionStreamConsumer(this.listener, this.checkpointer, future, (subscriptionId, event, action) -> {
59-
ClientTelemetry.traceSubscribe(
60-
action,
61-
subscriptionId,
62-
args.getChannel(),
63-
client.getSettings(),
64-
options.getCredentials(),
65-
event);
66-
});
56+
private ReadResponseObserver createObserver(ManagedChannel channel, CompletableFuture<Subscription> future) {
57+
StreamConsumer consumer = new SubscriptionStreamConsumer(
58+
this.listener,
59+
this.checkpointer,
60+
future,
61+
(subscriptionId, event, action) -> {
62+
ClientTelemetry.traceSubscribe(
63+
action,
64+
subscriptionId,
65+
channel,
66+
client.getSettings(),
67+
options.getCredentials(),
68+
event
69+
);
70+
}
71+
);
6772

6873
return new ReadResponseObserver(this.options, consumer);
6974
}

db-client-java/src/main/java/com/eventstore/dbclient/SubscriptionStreamConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public void onFellBehind() {
6767

6868
@Override
6969
public void onCancelled(Throwable exception) {
70+
// error occurred before subscription was confirmed
71+
if (this.subscription == null) {
72+
this.future.completeExceptionally(exception);
73+
}
74+
7075
this.listener.onCancelled(this.subscription, exception);
7176
}
7277

0 commit comments

Comments
 (0)