44package com .microsoft .signalr ;
55
66import java .nio .ByteBuffer ;
7- import java .nio .charset .StandardCharsets ;
87import java .util .Map ;
98import java .util .concurrent .ExecutorService ;
109import java .util .concurrent .Executors ;
1514
1615import io .reactivex .rxjava3 .core .Completable ;
1716import io .reactivex .rxjava3 .core .Single ;
17+ import io .reactivex .rxjava3 .schedulers .Schedulers ;
18+ import io .reactivex .rxjava3 .subjects .BehaviorSubject ;
1819import io .reactivex .rxjava3 .subjects .CompletableSubject ;
1920
2021class LongPollingTransport implements Transport {
@@ -29,7 +30,7 @@ class LongPollingTransport implements Transport {
2930 private volatile Boolean active = false ;
3031 private String pollUrl ;
3132 private String closeError ;
32- private CompletableSubject receiveLoop = CompletableSubject .create ();
33+ private BehaviorSubject < String > receiveLoopSubject = BehaviorSubject .create ();
3334 private CompletableSubject closeSubject = CompletableSubject .create ();
3435 private ExecutorService threadPool ;
3536 private ExecutorService onReceiveThread ;
@@ -78,27 +79,30 @@ public Completable start(String url) {
7879 this .threadPool = Executors .newCachedThreadPool ();
7980 threadPool .execute (() -> {
8081 this .onReceiveThread = Executors .newSingleThreadExecutor ();
81- receiveLoop . subscribe (() -> {
82- this . stop (). onErrorComplete (). subscribe ( );
82+ receiveLoopSubject . observeOn ( Schedulers . io ()). subscribe ( u -> {
83+ poll ( u );
8384 }, e -> {
8485 this .stop ().onErrorComplete ().subscribe ();
86+ }, () -> {
87+ this .stop ().onErrorComplete ().subscribe ();
8588 });
86- poll (url ).subscribeWith (receiveLoop );
89+ // start polling
90+ receiveLoopSubject .onNext (url );
8791 });
8892
8993 return Completable .complete ();
9094 });
9195 }));
9296 }
9397
94- private Completable poll (String url ) {
98+ private void poll (String url ) {
9599 if (this .active ) {
96100 pollUrl = url + "&_=" + System .currentTimeMillis ();
97101 logger .debug ("Polling {}." , pollUrl );
98- return this .updateHeaderToken ().andThen (Completable .defer (() -> {
102+ this .updateHeaderToken ().andThen (Completable .defer (() -> {
99103 HttpRequest request = new HttpRequest ();
100104 request .addHeaders (headers );
101- Completable pollingCompletable = this .pollingClient .get (pollUrl , request ).flatMapCompletable (response -> {
105+ this .pollingClient .get (pollUrl , request ).subscribe (response -> {
102106 if (response .getStatusCode () == 204 ) {
103107 logger .info ("LongPolling transport terminated by server." );
104108 this .active = false ;
@@ -107,22 +111,32 @@ private Completable poll(String url) {
107111 this .active = false ;
108112 this .closeError = "Unexpected response code " + response .getStatusCode () + "." ;
109113 } else {
110- if (response .getContent () != null ) {
114+ if (response .getContent () != null && response . getContent (). hasRemaining () ) {
111115 logger .debug ("Message received." );
112- onReceiveThread .submit (() -> this .onReceive (response .getContent ()));
116+ try {
117+ onReceiveThread .submit (() -> this .onReceive (response .getContent ()));
118+ // it's possible for stop to be called while a request is in progress, if stop throws it wont wait for
119+ // an in-progress poll to complete and will shutdown the thread. We should ignore the exception so we don't
120+ // get an unhandled RX error
121+ } catch (Exception e ) {}
113122 } else {
114123 logger .debug ("Poll timed out, reissuing." );
115124 }
116125 }
117- return poll (url );
126+ receiveLoopSubject .onNext (url );
127+ }, e -> {
128+ receiveLoopSubject .onError (e );
118129 });
119130
120- return pollingCompletable ;
121- }));
131+ return Completable .complete ();
132+ }))
133+ .subscribe (() -> {},
134+ e -> {
135+ receiveLoopSubject .onError (e );
136+ });
122137 } else {
123138 logger .debug ("Long Polling transport polling complete." );
124- receiveLoop .onComplete ();
125- return Completable .complete ();
139+ receiveLoopSubject .onComplete ();
126140 }
127141 }
128142
@@ -162,7 +176,7 @@ public Completable stop() {
162176 HttpRequest request = new HttpRequest ();
163177 request .addHeaders (headers );
164178 return this .pollingClient .delete (this .url , request ).ignoreElement ()
165- .andThen (receiveLoop )
179+ .andThen (receiveLoopSubject . ignoreElements () )
166180 .doOnComplete (() -> {
167181 cleanup (this .closeError );
168182 });
0 commit comments