1717 * under the License.
1818 */
1919
20- package org .elasticsearch .indices . state ;
20+ package org .elasticsearch .cluster . coordination ;
2121
2222import org .elasticsearch .ElasticsearchParseException ;
2323import org .elasticsearch .Version ;
2424import org .elasticsearch .action .ActionFuture ;
25- import org .elasticsearch .action .ActionListener ;
25+ import org .elasticsearch .action .ActionRequest ;
26+ import org .elasticsearch .action .ActionRequestBuilder ;
27+ import org .elasticsearch .action .ActionResponse ;
2628import org .elasticsearch .action .index .IndexResponse ;
2729import org .elasticsearch .action .support .master .AcknowledgedResponse ;
2830import org .elasticsearch .cluster .ClusterState ;
4042import org .elasticsearch .common .collect .ImmutableOpenMap ;
4143import org .elasticsearch .common .settings .Settings ;
4244import org .elasticsearch .common .unit .TimeValue ;
43- import org .elasticsearch .discovery .DiscoverySettings ;
45+ import org .elasticsearch .discovery .Discovery ;
4446import org .elasticsearch .index .Index ;
4547import org .elasticsearch .index .IndexService ;
4648import org .elasticsearch .index .mapper .DocumentMapper ;
5153import org .elasticsearch .test .disruption .BlockClusterStateProcessing ;
5254import org .elasticsearch .test .junit .annotations .TestLogging ;
5355
54- import java .util .Arrays ;
5556import java .util .List ;
5657import java .util .Map ;
57- import java .util .concurrent .atomic . AtomicReference ;
58+ import java .util .concurrent .TimeUnit ;
5859
5960import static java .util .Collections .emptyMap ;
6061import static java .util .Collections .emptySet ;
@@ -86,7 +87,7 @@ protected int numberOfReplicas() {
8687 return 0 ;
8788 }
8889
89- public void testAssignmentWithJustAddedNodes () throws Exception {
90+ public void testAssignmentWithJustAddedNodes () {
9091 internalCluster ().startNode ();
9192 final String index = "index" ;
9293 prepareCreate (index ).setSettings (Settings .builder ().put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , 1 )
@@ -149,22 +150,20 @@ public void onFailure(String source, Exception e) {
149150 });
150151 }
151152
153+ private <Req extends ActionRequest , Res extends ActionResponse > ActionFuture <Res > executeAndCancelCommittedPublication (
154+ ActionRequestBuilder <Req , Res > req ) throws Exception {
155+ ActionFuture <Res > future = req .execute ();
156+ assertBusy (() -> assertTrue (((Coordinator )internalCluster ().getMasterNodeInstance (Discovery .class )).cancelCommittedPublication ()));
157+ return future ;
158+ }
159+
152160 public void testDeleteCreateInOneBulk () throws Exception {
153- internalCluster ().startMasterOnlyNode (Settings .builder ()
154- .put (TestZenDiscovery .USE_ZEN2 .getKey (), false ) // TODO: convert test to support Zen2
155- .build ());
156- String dataNode = internalCluster ().startDataOnlyNode (Settings .builder ()
157- .put (TestZenDiscovery .USE_ZEN2 .getKey (), false ) // TODO: convert test to support Zen2
158- .build ());
161+ internalCluster ().startMasterOnlyNode ();
162+ String dataNode = internalCluster ().startDataOnlyNode ();
159163 assertFalse (client ().admin ().cluster ().prepareHealth ().setWaitForNodes ("2" ).get ().isTimedOut ());
160164 prepareCreate ("test" ).setSettings (Settings .builder ().put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , 0 )).addMapping ("type" ).get ();
161165 ensureGreen ("test" );
162166
163- // now that the cluster is stable, remove publishing timeout
164- assertAcked (client ().admin ().cluster ().prepareUpdateSettings ().setTransientSettings (Settings .builder ()
165- .put (DiscoverySettings .PUBLISH_TIMEOUT_SETTING .getKey (), "0" )
166- .put (DiscoverySettings .COMMIT_TIMEOUT_SETTING .getKey (), "30s" )));
167-
168167 // block none master node.
169168 BlockClusterStateProcessing disruption = new BlockClusterStateProcessing (dataNode , random ());
170169 internalCluster ().setDisruptionScheme (disruption );
@@ -173,10 +172,14 @@ public void testDeleteCreateInOneBulk() throws Exception {
173172 refresh ();
174173 disruption .startDisrupting ();
175174 logger .info ("--> delete index and recreate it" );
176- assertFalse (client ().admin ().indices ().prepareDelete ("test" ).setTimeout ("200ms" ).get ().isAcknowledged ());
177- assertFalse (prepareCreate ("test" ).setTimeout ("200ms" ).setSettings (Settings .builder ().put (IndexMetaData
178- .SETTING_NUMBER_OF_REPLICAS , 0 ).put (IndexMetaData .SETTING_WAIT_FOR_ACTIVE_SHARDS .getKey (), "0" )).get ().isAcknowledged ());
175+ executeAndCancelCommittedPublication (client ().admin ().indices ().prepareDelete ("test" ).setTimeout ("0s" ))
176+ .get (10 , TimeUnit .SECONDS );
177+ executeAndCancelCommittedPublication (prepareCreate ("test" ).setSettings (Settings .builder ().put (IndexMetaData
178+ .SETTING_NUMBER_OF_REPLICAS , 0 ).put (IndexMetaData .SETTING_WAIT_FOR_ACTIVE_SHARDS .getKey (), "0" )).setTimeout ("0s" ))
179+ .get (10 , TimeUnit .SECONDS );
180+
179181 logger .info ("--> letting cluster proceed" );
182+
180183 disruption .stopDisrupting ();
181184 ensureGreen (TimeValue .timeValueMinutes (30 ), "test" );
182185 // due to publish_timeout of 0, wait for data node to have cluster state fully applied
@@ -196,12 +199,7 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
196199 // but the change might not be on the node that performed the indexing
197200 // operation yet
198201
199- Settings settings = Settings .builder ()
200- .put (DiscoverySettings .COMMIT_TIMEOUT_SETTING .getKey (), "30s" ) // explicitly set so it won't default to publish timeout
201- .put (DiscoverySettings .PUBLISH_TIMEOUT_SETTING .getKey (), "0s" ) // don't wait post commit as we are blocking things by design
202- .put (TestZenDiscovery .USE_ZEN2 .getKey (), false ) // TODO: convert test to support Zen2
203- .build ();
204- final List <String > nodeNames = internalCluster ().startNodes (2 , settings );
202+ final List <String > nodeNames = internalCluster ().startNodes (2 );
205203 assertFalse (client ().admin ().cluster ().prepareHealth ().setWaitForNodes ("2" ).get ().isTimedOut ());
206204
207205 final String master = internalCluster ().getMasterName ();
@@ -242,19 +240,10 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
242240 disruption .startDisrupting ();
243241
244242 // Add a new mapping...
245- final AtomicReference <Object > putMappingResponse = new AtomicReference <>();
246- client ().admin ().indices ().preparePutMapping ("index" ).setType ("type" ).setSource ("field" , "type=long" ).execute (
247- new ActionListener <AcknowledgedResponse >() {
248- @ Override
249- public void onResponse (AcknowledgedResponse response ) {
250- putMappingResponse .set (response );
251- }
243+ ActionFuture <AcknowledgedResponse > putMappingResponse =
244+ executeAndCancelCommittedPublication (client ().admin ().indices ().preparePutMapping ("index" )
245+ .setType ("type" ).setSource ("field" , "type=long" ));
252246
253- @ Override
254- public void onFailure (Exception e ) {
255- putMappingResponse .set (e );
256- }
257- });
258247 // ...and wait for mappings to be available on master
259248 assertBusy (() -> {
260249 ImmutableOpenMap <String , MappingMetaData > indexMappings = client ().admin ().indices ()
@@ -273,36 +262,24 @@ public void onFailure(Exception e) {
273262 assertNotNull (fieldMapping );
274263 });
275264
276- final AtomicReference <Object > docIndexResponse = new AtomicReference <>();
277- client ().prepareIndex ("index" , "type" , "1" ).setSource ("field" , 42 ).execute (new ActionListener <IndexResponse >() {
278- @ Override
279- public void onResponse (IndexResponse response ) {
280- docIndexResponse .set (response );
281- }
282-
283- @ Override
284- public void onFailure (Exception e ) {
285- docIndexResponse .set (e );
286- }
287- });
265+ // this request does not change the cluster state, because mapping is already created,
266+ // we don't await and cancel committed publication
267+ ActionFuture <IndexResponse > docIndexResponse =
268+ client ().prepareIndex ("index" , "type" , "1" ).setSource ("field" , 42 ).execute ();
288269
289270 // Wait a bit to make sure that the reason why we did not get a response
290271 // is that cluster state processing is blocked and not just that it takes
291272 // time to process the indexing request
292273 Thread .sleep (100 );
293- assertThat (putMappingResponse .get (), equalTo ( null ));
294- assertThat (docIndexResponse .get (), equalTo ( null ));
274+ assertFalse (putMappingResponse .isDone ( ));
275+ assertFalse (docIndexResponse .isDone ( ));
295276
296277 // Now make sure the indexing request finishes successfully
297278 disruption .stopDisrupting ();
298279 assertBusy (() -> {
299- assertThat (putMappingResponse .get (), instanceOf (AcknowledgedResponse .class ));
300- AcknowledgedResponse resp = (AcknowledgedResponse ) putMappingResponse .get ();
301- assertTrue (resp .isAcknowledged ());
302- assertThat (docIndexResponse .get (), instanceOf (IndexResponse .class ));
303- IndexResponse docResp = (IndexResponse ) docIndexResponse .get ();
304- assertEquals (Arrays .toString (docResp .getShardInfo ().getFailures ()),
305- 1 , docResp .getShardInfo ().getTotal ());
280+ assertTrue (putMappingResponse .get (10 , TimeUnit .SECONDS ).isAcknowledged ());
281+ assertThat (docIndexResponse .get (10 , TimeUnit .SECONDS ), instanceOf (IndexResponse .class ));
282+ assertEquals (1 , docIndexResponse .get (10 , TimeUnit .SECONDS ).getShardInfo ().getTotal ());
306283 });
307284 }
308285
@@ -312,12 +289,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
312289 // Here we want to test that everything goes well if the mappings that
313290 // are needed for a document are not available on the replica at the
314291 // time of indexing it
315- final List <String > nodeNames = internalCluster ().startNodes (2 ,
316- Settings .builder ()
317- .put (DiscoverySettings .COMMIT_TIMEOUT_SETTING .getKey (), "30s" ) // explicitly set so it won't default to publish timeout
318- .put (DiscoverySettings .PUBLISH_TIMEOUT_SETTING .getKey (), "0s" ) // don't wait post commit as we are blocking things by design
319- .put (TestZenDiscovery .USE_ZEN2 .getKey (), false ) // TODO: convert test to support Zen2
320- .build ());
292+ final List <String > nodeNames = internalCluster ().startNodes (2 );
321293 assertFalse (client ().admin ().cluster ().prepareHealth ().setWaitForNodes ("2" ).get ().isTimedOut ());
322294
323295 final String master = internalCluster ().getMasterName ();
@@ -359,19 +331,10 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
359331 BlockClusterStateProcessing disruption = new BlockClusterStateProcessing (otherNode , random ());
360332 internalCluster ().setDisruptionScheme (disruption );
361333 disruption .startDisrupting ();
362- final AtomicReference <Object > putMappingResponse = new AtomicReference <>();
363- client ().admin ().indices ().preparePutMapping ("index" ).setType ("type" ).setSource ("field" , "type=long" ).execute (
364- new ActionListener <AcknowledgedResponse >() {
365- @ Override
366- public void onResponse (AcknowledgedResponse response ) {
367- putMappingResponse .set (response );
368- }
334+ final ActionFuture <AcknowledgedResponse > putMappingResponse =
335+ executeAndCancelCommittedPublication (client ().admin ().indices ().preparePutMapping ("index" )
336+ .setType ("type" ).setSource ("field" , "type=long" ));
369337
370- @ Override
371- public void onFailure (Exception e ) {
372- putMappingResponse .set (e );
373- }
374- });
375338 final Index index = resolveIndex ("index" );
376339 // Wait for mappings to be available on master
377340 assertBusy (() -> {
@@ -384,25 +347,17 @@ public void onFailure(Exception e) {
384347 assertNotNull (mapper .mappers ().getMapper ("field" ));
385348 });
386349
387- final AtomicReference <Object > docIndexResponse = new AtomicReference <>();
388- client ().prepareIndex ("index" , "type" , "1" ).setSource ("field" , 42 ).execute (new ActionListener <IndexResponse >() {
389- @ Override
390- public void onResponse (IndexResponse response ) {
391- docIndexResponse .set (response );
392- }
393-
394- @ Override
395- public void onFailure (Exception e ) {
396- docIndexResponse .set (e );
397- }
398- });
350+ final ActionFuture <IndexResponse > docIndexResponse = client ().prepareIndex ("index" , "type" , "1" ).setSource ("field" , 42 ).execute ();
399351
400352 assertBusy (() -> assertTrue (client ().prepareGet ("index" , "type" , "1" ).get ().isExists ()));
401353
402354 // index another document, this time using dynamic mappings.
403355 // The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even
404356 // if the dynamic mapping update is not applied on the replica yet.
405- ActionFuture <IndexResponse > dynamicMappingsFut = client ().prepareIndex ("index" , "type" , "2" ).setSource ("field2" , 42 ).execute ();
357+ // this request does not change the cluster state, because the mapping is dynamic,
358+ // we need to await and cancel committed publication
359+ ActionFuture <IndexResponse > dynamicMappingsFut =
360+ executeAndCancelCommittedPublication (client ().prepareIndex ("index" , "type" , "2" ).setSource ("field2" , 42 ));
406361
407362 // ...and wait for second mapping to be available on master
408363 assertBusy (() -> {
@@ -421,22 +376,18 @@ public void onFailure(Exception e) {
421376 // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
422377 // and not just because it takes time to replicate the indexing request to the replica
423378 Thread .sleep (100 );
424- assertThat (putMappingResponse .get (), equalTo ( null ));
425- assertThat (docIndexResponse .get (), equalTo ( null ));
379+ assertFalse (putMappingResponse .isDone ( ));
380+ assertFalse (docIndexResponse .isDone ( ));
426381
427382 // Now make sure the indexing request finishes successfully
428383 disruption .stopDisrupting ();
429384 assertBusy (() -> {
430- assertThat (putMappingResponse .get (), instanceOf (AcknowledgedResponse .class ));
431- AcknowledgedResponse resp = (AcknowledgedResponse ) putMappingResponse .get ();
432- assertTrue (resp .isAcknowledged ());
433- assertThat (docIndexResponse .get (), instanceOf (IndexResponse .class ));
434- IndexResponse docResp = (IndexResponse ) docIndexResponse .get ();
435- assertEquals (Arrays .toString (docResp .getShardInfo ().getFailures ()),
436- 2 , docResp .getShardInfo ().getTotal ()); // both shards should have succeeded
385+ assertTrue (putMappingResponse .get (10 , TimeUnit .SECONDS ).isAcknowledged ());
386+ assertThat (docIndexResponse .get (10 , TimeUnit .SECONDS ), instanceOf (IndexResponse .class ));
387+ assertEquals (2 , docIndexResponse .get (10 , TimeUnit .SECONDS ).getShardInfo ().getTotal ()); // both shards should have succeeded
437388 });
438389
439- assertThat (dynamicMappingsFut .get ().getResult (), equalTo (CREATED ));
390+ assertThat (dynamicMappingsFut .get (10 , TimeUnit . SECONDS ).getResult (), equalTo (CREATED ));
440391 }
441392
442393}
0 commit comments