Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -977,7 +977,7 @@ public boolean closeConnection(String connectionId) {
this.connectionsMonitor.lock();
try {
boolean closed = false;
TcpConnectionSupport connection = this.connections.remove(connectionId);
TcpConnectionSupport connection = removeConnection(connectionId);
if (connection != null) {
try {
connection.close();
Expand All @@ -996,6 +996,11 @@ public boolean closeConnection(String connectionId) {
}
}

@Nullable
protected TcpConnectionSupport removeConnection(String connectionId) {
return this.connections.remove(connectionId);
}

@Override
public String toString() {
return super.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ public void enableManualListenerRegistration() {
this.targetConnectionFactory.enableManualListenerRegistration();
}

@Override
protected TcpConnectionSupport removeConnection(String connectionId) {
return this.targetConnectionFactory.removeConnection(connectionId.replaceFirst("Cached:", ""));
}

@Override
public void start() {
setActive(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package org.springframework.integration.ip.dsl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.aopalliance.intercept.MethodInterceptor;
import org.junit.jupiter.api.Test;
Expand All @@ -47,6 +50,7 @@
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionServerListeningEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
Expand All @@ -55,12 +59,12 @@
import org.springframework.integration.ip.udp.MulticastSendingMessageHandler;
import org.springframework.integration.ip.udp.UdpServerListeningEvent;
import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
import org.springframework.integration.ip.util.TestingUtilities;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
Expand Down Expand Up @@ -102,9 +106,6 @@ public class IpIntegrationTests {
@Autowired
private UnicastReceivingChannelAdapter udpInbound;

@Autowired
private UnicastSendingMessageHandler udpOutbound;

@Autowired
private QueueChannel udpIn;

Expand Down Expand Up @@ -236,6 +237,45 @@ void async() {
assertThat(TestUtils.getPropertyValue(this.tcpOutAsync, "async", Boolean.class)).isTrue();
}

@Autowired
private AbstractServerConnectionFactory server2;

@Autowired
private TcpNetClientConnectionFactory client3;

@Autowired
@Qualifier("outboundFlow.input")
MessageChannel outboundFlowInput;

@Autowired
PollableChannel cachingRepliesChannel;

@Test
void allRepliesAreReceivedViaLimitedCachingConnectionFactory() {
this.client3.stop();
TestingUtilities.waitListening(this.server2, null);
this.client3.setPort(this.server2.getPort());
this.client3.start();

List<String> expected =
IntStream.rangeClosed('a', 'z')
.mapToObj((characterCode) -> (char) characterCode)
.map((character) -> "" + character)
.peek((character) -> this.outboundFlowInput.send(new GenericMessage<>(character)))
.map(String::toUpperCase)
.toList();

List<String> replies = new ArrayList<>();

for (int i = 0; i < expected.size(); i++) {
Message<?> replyMessage = this.cachingRepliesChannel.receive(10_000);
assertThat(replyMessage).isNotNull();
replies.add(replyMessage.getPayload().toString());
}

assertThat(replies).containsAll(expected);
}

@Configuration
@EnableIntegration
public static class Config {
Expand Down Expand Up @@ -318,8 +358,9 @@ public ApplicationListener<UdpServerListeningEvent> events() {
}

@Bean
public TcpNetClientConnectionFactorySpec client1(TcpNetServerConnectionFactory server1) {
return Tcp.netClient("localhost", server1.getPort())
public TcpNetClientConnectionFactorySpec client1() {
// The port from server is assigned
return Tcp.netClient("localhost", 0)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1());
}
Expand All @@ -337,8 +378,9 @@ public QueueChannel unsolicited() {
}

@Bean
public TcpNetClientConnectionFactorySpec client2(TcpNetServerConnectionFactory server1) {
return Tcp.netClient("localhost", server1.getPort())
public TcpNetClientConnectionFactorySpec client2() {
// The port from server is assigned
return Tcp.netClient("localhost", 0)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1());
}
Expand Down Expand Up @@ -370,6 +412,45 @@ public IntegrationFlow clientTcpFlow(TcpOutboundGateway tcpOut) {
.transform(Transformers.objectToString());
}

@Bean
public TcpNetServerConnectionFactorySpec server2() {
return Tcp.netServer(0);
}

@Bean
public IntegrationFlow server2Flow(TcpNetServerConnectionFactory server2) {
return IntegrationFlow.from(Tcp.inboundGateway(server2))
.transform(Transformers.objectToString())
.<String, String>transform(String::toUpperCase)
.get();
}

@Bean
public TcpNetClientConnectionFactorySpec client3() {
// The port from server is assigned
return Tcp.netClient("localhost", 0);
}

@Bean
CachingClientConnectionFactory cachingClient(TcpNetClientConnectionFactory client3) {
var cachingClientConnectionFactory = new CachingClientConnectionFactory(client3, 10);
cachingClientConnectionFactory.setConnectionWaitTimeout(10_000);
return cachingClientConnectionFactory;
}

@Bean
IntegrationFlow outboundFlow(CachingClientConnectionFactory cachingClient) {
return (flow) -> flow.handle(Tcp.outboundAdapter(cachingClient));
}

@Bean
IntegrationFlow inboundFlow(CachingClientConnectionFactory cachingClient) {
return IntegrationFlow.from(Tcp.inboundAdapter(cachingClient))
.transform(Transformers.objectToString())
.channel((channels) -> channels.queue("cachingRepliesChannel"))
.get();
}

}

}