diff --git a/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java b/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java new file mode 100644 index 000000000..7bddc45ea --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java @@ -0,0 +1,74 @@ +package quickfix; + +public abstract class AbstractSessionConnectorBuilder { + private final Class derived; + Application application; + MessageStoreFactory messageStoreFactory; + SessionSettings settings; + LogFactory logFactory; + MessageFactory messageFactory; + + int queueCapacity = -1; + int queueLowerWatermark = -1; + int queueUpperWatermark = -1; + + AbstractSessionConnectorBuilder(Class derived) { + this.derived = derived; + } + + public Derived withApplication(Application val) throws ConfigError { + application = val; + return derived.cast(this); + } + + public Derived withMessageStoreFactory(MessageStoreFactory val) throws ConfigError { + messageStoreFactory = val; + return derived.cast(this); + } + + public Derived withSettings(SessionSettings val) { + settings = val; + return derived.cast(this); + } + + public Derived withLogFactory(LogFactory val) throws ConfigError { + logFactory = val; + return derived.cast(this); + } + + public Derived withMessageFactory(MessageFactory val) throws ConfigError { + messageFactory = val; + return derived.cast(this); + } + + public Derived withQueueCapacity(int val) throws ConfigError { + if (queueLowerWatermark >= 0) { + throw new ConfigError("queue capacity and watermarks may not be configured together"); + } else if (queueCapacity < 0) { + throw new ConfigError("negative queue capacity"); + } + queueCapacity = val; + return derived.cast(this); + } + + public Derived withQueueWatermarks(int lower, int upper) throws ConfigError { + if (queueCapacity >= 0) { + throw new ConfigError("queue capacity and watermarks may not be configured together"); + } else if (queueLowerWatermark < 0 || queueUpperWatermark <= queueLowerWatermark) { + throw new ConfigError("invalid queue watermarks, required: 0 <= lower watermark < upper watermark"); + } + queueLowerWatermark = lower; + queueUpperWatermark = upper; + return derived.cast(this); + } + + public final Product build() throws ConfigError { + if (logFactory == null) { + logFactory = new ScreenLogFactory(settings); + } + + return doBuild(); + } + + protected abstract Product doBuild() throws ConfigError; +} diff --git a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java index c8dfc5edf..076bc3d76 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java @@ -31,6 +31,34 @@ public class SocketAcceptor extends AbstractSocketAcceptor { private volatile Boolean isStarted = Boolean.FALSE; private final SingleThreadedEventHandlingStrategy eventHandlingStrategy; + private SocketAcceptor(Builder builder) throws ConfigError { + super(builder.application, builder.messageStoreFactory, builder.settings, + builder.logFactory, builder.messageFactory); + + if (builder.queueCapacity >= 0) { + eventHandlingStrategy + = new SingleThreadedEventHandlingStrategy(this, builder.queueCapacity); + } else { + eventHandlingStrategy + = new SingleThreadedEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder extends AbstractSessionConnectorBuilder { + private Builder() { + super(Builder.class); + } + + @Override + protected SocketAcceptor doBuild() throws ConfigError { + return new SocketAcceptor(this); + } + } + public SocketAcceptor(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory, int queueCapacity) diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index 71d5931ce..184eeda15 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -31,6 +31,34 @@ public class SocketInitiator extends AbstractSocketInitiator { private volatile Boolean isStarted = Boolean.FALSE; private final SingleThreadedEventHandlingStrategy eventHandlingStrategy; + private SocketInitiator(Builder builder) throws ConfigError { + super(builder.application, builder.messageStoreFactory, builder.settings, + builder.logFactory, builder.messageFactory); + + if (builder.queueCapacity >= 0) { + eventHandlingStrategy + = new SingleThreadedEventHandlingStrategy(this, builder.queueCapacity); + } else { + eventHandlingStrategy + = new SingleThreadedEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder extends AbstractSessionConnectorBuilder { + private Builder() { + super(Builder.class); + } + + @Override + protected SocketInitiator doBuild() throws ConfigError { + return new SocketInitiator(this); + } + } + public SocketInitiator(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, MessageFactory messageFactory, int queueCapacity) throws ConfigError { super(application, messageStoreFactory, settings, new ScreenLogFactory(settings), diff --git a/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java index adf3dbbb6..608f67711 100644 --- a/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java @@ -29,6 +29,34 @@ public class ThreadedSocketAcceptor extends AbstractSocketAcceptor { private final ThreadPerSessionEventHandlingStrategy eventHandlingStrategy; + private ThreadedSocketAcceptor(Builder builder) throws ConfigError { + super(builder.application, builder.messageStoreFactory, builder.settings, + builder.logFactory, builder.messageFactory); + + if (builder.queueCapacity >= 0) { + eventHandlingStrategy + = new ThreadPerSessionEventHandlingStrategy(this, builder.queueCapacity); + } else { + eventHandlingStrategy + = new ThreadPerSessionEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder extends AbstractSessionConnectorBuilder { + private Builder() { + super(Builder.class); + } + + @Override + protected ThreadedSocketAcceptor doBuild() throws ConfigError { + return new ThreadedSocketAcceptor(this); + } + } + public ThreadedSocketAcceptor(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory, int queueCapacity ) diff --git a/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java index 29233c65a..a011c25ea 100644 --- a/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java @@ -29,6 +29,34 @@ public class ThreadedSocketInitiator extends AbstractSocketInitiator { private final ThreadPerSessionEventHandlingStrategy eventHandlingStrategy; + private ThreadedSocketInitiator(Builder builder) throws ConfigError { + super(builder.application, builder.messageStoreFactory, builder.settings, + builder.logFactory, builder.messageFactory); + + if (builder.queueCapacity >= 0) { + eventHandlingStrategy + = new ThreadPerSessionEventHandlingStrategy(this, builder.queueCapacity); + } else { + eventHandlingStrategy + = new ThreadPerSessionEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder extends AbstractSessionConnectorBuilder { + private Builder() { + super(Builder.class); + } + + @Override + protected ThreadedSocketInitiator doBuild() throws ConfigError { + return new ThreadedSocketInitiator(this); + } + } + public ThreadedSocketInitiator(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory, int queueCapacity) throws ConfigError { diff --git a/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java index 1b2b50e81..573288f17 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java @@ -28,7 +28,6 @@ * it only handles message reception events. */ public interface EventHandlingStrategy { - /** * Constant indicating how long we wait for an incoming message. After * thread has been asked to stop, it can take up to this long to terminate. diff --git a/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java b/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java index df3ba2660..1c32dcb36 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java +++ b/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java @@ -95,4 +95,7 @@ public String getRemoteAddress() { return null; } + IoSession getIoSession() { + return ioSession; + } } diff --git a/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java b/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java new file mode 100644 index 000000000..3f47341bf --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java @@ -0,0 +1,11 @@ +package quickfix.mina; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +interface QueueTracker { + void put(E e) throws InterruptedException; + E poll(long timeout, TimeUnit unit) throws InterruptedException; + int drainTo(Collection collection); +} diff --git a/quickfixj-core/src/main/java/quickfix/mina/QueueTrackers.java b/quickfixj-core/src/main/java/quickfix/mina/QueueTrackers.java new file mode 100644 index 000000000..00f1ecb3b --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/mina/QueueTrackers.java @@ -0,0 +1,89 @@ +package quickfix.mina; + +import org.apache.mina.core.session.IoSession; +import quickfix.Responder; +import quickfix.Session; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static java.lang.String.format; + +/** + * Queue trackers factory methods + */ +final class QueueTrackers { + private static final String LOWER_WATERMARK_FMT = "inbound queue size < lower watermark (%d), socket reads resumed"; + private static final String UPPER_WATERMARK_FMT = "inbound queue size > upper watermark (%d), socket reads suspended"; + + /** + * Watermarks-based queue tracker + */ + static WatermarkTracker newMultiSessionWatermarkTracker( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Function classifier) { + return WatermarkTracker.newMulti(queue, lowerWatermark, upperWatermark, classifier, + qfSession -> resumeReads(qfSession, (int)lowerWatermark), + qfSession -> suspendReads(qfSession, (int)upperWatermark)); + } + + /** + * Default no-op queue tracker + */ + static QueueTracker newDefaultQueueTracker(BlockingQueue queue) { + return new QueueTracker() { + @Override + public void put(E e) throws InterruptedException { + queue.put(e); + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + + @Override + public int drainTo(Collection collection) { + return queue.drainTo(collection); + } + }; + } + + private static IoSession lookupIoSession(Session qfSession) { + final Responder responder = qfSession.getResponder(); + + if (responder instanceof IoSessionResponder) { + return ((IoSessionResponder)responder).getIoSession(); + } else { + return null; + } + } + + private static void resumeReads(Session qfSession, int queueLowerWatermark) { + final IoSession ioSession = lookupIoSession(qfSession); + if (ioSession != null && ioSession.isReadSuspended()) { + ioSession.resumeRead(); + qfSession.getLog().onEvent(format(LOWER_WATERMARK_FMT, queueLowerWatermark)); + } + } + + private static void suspendReads(Session qfSession, int queueUpperWatermark) { + final IoSession ioSession = lookupIoSession(qfSession); + if (ioSession != null && !ioSession.isReadSuspended()) { + ioSession.suspendRead(); + qfSession.getLog().onEvent(format(UPPER_WATERMARK_FMT, queueUpperWatermark)); + } + } + + static WatermarkTracker newSingleSessionWatermarkTracker( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Session qfSession) { + return WatermarkTracker.newMono(queue, lowerWatermark, upperWatermark, + () -> resumeReads(qfSession, (int)lowerWatermark), + () -> suspendReads(qfSession, (int)upperWatermark)); + } +} diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index e73056570..b14f4c653 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -20,11 +20,7 @@ package quickfix.mina; -import quickfix.LogUtil; -import quickfix.Message; -import quickfix.Session; -import quickfix.SessionID; -import quickfix.SystemTime; +import quickfix.*; import java.util.ArrayList; import java.util.List; @@ -34,12 +30,16 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static quickfix.mina.QueueTrackers.newDefaultQueueTracker; +import static quickfix.mina.QueueTrackers.newMultiSessionWatermarkTracker; + /** * Processes messages for all sessions in a single thread. */ public class SingleThreadedEventHandlingStrategy implements EventHandlingStrategy { public static final String MESSAGE_PROCESSOR_THREAD_NAME = "QFJ Message Processor"; private final BlockingQueue eventQueue; + private final QueueTracker queueTracker; private final SessionConnector sessionConnector; private volatile ThreadAdapter messageProcessingThread; private volatile boolean isStopped; @@ -49,6 +49,14 @@ public class SingleThreadedEventHandlingStrategy implements EventHandlingStrateg public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueCapacity) { sessionConnector = connector; eventQueue = new LinkedBlockingQueue<>(queueCapacity); + queueTracker = newDefaultQueueTracker(eventQueue); + } + + public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueLowerWatermark, int queueUpperWatermark) { + sessionConnector = connector; + eventQueue = new LinkedBlockingQueue<>(); + queueTracker = newMultiSessionWatermarkTracker(eventQueue, queueLowerWatermark, queueUpperWatermark, + evt -> evt.quickfixSession); } public void setExecutor(Executor executor) { @@ -61,7 +69,7 @@ public void onMessage(Session quickfixSession, Message message) { return; } try { - eventQueue.put(new SessionMessageEvent(quickfixSession, message)); + queueTracker.put(new SessionMessageEvent(quickfixSession, message)); } catch (InterruptedException e) { isStopped = true; throw new RuntimeException(e); @@ -79,7 +87,7 @@ public void block() { if (isStopped) { if (!eventQueue.isEmpty()) { final List tempList = new ArrayList<>(); - eventQueue.drainTo(tempList); + queueTracker.drainTo(tempList); for (SessionMessageEvent event : tempList) { event.processMessage(); } @@ -107,7 +115,7 @@ public void block() { } private SessionMessageEvent getMessage() throws InterruptedException { - return eventQueue.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); + return queueTracker.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); } /** diff --git a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java index 209a78bc4..5cdd3bbe0 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java @@ -20,10 +20,7 @@ package quickfix.mina; -import quickfix.LogUtil; -import quickfix.Message; -import quickfix.Session; -import quickfix.SessionID; +import quickfix.*; import java.util.ArrayList; import java.util.Collection; @@ -35,19 +32,32 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static quickfix.mina.QueueTrackers.newDefaultQueueTracker; +import static quickfix.mina.QueueTrackers.newSingleSessionWatermarkTracker; + /** * Processes messages in a session-specific thread. */ public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy { - private final ConcurrentMap dispatchers = new ConcurrentHashMap<>(); private final SessionConnector sessionConnector; private final int queueCapacity; + private final int queueLowerWatermark; + private final int queueUpperWatermark; private volatile Executor executor; public ThreadPerSessionEventHandlingStrategy(SessionConnector connector, int queueCapacity) { sessionConnector = connector; this.queueCapacity = queueCapacity; + this.queueLowerWatermark = -1; + this.queueUpperWatermark = -1; + } + + public ThreadPerSessionEventHandlingStrategy(SessionConnector connector, int queueLowerWatermark, int queueUpperWatermark) { + sessionConnector = connector; + this.queueCapacity = -1; + this.queueLowerWatermark = queueLowerWatermark; + this.queueUpperWatermark = queueUpperWatermark; } public void setExecutor(Executor executor) { @@ -59,7 +69,7 @@ public void onMessage(Session quickfixSession, Message message) { MessageDispatchingThread dispatcher = dispatchers.get(quickfixSession.getSessionID()); if (dispatcher == null) { dispatcher = dispatchers.computeIfAbsent(quickfixSession.getSessionID(), sessionID -> { - final MessageDispatchingThread newDispatcher = new MessageDispatchingThread(quickfixSession, queueCapacity, executor); + final MessageDispatchingThread newDispatcher = new MessageDispatchingThread(quickfixSession, executor); startDispatcherThread(newDispatcher); return newDispatcher; }); @@ -161,13 +171,21 @@ public void execute(Runnable command) { protected class MessageDispatchingThread extends ThreadAdapter { private final Session quickfixSession; private final BlockingQueue messages; + private final QueueTracker queueTracker; private volatile boolean stopped; private volatile boolean stopping; - private MessageDispatchingThread(Session session, int queueCapacity, Executor executor) { + private MessageDispatchingThread(Session session, Executor executor) { super("QF/J Session dispatcher: " + session.getSessionID(), executor); quickfixSession = session; - messages = new LinkedBlockingQueue<>(queueCapacity); + if (queueCapacity >= 0) { + messages = new LinkedBlockingQueue<>(queueCapacity); + queueTracker = newDefaultQueueTracker(messages); + } else { + messages = new LinkedBlockingQueue<>(); + queueTracker = newSingleSessionWatermarkTracker(messages, queueLowerWatermark, queueUpperWatermark, + quickfixSession); + } } public void enqueue(Message message) { @@ -175,7 +193,7 @@ public void enqueue(Message message) { return; } try { - messages.put(message); + queueTracker.put(message); } catch (final InterruptedException e) { quickfixSession.getLog().onErrorEvent(e.toString()); } @@ -189,7 +207,7 @@ public int getQueueSize() { void doRun() { while (!stopping) { try { - final Message message = getNextMessage(messages); + final Message message = getNextMessage(queueTracker); if (message == null) { // no message available in polling interval continue; @@ -209,7 +227,7 @@ void doRun() { } if (!messages.isEmpty()) { final List tempList = new ArrayList<>(); - messages.drainTo(tempList); + queueTracker.drainTo(tempList); for (Message message : tempList) { try { quickfixSession.next(message); @@ -245,12 +263,12 @@ protected MessageDispatchingThread getDispatcher(SessionID sessionID) { * We do not block indefinitely as that would prevent this thread from ever stopping * * @see #THREAD_WAIT_FOR_MESSAGE_MS - * @param messages + * @param queueTracker * @return next message or null if nothing arrived within the timeout period * @throws InterruptedException */ - protected Message getNextMessage(BlockingQueue messages) throws InterruptedException { - return messages.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); + protected Message getNextMessage(QueueTracker queueTracker) throws InterruptedException { + return queueTracker.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); } @Override diff --git a/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java b/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java new file mode 100644 index 000000000..25e910e7b --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java @@ -0,0 +1,176 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix.mina; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A blocking queue wrapper implementing watermarks-based back pressure propagation + * from the queue sink to one or more logical sources. + * + * @param payload type + * @param logical source key type + * + * @author Vladimir Lysyy (mrbald@github) + */ +public class WatermarkTracker implements QueueTracker { + private final BlockingQueue queue; + private final long lowerWatermark; + private final long upperWatermark; + private final Consumer onLowerWatermarkCrossed; + private final Consumer onUpperWatermarkCrossed; + private final Function classifier; + private final Function trackerSupplier; + + class StreamTracker { + private final S key; + long counter = 0; + private boolean suspended = false; + + StreamTracker(S key) { + this.key = key; + } + + synchronized void incoming(int n) { + if ((counter += n) >= upperWatermark && !suspended) { + suspended = true; + onUpperWatermarkCrossed.accept(key); + } + } + + synchronized void outgoing(int n) { + if ((counter -= n) == lowerWatermark && suspended) { + suspended = false; + onLowerWatermarkCrossed.accept(key); + } + } + + synchronized boolean isSuspended() { + return suspended; + } + } + + static WatermarkTracker newMono( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Runnable onLowerWatermarkCrossed, Runnable onUpperWatermarkCrossed) { + return new WatermarkTracker<>(queue, lowerWatermark, upperWatermark, onLowerWatermarkCrossed, onUpperWatermarkCrossed); + } + + static WatermarkTracker newMulti( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Function classifier, + Consumer onLowerWatermarkCrossed, Consumer onUpperWatermarkCrossed) { + return new WatermarkTracker<>(queue, lowerWatermark, upperWatermark, classifier, onLowerWatermarkCrossed, onUpperWatermarkCrossed); + } + + private WatermarkTracker( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Function classifier, + Consumer onLowerWatermarkCrossed, Consumer onUpperWatermarkCrossed) { + this.queue = queue; + this.lowerWatermark = lowerWatermark; + this.upperWatermark = upperWatermark; + this.classifier = classifier; + this.onLowerWatermarkCrossed = onLowerWatermarkCrossed; + this.onUpperWatermarkCrossed = onUpperWatermarkCrossed; + + final Map trackerMap = new ConcurrentHashMap<>(); + + this.trackerSupplier = key -> trackerMap.computeIfAbsent(key, StreamTracker::new); + } + + private WatermarkTracker( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Runnable onLowerWatermarkCrossed, Runnable onUpperWatermarkCrossed) { + this.queue = queue; + this.lowerWatermark = lowerWatermark; + this.upperWatermark = upperWatermark; + this.classifier = x -> null; + this.onLowerWatermarkCrossed = x -> onLowerWatermarkCrossed.run(); + this.onUpperWatermarkCrossed = x -> onUpperWatermarkCrossed.run(); + + final StreamTracker streamTracker = new StreamTracker(null); + + this.trackerSupplier = key -> streamTracker; + } + + @Override + public void put(E e) throws InterruptedException { + queue.put(e); + trackerForPayload(e).incoming(1); + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + final E e = queue.poll(timeout, unit); + + if (e != null) { + trackerForPayload(e).outgoing(1); + } + + return e; + } + + @Override + public int drainTo(Collection collection) { + return queue.drainTo(new AbstractCollection() { + @Override public Iterator iterator() { throw new UnsupportedOperationException(); } + @Override public int size() { throw new UnsupportedOperationException(); } + + public boolean add(E e) { + final boolean added = collection.add(e); + if (added) { + trackerForPayload(e).outgoing(1); + } + return added; + } + + }); + } + + public boolean isSuspended(S key) { + return trackerForStream(key).isSuspended(); + } + + public boolean isSuspended() { + return isSuspended(null); + } + + StreamTracker trackerForPayload(E e) { + return trackerForStream(classifier.apply(e)); + } + + StreamTracker trackerForStream(S s) { + return trackerSupplier.apply(s); + } + +} diff --git a/quickfixj-core/src/test/java/quickfix/mina/ThreadPerSessionEventHandlingStrategyTest.java b/quickfixj-core/src/test/java/quickfix/mina/ThreadPerSessionEventHandlingStrategyTest.java index 06d005f17..e1a286f1f 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/ThreadPerSessionEventHandlingStrategyTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/ThreadPerSessionEventHandlingStrategyTest.java @@ -47,7 +47,6 @@ import quickfix.fix40.Logon; import java.util.Date; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -74,7 +73,7 @@ protected void startDispatcherThread( } @Override - protected Message getNextMessage(BlockingQueue messages) throws InterruptedException { + protected Message getNextMessage(QueueTracker queueTracker) throws InterruptedException { if (getMessageCount-- == 0) { throw new InterruptedException("END COUNT"); } @@ -84,7 +83,7 @@ protected Message getNextMessage(BlockingQueue messages) throws Interru } throw (RuntimeException) getNextMessageException; } - return super.getNextMessage(messages); + return super.getNextMessage(queueTracker); } } diff --git a/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java b/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java new file mode 100644 index 000000000..fbb6d9ccf --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java @@ -0,0 +1,151 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix.mina; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + +import static quickfix.mina.WatermarkTracker.newMono; +import static quickfix.mina.WatermarkTracker.newMulti; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class WatermarkTrackerTest { + @Mock + private Runnable onLowerMono; + + @Mock + private Runnable onUpperMono; + + @Mock + private Consumer onLowerMulti; + + @Mock + private Consumer onUpperMulti; + + private BlockingQueue queue; + + private WatermarkTracker mono; + + @Before + public void setUp() { + queue = new ArrayBlockingQueue<>(4); + mono = newMono(queue, 1, 3, onLowerMono, onUpperMono); + } + + @Test + public void basics() throws InterruptedException { + mono.put(1); + assertEquals(1, queue.size()); + + final Integer x = mono.poll(1, TimeUnit.DAYS); + assertEquals(0, queue.size()); + assertEquals(1, x.intValue()); + } + + /* + * Queue size over time in the below test, covers all scenarios + * + * (queue size) + * 3 * * + * 2 * * * + * 1 * * * + * 0 * * * + * 1 2 3 4 5 6 7 8 9 0 1 (steps) + */ + @Test + public void watermarks() throws InterruptedException { + // cross lower watermark up + mono.put(1); + verifyState(false, false, false); + + // cross lower watermark down while not suspended + mono.poll(1, TimeUnit.DAYS); + verifyState(false, false, false); + + // cross lower then upper watermarks up + mono.put(1); + verifyState(false, false, false); + + mono.put(2); + verifyState(false, false, false); + + mono.put(3); + verifyState(true, false, true); + + // cross upper watermark down + mono.poll(1, TimeUnit.DAYS); // 3 + verifyState(true, false, false); + + // cross upper watermark back up (without reaching the lower) + mono.put(3); + verifyState(true, false, false); + + // cross upper then lower watermarks down + mono.poll(1, TimeUnit.DAYS); // 2 + verifyState(true, false, false); + + mono.poll(1, TimeUnit.DAYS); // 2 + verifyState(false, true, false); + + mono.poll(1, TimeUnit.DAYS); // 1 + verifyState(false, false, false); + } + + @Test + public void multiShouldWork() throws InterruptedException { + final Function classifier = x -> x % 2; + final WatermarkTracker multi + = newMulti(queue, 1, 3, classifier, onLowerMulti, onUpperMulti); + + assertEquals(multi.trackerForStream(0), multi.trackerForStream(0)); + assertEquals(multi.trackerForStream(1), multi.trackerForStream(1)); + assertNotEquals(multi.trackerForStream(0), multi.trackerForStream(1)); + + assertEquals(multi.trackerForPayload(0), multi.trackerForPayload(2)); + assertEquals(multi.trackerForPayload(1), multi.trackerForPayload(3)); + assertNotEquals(multi.trackerForPayload(0), multi.trackerForPayload(1)); + + multi.put(1); + assertEquals(1, multi.trackerForPayload(1).counter); + multi.put(3); + assertEquals(2, multi.trackerForPayload(3).counter); + } + + // === helpers === + + private void verifyState(boolean suspended, boolean onLower, boolean onUpper) { + assertEquals(suspended, mono.isSuspended()); + verify(onLowerMono, times(onLower ? 1 : 0)).run(); + verify(onUpperMono, times(onUpper ? 1 : 0)).run(); + reset(onLowerMono, onUpperMono); + } +} \ No newline at end of file