44package com .microsoft .signalr ;
55
66import java .util .Map ;
7+ import java .util .concurrent .locks .ReentrantLock ;
78
89import org .slf4j .Logger ;
910import org .slf4j .LoggerFactory ;
@@ -26,6 +27,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
2627 private WebSocketOnClosedCallback onClose ;
2728 private CompletableSubject startSubject = CompletableSubject .create ();
2829 private CompletableSubject closeSubject = CompletableSubject .create ();
30+ private final ReentrantLock closeLock = new ReentrantLock ();
2931
3032 private final Logger logger = LoggerFactory .getLogger (OkHttpWebSocketWrapper .class );
3133
@@ -87,14 +89,29 @@ public void onMessage(WebSocket webSocket, String message) {
8789 @ Override
8890 public void onClosing (WebSocket webSocket , int code , String reason ) {
8991 onClose .invoke (code , reason );
90- closeSubject .onComplete ();
92+ try {
93+ closeLock .lock ();
94+ closeSubject .onComplete ();
95+ }
96+ finally {
97+ closeLock .unlock ();
98+ }
9199 checkStartFailure ();
92100 }
93101
94102 @ Override
95103 public void onFailure (WebSocket webSocket , Throwable t , Response response ) {
96104 logger .error ("WebSocket closed from an error: {}." , t .getMessage ());
97- closeSubject .onError (new RuntimeException (t ));
105+
106+ try {
107+ closeLock .lock ();
108+ if (!closeSubject .hasComplete ()) {
109+ closeSubject .onError (new RuntimeException (t ));
110+ }
111+ }
112+ finally {
113+ closeLock .unlock ();
114+ }
98115 onClose .invoke (null , t .getMessage ());
99116 checkStartFailure ();
100117 }
0 commit comments