@@ -101,7 +101,7 @@ public long getKeepAliveInterval() {
101101 * @return A string representing the the client's connectionId.
102102 */
103103 public String getConnectionId () {
104- ConnectionState state = this .state .getConnectionStateUnsynchronized ();
104+ ConnectionState state = this .state .getConnectionStateUnsynchronized (true );
105105 if (state != null ) {
106106 return state .connectionId ;
107107 }
@@ -233,11 +233,10 @@ public Completable start() {
233233 try {
234234 if (this .state .getHubConnectionState () != HubConnectionState .DISCONNECTED ) {
235235 logger .debug ("The connection is in the '{}' state. Waiting for in-progress start to complete or completing this start immediately." , this .state .getHubConnectionState ());
236- return this .state .startTask ;
236+ return this .state .getConnectionStateUnsynchronized ( false ). startTask ;
237237 }
238238
239239 this .state .changeState (HubConnectionState .DISCONNECTED , HubConnectionState .CONNECTING );
240- this .state .startTask = localStart ;
241240
242241 CompletableSubject tokenCompletable = CompletableSubject .create ();
243242 Map <String , String > localHeaders = new HashMap <>();
@@ -247,6 +246,7 @@ public Completable start() {
247246 }
248247 ConnectionState connectionState = new ConnectionState (this );
249248 this .state .setConnectionState (connectionState );
249+ connectionState .startTask = localStart ;
250250
251251 accessTokenProvider .subscribe (token -> {
252252 if (token != null && !token .isEmpty ()) {
@@ -288,10 +288,22 @@ public Completable start() {
288288 new HandshakeRequestMessage (protocol .getName (), protocol .getVersion ()));
289289
290290 return connectionState .transport .send (handshake ).andThen (Completable .defer (() -> {
291- connectionState .timeoutHandshakeResponse (handshakeResponseTimeout , TimeUnit .MILLISECONDS );
291+ this .state .lock ();
292+ try {
293+ if (this .state .getConnectionStateUnsynchronized (true ) != null ) {
294+ connectionState .timeoutHandshakeResponse (handshakeResponseTimeout , TimeUnit .MILLISECONDS );
295+ } else {
296+ return Completable .error (new RuntimeException ("Connection closed while sending handshake." ));
297+ }
298+ } finally {
299+ this .state .unlock ();
300+ }
292301 return connectionState .handshakeResponseSubject .andThen (Completable .defer (() -> {
293- connectionState . lock .lock ();
302+ this . state .lock ();
294303 try {
304+ if (this .state .getConnectionStateUnsynchronized (true ) == null ) {
305+ return Completable .error (new RuntimeException ("Connection closed while waiting for handshake." ));
306+ }
295307 this .state .changeState (HubConnectionState .CONNECTING , HubConnectionState .CONNECTED );
296308 logger .info ("HubConnection started." );
297309 connectionState .resetServerTimeout ();
@@ -300,7 +312,7 @@ public Completable start() {
300312 connectionState .activatePingTimer ();
301313 }
302314 } finally {
303- connectionState . lock .unlock ();
315+ this . state .unlock ();
304316 }
305317
306318 return Completable .complete ();
@@ -311,9 +323,10 @@ public Completable start() {
311323 }).subscribe (() -> {
312324 localStart .onComplete ();
313325 }, error -> {
314- this .state .lock .lock ();
315- this .state .changeState (HubConnectionState .CONNECTING , HubConnectionState .DISCONNECTED );
316- this .state .lock .unlock ();
326+ try {
327+ this .state .changeState (HubConnectionState .CONNECTING , HubConnectionState .DISCONNECTED );
328+ // this error is already logged and we want the user to see the original error
329+ } catch (Exception ex ) { }
317330 localStart .onError (error );
318331 });
319332 } finally {
@@ -384,13 +397,13 @@ private Completable stop(String errorMessage) {
384397 }
385398
386399 if (errorMessage != null ) {
387- this .state .getConnectionStateUnsynchronized ().stopError = errorMessage ;
400+ this .state .getConnectionStateUnsynchronized (false ).stopError = errorMessage ;
388401 logger .error ("HubConnection disconnected with an error: {}." , errorMessage );
389402 } else {
390403 logger .debug ("Stopping HubConnection." );
391404 }
392405
393- transport = this .state .getConnectionStateUnsynchronized ().transport ;
406+ transport = this .state .getConnectionStateUnsynchronized (false ).transport ;
394407 } finally {
395408 this .state .unlock ();
396409 }
@@ -402,15 +415,22 @@ private Completable stop(String errorMessage) {
402415
403416 private void ReceiveLoop (ByteBuffer payload )
404417 {
405- ConnectionState connectionState = this .state .getConnectionState ();
406- connectionState .resetServerTimeout ();
407- connectionState .handleHandshake (payload );
408- // The payload only contained the handshake response so we can return.
409- if (!payload .hasRemaining ()) {
410- return ;
411- }
418+ List <HubMessage > messages ;
419+ ConnectionState connectionState ;
420+ this .state .lock ();
421+ try {
422+ connectionState = this .state .getConnectionState ();
423+ connectionState .resetServerTimeout ();
424+ connectionState .handleHandshake (payload );
425+ // The payload only contained the handshake response so we can return.
426+ if (!payload .hasRemaining ()) {
427+ return ;
428+ }
412429
413- List <HubMessage > messages = protocol .parseMessages (payload , connectionState );
430+ messages = protocol .parseMessages (payload , connectionState );
431+ } finally {
432+ this .state .unlock ();
433+ }
414434
415435 for (HubMessage message : messages ) {
416436 logger .debug ("Received message of type {}." , message .getMessageType ());
@@ -486,15 +506,15 @@ private void stopConnection(String errorMessage) {
486506 try {
487507 // errorMessage gets passed in from the transport. An already existing stopError value
488508 // should take precedence.
489- if (this .state .getConnectionStateUnsynchronized ().stopError != null ) {
490- errorMessage = this .state .getConnectionStateUnsynchronized ().stopError ;
509+ if (this .state .getConnectionStateUnsynchronized (false ).stopError != null ) {
510+ errorMessage = this .state .getConnectionStateUnsynchronized (false ).stopError ;
491511 }
492512 if (errorMessage != null ) {
493513 exception = new RuntimeException (errorMessage );
494514 logger .error ("HubConnection disconnected with an error {}." , errorMessage );
495515 }
496516
497- ConnectionState connectionState = this .state .getConnectionStateUnsynchronized ();
517+ ConnectionState connectionState = this .state .getConnectionStateUnsynchronized (true );
498518 if (connectionState != null ) {
499519 connectionState .cancelOutstandingInvocations (exception );
500520 connectionState .close ();
@@ -606,7 +626,7 @@ public Completable invoke(String method, Object... args) {
606626 throw new RuntimeException ("The 'invoke' method cannot be called if the connection is not active." );
607627 }
608628
609- ConnectionState connectionState = this .state .getConnectionStateUnsynchronized ();
629+ ConnectionState connectionState = this .state .getConnectionStateUnsynchronized (false );
610630 String id = connectionState .getNextInvocationId ();
611631
612632 CompletableSubject subject = CompletableSubject .create ();
@@ -664,7 +684,7 @@ private <T> Single<T> invoke(Type returnType, Class<?> returnClass, String metho
664684 throw new RuntimeException ("The 'invoke' method cannot be called if the connection is not active." );
665685 }
666686
667- ConnectionState connectionState = this .state .getConnectionStateUnsynchronized ();
687+ ConnectionState connectionState = this .state .getConnectionStateUnsynchronized (false );
668688 String id = connectionState .getNextInvocationId ();
669689 InvocationRequest irq = new InvocationRequest (returnType , id );
670690 connectionState .addInvocation (irq );
@@ -724,7 +744,7 @@ private <T> Observable<T> stream(Type returnType, Class<?> returnClass, String m
724744 throw new RuntimeException ("The 'stream' method cannot be called if the connection is not active." );
725745 }
726746
727- ConnectionState connectionState = this .state .getConnectionStateUnsynchronized ();
747+ ConnectionState connectionState = this .state .getConnectionStateUnsynchronized (false );
728748 invocationId = connectionState .getNextInvocationId ();
729749 irq = new InvocationRequest (returnType , invocationId );
730750 connectionState .addInvocation (irq );
@@ -743,10 +763,7 @@ private <T> Observable<T> stream(Type returnType, Class<?> returnClass, String m
743763 if (subscriptionCount .decrementAndGet () == 0 ) {
744764 CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage (null , invocationId );
745765 sendHubMessageWithLock (cancelInvocationMessage );
746- ConnectionState connectionStateInner = this .state .getConnectionState ();
747- if (connectionStateInner != null ) {
748- connectionStateInner .tryRemoveInvocation (invocationId );
749- }
766+ connectionState .tryRemoveInvocation (invocationId );
750767 subject .onComplete ();
751768 }
752769 });
@@ -767,7 +784,7 @@ private void sendHubMessageWithLock(HubMessage message) {
767784 logger .debug ("Sending {} message." , message .getMessageType ().name ());
768785 }
769786
770- ConnectionState connectionState = this .state .getConnectionStateUnsynchronized ();
787+ ConnectionState connectionState = this .state .getConnectionStateUnsynchronized (false );
771788 connectionState .transport .send (serializedMessage ).subscribeWith (CompletableSubject .create ());
772789 connectionState .resetKeepAlive ();
773790 } finally {
@@ -1247,6 +1264,7 @@ private final class ConnectionState implements InvocationBinder {
12471264 public Transport transport ;
12481265 public String connectionId ;
12491266 public String stopError ;
1267+ public Completable startTask ;
12501268
12511269 public ConnectionState (HubConnection connection ) {
12521270 this .connection = connection ;
@@ -1431,7 +1449,6 @@ private final class ReconnectingConnectionState {
14311449 private final Lock lock = new ReentrantLock ();
14321450 private ConnectionState state ;
14331451 private HubConnectionState hubConnectionState = HubConnectionState .DISCONNECTED ;
1434- public Completable startTask ;
14351452 public TransportEnum currentTransport ;
14361453
14371454 public ReconnectingConnectionState (Logger logger ) {
@@ -1447,7 +1464,10 @@ public void setConnectionState(ConnectionState state) {
14471464 }
14481465 }
14491466
1450- public ConnectionState getConnectionStateUnsynchronized () {
1467+ public ConnectionState getConnectionStateUnsynchronized (Boolean allowNull ) {
1468+ if (allowNull != true && this .state == null ) {
1469+ throw new RuntimeException ("Connection is not active." );
1470+ }
14511471 return this .state ;
14521472 }
14531473
0 commit comments