2929import java .io .IOException ;
3030import java .security .AccessController ;
3131import java .security .PrivilegedAction ;
32+ import java .util .ArrayList ;
33+ import java .util .Collections ;
3234import java .util .List ;
3335import java .util .concurrent .Executors ;
36+ import java .util .concurrent .ConcurrentHashMap ;
3437import java .util .concurrent .ScheduledExecutorService ;
3538import java .util .concurrent .ScheduledFuture ;
3639import java .util .concurrent .ThreadFactory ;
@@ -53,15 +56,16 @@ public class Sniffer implements Closeable {
5356
5457 private final Task task ;
5558
56- Sniffer (RestClient restClient , HostsSniffer hostsSniffer , long sniffInterval , long sniffAfterFailureDelay ) {
57- this .task = new Task (hostsSniffer , restClient , sniffInterval , sniffAfterFailureDelay );
59+ Sniffer (RestClient restClient , HostsSniffer hostsSniffer , long sniffInterval , long sniffAfterFailureDelay , int maxExcludedRounds ) {
60+ this .task = new Task (hostsSniffer , restClient , sniffInterval , sniffAfterFailureDelay , maxExcludedRounds );
5861 }
5962
6063 /**
6164 * Triggers a new sniffing round and explicitly takes out the failed host provided as argument
6265 */
6366 public void sniffOnFailure (HttpHost failedHost ) {
64- this .task .sniffOnFailure (failedHost );
67+ this .task .failedHosts .putIfAbsent (failedHost , 0L );
68+ this .task .scheduleNextRun (0 );
6569 }
6670
6771 @ Override
@@ -75,15 +79,24 @@ private static class Task implements Runnable {
7579
7680 private final long sniffIntervalMillis ;
7781 private final long sniffAfterFailureDelayMillis ;
82+ private final int maxExcludedRounds ;
7883 private final ScheduledExecutorService scheduledExecutorService ;
7984 private final AtomicBoolean running = new AtomicBoolean (false );
8085 private ScheduledFuture <?> scheduledFuture ;
86+ private ConcurrentHashMap <HttpHost , Long > failedHosts = new ConcurrentHashMap <>();
87+
88+ private Task (
89+ HostsSniffer hostsSniffer ,
90+ RestClient restClient ,
91+ long sniffIntervalMillis ,
92+ long sniffAfterFailureDelayMillis ,
93+ int maxExcludedRounds ) {
8194
82- private Task (HostsSniffer hostsSniffer , RestClient restClient , long sniffIntervalMillis , long sniffAfterFailureDelayMillis ) {
8395 this .hostsSniffer = hostsSniffer ;
8496 this .restClient = restClient ;
8597 this .sniffIntervalMillis = sniffIntervalMillis ;
8698 this .sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis ;
99+ this .maxExcludedRounds = maxExcludedRounds ;
87100 SnifferThreadFactory threadFactory = new SnifferThreadFactory (SNIFFER_THREAD_NAME );
88101 this .scheduledExecutorService = Executors .newScheduledThreadPool (1 , threadFactory );
89102 scheduleNextRun (0 );
@@ -106,35 +119,63 @@ synchronized void scheduleNextRun(long delayMillis) {
106119
107120 @ Override
108121 public void run () {
109- sniff (null , sniffIntervalMillis );
110- }
111-
112- void sniffOnFailure (HttpHost failedHost ) {
113- sniff (failedHost , sniffAfterFailureDelayMillis );
122+ sniff (sniffIntervalMillis );
114123 }
115124
116- void sniff (HttpHost excludeHost , long nextSniffDelayMillis ) {
125+ void sniff (long nextSniffDelayMillis ) {
117126 if (running .compareAndSet (false , true )) {
127+ long nextSniffDelay = nextSniffDelayMillis ;
118128 try {
119129 List <HttpHost > sniffedHosts = hostsSniffer .sniffHosts ();
120130 logger .debug ("sniffed hosts: " + sniffedHosts );
121- if (excludeHost != null ) {
122- sniffedHosts .remove (excludeHost );
123- }
124- if (sniffedHosts .isEmpty ()) {
131+
132+ List <HttpHost > hostsFiltered = removeExcludedAndCycle (sniffedHosts );
133+ logger .debug ("sniffed hosts after filtering: " + sniffedHosts );
134+
135+ if (hostsFiltered .isEmpty ()) {
125136 logger .warn ("no hosts to set, hosts will be updated at the next sniffing round" );
126137 } else {
127- this .restClient .setHosts (sniffedHosts .toArray (new HttpHost [sniffedHosts .size ()]));
138+ this .restClient .setHosts (hostsFiltered .toArray (new HttpHost [hostsFiltered .size ()]));
128139 }
129140 } catch (Exception e ) {
130141 logger .error ("error while sniffing nodes" , e );
142+ nextSniffDelay = sniffAfterFailureDelayMillis ;
131143 } finally {
132- scheduleNextRun (nextSniffDelayMillis );
144+ scheduleNextRun (nextSniffDelay );
133145 running .set (false );
134146 }
135147 }
136148 }
137149
150+ /**
151+ * Remove excluded hosts from the list of all sniffed hosts, and cycle through the map. Hosts in the map remain
152+ * there for {@link org.elasticsearch.client.sniff.Sniffer.Task#maxExcludedRounds} cycles
153+ * @param allHosts the list of all sniffed hosts
154+ * @return a new list containing the remaining hosts
155+ */
156+ private List <HttpHost > removeExcludedAndCycle (List <HttpHost > allHosts ) {
157+ final List <HttpHost > excluded = Collections .list (failedHosts .keys ());
158+
159+ if (excluded .isEmpty ()) {
160+ return allHosts ;
161+ }
162+
163+ try {
164+ List <HttpHost > copy = new ArrayList <>(allHosts );
165+ copy .removeAll (excluded );
166+ return copy ;
167+ } finally {
168+ for (HttpHost host : excluded ) {
169+ long excludedCycles = failedHosts .get (host ) + 1 ;
170+ if (excludedCycles >= maxExcludedRounds ) {
171+ failedHosts .remove (host );
172+ } else {
173+ failedHosts .put (host , excludedCycles );
174+ }
175+ }
176+ }
177+ }
178+
138179 synchronized void shutdown () {
139180 scheduledExecutorService .shutdown ();
140181 try {
0 commit comments