diff --git a/src/main/java/com/pusher/client/channel/impl/ChannelImpl.java b/src/main/java/com/pusher/client/channel/impl/ChannelImpl.java index 671ee134..a46af90f 100644 --- a/src/main/java/com/pusher/client/channel/impl/ChannelImpl.java +++ b/src/main/java/com/pusher/client/channel/impl/ChannelImpl.java @@ -111,7 +111,7 @@ public void onMessage(final String event, final String message) { for (final SubscriptionEventListener listener : listeners) { final String data = extractDataFrom(message); - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { listener.onEvent(name, event, data); @@ -155,7 +155,7 @@ public void updateState(final ChannelState state) { this.state = state; if (state == ChannelState.SUBSCRIBED && eventListener != null) { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { eventListener.onSubscriptionSucceeded(ChannelImpl.this.getName()); diff --git a/src/main/java/com/pusher/client/channel/impl/ChannelManager.java b/src/main/java/com/pusher/client/channel/impl/ChannelManager.java index 81039117..da0a064f 100644 --- a/src/main/java/com/pusher/client/channel/impl/ChannelManager.java +++ b/src/main/java/com/pusher/client/channel/impl/ChannelManager.java @@ -131,7 +131,7 @@ public void onError(final String message, final String code, final Exception e) private void sendOrQueueSubscribeMessage(final InternalChannel channel) { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { @@ -141,8 +141,7 @@ public void run() { final String message = channel.toSubscribeMessage(); connection.sendMessage(message); channel.updateState(ChannelState.SUBSCRIBE_SENT); - } - catch (final AuthorizationFailureException e) { + } catch (final AuthorizationFailureException e) { clearDownSubscription(channel, e); } } @@ -151,7 +150,7 @@ public void run() { } private void sendUnsubscribeMessage(final InternalChannel channel) { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { connection.sendMessage(channel.toUnsubscribeMessage()); @@ -166,7 +165,7 @@ private void clearDownSubscription(final InternalChannel channel, final Exceptio channel.updateState(ChannelState.FAILED); if (channel.getEventListener() != null) { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { diff --git a/src/main/java/com/pusher/client/connection/websocket/WebSocketConnection.java b/src/main/java/com/pusher/client/connection/websocket/WebSocketConnection.java index 4232ebb8..31c123e6 100644 --- a/src/main/java/com/pusher/client/connection/websocket/WebSocketConnection.java +++ b/src/main/java/com/pusher/client/connection/websocket/WebSocketConnection.java @@ -61,7 +61,7 @@ public WebSocketConnection( @Override public void connect() { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { @@ -82,7 +82,7 @@ public void run() { @Override public void disconnect() { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { if (state == ConnectionState.CONNECTED) { @@ -112,7 +112,7 @@ public ConnectionState getState() { @Override public void sendMessage(final String message) { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { try { @@ -148,7 +148,7 @@ private void updateState(final ConnectionState newState) { interestedListeners.addAll(eventListeners.get(newState)); for (final ConnectionEventListener listener : interestedListeners) { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { listener.onConnectionStateChange(change); @@ -216,7 +216,7 @@ private void sendErrorToAllListeners(final String message, final String code, fi } for (final ConnectionEventListener listener : allListeners) { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { listener.onError(message, code, e); @@ -237,7 +237,7 @@ public void onOpen(final ServerHandshake handshakedata) { public void onMessage(final String message) { activityTimer.activity(); - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { final Map map = new Gson().fromJson(message, Map.class); @@ -251,7 +251,7 @@ public void run() { public void onClose(final int code, final String reason, final boolean remote) { activityTimer.cancelTimeouts(); - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { if (state != ConnectionState.DISCONNECTED) { @@ -268,7 +268,7 @@ public void run() { @Override public void onError(final Exception ex) { - factory.getEventQueue().execute(new Runnable() { + factory.queueOnEventThread(new Runnable() { @Override public void run() { // Do not change connection state as Java_WebSocket will also diff --git a/src/main/java/com/pusher/client/util/Factory.java b/src/main/java/com/pusher/client/util/Factory.java index ffff5782..8c4ce23b 100644 --- a/src/main/java/com/pusher/client/util/Factory.java +++ b/src/main/java/com/pusher/client/util/Factory.java @@ -50,6 +50,7 @@ public class Factory { private ChannelManager channelManager; private ExecutorService eventQueue; private ScheduledExecutorService timers; + private static final Object eventLock = new Object(); public synchronized InternalConnection getConnection(final String apiKey, final PusherOptions options) { if (connection == null) { @@ -68,13 +69,6 @@ public WebSocketClient newWebSocketClientWrapper(final URI uri, final Proxy prox return new WebSocketClientWrapper(uri, proxy, webSocketListener); } - public synchronized ExecutorService getEventQueue() { - if (eventQueue == null) { - eventQueue = Executors.newSingleThreadExecutor(new DaemonThreadFactory("eventQueue")); - } - return eventQueue; - } - public synchronized ScheduledExecutorService getTimers() { if (timers == null) { timers = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("timers")); @@ -103,6 +97,17 @@ public synchronized ChannelManager getChannelManager() { return channelManager; } + public void queueOnEventThread(final Runnable r) { + getEventQueue().execute(new Runnable() { + @Override + public void run() { + synchronized (eventLock) { + r.run(); + } + } + }); + } + public synchronized void shutdownThreads() { if (eventQueue != null) { eventQueue.shutdown(); @@ -114,6 +119,13 @@ public synchronized void shutdownThreads() { } } + private synchronized ExecutorService getEventQueue() { + if (eventQueue == null) { + eventQueue = Executors.newSingleThreadExecutor(new DaemonThreadFactory("eventQueue")); + } + return eventQueue; + } + private static class DaemonThreadFactory implements ThreadFactory { private final String name; diff --git a/src/test/java/com/pusher/client/EndToEndTest.java b/src/test/java/com/pusher/client/EndToEndTest.java index 47d2cd89..2f280a76 100644 --- a/src/test/java/com/pusher/client/EndToEndTest.java +++ b/src/test/java/com/pusher/client/EndToEndTest.java @@ -26,7 +26,6 @@ import com.pusher.client.connection.websocket.WebSocketListener; import com.pusher.client.util.DoNothingExecutor; import com.pusher.client.util.Factory; -import com.pusher.client.util.InstantExecutor; @RunWith(MockitoJUnitRunner.class) public class EndToEndTest { @@ -56,7 +55,15 @@ public void setUp() throws Exception { connection = new WebSocketConnection(pusherOptions.buildUrl(API_KEY), ACTIVITY_TIMEOUT, PONG_TIMEOUT, proxy, factory); - when(factory.getEventQueue()).thenReturn(new InstantExecutor()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final Runnable r = (Runnable) invocation.getArguments()[0]; + r.run(); + return null; + } + }).when(factory).queueOnEventThread(any(Runnable.class)); + when(factory.getTimers()).thenReturn(new DoNothingExecutor()); when(factory.newWebSocketClientWrapper(any(URI.class), any(Proxy.class), any(WebSocketListener.class))).thenAnswer( new Answer() { diff --git a/src/test/java/com/pusher/client/PusherTest.java b/src/test/java/com/pusher/client/PusherTest.java index 9d73df89..b2af4d8f 100644 --- a/src/test/java/com/pusher/client/PusherTest.java +++ b/src/test/java/com/pusher/client/PusherTest.java @@ -8,7 +8,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import com.pusher.client.channel.ChannelEventListener; import com.pusher.client.channel.PresenceChannelEventListener; @@ -22,7 +24,6 @@ import com.pusher.client.connection.impl.InternalConnection; import com.pusher.client.util.Factory; import com.pusher.client.util.HttpAuthorizer; -import com.pusher.client.util.InstantExecutor; @RunWith(MockitoJUnitRunner.class) public class PusherTest { @@ -58,8 +59,14 @@ public void setUp() { .thenReturn(mockPrivateChannel); when(factory.newPresenceChannel(mockConnection, PRESENCE_CHANNEL_NAME, authorizer)).thenReturn( mockPresenceChannel); - when(factory.getEventQueue()).thenReturn(new InstantExecutor()); - + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final Runnable r = (Runnable) invocation.getArguments()[0]; + r.run(); + return null; + } + }).when(factory).queueOnEventThread(any(Runnable.class)); pusher = new Pusher(API_KEY, options, factory); } diff --git a/src/test/java/com/pusher/client/channel/impl/ChannelImplTest.java b/src/test/java/com/pusher/client/channel/impl/ChannelImplTest.java index 2eae2960..bbf054a3 100644 --- a/src/test/java/com/pusher/client/channel/impl/ChannelImplTest.java +++ b/src/test/java/com/pusher/client/channel/impl/ChannelImplTest.java @@ -10,12 +10,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import com.pusher.client.channel.ChannelEventListener; import com.pusher.client.channel.ChannelState; import com.pusher.client.util.Factory; -import com.pusher.client.util.InstantExecutor; @RunWith(MockitoJUnitRunner.class) public class ChannelImplTest { @@ -27,7 +28,14 @@ public class ChannelImplTest { @Before public void setUp() { - when(factory.getEventQueue()).thenReturn(new InstantExecutor()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final Runnable r = (Runnable) invocation.getArguments()[0]; + r.run(); + return null; + } + }).when(factory).queueOnEventThread(any(Runnable.class)); mockListener = getEventListener(); channel = newInstance(getChannelName()); diff --git a/src/test/java/com/pusher/client/channel/impl/ChannelManagerTest.java b/src/test/java/com/pusher/client/channel/impl/ChannelManagerTest.java index 416f4c1a..1a96b144 100644 --- a/src/test/java/com/pusher/client/channel/impl/ChannelManagerTest.java +++ b/src/test/java/com/pusher/client/channel/impl/ChannelManagerTest.java @@ -8,7 +8,9 @@ import org.junit.runner.RunWith; import static org.junit.Assert.*; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import com.pusher.client.AuthorizationFailureException; import com.pusher.client.channel.Channel; @@ -22,7 +24,6 @@ import com.pusher.client.connection.ConnectionStateChange; import com.pusher.client.connection.impl.InternalConnection; import com.pusher.client.util.Factory; -import com.pusher.client.util.InstantExecutor; @RunWith(MockitoJUnitRunner.class) public class ChannelManagerTest { @@ -48,12 +49,18 @@ public class ChannelManagerTest { private ChannelManager subscriptionTestChannelManager; private @Mock Factory subscriptionTestFactory; private @Mock InternalConnection subscriptionTestConnection; - private @Mock InstantExecutor mockQueue; @Before public void setUp() throws AuthorizationFailureException { - when(factory.getEventQueue()).thenReturn(new InstantExecutor()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final Runnable r = (Runnable) invocation.getArguments()[0]; + r.run(); + return null; + } + }).when(factory).queueOnEventThread(any(Runnable.class)); when(mockInternalChannel.getName()).thenReturn(CHANNEL_NAME); when(mockInternalChannel.toSubscribeMessage()).thenReturn(OUTGOING_SUBSCRIBE_MESSAGE); when(mockInternalChannel.toUnsubscribeMessage()).thenReturn(OUTGOING_UNSUBSCRIBE_MESSAGE); @@ -71,7 +78,14 @@ public void setUp() throws AuthorizationFailureException { channelManager.setConnection(mockConnection); - when(subscriptionTestFactory.getEventQueue()).thenReturn(mockQueue); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final Runnable r = (Runnable) invocation.getArguments()[0]; + r.run(); + return null; + } + }).when(subscriptionTestFactory).queueOnEventThread(any(Runnable.class)); subscriptionTestChannelManager = new ChannelManager(subscriptionTestFactory); subscriptionTestChannelManager.setConnection(subscriptionTestConnection); @@ -265,7 +279,7 @@ public void testUnsubscribeFromSubscribedChannelUnsubscribesInEventQueue() { subscriptionTestChannelManager.subscribeTo(mockInternalChannel, mockEventListener); subscriptionTestChannelManager.unsubscribeFrom(CHANNEL_NAME); - verify(mockQueue).execute(any(Runnable.class)); + verify(subscriptionTestFactory).queueOnEventThread(any(Runnable.class)); } @Test diff --git a/src/test/java/com/pusher/client/connection/websocket/WebSocketConnectionTest.java b/src/test/java/com/pusher/client/connection/websocket/WebSocketConnectionTest.java index 3998fcf9..7f0b8cd1 100644 --- a/src/test/java/com/pusher/client/connection/websocket/WebSocketConnectionTest.java +++ b/src/test/java/com/pusher/client/connection/websocket/WebSocketConnectionTest.java @@ -15,7 +15,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import com.pusher.client.channel.impl.ChannelManager; import com.pusher.client.connection.ConnectionEventListener; @@ -23,7 +25,6 @@ import com.pusher.client.connection.ConnectionStateChange; import com.pusher.client.util.DoNothingExecutor; import com.pusher.client.util.Factory; -import com.pusher.client.util.InstantExecutor; @RunWith(MockitoJUnitRunner.class) public class WebSocketConnectionTest { @@ -53,7 +54,14 @@ public void setUp() throws URISyntaxException, SSLException { when(factory.getChannelManager()).thenReturn(mockChannelManager); when(factory.newWebSocketClientWrapper(any(URI.class), any(Proxy.class), any(WebSocketConnection.class))).thenReturn( mockUnderlyingConnection); - when(factory.getEventQueue()).thenReturn(new InstantExecutor()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final Runnable r = (Runnable) invocation.getArguments()[0]; + r.run(); + return null; + } + }).when(factory).queueOnEventThread(any(Runnable.class)); when(factory.getTimers()).thenReturn(new DoNothingExecutor()); connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, PROXY, factory); diff --git a/src/test/java/com/pusher/client/util/InstantExecutor.java b/src/test/java/com/pusher/client/util/InstantExecutor.java deleted file mode 100644 index 30594bfe..00000000 --- a/src/test/java/com/pusher/client/util/InstantExecutor.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.pusher.client.util; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public class InstantExecutor implements ExecutorService { - - @Override - public void execute(final Runnable command) { - command.run(); - } - - @Override - public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { - return false; - } - - @Override - public List> invokeAll(final Collection> tasks) throws InterruptedException { - return null; - } - - @Override - public List> invokeAll(final Collection> tasks, final long timeout, - final TimeUnit unit) throws InterruptedException { - return null; - } - - @Override - public T invokeAny(final Collection> tasks) throws InterruptedException, - ExecutionException { - return null; - } - - @Override - public T invokeAny(final Collection> tasks, final long timeout, final TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return null; - } - - @Override - public boolean isShutdown() { - return false; - } - - @Override - public boolean isTerminated() { - return false; - } - - @Override - public void shutdown() { - } - - @Override - public List shutdownNow() { - return null; - } - - @Override - public Future submit(final Callable task) { - return null; - } - - @Override - public Future submit(final Runnable task) { - return null; - } - - @Override - public Future submit(final Runnable task, final T result) { - return null; - } - -}