diff --git a/quickfixj-core/src/main/doc/usermanual/usage/configuration.html b/quickfixj-core/src/main/doc/usermanual/usage/configuration.html
index 9259dbf3c..48a2f5180 100644
--- a/quickfixj-core/src/main/doc/usermanual/usage/configuration.html
+++ b/quickfixj-core/src/main/doc/usermanual/usage/configuration.html
@@ -540,6 +540,13 @@
QuickFIX Settings
valid IP address in the format of x.x.x.x or a domain name |
If unset the socket will be bound to all local interfaces. |
+
+ DynamicSession |
+
+ Leave the corresponding session disconnected until AbstractSocketInitiator.createDynamicSession is called |
+ Y N |
+ N |
+
Acceptor |
diff --git a/quickfixj-core/src/main/java/quickfix/Initiator.java b/quickfixj-core/src/main/java/quickfix/Initiator.java
index ffb39bf63..a0e742e87 100644
--- a/quickfixj-core/src/main/java/quickfix/Initiator.java
+++ b/quickfixj-core/src/main/java/quickfix/Initiator.java
@@ -116,4 +116,11 @@ public interface Initiator extends Connector {
* type is "initiator".
*/
String SETTING_PROXY_WORKSTATION = "ProxyWorkstation";
+
+ /**
+ * Leave the corresponding session disconnected until
+ * AbstractSocketInitiator.createDynamicSession is called
+ */
+ String SETTING_DYNAMIC_SESSION = "DynamicSession";
+
}
diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java
index b17ca5838..fd3ea0aa2 100644
--- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java
+++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java
@@ -78,75 +78,81 @@ protected void createSessionInitiators()
throws ConfigError {
try {
createSessions();
- SessionSettings settings = getSettings();
for (final Session session : getSessionMap().values()) {
- final SessionID sessionID = session.getSessionID();
- final int[] reconnectingIntervals = getReconnectIntervalInSeconds(sessionID);
-
- final SocketAddress[] socketAddresses = getSocketAddresses(sessionID);
- if (socketAddresses.length == 0) {
- throw new ConfigError("Must specify at least one socket address");
- }
-
- SocketAddress localAddress = getLocalAddress(settings, sessionID);
+ createInitiator(session);
+ }
+ } catch (final FieldConvertError e) {
+ throw new ConfigError(e);
+ }
+ }
- final NetworkingOptions networkingOptions = new NetworkingOptions(getSettings()
- .getSessionProperties(sessionID, true));
+ private void createInitiator(final Session session) throws ConfigError, FieldConvertError {
+
+ SessionSettings settings = getSettings();
+ final SessionID sessionID = session.getSessionID();
+ final int[] reconnectingIntervals = getReconnectIntervalInSeconds(sessionID);
- boolean sslEnabled = false;
- SSLConfig sslConfig = null;
- if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL)
- && BooleanConverter.convert(getSettings().getString(sessionID, SSLSupport.SETTING_USE_SSL))) {
- sslEnabled = true;
- sslConfig = SSLSupport.getSslConfig(getSettings(), sessionID);
- }
+ final SocketAddress[] socketAddresses = getSocketAddresses(sessionID);
+ if (socketAddresses.length == 0) {
+ throw new ConfigError("Must specify at least one socket address");
+ }
- String proxyUser = null;
- String proxyPassword = null;
- String proxyHost = null;
+ SocketAddress localAddress = getLocalAddress(settings, sessionID);
- String proxyType = null;
- String proxyVersion = null;
+ final NetworkingOptions networkingOptions = new NetworkingOptions(getSettings()
+ .getSessionProperties(sessionID, true));
- String proxyWorkstation = null;
- String proxyDomain = null;
+ boolean sslEnabled = false;
+ SSLConfig sslConfig = null;
+ if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL)
+ && BooleanConverter.convert(getSettings().getString(sessionID, SSLSupport.SETTING_USE_SSL))) {
+ sslEnabled = true;
+ sslConfig = SSLSupport.getSslConfig(getSettings(), sessionID);
+ }
- int proxyPort = -1;
+ String proxyUser = null;
+ String proxyPassword = null;
+ String proxyHost = null;
- if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) {
- proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE);
- if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) {
- proxyVersion = settings.getString(sessionID,
- Initiator.SETTING_PROXY_VERSION);
- }
+ String proxyType = null;
+ String proxyVersion = null;
- if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) {
- proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER);
- proxyPassword = settings.getString(sessionID,
- Initiator.SETTING_PROXY_PASSWORD);
- }
- if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION)
- && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
- proxyWorkstation = settings.getString(sessionID,
- Initiator.SETTING_PROXY_WORKSTATION);
- proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN);
- }
+ String proxyWorkstation = null;
+ String proxyDomain = null;
- proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST);
- proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT);
- }
+ int proxyPort = -1;
- final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
- socketAddresses, localAddress, reconnectingIntervals,
- getScheduledExecutorService(), networkingOptions,
- getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
- proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);
+ if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) {
+ proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE);
+ if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) {
+ proxyVersion = settings.getString(sessionID,
+ Initiator.SETTING_PROXY_VERSION);
+ }
- initiators.add(ioSessionInitiator);
+ if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) {
+ proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER);
+ proxyPassword = settings.getString(sessionID,
+ Initiator.SETTING_PROXY_PASSWORD);
}
- } catch (final FieldConvertError e) {
- throw new ConfigError(e);
+ if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION)
+ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
+ proxyWorkstation = settings.getString(sessionID,
+ Initiator.SETTING_PROXY_WORKSTATION);
+ proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN);
+ }
+
+ proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST);
+ proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT);
}
+
+ final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
+ socketAddresses, localAddress, reconnectingIntervals,
+ getScheduledExecutorService(), networkingOptions,
+ getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
+ proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);
+
+ initiators.add(ioSessionInitiator);
+
}
// QFJ-482
@@ -181,8 +187,10 @@ private void createSessions() throws ConfigError, FieldConvertError {
final SessionID sessionID = i.next();
if (isInitiatorSession(sessionID)) {
try {
- final Session quickfixSession = createSession(sessionID);
- initiatorSessions.put(sessionID, quickfixSession);
+ if (!settings.isSetting(sessionID, SETTING_DYNAMIC_SESSION) || !settings.getBool(sessionID, SETTING_DYNAMIC_SESSION)) {
+ final Session quickfixSession = createSession(sessionID);
+ initiatorSessions.put(sessionID, quickfixSession);
+ }
} catch (final Throwable e) {
if (continueInitOnError) {
log.error("error during session initialization, continuing...", e);
@@ -193,11 +201,20 @@ private void createSessions() throws ConfigError, FieldConvertError {
}
}
}
- if (initiatorSessions.isEmpty()) {
- throw new ConfigError("no initiators in settings");
- }
setSessions(initiatorSessions);
}
+
+ public void createDynamicSession(SessionID sessionID) throws ConfigError {
+
+ try {
+ Session session = createSession(sessionID);
+ super.addDynamicSession(session);
+ createInitiator(session);
+ startInitiators();
+ } catch (final FieldConvertError e) {
+ throw new ConfigError(e);
+ }
+ }
private int[] getReconnectIntervalInSeconds(SessionID sessionID) throws ConfigError {
final SessionSettings settings = getSettings();
diff --git a/quickfixj-core/src/test/java/quickfix/mina/SessionConnectorTest.java b/quickfixj-core/src/test/java/quickfix/mina/SessionConnectorTest.java
index 413b64e34..33f1eec6a 100644
--- a/quickfixj-core/src/test/java/quickfix/mina/SessionConnectorTest.java
+++ b/quickfixj-core/src/test/java/quickfix/mina/SessionConnectorTest.java
@@ -23,6 +23,7 @@
import quickfix.ConfigError;
import quickfix.DefaultSessionFactory;
import quickfix.FixVersions;
+import quickfix.Initiator;
import quickfix.MemoryStoreFactory;
import quickfix.RuntimeError;
import quickfix.SLF4JLogFactory;
@@ -31,7 +32,11 @@
import quickfix.SessionID;
import quickfix.SessionSettings;
import quickfix.SessionState;
+import quickfix.SocketInitiator;
import quickfix.UnitTestApplication;
+import quickfix.mina.initiator.AbstractSocketInitiator;
+import quickfix.mina.initiator.IoSessionInitiator;
+import quickfix.mina.ssl.SSLSupport;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
@@ -41,6 +46,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -187,6 +194,56 @@ public void testAddingRemovingDynamicSessions() throws Exception {
session2.close();
}
+ /**
+ * Test dynamic initiator sessions
+ */
+ @Test
+ public void testDynamicInitiatorSession() throws Exception {
+ SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX40, "TW", "ISLD");
+ SessionID sessionID2 = new SessionID(FixVersions.BEGINSTRING_FIX40, "me", "you");
+ SessionSettings settings = setUpInitiatorSessionSettings(sessionID);
+ DefaultSessionFactory sessionFactory = new DefaultSessionFactory(new UnitTestApplication(),
+ new MemoryStoreFactory(), new SLF4JLogFactory(new SessionSettings()));
+
+ AbstractSocketInitiatorUnderTest connector = new AbstractSocketInitiatorUnderTest(settings, sessionFactory);
+ connector.setSessions(new HashMap<>());
+ //Two sessions to test dynamic sessions while check initializers
+ connector.createDynamicSession(sessionID);
+ connector.createDynamicSession(sessionID2);
+ List sessions = connector.getManagedSessions();
+ //Check sessions created and available
+ assertEquals(2,sessions.size());
+ HashMap map = new HashMap<>();
+ for (Session s : sessions) {
+ map.put(s.getSessionID(), s);
+ }
+ assertNotNull(map.get(sessionID));
+ assertNotNull(map.get(sessionID2));
+ //Check initiators created and not null
+ assertEquals(2, connector.getInitiators().size());
+ Set initiators = connector.getInitiators();
+ for(IoSessionInitiator initiator: initiators){
+ assertNotNull(initiator);
+ }
+ connector.removeDynamicSession(sessionID);
+ connector.removeDynamicSession(sessionID2);
+ //Check if initiators are re - created for this sessions but not sessions available
+ assertEquals(0, connector.getManagedSessions().size());
+ connector.createSessionInitiators();
+ sessions=connector.getManagedSessions();
+ initiators=connector.getInitiators();
+ //Sessions re created during session initiatore re creation, initiators are stacked
+ assertEquals(2, sessions.size());
+ assertEquals(4,initiators.size());
+ //This should remove initiators
+ connector.stopInitiators();
+ assertEquals(0,connector.getInitiators().size());
+ //Tear down
+ for(Session s:sessions){
+ s.close();
+ }
+ }
+
private SessionSettings setUpSessionSettings(SessionID sessionID) {
SessionSettings settings = new SessionSettings();
settings.setString(Session.SETTING_USE_DATA_DICTIONARY, "N");
@@ -197,6 +254,34 @@ private SessionSettings setUpSessionSettings(SessionID sessionID) {
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
return settings;
}
+ private SessionSettings setUpInitiatorSessionSettings(SessionID sessionID) {
+ SessionSettings settings = new SessionSettings();
+ settings.setString(Session.SETTING_USE_DATA_DICTIONARY, "N");
+ settings.setString(Session.SETTING_START_TIME, "00:00:00");
+ settings.setString(Session.SETTING_END_TIME, "00:00:00");
+ settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PORT, "9999");
+ settings.setLong(Session.SETTING_HEARTBTINT,100L);
+ settings.setString(SocketInitiator.SETTING_SOCKET_CONNECT_HOST,"127.0.0.1");
+ settings.setString(SocketInitiator.SETTING_SOCKET_CONNECT_PORT,"54321");
+ settings.setString(SessionFactory.SETTING_CONNECTION_TYPE,
+ SessionFactory.INITIATOR_CONNECTION_TYPE);
+ settings.setBool( SSLSupport.SETTING_USE_SSL,true);
+ settings.setString(Initiator.SETTING_PROXY_TYPE,"socks");
+ settings.setString(Initiator.SETTING_PROXY_VERSION,"5");
+ settings.setString(Initiator.SETTING_PROXY_USER,"Test Proxy User");
+ settings.setString(Initiator.SETTING_PROXY_PASSWORD,"Test Proxy User Password");
+ settings.setString(Initiator.SETTING_PROXY_WORKSTATION,"Test Proxy Workstation");
+ settings.setString(Initiator.SETTING_PROXY_DOMAIN,"Test Proxy Domain");
+ settings.setString(Initiator.SETTING_PROXY_HOST,"Test Proxy Host");
+ settings.setString(Initiator.SETTING_PROXY_PORT,"888");
+
+
+
+ settings.setBool(Initiator.SETTING_DYNAMIC_SESSION,false);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+ SessionFactory.INITIATOR_CONNECTION_TYPE);
+ return settings;
+ }
private final class SessionConnectorListener implements PropertyChangeListener {
public void propertyChange(PropertyChangeEvent event) {
@@ -224,4 +309,35 @@ public void stop(boolean force) {
public void block() throws ConfigError, RuntimeError {
}
}
+ private static class AbstractSocketInitiatorUnderTest extends AbstractSocketInitiator {
+
+ public AbstractSocketInitiatorUnderTest(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError {
+ super(settings, sessionFactory);
+ }
+
+ public void start() throws ConfigError, RuntimeError {
+ }
+ public void createDynamicSession(SessionID sessionID) throws ConfigError {
+ super.createDynamicSession(sessionID);
+ }
+ public void stop() {
+ }
+ public void stopInitiators(){
+ super.stopInitiators();
+ }
+ public void stop(boolean force) {
+ }
+
+ public void block() throws ConfigError, RuntimeError {
+ }
+ @Override
+ protected void createSessionInitiators() throws ConfigError {
+ super.createSessionInitiators();
+ }
+
+ @Override
+ protected EventHandlingStrategy getEventHandlingStrategy() {
+ return null;
+ }
+ }
}