55
66import java .io .StringReader ;
77import java .lang .reflect .Type ;
8- import java .lang .reflect .TypeVariable ;
98import java .nio .ByteBuffer ;
109import java .nio .charset .StandardCharsets ;
1110import java .util .*;
@@ -230,18 +229,16 @@ public void setBaseUrl(String url) {
230229 public Completable start () {
231230 CompletableSubject localStart = CompletableSubject .create ();
232231
233- hubConnectionStateLock .lock ();
232+ this . state . lock .lock ();
234233 try {
235- if (hubConnectionState != HubConnectionState .DISCONNECTED ) {
236- logger .debug ("The connection is in the '{}' state. Waiting for in-progress start to complete or completing this start immediately." , hubConnectionState );
234+ if (this . state . getHubConnectionState () != HubConnectionState .DISCONNECTED ) {
235+ logger .debug ("The connection is in the '{}' state. Waiting for in-progress start to complete or completing this start immediately." , this . state . getHubConnectionState () );
237236 return start ;
238237 }
239238
240- hubConnectionState = HubConnectionState .CONNECTING ;
239+ this . state . changeState ( HubConnectionState . DISCONNECTED , HubConnectionState .CONNECTING ) ;
241240 start = localStart ;
242241
243- handshakeResponseSubject = CompletableSubject .create ();
244- handshakeReceived = false ;
245242 CompletableSubject tokenCompletable = CompletableSubject .create ();
246243 Map <String , String > localHeaders = new HashMap <>();
247244 localHeaders .put (UserAgentHelper .getUserAgentName (), UserAgentHelper .createUserAgentString ());
@@ -260,7 +257,6 @@ public Completable start() {
260257 tokenCompletable .onError (error );
261258 });
262259
263- stopError = null ;
264260 Single <NegotiateResponse > negotiate = null ;
265261 if (!skipNegotiate ) {
266262 negotiate = tokenCompletable .andThen (Single .defer (() -> startNegotiate (baseUrl , 0 , localHeaders )));
@@ -287,25 +283,25 @@ public Completable start() {
287283 transport .setOnReceive (this .callback );
288284 transport .setOnClose ((message ) -> stopConnection (message ));
289285
290- return transport .start (negotiateResponse .getFinalUrl ()).andThen (Completable .defer (() -> {
291- ByteBuffer handshake = HandshakeProtocol .createHandshakeRequestMessage (
292- new HandshakeRequestMessage (protocol .getName (), protocol .getVersion ()));
293-
294- return connectionState .transport .send (handshake ).andThen (Completable .defer (() -> {
295- connectionState .timeoutHandshakeResponse (handshakeResponseTimeout , TimeUnit .MILLISECONDS );
296- return connectionState .handshakeResponseSubject .andThen (Completable .defer (() -> {
297- connectionState .lock .lock ();
298- try {
299- this .state .changeState (HubConnectionState .DISCONNECTED , HubConnectionState .CONNECTED );
300- logger .info ("HubConnection started." );
301- connectionState .resetServerTimeout ();
302- // Don't send pings if we're using long polling.
303- if (transportEnum != TransportEnum .LONG_POLLING ) {
304- connectionState .activatePingTimer ();
286+ return transport .start (negotiateResponse .getFinalUrl ()).andThen (Completable .defer (() -> {
287+ ByteBuffer handshake = HandshakeProtocol .createHandshakeRequestMessage (
288+ new HandshakeRequestMessage (protocol .getName (), protocol .getVersion ()));
289+
290+ return connectionState .transport .send (handshake ).andThen (Completable .defer (() -> {
291+ connectionState .timeoutHandshakeResponse (handshakeResponseTimeout , TimeUnit .MILLISECONDS );
292+ return connectionState .handshakeResponseSubject .andThen (Completable .defer (() -> {
293+ connectionState .lock .lock ();
294+ try {
295+ this .state .changeState (HubConnectionState .CONNECTING , HubConnectionState .CONNECTED );
296+ logger .info ("HubConnection started." );
297+ connectionState .resetServerTimeout ();
298+ // Don't send pings if we're using long polling.
299+ if (transportEnum != TransportEnum .LONG_POLLING ) {
300+ connectionState .activatePingTimer ();
301+ }
302+ } finally {
303+ connectionState .lock .unlock ();
305304 }
306- } finally {
307- connectionState .lock .unlock ();
308- }
309305
310306 return Completable .complete ();
311307 }));
@@ -315,13 +311,13 @@ public Completable start() {
315311 }).subscribe (() -> {
316312 localStart .onComplete ();
317313 }, error -> {
318- hubConnectionStateLock .lock ();
319- hubConnectionState = HubConnectionState .DISCONNECTED ;
320- hubConnectionStateLock .unlock ();
314+ this . state . lock .lock ();
315+ this . state . changeState ( HubConnectionState . CONNECTING , HubConnectionState .DISCONNECTED ) ;
316+ this . state . lock .unlock ();
321317 localStart .onError (error );
322318 });
323319 } finally {
324- hubConnectionStateLock .unlock ();
320+ this . state . lock .unlock ();
325321 }
326322
327323 return localStart ;
@@ -402,11 +398,11 @@ private Completable stop(String errorMessage) {
402398 return stop ;
403399 }
404400
405- private void ReceiveLoop (String payload )
401+ private void ReceiveLoop (ByteBuffer payload )
406402 {
407403 ConnectionState connectionState = this .state .getConnectionState ();
408404 connectionState .resetServerTimeout ();
409- payload = connectionState .handleHandshake (payload );
405+ connectionState .handleHandshake (payload );
410406 // The payload only contained the handshake response so we can return.
411407 if (!payload .hasRemaining ()) {
412408 return ;
@@ -1337,7 +1333,7 @@ public void run() {
13371333 }, new Date (0 ), tickRate );
13381334 }
13391335
1340- public String handleHandshake (String payload ) {
1336+ public void handleHandshake (ByteBuffer payload ) {
13411337 if (!handshakeReceived ) {
13421338 List <Byte > handshakeByteList = new ArrayList <Byte >();
13431339 byte curr = payload .get ();
@@ -1370,11 +1366,7 @@ public String handleHandshake(String payload) {
13701366 }
13711367 handshakeReceived = true ;
13721368 handshakeResponseSubject .onComplete ();
1373-
1374- payload = payload .substring (handshakeLength );
13751369 }
1376-
1377- return payload ;
13781370 }
13791371
13801372 public void timeoutHandshakeResponse (long timeout , TimeUnit unit ) {
0 commit comments