Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ ipch/
*.opendb
*.db
coverage/

clients/java/signalr/local\.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,7 +41,7 @@ public class HubConnection {
private HubProtocol protocol;
private Boolean handshakeReceived = false;
private HubConnectionState hubConnectionState = HubConnectionState.DISCONNECTED;
private final Lock hubConnectionStateLock = new ReentrantLock();
private final ReentrantReadWriteLock hubConnectionStateLock = new ReentrantReadWriteLock();
private List<OnClosedCallback> onClosedCallbackList;
private final boolean skipNegotiate;
private Single<String> accessTokenProvider;
Expand Down Expand Up @@ -258,7 +259,12 @@ private Single<NegotiateResponse> handleNegotiate(String url) {
* @return HubConnection state enum.
*/
public HubConnectionState getConnectionState() {
return hubConnectionState;
hubConnectionStateLock.readLock().lock();
try {
return hubConnectionState;
} finally {
hubConnectionStateLock.readLock().unlock();
}
}

/**
Expand All @@ -267,7 +273,7 @@ public HubConnectionState getConnectionState() {
* @return A Completable that completes when the connection has been established.
*/
public Completable start() {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
if (getConnectionState() != HubConnectionState.DISCONNECTED) {
return Completable.complete();
}

Expand Down Expand Up @@ -307,7 +313,7 @@ public Completable start() {
return transport.send(handshake).andThen(Completable.defer(() -> {
timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS);
return handshakeResponseSubject.andThen(Completable.defer(() -> {
hubConnectionStateLock.lock();
hubConnectionStateLock.writeLock().lock();
try {
connectionState = new ConnectionState(this);
hubConnectionState = HubConnectionState.CONNECTED;
Expand Down Expand Up @@ -336,7 +342,7 @@ public void run() {
}
}, new Date(0), tickRate);
} finally {
hubConnectionStateLock.unlock();
hubConnectionStateLock.writeLock().unlock();
}

return Completable.complete();
Expand All @@ -350,7 +356,7 @@ public void run() {
}

private Single<String> startNegotiate(String url, int negotiateAttempts) {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
if (getConnectionState() != HubConnectionState.DISCONNECTED) {
return Single.just(null);
}

Expand Down Expand Up @@ -387,7 +393,7 @@ private Single<String> startNegotiate(String url, int negotiateAttempts) {
* @return A Completable that completes when the connection has been stopped.
*/
private Completable stop(String errorMessage) {
hubConnectionStateLock.lock();
hubConnectionStateLock.writeLock().lock();
try {
if (hubConnectionState == HubConnectionState.DISCONNECTED) {
return Completable.complete();
Expand All @@ -400,7 +406,7 @@ private Completable stop(String errorMessage) {
logger.debug("Stopping HubConnection.");
}
} finally {
hubConnectionStateLock.unlock();
hubConnectionStateLock.writeLock().unlock();
}

return transport.stop();
Expand All @@ -417,7 +423,7 @@ public Completable stop() {

private void stopConnection(String errorMessage) {
RuntimeException exception = null;
hubConnectionStateLock.lock();
hubConnectionStateLock.writeLock().lock();
try {
// errorMessage gets passed in from the transport. An already existing stopError value
// should take precedence.
Expand All @@ -434,7 +440,7 @@ private void stopConnection(String errorMessage) {
hubConnectionState = HubConnectionState.DISCONNECTED;
handshakeResponseSubject.onComplete();
} finally {
hubConnectionStateLock.unlock();
hubConnectionStateLock.writeLock().unlock();
}

// Do not run these callbacks inside the hubConnectionStateLock
Expand All @@ -453,12 +459,17 @@ private void stopConnection(String errorMessage) {
* @param args The arguments to be passed to the method.
*/
public void send(String method, Object... args) {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'send' method cannot be called if the connection is not active");
}
try {
hubConnectionStateLock.readLock().lock();
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'send' method cannot be called if the connection is not active");
}

InvocationMessage invocationMessage = new InvocationMessage(null, method, args);
sendHubMessage(invocationMessage);
InvocationMessage invocationMessage = new InvocationMessage(null, method, args);
sendHubMessage(invocationMessage);
} finally {
hubConnectionStateLock.readLock().unlock();
}
}

/**
Expand Down