Skip to content

Commit 3a7b011

Browse files
committed
Add NioGroup for use in different transports (#27737)
This commit is related to #27260. It adds a base NioGroup for use in different transports. This class creates and starts the underlying selectors. Different protocols or transports are established by passing the ChannelFactory to the bindServerChannel or openChannel methods. This allows a TcpChannelFactory to be passed which will create and register channels that support the elasticsearch tcp binary protocol or a channel factory that will create http channels (or other).
1 parent eab1e02 commit 3a7b011

File tree

7 files changed

+241
-114
lines changed

7 files changed

+241
-114
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport.nio;
21+
22+
import org.apache.logging.log4j.Logger;
23+
import org.apache.lucene.util.IOUtils;
24+
import org.elasticsearch.transport.nio.channel.ChannelFactory;
25+
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
26+
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
27+
28+
import java.io.IOException;
29+
import java.net.InetSocketAddress;
30+
import java.util.ArrayList;
31+
import java.util.concurrent.ThreadFactory;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.function.BiFunction;
34+
import java.util.function.Function;
35+
import java.util.function.Supplier;
36+
import java.util.stream.Collectors;
37+
import java.util.stream.Stream;
38+
39+
/**
40+
* The NioGroup is a group of selectors for interfacing with java nio. When it is started it will create the
41+
* configured number of socket and acceptor selectors. Each selector will be running in a dedicated thread.
42+
* Server connections can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)}
43+
* method. Client connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)}
44+
* method.
45+
* <p>
46+
* The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method
47+
* when the channel is created. This is what allows an NioGroup to support different channel types.
48+
*/
49+
public class NioGroup implements AutoCloseable {
50+
51+
52+
private final ArrayList<AcceptingSelector> acceptors;
53+
private final RoundRobinSupplier<AcceptingSelector> acceptorSupplier;
54+
55+
private final ArrayList<SocketSelector> socketSelectors;
56+
private final RoundRobinSupplier<SocketSelector> socketSelectorSupplier;
57+
58+
private final AtomicBoolean isOpen = new AtomicBoolean(true);
59+
60+
public NioGroup(Logger logger, ThreadFactory acceptorThreadFactory, int acceptorCount,
61+
BiFunction<Logger, Supplier<SocketSelector>, AcceptorEventHandler> acceptorEventHandlerFunction,
62+
ThreadFactory socketSelectorThreadFactory, int socketSelectorCount,
63+
Function<Logger, SocketEventHandler> socketEventHandlerFunction) throws IOException {
64+
acceptors = new ArrayList<>(acceptorCount);
65+
socketSelectors = new ArrayList<>(socketSelectorCount);
66+
67+
try {
68+
for (int i = 0; i < socketSelectorCount; ++i) {
69+
SocketSelector selector = new SocketSelector(socketEventHandlerFunction.apply(logger));
70+
socketSelectors.add(selector);
71+
}
72+
startSelectors(socketSelectors, socketSelectorThreadFactory);
73+
74+
for (int i = 0; i < acceptorCount; ++i) {
75+
SocketSelector[] childSelectors = this.socketSelectors.toArray(new SocketSelector[this.socketSelectors.size()]);
76+
Supplier<SocketSelector> selectorSupplier = new RoundRobinSupplier<>(childSelectors);
77+
AcceptingSelector acceptor = new AcceptingSelector(acceptorEventHandlerFunction.apply(logger, selectorSupplier));
78+
acceptors.add(acceptor);
79+
}
80+
startSelectors(acceptors, acceptorThreadFactory);
81+
} catch (Exception e) {
82+
try {
83+
close();
84+
} catch (Exception e1) {
85+
e.addSuppressed(e1);
86+
}
87+
throw e;
88+
}
89+
90+
socketSelectorSupplier = new RoundRobinSupplier<>(socketSelectors.toArray(new SocketSelector[socketSelectors.size()]));
91+
acceptorSupplier = new RoundRobinSupplier<>(acceptors.toArray(new AcceptingSelector[acceptors.size()]));
92+
}
93+
94+
public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
95+
throws IOException {
96+
ensureOpen();
97+
if (acceptors.isEmpty()) {
98+
throw new IllegalArgumentException("There are no acceptors configured. Without acceptors, server channels are not supported.");
99+
}
100+
return factory.openNioServerSocketChannel(address, acceptorSupplier.get());
101+
}
102+
103+
public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
104+
ensureOpen();
105+
return factory.openNioChannel(address, socketSelectorSupplier.get());
106+
}
107+
108+
@Override
109+
public void close() throws IOException {
110+
if (isOpen.compareAndSet(true, false)) {
111+
IOUtils.close(Stream.concat(acceptors.stream(), socketSelectors.stream()).collect(Collectors.toList()));
112+
}
113+
}
114+
115+
private static <S extends ESSelector> void startSelectors(Iterable<S> selectors, ThreadFactory threadFactory) {
116+
for (ESSelector acceptor : selectors) {
117+
if (acceptor.isRunning() == false) {
118+
threadFactory.newThread(acceptor::runLoop).start();
119+
acceptor.isRunningFuture().actionGet();
120+
}
121+
}
122+
}
123+
124+
private void ensureOpen() {
125+
if (isOpen.get() == false) {
126+
throw new IllegalStateException("NioGroup is closed.");
127+
}
128+
}
129+
}

test/framework/src/main/java/org/elasticsearch/transport/nio/NioShutdown.java

Lines changed: 0 additions & 55 deletions
This file was deleted.

test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport.nio;
2121

22+
import org.apache.logging.log4j.Logger;
2223
import org.elasticsearch.ElasticsearchException;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -46,9 +47,7 @@
4647
import java.io.IOException;
4748
import java.net.InetSocketAddress;
4849
import java.nio.ByteBuffer;
49-
import java.util.ArrayList;
5050
import java.util.concurrent.ConcurrentMap;
51-
import java.util.concurrent.ThreadFactory;
5251
import java.util.function.Consumer;
5352
import java.util.function.Supplier;
5453

@@ -71,11 +70,8 @@ public class NioTransport extends TcpTransport {
7170

7271
private final PageCacheRecycler pageCacheRecycler;
7372
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
74-
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
75-
private final ArrayList<SocketSelector> socketSelectors = new ArrayList<>();
76-
private RoundRobinSelectorSupplier clientSelectorSupplier;
77-
private TcpChannelFactory clientChannelFactory;
78-
private int acceptorNumber;
73+
private volatile NioGroup nioGroup;
74+
private volatile TcpChannelFactory clientChannelFactory;
7975

8076
public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
8177
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
@@ -87,14 +83,13 @@ public NioTransport(Settings settings, ThreadPool threadPool, NetworkService net
8783
@Override
8884
protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
8985
TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name);
90-
AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings));
91-
return channelFactory.openNioServerSocketChannel(address, selector);
86+
return nioGroup.bindServerChannel(address, channelFactory);
9287
}
9388

9489
@Override
9590
protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
9691
throws IOException {
97-
TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
92+
TcpNioSocketChannel channel = nioGroup.openChannel(node.getAddress().address(), clientChannelFactory);
9893
channel.addConnectListener(connectListener);
9994
return channel;
10095
}
@@ -103,42 +98,19 @@ protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue conn
10398
protected void doStart() {
10499
boolean success = false;
105100
try {
106-
int workerCount = NioTransport.NIO_WORKER_COUNT.get(settings);
107-
for (int i = 0; i < workerCount; ++i) {
108-
SocketSelector selector = new SocketSelector(getSocketEventHandler());
109-
socketSelectors.add(selector);
101+
int acceptorCount = 0;
102+
boolean useNetworkServer = NetworkService.NETWORK_SERVER.get(settings);
103+
if (useNetworkServer) {
104+
acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
110105
}
106+
nioGroup = new NioGroup(logger, daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount,
107+
AcceptorEventHandler::new, daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX),
108+
NioTransport.NIO_WORKER_COUNT.get(settings), this::getSocketEventHandler);
111109

112-
for (SocketSelector selector : socketSelectors) {
113-
if (selector.isRunning() == false) {
114-
ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
115-
threadFactory.newThread(selector::runLoop).start();
116-
selector.isRunningFuture().actionGet();
117-
}
118-
}
119-
120-
Consumer<NioSocketChannel> clientContextSetter = getContextSetter("client-socket");
121-
clientSelectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
122110
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
123-
clientChannelFactory = new TcpChannelFactory(clientProfileSettings, clientContextSetter, getServerContextSetter());
124-
125-
if (NetworkService.NETWORK_SERVER.get(settings)) {
126-
int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
127-
for (int i = 0; i < acceptorCount; ++i) {
128-
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
129-
AcceptorEventHandler eventHandler = new AcceptorEventHandler(logger, selectorSupplier);
130-
AcceptingSelector acceptor = new AcceptingSelector(eventHandler);
131-
acceptors.add(acceptor);
132-
}
133-
134-
for (AcceptingSelector acceptor : acceptors) {
135-
if (acceptor.isRunning() == false) {
136-
ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX);
137-
threadFactory.newThread(acceptor::runLoop).start();
138-
acceptor.isRunningFuture().actionGet();
139-
}
140-
}
111+
clientChannelFactory = new TcpChannelFactory(clientProfileSettings, getContextSetter("client"), getServerContextSetter());
141112

113+
if (useNetworkServer) {
142114
// loop through all profiles and start them up, special handling for default one
143115
for (ProfileSettings profileSettings : profileSettings) {
144116
String profileName = profileSettings.profileName;
@@ -162,14 +134,15 @@ protected void doStart() {
162134

163135
@Override
164136
protected void stopInternal() {
165-
NioShutdown nioShutdown = new NioShutdown(logger);
166-
nioShutdown.orderlyShutdown(acceptors, socketSelectors);
167-
137+
try {
138+
nioGroup.close();
139+
} catch (Exception e) {
140+
logger.warn("unexpected exception while stopping nio group", e);
141+
}
168142
profileToChannelFactory.clear();
169-
socketSelectors.clear();
170143
}
171144

172-
protected SocketEventHandler getSocketEventHandler() {
145+
protected SocketEventHandler getSocketEventHandler(Logger logger) {
173146
return new SocketEventHandler(logger);
174147
}
175148

@@ -189,8 +162,7 @@ private Consumer<NioSocketChannel> getContextSetter(String profileName) {
189162
}
190163

191164
private void acceptChannel(NioSocketChannel channel) {
192-
TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel;
193-
serverAcceptedChannel(tcpChannel);
165+
serverAcceptedChannel((TcpNioSocketChannel) channel);
194166

195167
}
196168

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,21 @@
1919

2020
package org.elasticsearch.transport.nio;
2121

22-
import java.util.ArrayList;
2322
import java.util.concurrent.atomic.AtomicInteger;
2423
import java.util.function.Supplier;
2524

26-
public class RoundRobinSelectorSupplier implements Supplier<SocketSelector> {
25+
public class RoundRobinSupplier<S> implements Supplier<S> {
2726

28-
private final ArrayList<SocketSelector> selectors;
27+
private final S[] selectors;
2928
private final int count;
3029
private AtomicInteger counter = new AtomicInteger(0);
3130

32-
public RoundRobinSelectorSupplier(ArrayList<SocketSelector> selectors) {
33-
this.count = selectors.size();
31+
public RoundRobinSupplier(S[] selectors) {
32+
this.count = selectors.length;
3433
this.selectors = selectors;
3534
}
3635

37-
public SocketSelector get() {
38-
return selectors.get(counter.getAndIncrement() % count);
36+
public S get() {
37+
return selectors[counter.getAndIncrement() % count];
3938
}
4039
}

test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void setUpHandler() throws IOException {
5757
acceptedChannelCallback = mock(Consumer.class);
5858
ArrayList<SocketSelector> selectors = new ArrayList<>();
5959
selectors.add(socketSelector);
60-
handler = new AcceptorEventHandler(logger, new RoundRobinSelectorSupplier(selectors));
60+
handler = new AcceptorEventHandler(logger, new RoundRobinSupplier<>(selectors.toArray(new SocketSelector[selectors.size()])));
6161

6262
AcceptingSelector selector = mock(AcceptingSelector.class);
6363
channel = new DoNotRegisterServerChannel(mock(ServerSocketChannel.class), channelFactory, selector);

0 commit comments

Comments
 (0)