Skip to content

Commit 0b12cab

Browse files
committed
Cluster Health should run on applied states, even if waitFor=0 #17440
We already protect against making decisions based on an inflight cluster state if someone asks for a waitFor rule (like wait for green). We should do the same for normal health checks as well (unless timeout is set to 0) as it be trappy to debug failures when health says the cluster is in a certain state, but that state wasn't applied yet. Closes #17440
1 parent 9ebc0c8 commit 0b12cab

File tree

6 files changed

+118
-10
lines changed

6 files changed

+118
-10
lines changed

core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ private void executeHealth(final ClusterHealthRequest request, final ActionListe
143143
assert waitFor >= 0;
144144
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
145145
final ClusterState state = observer.observedState();
146-
if (waitFor == 0 || request.timeout().millis() == 0) {
146+
if (request.timeout().millis() == 0) {
147147
listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0));
148148
return;
149149
}

core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.action.ActionRunnable;
2626
import org.elasticsearch.action.support.ActionFilters;
2727
import org.elasticsearch.action.support.HandledTransportAction;
28-
import org.elasticsearch.action.support.ThreadedActionListener;
2928
import org.elasticsearch.cluster.ClusterState;
3029
import org.elasticsearch.cluster.ClusterStateObserver;
3130
import org.elasticsearch.cluster.MasterNodeChangePredicate;
@@ -116,10 +115,6 @@ protected boolean validate(ClusterState newState) {
116115
if (task != null) {
117116
request.setParentTask(clusterService.localNode().getId(), task.getId());
118117
}
119-
// TODO do we really need to wrap it in a listener? the handlers should be cheap
120-
if ((listener instanceof ThreadedActionListener) == false) {
121-
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
122-
}
123118
this.listener = listener;
124119
}
125120

core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
import org.elasticsearch.threadpool.ThreadPool;
4242
import org.elasticsearch.transport.TransportService;
4343
import org.junit.After;
44+
import org.junit.AfterClass;
4445
import org.junit.Before;
46+
import org.junit.BeforeClass;
4547

4648
import java.nio.charset.StandardCharsets;
4749
import java.util.HashSet;
@@ -53,17 +55,26 @@
5355
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
5456
import static org.hamcrest.CoreMatchers.equalTo;
5557
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
56-
import static org.mockito.Mockito.mock;
5758

5859
public class TransportBulkActionTookTests extends ESTestCase {
5960

60-
private ThreadPool threadPool;
61+
static private ThreadPool threadPool;
6162
private ClusterService clusterService;
6263

64+
@BeforeClass
65+
public static void beforeClass() {
66+
threadPool = new ThreadPool("TransportBulkActionTookTests");
67+
}
68+
69+
@AfterClass
70+
public static void afterClass() {
71+
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
72+
threadPool = null;
73+
}
74+
6375
@Before
6476
public void setUp() throws Exception {
6577
super.setUp();
66-
threadPool = mock(ThreadPool.class);
6778
clusterService = createClusterService(threadPool);
6879
}
6980

core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,42 @@
1919
package org.elasticsearch.cluster.health;
2020

2121
import org.elasticsearch.Version;
22+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
23+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
24+
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
25+
import org.elasticsearch.action.support.ActionFilters;
2226
import org.elasticsearch.action.support.IndicesOptions;
27+
import org.elasticsearch.action.support.PlainActionFuture;
2328
import org.elasticsearch.cluster.ClusterName;
2429
import org.elasticsearch.cluster.ClusterState;
30+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2531
import org.elasticsearch.cluster.metadata.IndexMetaData;
2632
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2733
import org.elasticsearch.cluster.metadata.MetaData;
2834
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2935
import org.elasticsearch.cluster.routing.RoutingTable;
36+
import org.elasticsearch.cluster.service.ClusterService;
3037
import org.elasticsearch.common.io.stream.BytesStreamOutput;
3138
import org.elasticsearch.common.io.stream.StreamInput;
3239
import org.elasticsearch.common.settings.Settings;
3340
import org.elasticsearch.test.ESTestCase;
41+
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
42+
import org.elasticsearch.test.transport.CapturingTransport;
43+
import org.elasticsearch.threadpool.ThreadPool;
44+
import org.elasticsearch.transport.TransportService;
3445
import org.hamcrest.Matchers;
46+
import org.junit.After;
47+
import org.junit.AfterClass;
48+
import org.junit.Before;
49+
import org.junit.BeforeClass;
3550

3651
import java.io.IOException;
52+
import java.util.HashSet;
53+
import java.util.concurrent.CountDownLatch;
54+
import java.util.concurrent.ExecutionException;
55+
import java.util.concurrent.TimeUnit;
3756

57+
import static org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService;
3858
import static org.hamcrest.CoreMatchers.allOf;
3959
import static org.hamcrest.CoreMatchers.equalTo;
4060
import static org.hamcrest.Matchers.empty;
@@ -45,6 +65,83 @@
4565
public class ClusterStateHealthTests extends ESTestCase {
4666
private final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(Settings.EMPTY);
4767

68+
69+
private static ThreadPool threadPool;
70+
71+
private ClusterService clusterService;
72+
private TransportService transportService;
73+
private CapturingTransport transport;
74+
75+
@BeforeClass
76+
public static void beforeClass() {
77+
threadPool = new ThreadPool("ClusterStateHealthTests");
78+
}
79+
80+
@Override
81+
@Before
82+
public void setUp() throws Exception {
83+
super.setUp();
84+
transport = new CapturingTransport();
85+
clusterService = createClusterService(threadPool);
86+
transportService = new TransportService(transport, threadPool);
87+
transportService.start();
88+
transportService.acceptIncomingRequests();
89+
}
90+
91+
@After
92+
public void tearDown() throws Exception {
93+
super.tearDown();
94+
clusterService.close();
95+
transportService.close();
96+
}
97+
98+
@AfterClass
99+
public static void afterClass() {
100+
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
101+
threadPool = null;
102+
}
103+
104+
public void testClusterHealthWaitsForClusterStateApplication() throws InterruptedException, ExecutionException {
105+
final CountDownLatch applyLatch = new CountDownLatch(1);
106+
final CountDownLatch listenerCalled = new CountDownLatch(1);
107+
clusterService.add(event -> {
108+
listenerCalled.countDown();
109+
try {
110+
applyLatch.await();
111+
} catch (InterruptedException e) {
112+
logger.debug("interrupted", e);
113+
}
114+
});
115+
116+
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
117+
@Override
118+
public ClusterState execute(ClusterState currentState) throws Exception {
119+
return ClusterState.builder(currentState).build();
120+
}
121+
122+
@Override
123+
public void onFailure(String source, Throwable t) {
124+
logger.warn("unexpected failure", t);
125+
}
126+
});
127+
128+
logger.info("--> waiting for listener to be called and cluster state being blocked");
129+
listenerCalled.await();
130+
131+
TransportClusterHealthAction action = new TransportClusterHealthAction(Settings.EMPTY, transportService,
132+
clusterService, threadPool, clusterService.state().getClusterName(), new ActionFilters(new HashSet<>()),
133+
indexNameExpressionResolver, NoopGatewayAllocator.INSTANCE);
134+
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
135+
136+
action.execute(new ClusterHealthRequest(), listener);
137+
138+
assertFalse(listener.isDone());
139+
140+
applyLatch.countDown();
141+
listener.get();
142+
}
143+
144+
48145
public void testClusterHealth() throws IOException {
49146
RoutingTableGenerator routingTableGenerator = new RoutingTableGenerator();
50147
RoutingTableGenerator.ShardCounter counter = new RoutingTableGenerator.ShardCounter();

core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public void testPrimaryFailureIncreasesTerm() throws Exception {
4848
indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null);
4949

5050
logger.info("--> waiting for a yellow index");
51-
assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)));
51+
assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(),
52+
equalTo(ClusterHealthStatus.YELLOW)));
5253

5354
final long term0 = shard == 0 ? 2 : 1;
5455
final long term1 = shard == 1 ? 2 : 1;

core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2727
import org.elasticsearch.cluster.NodeConnectionsService;
2828
import org.elasticsearch.cluster.node.DiscoveryNode;
29+
import org.elasticsearch.cluster.node.DiscoveryNodes;
2930
import org.elasticsearch.common.settings.ClusterSettings;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.common.transport.DummyTransportAddress;
@@ -60,6 +61,9 @@ public void disconnectFromRemovedNodes(ClusterChangedEvent event) {
6061
clusterService.setClusterStatePublisher((event, ackListener) -> {
6162
});
6263
clusterService.start();
64+
final DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterService.state().nodes());
65+
nodes.masterNodeId(clusterService.localNode().getId());
66+
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodes));
6367
return clusterService;
6468
}
6569

0 commit comments

Comments
 (0)