From 2ea92ac2d38e754be218ae0a4cdffd15103c1794 Mon Sep 17 00:00:00 2001 From: mrbald Date: Sun, 28 Jan 2018 22:48:56 +0100 Subject: [PATCH 1/4] QFJ-943: optional watermarks-based back pressure propagaion from inbound queue to the socket --- quickfixj-core/pom.xml | 18 ++ .../AbstractSessionConnectorBuilder.java | 74 +++++++ .../main/java/quickfix/SocketAcceptor.java | 28 +++ .../main/java/quickfix/SocketInitiator.java | 28 +++ .../java/quickfix/ThreadedSocketAcceptor.java | 28 +++ .../quickfix/ThreadedSocketInitiator.java | 28 +++ .../quickfix/mina/IoSessionResponder.java | 3 + .../main/java/quickfix/mina/QueueTracker.java | 30 +++ .../SingleThreadedEventHandlingStrategy.java | 51 ++++- ...ThreadPerSessionEventHandlingStrategy.java | 71 +++++-- .../java/quickfix/mina/WatermarkTracker.java | 185 ++++++++++++++++++ ...adPerSessionEventHandlingStrategyTest.java | 5 +- .../quickfix/mina/WatermarkTrackerTest.java | 156 +++++++++++++++ 13 files changed, 681 insertions(+), 24 deletions(-) create mode 100644 quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java create mode 100644 quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java create mode 100644 quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java create mode 100644 quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java diff --git a/quickfixj-core/pom.xml b/quickfixj-core/pom.xml index 65a58df01..9fb67d54d 100644 --- a/quickfixj-core/pom.xml +++ b/quickfixj-core/pom.xml @@ -401,6 +401,24 @@ + + org.codehaus.mojo + build-helper-maven-plugin + 1.9.1 + + + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources + + + + + diff --git a/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java b/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java new file mode 100644 index 000000000..7bddc45ea --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java @@ -0,0 +1,74 @@ +package quickfix; + +public abstract class AbstractSessionConnectorBuilder { + private final Class derived; + Application application; + MessageStoreFactory messageStoreFactory; + SessionSettings settings; + LogFactory logFactory; + MessageFactory messageFactory; + + int queueCapacity = -1; + int queueLowerWatermark = -1; + int queueUpperWatermark = -1; + + AbstractSessionConnectorBuilder(Class derived) { + this.derived = derived; + } + + public Derived withApplication(Application val) throws ConfigError { + application = val; + return derived.cast(this); + } + + public Derived withMessageStoreFactory(MessageStoreFactory val) throws ConfigError { + messageStoreFactory = val; + return derived.cast(this); + } + + public Derived withSettings(SessionSettings val) { + settings = val; + return derived.cast(this); + } + + public Derived withLogFactory(LogFactory val) throws ConfigError { + logFactory = val; + return derived.cast(this); + } + + public Derived withMessageFactory(MessageFactory val) throws ConfigError { + messageFactory = val; + return derived.cast(this); + } + + public Derived withQueueCapacity(int val) throws ConfigError { + if (queueLowerWatermark >= 0) { + throw new ConfigError("queue capacity and watermarks may not be configured together"); + } else if (queueCapacity < 0) { + throw new ConfigError("negative queue capacity"); + } + queueCapacity = val; + return derived.cast(this); + } + + public Derived withQueueWatermarks(int lower, int upper) throws ConfigError { + if (queueCapacity >= 0) { + throw new ConfigError("queue capacity and watermarks may not be configured together"); + } else if (queueLowerWatermark < 0 || queueUpperWatermark <= queueLowerWatermark) { + throw new ConfigError("invalid queue watermarks, required: 0 <= lower watermark < upper watermark"); + } + queueLowerWatermark = lower; + queueUpperWatermark = upper; + return derived.cast(this); + } + + public final Product build() throws ConfigError { + if (logFactory == null) { + logFactory = new ScreenLogFactory(settings); + } + + return doBuild(); + } + + protected abstract Product doBuild() throws ConfigError; +} diff --git a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java index c8dfc5edf..076bc3d76 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java @@ -31,6 +31,34 @@ public class SocketAcceptor extends AbstractSocketAcceptor { private volatile Boolean isStarted = Boolean.FALSE; private final SingleThreadedEventHandlingStrategy eventHandlingStrategy; + private SocketAcceptor(Builder builder) throws ConfigError { + super(builder.application, builder.messageStoreFactory, builder.settings, + builder.logFactory, builder.messageFactory); + + if (builder.queueCapacity >= 0) { + eventHandlingStrategy + = new SingleThreadedEventHandlingStrategy(this, builder.queueCapacity); + } else { + eventHandlingStrategy + = new SingleThreadedEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder extends AbstractSessionConnectorBuilder { + private Builder() { + super(Builder.class); + } + + @Override + protected SocketAcceptor doBuild() throws ConfigError { + return new SocketAcceptor(this); + } + } + public SocketAcceptor(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory, int queueCapacity) diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index 71d5931ce..184eeda15 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -31,6 +31,34 @@ public class SocketInitiator extends AbstractSocketInitiator { private volatile Boolean isStarted = Boolean.FALSE; private final SingleThreadedEventHandlingStrategy eventHandlingStrategy; + private SocketInitiator(Builder builder) throws ConfigError { + super(builder.application, builder.messageStoreFactory, builder.settings, + builder.logFactory, builder.messageFactory); + + if (builder.queueCapacity >= 0) { + eventHandlingStrategy + = new SingleThreadedEventHandlingStrategy(this, builder.queueCapacity); + } else { + eventHandlingStrategy + = new SingleThreadedEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder extends AbstractSessionConnectorBuilder { + private Builder() { + super(Builder.class); + } + + @Override + protected SocketInitiator doBuild() throws ConfigError { + return new SocketInitiator(this); + } + } + public SocketInitiator(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, MessageFactory messageFactory, int queueCapacity) throws ConfigError { super(application, messageStoreFactory, settings, new ScreenLogFactory(settings), diff --git a/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java index adf3dbbb6..608f67711 100644 --- a/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java @@ -29,6 +29,34 @@ public class ThreadedSocketAcceptor extends AbstractSocketAcceptor { private final ThreadPerSessionEventHandlingStrategy eventHandlingStrategy; + private ThreadedSocketAcceptor(Builder builder) throws ConfigError { + super(builder.application, builder.messageStoreFactory, builder.settings, + builder.logFactory, builder.messageFactory); + + if (builder.queueCapacity >= 0) { + eventHandlingStrategy + = new ThreadPerSessionEventHandlingStrategy(this, builder.queueCapacity); + } else { + eventHandlingStrategy + = new ThreadPerSessionEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder extends AbstractSessionConnectorBuilder { + private Builder() { + super(Builder.class); + } + + @Override + protected ThreadedSocketAcceptor doBuild() throws ConfigError { + return new ThreadedSocketAcceptor(this); + } + } + public ThreadedSocketAcceptor(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory, int queueCapacity ) diff --git a/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java index 29233c65a..a011c25ea 100644 --- a/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java @@ -29,6 +29,34 @@ public class ThreadedSocketInitiator extends AbstractSocketInitiator { private final ThreadPerSessionEventHandlingStrategy eventHandlingStrategy; + private ThreadedSocketInitiator(Builder builder) throws ConfigError { + super(builder.application, builder.messageStoreFactory, builder.settings, + builder.logFactory, builder.messageFactory); + + if (builder.queueCapacity >= 0) { + eventHandlingStrategy + = new ThreadPerSessionEventHandlingStrategy(this, builder.queueCapacity); + } else { + eventHandlingStrategy + = new ThreadPerSessionEventHandlingStrategy(this, builder.queueLowerWatermark, builder.queueUpperWatermark); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder extends AbstractSessionConnectorBuilder { + private Builder() { + super(Builder.class); + } + + @Override + protected ThreadedSocketInitiator doBuild() throws ConfigError { + return new ThreadedSocketInitiator(this); + } + } + public ThreadedSocketInitiator(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, LogFactory logFactory, MessageFactory messageFactory, int queueCapacity) throws ConfigError { diff --git a/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java b/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java index df3ba2660..1c32dcb36 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java +++ b/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java @@ -95,4 +95,7 @@ public String getRemoteAddress() { return null; } + IoSession getIoSession() { + return ioSession; + } } diff --git a/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java b/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java new file mode 100644 index 000000000..dd97a47a8 --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java @@ -0,0 +1,30 @@ +package quickfix.mina; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +interface QueueTracker { + void put(E e) throws InterruptedException; + E poll(long timeout, TimeUnit unit) throws InterruptedException; + int drainTo(Collection collection); + + static QueueTracker wrap(BlockingQueue queue) { + return new QueueTracker() { + @Override + public void put(E e) throws InterruptedException { + queue.put(e); + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + + @Override + public int drainTo(Collection collection) { + return queue.drainTo(collection); + } + }; + } +} diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index e73056570..d86802fec 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -20,11 +20,10 @@ package quickfix.mina; -import quickfix.LogUtil; -import quickfix.Message; -import quickfix.Session; -import quickfix.SessionID; -import quickfix.SystemTime; +import org.apache.mina.core.session.IoSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.*; import java.util.ArrayList; import java.util.List; @@ -38,8 +37,11 @@ * Processes messages for all sessions in a single thread. */ public class SingleThreadedEventHandlingStrategy implements EventHandlingStrategy { + private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class); + public static final String MESSAGE_PROCESSOR_THREAD_NAME = "QFJ Message Processor"; private final BlockingQueue eventQueue; + private final QueueTracker queueTracker; private final SessionConnector sessionConnector; private volatile ThreadAdapter messageProcessingThread; private volatile boolean isStopped; @@ -49,6 +51,39 @@ public class SingleThreadedEventHandlingStrategy implements EventHandlingStrateg public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueCapacity) { sessionConnector = connector; eventQueue = new LinkedBlockingQueue<>(queueCapacity); + queueTracker = QueueTracker.wrap(eventQueue); + } + + public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueLowerWatermark, int queueUpperWatermark) { + sessionConnector = connector; + eventQueue = new LinkedBlockingQueue<>(); + queueTracker = WatermarkTracker.newMulti(eventQueue, queueLowerWatermark, queueUpperWatermark, + evt -> evt.quickfixSession, + qfSession -> { // lower watermark crossed down, while reads suspended + final IoSession ioSession = lookupIoSession(qfSession); + if (ioSession != null && ioSession.isReadSuspended()) { + ioSession.resumeRead(); + LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed", + qfSession.getSessionID(), queueLowerWatermark); + } + }, + qfSession -> { // upper watermark crossed up, while reads active + final IoSession ioSession = lookupIoSession(qfSession); + if (ioSession != null && !ioSession.isReadSuspended()) { + ioSession.suspendRead(); + LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended", + qfSession.getSessionID(), queueUpperWatermark); + } + }); + } + private static IoSession lookupIoSession(Session qfSession) { + final Responder responder = qfSession.getResponder(); + + if (responder instanceof IoSessionResponder) { + return ((IoSessionResponder)responder).getIoSession(); + } else { + return null; + } } public void setExecutor(Executor executor) { @@ -61,7 +96,7 @@ public void onMessage(Session quickfixSession, Message message) { return; } try { - eventQueue.put(new SessionMessageEvent(quickfixSession, message)); + queueTracker.put(new SessionMessageEvent(quickfixSession, message)); } catch (InterruptedException e) { isStopped = true; throw new RuntimeException(e); @@ -79,7 +114,7 @@ public void block() { if (isStopped) { if (!eventQueue.isEmpty()) { final List tempList = new ArrayList<>(); - eventQueue.drainTo(tempList); + queueTracker.drainTo(tempList); for (SessionMessageEvent event : tempList) { event.processMessage(); } @@ -107,7 +142,7 @@ public void block() { } private SessionMessageEvent getMessage() throws InterruptedException { - return eventQueue.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); + return queueTracker.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); } /** diff --git a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java index 209a78bc4..1490d2ed8 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java @@ -20,10 +20,10 @@ package quickfix.mina; -import quickfix.LogUtil; -import quickfix.Message; -import quickfix.Session; -import quickfix.SessionID; +import org.apache.mina.core.session.IoSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.*; import java.util.ArrayList; import java.util.Collection; @@ -39,15 +39,27 @@ * Processes messages in a session-specific thread. */ public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy { + private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class); private final ConcurrentMap dispatchers = new ConcurrentHashMap<>(); private final SessionConnector sessionConnector; private final int queueCapacity; + private final int queueLowerWatermark; + private final int queueUpperWatermark; private volatile Executor executor; public ThreadPerSessionEventHandlingStrategy(SessionConnector connector, int queueCapacity) { sessionConnector = connector; this.queueCapacity = queueCapacity; + this.queueLowerWatermark = -1; + this.queueUpperWatermark = -1; + } + + public ThreadPerSessionEventHandlingStrategy(SessionConnector connector, int queueLowerWatermark, int queueUpperWatermark) { + sessionConnector = connector; + this.queueCapacity = -1; + this.queueLowerWatermark = queueLowerWatermark; + this.queueUpperWatermark = queueUpperWatermark; } public void setExecutor(Executor executor) { @@ -59,7 +71,7 @@ public void onMessage(Session quickfixSession, Message message) { MessageDispatchingThread dispatcher = dispatchers.get(quickfixSession.getSessionID()); if (dispatcher == null) { dispatcher = dispatchers.computeIfAbsent(quickfixSession.getSessionID(), sessionID -> { - final MessageDispatchingThread newDispatcher = new MessageDispatchingThread(quickfixSession, queueCapacity, executor); + final MessageDispatchingThread newDispatcher = new MessageDispatchingThread(quickfixSession, executor); startDispatcherThread(newDispatcher); return newDispatcher; }); @@ -161,13 +173,46 @@ public void execute(Runnable command) { protected class MessageDispatchingThread extends ThreadAdapter { private final Session quickfixSession; private final BlockingQueue messages; + private final QueueTracker queueTracker; private volatile boolean stopped; private volatile boolean stopping; - private MessageDispatchingThread(Session session, int queueCapacity, Executor executor) { + private MessageDispatchingThread(Session session, Executor executor) { super("QF/J Session dispatcher: " + session.getSessionID(), executor); quickfixSession = session; - messages = new LinkedBlockingQueue<>(queueCapacity); + if (queueCapacity >= 0) { + messages = new LinkedBlockingQueue<>(queueCapacity); + queueTracker = QueueTracker.wrap(messages); + } else { + messages = new LinkedBlockingQueue<>(); + queueTracker = WatermarkTracker.newMono(messages, queueLowerWatermark, queueUpperWatermark, + () -> { // lower watermark crossed down, while reads suspended + final IoSession ioSession = lookupIoSession(); + if (ioSession != null && ioSession.isReadSuspended()) { + ioSession.resumeRead(); + LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed", + quickfixSession.getSessionID(), queueLowerWatermark); + } + }, + () -> { // upper watermark crossed up, while reads active + final IoSession ioSession = lookupIoSession(); + if (ioSession != null && !ioSession.isReadSuspended()) { + ioSession.suspendRead(); + LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended", + quickfixSession.getSessionID(), queueUpperWatermark); + } + }); + } + } + + private IoSession lookupIoSession() { + final Responder responder = quickfixSession.getResponder(); + + if (responder instanceof IoSessionResponder) { + return ((IoSessionResponder)responder).getIoSession(); + } else { + return null; + } } public void enqueue(Message message) { @@ -175,7 +220,7 @@ public void enqueue(Message message) { return; } try { - messages.put(message); + queueTracker.put(message); } catch (final InterruptedException e) { quickfixSession.getLog().onErrorEvent(e.toString()); } @@ -189,7 +234,7 @@ public int getQueueSize() { void doRun() { while (!stopping) { try { - final Message message = getNextMessage(messages); + final Message message = getNextMessage(queueTracker); if (message == null) { // no message available in polling interval continue; @@ -209,7 +254,7 @@ void doRun() { } if (!messages.isEmpty()) { final List tempList = new ArrayList<>(); - messages.drainTo(tempList); + queueTracker.drainTo(tempList); for (Message message : tempList) { try { quickfixSession.next(message); @@ -245,12 +290,12 @@ protected MessageDispatchingThread getDispatcher(SessionID sessionID) { * We do not block indefinitely as that would prevent this thread from ever stopping * * @see #THREAD_WAIT_FOR_MESSAGE_MS - * @param messages + * @param queueTracker * @return next message or null if nothing arrived within the timeout period * @throws InterruptedException */ - protected Message getNextMessage(BlockingQueue messages) throws InterruptedException { - return messages.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); + protected Message getNextMessage(QueueTracker queueTracker) throws InterruptedException { + return queueTracker.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); } @Override diff --git a/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java b/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java new file mode 100644 index 000000000..44ddf1e31 --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java @@ -0,0 +1,185 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +/* + * Copyright (c) 2018 Vladimir Lysyy (mrbald@github) + * ALv2 (http://www.apache.org/licenses/LICENSE-2.0) + */ + +package quickfix.mina; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A blocking queue wrapper implementing watermarks-based back pressure propagation + * from the queue sink to one or more logical sources. + * + * @param payload type + * @param logical source key type + * + * @author Vladimir Lysyy (mrbald@github) + */ +public class WatermarkTracker implements QueueTracker { + private final BlockingQueue queue; + private final long lowerWatermark; + private final long upperWatermark; + private final Consumer onLowerWatermarkCrossed; + private final Consumer onUpperWatermarkCrossed; + private final Function classifier; + private final Function trackerSupplier; + + class StreamTracker { + private final S key; + long counter = 0; + private boolean suspended = false; + + StreamTracker(S key) { + this.key = key; + } + + synchronized void incoming(int n) { + if ((counter += n) >= upperWatermark && !suspended) { + suspended = true; + onUpperWatermarkCrossed.accept(key); + } + } + + synchronized void outgoing(int n) { + if ((counter -= n) == lowerWatermark && suspended) { + suspended = false; + onLowerWatermarkCrossed.accept(key); + } + } + + synchronized boolean isSuspended() { + return suspended; + } + } + + public static WatermarkTracker newMono( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Runnable onLowerWatermarkCrossed, Runnable onUpperWatermarkCrossed) { + return new WatermarkTracker(queue, lowerWatermark, upperWatermark, onLowerWatermarkCrossed, onUpperWatermarkCrossed); + } + + public static WatermarkTracker newMulti( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Function classifier, + Consumer onLowerWatermarkCrossed, Consumer onUpperWatermarkCrossed) { + return new WatermarkTracker<>(queue, lowerWatermark, upperWatermark, classifier, onLowerWatermarkCrossed, onUpperWatermarkCrossed); + } + + private WatermarkTracker( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Function classifier, + Consumer onLowerWatermarkCrossed, Consumer onUpperWatermarkCrossed) { + assert lowerWatermark >= 0 && lowerWatermark < upperWatermark; + + this.queue = queue; + this.lowerWatermark = lowerWatermark; + this.upperWatermark = upperWatermark; + this.classifier = classifier; + this.onLowerWatermarkCrossed = onLowerWatermarkCrossed; + this.onUpperWatermarkCrossed = onUpperWatermarkCrossed; + + final Map trackerMap = new ConcurrentHashMap<>(); + + this.trackerSupplier = key -> trackerMap.computeIfAbsent(key, StreamTracker::new); + } + + private WatermarkTracker( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Runnable onLowerWatermarkCrossed, Runnable onUpperWatermarkCrossed) { + assert lowerWatermark >= 0 && lowerWatermark < upperWatermark; + + this.queue = queue; + this.lowerWatermark = lowerWatermark; + this.upperWatermark = upperWatermark; + this.classifier = x -> null; + this.onLowerWatermarkCrossed = x -> onLowerWatermarkCrossed.run(); + this.onUpperWatermarkCrossed = x -> onUpperWatermarkCrossed.run(); + + final StreamTracker streamTracker = new StreamTracker(null); + + this.trackerSupplier = key -> streamTracker; + } + + @Override + public void put(E e) throws InterruptedException { + queue.put(e); + trackerForPayload(e).incoming(1); + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + final E e = queue.poll(timeout, unit); + + if (e != null) { + trackerForPayload(e).outgoing(1); + } + + return e; + } + + @Override + public int drainTo(Collection collection) { + return queue.drainTo(new AbstractCollection() { + @Override public Iterator iterator() { throw new UnsupportedOperationException(); } + @Override public int size() { throw new UnsupportedOperationException(); } + + public boolean add(E e) { + final boolean added = collection.add(e); + if (added) { + trackerForPayload(e).outgoing(1); + } + return added; + } + + }); + } + + public boolean isSuspended(S key) { + return trackerForStream(key).isSuspended(); + } + + public boolean isSuspended() { + return isSuspended(null); + } + + StreamTracker trackerForPayload(E e) { + return trackerForStream(classifier.apply(e)); + } + + StreamTracker trackerForStream(S s) { + return trackerSupplier.apply(s); + } + +} diff --git a/quickfixj-core/src/test/java/quickfix/mina/ThreadPerSessionEventHandlingStrategyTest.java b/quickfixj-core/src/test/java/quickfix/mina/ThreadPerSessionEventHandlingStrategyTest.java index 06d005f17..e1a286f1f 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/ThreadPerSessionEventHandlingStrategyTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/ThreadPerSessionEventHandlingStrategyTest.java @@ -47,7 +47,6 @@ import quickfix.fix40.Logon; import java.util.Date; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -74,7 +73,7 @@ protected void startDispatcherThread( } @Override - protected Message getNextMessage(BlockingQueue messages) throws InterruptedException { + protected Message getNextMessage(QueueTracker queueTracker) throws InterruptedException { if (getMessageCount-- == 0) { throw new InterruptedException("END COUNT"); } @@ -84,7 +83,7 @@ protected Message getNextMessage(BlockingQueue messages) throws Interru } throw (RuntimeException) getNextMessageException; } - return super.getNextMessage(messages); + return super.getNextMessage(queueTracker); } } diff --git a/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java b/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java new file mode 100644 index 000000000..a7a053629 --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java @@ -0,0 +1,156 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +/* + * Copyright (c) 2018 Vladimir Lysyy (mrbald@github) + * ALv2 (http://www.apache.org/licenses/LICENSE-2.0) + */ + +package quickfix.mina; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; + +import static quickfix.mina.WatermarkTracker.newMono; +import static quickfix.mina.WatermarkTracker.newMulti; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class WatermarkTrackerTest { + @Mock + private Runnable onLowerMono; + + @Mock + private Runnable onUpperMono; + + @Mock + private Consumer onLowerMulti; + + @Mock + private Consumer onUpperMulti; + + private BlockingQueue queue; + + private WatermarkTracker mono; + + @Before + public void setUp() { + queue = new ArrayBlockingQueue<>(4); + mono = newMono(queue, 1, 3, onLowerMono, onUpperMono); + } + + @Test + public void basics() throws InterruptedException { + mono.put(1); + assertEquals(1, queue.size()); + + final Integer x = mono.poll(1, TimeUnit.DAYS); + assertEquals(0, queue.size()); + assertEquals(1, x.intValue()); + } + + /* + * Queue size over time in the below test, covers all scenarios + * + * (queue size) + * 3 * * + * 2 * * * + * 1 * * * + * 0 * * * + * 1 2 3 4 5 6 7 8 9 0 1 (steps) + */ + @Test + public void watermarks() throws InterruptedException { + // cross lower watermark up + mono.put(1); + verifyState(false, false, false); + + // cross lower watermark down while not suspended + mono.poll(1, TimeUnit.DAYS); + verifyState(false, false, false); + + // cross lower then upper watermarks up + mono.put(1); + verifyState(false, false, false); + + mono.put(2); + verifyState(false, false, false); + + mono.put(3); + verifyState(true, false, true); + + // cross upper watermark down + mono.poll(1, TimeUnit.DAYS); // 3 + verifyState(true, false, false); + + // cross upper watermark back up (without reaching the lower) + mono.put(3); + verifyState(true, false, false); + + // cross upper then lower watermarks down + mono.poll(1, TimeUnit.DAYS); // 2 + verifyState(true, false, false); + + mono.poll(1, TimeUnit.DAYS); // 2 + verifyState(false, true, false); + + mono.poll(1, TimeUnit.DAYS); // 1 + verifyState(false, false, false); + } + + @Test + public void multiShouldWork() throws InterruptedException { + final Function classifier = x -> x % 2; + final WatermarkTracker multi + = newMulti(queue, 1, 3, classifier, onLowerMulti, onUpperMulti); + + assertEquals(multi.trackerForStream(0), multi.trackerForStream(0)); + assertEquals(multi.trackerForStream(1), multi.trackerForStream(1)); + assertNotEquals(multi.trackerForStream(0), multi.trackerForStream(1)); + + assertEquals(multi.trackerForPayload(0), multi.trackerForPayload(2)); + assertEquals(multi.trackerForPayload(1), multi.trackerForPayload(3)); + assertNotEquals(multi.trackerForPayload(0), multi.trackerForPayload(1)); + + multi.put(1); + assertEquals(1, multi.trackerForPayload(1).counter); + multi.put(3); + assertEquals(2, multi.trackerForPayload(3).counter); + } + + // === helpers === + + private void verifyState(boolean suspended, boolean onLower, boolean onUpper) { + assertEquals(suspended, mono.isSuspended()); + verify(onLowerMono, times(onLower ? 1 : 0)).run(); + verify(onUpperMono, times(onUpper ? 1 : 0)).run(); + reset(onLowerMono, onUpperMono); + } +} \ No newline at end of file From b6a7131624a6e0f3213d163607c76a34cbb2c4f9 Mon Sep 17 00:00:00 2001 From: mrbald Date: Wed, 31 Jan 2018 00:19:11 +0100 Subject: [PATCH 2/4] QFJ-943: code review feedback --- quickfixj-core/pom.xml | 18 ------------------ .../quickfix/mina/EventHandlingStrategy.java | 13 +++++++++++++ .../SingleThreadedEventHandlingStrategy.java | 11 ++--------- .../ThreadPerSessionEventHandlingStrategy.java | 16 ++++------------ .../java/quickfix/mina/WatermarkTracker.java | 9 --------- 5 files changed, 19 insertions(+), 48 deletions(-) diff --git a/quickfixj-core/pom.xml b/quickfixj-core/pom.xml index 9fb67d54d..65a58df01 100644 --- a/quickfixj-core/pom.xml +++ b/quickfixj-core/pom.xml @@ -401,24 +401,6 @@ - - org.codehaus.mojo - build-helper-maven-plugin - 1.9.1 - - - generate-sources - - add-source - - - - ${project.build.directory}/generated-sources - - - - - diff --git a/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java index 1b2b50e81..77ef55ed9 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java @@ -19,7 +19,9 @@ package quickfix.mina; +import org.apache.mina.core.session.IoSession; import quickfix.Message; +import quickfix.Responder; import quickfix.Session; import quickfix.SessionID; @@ -48,4 +50,15 @@ public interface EventHandlingStrategy { int getQueueSize(); int getQueueSize(SessionID sessionID); + + static IoSession lookupIoSession(Session qfSession) { + final Responder responder = qfSession.getResponder(); + + if (responder instanceof IoSessionResponder) { + return ((IoSessionResponder)responder).getIoSession(); + } else { + return null; + } + } + } diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index d86802fec..1c1dab60b 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -33,6 +33,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static quickfix.mina.EventHandlingStrategy.lookupIoSession; + /** * Processes messages for all sessions in a single thread. */ @@ -76,15 +78,6 @@ public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queue } }); } - private static IoSession lookupIoSession(Session qfSession) { - final Responder responder = qfSession.getResponder(); - - if (responder instanceof IoSessionResponder) { - return ((IoSessionResponder)responder).getIoSession(); - } else { - return null; - } - } public void setExecutor(Executor executor) { this.executor = executor; diff --git a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java index 1490d2ed8..f51ef5f68 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java @@ -35,6 +35,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static quickfix.mina.EventHandlingStrategy.lookupIoSession; + /** * Processes messages in a session-specific thread. */ @@ -187,7 +189,7 @@ private MessageDispatchingThread(Session session, Executor executor) { messages = new LinkedBlockingQueue<>(); queueTracker = WatermarkTracker.newMono(messages, queueLowerWatermark, queueUpperWatermark, () -> { // lower watermark crossed down, while reads suspended - final IoSession ioSession = lookupIoSession(); + final IoSession ioSession = lookupIoSession(quickfixSession); if (ioSession != null && ioSession.isReadSuspended()) { ioSession.resumeRead(); LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed", @@ -195,7 +197,7 @@ private MessageDispatchingThread(Session session, Executor executor) { } }, () -> { // upper watermark crossed up, while reads active - final IoSession ioSession = lookupIoSession(); + final IoSession ioSession = lookupIoSession(quickfixSession); if (ioSession != null && !ioSession.isReadSuspended()) { ioSession.suspendRead(); LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended", @@ -205,16 +207,6 @@ private MessageDispatchingThread(Session session, Executor executor) { } } - private IoSession lookupIoSession() { - final Responder responder = quickfixSession.getResponder(); - - if (responder instanceof IoSessionResponder) { - return ((IoSessionResponder)responder).getIoSession(); - } else { - return null; - } - } - public void enqueue(Message message) { if (message == END_OF_STREAM && stopping) { return; diff --git a/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java b/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java index 44ddf1e31..a16ba6d55 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java +++ b/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java @@ -17,11 +17,6 @@ * are not clear to you. ******************************************************************************/ -/* - * Copyright (c) 2018 Vladimir Lysyy (mrbald@github) - * ALv2 (http://www.apache.org/licenses/LICENSE-2.0) - */ - package quickfix.mina; import java.util.AbstractCollection; @@ -100,8 +95,6 @@ private WatermarkTracker( long lowerWatermark, long upperWatermark, Function classifier, Consumer onLowerWatermarkCrossed, Consumer onUpperWatermarkCrossed) { - assert lowerWatermark >= 0 && lowerWatermark < upperWatermark; - this.queue = queue; this.lowerWatermark = lowerWatermark; this.upperWatermark = upperWatermark; @@ -118,8 +111,6 @@ private WatermarkTracker( BlockingQueue queue, long lowerWatermark, long upperWatermark, Runnable onLowerWatermarkCrossed, Runnable onUpperWatermarkCrossed) { - assert lowerWatermark >= 0 && lowerWatermark < upperWatermark; - this.queue = queue; this.lowerWatermark = lowerWatermark; this.upperWatermark = upperWatermark; From 9cccdb3e3afb2fbf35239cd1c2b81c887f50f983 Mon Sep 17 00:00:00 2001 From: mrbald Date: Thu, 15 Feb 2018 20:04:27 +0100 Subject: [PATCH 3/4] QFJ-943: code review feedback - watermarks logging normalized --- .../src/main/java/quickfix/mina/EventHandlingStrategy.java | 2 ++ .../quickfix/mina/SingleThreadedEventHandlingStrategy.java | 7 +++---- .../mina/ThreadPerSessionEventHandlingStrategy.java | 7 +++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java index 77ef55ed9..3b22e4831 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java @@ -30,6 +30,8 @@ * it only handles message reception events. */ public interface EventHandlingStrategy { + final String LOWER_WATERMARK_FMT = "inbound queue size < lower watermark (%d), socket reads resumed"; + final String UPPER_WATERMARK_FMT = "inbound queue size > upper watermark (%d), socket reads suspended"; /** * Constant indicating how long we wait for an incoming message. After diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index 1c1dab60b..0bf3c45be 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -33,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static java.lang.String.format; import static quickfix.mina.EventHandlingStrategy.lookupIoSession; /** @@ -65,16 +66,14 @@ public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queue final IoSession ioSession = lookupIoSession(qfSession); if (ioSession != null && ioSession.isReadSuspended()) { ioSession.resumeRead(); - LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed", - qfSession.getSessionID(), queueLowerWatermark); + qfSession.getLog().onEvent(format(LOWER_WATERMARK_FMT, queueLowerWatermark)); } }, qfSession -> { // upper watermark crossed up, while reads active final IoSession ioSession = lookupIoSession(qfSession); if (ioSession != null && !ioSession.isReadSuspended()) { ioSession.suspendRead(); - LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended", - qfSession.getSessionID(), queueUpperWatermark); + qfSession.getLog().onEvent(format(UPPER_WATERMARK_FMT, queueUpperWatermark)); } }); } diff --git a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java index f51ef5f68..a6e382204 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java @@ -35,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static java.lang.String.format; import static quickfix.mina.EventHandlingStrategy.lookupIoSession; /** @@ -192,16 +193,14 @@ private MessageDispatchingThread(Session session, Executor executor) { final IoSession ioSession = lookupIoSession(quickfixSession); if (ioSession != null && ioSession.isReadSuspended()) { ioSession.resumeRead(); - LOG.info("{}: inbound queue size < lower watermark ({}), socket reads resumed", - quickfixSession.getSessionID(), queueLowerWatermark); + quickfixSession.getLog().onEvent(format(LOWER_WATERMARK_FMT, queueLowerWatermark)); } }, () -> { // upper watermark crossed up, while reads active final IoSession ioSession = lookupIoSession(quickfixSession); if (ioSession != null && !ioSession.isReadSuspended()) { ioSession.suspendRead(); - LOG.info("{}: inbound queue size > upper watermark ({}), socket reads suspended", - quickfixSession.getSessionID(), queueUpperWatermark); + quickfixSession.getLog().onEvent(format(UPPER_WATERMARK_FMT, queueUpperWatermark)); } }); } From a65915edbc420273b24a8014060e8be7dd02bda8 Mon Sep 17 00:00:00 2001 From: mrbald Date: Sun, 4 Mar 2018 21:08:15 +0100 Subject: [PATCH 4/4] QFJ-943: code review feedback - cleaned up last bits of cut/paste + leftover mrbald copyright --- .../quickfix/mina/EventHandlingStrategy.java | 16 ---- .../main/java/quickfix/mina/QueueTracker.java | 19 ---- .../java/quickfix/mina/QueueTrackers.java | 89 +++++++++++++++++++ .../SingleThreadedEventHandlingStrategy.java | 29 ++---- ...ThreadPerSessionEventHandlingStrategy.java | 28 ++---- .../java/quickfix/mina/WatermarkTracker.java | 6 +- .../quickfix/mina/WatermarkTrackerTest.java | 5 -- 7 files changed, 102 insertions(+), 90 deletions(-) create mode 100644 quickfixj-core/src/main/java/quickfix/mina/QueueTrackers.java diff --git a/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java index 3b22e4831..573288f17 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/EventHandlingStrategy.java @@ -19,9 +19,7 @@ package quickfix.mina; -import org.apache.mina.core.session.IoSession; import quickfix.Message; -import quickfix.Responder; import quickfix.Session; import quickfix.SessionID; @@ -30,9 +28,6 @@ * it only handles message reception events. */ public interface EventHandlingStrategy { - final String LOWER_WATERMARK_FMT = "inbound queue size < lower watermark (%d), socket reads resumed"; - final String UPPER_WATERMARK_FMT = "inbound queue size > upper watermark (%d), socket reads suspended"; - /** * Constant indicating how long we wait for an incoming message. After * thread has been asked to stop, it can take up to this long to terminate. @@ -52,15 +47,4 @@ public interface EventHandlingStrategy { int getQueueSize(); int getQueueSize(SessionID sessionID); - - static IoSession lookupIoSession(Session qfSession) { - final Responder responder = qfSession.getResponder(); - - if (responder instanceof IoSessionResponder) { - return ((IoSessionResponder)responder).getIoSession(); - } else { - return null; - } - } - } diff --git a/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java b/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java index dd97a47a8..3f47341bf 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java +++ b/quickfixj-core/src/main/java/quickfix/mina/QueueTracker.java @@ -8,23 +8,4 @@ interface QueueTracker { void put(E e) throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; int drainTo(Collection collection); - - static QueueTracker wrap(BlockingQueue queue) { - return new QueueTracker() { - @Override - public void put(E e) throws InterruptedException { - queue.put(e); - } - - @Override - public E poll(long timeout, TimeUnit unit) throws InterruptedException { - return queue.poll(timeout, unit); - } - - @Override - public int drainTo(Collection collection) { - return queue.drainTo(collection); - } - }; - } } diff --git a/quickfixj-core/src/main/java/quickfix/mina/QueueTrackers.java b/quickfixj-core/src/main/java/quickfix/mina/QueueTrackers.java new file mode 100644 index 000000000..00f1ecb3b --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/mina/QueueTrackers.java @@ -0,0 +1,89 @@ +package quickfix.mina; + +import org.apache.mina.core.session.IoSession; +import quickfix.Responder; +import quickfix.Session; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static java.lang.String.format; + +/** + * Queue trackers factory methods + */ +final class QueueTrackers { + private static final String LOWER_WATERMARK_FMT = "inbound queue size < lower watermark (%d), socket reads resumed"; + private static final String UPPER_WATERMARK_FMT = "inbound queue size > upper watermark (%d), socket reads suspended"; + + /** + * Watermarks-based queue tracker + */ + static WatermarkTracker newMultiSessionWatermarkTracker( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Function classifier) { + return WatermarkTracker.newMulti(queue, lowerWatermark, upperWatermark, classifier, + qfSession -> resumeReads(qfSession, (int)lowerWatermark), + qfSession -> suspendReads(qfSession, (int)upperWatermark)); + } + + /** + * Default no-op queue tracker + */ + static QueueTracker newDefaultQueueTracker(BlockingQueue queue) { + return new QueueTracker() { + @Override + public void put(E e) throws InterruptedException { + queue.put(e); + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + + @Override + public int drainTo(Collection collection) { + return queue.drainTo(collection); + } + }; + } + + private static IoSession lookupIoSession(Session qfSession) { + final Responder responder = qfSession.getResponder(); + + if (responder instanceof IoSessionResponder) { + return ((IoSessionResponder)responder).getIoSession(); + } else { + return null; + } + } + + private static void resumeReads(Session qfSession, int queueLowerWatermark) { + final IoSession ioSession = lookupIoSession(qfSession); + if (ioSession != null && ioSession.isReadSuspended()) { + ioSession.resumeRead(); + qfSession.getLog().onEvent(format(LOWER_WATERMARK_FMT, queueLowerWatermark)); + } + } + + private static void suspendReads(Session qfSession, int queueUpperWatermark) { + final IoSession ioSession = lookupIoSession(qfSession); + if (ioSession != null && !ioSession.isReadSuspended()) { + ioSession.suspendRead(); + qfSession.getLog().onEvent(format(UPPER_WATERMARK_FMT, queueUpperWatermark)); + } + } + + static WatermarkTracker newSingleSessionWatermarkTracker( + BlockingQueue queue, + long lowerWatermark, long upperWatermark, + Session qfSession) { + return WatermarkTracker.newMono(queue, lowerWatermark, upperWatermark, + () -> resumeReads(qfSession, (int)lowerWatermark), + () -> suspendReads(qfSession, (int)upperWatermark)); + } +} diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index 0bf3c45be..b14f4c653 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -20,9 +20,6 @@ package quickfix.mina; -import org.apache.mina.core.session.IoSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import quickfix.*; import java.util.ArrayList; @@ -33,15 +30,13 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static java.lang.String.format; -import static quickfix.mina.EventHandlingStrategy.lookupIoSession; +import static quickfix.mina.QueueTrackers.newDefaultQueueTracker; +import static quickfix.mina.QueueTrackers.newMultiSessionWatermarkTracker; /** * Processes messages for all sessions in a single thread. */ public class SingleThreadedEventHandlingStrategy implements EventHandlingStrategy { - private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class); - public static final String MESSAGE_PROCESSOR_THREAD_NAME = "QFJ Message Processor"; private final BlockingQueue eventQueue; private final QueueTracker queueTracker; @@ -54,28 +49,14 @@ public class SingleThreadedEventHandlingStrategy implements EventHandlingStrateg public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueCapacity) { sessionConnector = connector; eventQueue = new LinkedBlockingQueue<>(queueCapacity); - queueTracker = QueueTracker.wrap(eventQueue); + queueTracker = newDefaultQueueTracker(eventQueue); } public SingleThreadedEventHandlingStrategy(SessionConnector connector, int queueLowerWatermark, int queueUpperWatermark) { sessionConnector = connector; eventQueue = new LinkedBlockingQueue<>(); - queueTracker = WatermarkTracker.newMulti(eventQueue, queueLowerWatermark, queueUpperWatermark, - evt -> evt.quickfixSession, - qfSession -> { // lower watermark crossed down, while reads suspended - final IoSession ioSession = lookupIoSession(qfSession); - if (ioSession != null && ioSession.isReadSuspended()) { - ioSession.resumeRead(); - qfSession.getLog().onEvent(format(LOWER_WATERMARK_FMT, queueLowerWatermark)); - } - }, - qfSession -> { // upper watermark crossed up, while reads active - final IoSession ioSession = lookupIoSession(qfSession); - if (ioSession != null && !ioSession.isReadSuspended()) { - ioSession.suspendRead(); - qfSession.getLog().onEvent(format(UPPER_WATERMARK_FMT, queueUpperWatermark)); - } - }); + queueTracker = newMultiSessionWatermarkTracker(eventQueue, queueLowerWatermark, queueUpperWatermark, + evt -> evt.quickfixSession); } public void setExecutor(Executor executor) { diff --git a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java index a6e382204..5cdd3bbe0 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java @@ -20,9 +20,6 @@ package quickfix.mina; -import org.apache.mina.core.session.IoSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import quickfix.*; import java.util.ArrayList; @@ -35,15 +32,13 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static java.lang.String.format; -import static quickfix.mina.EventHandlingStrategy.lookupIoSession; +import static quickfix.mina.QueueTrackers.newDefaultQueueTracker; +import static quickfix.mina.QueueTrackers.newSingleSessionWatermarkTracker; /** * Processes messages in a session-specific thread. */ public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy { - private static final Logger LOG = LoggerFactory.getLogger(EventHandlingStrategy.class); - private final ConcurrentMap dispatchers = new ConcurrentHashMap<>(); private final SessionConnector sessionConnector; private final int queueCapacity; @@ -185,24 +180,11 @@ private MessageDispatchingThread(Session session, Executor executor) { quickfixSession = session; if (queueCapacity >= 0) { messages = new LinkedBlockingQueue<>(queueCapacity); - queueTracker = QueueTracker.wrap(messages); + queueTracker = newDefaultQueueTracker(messages); } else { messages = new LinkedBlockingQueue<>(); - queueTracker = WatermarkTracker.newMono(messages, queueLowerWatermark, queueUpperWatermark, - () -> { // lower watermark crossed down, while reads suspended - final IoSession ioSession = lookupIoSession(quickfixSession); - if (ioSession != null && ioSession.isReadSuspended()) { - ioSession.resumeRead(); - quickfixSession.getLog().onEvent(format(LOWER_WATERMARK_FMT, queueLowerWatermark)); - } - }, - () -> { // upper watermark crossed up, while reads active - final IoSession ioSession = lookupIoSession(quickfixSession); - if (ioSession != null && !ioSession.isReadSuspended()) { - ioSession.suspendRead(); - quickfixSession.getLog().onEvent(format(UPPER_WATERMARK_FMT, queueUpperWatermark)); - } - }); + queueTracker = newSingleSessionWatermarkTracker(messages, queueLowerWatermark, queueUpperWatermark, + quickfixSession); } } diff --git a/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java b/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java index a16ba6d55..25e910e7b 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java +++ b/quickfixj-core/src/main/java/quickfix/mina/WatermarkTracker.java @@ -75,14 +75,14 @@ synchronized boolean isSuspended() { } } - public static WatermarkTracker newMono( + static WatermarkTracker newMono( BlockingQueue queue, long lowerWatermark, long upperWatermark, Runnable onLowerWatermarkCrossed, Runnable onUpperWatermarkCrossed) { - return new WatermarkTracker(queue, lowerWatermark, upperWatermark, onLowerWatermarkCrossed, onUpperWatermarkCrossed); + return new WatermarkTracker<>(queue, lowerWatermark, upperWatermark, onLowerWatermarkCrossed, onUpperWatermarkCrossed); } - public static WatermarkTracker newMulti( + static WatermarkTracker newMulti( BlockingQueue queue, long lowerWatermark, long upperWatermark, Function classifier, diff --git a/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java b/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java index a7a053629..fbb6d9ccf 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/WatermarkTrackerTest.java @@ -17,11 +17,6 @@ * are not clear to you. ******************************************************************************/ -/* - * Copyright (c) 2018 Vladimir Lysyy (mrbald@github) - * ALv2 (http://www.apache.org/licenses/LICENSE-2.0) - */ - package quickfix.mina; import org.junit.Before;