diff --git a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java index a382728032e8..43c17cbfa2b5 100644 --- a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java @@ -1413,14 +1413,14 @@ public void handleHandshake(ByteBuffer payload) { handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString); } catch (RuntimeException ex) { RuntimeException exception = new RuntimeException("An invalid handshake response was received from the server.", ex); - handshakeResponseSubject.onError(exception); + errorHandshake(exception); throw exception; } if (handshakeResponse.getHandshakeError() != null) { String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError(); logger.error(errorMessage); RuntimeException exception = new RuntimeException(errorMessage); - handshakeResponseSubject.onError(exception); + errorHandshake(exception); throw exception; } handshakeReceived = true; @@ -1431,12 +1431,7 @@ public void handleHandshake(ByteBuffer payload) { public void timeoutHandshakeResponse(long timeout, TimeUnit unit) { handshakeTimeout = Executors.newSingleThreadScheduledExecutor(); handshakeTimeout.schedule(() -> { - // If onError is called on a completed subject the global error handler is called - if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable())) - { - handshakeResponseSubject.onError( - new TimeoutException("Timed out waiting for the server to respond to the handshake message.")); - } + errorHandshake(new TimeoutException("Timed out waiting for the server to respond to the handshake message.")); }, timeout, unit); } @@ -1476,6 +1471,18 @@ public List getParameterTypes(String methodName) { return handlers.get(0).getTypes(); } + + private void errorHandshake(Exception error) { + lock.lock(); + try { + // If onError is called on a completed subject the global error handler is called + if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable())) { + handshakeResponseSubject.onError(error); + } + } finally { + lock.unlock(); + } + } } // We don't have reconnect yet, but this helps align the Java client with the .NET client diff --git a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java index 96b76470e258..cfd104f596e2 100644 --- a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java @@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import ch.qos.logback.classic.spi.ILoggingEvent; import io.reactivex.rxjava3.core.Completable; @@ -29,6 +30,7 @@ import io.reactivex.rxjava3.subjects.ReplaySubject; import io.reactivex.rxjava3.subjects.SingleSubject; +@ExtendWith({RxJavaUnhandledExceptionsExtensions.class}) class HubConnectionTest { private static final String RECORD_SEPARATOR = "\u001e"; private static final Type booleanType = (new TypeReference() { }).getType(); diff --git a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/RxJavaUnhandledExceptionsExtensions.java b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/RxJavaUnhandledExceptionsExtensions.java new file mode 100644 index 000000000000..d2a0acc59f9a --- /dev/null +++ b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/RxJavaUnhandledExceptionsExtensions.java @@ -0,0 +1,41 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.signalr; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +// Use by adding "@ExtendWith({RxJavaUnhandledExceptionsExtensions.class})" to a test class +class RxJavaUnhandledExceptionsExtensions implements BeforeAllCallback, AfterAllCallback { + private final BlockingQueue errors = new LinkedBlockingQueue(); + + @Override + public void beforeAll(final ExtensionContext context) { + RxJavaPlugins.setErrorHandler(error -> { + errors.put(error); + }); + } + + @Override + public void afterAll(final ExtensionContext context) { + if (errors.size() != 0) { + String RxErrors = ""; + for (final Throwable throwable : errors) { + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter); + throwable.printStackTrace(printWriter); + RxErrors += String.format("%s\n", stringWriter.toString()); + } + throw new RuntimeException(RxErrors); + } + } +} \ No newline at end of file