diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java b/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java index 66198244..1a66a814 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java @@ -3,6 +3,7 @@ import com.eventstore.dbclient.proto.shared.Shared; import com.eventstore.dbclient.proto.streams.StreamsGrpc; import com.eventstore.dbclient.proto.streams.StreamsOuterClass; +import io.grpc.ManagedChannel; import java.util.concurrent.CompletableFuture; @@ -32,38 +33,42 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions(); public CompletableFuture execute() { - CompletableFuture future = new CompletableFuture<>(); - - this.client.getWorkItemArgs().whenComplete((args, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - - ReadResponseObserver observer = createObserver(args, future); - observer.onConnected(args); + return this.client.run(channel -> { + CompletableFuture future = new CompletableFuture<>(); StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder() .setOptions(createOptions()) .build(); - StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options); + StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub( + StreamsGrpc.newStub(channel), + this.client.getSettings(), + this.options + ); + + ReadResponseObserver observer = createObserver(channel, future); streamsClient.read(readReq, observer); - }); - return future; + return future; + }); } - private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture future) { - StreamConsumer consumer = new SubscriptionStreamConsumer(this.listener, this.checkpointer, future, (subscriptionId, event, action) -> { - ClientTelemetry.traceSubscribe( - action, - subscriptionId, - args.getChannel(), - client.getSettings(), - options.getCredentials(), - event); - }); + private ReadResponseObserver createObserver(ManagedChannel channel, CompletableFuture future) { + StreamConsumer consumer = new SubscriptionStreamConsumer( + this.listener, + this.checkpointer, + future, + (subscriptionId, event, action) -> { + ClientTelemetry.traceSubscribe( + action, + subscriptionId, + channel, + client.getSettings(), + options.getCredentials(), + event + ); + } + ); return new ReadResponseObserver(this.options, consumer); } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/SubscriptionStreamConsumer.java b/db-client-java/src/main/java/com/eventstore/dbclient/SubscriptionStreamConsumer.java index 00574e76..e02049cd 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/SubscriptionStreamConsumer.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/SubscriptionStreamConsumer.java @@ -67,6 +67,11 @@ public void onFellBehind() { @Override public void onCancelled(Throwable exception) { + // error occurred before subscription was confirmed + if (this.subscription == null) { + this.future.completeExceptionally(exception); + } + this.listener.onCancelled(this.subscription, exception); }