3333import org .elasticsearch .common .util .PageCacheRecycler ;
3434import org .elasticsearch .common .util .concurrent .EsExecutors ;
3535import org .elasticsearch .indices .breaker .CircuitBreakerService ;
36+ import org .elasticsearch .nio .AcceptingSelector ;
3637import org .elasticsearch .nio .AcceptorEventHandler ;
3738import org .elasticsearch .nio .BytesReadContext ;
3839import org .elasticsearch .nio .BytesWriteContext ;
40+ import org .elasticsearch .nio .ChannelFactory ;
3941import org .elasticsearch .nio .InboundChannelBuffer ;
4042import org .elasticsearch .nio .NioGroup ;
4143import org .elasticsearch .nio .NioSocketChannel ;
4244import org .elasticsearch .nio .ReadContext ;
4345import org .elasticsearch .nio .SocketEventHandler ;
46+ import org .elasticsearch .nio .SocketSelector ;
4447import org .elasticsearch .threadpool .ThreadPool ;
4548import org .elasticsearch .transport .TcpChannel ;
4649import org .elasticsearch .transport .TcpTransport ;
4952import java .io .IOException ;
5053import java .net .InetSocketAddress ;
5154import java .nio .ByteBuffer ;
55+ import java .nio .channels .ServerSocketChannel ;
56+ import java .nio .channels .SocketChannel ;
5257import java .util .concurrent .ConcurrentMap ;
53- import java .util .function .Consumer ;
5458import java .util .function .Supplier ;
5559
5660import static org .elasticsearch .common .settings .Setting .intSetting ;
@@ -110,13 +114,13 @@ protected void doStart() {
110114 NioTransport .NIO_WORKER_COUNT .get (settings ), SocketEventHandler ::new );
111115
112116 ProfileSettings clientProfileSettings = new ProfileSettings (settings , "default" );
113- clientChannelFactory = new TcpChannelFactory (clientProfileSettings , getContextSetter (), getServerContextSetter () );
117+ clientChannelFactory = new TcpChannelFactory (clientProfileSettings );
114118
115119 if (useNetworkServer ) {
116120 // loop through all profiles and start them up, special handling for default one
117121 for (ProfileSettings profileSettings : profileSettings ) {
118122 String profileName = profileSettings .profileName ;
119- TcpChannelFactory factory = new TcpChannelFactory (profileSettings , getContextSetter (), getServerContextSetter () );
123+ TcpChannelFactory factory = new TcpChannelFactory (profileSettings );
120124 profileToChannelFactory .putIfAbsent (profileName , factory );
121125 bindServer (profileSettings );
122126 }
@@ -143,29 +147,46 @@ protected void stopInternal() {
143147 profileToChannelFactory .clear ();
144148 }
145149
146- final void exceptionCaught (NioSocketChannel channel , Exception exception ) {
150+ private void exceptionCaught (NioSocketChannel channel , Exception exception ) {
147151 onException ((TcpChannel ) channel , exception );
148152 }
149153
150- private Consumer <TcpNioSocketChannel > getContextSetter () {
151- return (c ) -> {
154+ private void acceptChannel (NioSocketChannel channel ) {
155+ serverAcceptedChannel ((TcpNioSocketChannel ) channel );
156+ }
157+
158+ private class TcpChannelFactory extends ChannelFactory <TcpNioServerSocketChannel , TcpNioSocketChannel > {
159+
160+ private final String profileName ;
161+
162+ TcpChannelFactory (TcpTransport .ProfileSettings profileSettings ) {
163+ super (new RawChannelFactory (profileSettings .tcpNoDelay ,
164+ profileSettings .tcpKeepAlive ,
165+ profileSettings .reuseAddress ,
166+ Math .toIntExact (profileSettings .sendBufferSize .getBytes ()),
167+ Math .toIntExact (profileSettings .receiveBufferSize .getBytes ())));
168+ this .profileName = profileSettings .profileName ;
169+ }
170+
171+ @ Override
172+ public TcpNioSocketChannel createChannel (SocketSelector selector , SocketChannel channel ) throws IOException {
173+ TcpNioSocketChannel nioChannel = new TcpNioSocketChannel (profileName , channel , selector );
152174 Supplier <InboundChannelBuffer .Page > pageSupplier = () -> {
153175 Recycler .V <byte []> bytes = pageCacheRecycler .bytePage (false );
154176 return new InboundChannelBuffer .Page (ByteBuffer .wrap (bytes .v ()), bytes ::close );
155177 };
156178 ReadContext .ReadConsumer nioReadConsumer = channelBuffer ->
157- consumeNetworkReads (c , BytesReference .fromByteBuffers (channelBuffer .sliceBuffersTo (channelBuffer .getIndex ())));
158- BytesReadContext readContext = new BytesReadContext (c , nioReadConsumer , new InboundChannelBuffer (pageSupplier ));
159- c .setContexts (readContext , new BytesWriteContext (c ), this ::exceptionCaught );
160- };
161- }
162-
163- private void acceptChannel (NioSocketChannel channel ) {
164- serverAcceptedChannel ((TcpNioSocketChannel ) channel );
165-
166- }
179+ consumeNetworkReads (nioChannel , BytesReference .fromByteBuffers (channelBuffer .sliceBuffersTo (channelBuffer .getIndex ())));
180+ BytesReadContext readContext = new BytesReadContext (nioChannel , nioReadConsumer , new InboundChannelBuffer (pageSupplier ));
181+ nioChannel .setContexts (readContext , new BytesWriteContext (nioChannel ), NioTransport .this ::exceptionCaught );
182+ return nioChannel ;
183+ }
167184
168- private Consumer <TcpNioServerSocketChannel > getServerContextSetter () {
169- return (c ) -> c .setAcceptContext (this ::acceptChannel );
185+ @ Override
186+ public TcpNioServerSocketChannel createServerChannel (AcceptingSelector selector , ServerSocketChannel channel ) throws IOException {
187+ TcpNioServerSocketChannel nioServerChannel = new TcpNioServerSocketChannel (profileName , channel , this , selector );
188+ nioServerChannel .setAcceptContext (NioTransport .this ::acceptChannel );
189+ return nioServerChannel ;
190+ }
170191 }
171192}
0 commit comments