1616
1717package org .springframework .messaging .tcp .reactor ;
1818
19+ import java .lang .reflect .Method ;
1920import java .net .InetSocketAddress ;
2021import java .util .ArrayList ;
21- import java .util .Arrays ;
2222import java .util .Collections ;
2323import java .util .List ;
2424import java .util .Properties ;
4545import reactor .io .net .ReactorChannelHandler ;
4646import reactor .io .net .Reconnect ;
4747import reactor .io .net .Spec .TcpClientSpec ;
48+ import reactor .io .net .config .ClientSocketOptions ;
4849import reactor .io .net .impl .netty .NettyClientSocketOptions ;
4950import reactor .io .net .impl .netty .tcp .NettyTcpClient ;
5051import reactor .io .net .tcp .TcpClient ;
5960import org .springframework .messaging .tcp .TcpConnectionHandler ;
6061import org .springframework .messaging .tcp .TcpOperations ;
6162import org .springframework .util .Assert ;
63+ import org .springframework .util .ClassUtils ;
64+ import org .springframework .util .ReflectionUtils ;
6265import org .springframework .util .concurrent .ListenableFuture ;
6366
6467/**
@@ -77,6 +80,9 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
7780 @ SuppressWarnings ("rawtypes" )
7881 public static final Class <NettyTcpClient > REACTOR_TCP_CLIENT_TYPE = NettyTcpClient .class ;
7982
83+ private static final Method eventLoopGroupMethod = initEventLoopGroupMethod ();
84+
85+
8086
8187 private final EventLoopGroup eventLoopGroup ;
8288
@@ -102,7 +108,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
102108 */
103109 public Reactor2TcpClient (final String host , final int port , final Codec <Buffer , Message <P >, Message <P >> codec ) {
104110
105- // Reactor 2.0.5 required NioEventLoopGroup ( 2.0.6 changed to EventLoopGroup)
111+ // Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
106112 final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup ();
107113 this .eventLoopGroup = nioEventLoopGroup ;
108114
@@ -113,7 +119,12 @@ public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Mes
113119 .env (new Environment (new SynchronousDispatcherConfigReader ()))
114120 .codec (codec )
115121 .connect (host , port )
116- .options (new NettyClientSocketOptions ().eventLoopGroup (nioEventLoopGroup ));
122+ .options (createClientSocketOptions ());
123+ }
124+
125+ private ClientSocketOptions createClientSocketOptions () {
126+ return (ClientSocketOptions ) ReflectionUtils .invokeMethod (eventLoopGroupMethod ,
127+ new NettyClientSocketOptions (), nioEventLoopGroup );
117128 }
118129 };
119130 }
@@ -245,6 +256,16 @@ public void operationComplete(Future<Object> future) throws Exception {
245256 }
246257
247258
259+ private static Method initEventLoopGroupMethod () {
260+ for (Method method : NettyClientSocketOptions .class .getMethods ()) {
261+ if (method .getName ().equals ("eventLoopGroup" ) && method .getParameterTypes ().length == 1 ) {
262+ return method ;
263+ }
264+ }
265+ throw new IllegalStateException ("No compatible Reactor version found." );
266+ }
267+
268+
248269 private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
249270
250271 @ Override
0 commit comments