From a71c6763a9f612070ceec18a2efa731db02435dd Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 3 Nov 2022 11:21:54 +0000 Subject: [PATCH 1/3] Reinstate "Update RareClusterStateIT to work with ... (#87922)" Reverts #90720 and adds some additional logging to further investigate the failure here. Relates #87922 --- .../coordination/RareClusterStateIT.java | 59 ++++++++++++++++--- .../BlockClusterStateProcessing.java | 35 +++++++---- 2 files changed, 73 insertions(+), 21 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java index 024545fbe68dd..86c0a09a9c787 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -28,6 +29,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -37,6 +40,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.BlockClusterStateProcessing; +import org.elasticsearch.test.junit.annotations.TestIssueLogging; import org.elasticsearch.transport.TransportSettings; import java.util.List; @@ -132,15 +136,43 @@ private ActionFuture { - assertFalse(masterCoordinator.publicationInProgress()); - final long applierVersion = masterCoordinator.getApplierState().version(); - for (Coordinator instance : internalCluster().getInstances(Coordinator.class)) { - assertEquals(instance.getApplierState().version(), applierVersion); - } - }); + + ensureNoPendingMasterTasks().actionGet(TimeValue.timeValueSeconds(30)); ActionFuture future = req.execute(); + + // cancel the first cluster state update produced by the request above assertBusy(() -> assertTrue(masterCoordinator.cancelCommittedPublication())); + // await and cancel any other forked cluster state updates that might be produced by the request + var task = ensureNoPendingMasterTasks(); + while (task.isDone() == false) { + masterCoordinator.cancelCommittedPublication(); + Thread.onSpinWait(); + } + task.actionGet(TimeValue.timeValueSeconds(30)); + + return future; + } + + private PlainActionFuture ensureNoPendingMasterTasks() { + var future = new PlainActionFuture(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class) + .submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.LANGUID, TimeValue.timeValueSeconds(30)) { + + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + future.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + future.onFailure(e); + } + }); return future; } @@ -158,8 +190,9 @@ public void testDeleteCreateInOneBulk() throws Exception { indexDoc("test", "1"); refresh(); disruption.startDisrupting(); - logger.info("--> delete index and recreate it"); + logger.info("--> delete index"); executeAndCancelCommittedPublication(client().admin().indices().prepareDelete("test").setTimeout("0s")).get(10, TimeUnit.SECONDS); + logger.info("--> and recreate it"); executeAndCancelCommittedPublication( prepareCreate("test").setSettings( Settings.builder() @@ -272,6 +305,10 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception { assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); } + @TestIssueLogging( + issueUrl = "https://github.com/elastic/elasticsearch/issues/89905", + value = "org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:DEBUG" + ) public void testDelayedMappingPropagationOnReplica() throws Exception { // This is essentially the same thing as testDelayedMappingPropagationOnPrimary // but for replicas @@ -371,6 +408,12 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled // and not just because it takes time to replicate the indexing request to the replica Thread.sleep(100); + final var results = "[" + + (putMappingResponse.isDone() ? Strings.toString(putMappingResponse.get()) : "null") + + "," + + (docIndexResponse.isDone() ? Strings.toString(docIndexResponse.get()) : "null") + + "]"; + assertEquals("[null,null]", results); assertFalse(putMappingResponse.isDone()); assertFalse(docIndexResponse.isDone()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java index e456a2f0da6ff..73b2546d6f0a4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java @@ -18,6 +18,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + public class BlockClusterStateProcessing extends SingleNodeDisruption { private final AtomicReference disruptionLatch = new AtomicReference<>(); @@ -38,18 +41,20 @@ public void startDisrupting() { return; } logger.info("delaying cluster state updates on node [{}]", disruptionNodeCopy); - boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1)); - assert success : "startDisrupting called without waiting on stopDisrupting to complete"; + assertTrue(disruptionLatch.compareAndSet(null, new CountDownLatch(1))); final CountDownLatch started = new CountDownLatch(1); clusterService.getClusterApplierService().runOnApplierThread("service_disruption_block", Priority.IMMEDIATE, currentState -> { started.countDown(); CountDownLatch latch = disruptionLatch.get(); - if (latch != null) { - try { - latch.await(); - } catch (InterruptedException e) { - Throwables.rethrow(e); - } + assertNotNull(latch); + try { + logger.info("waiting for removal of cluster state update disruption on node [{}]", disruptionNodeCopy); + latch.await(); + logger.info("removing cluster state update disruption on node [{}]", disruptionNodeCopy); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted during disruption", e); + Throwables.rethrow(e); } }, new ActionListener<>() { @Override @@ -58,20 +63,24 @@ public void onResponse(Void unused) {} @Override public void onFailure(Exception e) { logger.error("unexpected error during disruption", e); + assert false : e; } }); try { started.await(); - } catch (InterruptedException e) {} + logger.info("cluster state updates on node [{}] are now being delayed", disruptionNodeCopy); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted waiting for disruption to start", e); + assert false : e; + } } @Override public void stopDisrupting() { CountDownLatch latch = disruptionLatch.get(); - if (latch != null) { - latch.countDown(); - } - + assertNotNull(latch); + latch.countDown(); } @Override From dc638d97de33bb3f0ad8af08a72d573d1fecf3b3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 3 Nov 2022 13:42:57 +0000 Subject: [PATCH 2/3] Avoid out-of-order commits --- .../coordination/RareClusterStateIT.java | 58 ++++++++++--------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java index 86c0a09a9c787..e2d11e225dea8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java @@ -156,23 +156,26 @@ private ActionFuture ensureNoPendingMasterTasks() { var future = new PlainActionFuture(); internalCluster().getCurrentMasterNodeInstance(ClusterService.class) - .submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.LANGUID, TimeValue.timeValueSeconds(30)) { - - @Override - public ClusterState execute(ClusterState currentState) { - return currentState; - } - - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - future.onResponse(null); - } - - @Override - public void onFailure(Exception e) { - future.onFailure(e); + .submitUnbatchedStateUpdateTask( + "ensureNoPendingMasterTasks", + new ClusterStateUpdateTask(Priority.LANGUID, TimeValue.timeValueSeconds(30)) { + + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + future.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + future.onFailure(e); + } } - }); + ); return future; } @@ -305,10 +308,6 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception { assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); } - @TestIssueLogging( - issueUrl = "https://github.com/elastic/elasticsearch/issues/89905", - value = "org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:DEBUG" - ) public void testDelayedMappingPropagationOnReplica() throws Exception { // This is essentially the same thing as testDelayedMappingPropagationOnPrimary // but for replicas @@ -378,6 +377,19 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { assertNotNull(mapper.mappers().getMapper("field")); }); + // If the put-mapping commit messages arrive out-of-order then the earlier one is acked (with a CoordinationStateRejectedException) + // prematurely, bypassing the disruption. Wait for the commit messages to arrive everywhere before proceeding: + assertBusy(() -> { + long minVersion = Long.MAX_VALUE; + long maxVersion = Long.MIN_VALUE; + for (final var coordinator : internalCluster().getInstances(Coordinator.class)) { + final var clusterStateVersion = coordinator.getApplierState().version(); + minVersion = Math.min(minVersion, clusterStateVersion); + maxVersion = Math.max(maxVersion, clusterStateVersion); + } + assertEquals(minVersion, maxVersion); + }); + final ActionFuture docIndexResponse = client().prepareIndex("index").setId("1").setSource("field", 42).execute(); assertBusy(() -> assertTrue(client().prepareGet("index", "1").get().isExists())); @@ -408,12 +420,6 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled // and not just because it takes time to replicate the indexing request to the replica Thread.sleep(100); - final var results = "[" - + (putMappingResponse.isDone() ? Strings.toString(putMappingResponse.get()) : "null") - + "," - + (docIndexResponse.isDone() ? Strings.toString(docIndexResponse.get()) : "null") - + "]"; - assertEquals("[null,null]", results); assertFalse(putMappingResponse.isDone()); assertFalse(docIndexResponse.isDone()); From 036b0dc5fc4c347281d0561cbf02b9259ad17b6f Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 3 Nov 2022 13:54:53 +0000 Subject: [PATCH 3/3] Spotless --- .../elasticsearch/cluster/coordination/RareClusterStateIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java index e2d11e225dea8..5ddd3dcf4de7f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -40,7 +39,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.BlockClusterStateProcessing; -import org.elasticsearch.test.junit.annotations.TestIssueLogging; import org.elasticsearch.transport.TransportSettings; import java.util.List;