1515import org .elasticsearch .action .ActionRequestBuilder ;
1616import org .elasticsearch .action .ActionResponse ;
1717import org .elasticsearch .action .index .IndexResponse ;
18+ import org .elasticsearch .action .support .PlainActionFuture ;
1819import org .elasticsearch .action .support .master .AcknowledgedResponse ;
1920import org .elasticsearch .cluster .ClusterState ;
2021import org .elasticsearch .cluster .ClusterStateUpdateTask ;
2829import org .elasticsearch .cluster .routing .ShardRouting ;
2930import org .elasticsearch .cluster .routing .allocation .AllocationService ;
3031import org .elasticsearch .cluster .service .ClusterService ;
32+ import org .elasticsearch .common .Priority ;
3133import org .elasticsearch .common .settings .Settings ;
3234import org .elasticsearch .core .TimeValue ;
3335import org .elasticsearch .index .Index ;
@@ -132,15 +134,43 @@ private <Req extends ActionRequest, Res extends ActionResponse> ActionFuture<Res
132134 // Wait for no publication in progress to not accidentally cancel a publication different from the one triggered by the given
133135 // request.
134136 final Coordinator masterCoordinator = internalCluster ().getCurrentMasterNodeInstance (Coordinator .class );
135- assertBusy (() -> {
136- assertFalse (masterCoordinator .publicationInProgress ());
137- final long applierVersion = masterCoordinator .getApplierState ().version ();
138- for (Coordinator instance : internalCluster ().getInstances (Coordinator .class )) {
139- assertEquals (instance .getApplierState ().version (), applierVersion );
140- }
141- });
137+
138+ ensureNoPendingMasterTasks ().actionGet (TimeValue .timeValueSeconds (30 ));
142139 ActionFuture <Res > future = req .execute ();
140+
141+ // cancel the first cluster state update produced by the request above
143142 assertBusy (() -> assertTrue (masterCoordinator .cancelCommittedPublication ()));
143+ // await and cancel any other forked cluster state updates that might be produced by the request
144+ var task = ensureNoPendingMasterTasks ();
145+ while (task .isDone () == false ) {
146+ masterCoordinator .cancelCommittedPublication ();
147+ Thread .onSpinWait ();
148+ }
149+ task .actionGet (TimeValue .timeValueSeconds (30 ));
150+
151+ return future ;
152+ }
153+
154+ private PlainActionFuture <Void > ensureNoPendingMasterTasks () {
155+ var future = new PlainActionFuture <Void >();
156+ internalCluster ().getCurrentMasterNodeInstance (ClusterService .class )
157+ .submitUnbatchedStateUpdateTask ("test" , new ClusterStateUpdateTask (Priority .LANGUID , TimeValue .timeValueSeconds (30 )) {
158+
159+ @ Override
160+ public ClusterState execute (ClusterState currentState ) {
161+ return currentState ;
162+ }
163+
164+ @ Override
165+ public void clusterStateProcessed (ClusterState oldState , ClusterState newState ) {
166+ future .onResponse (null );
167+ }
168+
169+ @ Override
170+ public void onFailure (Exception e ) {
171+ future .onFailure (e );
172+ }
173+ });
144174 return future ;
145175 }
146176
@@ -158,8 +188,9 @@ public void testDeleteCreateInOneBulk() throws Exception {
158188 indexDoc ("test" , "1" );
159189 refresh ();
160190 disruption .startDisrupting ();
161- logger .info ("--> delete index and recreate it " );
191+ logger .info ("--> delete index" );
162192 executeAndCancelCommittedPublication (client ().admin ().indices ().prepareDelete ("test" ).setTimeout ("0s" )).get (10 , TimeUnit .SECONDS );
193+ logger .info ("--> and recreate it" );
163194 executeAndCancelCommittedPublication (
164195 prepareCreate ("test" ).setSettings (
165196 Settings .builder ()
0 commit comments