2020package org .elasticsearch .discovery .zen ;
2121
2222import com .carrotsearch .hppc .cursors .ObjectCursor ;
23+ import org .apache .logging .log4j .Logger ;
2324import org .apache .logging .log4j .message .ParameterizedMessage ;
2425import org .apache .logging .log4j .util .Supplier ;
2526import org .elasticsearch .ElasticsearchException ;
2829import org .elasticsearch .cluster .node .DiscoveryNode ;
2930import org .elasticsearch .cluster .node .DiscoveryNodes ;
3031import org .elasticsearch .common .Nullable ;
31- import org .elasticsearch .common .SuppressLoggerChecks ;
3232import org .elasticsearch .common .UUIDs ;
3333import org .elasticsearch .common .component .AbstractComponent ;
3434import org .elasticsearch .common .io .stream .StreamInput ;
6363import java .util .Arrays ;
6464import java .util .Collection ;
6565import java .util .HashSet ;
66+ import java .util .Iterator ;
6667import java .util .List ;
6768import java .util .Map ;
69+ import java .util .Objects ;
6870import java .util .Queue ;
6971import java .util .Set ;
72+ import java .util .concurrent .Callable ;
7073import java .util .concurrent .CountDownLatch ;
74+ import java .util .concurrent .ExecutionException ;
7175import java .util .concurrent .ExecutorService ;
76+ import java .util .concurrent .Future ;
7277import java .util .concurrent .RejectedExecutionException ;
7378import java .util .concurrent .ThreadFactory ;
7479import java .util .concurrent .TimeUnit ;
@@ -92,6 +97,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
9297 Property .NodeScope );
9398 public static final Setting <Integer > DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
9499 Setting .intSetting ("discovery.zen.ping.unicast.concurrent_connects" , 10 , 0 , Property .NodeScope );
100+ public static final Setting <TimeValue > DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
101+ Setting .positiveTimeSetting ("discovery.zen.ping.unicast.hosts.resolve_timeout" , TimeValue .timeValueSeconds (30 ), Property .NodeScope );
95102
96103 // these limits are per-address
97104 public static final int LIMIT_FOREIGN_PORTS_COUNT = 1 ;
@@ -126,6 +133,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
126133
127134 private final ExecutorService unicastConnectExecutor ;
128135
136+ private final TimeValue resolveTimeout ;
137+
129138 private volatile boolean closed = false ;
130139
131140 public UnicastZenPing (Settings settings , ThreadPool threadPool , TransportService transportService ,
@@ -147,41 +156,148 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
147156 // we only limit to 1 addresses, makes no sense to ping 100 ports
148157 limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT ;
149158 }
150- logger .debug ("using initial hosts {}, with concurrent_connects [{}]" , configuredHosts , concurrentConnects );
159+ resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT .get (settings );
160+ logger .debug (
161+ "using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]" ,
162+ configuredHosts ,
163+ concurrentConnects ,
164+ resolveTimeout );
151165
152166 transportService .registerRequestHandler (ACTION_NAME , UnicastPingRequest ::new , ThreadPool .Names .SAME ,
153167 new UnicastPingRequestHandler ());
154168
155169 ThreadFactory threadFactory = EsExecutors .daemonThreadFactory (settings , "[unicast_connect]" );
156170 unicastConnectExecutor = EsExecutors .newScaling ("unicast_connect" , 0 , concurrentConnects , 60 , TimeUnit .SECONDS ,
157171 threadFactory , threadPool .getThreadContext ());
172+
173+ }
174+
175+ private static class ResolvedHostname {
176+
177+ private final TransportAddress [] addresses ;
178+ private final UnknownHostException failure ;
179+
180+ public static ResolvedHostname success (final TransportAddress [] addresses ) {
181+ return new ResolvedHostname (addresses , null );
182+ }
183+
184+ public static ResolvedHostname failure (final UnknownHostException failure ) {
185+ return new ResolvedHostname (null , failure );
186+ }
187+
188+ private ResolvedHostname (final TransportAddress [] addresses , UnknownHostException failure ) {
189+ assert addresses != null && failure == null || addresses == null && failure != null ;
190+ this .addresses = addresses ;
191+ this .failure = failure ;
192+ }
193+
194+ public boolean isSuccess () {
195+ return addresses != null ;
196+ }
197+
198+ public TransportAddress [] addresses () {
199+ return addresses ;
200+ }
201+
202+ public UnknownHostException failure () {
203+ assert !isSuccess ();
204+ return failure ;
205+ }
206+
158207 }
159208
160209 /**
161- * Resolves a host to a list of discovery nodes. The host is resolved into a transport address (or a collection of addresses if the
162- * number of ports is greater than one) and the transport addresses are used to created discovery nodes.
210+ * Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses
211+ * if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. Host lookups are done
212+ * in parallel using the generic thread pool from the specified thread pool up to the specified resolve timeout.
163213 *
164- * @param host the host to resolve
214+ * @param threadPool the thread pool used to parallelize hostname lookups
215+ * @param logger logger used for logging messages regarding hostname lookups
216+ * @param hosts the hosts to resolve
165217 * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
166218 * @param transportService the transport service
167219 * @param idGenerator the generator to supply unique ids for each discovery node
220+ * @param resolveTimeout the timeout before returning from hostname lookups
168221 * @return a list of discovery nodes with resolved transport addresses
169- * @throws UnknownHostException if the host fails to resolve to an address
170222 */
171223 public static List <DiscoveryNode > resolveDiscoveryNodes (
172- final String host ,
224+ final ThreadPool threadPool ,
225+ final Logger logger ,
226+ final List <String > hosts ,
173227 final int limitPortCounts ,
174228 final TransportService transportService ,
175- final Supplier <String > idGenerator ) throws UnknownHostException {
229+ final Supplier <String > idGenerator ,
230+ final TimeValue resolveTimeout ) throws InterruptedException {
231+ Objects .requireNonNull (threadPool );
232+ Objects .requireNonNull (logger );
233+ Objects .requireNonNull (hosts );
234+ Objects .requireNonNull (transportService );
235+ Objects .requireNonNull (idGenerator );
236+ Objects .requireNonNull (resolveTimeout );
237+ if (resolveTimeout .nanos () < 0 ) {
238+ throw new IllegalArgumentException ("resolve timeout must be non-negative but was [" + resolveTimeout + "]" );
239+ }
240+ // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
241+ final List <Callable <ResolvedHostname >> callables =
242+ hosts .stream ().map (hn -> lookup (hn , transportService , limitPortCounts )).collect (Collectors .toList ());
243+ final List <Future <ResolvedHostname >> futures =
244+ threadPool .generic ().invokeAll (callables , resolveTimeout .nanos (), TimeUnit .NANOSECONDS );
176245 final List <DiscoveryNode > discoveryNodes = new ArrayList <>();
177- final TransportAddress [] addresses = transportService .addressesFromString (host , limitPortCounts );
178- for (TransportAddress address : addresses ) {
179- discoveryNodes .add (new DiscoveryNode (idGenerator .get (), address , emptyMap (), emptySet (),
180- Version .CURRENT .minimumCompatibilityVersion ()));
246+ // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
247+ // hostname with the corresponding task by iterating together
248+ final Iterator <String > it = hosts .iterator ();
249+ for (final Future <ResolvedHostname > future : futures ) {
250+ final String hostname = it .next ();
251+ if (!future .isCancelled ()) {
252+ try {
253+ final ResolvedHostname resolvedHostname = future .get ();
254+ if (resolvedHostname .isSuccess ()) {
255+ logger .trace ("resolved host [{}] to {}" , hostname , resolvedHostname .addresses ());
256+ for (final TransportAddress address : resolvedHostname .addresses ()) {
257+ discoveryNodes .add (
258+ new DiscoveryNode (
259+ idGenerator .get (),
260+ address ,
261+ emptyMap (),
262+ emptySet (),
263+ Version .CURRENT .minimumCompatibilityVersion ()));
264+ }
265+ } else {
266+ final String message = "failed to resolve host [" + hostname + "]" ;
267+ logger .warn (message , resolvedHostname .failure ());
268+ }
269+ } catch (final ExecutionException e ) {
270+ final String message = "failed to resolve host [" + hostname + "]" ;
271+ logger .warn (message , e );
272+ }
273+ } else {
274+ logger .warn ("timed out resolving host [{}]" , hostname );
275+ }
181276 }
182277 return discoveryNodes ;
183278 }
184279
280+ /**
281+ * Creates a callable for looking up the specified host.
282+ *
283+ * @param host the host to lookup
284+ * @param transportService the transport service to use for lookups
285+ * @param limitPortCounts the port count limit
286+ * @return a callable that can be used to submit to an executor service
287+ */
288+ private static Callable <ResolvedHostname > lookup (
289+ final String host ,
290+ final TransportService transportService ,
291+ final int limitPortCounts ) {
292+ return () -> {
293+ try {
294+ return ResolvedHostname .success (transportService .addressesFromString (host , limitPortCounts ));
295+ } catch (final UnknownHostException e ) {
296+ return ResolvedHostname .failure (e );
297+ }
298+ };
299+ }
300+
185301 @ Override
186302 public void close () {
187303 ThreadPool .terminate (unicastConnectExecutor , 0 , TimeUnit .SECONDS );
@@ -329,21 +445,19 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send
329445
330446 // add the configured hosts first
331447 final List <DiscoveryNode > nodesToPing = new ArrayList <>();
332- for (final String host : configuredHosts ) {
333- try {
334- final List <DiscoveryNode > resolvedDiscoveryNodes = resolveDiscoveryNodes (
335- host ,
336- limitPortCounts ,
337- transportService ,
338- () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator .incrementAndGet () + "#" );
339- logger .trace (
340- "resolved host [{}] to {}" ,
341- () -> host ,
342- () -> resolvedDiscoveryNodes .stream ().map (UnicastZenPing ::formatResolvedDiscoveryNode ).collect (Collectors .toList ()));
343- nodesToPing .addAll (resolvedDiscoveryNodes );
344- } catch (final UnknownHostException e ) {
345- logger .warn ("failed to resolve host [" + host + "]" , e );
346- }
448+ final List <DiscoveryNode > resolvedDiscoveryNodes ;
449+ try {
450+ resolvedDiscoveryNodes = resolveDiscoveryNodes (
451+ threadPool ,
452+ logger ,
453+ configuredHosts ,
454+ limitPortCounts ,
455+ transportService ,
456+ () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator .incrementAndGet () + "#" ,
457+ resolveTimeout );
458+ nodesToPing .addAll (resolvedDiscoveryNodes );
459+ } catch (final InterruptedException e ) {
460+ throw new RuntimeException (e );
347461 }
348462
349463 nodesToPing .addAll (sortedNodesToPing );
@@ -444,11 +558,6 @@ public void run() {
444558 }
445559 }
446560
447- private static String formatResolvedDiscoveryNode (final DiscoveryNode discoveryNode ) {
448- final TransportAddress transportAddress = discoveryNode .getAddress ();
449- return transportAddress .getAddress () + "@" + transportAddress .getPort ();
450- }
451-
452561 private void sendPingRequestToNode (final int id , final TimeValue timeout , final UnicastPingRequest pingRequest ,
453562 final CountDownLatch latch , final DiscoveryNode node , final DiscoveryNode nodeToSend ) {
454563 logger .trace ("[{}] sending to {}" , id , nodeToSend );
0 commit comments