|
15 | 15 | import org.elasticsearch.action.ActionRequestBuilder; |
16 | 16 | import org.elasticsearch.action.ActionResponse; |
17 | 17 | import org.elasticsearch.action.index.IndexResponse; |
| 18 | +import org.elasticsearch.action.support.PlainActionFuture; |
18 | 19 | import org.elasticsearch.action.support.master.AcknowledgedResponse; |
19 | 20 | import org.elasticsearch.cluster.ClusterState; |
20 | 21 | import org.elasticsearch.cluster.ClusterStateUpdateTask; |
|
28 | 29 | import org.elasticsearch.cluster.routing.ShardRouting; |
29 | 30 | import org.elasticsearch.cluster.routing.allocation.AllocationService; |
30 | 31 | import org.elasticsearch.cluster.service.ClusterService; |
| 32 | +import org.elasticsearch.common.Priority; |
31 | 33 | import org.elasticsearch.common.settings.Settings; |
32 | 34 | import org.elasticsearch.core.TimeValue; |
33 | 35 | import org.elasticsearch.index.Index; |
|
37 | 39 | import org.elasticsearch.indices.IndicesService; |
38 | 40 | import org.elasticsearch.test.ESIntegTestCase; |
39 | 41 | import org.elasticsearch.test.disruption.BlockClusterStateProcessing; |
40 | | -import org.elasticsearch.threadpool.ThreadPool; |
41 | 42 | import org.elasticsearch.transport.TransportSettings; |
42 | 43 |
|
43 | 44 | import java.util.List; |
@@ -133,20 +134,43 @@ private <Req extends ActionRequest, Res extends ActionResponse> ActionFuture<Res |
133 | 134 | // Wait for no publication in progress to not accidentally cancel a publication different from the one triggered by the given |
134 | 135 | // request. |
135 | 136 | final Coordinator masterCoordinator = internalCluster().getCurrentMasterNodeInstance(Coordinator.class); |
136 | | - assertBusy(() -> { |
137 | | - assertFalse(masterCoordinator.publicationInProgress()); |
138 | | - final long applierVersion = masterCoordinator.getApplierState().version(); |
139 | | - for (Coordinator instance : internalCluster().getInstances(Coordinator.class)) { |
140 | | - assertEquals(instance.getApplierState().version(), applierVersion); |
141 | | - } |
142 | | - }); |
| 137 | + |
| 138 | + ensureNoPendingMasterTasks().actionGet(TimeValue.timeValueSeconds(30)); |
143 | 139 | ActionFuture<Res> future = req.execute(); |
144 | | - // cancel all commit publications produced by the request |
145 | | - internalCluster().getCurrentMasterNodeInstance(ThreadPool.class).generic().execute(() -> { |
146 | | - while (!future.isDone()) { |
147 | | - masterCoordinator.cancelCommittedPublication(); |
148 | | - } |
149 | | - }); |
| 140 | + |
| 141 | + // cancel the first cluster state update produced by the request above |
| 142 | + assertBusy(() -> assertTrue(masterCoordinator.cancelCommittedPublication())); |
| 143 | + // await and cancel any other forked cluster state updates that might be produced by the request |
| 144 | + var task = ensureNoPendingMasterTasks(); |
| 145 | + while (task.isDone() == false) { |
| 146 | + masterCoordinator.cancelCommittedPublication(); |
| 147 | + Thread.onSpinWait(); |
| 148 | + } |
| 149 | + task.actionGet(TimeValue.timeValueSeconds(30)); |
| 150 | + |
| 151 | + return future; |
| 152 | + } |
| 153 | + |
| 154 | + private PlainActionFuture<Void> ensureNoPendingMasterTasks() { |
| 155 | + var future = new PlainActionFuture<Void>(); |
| 156 | + internalCluster().getCurrentMasterNodeInstance(ClusterService.class) |
| 157 | + .submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.LANGUID, TimeValue.timeValueSeconds(30)) { |
| 158 | + |
| 159 | + @Override |
| 160 | + public ClusterState execute(ClusterState currentState) { |
| 161 | + return currentState; |
| 162 | + } |
| 163 | + |
| 164 | + @Override |
| 165 | + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { |
| 166 | + future.onResponse(null); |
| 167 | + } |
| 168 | + |
| 169 | + @Override |
| 170 | + public void onFailure(Exception e) { |
| 171 | + future.onFailure(e); |
| 172 | + } |
| 173 | + }); |
150 | 174 | return future; |
151 | 175 | } |
152 | 176 |
|
|
0 commit comments