Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/com/pusher/client/channel/impl/ChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void onMessage(final String event, final String message) {
for (final SubscriptionEventListener listener : listeners) {
final String data = extractDataFrom(message);

factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
listener.onEvent(name, event, data);
Expand Down Expand Up @@ -155,7 +155,7 @@ public void updateState(final ChannelState state) {
this.state = state;

if (state == ChannelState.SUBSCRIBED && eventListener != null) {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
eventListener.onSubscriptionSucceeded(ChannelImpl.this.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void onError(final String message, final String code, final Exception e)

private void sendOrQueueSubscribeMessage(final InternalChannel channel) {

factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {

@Override
public void run() {
Expand All @@ -141,8 +141,7 @@ public void run() {
final String message = channel.toSubscribeMessage();
connection.sendMessage(message);
channel.updateState(ChannelState.SUBSCRIBE_SENT);
}
catch (final AuthorizationFailureException e) {
} catch (final AuthorizationFailureException e) {
clearDownSubscription(channel, e);
}
}
Expand All @@ -151,7 +150,7 @@ public void run() {
}

private void sendUnsubscribeMessage(final InternalChannel channel) {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
connection.sendMessage(channel.toUnsubscribeMessage());
Expand All @@ -166,7 +165,7 @@ private void clearDownSubscription(final InternalChannel channel, final Exceptio
channel.updateState(ChannelState.FAILED);

if (channel.getEventListener() != null) {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {

@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public WebSocketConnection(

@Override
public void connect() {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {

@Override
public void run() {
Expand All @@ -82,7 +82,7 @@ public void run() {

@Override
public void disconnect() {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
if (state == ConnectionState.CONNECTED) {
Expand Down Expand Up @@ -112,7 +112,7 @@ public ConnectionState getState() {

@Override
public void sendMessage(final String message) {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
try {
Expand Down Expand Up @@ -148,7 +148,7 @@ private void updateState(final ConnectionState newState) {
interestedListeners.addAll(eventListeners.get(newState));

for (final ConnectionEventListener listener : interestedListeners) {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
listener.onConnectionStateChange(change);
Expand Down Expand Up @@ -216,7 +216,7 @@ private void sendErrorToAllListeners(final String message, final String code, fi
}

for (final ConnectionEventListener listener : allListeners) {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
listener.onError(message, code, e);
Expand All @@ -237,7 +237,7 @@ public void onOpen(final ServerHandshake handshakedata) {
public void onMessage(final String message) {
activityTimer.activity();

factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
final Map<String, String> map = new Gson().fromJson(message, Map.class);
Expand All @@ -251,7 +251,7 @@ public void run() {
public void onClose(final int code, final String reason, final boolean remote) {
activityTimer.cancelTimeouts();

factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
if (state != ConnectionState.DISCONNECTED) {
Expand All @@ -268,7 +268,7 @@ public void run() {

@Override
public void onError(final Exception ex) {
factory.getEventQueue().execute(new Runnable() {
factory.queueOnEventThread(new Runnable() {
@Override
public void run() {
// Do not change connection state as Java_WebSocket will also
Expand Down
26 changes: 19 additions & 7 deletions src/main/java/com/pusher/client/util/Factory.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class Factory {
private ChannelManager channelManager;
private ExecutorService eventQueue;
private ScheduledExecutorService timers;
private static final Object eventLock = new Object();

public synchronized InternalConnection getConnection(final String apiKey, final PusherOptions options) {
if (connection == null) {
Expand All @@ -68,13 +69,6 @@ public WebSocketClient newWebSocketClientWrapper(final URI uri, final Proxy prox
return new WebSocketClientWrapper(uri, proxy, webSocketListener);
}

public synchronized ExecutorService getEventQueue() {
if (eventQueue == null) {
eventQueue = Executors.newSingleThreadExecutor(new DaemonThreadFactory("eventQueue"));
}
return eventQueue;
}

public synchronized ScheduledExecutorService getTimers() {
if (timers == null) {
timers = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("timers"));
Expand Down Expand Up @@ -103,6 +97,17 @@ public synchronized ChannelManager getChannelManager() {
return channelManager;
}

public void queueOnEventThread(final Runnable r) {
getEventQueue().execute(new Runnable() {
@Override
public void run() {
synchronized (eventLock) {
r.run();
}
}
});
}

public synchronized void shutdownThreads() {
if (eventQueue != null) {
eventQueue.shutdown();
Expand All @@ -114,6 +119,13 @@ public synchronized void shutdownThreads() {
}
}

private synchronized ExecutorService getEventQueue() {
if (eventQueue == null) {
eventQueue = Executors.newSingleThreadExecutor(new DaemonThreadFactory("eventQueue"));
}
return eventQueue;
}

private static class DaemonThreadFactory implements ThreadFactory {
private final String name;

Expand Down
11 changes: 9 additions & 2 deletions src/test/java/com/pusher/client/EndToEndTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.pusher.client.connection.websocket.WebSocketListener;
import com.pusher.client.util.DoNothingExecutor;
import com.pusher.client.util.Factory;
import com.pusher.client.util.InstantExecutor;

@RunWith(MockitoJUnitRunner.class)
public class EndToEndTest {
Expand Down Expand Up @@ -56,7 +55,15 @@ public void setUp() throws Exception {

connection = new WebSocketConnection(pusherOptions.buildUrl(API_KEY), ACTIVITY_TIMEOUT, PONG_TIMEOUT, proxy, factory);

when(factory.getEventQueue()).thenReturn(new InstantExecutor());
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final Runnable r = (Runnable) invocation.getArguments()[0];
r.run();
return null;
}
}).when(factory).queueOnEventThread(any(Runnable.class));

when(factory.getTimers()).thenReturn(new DoNothingExecutor());
when(factory.newWebSocketClientWrapper(any(URI.class), any(Proxy.class), any(WebSocketListener.class))).thenAnswer(
new Answer<WebSocketClientWrapper>() {
Expand Down
13 changes: 10 additions & 3 deletions src/test/java/com/pusher/client/PusherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import com.pusher.client.channel.ChannelEventListener;
import com.pusher.client.channel.PresenceChannelEventListener;
Expand All @@ -22,7 +24,6 @@
import com.pusher.client.connection.impl.InternalConnection;
import com.pusher.client.util.Factory;
import com.pusher.client.util.HttpAuthorizer;
import com.pusher.client.util.InstantExecutor;

@RunWith(MockitoJUnitRunner.class)
public class PusherTest {
Expand Down Expand Up @@ -58,8 +59,14 @@ public void setUp() {
.thenReturn(mockPrivateChannel);
when(factory.newPresenceChannel(mockConnection, PRESENCE_CHANNEL_NAME, authorizer)).thenReturn(
mockPresenceChannel);
when(factory.getEventQueue()).thenReturn(new InstantExecutor());

doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final Runnable r = (Runnable) invocation.getArguments()[0];
r.run();
return null;
}
}).when(factory).queueOnEventThread(any(Runnable.class));
pusher = new Pusher(API_KEY, options, factory);
}

Expand Down
12 changes: 10 additions & 2 deletions src/test/java/com/pusher/client/channel/impl/ChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import com.pusher.client.channel.ChannelEventListener;
import com.pusher.client.channel.ChannelState;
import com.pusher.client.util.Factory;
import com.pusher.client.util.InstantExecutor;

@RunWith(MockitoJUnitRunner.class)
public class ChannelImplTest {
Expand All @@ -27,7 +28,14 @@ public class ChannelImplTest {

@Before
public void setUp() {
when(factory.getEventQueue()).thenReturn(new InstantExecutor());
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final Runnable r = (Runnable) invocation.getArguments()[0];
r.run();
return null;
}
}).when(factory).queueOnEventThread(any(Runnable.class));

mockListener = getEventListener();
channel = newInstance(getChannelName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.junit.runner.RunWith;
import static org.junit.Assert.*;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import com.pusher.client.AuthorizationFailureException;
import com.pusher.client.channel.Channel;
Expand All @@ -22,7 +24,6 @@
import com.pusher.client.connection.ConnectionStateChange;
import com.pusher.client.connection.impl.InternalConnection;
import com.pusher.client.util.Factory;
import com.pusher.client.util.InstantExecutor;

@RunWith(MockitoJUnitRunner.class)
public class ChannelManagerTest {
Expand All @@ -48,12 +49,18 @@ public class ChannelManagerTest {
private ChannelManager subscriptionTestChannelManager;
private @Mock Factory subscriptionTestFactory;
private @Mock InternalConnection subscriptionTestConnection;
private @Mock InstantExecutor mockQueue;

@Before
public void setUp() throws AuthorizationFailureException {

when(factory.getEventQueue()).thenReturn(new InstantExecutor());
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final Runnable r = (Runnable) invocation.getArguments()[0];
r.run();
return null;
}
}).when(factory).queueOnEventThread(any(Runnable.class));
when(mockInternalChannel.getName()).thenReturn(CHANNEL_NAME);
when(mockInternalChannel.toSubscribeMessage()).thenReturn(OUTGOING_SUBSCRIBE_MESSAGE);
when(mockInternalChannel.toUnsubscribeMessage()).thenReturn(OUTGOING_UNSUBSCRIBE_MESSAGE);
Expand All @@ -71,7 +78,14 @@ public void setUp() throws AuthorizationFailureException {
channelManager.setConnection(mockConnection);


when(subscriptionTestFactory.getEventQueue()).thenReturn(mockQueue);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final Runnable r = (Runnable) invocation.getArguments()[0];
r.run();
return null;
}
}).when(subscriptionTestFactory).queueOnEventThread(any(Runnable.class));
subscriptionTestChannelManager = new ChannelManager(subscriptionTestFactory);
subscriptionTestChannelManager.setConnection(subscriptionTestConnection);

Expand Down Expand Up @@ -265,7 +279,7 @@ public void testUnsubscribeFromSubscribedChannelUnsubscribesInEventQueue() {
subscriptionTestChannelManager.subscribeTo(mockInternalChannel, mockEventListener);
subscriptionTestChannelManager.unsubscribeFrom(CHANNEL_NAME);

verify(mockQueue).execute(any(Runnable.class));
verify(subscriptionTestFactory).queueOnEventThread(any(Runnable.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import com.pusher.client.channel.impl.ChannelManager;
import com.pusher.client.connection.ConnectionEventListener;
import com.pusher.client.connection.ConnectionState;
import com.pusher.client.connection.ConnectionStateChange;
import com.pusher.client.util.DoNothingExecutor;
import com.pusher.client.util.Factory;
import com.pusher.client.util.InstantExecutor;

@RunWith(MockitoJUnitRunner.class)
public class WebSocketConnectionTest {
Expand Down Expand Up @@ -53,7 +54,14 @@ public void setUp() throws URISyntaxException, SSLException {
when(factory.getChannelManager()).thenReturn(mockChannelManager);
when(factory.newWebSocketClientWrapper(any(URI.class), any(Proxy.class), any(WebSocketConnection.class))).thenReturn(
mockUnderlyingConnection);
when(factory.getEventQueue()).thenReturn(new InstantExecutor());
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final Runnable r = (Runnable) invocation.getArguments()[0];
r.run();
return null;
}
}).when(factory).queueOnEventThread(any(Runnable.class));
when(factory.getTimers()).thenReturn(new DoNothingExecutor());

connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, PROXY, factory);
Expand Down
Loading