Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.microsoft.signalr;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,6 +27,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
private WebSocketOnClosedCallback onClose;
private CompletableSubject startSubject = CompletableSubject.create();
private CompletableSubject closeSubject = CompletableSubject.create();
private final ReentrantLock closeLock = new ReentrantLock();

private final Logger logger = LoggerFactory.getLogger(OkHttpWebSocketWrapper.class);

Expand Down Expand Up @@ -87,14 +89,29 @@ public void onMessage(WebSocket webSocket, String message) {
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
onClose.invoke(code, reason);
closeSubject.onComplete();
try {
closeLock.lock();
closeSubject.onComplete();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check that the closeSubject hasn't already been completed with an error here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling onComplete should noop if the subject is already completed either with an error or without an error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@halter73 any other feedback?

}
finally {
closeLock.unlock();
}
checkStartFailure();
}

@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
logger.error("WebSocket closed from an error: {}.", t.getMessage());
closeSubject.onError(new RuntimeException(t));

try {
closeLock.lock();
if (!closeSubject.hasComplete()) {
closeSubject.onError(new RuntimeException(t));
}
}
finally {
closeLock.unlock();
}
onClose.invoke(null, t.getMessage());
checkStartFailure();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a chance that checkStartFailure() ever gets called before the Completable is returned from start() meaning we'd need to do the same ting for startSubject?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is technically possible, but very unlikely in practice.

I'm not sure how the global error handler works in that case, because the startSubject will be observed by someone, they just haven't registered the handler by the time onError is called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's possible, we should handle it.

I'm not sure how the global error handler works in that case, because the startSubject will be observed by someone, they just haven't registered the handler by the time onError is called.

We could test this by intentionally faulting the startSubject before returning. I expect it would also be crashing. How would the runtime even know the startSubject will be observed by someone in the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read the docs for CompletableSubject:

If there were no CompletableObservers subscribed to this CompletableSubject when the onError() was called, the global error handler is not invoked.

This also implies that the "fix" in this PR might not be correct, and the error was happening through some other means. The docs also mention that if onError is called multiple times then it will call the global handler. onError should only ever be called once here, but I'm beginning to think it was somehow triggered multiple times.

}
Expand Down