From 277947cd943ac72398499c29200dd623675fef58 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 29 Aug 2024 17:31:41 -0400 Subject: [PATCH] GH-9430: Fix CachingClientCF for collaborating channel adapters Fixes: #9430 When `CachingClientConnectionFactory` is used in combination of `Tcp.outboundAdapter()` & `Tcp.inboundAdapter()`, the connection is not released back to the cache because `CachingClientConnectionFactory` does not store created connections into its `connections` property. So, when `TcpReceivingChannelAdapter` calls `this.clientConnectionFactory.closeConnection(connectionId);` it returned immediately because there is nothing to remove from the `this.connections` * Add `protected removeConnection()` in the `AbstractConnectionFactory` and override it in the `CachingClientConnectionFactory` with delegation to the `this.targetConnectionFactory` * Demonstrate the problem in the new `IpIntegrationTests.allRepliesAreReceivedViaLimitedCachingConnectionFactory()` test and ensure that all 27 letters from English alphabet are sent to the server and received in uppercase while size of the `CachingClientConnectionFactory` is only `10` **Auto-cherry-pick to `6.3.x` & `6.2.x`** --- .../connection/AbstractConnectionFactory.java | 9 +- .../CachingClientConnectionFactory.java | 5 + .../ip/dsl/IpIntegrationTests.java | 97 +++++++++++++++++-- 3 files changed, 101 insertions(+), 10 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java index 4007bf51331..d527f4835bd 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -977,7 +977,7 @@ public boolean closeConnection(String connectionId) { this.connectionsMonitor.lock(); try { boolean closed = false; - TcpConnectionSupport connection = this.connections.remove(connectionId); + TcpConnectionSupport connection = removeConnection(connectionId); if (connection != null) { try { connection.close(); @@ -996,6 +996,11 @@ public boolean closeConnection(String connectionId) { } } + @Nullable + protected TcpConnectionSupport removeConnection(String connectionId) { + return this.connections.remove(connectionId); + } + @Override public String toString() { return super.toString() diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java index 50c3f600bff..c5f9f65dfac 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java @@ -358,6 +358,11 @@ public void enableManualListenerRegistration() { this.targetConnectionFactory.enableManualListenerRegistration(); } + @Override + protected TcpConnectionSupport removeConnection(String connectionId) { + return this.targetConnectionFactory.removeConnection(connectionId.replaceFirst("Cached:", "")); + } + @Override public void start() { setActive(true); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java index b283e158f63..db18e206d54 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java @@ -16,11 +16,14 @@ package org.springframework.integration.ip.dsl; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; import org.aopalliance.intercept.MethodInterceptor; import org.junit.jupiter.api.Test; @@ -47,6 +50,7 @@ import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpConnectionServerListeningEvent; import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; @@ -55,12 +59,12 @@ import org.springframework.integration.ip.udp.MulticastSendingMessageHandler; import org.springframework.integration.ip.udp.UdpServerListeningEvent; import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter; -import org.springframework.integration.ip.udp.UnicastSendingMessageHandler; import org.springframework.integration.ip.util.TestingUtilities; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -102,9 +106,6 @@ public class IpIntegrationTests { @Autowired private UnicastReceivingChannelAdapter udpInbound; - @Autowired - private UnicastSendingMessageHandler udpOutbound; - @Autowired private QueueChannel udpIn; @@ -236,6 +237,45 @@ void async() { assertThat(TestUtils.getPropertyValue(this.tcpOutAsync, "async", Boolean.class)).isTrue(); } + @Autowired + private AbstractServerConnectionFactory server2; + + @Autowired + private TcpNetClientConnectionFactory client3; + + @Autowired + @Qualifier("outboundFlow.input") + MessageChannel outboundFlowInput; + + @Autowired + PollableChannel cachingRepliesChannel; + + @Test + void allRepliesAreReceivedViaLimitedCachingConnectionFactory() { + this.client3.stop(); + TestingUtilities.waitListening(this.server2, null); + this.client3.setPort(this.server2.getPort()); + this.client3.start(); + + List expected = + IntStream.rangeClosed('a', 'z') + .mapToObj((characterCode) -> (char) characterCode) + .map((character) -> "" + character) + .peek((character) -> this.outboundFlowInput.send(new GenericMessage<>(character))) + .map(String::toUpperCase) + .toList(); + + List replies = new ArrayList<>(); + + for (int i = 0; i < expected.size(); i++) { + Message replyMessage = this.cachingRepliesChannel.receive(10_000); + assertThat(replyMessage).isNotNull(); + replies.add(replyMessage.getPayload().toString()); + } + + assertThat(replies).containsAll(expected); + } + @Configuration @EnableIntegration public static class Config { @@ -318,8 +358,9 @@ public ApplicationListener events() { } @Bean - public TcpNetClientConnectionFactorySpec client1(TcpNetServerConnectionFactory server1) { - return Tcp.netClient("localhost", server1.getPort()) + public TcpNetClientConnectionFactorySpec client1() { + // The port from server is assigned + return Tcp.netClient("localhost", 0) .serializer(TcpCodecs.crlf()) .deserializer(TcpCodecs.lengthHeader1()); } @@ -337,8 +378,9 @@ public QueueChannel unsolicited() { } @Bean - public TcpNetClientConnectionFactorySpec client2(TcpNetServerConnectionFactory server1) { - return Tcp.netClient("localhost", server1.getPort()) + public TcpNetClientConnectionFactorySpec client2() { + // The port from server is assigned + return Tcp.netClient("localhost", 0) .serializer(TcpCodecs.crlf()) .deserializer(TcpCodecs.lengthHeader1()); } @@ -370,6 +412,45 @@ public IntegrationFlow clientTcpFlow(TcpOutboundGateway tcpOut) { .transform(Transformers.objectToString()); } + @Bean + public TcpNetServerConnectionFactorySpec server2() { + return Tcp.netServer(0); + } + + @Bean + public IntegrationFlow server2Flow(TcpNetServerConnectionFactory server2) { + return IntegrationFlow.from(Tcp.inboundGateway(server2)) + .transform(Transformers.objectToString()) + .transform(String::toUpperCase) + .get(); + } + + @Bean + public TcpNetClientConnectionFactorySpec client3() { + // The port from server is assigned + return Tcp.netClient("localhost", 0); + } + + @Bean + CachingClientConnectionFactory cachingClient(TcpNetClientConnectionFactory client3) { + var cachingClientConnectionFactory = new CachingClientConnectionFactory(client3, 10); + cachingClientConnectionFactory.setConnectionWaitTimeout(10_000); + return cachingClientConnectionFactory; + } + + @Bean + IntegrationFlow outboundFlow(CachingClientConnectionFactory cachingClient) { + return (flow) -> flow.handle(Tcp.outboundAdapter(cachingClient)); + } + + @Bean + IntegrationFlow inboundFlow(CachingClientConnectionFactory cachingClient) { + return IntegrationFlow.from(Tcp.inboundAdapter(cachingClient)) + .transform(Transformers.objectToString()) + .channel((channels) -> channels.queue("cachingRepliesChannel")) + .get(); + } + } }