diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java index 024545fbe68dd..5ddd3dcf4de7f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -28,6 +29,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -132,15 +134,46 @@ private ActionFuture { - assertFalse(masterCoordinator.publicationInProgress()); - final long applierVersion = masterCoordinator.getApplierState().version(); - for (Coordinator instance : internalCluster().getInstances(Coordinator.class)) { - assertEquals(instance.getApplierState().version(), applierVersion); - } - }); + + ensureNoPendingMasterTasks().actionGet(TimeValue.timeValueSeconds(30)); ActionFuture future = req.execute(); + + // cancel the first cluster state update produced by the request above assertBusy(() -> assertTrue(masterCoordinator.cancelCommittedPublication())); + // await and cancel any other forked cluster state updates that might be produced by the request + var task = ensureNoPendingMasterTasks(); + while (task.isDone() == false) { + masterCoordinator.cancelCommittedPublication(); + Thread.onSpinWait(); + } + task.actionGet(TimeValue.timeValueSeconds(30)); + + return future; + } + + private PlainActionFuture ensureNoPendingMasterTasks() { + var future = new PlainActionFuture(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class) + .submitUnbatchedStateUpdateTask( + "ensureNoPendingMasterTasks", + new ClusterStateUpdateTask(Priority.LANGUID, TimeValue.timeValueSeconds(30)) { + + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + future.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + future.onFailure(e); + } + } + ); return future; } @@ -158,8 +191,9 @@ public void testDeleteCreateInOneBulk() throws Exception { indexDoc("test", "1"); refresh(); disruption.startDisrupting(); - logger.info("--> delete index and recreate it"); + logger.info("--> delete index"); executeAndCancelCommittedPublication(client().admin().indices().prepareDelete("test").setTimeout("0s")).get(10, TimeUnit.SECONDS); + logger.info("--> and recreate it"); executeAndCancelCommittedPublication( prepareCreate("test").setSettings( Settings.builder() @@ -341,6 +375,19 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { assertNotNull(mapper.mappers().getMapper("field")); }); + // If the put-mapping commit messages arrive out-of-order then the earlier one is acked (with a CoordinationStateRejectedException) + // prematurely, bypassing the disruption. Wait for the commit messages to arrive everywhere before proceeding: + assertBusy(() -> { + long minVersion = Long.MAX_VALUE; + long maxVersion = Long.MIN_VALUE; + for (final var coordinator : internalCluster().getInstances(Coordinator.class)) { + final var clusterStateVersion = coordinator.getApplierState().version(); + minVersion = Math.min(minVersion, clusterStateVersion); + maxVersion = Math.max(maxVersion, clusterStateVersion); + } + assertEquals(minVersion, maxVersion); + }); + final ActionFuture docIndexResponse = client().prepareIndex("index").setId("1").setSource("field", 42).execute(); assertBusy(() -> assertTrue(client().prepareGet("index", "1").get().isExists())); diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java index e456a2f0da6ff..73b2546d6f0a4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java @@ -18,6 +18,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + public class BlockClusterStateProcessing extends SingleNodeDisruption { private final AtomicReference disruptionLatch = new AtomicReference<>(); @@ -38,18 +41,20 @@ public void startDisrupting() { return; } logger.info("delaying cluster state updates on node [{}]", disruptionNodeCopy); - boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1)); - assert success : "startDisrupting called without waiting on stopDisrupting to complete"; + assertTrue(disruptionLatch.compareAndSet(null, new CountDownLatch(1))); final CountDownLatch started = new CountDownLatch(1); clusterService.getClusterApplierService().runOnApplierThread("service_disruption_block", Priority.IMMEDIATE, currentState -> { started.countDown(); CountDownLatch latch = disruptionLatch.get(); - if (latch != null) { - try { - latch.await(); - } catch (InterruptedException e) { - Throwables.rethrow(e); - } + assertNotNull(latch); + try { + logger.info("waiting for removal of cluster state update disruption on node [{}]", disruptionNodeCopy); + latch.await(); + logger.info("removing cluster state update disruption on node [{}]", disruptionNodeCopy); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted during disruption", e); + Throwables.rethrow(e); } }, new ActionListener<>() { @Override @@ -58,20 +63,24 @@ public void onResponse(Void unused) {} @Override public void onFailure(Exception e) { logger.error("unexpected error during disruption", e); + assert false : e; } }); try { started.await(); - } catch (InterruptedException e) {} + logger.info("cluster state updates on node [{}] are now being delayed", disruptionNodeCopy); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted waiting for disruption to start", e); + assert false : e; + } } @Override public void stopDisrupting() { CountDownLatch latch = disruptionLatch.get(); - if (latch != null) { - latch.countDown(); - } - + assertNotNull(latch); + latch.countDown(); } @Override