From 58a32a75b70d905975b81ab0cb935f535e546e92 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Wed, 10 Jul 2019 18:01:19 -0700 Subject: [PATCH 1/7] Locking and connection state check --- .../com/microsoft/signalr/HubConnection.java | 28 ++++++++++++++++--- .../microsoft/signalr/HubConnectionTest.java | 11 +++++++- .../com/microsoft/signalr/sample/Chat.java | 2 +- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 67669576e74d..a40dbf4e16ca 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -537,8 +537,13 @@ private void stopConnection(String errorMessage) { * @param args The arguments to be passed to the method. */ public void send(String method, Object... args) { - if (hubConnectionState != HubConnectionState.CONNECTED) { - throw new RuntimeException("The 'send' method cannot be called if the connection is not active."); + hubConnectionStateLock.lock(); + try { + if (hubConnectionState != HubConnectionState.CONNECTED) { + throw new RuntimeException("The 'send' method cannot be called if the connection is not active."); + } + } finally { + hubConnectionStateLock.unlock(); } sendInvocationMessage(method, args); @@ -638,10 +643,16 @@ public Completable invoke(String method, Object... args) { */ @SuppressWarnings("unchecked") public Single invoke(Class returnType, String method, Object... args) { - if (hubConnectionState != HubConnectionState.CONNECTED) { - throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active."); + hubConnectionStateLock.lock(); + try { + if (hubConnectionState != HubConnectionState.CONNECTED) { + throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active."); + } + } finally { + hubConnectionStateLock.unlock(); } + String id = connectionState.getNextInvocationId(); SingleSubject subject = SingleSubject.create(); @@ -677,6 +688,15 @@ public Single invoke(Class returnType, String method, Object... args) */ @SuppressWarnings("unchecked") public Observable stream(Class returnType, String method, Object ... args) { + hubConnectionStateLock.lock(); + try { + if (hubConnectionState != HubConnectionState.CONNECTED) { + throw new RuntimeException("The 'stream' method cannot be called if the connection is not active."); + } + } finally { + hubConnectionStateLock.unlock(); + } + String invocationId = connectionState.getNextInvocationId(); AtomicInteger subscriptionCount = new AtomicInteger(); InvocationRequest irq = new InvocationRequest(returnType, invocationId); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index b5104919ab44..0ff3be7fd0d9 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -1608,6 +1608,15 @@ public void cannotInvokeBeforeStart() { assertEquals("The 'invoke' method cannot be called if the connection is not active.", exception.getMessage()); } + @Test + public void cannotStreamBeforeStart() { + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); + assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); + + Throwable exception = assertThrows(RuntimeException.class, () -> hubConnection.stream(String.class, "inc", "arg1")); + assertEquals("The 'stream' method cannot be called if the connection is not active.", exception.getMessage()); + } + @Test public void doesNotErrorWhenReceivingInvokeWithIncorrectArgumentLength() { MockTransport mockTransport = new MockTransport(); @@ -2036,7 +2045,7 @@ public void authorizationHeaderFromNegotiateGetsSetToNewValue() { TestHttpClient client = new TestHttpClient() .on("POST", "http://example.com/negotiate", (req) -> { - if(redirectCount.get() == 0){ + if (redirectCount.get() == 0) { redirectCount.incrementAndGet(); redirectToken.set(req.getHeaders().get("Authorization")); return Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\",\"accessToken\":\"firstRedirectToken\"}")); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java index bd617fe88e45..5201a0915887 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java @@ -37,6 +37,6 @@ public static void main(String[] args) { hubConnection.send("Send", message); } - hubConnection.stop(); + hubConnection.stop().blockingAwait(); } } From 31f50ae5c4b7b49324233b34a3760d254fa9ef98 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Wed, 10 Jul 2019 18:08:09 -0700 Subject: [PATCH 2/7] clean up --- .../src/main/java/com/microsoft/signalr/HubConnection.java | 3 +-- .../com/microsoft/signalr/WebSocketTransportUrlFormatTest.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index a40dbf4e16ca..2a47ff562f6b 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -651,8 +651,7 @@ public Single invoke(Class returnType, String method, Object... args) } finally { hubConnectionStateLock.unlock(); } - - + String id = connectionState.getNextInvocationId(); SingleSubject subject = SingleSubject.create(); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java index 2631084a3424..613ffab959b1 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java @@ -8,11 +8,11 @@ import java.util.HashMap; import java.util.stream.Stream; -import io.reactivex.Single; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; + class WebSocketTransportUrlFormatTest { private static Stream protocols() { return Stream.of( From a3235afa19713e165d4db485fde9c5f3479885f9 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Wed, 10 Jul 2019 18:12:35 -0700 Subject: [PATCH 3/7] clean up --- .../com/microsoft/signalr/WebSocketTransportUrlFormatTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java index 613ffab959b1..1dbe653dcc1a 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java @@ -12,7 +12,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; - class WebSocketTransportUrlFormatTest { private static Stream protocols() { return Stream.of( From f05723fbbccb0e15aae3d1b226d30767b2fc0fbd Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Thu, 11 Jul 2019 14:46:31 -0700 Subject: [PATCH 4/7] moar locking --- .../com/microsoft/signalr/HubConnection.java | 43 +++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 2a47ff562f6b..02d1e1aa8fa7 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -57,6 +57,7 @@ public class HubConnection { private Map streamMap = new ConcurrentHashMap<>(); private TransportEnum transportEnum = TransportEnum.ALL; private String connectionId; + private final Lock connectionStateLock = new ReentrantLock(); private final Logger logger = LoggerFactory.getLogger(HubConnection.class); /** @@ -505,9 +506,15 @@ private void stopConnection(String errorMessage) { exception = new RuntimeException(errorMessage); logger.error("HubConnection disconnected with an error {}.", errorMessage); } - if (connectionState != null) { - connectionState.cancelOutstandingInvocations(exception); - connectionState = null; + + connectionStateLock.lock(); + try { + if (connectionState != null) { + connectionState.cancelOutstandingInvocations(exception); + connectionState = null; + } + } finally { + connectionStateLock.unlock(); } logger.info("HubConnection stopped."); @@ -643,20 +650,23 @@ public Completable invoke(String method, Object... args) { */ @SuppressWarnings("unchecked") public Single invoke(Class returnType, String method, Object... args) { + ConnectionState localConnectionState; + InvocationRequest irq; + String id; hubConnectionStateLock.lock(); try { if (hubConnectionState != HubConnectionState.CONNECTED) { throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active."); } + + id = connectionState.getNextInvocationId(); + irq = new InvocationRequest(returnType, id); + connectionState.addInvocation(irq); } finally { hubConnectionStateLock.unlock(); } - - String id = connectionState.getNextInvocationId(); SingleSubject subject = SingleSubject.create(); - InvocationRequest irq = new InvocationRequest(returnType, id); - connectionState.addInvocation(irq); // forward the invocation result or error to the user // run continuations on a separate thread @@ -687,21 +697,23 @@ public Single invoke(Class returnType, String method, Object... args) */ @SuppressWarnings("unchecked") public Observable stream(Class returnType, String method, Object ... args) { + String invocationId; + InvocationRequest irq; hubConnectionStateLock.lock(); try { if (hubConnectionState != HubConnectionState.CONNECTED) { throw new RuntimeException("The 'stream' method cannot be called if the connection is not active."); } + + invocationId = connectionState.getNextInvocationId(); + irq = new InvocationRequest(returnType, invocationId); + connectionState.addInvocation(irq); } finally { hubConnectionStateLock.unlock(); } - String invocationId = connectionState.getNextInvocationId(); AtomicInteger subscriptionCount = new AtomicInteger(); - InvocationRequest irq = new InvocationRequest(returnType, invocationId); - connectionState.addInvocation(irq); ReplaySubject subject = ReplaySubject.create(); - Subject pendingCall = irq.getPendingCall(); pendingCall.subscribe(result -> { // Primitive types can't be cast with the Class cast function @@ -719,7 +731,14 @@ public Observable stream(Class returnType, String method, Object ... a if (subscriptionCount.decrementAndGet() == 0) { CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId); sendHubMessage(cancelInvocationMessage); - connectionState.tryRemoveInvocation(invocationId); + connectionStateLock.lock(); + try { + if (connectionState != null) { + connectionState.tryRemoveInvocation(invocationId); + } + } finally { + connectionStateLock.unlock(); + } subject.onComplete(); } }); From 8267ede6038561646a0e496fbec0ac4b81934e40 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Thu, 11 Jul 2019 16:03:39 -0700 Subject: [PATCH 5/7] clean up --- .../com/microsoft/signalr/HubConnection.java | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 02d1e1aa8fa7..e226148ea650 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -57,7 +57,6 @@ public class HubConnection { private Map streamMap = new ConcurrentHashMap<>(); private TransportEnum transportEnum = TransportEnum.ALL; private String connectionId; - private final Lock connectionStateLock = new ReentrantLock(); private final Logger logger = LoggerFactory.getLogger(HubConnection.class); /** @@ -506,15 +505,9 @@ private void stopConnection(String errorMessage) { exception = new RuntimeException(errorMessage); logger.error("HubConnection disconnected with an error {}.", errorMessage); } - - connectionStateLock.lock(); - try { - if (connectionState != null) { - connectionState.cancelOutstandingInvocations(exception); - connectionState = null; - } - } finally { - connectionStateLock.unlock(); + if (connectionState != null) { + connectionState.cancelOutstandingInvocations(exception); + connectionState = null; } logger.info("HubConnection stopped."); @@ -731,13 +724,13 @@ public Observable stream(Class returnType, String method, Object ... a if (subscriptionCount.decrementAndGet() == 0) { CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId); sendHubMessage(cancelInvocationMessage); - connectionStateLock.lock(); + hubConnectionStateLock.lock(); try { if (connectionState != null) { connectionState.tryRemoveInvocation(invocationId); } } finally { - connectionStateLock.unlock(); + hubConnectionStateLock.unlock(); } subject.onComplete(); } From 629e1228c22b3d4096d4c039cd46ca82ce3d12d2 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Thu, 11 Jul 2019 16:30:00 -0700 Subject: [PATCH 6/7] clean --- .../src/main/java/com/microsoft/signalr/HubConnection.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index e226148ea650..9d619253fdcc 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -643,7 +643,6 @@ public Completable invoke(String method, Object... args) { */ @SuppressWarnings("unchecked") public Single invoke(Class returnType, String method, Object... args) { - ConnectionState localConnectionState; InvocationRequest irq; String id; hubConnectionStateLock.lock(); From a8879a164566f8e8de06e91ce3d0711113573dc8 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Fri, 12 Jul 2019 14:20:05 -0700 Subject: [PATCH 7/7] lock everything --- .../com/microsoft/signalr/HubConnection.java | 135 +++++++++--------- 1 file changed, 66 insertions(+), 69 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 9d619253fdcc..a4d1e58c7dab 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -542,11 +542,10 @@ public void send(String method, Object... args) { if (hubConnectionState != HubConnectionState.CONNECTED) { throw new RuntimeException("The 'send' method cannot be called if the connection is not active."); } + sendInvocationMessage(method, args); } finally { hubConnectionStateLock.unlock(); } - - sendInvocationMessage(method, args); } private void sendInvocationMessage(String method, Object[] args) { @@ -610,26 +609,31 @@ Object[] checkUploadStream(Object[] args, List streamIds) { */ @SuppressWarnings("unchecked") public Completable invoke(String method, Object... args) { - if (hubConnectionState != HubConnectionState.CONNECTED) { - throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active."); - } + hubConnectionStateLock.lock(); + try { + if (hubConnectionState != HubConnectionState.CONNECTED) { + throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active."); + } - String id = connectionState.getNextInvocationId(); + String id = connectionState.getNextInvocationId(); - CompletableSubject subject = CompletableSubject.create(); - InvocationRequest irq = new InvocationRequest(null, id); - connectionState.addInvocation(irq); + CompletableSubject subject = CompletableSubject.create(); + InvocationRequest irq = new InvocationRequest(null, id); + connectionState.addInvocation(irq); - Subject pendingCall = irq.getPendingCall(); + Subject pendingCall = irq.getPendingCall(); - pendingCall.subscribe(result -> subject.onComplete(), - error -> subject.onError(error), - () -> subject.onComplete()); + pendingCall.subscribe(result -> subject.onComplete(), + error -> subject.onError(error), + () -> subject.onComplete()); - // Make sure the actual send is after setting up the callbacks otherwise there is a race - // where the map doesn't have the callbacks yet when the response is returned - sendInvocationMessage(method, args, id, false); - return subject; + // Make sure the actual send is after setting up the callbacks otherwise there is a race + // where the map doesn't have the callbacks yet when the response is returned + sendInvocationMessage(method, args, id, false); + return subject; + } finally { + hubConnectionStateLock.unlock(); + } } /** @@ -643,39 +647,37 @@ public Completable invoke(String method, Object... args) { */ @SuppressWarnings("unchecked") public Single invoke(Class returnType, String method, Object... args) { - InvocationRequest irq; - String id; hubConnectionStateLock.lock(); try { if (hubConnectionState != HubConnectionState.CONNECTED) { throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active."); } - id = connectionState.getNextInvocationId(); - irq = new InvocationRequest(returnType, id); + String id = connectionState.getNextInvocationId(); + InvocationRequest irq = new InvocationRequest(returnType, id); connectionState.addInvocation(irq); - } finally { - hubConnectionStateLock.unlock(); - } - SingleSubject subject = SingleSubject.create(); + SingleSubject subject = SingleSubject.create(); - // forward the invocation result or error to the user - // run continuations on a separate thread - Subject pendingCall = irq.getPendingCall(); - pendingCall.subscribe(result -> { - // Primitive types can't be cast with the Class cast function - if (returnType.isPrimitive()) { - subject.onSuccess((T)result); - } else { - subject.onSuccess(returnType.cast(result)); - } - }, error -> subject.onError(error)); + // forward the invocation result or error to the user + // run continuations on a separate thread + Subject pendingCall = irq.getPendingCall(); + pendingCall.subscribe(result -> { + // Primitive types can't be cast with the Class cast function + if (returnType.isPrimitive()) { + subject.onSuccess((T)result); + } else { + subject.onSuccess(returnType.cast(result)); + } + }, error -> subject.onError(error)); - // Make sure the actual send is after setting up the callbacks otherwise there is a race - // where the map doesn't have the callbacks yet when the response is returned - sendInvocationMessage(method, args, id, false); - return subject; + // Make sure the actual send is after setting up the callbacks otherwise there is a race + // where the map doesn't have the callbacks yet when the response is returned + sendInvocationMessage(method, args, id, false); + return subject; + } finally { + hubConnectionStateLock.unlock(); + } } /** @@ -700,40 +702,35 @@ public Observable stream(Class returnType, String method, Object ... a invocationId = connectionState.getNextInvocationId(); irq = new InvocationRequest(returnType, invocationId); connectionState.addInvocation(irq); - } finally { - hubConnectionStateLock.unlock(); - } - AtomicInteger subscriptionCount = new AtomicInteger(); - ReplaySubject subject = ReplaySubject.create(); - Subject pendingCall = irq.getPendingCall(); - pendingCall.subscribe(result -> { - // Primitive types can't be cast with the Class cast function - if (returnType.isPrimitive()) { - subject.onNext((T)result); - } else { - subject.onNext(returnType.cast(result)); - } - }, error -> subject.onError(error), - () -> subject.onComplete()); - - Observable observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet()); - sendInvocationMessage(method, args, invocationId, true); - return observable.doOnDispose(() -> { - if (subscriptionCount.decrementAndGet() == 0) { - CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId); - sendHubMessage(cancelInvocationMessage); - hubConnectionStateLock.lock(); - try { + AtomicInteger subscriptionCount = new AtomicInteger(); + ReplaySubject subject = ReplaySubject.create(); + Subject pendingCall = irq.getPendingCall(); + pendingCall.subscribe(result -> { + // Primitive types can't be cast with the Class cast function + if (returnType.isPrimitive()) { + subject.onNext((T)result); + } else { + subject.onNext(returnType.cast(result)); + } + }, error -> subject.onError(error), + () -> subject.onComplete()); + + Observable observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet()); + sendInvocationMessage(method, args, invocationId, true); + return observable.doOnDispose(() -> { + if (subscriptionCount.decrementAndGet() == 0) { + CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId); + sendHubMessage(cancelInvocationMessage); if (connectionState != null) { connectionState.tryRemoveInvocation(invocationId); } - } finally { - hubConnectionStateLock.unlock(); + subject.onComplete(); } - subject.onComplete(); - } - }); + }); + } finally { + hubConnectionStateLock.unlock(); + } } private void sendHubMessage(HubMessage message) {