Skip to content

Commit f35eb17

Browse files
[Java] Safely call onError on Subjects (#31779)
1 parent 5d8525c commit f35eb17

File tree

3 files changed

+58
-8
lines changed

3 files changed

+58
-8
lines changed

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,14 +1413,14 @@ public void handleHandshake(ByteBuffer payload) {
14131413
handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString);
14141414
} catch (RuntimeException ex) {
14151415
RuntimeException exception = new RuntimeException("An invalid handshake response was received from the server.", ex);
1416-
handshakeResponseSubject.onError(exception);
1416+
errorHandshake(exception);
14171417
throw exception;
14181418
}
14191419
if (handshakeResponse.getHandshakeError() != null) {
14201420
String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError();
14211421
logger.error(errorMessage);
14221422
RuntimeException exception = new RuntimeException(errorMessage);
1423-
handshakeResponseSubject.onError(exception);
1423+
errorHandshake(exception);
14241424
throw exception;
14251425
}
14261426
handshakeReceived = true;
@@ -1431,12 +1431,7 @@ public void handleHandshake(ByteBuffer payload) {
14311431
public void timeoutHandshakeResponse(long timeout, TimeUnit unit) {
14321432
handshakeTimeout = Executors.newSingleThreadScheduledExecutor();
14331433
handshakeTimeout.schedule(() -> {
1434-
// If onError is called on a completed subject the global error handler is called
1435-
if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable()))
1436-
{
1437-
handshakeResponseSubject.onError(
1438-
new TimeoutException("Timed out waiting for the server to respond to the handshake message."));
1439-
}
1434+
errorHandshake(new TimeoutException("Timed out waiting for the server to respond to the handshake message."));
14401435
}, timeout, unit);
14411436
}
14421437

@@ -1476,6 +1471,18 @@ public List<Type> getParameterTypes(String methodName) {
14761471

14771472
return handlers.get(0).getTypes();
14781473
}
1474+
1475+
private void errorHandshake(Exception error) {
1476+
lock.lock();
1477+
try {
1478+
// If onError is called on a completed subject the global error handler is called
1479+
if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable())) {
1480+
handshakeResponseSubject.onError(error);
1481+
}
1482+
} finally {
1483+
lock.unlock();
1484+
}
1485+
}
14791486
}
14801487

14811488
// We don't have reconnect yet, but this helps align the Java client with the .NET client

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.AtomicReference;
1818

1919
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.ExtendWith;
2021

2122
import ch.qos.logback.classic.spi.ILoggingEvent;
2223
import io.reactivex.rxjava3.core.Completable;
@@ -29,6 +30,7 @@
2930
import io.reactivex.rxjava3.subjects.ReplaySubject;
3031
import io.reactivex.rxjava3.subjects.SingleSubject;
3132

33+
@ExtendWith({RxJavaUnhandledExceptionsExtensions.class})
3234
class HubConnectionTest {
3335
private static final String RECORD_SEPARATOR = "\u001e";
3436
private static final Type booleanType = (new TypeReference<Boolean>() { }).getType();
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
package com.microsoft.signalr;
5+
6+
import java.io.PrintWriter;
7+
import java.io.StringWriter;
8+
import java.util.concurrent.BlockingQueue;
9+
import java.util.concurrent.LinkedBlockingQueue;
10+
11+
import org.junit.jupiter.api.extension.AfterAllCallback;
12+
import org.junit.jupiter.api.extension.BeforeAllCallback;
13+
import org.junit.jupiter.api.extension.ExtensionContext;
14+
15+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
16+
17+
// Use by adding "@ExtendWith({RxJavaUnhandledExceptionsExtensions.class})" to a test class
18+
class RxJavaUnhandledExceptionsExtensions implements BeforeAllCallback, AfterAllCallback {
19+
private final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<Throwable>();
20+
21+
@Override
22+
public void beforeAll(final ExtensionContext context) {
23+
RxJavaPlugins.setErrorHandler(error -> {
24+
errors.put(error);
25+
});
26+
}
27+
28+
@Override
29+
public void afterAll(final ExtensionContext context) {
30+
if (errors.size() != 0) {
31+
String RxErrors = "";
32+
for (final Throwable throwable : errors) {
33+
StringWriter stringWriter = new StringWriter();
34+
PrintWriter printWriter = new PrintWriter(stringWriter);
35+
throwable.printStackTrace(printWriter);
36+
RxErrors += String.format("%s\n", stringWriter.toString());
37+
}
38+
throw new RuntimeException(RxErrors);
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)