Skip to content

Commit dbdbd13

Browse files
committed
Ensure that AsyncReadWriteBinding is released prior to subscriber notification
Previously the driver assumed that Mono#doOnTerminate is executed prior to the subscriber being notified of completion. But it turns out that behavior is not guaranteed in the Californium release of Project Reactor (though it is in later releases). So instead, now the SingleResultCallback that is used to notify the Mongo of completion is wrapped by one that first releases the binding, and Mono#doOnTerminate is no longer used. JAVA-4027
1 parent 927e50d commit dbdbd13

File tree

1 file changed

+14
-13
lines changed

1 file changed

+14
-13
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,13 @@ public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPref
6666
binding.release();
6767
return Mono.error(new MongoClientException("Read preference in a transaction must be primary"));
6868
} else {
69-
return Mono.<T>create(sink -> operation.executeAsync(binding, sinkToCallback(sink)))
70-
.doOnTerminate(binding::release)
71-
.doOnError((t) -> {
72-
labelException(session, t);
73-
unpinServerAddressOnTransientTransactionError(session, t);
74-
});
69+
return Mono.<T>create(sink -> operation.executeAsync(binding, (result, t) -> {
70+
binding.release();
71+
sinkToCallback(sink).onResult(result, t);
72+
})).doOnError((t) -> {
73+
labelException(session, t);
74+
unpinServerAddressOnTransientTransactionError(session, t);
75+
});
7576
}
7677
});
7778
}
@@ -86,13 +87,13 @@ public <T> Mono<T> execute(final AsyncWriteOperation<T> operation, final ReadCon
8687
session == null && clientSession != null))
8788
.switchIfEmpty(Mono.fromCallable(() -> getReadWriteBinding(ReadPreference.primary(), readConcern, session, false)))
8889
.flatMap(binding ->
89-
Mono.<T>create(sink -> operation.executeAsync(binding, sinkToCallback(sink)))
90-
.doOnTerminate(binding::release)
91-
.doOnError((t) -> {
92-
labelException(session, t);
93-
unpinServerAddressOnTransientTransactionError(session, t);
94-
})
95-
90+
Mono.<T>create(sink -> operation.executeAsync(binding, (result, t) -> {
91+
binding.release();
92+
sinkToCallback(sink).onResult(result, t);
93+
})).doOnError((t) -> {
94+
labelException(session, t);
95+
unpinServerAddressOnTransientTransactionError(session, t);
96+
})
9697
);
9798
}
9899

0 commit comments

Comments
 (0)