diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java index e7d0aec5c00..7ef82adb55e 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java @@ -26,6 +26,7 @@ import org.springframework.integration.ip.tcp.TcpOutboundGateway; import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; /** * A {@link MessageHandlerSpec} for {@link TcpOutboundGateway}s. @@ -114,6 +115,28 @@ public TcpOutboundGatewaySpec async(boolean async) { return this; } + /** + * Set the unsolicited message channel name. + * @param channelName the name. + * @return the spec. + * @since 5.4 + */ + public TcpOutboundGatewaySpec unsolictedMessageChannelName(String channelName) { + this.target.setUnsolicitedMessageChannelName(channelName); + return this; + } + + /** + * Set the unsolicited message channel. + * @param channel the channel. + * @return the spec. + * @since 5.4 + */ + public TcpOutboundGatewaySpec unsolictedMessageChannelName(MessageChannel channel) { + this.target.setUnsolicitedMessageChannel(channel); + return this; + } + @Override public Map getComponentsToRegister() { return this.connectionFactory != null diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java index 3a4390c0e1f..8db3f8f2a1f 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java @@ -91,6 +91,17 @@ public class TcpOutboundGateway extends AbstractReplyProducingMessageHandler private boolean closeStreamAfterSend; + private String unsolicitedMessageChannelName; + + private MessageChannel unsolicitedMessageChannel; + + public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + connectionFactory.registerListener(this); + connectionFactory.registerSender(this); + this.isSingleUse = connectionFactory.isSingleUse(); + } + /** * @param requestTimeout the requestTimeout to set */ @@ -118,14 +129,53 @@ public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) this.evaluationContextSet = true; } - @Override - protected void doInit() { - super.doInit(); - if (!this.evaluationContextSet) { - this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); - } - Assert.state(!this.closeStreamAfterSend || this.isSingleUse, - "Single use connection needed with closeStreamAfterSend"); + /** + * Specify the Spring Integration reply channel. If this property is not + * set the gateway will check for a 'replyChannel' header on the request. + * @param replyChannel The reply channel. + */ + public void setReplyChannel(MessageChannel replyChannel) { + setOutputChannel(replyChannel); + } + + /** + * Specify the Spring Integration reply channel name. If this property is not + * set the gateway will check for a 'replyChannel' header on the request. + * @param replyChannel The reply channel. + * @since 5.0 + */ + public void setReplyChannelName(String replyChannel) { + setOutputChannelName(replyChannel); + } + + /** + * Set the channel name for unsolicited incoming messages, or late replies. + * @param unsolicitedMessageChannelName the channel name. + * @since 5.4 + */ + public void setUnsolicitedMessageChannelName(String unsolicitedMessageChannelName) { + this.unsolicitedMessageChannelName = unsolicitedMessageChannelName; + } + + /** + * Set the channel for unsolicited incoming messages, or late replies. + * @param unsolicitedMessageChannel the channel. + * @since 5.4 + */ + public void setUnsolicitedMessageChannel(MessageChannel unsolicitedMessageChannel) { + this.unsolicitedMessageChannel = unsolicitedMessageChannel; + } + + /** + * Set to true to close the connection ouput stream after sending without + * closing the connection. Use to signal EOF to the server, such as when using + * a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}. + * Requires a single-use connection factory. + * @param closeStreamAfterSend true to close. + * @since 5.2 + */ + public void setCloseStreamAfterSend(boolean closeStreamAfterSend) { + this.closeStreamAfterSend = closeStreamAfterSend; } /** @@ -140,6 +190,21 @@ public void setSecondChanceDelay(int secondChanceDelay) { this.secondChanceDelay = secondChanceDelay; } + @Override + public String getComponentType() { + return "ip:tcp-outbound-gateway"; + } + + @Override + protected void doInit() { + super.doInit(); + if (!this.evaluationContextSet) { + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + Assert.state(!this.closeStreamAfterSend || this.isSingleUse, + "Single use connection needed with closeStreamAfterSend"); + } + @Override protected Object handleRequestMessage(Message requestMessage) { Assert.notNull(this.connectionFactory, this.getClass().getName() + @@ -260,6 +325,9 @@ private void cleanUp(boolean haveSemaphore, TcpConnection connection, String con public boolean onMessage(Message message) { String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class); if (connectionId == null) { + if (unsolicitedSupported(message)) { + return false; + } logger.error("Cannot correlate response - no connection id"); publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id"); return false; @@ -277,6 +345,9 @@ public boolean onMessage(Message message) { return false; } else { + if (unsolicitedSupported(message)) { + return false; + } String errorMessage = "Cannot correlate response - no pending reply for " + connectionId; logger.error(errorMessage); publishNoConnectionEvent(message, connectionId, errorMessage); @@ -293,6 +364,24 @@ public boolean onMessage(Message message) { return false; } + private boolean unsolicitedSupported(Message message) { + String channelName = this.unsolicitedMessageChannelName; + if (channelName != null) { + this.unsolicitedMessageChannel = getChannelResolver().resolveDestination(channelName); + this.unsolicitedMessageChannelName = null; + } + if (this.unsolicitedMessageChannel != null) { + try { + this.messagingTemplate.send(this.unsolicitedMessageChannel, message); + } + catch (Exception e) { + logger.error("Failed to send unsolicited message " + message, e); + } + return true; + } + return false; + } + private void publishNoConnectionEvent(Message message, String connectionId, String errorMessage) { ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher(); if (applicationEventPublisher != null) { @@ -301,13 +390,6 @@ private void publishNoConnectionEvent(Message message, String connectionId, S } } - public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) { - this.connectionFactory = connectionFactory; - connectionFactory.registerListener(this); - connectionFactory.registerSender(this); - this.isSingleUse = connectionFactory.isSingleUse(); - } - @Override public void addNewConnection(TcpConnection connection) { // do nothing - no asynchronous multiplexing supported @@ -318,42 +400,6 @@ public void removeDeadConnection(TcpConnection connection) { // do nothing - no asynchronous multiplexing supported } - /** - * Specify the Spring Integration reply channel. If this property is not - * set the gateway will check for a 'replyChannel' header on the request. - * @param replyChannel The reply channel. - */ - public void setReplyChannel(MessageChannel replyChannel) { - this.setOutputChannel(replyChannel); - } - - /** - * Specify the Spring Integration reply channel name. If this property is not - * set the gateway will check for a 'replyChannel' header on the request. - * @param replyChannel The reply channel. - * @since 5.0 - */ - public void setReplyChannelName(String replyChannel) { - this.setOutputChannelName(replyChannel); - } - - /** - * Set to true to close the connection ouput stream after sending without - * closing the connection. Use to signal EOF to the server, such as when using - * a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}. - * Requires a single-use connection factory. - * @param closeStreamAfterSend true to close. - * @since 5.2 - */ - public void setCloseStreamAfterSend(boolean closeStreamAfterSend) { - this.closeStreamAfterSend = closeStreamAfterSend; - } - - @Override - public String getComponentType() { - return "ip:tcp-outbound-gateway"; - } - @Override public void start() { this.connectionFactory.start(); diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java index 1a220a9babf..5bd097c1113 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java @@ -218,10 +218,7 @@ protected void initializeConnection(TcpConnectionSupport connection, Socket sock connection.registerListener(listener); } } - TcpSender sender = getSender(); - if (sender != null) { - connection.registerSender(sender); - } + connection.registerSenders(getSenders()); connection.setMapper(getMapper()); connection.setDeserializer(getDeserializer()); connection.setSerializer(getSerializer()); diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java index 6d22a2ed893..c4256c858a5 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java @@ -79,14 +79,14 @@ public abstract class AbstractConnectionFactory extends IntegrationObjectSupport private final BlockingQueue delayedReads = new LinkedBlockingQueue<>(); + private final List senders = Collections.synchronizedList(new ArrayList<>()); + private String host; private int port; private TcpListener listener; - private TcpSender sender; - private int soTimeout = -1; private int soSendBufferSize; @@ -326,11 +326,20 @@ public TcpListener getListener() { } /** - * @return the sender + * @return the first sender, if present. */ @Nullable public TcpSender getSender() { - return this.sender; + return this.senders.size() > 0 ? this.senders.get(0) : null; + } + + /** + * Return the list of senders. + * @return the senders. + * @since 5.4 + */ + public List getSenders() { + return Collections.unmodifiableList(this.senders); } /** @@ -372,8 +381,16 @@ public void registerListener(TcpListener listenerToRegister) { * @param senderToRegister The sender */ public void registerSender(TcpSender senderToRegister) { - Assert.isNull(this.sender, this.getClass().getName() + " may only be used by one outbound adapter"); - this.sender = senderToRegister; + this.senders.add(senderToRegister); + } + + /** + * Unregister a TcpSender. + * @param sender the sender. + * @return true if the sender was registered. + */ + public boolean unregisterSender(TcpSender sender) { + return this.senders.remove(sender); } /** @@ -600,7 +617,7 @@ protected TcpConnectionSupport wrapConnection(TcpConnectionSupport connectionArg if (this.listener == null) { connection.registerListener(wrapper); } - if (this.sender == null) { + if (this.senders.size() == 0) { connection.registerSender(wrapper); } connection = wrapper; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractServerConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractServerConnectionFactory.java index 44d7ca8979b..98e184dc4e1 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractServerConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractServerConnectionFactory.java @@ -127,7 +127,7 @@ protected void initializeConnection(TcpConnectionSupport connection, Socket sock if (listener != null) { connection.registerListener(listener); } - connection.registerSender(getSender()); + connection.registerSenders(getSenders()); connection.setMapper(getMapper()); connection.setDeserializer(getDeserializer()); connection.setSerializer(getSerializer()); diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java index df4c1340069..aede7f50178 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java @@ -18,7 +18,9 @@ import java.net.InetAddress; import java.net.Socket; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -68,6 +70,8 @@ public abstract class TcpConnectionSupport implements TcpConnection { private final SocketInfo socketInfo; + private final List senders = Collections.synchronizedList(new ArrayList<>()); + @SuppressWarnings("rawtypes") private Deserializer deserializer; @@ -80,8 +84,6 @@ public abstract class TcpConnectionSupport implements TcpConnection { private volatile TcpListener testListener; - private TcpSender sender; - private String connectionId; private String hostName = "unknown"; @@ -162,8 +164,8 @@ void setTestFailed(boolean testFailed) { */ @Override public void close() { - if (this.sender != null) { - this.sender.removeDeadConnection(this); + for (TcpSender sender : this.senders) { + sender.removeDeadConnection(this); } // close() may be called multiple times; only publish once if (!this.closePublished.getAndSet(true)) { @@ -297,11 +299,25 @@ public void enableManualListenerRegistration() { * Registers a sender. Used on server side connections so a * sender can determine which connection to send a reply * to. - * @param sender the sender. + * @param senderToRegister the sender. */ - public void registerSender(@Nullable TcpSender sender) { - this.sender = sender; - if (sender != null) { + public void registerSender(@Nullable TcpSender senderToRegister) { + if (senderToRegister != null) { + this.senders.add(senderToRegister); + senderToRegister.addNewConnection(this); + } + } + + /** + * Registers the senders. Used on server side connections so a + * sender can determine which connection to send a reply + * to. + * @param sendersToRegister the sender. + * @since 5.4 + */ + public void registerSenders(List sendersToRegister) { + this.senders.addAll(sendersToRegister); + for (TcpSender sender : sendersToRegister) { sender.addNewConnection(this); } } @@ -336,10 +352,20 @@ private void waitForListenerRegistration() { } /** - * @return the sender + * @return the first sender, if present. */ + @Nullable public TcpSender getSender() { - return this.sender; + return this.senders.size() > 0 ? this.senders.get(0) : null; + } + + /** + * Return the list of senders. + * @return the senders. + * @since 5.4 + */ + public List getSenders() { + return Collections.unmodifiableList(this.senders); } @Override diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ThreadAffinityClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ThreadAffinityClientConnectionFactory.java index 072a5ac114b..4b0c226dcf8 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ThreadAffinityClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/ThreadAffinityClientConnectionFactory.java @@ -250,6 +250,11 @@ public TcpSender getSender() { return this.connectionFactory.getSender(); } + @Override + public List getSenders() { + return this.connectionFactory.getSenders(); + } + @Override public Serializer getSerializer() { return this.connectionFactory.getSerializer(); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java index cbaed556977..ce0f28e09bb 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -43,6 +44,7 @@ import org.springframework.integration.dsl.Transformers; import org.springframework.integration.dsl.context.IntegrationFlowContext; import org.springframework.integration.dsl.context.IntegrationFlowContext.IntegrationFlowRegistration; +import org.springframework.integration.ip.IpHeaders; import org.springframework.integration.ip.tcp.TcpOutboundGateway; import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; @@ -110,6 +112,13 @@ public class IpIntegrationTests { @Autowired private AtomicBoolean adviceCalled; + @Autowired + @Qualifier("unsolicitedServerSide.input") + private MessageChannel unsolicitedServerSide; + + @Autowired + private QueueChannel unsolicited; + @Test void testTcpAdapters() { ApplicationEventPublisher publisher = e -> { }; @@ -149,6 +158,11 @@ void testTcpGateways() { assertThat(messagingTemplate.convertSendAndReceive("foo", String.class)).isEqualTo("FOO"); assertThat(messagingTemplate.convertSendAndReceive("junk", String.class)).isEqualTo("error:non-convertible"); assertThat(this.adviceCalled.get()).isTrue(); + + GenericMessage unsol = new GenericMessage<>("foo", + Collections.singletonMap(IpHeaders.CONNECTION_ID, this.config.connectionId)); + this.unsolicitedServerSide.send(unsol); + assertThat(this.unsolicited.receive(10_000).getPayload()).isEqualTo("foo".getBytes()); } @Test @@ -227,6 +241,8 @@ public static class Config { private volatile int serverPort; + private volatile String connectionId; + @Bean public AbstractServerConnectionFactory server1() { return Tcp.netServer(0) @@ -242,12 +258,23 @@ public IntegrationFlow inTcpGateway() { .replyTimeout(1) .errorOnTimeout(true) .errorChannel("inTcpGatewayErrorFlow.input")) + .handle(this, "captureId") .transform(Transformers.objectToString()) .filter((payload) -> !"junk".equals(payload)) .transform(String::toUpperCase) .get(); } + public Message captureId(Message msg) { + this.connectionId = msg.getHeaders().get(IpHeaders.CONNECTION_ID, String.class); + return msg; + } + + @Bean + public IntegrationFlow unsolicitedServerSide() { + return f -> f.handle(Tcp.outboundAdapter(server1())); + } + @Bean public IntegrationFlow inTcpGatewayErrorFlow() { return (flow) -> flow @@ -299,9 +326,15 @@ public AbstractClientConnectionFactory client1() { public TcpOutboundGateway tcpOut() { return Tcp.outboundGateway(client1()) .remoteTimeout(m -> 5000) + .unsolictedMessageChannelName("unsolicited") .get(); } + @Bean + public QueueChannel unsolicited() { + return new QueueChannel(); + } + @Bean public AbstractClientConnectionFactory client2() { return Tcp.netClient("localhost", server1().getPort()) diff --git a/src/reference/asciidoc/ip.adoc b/src/reference/asciidoc/ip.adoc index 70eaaaaf5ba..392b4aa6f7a 100644 --- a/src/reference/asciidoc/ip.adoc +++ b/src/reference/asciidoc/ip.adoc @@ -910,6 +910,12 @@ This only applies when using the `TcpNetClientConnectionFactory`, it is ignored IMPORTANT: When using a shared connection (`singleUse=false`), a new request, while another is in process, will be blocked until the current reply is received. Consider using the `CachingClientConnectionFactory` if you wish to support concurrent requests on a pool of long-lived connections. +Starting with version 5.4, the inbound can be configured with an `unsolicitedMessageChannel`. +Unsolicited inbound messages will be sent to this channel, as well as late replies (where the client timed out). +To support this on the server side, you can now register multiple `TcpSender` s with the connection factory. +Gateways and Channel Adapters automatically register themselves. +When sending unsolicited messages from the server, you must add the appropriate `IpHeaders.CONNECTION_ID` to the messages sent. + [[ip-correlation]] === TCP Message Correlation @@ -971,6 +977,10 @@ This default behavior was not appropriate in a truly asynchronous environment, s You can reinstate the previous default behavior by setting the `so-timeout` attribute on the client connection factory to 10000 milliseconds. ===== +Starting with version 5.4, multiple outbound channel adapters and one `TcpInboundChannelAdapter` can share the same connection factory. +This allows an application to support both request/reply and arbitrary server -> client messaging. +See <> for more information. + [[ip-headers]] ==== Transferring Headers @@ -2054,6 +2064,11 @@ Only applies if the reply-channel might block (such as a bounded QueueChannel th | `async` | | Release the sending thread after the send; the reply (or error) will be sent on the receiving thread. + +| `unsolicited` +`MessageChannel` +| +| A channel to which to send unsolicited messages and late replies. |=== .IP Message Headers diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 82e01e8f33d..c204bb0323c 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -35,6 +35,13 @@ See <<./redis.adoc#redis-stream-outbound,Redis Stream Outbound Channel Adapter>> A Renewable lock registry has been introduced to allow renew lease of a distributed lock. See <<./jdbc.adoc#jdbc-lock-registry,JDBC implementation>> for more information. +==== TCP Changes + +Connection factories now support multiple sending components (`TcpSender`); they remain limited to one receiving component (`TcpListener`). +This allows, for example, an inbound gateway and outbound channel adapter to share the same factory, supporting both request/reply and arbitrary messaging from the server to the client. +Shared factories should not be used with outbound gateways, unless single-use connections or the `ThreadAffinityClientConnectionFactory` are being used. +See <<./ip.adoc#ip-collaborating-adapters,Collaborating Channel Adapters>> and <<./ip.adoc#tcp-gateways, TCP Gateways>> for more information. + [[x5.4-general]] === General Changes