55
66import java .io .StringReader ;
77import java .lang .reflect .Type ;
8- import java .lang .reflect .TypeVariable ;
98import java .nio .ByteBuffer ;
109import java .nio .charset .StandardCharsets ;
1110import java .util .*;
@@ -230,18 +229,16 @@ public void setBaseUrl(String url) {
230229 public Completable start () {
231230 CompletableSubject localStart = CompletableSubject .create ();
232231
233- hubConnectionStateLock .lock ();
232+ this . state . lock .lock ();
234233 try {
235- if (hubConnectionState != HubConnectionState .DISCONNECTED ) {
236- logger .debug ("The connection is in the '{}' state. Waiting for in-progress start to complete or completing this start immediately." , hubConnectionState );
234+ if (this . state . getHubConnectionState () != HubConnectionState .DISCONNECTED ) {
235+ logger .debug ("The connection is in the '{}' state. Waiting for in-progress start to complete or completing this start immediately." , this . state . getHubConnectionState () );
237236 return start ;
238237 }
239238
240- hubConnectionState = HubConnectionState .CONNECTING ;
239+ this . state . changeState ( HubConnectionState . DISCONNECTED , HubConnectionState .CONNECTING ) ;
241240 start = localStart ;
242241
243- handshakeResponseSubject = CompletableSubject .create ();
244- handshakeReceived = false ;
245242 CompletableSubject tokenCompletable = CompletableSubject .create ();
246243 Map <String , String > localHeaders = new HashMap <>();
247244 localHeaders .put (UserAgentHelper .getUserAgentName (), UserAgentHelper .createUserAgentString ());
@@ -260,7 +257,6 @@ public Completable start() {
260257 tokenCompletable .onError (error );
261258 });
262259
263- stopError = null ;
264260 Single <NegotiateResponse > negotiate = null ;
265261 if (!skipNegotiate ) {
266262 negotiate = tokenCompletable .andThen (Single .defer (() -> startNegotiate (baseUrl , 0 , localHeaders )));
@@ -287,25 +283,25 @@ public Completable start() {
287283 transport .setOnReceive (this .callback );
288284 transport .setOnClose ((message ) -> stopConnection (message ));
289285
290- return transport .start (negotiateResponse .getFinalUrl ()).andThen (Completable .defer (() -> {
291- ByteBuffer handshake = HandshakeProtocol .createHandshakeRequestMessage (
292- new HandshakeRequestMessage (protocol .getName (), protocol .getVersion ()));
293-
294- return connectionState .transport .send (handshake ).andThen (Completable .defer (() -> {
295- connectionState .timeoutHandshakeResponse (handshakeResponseTimeout , TimeUnit .MILLISECONDS );
296- return connectionState .handshakeResponseSubject .andThen (Completable .defer (() -> {
297- connectionState .lock .lock ();
298- try {
299- this .state .changeState (HubConnectionState .DISCONNECTED , HubConnectionState .CONNECTED );
300- logger .info ("HubConnection started." );
301- connectionState .resetServerTimeout ();
302- // Don't send pings if we're using long polling.
303- if (transportEnum != TransportEnum .LONG_POLLING ) {
304- connectionState .activatePingTimer ();
286+ return transport .start (negotiateResponse .getFinalUrl ()).andThen (Completable .defer (() -> {
287+ ByteBuffer handshake = HandshakeProtocol .createHandshakeRequestMessage (
288+ new HandshakeRequestMessage (protocol .getName (), protocol .getVersion ()));
289+
290+ return connectionState .transport .send (handshake ).andThen (Completable .defer (() -> {
291+ connectionState .timeoutHandshakeResponse (handshakeResponseTimeout , TimeUnit .MILLISECONDS );
292+ return connectionState .handshakeResponseSubject .andThen (Completable .defer (() -> {
293+ connectionState .lock .lock ();
294+ try {
295+ this .state .changeState (HubConnectionState .CONNECTING , HubConnectionState .CONNECTED );
296+ logger .info ("HubConnection started." );
297+ connectionState .resetServerTimeout ();
298+ // Don't send pings if we're using long polling.
299+ if (transportEnum != TransportEnum .LONG_POLLING ) {
300+ connectionState .activatePingTimer ();
301+ }
302+ } finally {
303+ connectionState .lock .unlock ();
305304 }
306- } finally {
307- connectionState .lock .unlock ();
308- }
309305
310306 return Completable .complete ();
311307 }));
@@ -315,13 +311,13 @@ public Completable start() {
315311 }).subscribe (() -> {
316312 localStart .onComplete ();
317313 }, error -> {
318- hubConnectionStateLock .lock ();
319- hubConnectionState = HubConnectionState .DISCONNECTED ;
320- hubConnectionStateLock .unlock ();
314+ this . state . lock .lock ();
315+ this . state . changeState ( HubConnectionState . CONNECTING , HubConnectionState .DISCONNECTED ) ;
316+ this . state . lock .unlock ();
321317 localStart .onError (error );
322318 });
323319 } finally {
324- hubConnectionStateLock .unlock ();
320+ this . state . lock .unlock ();
325321 }
326322
327323 return localStart ;
@@ -400,11 +396,11 @@ private Completable stop(String errorMessage) {
400396 return transport .stop ();
401397 }
402398
403- private void ReceiveLoop (String payload )
399+ private void ReceiveLoop (ByteBuffer payload )
404400 {
405401 ConnectionState connectionState = this .state .getConnectionState ();
406402 connectionState .resetServerTimeout ();
407- payload = connectionState .handleHandshake (payload );
403+ connectionState .handleHandshake (payload );
408404 // The payload only contained the handshake response so we can return.
409405 if (!payload .hasRemaining ()) {
410406 return ;
@@ -1355,7 +1351,7 @@ public void run() {
13551351 }, new Date (0 ), tickRate );
13561352 }
13571353
1358- public String handleHandshake (String payload ) {
1354+ public void handleHandshake (ByteBuffer payload ) {
13591355 if (!handshakeReceived ) {
13601356 List <Byte > handshakeByteList = new ArrayList <Byte >();
13611357 byte curr = payload .get ();
@@ -1388,11 +1384,7 @@ public String handleHandshake(String payload) {
13881384 }
13891385 handshakeReceived = true ;
13901386 handshakeResponseSubject .onComplete ();
1391-
1392- payload = payload .substring (handshakeLength );
13931387 }
1394-
1395- return payload ;
13961388 }
13971389
13981390 public void timeoutHandshakeResponse (long timeout , TimeUnit unit ) {
0 commit comments