From 7fd9e24b71842f548c278112d567dbd22fc22782 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 16 Nov 2016 16:07:03 -0500 Subject: [PATCH 01/10] Lazy resolve unicast hosts Today we eagerly resolve unicast hosts. This means that if DNS changes, we will never find the host at the new address. Moreover, a single host failing to resolve causes startup to abort. This commit introduces lazy resolution of unicast hosts. If a DNS entry changes, there is an opportunity for the host to be discovered. Note that under the Java security manager, there is a default positive cache of infinity for resolved hosts; this means that if a user does want to operate in an environment where DNS can change, they must adjust networkaddress.cache.ttl in their security policy. And if a host fails to resolve, we log a warning with the hostname but continue pinging other configured hosts. --- .../discovery/zen/UnicastZenPing.java | 83 ++++-- .../elasticsearch/transport/TcpTransport.java | 3 +- .../elasticsearch/transport/Transport.java | 3 +- .../transport/TransportService.java | 3 +- .../transport/FailAndRetryMockTransport.java | 3 +- .../cluster/NodeConnectionsServiceTests.java | 3 +- .../discovery/zen/UnicastZenPingTests.java | 277 ++++++++++++++---- .../discovery/ec2/Ec2DiscoveryTests.java | 4 +- .../test/transport/CapturingTransport.java | 3 +- .../test/transport/MockTransportService.java | 3 +- 10 files changed, 289 insertions(+), 96 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 7794c58ddd3ab..b453118b9428b 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; +import 
org.elasticsearch.common.SuppressLoggerChecks; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -57,6 +58,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -74,6 +76,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -100,7 +103,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final int concurrentConnects; - private final DiscoveryNode[] configuredTargetNodes; + private final List configuredHosts; + + private final int limitPortCounts; private volatile PingContextProvider contextProvider; @@ -114,7 +119,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final Map receivedResponses = newConcurrentMap(); - // a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes) + // a list of temporal responses a node will return for a request (holds requests from other configuredHosts) private final Queue temporalResponses = ConcurrentCollections.newQueue(); private final UnicastHostsProvider hostsProvider; @@ -132,24 +137,17 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService this.hostsProvider = unicastHostsProvider; this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); - List hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); - final int limitPortCounts; + final List hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); if (hosts.isEmpty()) { // if unicast hosts are 
not specified, fill with simple defaults on the local machine + configuredHosts = transportService.getLocalAddresses(); limitPortCounts = LIMIT_LOCAL_PORTS_COUNT; - hosts.addAll(transportService.getLocalAddresses()); } else { + configuredHosts = hosts; // we only limit to 1 addresses, makes no sense to ping 100 ports limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; } - - logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects); - List configuredTargetNodes = new ArrayList<>(); - for (final String host : hosts) { - configuredTargetNodes.addAll(resolveDiscoveryNodes(host, limitPortCounts, transportService, - () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#")); - } - this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]); + logger.debug("using initial hosts {}, with concurrent_connects [{}]", configuredHosts, concurrentConnects); transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); @@ -160,27 +158,26 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService } /** - * Resolves a host to a list of discovery nodes. The host is resolved into a transport - * address (or a collection of addresses if the number of ports is greater than one) and - * the transport addresses are used to created discovery nodes. + * Resolves a host to a list of discovery nodes. The host is resolved into a transport address (or a collection of addresses if the + * number of ports is greater than one) and the transport addresses are used to created discovery nodes. 
* - * @param host the host to resolve - * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) + * @param host the host to resolve + * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) * @param transportService the transport service - * @param idGenerator the generator to supply unique ids for each discovery node + * @param idGenerator the generator to supply unique ids for each discovery node * @return a list of discovery nodes with resolved transport addresses + * @throws UnknownHostException if the host fails to resolve to an address */ - public static List resolveDiscoveryNodes(final String host, final int limitPortCounts, - final TransportService transportService, final Supplier idGenerator) { - List discoveryNodes = new ArrayList<>(); - try { - TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts); - for (TransportAddress address : addresses) { - discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(), - Version.CURRENT.minimumCompatibilityVersion())); - } - } catch (Exception e) { - throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e); + public static List resolveDiscoveryNodes( + final String host, + final int limitPortCounts, + final TransportService transportService, + final Supplier idGenerator) throws UnknownHostException { + final List discoveryNodes = new ArrayList<>(); + final TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts); + for (TransportAddress address : addresses) { + discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(), + Version.CURRENT.minimumCompatibilityVersion())); } return discoveryNodes; } @@ -330,8 +327,25 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send // sort the nodes by likelihood of being an active master List sortedNodesToPing = 
ElectMasterService.sortByMasterLikelihood(nodesToPingSet); - // new add the unicast targets first - List nodesToPing = CollectionUtils.arrayAsArrayList(configuredTargetNodes); + // add the configured hosts first + final List nodesToPing = new ArrayList<>(); + for (final String host : configuredHosts) { + try { + final List resolvedDiscoveryNodes = resolveDiscoveryNodes( + host, + limitPortCounts, + transportService, + () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#"); + logger.trace( + "resolved host [{}] to {}", + () -> host, + () -> resolvedDiscoveryNodes.stream().map(UnicastZenPing::formatResolvedDiscoveryNode).collect(Collectors.toList())); + nodesToPing.addAll(resolvedDiscoveryNodes); + } catch (final UnknownHostException e) { + logger.warn("failed to resolve host [" + host + "]", e); + } + } + nodesToPing.addAll(sortedNodesToPing); final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); @@ -430,6 +444,11 @@ public void run() { } } + private static String formatResolvedDiscoveryNode(final DiscoveryNode discoveryNode) { + final TransportAddress transportAddress = discoveryNode.getAddress(); + return transportAddress.getAddress() + "@" + transportAddress.getPort(); + } + private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { logger.trace("[{}] sending to {}", id, nodeToSend); diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 086c48c411476..a68863d0e52ef 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -20,7 +20,6 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; - import org.apache.logging.log4j.message.ParameterizedMessage; import 
org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; @@ -715,7 +714,7 @@ public static int resolvePublishPort(String profileName, Settings settings, Sett } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit); } diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index 9a85a8678882c..c3c178a2c84d4 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.transport.TransportAddress; import java.io.IOException; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; @@ -53,7 +54,7 @@ public interface Transport extends LifecycleComponent { /** * Returns an address from its string representation. */ - TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception; + TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException; /** * Is the address type supported. 
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 7b1c83d66aa83..16c1842adcab2 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -52,6 +52,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; @@ -617,7 +618,7 @@ private long newRequestId() { return requestIds.getAndIncrement(); } - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return transport.addressesFromString(address, perAddressLimit); } diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index bc771f5721d29..e94b0b0c8d67a 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Collections; import java.util.Map; import java.util.Random; @@ -133,7 +134,7 @@ public BoundTransportAddress boundAddress() { } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { throw new UnsupportedOperationException(); } diff --git 
a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 863349e897a20..5dcbefbe0347d 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -42,6 +42,7 @@ import org.junit.Before; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -188,7 +189,7 @@ public Map profileBoundAddresses() { } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return new TransportAddress[0]; } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 4294bdd3dd462..1636259f6c6ac 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -40,16 +40,30 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; +import org.junit.After; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; 
+import java.util.Map; +import java.util.Set; +import java.util.Stack; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -58,29 +72,56 @@ import static org.hamcrest.Matchers.greaterThan; public class UnicastZenPingTests extends ESTestCase { + + private ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private Stack closeables = new Stack<>(); + + @After + public void tearDown() throws Exception { + try { + // we need to close these in reverse order they were opened but Java stack is broken, it does not iterate in the expected order + // (as if you were popping) + final List reverse = new ArrayList<>(); + while (!closeables.isEmpty()) { + reverse.add(closeables.pop()); + } + IOUtils.close(reverse); + } finally { + terminate(threadPool); + super.tearDown(); + } + } + private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList; public void testSimplePings() throws IOException, InterruptedException { - int startPort = 11000 + randomIntBetween(0, 1000); - int endPort = startPort + 10; - Settings settings = Settings.builder() - .put("cluster.name", "test") - .put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build(); - - Settings settingsMismatch = Settings.builder().put(settings) - .put("cluster.name", "mismatch") - .put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build(); + // use ephemeral ports + final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); + final Settings settingsMismatch = + Settings.builder().put(settings).put("cluster.name", "mismatch").put(TransportSettings.PORT.getKey(), 0).build(); - ThreadPool threadPool = new TestThreadPool(getClass().getName()); NetworkService networkService = new 
NetworkService(settings, Collections.emptyList()); - NetworkHandle handleA = startServices(settings, threadPool, networkService, "UZP_A", Version.CURRENT); - NetworkHandle handleB = startServices(settings, threadPool, networkService, "UZP_B", Version.CURRENT); - NetworkHandle handleC = startServices(settingsMismatch, threadPool, networkService, "UZP_C", Version.CURRENT); + final BiFunction supplier = (s, v) -> new MockTcpTransport( + s, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + v); + + NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); + closeables.push(handleA.transportService); + NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier); + closeables.push(handleB.transportService); + NetworkHandle handleC = startServices(settingsMismatch, threadPool, "UZP_C", Version.CURRENT, supplier); + closeables.push(handleC.transportService); // just fake that no versions are compatible with this node Version previousVersion = VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()); Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion); - NetworkHandle handleD = startServices(settingsMismatch, threadPool, networkService, "UZP_D", versionD); + NetworkHandle handleD = startServices(settingsMismatch, threadPool, "UZP_D", versionD, supplier); + closeables.push(handleD.transportService); final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); @@ -106,6 +147,7 @@ public ClusterState clusterState() { return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); } }); + closeables.push(zenPingA); UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, 
handleB.transportService, EMPTY_HOSTS_PROVIDER); zenPingB.start(new PingContextProvider() { @@ -119,6 +161,7 @@ public ClusterState clusterState() { return state; } }); + closeables.push(zenPingB); UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER) { @Override @@ -137,6 +180,7 @@ public ClusterState clusterState() { return state; } }); + closeables.push(zenPingC); UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, EMPTY_HOSTS_PROVIDER); zenPingD.start(new PingContextProvider() { @@ -150,41 +194,155 @@ public ClusterState clusterState() { return state; } }); + closeables.push(zenPingD); - try { - logger.info("ping from UZP_A"); - Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + logger.info("ping from UZP_A"); + Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(1)); + ZenPing.PingResponse ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_B")); + assertThat(ping.getClusterStateVersion(), equalTo(state.version())); + assertCounters(handleA, handleA, handleB, handleC, handleD); + + // ping again, this time from B, + logger.info("ping from UZP_B"); + pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(1)); + ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_A")); + assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION)); + assertCounters(handleB, handleA, handleB, handleC, handleD); + + logger.info("ping from UZP_C"); + pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(0)); + assertCounters(handleC, handleA, handleB, handleC, handleD); + + logger.info("ping from UZP_D"); + pingResponses 
= zenPingD.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(0)); + assertCounters(handleD, handleA, handleB, handleC, handleD); + } + + public void testUnknownHost() { + // use ephemeral ports + final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); + + final NetworkService networkService = new NetworkService(settings, Collections.emptyList()); + + final Map addresses = new HashMap<>(); + final BiFunction supplier = (s, v) -> new MockTcpTransport( + s, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + v) { + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + final TransportAddress[] transportAddresses = addresses.get(address); + if (transportAddresses == null) { + throw new UnknownHostException(address); + } else { + return transportAddresses; + } + } + }; + + final NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); + closeables.push(handleA.transportService); + final NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier); + closeables.push(handleB.transportService); + final NetworkHandle handleC = startServices(settings, threadPool, "UZP_C", Version.CURRENT, supplier); + closeables.push(handleC.transportService); + + addresses.put( + "UZP_A", + new TransportAddress[]{ + new TransportAddress( + new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort()))}); + addresses.put( + "UZP_C", + new TransportAddress[]{ + new TransportAddress( + new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort()))}); + + final Settings hostsSettings = Settings.builder() + .putArray("discovery.zen.ping.unicast.hosts", "UZP_A", "UZP_B", 
"UZP_C") + .put("cluster.name", "test") + .build(); + + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); + + final UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER); + zenPingA.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build(); + } + + @Override + public ClusterState clusterState() { + return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); + } + }); + closeables.push(zenPingA); + + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER); + zenPingB.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); + } + + @Override + public ClusterState clusterState() { + return state; + } + }); + closeables.push(zenPingB); + + UnicastZenPing zenPingC = new UnicastZenPing(hostsSettings, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER); + zenPingC.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build(); + } + + @Override + public ClusterState clusterState() { + return state; + } + }); + closeables.push(zenPingC); + + // the presence of an unresolvable host should not prevent resolvable hosts from being pinged + { + final Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(3)); assertThat(pingResponses.size(), equalTo(1)); ZenPing.PingResponse ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_B")); + assertThat(ping.node().getId(), equalTo("UZP_C")); assertThat(ping.getClusterStateVersion(), equalTo(state.version())); - 
assertCounters(handleA, handleA, handleB, handleC, handleD); + assertCounters(handleA, handleA, handleC); + assertNull(handleA.counters.get(handleB.address)); + } - // ping again, this time from B, - logger.info("ping from UZP_B"); - pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.size(), equalTo(1)); - ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_A")); - assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION)); - assertCounters(handleB, handleA, handleB, handleC, handleD); - - logger.info("ping from UZP_C"); - pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.size(), equalTo(0)); - assertCounters(handleC, handleA, handleB, handleC, handleD); - - logger.info("ping from UZP_D"); - pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.size(), equalTo(0)); - assertCounters(handleD, handleA, handleB, handleC, handleD); - } finally { - try { - IOUtils.close(zenPingA, zenPingB, zenPingC, zenPingD, - handleA.transportService, handleB.transportService, handleC.transportService, handleD.transportService); - } finally { - terminate(threadPool); - } + // now allow UZP_B to be resolvable + addresses.put( + "UZP_B", + new TransportAddress[]{ + new TransportAddress( + new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort()))}); + + // now we should see pings to UZP_B; this establishes that host resolutions are not cached + { + // ping from C so that we can assert on the counters from a fresh source (as opposed to resetting them) + final Collection secondPingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(3)); + assertThat(secondPingResponses.size(), equalTo(2)); + final Set ids = new HashSet<>(secondPingResponses.stream().map(p -> p.node().getId()).collect(Collectors.toList())); + 
assertThat(ids, equalTo(new HashSet<>(Arrays.asList("UZP_A", "UZP_B")))); + assertCounters(handleC, handleA, handleB, handleC); } } @@ -197,16 +355,20 @@ private void assertCounters(NetworkHandle that, NetworkHandle...handles) { } } - private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId, - Version version) { - MockTcpTransport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, - new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()), networkService, version); + private NetworkHandle startServices( + final Settings settings, + final ThreadPool threadPool, + final String nodeId, + final Version version, + final BiFunction supplier) { + final Transport transport = supplier.apply(settings, version); final TransportService transportService = new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); transportService.start(); transportService.acceptIncomingRequests(); - ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); + final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); transportService.addConnectionListener(new TransportConnectionListener() { + @Override public void onNodeConnected(DiscoveryNode node) { counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger()); @@ -216,25 +378,32 @@ public void onNodeConnected(DiscoveryNode node) { @Override public void onNodeDisconnected(DiscoveryNode node) { } + }); - final DiscoveryNode node = new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), - version); + final DiscoveryNode node = + new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version); transportService.setLocalNode(node); - return new NetworkHandle((TransportAddress)transport.boundAddress().publishAddress(), transportService, node, counters); + 
return new NetworkHandle(transport.boundAddress().publishAddress(), transportService, node, counters); } private static class NetworkHandle { + public final TransportAddress address; public final TransportService transportService; public final DiscoveryNode node; public final ConcurrentMap counters; - public NetworkHandle(TransportAddress address, TransportService transportService, DiscoveryNode discoveryNode, - ConcurrentMap counters) { + public NetworkHandle( + final TransportAddress address, + final TransportService transportService, + final DiscoveryNode discoveryNode, + final ConcurrentMap counters) { this.address = address; this.transportService = transportService; this.node = discoveryNode; this.counters = counters; } + } + } diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java index 055d9df8465a6..fe6e19f966fff 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -42,6 +41,7 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -78,7 +78,7 @@ public void createTransportService() { new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()), Version.CURRENT) { @Override - 
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { // we just need to ensure we don't resolve DNS here return new TransportAddress[] {poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress())}; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 4ff899aeac205..6b3ed0bbad0f8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -208,7 +209,7 @@ public Map profileBoundAddresses() { } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return new TransportAddress[0]; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index dfa308742211d..ac40006538665 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -53,6 +53,7 @@ import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -506,7 +507,7 
@@ public BoundTransportAddress boundAddress() { } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return transport.addressesFromString(address, perAddressLimit); } From cc7d53ecd2e2be857eb840a6a4bc508b6d6a79a3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 16 Nov 2016 21:03:15 -0500 Subject: [PATCH 02/10] Add timeout for concurrent DNS resolutions When doing DNS resolutions for unicast hostnames, we wait until the DNS lookups time out. This appears to be forty-five seconds on modern JVMs, and it is not configurable. If we do these serially, the cluster can be blocked during ping for a lengthy period of time. This commit performs the DNS lookups in parallel on the generic thread pool, and adds a user-configurable timeout for these lookups. --- .../common/settings/ClusterSettings.java | 1 + .../discovery/zen/UnicastZenPing.java | 173 +++++++++++++--- .../discovery/zen/UnicastZenPingTests.java | 185 +++++++++++++++++- docs/reference/modules/discovery/zen.asciidoc | 26 ++- .../file/FileBasedDiscoveryPlugin.java | 30 ++- .../file/FileBasedUnicastHostsProvider.java | 31 +-- .../FileBasedUnicastHostsProviderTests.java | 4 +- 7 files changed, 387 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index cf6a21034708c..7e926db6e78e7 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -339,6 +339,7 @@ public void apply(Settings value, Settings current, Settings previous) { ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING, UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING, + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT, SearchService.DEFAULT_KEEPALIVE_SETTING, SearchService.KEEPALIVE_INTERVAL_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index b453118b9428b..fe3b476151d94 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.zen; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; @@ -28,7 +29,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.SuppressLoggerChecks; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -63,12 +63,17 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -92,6 +97,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { 
Property.NodeScope); public static final Setting DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope); + public static final Setting DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = + Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(30), Property.NodeScope); // these limits are per-address public static final int LIMIT_FOREIGN_PORTS_COUNT = 1; @@ -126,6 +133,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final ExecutorService unicastConnectExecutor; + private final TimeValue resolveTimeout; + private volatile boolean closed = false; public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, @@ -147,7 +156,12 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService // we only limit to 1 addresses, makes no sense to ping 100 ports limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; } - logger.debug("using initial hosts {}, with concurrent_connects [{}]", configuredHosts, concurrentConnects); + resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); + logger.debug( + "using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]", + configuredHosts, + concurrentConnects, + resolveTimeout); transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); @@ -155,33 +169,135 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext()); + + } + + private static class ResolvedHostname { + + private final TransportAddress[] addresses; + 
private final UnknownHostException failure; + + public static ResolvedHostname success(final TransportAddress[] addresses) { + return new ResolvedHostname(addresses, null); + } + + public static ResolvedHostname failure(final UnknownHostException failure) { + return new ResolvedHostname(null, failure); + } + + private ResolvedHostname(final TransportAddress[] addresses, UnknownHostException failure) { + assert addresses != null && failure == null || addresses == null && failure != null; + this.addresses = addresses; + this.failure = failure; + } + + public boolean isSuccess() { + return addresses != null; + } + + public TransportAddress[] addresses() { + return addresses; + } + + public UnknownHostException failure() { + assert !isSuccess(); + return failure; + } + } /** - * Resolves a host to a list of discovery nodes. The host is resolved into a transport address (or a collection of addresses if the - * number of ports is greater than one) and the transport addresses are used to created discovery nodes. + * Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses + * if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. Host lookups are done + * in parallel using the generic thread pool from the specified thread pool up to the specified resolve timeout. 
* - * @param host the host to resolve + * @param threadPool the thread pool used to parallelize hostname lookups + * @param logger logger used for logging messages regarding hostname lookups + * @param hosts the hosts to resolve * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) * @param transportService the transport service * @param idGenerator the generator to supply unique ids for each discovery node + * @param resolveTimeout the timeout before returning from hostname lookups * @return a list of discovery nodes with resolved transport addresses - * @throws UnknownHostException if the host fails to resolve to an address */ public static List resolveDiscoveryNodes( - final String host, + final ThreadPool threadPool, + final Logger logger, + final List hosts, final int limitPortCounts, final TransportService transportService, - final Supplier idGenerator) throws UnknownHostException { + final Supplier idGenerator, + final TimeValue resolveTimeout) throws InterruptedException { + Objects.requireNonNull(threadPool); + Objects.requireNonNull(logger); + Objects.requireNonNull(hosts); + Objects.requireNonNull(transportService); + Objects.requireNonNull(idGenerator); + Objects.requireNonNull(resolveTimeout); + if (resolveTimeout.nanos() < 0) { + throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); + } + // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete + final List> callables = + hosts.stream().map(hn -> lookup(hn, transportService, limitPortCounts)).collect(Collectors.toList()); + final List> futures = + threadPool.generic().invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); final List discoveryNodes = new ArrayList<>(); - final TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts); - for (TransportAddress address : addresses) { - discoveryNodes.add(new 
DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(), - Version.CURRENT.minimumCompatibilityVersion())); + // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the + // hostname with the corresponding task by iterating together + final Iterator it = hosts.iterator(); + for (final Future future : futures) { + final String hostname = it.next(); + if (!future.isCancelled()) { + try { + final ResolvedHostname resolvedHostname = future.get(); + if (resolvedHostname.isSuccess()) { + logger.trace("resolved host [{}] to {}", hostname, resolvedHostname.addresses()); + for (final TransportAddress address : resolvedHostname.addresses()) { + discoveryNodes.add( + new DiscoveryNode( + idGenerator.get(), + address, + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion())); + } + } else { + final String message = "failed to resolve host [" + hostname + "]"; + logger.warn(message, resolvedHostname.failure()); + } + } catch (final ExecutionException e) { + final String message = "failed to resolve host [" + hostname + "]"; + logger.warn(message, e); + } + } else { + logger.warn("timed out resolving host [{}]", hostname); + } } return discoveryNodes; } + /** + * Creates a callable for looking up the specified host. 
+ * + * @param host the host to lookup + * @param transportService the transport service to use for lookups + * @param limitPortCounts the port count limit + * @return a callable that can be used to submit to an executor service + */ + private static Callable lookup( + final String host, + final TransportService transportService, + final int limitPortCounts) { + return () -> { + try { + return ResolvedHostname.success(transportService.addressesFromString(host, limitPortCounts)); + } catch (final UnknownHostException e) { + return ResolvedHostname.failure(e); + } + }; + } + @Override public void close() { ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); @@ -329,21 +445,19 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send // add the configured hosts first final List nodesToPing = new ArrayList<>(); - for (final String host : configuredHosts) { - try { - final List resolvedDiscoveryNodes = resolveDiscoveryNodes( - host, - limitPortCounts, - transportService, - () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#"); - logger.trace( - "resolved host [{}] to {}", - () -> host, - () -> resolvedDiscoveryNodes.stream().map(UnicastZenPing::formatResolvedDiscoveryNode).collect(Collectors.toList())); - nodesToPing.addAll(resolvedDiscoveryNodes); - } catch (final UnknownHostException e) { - logger.warn("failed to resolve host [" + host + "]", e); - } + final List resolvedDiscoveryNodes; + try { + resolvedDiscoveryNodes = resolveDiscoveryNodes( + threadPool, + logger, + configuredHosts, + limitPortCounts, + transportService, + () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", + resolveTimeout); + nodesToPing.addAll(resolvedDiscoveryNodes); + } catch (final InterruptedException e) { + throw new RuntimeException(e); } nodesToPing.addAll(sortedNodesToPing); @@ -444,11 +558,6 @@ public void run() { } } - private static String formatResolvedDiscoveryNode(final DiscoveryNode 
discoveryNode) { - final TransportAddress transportAddress = discoveryNode.getAddress(); - return transportAddress.getAddress() + "@" + transportAddress.getPort(); - } - private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { logger.trace("[{}] sending to {}", id, nodeToSend); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 1636259f6c6ac..567e6343d1ead 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -29,6 +30,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -45,6 +47,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.junit.After; +import org.mockito.Matchers; import java.io.Closeable; import java.io.IOException; @@ -61,26 +64,36 @@ import java.util.Set; import java.util.Stack; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; +import 
java.util.stream.IntStream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; public class UnicastZenPingTests extends ESTestCase { private ThreadPool threadPool = new TestThreadPool(getClass().getName()); + // close in reverse order as opened private Stack closeables = new Stack<>(); @After public void tearDown() throws Exception { try { - // we need to close these in reverse order they were opened but Java stack is broken, it does not iterate in the expected order - // (as if you were popping) + // JDK stack is broken, it does not iterate in the expected order (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4475301) final List reverse = new ArrayList<>(); while (!closeables.isEmpty()) { reverse.add(closeables.pop()); @@ -224,7 +237,7 @@ public ClusterState clusterState() { assertCounters(handleD, handleA, handleB, handleC, handleD); } - public void testUnknownHost() { + public void testUnknownHostNotCached() { // use ephemeral ports final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); @@ -346,6 +359,168 @@ public ClusterState clusterState() { } } + public void testPortLimit() throws InterruptedException { + final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); + final Transport transport = new MockTcpTransport( + Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new 
NamedWriteableRegistry(Collections.emptyList()), + networkService, + Version.CURRENT); + closeables.push(transport); + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + closeables.push(transportService); + final AtomicInteger idGenerator = new AtomicInteger(); + final int limitPortCounts = randomIntBetween(1, 10); + final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + threadPool, + logger, + Collections.singletonList("127.0.0.1"), + limitPortCounts, + transportService, + () -> Integer.toString(idGenerator.incrementAndGet()), + TimeValue.timeValueMillis(100)); + assertThat(discoveryNodes, hasSize(limitPortCounts)); + final Set ports = new HashSet<>(); + for (final DiscoveryNode discoveryNode : discoveryNodes) { + assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress()); + ports.add(discoveryNode.getAddress().getPort()); + } + assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet()))); + } + + public void testUnknownHost() throws InterruptedException { + final Logger logger = mock(Logger.class); + final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); + final String hostname = randomAsciiOfLength(8); + final UnknownHostException unknownHostException = new UnknownHostException(hostname); + final Transport transport = new MockTcpTransport( + Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + Version.CURRENT) { + + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + throw unknownHostException; + } + + }; + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, 
transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + closeables.push(transportService); + final AtomicInteger idGenerator = new AtomicInteger(); + + final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + threadPool, + logger, + Arrays.asList(hostname), + 1, + transportService, + () -> Integer.toString(idGenerator.incrementAndGet()), + TimeValue.timeValueMillis(100) + ); + + assertThat(discoveryNodes, empty()); + verify(logger).warn("failed to resolve host [" + hostname + "]", unknownHostException); + } + + public void testResolveTimeout() throws InterruptedException { + final Logger logger = mock(Logger.class); + final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); + final CountDownLatch latch = new CountDownLatch(1); + final Transport transport = new MockTcpTransport( + Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + Version.CURRENT) { + + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + if ("hostname1".equals(address)) { + return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; + } else if ("hostname2".equals(address)) { + try { + latch.await(); + return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + throw new UnknownHostException(address); + } + } + + }; + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + closeables.push(transportService); + final AtomicInteger idGenerator = new AtomicInteger(); + try { + final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + threadPool, + 
logger, + Arrays.asList("hostname1", "hostname2"), + 1, + transportService, + () -> Integer.toString(idGenerator.incrementAndGet()), + TimeValue.timeValueMillis(100)); + + assertThat(discoveryNodes, hasSize(1)); + verify(logger).trace( + "resolved host [{}] to {}", "hostname1", + new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}); + verify(logger).warn("timed out resolving host [{}]", "hostname2"); + verifyNoMoreInteractions(logger); + } finally { + latch.countDown(); + } + } + + public void testInvalidHosts() throws InterruptedException { + final Logger logger = mock(Logger.class); + final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); + final Transport transport = new MockTcpTransport( + Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + Version.CURRENT); + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + closeables.push(transportService); + final AtomicInteger idGenerator = new AtomicInteger(); + final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + threadPool, + logger, + Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), + 1, + transportService, + () -> Integer.toString(idGenerator.incrementAndGet()), + TimeValue.timeValueMillis(100)); + assertThat(discoveryNodes, hasSize(1)); // only one of the two is valid and will be used + assertThat(discoveryNodes.get(0).getAddress().getAddress(), equalTo("127.0.0.1")); + assertThat(discoveryNodes.get(0).getAddress().getPort(), equalTo(9301)); + verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class)); + } + // assert that we tried to ping each of the configured nodes at least once private void 
assertCounters(NetworkHandle that, NetworkHandle...handles) { for (NetworkHandle handle : handles) { @@ -362,8 +537,8 @@ private NetworkHandle startServices( final Version version, final BiFunction supplier) { final Transport transport = supplier.apply(settings, version); - final TransportService transportService = new TransportService(settings, transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + final TransportService transportService = + new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); transportService.start(); transportService.acceptIncomingRequests(); final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); diff --git a/docs/reference/modules/discovery/zen.asciidoc b/docs/reference/modules/discovery/zen.asciidoc index 082567053b247..169754fa5fab0 100644 --- a/docs/reference/modules/discovery/zen.asciidoc +++ b/docs/reference/modules/discovery/zen.asciidoc @@ -22,21 +22,31 @@ other nodes. [[unicast]] ===== Unicast -The unicast discovery requires a list of hosts to use that will act -as gossip routers. It provides the following settings with the -`discovery.zen.ping.unicast` prefix: +Unicast discovery requires a list of hosts to use that will act as gossip routers. These hosts can be specified as +hostnames or IP addresses; hosts specified as hostnames are resolved to IP addresses during each round of pinging. Note +that with the Java security manager in place, the JVM defaults to caching positive hostname resolutions indefinitely. +This can be modified by adding +http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html[`networkaddress.cache.ttl=<timeout>`] to your +http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java security policy]. Any hosts that +fail to resolve will be logged. Note also that with the Java security manager in place, the JVM defaults to caching +negative hostname resolutions for ten seconds.
This can be modified by adding +http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html[`networkaddress.cache.negative.ttl=<timeout>`] +to your +http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java security policy]. + +Unicast discovery provides the following settings with the `discovery.zen.ping.unicast` prefix: [cols="<,<",options="header",] |======================================================================= |Setting |Description |`hosts` |Either an array setting or a comma delimited setting. Each -value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be bracketed. Defaults to -`127.0.0.1, [::1]` +value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be +bracketed. Defaults to `127.0.0.1, [::1]` +|`hosts.resolve_timeout` |The amount of time to wait for DNS lookups on each round of pinging. Specified as +<<time-units, time units>>. Defaults to 30s. |======================================================================= -The unicast discovery uses the -<<modules-transport,transport>> module to -perform the discovery. +The unicast discovery uses the <<modules-transport,transport>> module to perform the discovery.
[float] [[master-election]] diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java index d93725a03c390..04875f01078ab 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java @@ -19,11 +19,9 @@ package org.elasticsearch.discovery.file; -import java.util.Collections; -import java.util.Map; -import java.util.function.Supplier; - import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkService; @@ -33,7 +31,16 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.SearchRequestParsers; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; /** * Plugin for providing file-based unicast hosts discovery. 
The list of unicast hosts @@ -46,15 +53,28 @@ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger); private final Settings settings; + private ThreadPool threadPool; public FileBasedDiscoveryPlugin(Settings settings) { this.settings = settings; } + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + SearchRequestParsers searchRequestParsers) { + this.threadPool = threadPool; + return Collections.emptyList(); + } + @Override public Map> getZenHostsProviders(TransportService transportService, NetworkService networkService) { - return Collections.singletonMap("file", () -> new FileBasedUnicastHostsProvider(settings, transportService)); + return Collections.singletonMap("file", () -> new FileBasedUnicastHostsProvider(settings, transportService, threadPool)); } @Override diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java index d7323d43acc52..803db3d66cec9 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -23,10 +23,11 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.env.Environment; +import 
org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.FileNotFoundException; @@ -41,6 +42,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT; import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveDiscoveryNodes; /** @@ -61,15 +63,20 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_"; private final TransportService transportService; + private final ThreadPool threadPool; private final Path unicastHostsFilePath; private final AtomicLong nodeIdGenerator = new AtomicLong(); // generates unique ids for the node - FileBasedUnicastHostsProvider(Settings settings, TransportService transportService) { + private final TimeValue resolveTimeout; + + FileBasedUnicastHostsProvider(Settings settings, TransportService transportService, ThreadPool threadPool) { super(settings); this.transportService = transportService; + this.threadPool = threadPool; this.unicastHostsFilePath = new Environment(settings).configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE); + this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); } @Override @@ -89,15 +96,17 @@ public List buildDynamicNodes() { } final List discoNodes = new ArrayList<>(); - for (final String host : hostsList) { - try { - discoNodes.addAll(resolveDiscoveryNodes(host, 1, transportService, - () -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#")); - } catch (IllegalArgumentException e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[discovery-file] Failed to parse transport address from [{}]", - host), e); - continue; - } + try { + discoNodes.addAll(resolveDiscoveryNodes( + threadPool, + logger, + hostsList, + 1, + transportService, + () -> UNICAST_HOST_PREFIX + 
nodeIdGenerator.incrementAndGet() + "#", + resolveTimeout)); + } catch (InterruptedException e) { + } logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes); diff --git a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java index ffb9726d264f5..26b5ee17c811f 100644 --- a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java +++ b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java @@ -103,7 +103,7 @@ public void testUnicastHostsDoesNotExist() throws Exception { final Settings settings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .build(); - final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService); + final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService, threadPool); final List nodes = provider.buildDynamicNodes(); assertEquals(0, nodes.size()); } @@ -136,6 +136,6 @@ private List setupAndRunHostProvider(final List hostEntri writer.write(String.join("\n", hostEntries)); } - return new FileBasedUnicastHostsProvider(settings, transportService).buildDynamicNodes(); + return new FileBasedUnicastHostsProvider(settings, transportService, threadPool).buildDynamicNodes(); } } From 2456768e0ddd4dd59843bdf265ece413ae375ea3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 17 Nov 2016 21:18:30 -0500 Subject: [PATCH 03/10] Cleanup setup of test resources for UnicastZenPing This commit properly sets up test resources for UnicastZenPingTests. 
--- .../discovery/zen/UnicastZenPingTests.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 567e6343d1ead..9d8cf2bf36fb2 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -47,6 +46,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.junit.After; +import org.junit.Before; import org.mockito.Matchers; import java.io.Closeable; @@ -74,7 +74,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -86,9 +85,16 @@ public class UnicastZenPingTests extends ESTestCase { - private ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private ThreadPool threadPool; // close in reverse order as opened - private Stack closeables = new Stack<>(); + private Stack closeables; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + closeables = new Stack<>(); + } @After public void tearDown() throws Exception { 
From 0493ba7ee73dc085c1dbf11cbfdba2d4e51fcc88 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 18 Nov 2016 05:14:48 -0500 Subject: [PATCH 04/10] Do not swallow interrupted exception This was just a silly oops, this commit fixes the swallowing of an interrupted exception in the file-based unicast hosts provider. --- .../discovery/file/FileBasedUnicastHostsProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java index 803db3d66cec9..482290667d7ab 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -106,7 +106,7 @@ public List buildDynamicNodes() { () -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#", resolveTimeout)); } catch (InterruptedException e) { - + throw new RuntimeException(e); } logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes); From f6203d7c1c3325d9519b463488d31054c4c5fc59 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 18 Nov 2016 05:19:46 -0500 Subject: [PATCH 05/10] Add resolve timeout to log message This commit adds the resolve timeout to the warn log message displayed when we timeout resolving unicast hosts. 
--- .../org/elasticsearch/discovery/zen/UnicastZenPing.java | 2 +- .../elasticsearch/discovery/zen/UnicastZenPingTests.java | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index fe3b476151d94..99081d25b2e16 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -271,7 +271,7 @@ public static List resolveDiscoveryNodes( logger.warn(message, e); } } else { - logger.warn("timed out resolving host [{}]", hostname); + logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); } } return discoveryNodes; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 9d8cf2bf36fb2..4927398924753 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -474,7 +474,8 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi final TransportService transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); closeables.push(transportService); - final AtomicInteger idGenerator = new AtomicInteger(); + final AtomicInteger idGenerator = new AtomicInteger(); + final TimeValue resolveTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 100)); try { final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( threadPool, @@ -483,13 +484,13 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi 1, transportService, () -> Integer.toString(idGenerator.incrementAndGet()), - TimeValue.timeValueMillis(100)); + resolveTimeout); 
assertThat(discoveryNodes, hasSize(1)); verify(logger).trace( "resolved host [{}] to {}", "hostname1", new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}); - verify(logger).warn("timed out resolving host [{}]", "hostname2"); + verify(logger).warn("timed out after [{}] resolving host [{}]", resolveTimeout, "hostname2"); verifyNoMoreInteractions(logger); } finally { latch.countDown(); From 4868321727293ea3edf11dd6b3aabe6b3d75387c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 18 Nov 2016 06:12:34 -0500 Subject: [PATCH 06/10] Fix comment in UnicastZenPing This commit fixes an incorrect comment in UnicastZenPing; the comment incorrectly specified which responses are held by the temporal responses. --- .../java/org/elasticsearch/discovery/zen/UnicastZenPing.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 99081d25b2e16..e6516ec4a3c2a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -126,7 +126,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final Map receivedResponses = newConcurrentMap(); - // a list of temporal responses a node will return for a request (holds requests from other configuredHosts) + // a list of temporal responses a node will return for a request (holds responses from other nodes) private final Queue temporalResponses = ConcurrentCollections.newQueue(); private final UnicastHostsProvider hostsProvider; From a28dc6e8a3963962464cf090c7d621b718a61f35 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 18 Nov 2016 13:39:12 -0500 Subject: [PATCH 07/10] Refactor host resolution to use executor service This commit refactors the resolution of unicast hosts from hostnames to IP addresses to use an
executor service instead of the generic thread pool. --- .../discovery/zen/UnicastZenPing.java | 75 ++++++++++--------- .../discovery/zen/UnicastZenPingTests.java | 17 ++++- docs/reference/modules/discovery/zen.asciidoc | 2 +- .../file/FileBasedDiscoveryPlugin.java | 29 ++++++- .../file/FileBasedUnicastHostsProvider.java | 10 +-- .../FileBasedUnicastHostsProviderTests.java | 31 ++++++-- 6 files changed, 109 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index e6516ec4a3c2a..61bf1cc5ad87e 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -98,7 +98,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { public static final Setting DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope); public static final Setting DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = - Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(30), Property.NodeScope); + Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(1), Property.NodeScope); // these limits are per-address public static final int LIMIT_FOREIGN_PORTS_COUNT = 1; @@ -131,7 +131,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final UnicastHostsProvider hostsProvider; - private final ExecutorService unicastConnectExecutor; + private final ExecutorService unicastZenPingExecutorService; private final TimeValue resolveTimeout; @@ -166,10 +166,14 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, 
ThreadPool.Names.SAME, new UnicastPingRequestHandler()); - ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); - unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, - threadFactory, threadPool.getThreadContext()); - + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); + unicastZenPingExecutorService = EsExecutors.newScaling( + "unicast_connect", + 0, concurrentConnects, + 60, + TimeUnit.SECONDS, + threadFactory, + threadPool.getThreadContext()); } private static class ResolvedHostname { @@ -209,9 +213,9 @@ public UnknownHostException failure() { /** * Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses * if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. Host lookups are done - * in parallel using the generic thread pool from the specified thread pool up to the specified resolve timeout. + * in parallel using specified executor service up to the specified resolve timeout. 
* - * @param threadPool the thread pool used to parallelize hostname lookups + * @param executorService the executor service used to parallelize hostname lookups * @param logger logger used for logging messages regarding hostname lookups * @param hosts the hosts to resolve * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) @@ -221,14 +225,14 @@ public UnknownHostException failure() { * @return a list of discovery nodes with resolved transport addresses */ public static List resolveDiscoveryNodes( - final ThreadPool threadPool, + final ExecutorService executorService, final Logger logger, final List hosts, final int limitPortCounts, final TransportService transportService, final Supplier idGenerator, final TimeValue resolveTimeout) throws InterruptedException { - Objects.requireNonNull(threadPool); + Objects.requireNonNull(executorService); Objects.requireNonNull(logger); Objects.requireNonNull(hosts); Objects.requireNonNull(transportService); @@ -241,7 +245,7 @@ public static List resolveDiscoveryNodes( final List> callables = hosts.stream().map(hn -> lookup(hn, transportService, limitPortCounts)).collect(Collectors.toList()); final List> futures = - threadPool.generic().invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); + executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); final List discoveryNodes = new ArrayList<>(); // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the // hostname with the corresponding task by iterating together @@ -300,7 +304,7 @@ private static Callable lookup( @Override public void close() { - ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); + ThreadPool.terminate(unicastZenPingExecutorService, 0, TimeUnit.SECONDS); Releasables.close(receivedResponses.values()); closed = true; } @@ -335,25 +339,38 @@ Collection pingAndWait(TimeValue duration) { @Override 
public void ping(final PingListener listener, final TimeValue duration) { + final List resolvedDiscoveryNodes; + try { + resolvedDiscoveryNodes = resolveDiscoveryNodes( + unicastZenPingExecutorService, + logger, + configuredHosts, + limitPortCounts, + transportService, + () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", + resolveTimeout); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet()); try { receivedResponses.put(sendPingsHandler.id(), sendPingsHandler); try { - sendPings(duration, null, sendPingsHandler); + sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); } catch (RejectedExecutionException e) { logger.debug("Ping execution rejected", e); - // The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings + // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings // But don't bail here, we can retry later on after the send ping has been scheduled. 
} threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override protected void doRun() { - sendPings(duration, null, sendPingsHandler); + sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override protected void doRun() throws Exception { - sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler); + sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes); sendPingsHandler.close(); listener.onPing(sendPingsHandler.pingCollection().toList()); for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { @@ -418,7 +435,11 @@ public void close() { } - void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) { + void sendPings( + final TimeValue timeout, + @Nullable TimeValue waitTime, + final SendPingsHandler sendPingsHandler, + final List resolvedDiscoveryNodes) { final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = sendPingsHandler.id(); pingRequest.timeout = timeout; @@ -444,22 +465,8 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send List sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); // add the configured hosts first - final List nodesToPing = new ArrayList<>(); - final List resolvedDiscoveryNodes; - try { - resolvedDiscoveryNodes = resolveDiscoveryNodes( - threadPool, - logger, - configuredHosts, - limitPortCounts, - transportService, - () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", - resolveTimeout); - nodesToPing.addAll(resolvedDiscoveryNodes); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - + final List nodesToPing = new 
ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size()); + nodesToPing.addAll(resolvedDiscoveryNodes); nodesToPing.addAll(sortedNodesToPing); final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); @@ -497,7 +504,7 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send } // fork the connection to another thread final DiscoveryNode finalNodeToSend = nodeToSend; - unicastConnectExecutor.execute(new Runnable() { + unicastZenPingExecutorService.execute(new Runnable() { @Override public void run() { if (sendPingsHandler.isClosed()) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 4927398924753..4549b78feeffb 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -66,6 +67,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -86,6 +90,7 @@ public class UnicastZenPingTests extends ESTestCase { private ThreadPool threadPool; + private ExecutorService executorService; // close in reverse order as opened private Stack closeables; @@ -93,6 +98,9 @@ 
public class UnicastZenPingTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("[" + getClass().getName() + "]"); + executorService = + EsExecutors.newScaling(getClass().getName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext()); closeables = new Stack<>(); } @@ -106,6 +114,7 @@ public void tearDown() throws Exception { } IOUtils.close(reverse); } finally { + terminate(executorService); terminate(threadPool); super.tearDown(); } @@ -382,7 +391,7 @@ public void testPortLimit() throws InterruptedException { final AtomicInteger idGenerator = new AtomicInteger(); final int limitPortCounts = randomIntBetween(1, 10); final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( - threadPool, + executorService, logger, Collections.singletonList("127.0.0.1"), limitPortCounts, @@ -426,7 +435,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi final AtomicInteger idGenerator = new AtomicInteger(); final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( - threadPool, + executorService, logger, Arrays.asList(hostname), 1, @@ -478,7 +487,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi final TimeValue resolveTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 100)); try { final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( - threadPool, + executorService, logger, Arrays.asList("hostname1", "hostname2"), 1, @@ -515,7 +524,7 @@ public void testInvalidHosts() throws InterruptedException { closeables.push(transportService); final AtomicInteger idGenerator = new AtomicInteger(); final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( - threadPool, + executorService, logger, Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), 1, diff --git a/docs/reference/modules/discovery/zen.asciidoc 
b/docs/reference/modules/discovery/zen.asciidoc index 169754fa5fab0..f31f0ab872ec7 100644 --- a/docs/reference/modules/discovery/zen.asciidoc +++ b/docs/reference/modules/discovery/zen.asciidoc @@ -43,7 +43,7 @@ Unicast discovery provides the following settings with the `discovery.zen.ping.u value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be bracketed. Defaults to `127.0.0.1, [::1]` |`hosts.resolve_timeout` |The amount of time to wait for DNS lookups on each round of pinging. Specified as -<>. Defaults to 30s. +<>. Defaults to 1s. |======================================================================= The unicast discovery uses the <> module to perform the discovery. diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java index 04875f01078ab..7499232346d11 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java @@ -26,8 +26,10 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.Plugin; @@ -37,9 +39,13 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.ResourceWatcherService; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Map; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** @@ -53,7 +59,7 @@ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger); private final Settings settings; - private ThreadPool threadPool; + private ExecutorService fileBasedDiscoveryExecutorService; public FileBasedDiscoveryPlugin(Settings settings) { this.settings = settings; @@ -67,14 +73,31 @@ public Collection createComponents( ResourceWatcherService resourceWatcherService, ScriptService scriptService, SearchRequestParsers searchRequestParsers) { - this.threadPool = threadPool; + final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]"); + fileBasedDiscoveryExecutorService = EsExecutors.newScaling( + "file_based_discovery_resolve", + 0, + concurrentConnects, + 60, + TimeUnit.SECONDS, + threadFactory, + threadPool.getThreadContext()); + return Collections.emptyList(); } + @Override + public void close() throws IOException { + ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS); + } + @Override public Map> getZenHostsProviders(TransportService transportService, NetworkService networkService) { - return Collections.singletonMap("file", () -> new FileBasedUnicastHostsProvider(settings, transportService, threadPool)); + return Collections.singletonMap( + "file", + () -> new FileBasedUnicastHostsProvider(settings, transportService, fileBasedDiscoveryExecutorService)); } @Override diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java 
index 482290667d7ab..55e2029c8b752 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.env.Environment; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.FileNotFoundException; @@ -38,6 +37,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -63,7 +63,7 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_"; private final TransportService transportService; - private final ThreadPool threadPool; + private final ExecutorService executorService; private final Path unicastHostsFilePath; @@ -71,10 +71,10 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast private final TimeValue resolveTimeout; - FileBasedUnicastHostsProvider(Settings settings, TransportService transportService, ThreadPool threadPool) { + FileBasedUnicastHostsProvider(Settings settings, TransportService transportService, ExecutorService executorService) { super(settings); this.transportService = transportService; - this.threadPool = threadPool; + this.executorService = executorService; this.unicastHostsFilePath = new Environment(settings).configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE); this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); } @@ -98,7 +98,7 @@ public List buildDynamicNodes() { final List discoNodes = new ArrayList<>(); try { 
discoNodes.addAll(resolveDiscoveryNodes( - threadPool, + executorService, logger, hostsList, 1, diff --git a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java index 26b5ee17c811f..920792b6c7a3d 100644 --- a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java +++ b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; import org.elasticsearch.transport.TransportService; +import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -43,6 +44,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOST_PREFIX; @@ -52,17 +56,28 @@ */ public class FileBasedUnicastHostsProviderTests extends ESTestCase { - private static ThreadPool threadPool; + private ThreadPool threadPool; + private ExecutorService executorService; private MockTransportService transportService; - @BeforeClass - public static void createThreadPool() { + @Before + public void setUp() throws Exception { + super.setUp(); threadPool = new TestThreadPool(FileBasedUnicastHostsProviderTests.class.getName()); + executorService = Executors.newSingleThreadExecutor(); } - @AfterClass - public static void stopThreadPool() throws InterruptedException { - terminate(threadPool); + @After + public void tearDown() throws Exception { + try { + 
terminate(executorService); + } finally { + try { + terminate(threadPool); + } finally { + super.tearDown(); + } + } } @Before @@ -103,7 +118,7 @@ public void testUnicastHostsDoesNotExist() throws Exception { final Settings settings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .build(); - final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService, threadPool); + final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService, executorService); final List nodes = provider.buildDynamicNodes(); assertEquals(0, nodes.size()); } @@ -136,6 +151,6 @@ private List setupAndRunHostProvider(final List hostEntri writer.write(String.join("\n", hostEntries)); } - return new FileBasedUnicastHostsProvider(settings, transportService, threadPool).buildDynamicNodes(); + return new FileBasedUnicastHostsProvider(settings, transportService, executorService).buildDynamicNodes(); } } From e2fc5a2d944f7641987bc79941e7f731dad8fc39 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 22 Nov 2016 13:29:45 -0500 Subject: [PATCH 08/10] Simplify handling of hostname lookups in UZP This commit simplifies the handling of hostname lookups in UnicastZenPing, removing an unnecessary abstraction and, subsequently, an unnecessary method. 
--- .../discovery/zen/UnicastZenPing.java | 96 ++++--------------- 1 file changed, 20 insertions(+), 76 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 61bf1cc5ad87e..15ab521f9c85f 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -58,7 +58,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -176,40 +175,6 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService threadPool.getThreadContext()); } - private static class ResolvedHostname { - - private final TransportAddress[] addresses; - private final UnknownHostException failure; - - public static ResolvedHostname success(final TransportAddress[] addresses) { - return new ResolvedHostname(addresses, null); - } - - public static ResolvedHostname failure(final UnknownHostException failure) { - return new ResolvedHostname(null, failure); - } - - private ResolvedHostname(final TransportAddress[] addresses, UnknownHostException failure) { - assert addresses != null && failure == null || addresses == null && failure != null; - this.addresses = addresses; - this.failure = failure; - } - - public boolean isSuccess() { - return addresses != null; - } - - public TransportAddress[] addresses() { - return addresses; - } - - public UnknownHostException failure() { - assert !isSuccess(); - return failure; - } - - } - /** * Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses * if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. 
Host lookups are done @@ -242,37 +207,37 @@ public static List resolveDiscoveryNodes( throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); } // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete - final List> callables = - hosts.stream().map(hn -> lookup(hn, transportService, limitPortCounts)).collect(Collectors.toList()); - final List> futures = + final List> callables = + hosts + .stream() + .map(hn -> (Callable)() -> transportService.addressesFromString(hn, limitPortCounts)) + .collect(Collectors.toList()); + final List> futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); final List discoveryNodes = new ArrayList<>(); // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the // hostname with the corresponding task by iterating together final Iterator it = hosts.iterator(); - for (final Future future : futures) { + for (final Future future : futures) { final String hostname = it.next(); if (!future.isCancelled()) { + assert future.isDone(); try { - final ResolvedHostname resolvedHostname = future.get(); - if (resolvedHostname.isSuccess()) { - logger.trace("resolved host [{}] to {}", hostname, resolvedHostname.addresses()); - for (final TransportAddress address : resolvedHostname.addresses()) { - discoveryNodes.add( - new DiscoveryNode( - idGenerator.get(), - address, - emptyMap(), - emptySet(), - Version.CURRENT.minimumCompatibilityVersion())); - } - } else { - final String message = "failed to resolve host [" + hostname + "]"; - logger.warn(message, resolvedHostname.failure()); + final TransportAddress[] addresses = future.get(); + logger.trace("resolved host [{}] to {}", hostname, addresses); + for (final TransportAddress address : addresses) { + discoveryNodes.add( + new DiscoveryNode( + idGenerator.get(), + address, + emptyMap(), + 
emptySet(), + Version.CURRENT.minimumCompatibilityVersion())); } } catch (final ExecutionException e) { + assert e.getCause() != null; final String message = "failed to resolve host [" + hostname + "]"; - logger.warn(message, e); + logger.warn(message, e.getCause()); } } else { logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); @@ -281,27 +246,6 @@ public static List resolveDiscoveryNodes( return discoveryNodes; } - /** - * Creates a callable for looking up the specified host. - * - * @param host the host to lookup - * @param transportService the transport service to use for lookups - * @param limitPortCounts the port count limit - * @return a callable that can be used to submit to an executor service - */ - private static Callable lookup( - final String host, - final TransportService transportService, - final int limitPortCounts) { - return () -> { - try { - return ResolvedHostname.success(transportService.addressesFromString(host, limitPortCounts)); - } catch (final UnknownHostException e) { - return ResolvedHostname.failure(e); - } - }; - } - @Override public void close() { ThreadPool.terminate(unicastZenPingExecutorService, 0, TimeUnit.SECONDS); From 62f2cc77afb53aed2b7127a66c1bd5355193323b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 22 Nov 2016 13:34:25 -0500 Subject: [PATCH 09/10] Set default resolve timeout to five seconds This commit sets the default resolve timeout on DNS lookups in UnicastZenPing to five seconds. 
--- .../java/org/elasticsearch/discovery/zen/UnicastZenPing.java | 2 +- docs/reference/modules/discovery/zen.asciidoc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 15ab521f9c85f..8f014b1aad71b 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -97,7 +97,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { public static final Setting DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope); public static final Setting DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = - Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(1), Property.NodeScope); + Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), Property.NodeScope); // these limits are per-address public static final int LIMIT_FOREIGN_PORTS_COUNT = 1; diff --git a/docs/reference/modules/discovery/zen.asciidoc b/docs/reference/modules/discovery/zen.asciidoc index f31f0ab872ec7..9202e1b984201 100644 --- a/docs/reference/modules/discovery/zen.asciidoc +++ b/docs/reference/modules/discovery/zen.asciidoc @@ -43,7 +43,7 @@ Unicast discovery provides the following settings with the `discovery.zen.ping.u value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be bracketed. Defaults to `127.0.0.1, [::1]` |`hosts.resolve_timeout` |The amount of time to wait for DNS lookups on each round of pinging. Specified as -<>. Defaults to 1s. +<>. Defaults to 5s. 
|======================================================================= The unicast discovery uses the <> module to perform the discovery. From 156a49b87c88e60b6ff31fd747f0648b532e1d6d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 22 Nov 2016 13:50:25 -0500 Subject: [PATCH 10/10] Add Javadocs for UnicastZenPing#ping This commit adds Javadocs for UnicastZenPing#ping, especially clarifying the purpose of the duration parameter. --- .../org/elasticsearch/discovery/zen/UnicastZenPing.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 8f014b1aad71b..eec9548dd08ac 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -281,6 +281,15 @@ Collection pingAndWait(TimeValue duration) { } } + /** + * Sends three rounds of pings notifying the specified {@link PingListener} when pinging is complete. Pings are sent after resolving + * configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch + * of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}. + * The pings that are sent carry a timeout of 1.25 times the {@link TimeValue}. + * + * @param listener the callback when pinging is complete + * @param duration the timeout for various components of the pings + */ @Override public void ping(final PingListener listener, final TimeValue duration) { final List resolvedDiscoveryNodes;