@@ -91,6 +91,17 @@ public class TcpOutboundGateway extends AbstractReplyProducingMessageHandler
9191
9292 private boolean closeStreamAfterSend ;
9393
94+ private String unsolicitedMessageChannelName ;
95+
96+ private MessageChannel unsolicitedMessageChannel ;
97+
98+ public void setConnectionFactory (AbstractClientConnectionFactory connectionFactory ) {
99+ this .connectionFactory = connectionFactory ;
100+ connectionFactory .registerListener (this );
101+ connectionFactory .registerSender (this );
102+ this .isSingleUse = connectionFactory .isSingleUse ();
103+ }
104+
94105 /**
95106 * @param requestTimeout the requestTimeout to set
96107 */
@@ -118,14 +129,53 @@ public void setIntegrationEvaluationContext(EvaluationContext evaluationContext)
118129 this .evaluationContextSet = true ;
119130 }
120131
121- @ Override
122- protected void doInit () {
123- super .doInit ();
124- if (!this .evaluationContextSet ) {
125- this .evaluationContext = ExpressionUtils .createStandardEvaluationContext (getBeanFactory ());
126- }
127- Assert .state (!this .closeStreamAfterSend || this .isSingleUse ,
128- "Single use connection needed with closeStreamAfterSend" );
132+ /**
133+ * Specify the Spring Integration reply channel. If this property is not
134+ * set the gateway will check for a 'replyChannel' header on the request.
135+ * @param replyChannel The reply channel.
136+ */
137+ public void setReplyChannel (MessageChannel replyChannel ) {
138+ setOutputChannel (replyChannel );
139+ }
140+
141+ /**
142+ * Specify the Spring Integration reply channel name. If this property is not
143+ * set the gateway will check for a 'replyChannel' header on the request.
144+ * @param replyChannel The reply channel.
145+ * @since 5.0
146+ */
147+ public void setReplyChannelName (String replyChannel ) {
148+ setOutputChannelName (replyChannel );
149+ }
150+
151+ /**
152+ * Set the channel name for unsolicited incoming messages, or late replies.
153+ * @param unsolicitedMessageChannelName the channel name.
154+ * @since 5.4
155+ */
156+ public void setUnsolicitedMessageChannelName (String unsolicitedMessageChannelName ) {
157+ this .unsolicitedMessageChannelName = unsolicitedMessageChannelName ;
158+ }
159+
160+ /**
161+ * Set the channel for unsolicited incoming messages, or late replies.
162+ * @param unsolicitedMessageChannel the channel.
163+ * @since 5.4
164+ */
165+ public void setUnsolicitedMessageChannel (MessageChannel unsolicitedMessageChannel ) {
166+ this .unsolicitedMessageChannel = unsolicitedMessageChannel ;
167+ }
168+
169+ /**
170+ * Set to true to close the connection ouput stream after sending without
171+ * closing the connection. Use to signal EOF to the server, such as when using
172+ * a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}.
173+ * Requires a single-use connection factory.
174+ * @param closeStreamAfterSend true to close.
175+ * @since 5.2
176+ */
177+ public void setCloseStreamAfterSend (boolean closeStreamAfterSend ) {
178+ this .closeStreamAfterSend = closeStreamAfterSend ;
129179 }
130180
131181 /**
@@ -140,6 +190,21 @@ public void setSecondChanceDelay(int secondChanceDelay) {
140190 this .secondChanceDelay = secondChanceDelay ;
141191 }
142192
193+ @ Override
194+ public String getComponentType () {
195+ return "ip:tcp-outbound-gateway" ;
196+ }
197+
198+ @ Override
199+ protected void doInit () {
200+ super .doInit ();
201+ if (!this .evaluationContextSet ) {
202+ this .evaluationContext = ExpressionUtils .createStandardEvaluationContext (getBeanFactory ());
203+ }
204+ Assert .state (!this .closeStreamAfterSend || this .isSingleUse ,
205+ "Single use connection needed with closeStreamAfterSend" );
206+ }
207+
143208 @ Override
144209 protected Object handleRequestMessage (Message <?> requestMessage ) {
145210 Assert .notNull (this .connectionFactory , this .getClass ().getName () +
@@ -260,6 +325,9 @@ private void cleanUp(boolean haveSemaphore, TcpConnection connection, String con
260325 public boolean onMessage (Message <?> message ) {
261326 String connectionId = message .getHeaders ().get (IpHeaders .CONNECTION_ID , String .class );
262327 if (connectionId == null ) {
328+ if (unsolicitedSupported (message )) {
329+ return false ;
330+ }
263331 logger .error ("Cannot correlate response - no connection id" );
264332 publishNoConnectionEvent (message , null , "Cannot correlate response - no connection id" );
265333 return false ;
@@ -277,6 +345,9 @@ public boolean onMessage(Message<?> message) {
277345 return false ;
278346 }
279347 else {
348+ if (unsolicitedSupported (message )) {
349+ return false ;
350+ }
280351 String errorMessage = "Cannot correlate response - no pending reply for " + connectionId ;
281352 logger .error (errorMessage );
282353 publishNoConnectionEvent (message , connectionId , errorMessage );
@@ -293,6 +364,24 @@ public boolean onMessage(Message<?> message) {
293364 return false ;
294365 }
295366
367+ private boolean unsolicitedSupported (Message <?> message ) {
368+ String channelName = this .unsolicitedMessageChannelName ;
369+ if (channelName != null ) {
370+ this .unsolicitedMessageChannel = getChannelResolver ().resolveDestination (channelName );
371+ this .unsolicitedMessageChannelName = null ;
372+ }
373+ if (this .unsolicitedMessageChannel != null ) {
374+ try {
375+ this .messagingTemplate .send (this .unsolicitedMessageChannel , message );
376+ }
377+ catch (Exception e ) {
378+ logger .error ("Failed to send unsolicited message " + message , e );
379+ }
380+ return true ;
381+ }
382+ return false ;
383+ }
384+
296385 private void publishNoConnectionEvent (Message <?> message , String connectionId , String errorMessage ) {
297386 ApplicationEventPublisher applicationEventPublisher = this .connectionFactory .getApplicationEventPublisher ();
298387 if (applicationEventPublisher != null ) {
@@ -301,13 +390,6 @@ private void publishNoConnectionEvent(Message<?> message, String connectionId, S
301390 }
302391 }
303392
304- public void setConnectionFactory (AbstractClientConnectionFactory connectionFactory ) {
305- this .connectionFactory = connectionFactory ;
306- connectionFactory .registerListener (this );
307- connectionFactory .registerSender (this );
308- this .isSingleUse = connectionFactory .isSingleUse ();
309- }
310-
311393 @ Override
312394 public void addNewConnection (TcpConnection connection ) {
313395 // do nothing - no asynchronous multiplexing supported
@@ -318,42 +400,6 @@ public void removeDeadConnection(TcpConnection connection) {
318400 // do nothing - no asynchronous multiplexing supported
319401 }
320402
321- /**
322- * Specify the Spring Integration reply channel. If this property is not
323- * set the gateway will check for a 'replyChannel' header on the request.
324- * @param replyChannel The reply channel.
325- */
326- public void setReplyChannel (MessageChannel replyChannel ) {
327- this .setOutputChannel (replyChannel );
328- }
329-
330- /**
331- * Specify the Spring Integration reply channel name. If this property is not
332- * set the gateway will check for a 'replyChannel' header on the request.
333- * @param replyChannel The reply channel.
334- * @since 5.0
335- */
336- public void setReplyChannelName (String replyChannel ) {
337- this .setOutputChannelName (replyChannel );
338- }
339-
340- /**
341- * Set to true to close the connection ouput stream after sending without
342- * closing the connection. Use to signal EOF to the server, such as when using
343- * a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}.
344- * Requires a single-use connection factory.
345- * @param closeStreamAfterSend true to close.
346- * @since 5.2
347- */
348- public void setCloseStreamAfterSend (boolean closeStreamAfterSend ) {
349- this .closeStreamAfterSend = closeStreamAfterSend ;
350- }
351-
352- @ Override
353- public String getComponentType () {
354- return "ip:tcp-outbound-gateway" ;
355- }
356-
357403 @ Override
358404 public void start () {
359405 this .connectionFactory .start ();
0 commit comments