@@ -633,19 +633,34 @@ private <T> T getCommandResult(final Decoder<T> decoder, final ResponseBuffers r
633633 @ Override
634634 public void sendMessage (final List <ByteBuf > byteBuffers , final int lastRequestId ) {
635635 notNull ("stream is open" , stream );
636-
637636 if (isClosed ()) {
638637 throw new MongoSocketClosedException ("Cannot write to a closed stream" , getServerAddress ());
639638 }
640-
641639 try {
642640 stream .write (byteBuffers );
643641 } catch (Exception e ) {
644642 close ();
645- throw translateWriteException (e );
643+ throwTranslatedWriteException (e );
646644 }
647645 }
648646
647+ @ Override
648+ public void sendMessageAsync (final List <ByteBuf > byteBuffers , final int lastRequestId ,
649+ final SingleResultCallback <Void > callback ) {
650+ beginAsync ().thenRun ((c ) -> {
651+ notNull ("stream is open" , stream );
652+ if (isClosed ()) {
653+ throw new MongoSocketClosedException ("Cannot write to a closed stream" , getServerAddress ());
654+ }
655+ c .complete (c );
656+ }).thenRunTryCatchAsyncBlocks (c -> {
657+ stream .writeAsync (byteBuffers , c .asHandler ());
658+ }, Exception .class , (e , c ) -> {
659+ close ();
660+ throwTranslatedWriteException (e );
661+ }).finish (errorHandlingCallback (callback , LOGGER ));
662+ }
663+
649664 @ Override
650665 public ResponseBuffers receiveMessage (final int responseTo ) {
651666 assertNotNull (stream );
@@ -665,39 +680,6 @@ private ResponseBuffers receiveMessageWithAdditionalTimeout(final int additional
665680 }
666681 }
667682
668- @ Override
669- public void sendMessageAsync (final List <ByteBuf > byteBuffers , final int lastRequestId ,
670- final SingleResultCallback <Void > callback ) {
671- assertNotNull (stream );
672-
673- if (isClosed ()) {
674- callback .onResult (null , new MongoSocketClosedException ("Can not read from a closed socket" , getServerAddress ()));
675- return ;
676- }
677-
678- writeAsync (byteBuffers , errorHandlingCallback (callback , LOGGER ));
679- }
680-
681- private void writeAsync (final List <ByteBuf > byteBuffers , final SingleResultCallback <Void > callback ) {
682- try {
683- stream .writeAsync (byteBuffers , new AsyncCompletionHandler <Void >() {
684- @ Override
685- public void completed (@ Nullable final Void v ) {
686- callback .onResult (null , null );
687- }
688-
689- @ Override
690- public void failed (final Throwable t ) {
691- close ();
692- callback .onResult (null , translateWriteException (t ));
693- }
694- });
695- } catch (Throwable t ) {
696- close ();
697- callback .onResult (null , t );
698- }
699- }
700-
701683 @ Override
702684 public void receiveMessageAsync (final int responseTo , final SingleResultCallback <ResponseBuffers > callback ) {
703685 assertNotNull (stream );
@@ -762,6 +744,10 @@ private void updateSessionContext(final SessionContext sessionContext, final Res
762744 }
763745 }
764746
747+ private void throwTranslatedWriteException (final Throwable e ) {
748+ throw translateWriteException (e );
749+ }
750+
765751 private MongoException translateWriteException (final Throwable e ) {
766752 if (e instanceof MongoException ) {
767753 return (MongoException ) e ;
0 commit comments