Skip to content

Commit 7fd9e24

Browse files
committed
Lazy resolve unicast hosts
Today we eagerly resolve unicast hosts. This means that if DNS changes, we will never find the host at the new address. Moreover, a single host failng to resolve causes startup to abort. This commit introduces lazy resolution of unicast hosts. If a DNS entry changes, there is an opportunity for the host to be discovered. Note that under the Java security manager, there is a default positive cache of infinity for resolved hosts; this means that if a user does want to operate in an environment where DNS can change, they must adjust networkaddress.cache.ttl in their security policy. And if a host fails to resolve, we warn log the hostname but continue pinging other configured hosts.
1 parent 914664d commit 7fd9e24

File tree

10 files changed

+289
-96
lines changed

10 files changed

+289
-96
lines changed

core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.node.DiscoveryNode;
2929
import org.elasticsearch.cluster.node.DiscoveryNodes;
3030
import org.elasticsearch.common.Nullable;
31+
import org.elasticsearch.common.SuppressLoggerChecks;
3132
import org.elasticsearch.common.UUIDs;
3233
import org.elasticsearch.common.component.AbstractComponent;
3334
import org.elasticsearch.common.io.stream.StreamInput;
@@ -57,6 +58,7 @@
5758
import org.elasticsearch.transport.TransportService;
5859

5960
import java.io.IOException;
61+
import java.net.UnknownHostException;
6062
import java.util.ArrayList;
6163
import java.util.Arrays;
6264
import java.util.Collection;
@@ -74,6 +76,7 @@
7476
import java.util.concurrent.atomic.AtomicInteger;
7577
import java.util.concurrent.atomic.AtomicReference;
7678
import java.util.function.Function;
79+
import java.util.stream.Collectors;
7780

7881
import static java.util.Collections.emptyList;
7982
import static java.util.Collections.emptyMap;
@@ -100,7 +103,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
100103

101104
private final int concurrentConnects;
102105

103-
private final DiscoveryNode[] configuredTargetNodes;
106+
private final List<String> configuredHosts;
107+
108+
private final int limitPortCounts;
104109

105110
private volatile PingContextProvider contextProvider;
106111

@@ -114,7 +119,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
114119

115120
private final Map<Integer, SendPingsHandler> receivedResponses = newConcurrentMap();
116121

117-
// a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes)
122+
// a list of temporal responses a node will return for a request (holds requests from other configuredHosts)
118123
private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();
119124

120125
private final UnicastHostsProvider hostsProvider;
@@ -132,24 +137,17 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
132137
this.hostsProvider = unicastHostsProvider;
133138

134139
this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
135-
List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
136-
final int limitPortCounts;
140+
final List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
137141
if (hosts.isEmpty()) {
138142
// if unicast hosts are not specified, fill with simple defaults on the local machine
143+
configuredHosts = transportService.getLocalAddresses();
139144
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
140-
hosts.addAll(transportService.getLocalAddresses());
141145
} else {
146+
configuredHosts = hosts;
142147
// we only limit to 1 addresses, makes no sense to ping 100 ports
143148
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
144149
}
145-
146-
logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects);
147-
List<DiscoveryNode> configuredTargetNodes = new ArrayList<>();
148-
for (final String host : hosts) {
149-
configuredTargetNodes.addAll(resolveDiscoveryNodes(host, limitPortCounts, transportService,
150-
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#"));
151-
}
152-
this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]);
150+
logger.debug("using initial hosts {}, with concurrent_connects [{}]", configuredHosts, concurrentConnects);
153151

154152
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME,
155153
new UnicastPingRequestHandler());
@@ -160,27 +158,26 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
160158
}
161159

162160
/**
163-
* Resolves a host to a list of discovery nodes. The host is resolved into a transport
164-
* address (or a collection of addresses if the number of ports is greater than one) and
165-
* the transport addresses are used to created discovery nodes.
161+
* Resolves a host to a list of discovery nodes. The host is resolved into a transport address (or a collection of addresses if the
162+
* number of ports is greater than one) and the transport addresses are used to created discovery nodes.
166163
*
167-
* @param host the host to resolve
168-
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
164+
* @param host the host to resolve
165+
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
169166
* @param transportService the transport service
170-
* @param idGenerator the generator to supply unique ids for each discovery node
167+
* @param idGenerator the generator to supply unique ids for each discovery node
171168
* @return a list of discovery nodes with resolved transport addresses
169+
* @throws UnknownHostException if the host fails to resolve to an address
172170
*/
173-
public static List<DiscoveryNode> resolveDiscoveryNodes(final String host, final int limitPortCounts,
174-
final TransportService transportService, final Supplier<String> idGenerator) {
175-
List<DiscoveryNode> discoveryNodes = new ArrayList<>();
176-
try {
177-
TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
178-
for (TransportAddress address : addresses) {
179-
discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(),
180-
Version.CURRENT.minimumCompatibilityVersion()));
181-
}
182-
} catch (Exception e) {
183-
throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e);
171+
public static List<DiscoveryNode> resolveDiscoveryNodes(
172+
final String host,
173+
final int limitPortCounts,
174+
final TransportService transportService,
175+
final Supplier<String> idGenerator) throws UnknownHostException {
176+
final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
177+
final TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
178+
for (TransportAddress address : addresses) {
179+
discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(),
180+
Version.CURRENT.minimumCompatibilityVersion()));
184181
}
185182
return discoveryNodes;
186183
}
@@ -330,8 +327,25 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send
330327
// sort the nodes by likelihood of being an active master
331328
List<DiscoveryNode> sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet);
332329

333-
// new add the unicast targets first
334-
List<DiscoveryNode> nodesToPing = CollectionUtils.arrayAsArrayList(configuredTargetNodes);
330+
// add the configured hosts first
331+
final List<DiscoveryNode> nodesToPing = new ArrayList<>();
332+
for (final String host : configuredHosts) {
333+
try {
334+
final List<DiscoveryNode> resolvedDiscoveryNodes = resolveDiscoveryNodes(
335+
host,
336+
limitPortCounts,
337+
transportService,
338+
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#");
339+
logger.trace(
340+
"resolved host [{}] to {}",
341+
() -> host,
342+
() -> resolvedDiscoveryNodes.stream().map(UnicastZenPing::formatResolvedDiscoveryNode).collect(Collectors.toList()));
343+
nodesToPing.addAll(resolvedDiscoveryNodes);
344+
} catch (final UnknownHostException e) {
345+
logger.warn("failed to resolve host [" + host + "]", e);
346+
}
347+
}
348+
335349
nodesToPing.addAll(sortedNodesToPing);
336350

337351
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
@@ -430,6 +444,11 @@ public void run() {
430444
}
431445
}
432446

447+
private static String formatResolvedDiscoveryNode(final DiscoveryNode discoveryNode) {
448+
final TransportAddress transportAddress = discoveryNode.getAddress();
449+
return transportAddress.getAddress() + "@" + transportAddress.getPort();
450+
}
451+
433452
private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest,
434453
final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
435454
logger.trace("[{}] sending to {}", id, nodeToSend);

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.carrotsearch.hppc.IntHashSet;
2222
import com.carrotsearch.hppc.IntSet;
23-
2423
import org.apache.logging.log4j.message.ParameterizedMessage;
2524
import org.apache.logging.log4j.util.Supplier;
2625
import org.apache.lucene.util.IOUtils;
@@ -715,7 +714,7 @@ public static int resolvePublishPort(String profileName, Settings settings, Sett
715714
}
716715

717716
@Override
718-
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
717+
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
719718
return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit);
720719
}
721720

core/src/main/java/org/elasticsearch/transport/Transport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.transport.TransportAddress;
3030

3131
import java.io.IOException;
32+
import java.net.UnknownHostException;
3233
import java.util.List;
3334
import java.util.Map;
3435

@@ -53,7 +54,7 @@ public interface Transport extends LifecycleComponent {
5354
/**
5455
* Returns an address from its string representation.
5556
*/
56-
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception;
57+
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException;
5758

5859
/**
5960
* Is the address type supported.

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.threadpool.ThreadPool;
5353

5454
import java.io.IOException;
55+
import java.net.UnknownHostException;
5556
import java.util.Arrays;
5657
import java.util.Collections;
5758
import java.util.LinkedHashMap;
@@ -617,7 +618,7 @@ private long newRequestId() {
617618
return requestIds.getAndIncrement();
618619
}
619620

620-
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
621+
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
621622
return transport.addressesFromString(address, perAddressLimit);
622623
}
623624

core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.transport.TransportServiceAdapter;
3838

3939
import java.io.IOException;
40+
import java.net.UnknownHostException;
4041
import java.util.Collections;
4142
import java.util.Map;
4243
import java.util.Random;
@@ -133,7 +134,7 @@ public BoundTransportAddress boundAddress() {
133134
}
134135

135136
@Override
136-
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
137+
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
137138
throw new UnsupportedOperationException();
138139
}
139140

core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.junit.Before;
4343

4444
import java.io.IOException;
45+
import java.net.UnknownHostException;
4546
import java.util.ArrayList;
4647
import java.util.Arrays;
4748
import java.util.Collections;
@@ -188,7 +189,7 @@ public Map<String, BoundTransportAddress> profileBoundAddresses() {
188189
}
189190

190191
@Override
191-
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
192+
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
192193
return new TransportAddress[0];
193194
}
194195

0 commit comments

Comments
 (0)