From 0b12cab07a350edf6b4fa3b1b9fa5879bdd19a9a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 31 Mar 2016 09:11:08 +0200 Subject: [PATCH] Cluster Health should run on applied states, even if waitFor=0 #17440 We already protect against making decisions based on an inflight cluster state if someone asks for a waitFor rule (like wait for green). We should do the same for normal health checks as well (unless timeout is set to 0) as it be trappy to debug failures when health says the cluster is in a certain state, but that state wasn't applied yet. Closes #17440 --- .../health/TransportClusterHealthAction.java | 2 +- .../master/TransportMasterNodeAction.java | 5 - .../bulk/TransportBulkActionTookTests.java | 17 +++- .../health/ClusterStateHealthTests.java | 97 +++++++++++++++++++ .../routing/allocation/ShardStateIT.java | 3 +- .../cluster/service/ClusterServiceUtils.java | 4 + 6 files changed, 118 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 069f0ebe1b8e5..853e6986d02b6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -143,7 +143,7 @@ private void executeHealth(final ClusterHealthRequest request, final ActionListe assert waitFor >= 0; final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext()); final ClusterState state = observer.observedState(); - if (waitFor == 0 || request.timeout().millis() == 0) { + if (request.timeout().millis() == 0) { listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0)); return; } diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index b87190cabe81c..691bcc662727a 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.MasterNodeChangePredicate; @@ -116,10 +115,6 @@ protected boolean validate(ClusterState newState) { if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); } - // TODO do we really need to wrap it in a listener? the handlers should be cheap - if ((listener instanceof ThreadedActionListener) == false) { - listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener); - } this.listener = listener; } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index e36b4e4f0287e..219fa7274e1b7 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -41,7 +41,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import java.nio.charset.StandardCharsets; import java.util.HashSet; @@ -53,17 +55,26 @@ import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.mockito.Mockito.mock; public class TransportBulkActionTookTests extends ESTestCase { - private ThreadPool threadPool; + static private ThreadPool threadPool; private ClusterService clusterService; + @BeforeClass + public static void beforeClass() { + threadPool = new ThreadPool("TransportBulkActionTookTests"); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + @Before public void setUp() throws Exception { super.setUp(); - threadPool = mock(ThreadPool.class); clusterService = createClusterService(threadPool); } diff --git a/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index 9c0f1014dcfed..69fb74dd7fb6b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -19,22 +19,42 @@ package org.elasticsearch.cluster.health; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; +import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import java.io.IOException; +import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import static org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.empty; @@ -45,6 +65,83 @@ public class ClusterStateHealthTests extends ESTestCase { private final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(Settings.EMPTY); + + private static ThreadPool threadPool; + + private ClusterService clusterService; + private TransportService transportService; + private CapturingTransport transport; + + @BeforeClass + public static void beforeClass() { + threadPool = new ThreadPool("ClusterStateHealthTests"); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + transport = new CapturingTransport(); + clusterService = createClusterService(threadPool); + transportService = new TransportService(transport, threadPool); + transportService.start(); + transportService.acceptIncomingRequests(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + transportService.close(); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + public void testClusterHealthWaitsForClusterStateApplication() throws InterruptedException, ExecutionException { + final CountDownLatch applyLatch = new CountDownLatch(1); + final CountDownLatch listenerCalled = new CountDownLatch(1); + clusterService.add(event -> { + listenerCalled.countDown(); + try { + applyLatch.await(); + } catch (InterruptedException e) { + logger.debug("interrupted", e); + } + }); + + clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return ClusterState.builder(currentState).build(); + } + + @Override + public void onFailure(String source, Throwable t) { + logger.warn("unexpected failure", t); + } + }); + + logger.info("--> waiting for listener to be called and cluster state being blocked"); + listenerCalled.await(); + + TransportClusterHealthAction action = new TransportClusterHealthAction(Settings.EMPTY, transportService, + clusterService, threadPool, clusterService.state().getClusterName(), new ActionFilters(new HashSet<>()), + indexNameExpressionResolver, NoopGatewayAllocator.INSTANCE); + PlainActionFuture listener = new PlainActionFuture<>(); + + action.execute(new ClusterHealthRequest(), listener); + + assertFalse(listener.isDone()); + + applyLatch.countDown(); + listener.get(); + } + + public void testClusterHealth() throws IOException { RoutingTableGenerator routingTableGenerator = new RoutingTableGenerator(); RoutingTableGenerator.ShardCounter counter = new RoutingTableGenerator.ShardCounter(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java index 5f2e0241a82de..65ec13fb2afa5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -48,7 +48,8 @@ public void testPrimaryFailureIncreasesTerm() throws Exception { indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null); logger.info("--> waiting for a yellow index"); - assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW))); + assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), + equalTo(ClusterHealthStatus.YELLOW))); final long term0 = shard == 0 ? 2 : 1; final long term1 = shard == 1 ? 2 : 1; diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceUtils.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceUtils.java index a637f0b61ddc8..e3f132b18fbcf 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceUtils.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceUtils.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; @@ -60,6 +61,9 @@ public void disconnectFromRemovedNodes(ClusterChangedEvent event) { clusterService.setClusterStatePublisher((event, ackListener) -> { }); clusterService.start(); + final DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterService.state().nodes()); + nodes.masterNodeId(clusterService.localNode().getId()); + setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodes)); return clusterService; }