Skip to content

Commit a28dc6e

Browse files
committed
Refactor host resolution to use executor service
This commit refactors the resolution of unicast hosts from hostnames to IP addresses to use an executor service instead of the generic thread pool.
1 parent 37c73aa commit a28dc6e

File tree

6 files changed

+109
-55
lines changed

6 files changed

+109
-55
lines changed

core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
9898
public static final Setting<Integer> DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
9999
Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope);
100100
public static final Setting<TimeValue> DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
101-
Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(30), Property.NodeScope);
101+
Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(1), Property.NodeScope);
102102

103103
// these limits are per-address
104104
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
@@ -131,7 +131,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
131131

132132
private final UnicastHostsProvider hostsProvider;
133133

134-
private final ExecutorService unicastConnectExecutor;
134+
private final ExecutorService unicastZenPingExecutorService;
135135

136136
private final TimeValue resolveTimeout;
137137

@@ -166,10 +166,14 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
166166
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME,
167167
new UnicastPingRequestHandler());
168168

169-
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
170-
unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS,
171-
threadFactory, threadPool.getThreadContext());
172-
169+
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
170+
unicastZenPingExecutorService = EsExecutors.newScaling(
171+
"unicast_connect",
172+
0, concurrentConnects,
173+
60,
174+
TimeUnit.SECONDS,
175+
threadFactory,
176+
threadPool.getThreadContext());
173177
}
174178

175179
private static class ResolvedHostname {
@@ -209,9 +213,9 @@ public UnknownHostException failure() {
209213
/**
210214
* Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses
211215
* if the number of ports is greater than one) and the transport addresses are used to create discovery nodes. Host lookups are done
212-
* in parallel using the generic thread pool from the specified thread pool up to the specified resolve timeout.
216+
* in parallel using the specified executor service up to the specified resolve timeout.
213217
*
214-
* @param threadPool the thread pool used to parallelize hostname lookups
218+
* @param executorService the executor service used to parallelize hostname lookups
215219
* @param logger logger used for logging messages regarding hostname lookups
216220
* @param hosts the hosts to resolve
217221
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
@@ -221,14 +225,14 @@ public UnknownHostException failure() {
221225
* @return a list of discovery nodes with resolved transport addresses
222226
*/
223227
public static List<DiscoveryNode> resolveDiscoveryNodes(
224-
final ThreadPool threadPool,
228+
final ExecutorService executorService,
225229
final Logger logger,
226230
final List<String> hosts,
227231
final int limitPortCounts,
228232
final TransportService transportService,
229233
final Supplier<String> idGenerator,
230234
final TimeValue resolveTimeout) throws InterruptedException {
231-
Objects.requireNonNull(threadPool);
235+
Objects.requireNonNull(executorService);
232236
Objects.requireNonNull(logger);
233237
Objects.requireNonNull(hosts);
234238
Objects.requireNonNull(transportService);
@@ -241,7 +245,7 @@ public static List<DiscoveryNode> resolveDiscoveryNodes(
241245
final List<Callable<ResolvedHostname>> callables =
242246
hosts.stream().map(hn -> lookup(hn, transportService, limitPortCounts)).collect(Collectors.toList());
243247
final List<Future<ResolvedHostname>> futures =
244-
threadPool.generic().invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
248+
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
245249
final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
246250
// ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
247251
// hostname with the corresponding task by iterating together
@@ -300,7 +304,7 @@ private static Callable<ResolvedHostname> lookup(
300304

301305
@Override
302306
public void close() {
303-
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
307+
ThreadPool.terminate(unicastZenPingExecutorService, 0, TimeUnit.SECONDS);
304308
Releasables.close(receivedResponses.values());
305309
closed = true;
306310
}
@@ -335,25 +339,38 @@ Collection<PingResponse> pingAndWait(TimeValue duration) {
335339

336340
@Override
337341
public void ping(final PingListener listener, final TimeValue duration) {
342+
final List<DiscoveryNode> resolvedDiscoveryNodes;
343+
try {
344+
resolvedDiscoveryNodes = resolveDiscoveryNodes(
345+
unicastZenPingExecutorService,
346+
logger,
347+
configuredHosts,
348+
limitPortCounts,
349+
transportService,
350+
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
351+
resolveTimeout);
352+
} catch (InterruptedException e) {
353+
throw new RuntimeException(e);
354+
}
338355
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
339356
try {
340357
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
341358
try {
342-
sendPings(duration, null, sendPingsHandler);
359+
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes);
343360
} catch (RejectedExecutionException e) {
344361
logger.debug("Ping execution rejected", e);
345-
// The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings
362+
// The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
346363
// But don't bail here, we can retry later on after the send ping has been scheduled.
347364
}
348365

349366
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
350367
@Override
351368
protected void doRun() {
352-
sendPings(duration, null, sendPingsHandler);
369+
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes);
353370
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
354371
@Override
355372
protected void doRun() throws Exception {
356-
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler);
373+
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes);
357374
sendPingsHandler.close();
358375
listener.onPing(sendPingsHandler.pingCollection().toList());
359376
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
@@ -418,7 +435,11 @@ public void close() {
418435
}
419436

420437

421-
void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) {
438+
void sendPings(
439+
final TimeValue timeout,
440+
@Nullable TimeValue waitTime,
441+
final SendPingsHandler sendPingsHandler,
442+
final List<DiscoveryNode> resolvedDiscoveryNodes) {
422443
final UnicastPingRequest pingRequest = new UnicastPingRequest();
423444
pingRequest.id = sendPingsHandler.id();
424445
pingRequest.timeout = timeout;
@@ -444,22 +465,8 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send
444465
List<DiscoveryNode> sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet);
445466

446467
// add the configured hosts first
447-
final List<DiscoveryNode> nodesToPing = new ArrayList<>();
448-
final List<DiscoveryNode> resolvedDiscoveryNodes;
449-
try {
450-
resolvedDiscoveryNodes = resolveDiscoveryNodes(
451-
threadPool,
452-
logger,
453-
configuredHosts,
454-
limitPortCounts,
455-
transportService,
456-
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
457-
resolveTimeout);
458-
nodesToPing.addAll(resolvedDiscoveryNodes);
459-
} catch (final InterruptedException e) {
460-
throw new RuntimeException(e);
461-
}
462-
468+
final List<DiscoveryNode> nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size());
469+
nodesToPing.addAll(resolvedDiscoveryNodes);
463470
nodesToPing.addAll(sortedNodesToPing);
464471

465472
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
@@ -497,7 +504,7 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send
497504
}
498505
// fork the connection to another thread
499506
final DiscoveryNode finalNodeToSend = nodeToSend;
500-
unicastConnectExecutor.execute(new Runnable() {
507+
unicastZenPingExecutorService.execute(new Runnable() {
501508
@Override
502509
public void run() {
503510
if (sendPingsHandler.isClosed()) {

core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.unit.TimeValue;
3636
import org.elasticsearch.common.util.BigArrays;
3737
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
38+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3839
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
3940
import org.elasticsearch.test.ESTestCase;
4041
import org.elasticsearch.test.VersionUtils;
@@ -66,6 +67,9 @@
6667
import java.util.concurrent.ConcurrentMap;
6768
import java.util.concurrent.CountDownLatch;
6869
import java.util.concurrent.ExecutionException;
70+
import java.util.concurrent.ExecutorService;
71+
import java.util.concurrent.ThreadFactory;
72+
import java.util.concurrent.TimeUnit;
6973
import java.util.concurrent.atomic.AtomicInteger;
7074
import java.util.function.BiFunction;
7175
import java.util.stream.Collectors;
@@ -86,13 +90,17 @@
8690
public class UnicastZenPingTests extends ESTestCase {
8791

8892
private ThreadPool threadPool;
93+
private ExecutorService executorService;
8994
// close in reverse order as opened
9095
private Stack<Closeable> closeables;
9196

9297
@Before
9398
public void setUp() throws Exception {
9499
super.setUp();
95100
threadPool = new TestThreadPool(getClass().getName());
101+
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("[" + getClass().getName() + "]");
102+
executorService =
103+
EsExecutors.newScaling(getClass().getName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext());
96104
closeables = new Stack<>();
97105
}
98106

@@ -106,6 +114,7 @@ public void tearDown() throws Exception {
106114
}
107115
IOUtils.close(reverse);
108116
} finally {
117+
terminate(executorService);
109118
terminate(threadPool);
110119
super.tearDown();
111120
}
@@ -382,7 +391,7 @@ public void testPortLimit() throws InterruptedException {
382391
final AtomicInteger idGenerator = new AtomicInteger();
383392
final int limitPortCounts = randomIntBetween(1, 10);
384393
final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
385-
threadPool,
394+
executorService,
386395
logger,
387396
Collections.singletonList("127.0.0.1"),
388397
limitPortCounts,
@@ -426,7 +435,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
426435
final AtomicInteger idGenerator = new AtomicInteger();
427436

428437
final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
429-
threadPool,
438+
executorService,
430439
logger,
431440
Arrays.asList(hostname),
432441
1,
@@ -478,7 +487,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
478487
final TimeValue resolveTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 100));
479488
try {
480489
final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
481-
threadPool,
490+
executorService,
482491
logger,
483492
Arrays.asList("hostname1", "hostname2"),
484493
1,
@@ -515,7 +524,7 @@ public void testInvalidHosts() throws InterruptedException {
515524
closeables.push(transportService);
516525
final AtomicInteger idGenerator = new AtomicInteger();
517526
final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
518-
threadPool,
527+
executorService,
519528
logger,
520529
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),
521530
1,

docs/reference/modules/discovery/zen.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ Unicast discovery provides the following settings with the `discovery.zen.ping.u
4343
value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be
4444
bracketed. Defaults to `127.0.0.1, [::1]`
4545
|`hosts.resolve_timeout` |The amount of time to wait for DNS lookups on each round of pinging. Specified as
46-
<<time-units, time units>>. Defaults to 30s.
46+
<<time-units, time units>>. Defaults to 1s.
4747
|=======================================================================
4848

4949
The unicast discovery uses the <<modules-transport,transport>> module to perform the discovery.

plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import org.elasticsearch.common.logging.Loggers;
2727
import org.elasticsearch.common.network.NetworkService;
2828
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2930
import org.elasticsearch.discovery.DiscoveryModule;
3031
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
32+
import org.elasticsearch.discovery.zen.UnicastZenPing;
3133
import org.elasticsearch.env.Environment;
3234
import org.elasticsearch.plugins.DiscoveryPlugin;
3335
import org.elasticsearch.plugins.Plugin;
@@ -37,9 +39,13 @@
3739
import org.elasticsearch.transport.TransportService;
3840
import org.elasticsearch.watcher.ResourceWatcherService;
3941

42+
import java.io.IOException;
4043
import java.util.Collection;
4144
import java.util.Collections;
4245
import java.util.Map;
46+
import java.util.concurrent.ExecutorService;
47+
import java.util.concurrent.ThreadFactory;
48+
import java.util.concurrent.TimeUnit;
4349
import java.util.function.Supplier;
4450

4551
/**
@@ -53,7 +59,7 @@ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin
5359
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
5460

5561
private final Settings settings;
56-
private ThreadPool threadPool;
62+
private ExecutorService fileBasedDiscoveryExecutorService;
5763

5864
public FileBasedDiscoveryPlugin(Settings settings) {
5965
this.settings = settings;
@@ -67,14 +73,31 @@ public Collection<Object> createComponents(
6773
ResourceWatcherService resourceWatcherService,
6874
ScriptService scriptService,
6975
SearchRequestParsers searchRequestParsers) {
70-
this.threadPool = threadPool;
76+
final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
77+
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]");
78+
fileBasedDiscoveryExecutorService = EsExecutors.newScaling(
79+
"file_based_discovery_resolve",
80+
0,
81+
concurrentConnects,
82+
60,
83+
TimeUnit.SECONDS,
84+
threadFactory,
85+
threadPool.getThreadContext());
86+
7187
return Collections.emptyList();
7288
}
7389

90+
@Override
91+
public void close() throws IOException {
92+
ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS);
93+
}
94+
7495
@Override
7596
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
7697
NetworkService networkService) {
77-
return Collections.singletonMap("file", () -> new FileBasedUnicastHostsProvider(settings, transportService, threadPool));
98+
return Collections.singletonMap(
99+
"file",
100+
() -> new FileBasedUnicastHostsProvider(settings, transportService, fileBasedDiscoveryExecutorService));
78101
}
79102

80103
@Override

plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.common.unit.TimeValue;
2828
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
2929
import org.elasticsearch.env.Environment;
30-
import org.elasticsearch.threadpool.ThreadPool;
3130
import org.elasticsearch.transport.TransportService;
3231

3332
import java.io.FileNotFoundException;
@@ -38,6 +37,7 @@
3837
import java.util.ArrayList;
3938
import java.util.Collections;
4039
import java.util.List;
40+
import java.util.concurrent.ExecutorService;
4141
import java.util.concurrent.atomic.AtomicLong;
4242
import java.util.stream.Collectors;
4343
import java.util.stream.Stream;
@@ -63,18 +63,18 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
6363
static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_";
6464

6565
private final TransportService transportService;
66-
private final ThreadPool threadPool;
66+
private final ExecutorService executorService;
6767

6868
private final Path unicastHostsFilePath;
6969

7070
private final AtomicLong nodeIdGenerator = new AtomicLong(); // generates unique ids for the node
7171

7272
private final TimeValue resolveTimeout;
7373

74-
FileBasedUnicastHostsProvider(Settings settings, TransportService transportService, ThreadPool threadPool) {
74+
FileBasedUnicastHostsProvider(Settings settings, TransportService transportService, ExecutorService executorService) {
7575
super(settings);
7676
this.transportService = transportService;
77-
this.threadPool = threadPool;
77+
this.executorService = executorService;
7878
this.unicastHostsFilePath = new Environment(settings).configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);
7979
this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
8080
}
@@ -98,7 +98,7 @@ public List<DiscoveryNode> buildDynamicNodes() {
9898
final List<DiscoveryNode> discoNodes = new ArrayList<>();
9999
try {
100100
discoNodes.addAll(resolveDiscoveryNodes(
101-
threadPool,
101+
executorService,
102102
logger,
103103
hostsList,
104104
1,

0 commit comments

Comments
 (0)