2626import org .elasticsearch .transport .TcpTransport ;
2727import org .elasticsearch .transport .nio .AcceptingSelector ;
2828import org .elasticsearch .transport .nio .SocketSelector ;
29- import org .elasticsearch .transport .nio .TcpReadHandler ;
3029
3130import java .io .Closeable ;
3231import java .io .IOException ;
3938
4039public class ChannelFactory {
4140
42- private final TcpReadHandler handler ;
41+ private final Consumer < NioSocketChannel > contextSetter ;
4342 private final RawChannelFactory rawChannelFactory ;
4443
45- public ChannelFactory (TcpTransport .ProfileSettings profileSettings , TcpReadHandler handler ) {
46- this (new RawChannelFactory (profileSettings ), handler );
44+ /**
45+ * This will create a {@link ChannelFactory} using the profile settings and context setter passed to this
46+ * constructor. The context setter must be a {@link Consumer} that calls
47+ * {@link NioSocketChannel#setContexts(ReadContext, WriteContext)} with the appropriate read and write
48+ * contexts. The read and write contexts handle the protocol specific encoding and decoding of messages.
49+ *
50+ * @param profileSettings the profile settings channels opened by this factory
51+ * @param contextSetter a consumer that takes a channel and sets the read and write contexts
52+ */
53+ public ChannelFactory (TcpTransport .ProfileSettings profileSettings , Consumer <NioSocketChannel > contextSetter ) {
54+ this (new RawChannelFactory (profileSettings .tcpNoDelay ,
55+ profileSettings .tcpKeepAlive ,
56+ profileSettings .reuseAddress ,
57+ Math .toIntExact (profileSettings .sendBufferSize .getBytes ()),
58+ Math .toIntExact (profileSettings .receiveBufferSize .getBytes ())), contextSetter );
4759 }
4860
49- ChannelFactory (RawChannelFactory rawChannelFactory , TcpReadHandler handler ) {
50- this .handler = handler ;
61+ ChannelFactory (RawChannelFactory rawChannelFactory , Consumer < NioSocketChannel > contextSetter ) {
62+ this .contextSetter = contextSetter ;
5163 this .rawChannelFactory = rawChannelFactory ;
5264 }
5365
5466 public NioSocketChannel openNioChannel (InetSocketAddress remoteAddress , SocketSelector selector ,
5567 Consumer <NioChannel > closeListener ) throws IOException {
5668 SocketChannel rawChannel = rawChannelFactory .openNioChannel (remoteAddress );
5769 NioSocketChannel channel = new NioSocketChannel (NioChannel .CLIENT , rawChannel , selector );
58- channel . setContexts (new TcpReadContext ( channel , handler ), new TcpWriteContext ( channel ) );
70+ setContexts (channel );
5971 channel .getCloseFuture ().addListener (ActionListener .wrap (closeListener ::accept , (e ) -> closeListener .accept (channel )));
6072 scheduleChannel (channel , selector );
6173 return channel ;
@@ -65,7 +77,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S
6577 Consumer <NioChannel > closeListener ) throws IOException {
6678 SocketChannel rawChannel = rawChannelFactory .acceptNioChannel (serverChannel );
6779 NioSocketChannel channel = new NioSocketChannel (serverChannel .getProfile (), rawChannel , selector );
68- channel . setContexts (new TcpReadContext ( channel , handler ), new TcpWriteContext ( channel ) );
80+ setContexts (channel );
6981 channel .getCloseFuture ().addListener (ActionListener .wrap (closeListener ::accept , (e ) -> closeListener .accept (channel )));
7082 scheduleChannel (channel , selector );
7183 return channel ;
@@ -97,6 +109,12 @@ private void scheduleServerChannel(NioServerSocketChannel channel, AcceptingSele
97109 }
98110 }
99111
112+ private void setContexts (NioSocketChannel channel ) {
113+ contextSetter .accept (channel );
114+ assert channel .getReadContext () != null : "read context should have been set on channel" ;
115+ assert channel .getWriteContext () != null : "write context should have been set on channel" ;
116+ }
117+
100118 static class RawChannelFactory {
101119
102120 private final boolean tcpNoDelay ;
@@ -105,12 +123,13 @@ static class RawChannelFactory {
105123 private final int tcpSendBufferSize ;
106124 private final int tcpReceiveBufferSize ;
107125
108- RawChannelFactory (TcpTransport .ProfileSettings profileSettings ) {
109- tcpNoDelay = profileSettings .tcpNoDelay ;
110- tcpKeepAlive = profileSettings .tcpKeepAlive ;
111- tcpReusedAddress = profileSettings .reuseAddress ;
112- tcpSendBufferSize = Math .toIntExact (profileSettings .sendBufferSize .getBytes ());
113- tcpReceiveBufferSize = Math .toIntExact (profileSettings .receiveBufferSize .getBytes ());
126+ RawChannelFactory (boolean tcpNoDelay , boolean tcpKeepAlive , boolean tcpReusedAddress , int tcpSendBufferSize ,
127+ int tcpReceiveBufferSize ) {
128+ this .tcpNoDelay = tcpNoDelay ;
129+ this .tcpKeepAlive = tcpKeepAlive ;
130+ this .tcpReusedAddress = tcpReusedAddress ;
131+ this .tcpSendBufferSize = tcpSendBufferSize ;
132+ this .tcpReceiveBufferSize = tcpReceiveBufferSize ;
114133 }
115134
116135 SocketChannel openNioChannel (InetSocketAddress remoteAddress ) throws IOException {
0 commit comments