4444import org .elasticsearch .test .ESIntegTestCase ;
4545import org .elasticsearch .test .InternalTestCluster ;
4646import org .elasticsearch .test .NodeConfigurationSource ;
47+ import org .elasticsearch .test .TestCustomMetaData ;
4748import org .elasticsearch .transport .MockTcpTransportPlugin ;
4849import org .elasticsearch .tribe .TribeServiceTests .MergableCustomMetaData1 ;
50+ import org .elasticsearch .tribe .TribeServiceTests .MergableCustomMetaData2 ;
4951import org .junit .After ;
5052import org .junit .AfterClass ;
5153import org .junit .Before ;
6163import java .util .function .Consumer ;
6264import java .util .function .Function ;
6365import java .util .function .Predicate ;
66+ import java .util .function .UnaryOperator ;
6467import java .util .stream .Stream ;
6568import java .util .stream .StreamSupport ;
6669
@@ -454,42 +457,93 @@ public void testClusterStateNodes() throws Exception {
454457
455458 public void testMergingRemovedCustomMetaData () throws Exception {
456459 MetaData .registerPrototype (MergableCustomMetaData1 .TYPE , new MergableCustomMetaData1 ("" ));
460+ removeCustomMetaData (cluster1 , MergableCustomMetaData1 .TYPE );
461+ removeCustomMetaData (cluster2 , MergableCustomMetaData1 .TYPE );
457462 MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1 ("a" );
458463 MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1 ("b" );
459464 try (Releasable tribeNode = startTribeNode ()) {
460- updateCustomMetaData (cluster1 , customMetaData1 );
461- updateCustomMetaData (cluster2 , customMetaData2 );
465+ assertNodes (ALL );
466+ putCustomMetaData (cluster1 , customMetaData1 );
467+ putCustomMetaData (cluster2 , customMetaData2 );
462468 assertCustomMetaDataUpdated (internalCluster (), customMetaData2 );
463- updateCustomMetaData (cluster2 , null );
469+ removeCustomMetaData (cluster2 , customMetaData2 . type () );
464470 assertCustomMetaDataUpdated (internalCluster (), customMetaData1 );
465471 }
466472 }
467473
468474 public void testMergingCustomMetaData () throws Exception {
469475 MetaData .registerPrototype (MergableCustomMetaData1 .TYPE , new MergableCustomMetaData1 ("" ));
476+ removeCustomMetaData (cluster1 , MergableCustomMetaData1 .TYPE );
477+ removeCustomMetaData (cluster2 , MergableCustomMetaData1 .TYPE );
470478 MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1 (randomAsciiOfLength (10 ));
471479 MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1 (randomAsciiOfLength (10 ));
472480 List <MergableCustomMetaData1 > customMetaDatas = Arrays .asList (customMetaData1 , customMetaData2 );
473481 Collections .sort (customMetaDatas , (cm1 , cm2 ) -> cm2 .getData ().compareTo (cm1 .getData ()));
474482 final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas .get (0 );
475483 try (Releasable tribeNode = startTribeNode ()) {
476- updateCustomMetaData (cluster1 , customMetaData1 );
477- updateCustomMetaData (cluster2 , customMetaData2 );
484+ assertNodes (ALL );
485+ putCustomMetaData (cluster1 , customMetaData1 );
486+ assertCustomMetaDataUpdated (internalCluster (), customMetaData1 );
487+ putCustomMetaData (cluster2 , customMetaData2 );
478488 assertCustomMetaDataUpdated (internalCluster (), tribeNodeCustomMetaData );
479489 }
480490 }
481491
492+ public void testMergingMultipleCustomMetaData () throws Exception {
493+ MetaData .registerPrototype (MergableCustomMetaData1 .TYPE , new MergableCustomMetaData1 ("" ));
494+ MetaData .registerPrototype (MergableCustomMetaData2 .TYPE , new MergableCustomMetaData2 ("" ));
495+ removeCustomMetaData (cluster1 , MergableCustomMetaData1 .TYPE );
496+ removeCustomMetaData (cluster2 , MergableCustomMetaData1 .TYPE );
497+ MergableCustomMetaData1 firstCustomMetaDataType1 = new MergableCustomMetaData1 (randomAsciiOfLength (10 ));
498+ MergableCustomMetaData1 secondCustomMetaDataType1 = new MergableCustomMetaData1 (randomAsciiOfLength (10 ));
499+ MergableCustomMetaData2 firstCustomMetaDataType2 = new MergableCustomMetaData2 (randomAsciiOfLength (10 ));
500+ MergableCustomMetaData2 secondCustomMetaDataType2 = new MergableCustomMetaData2 (randomAsciiOfLength (10 ));
501+ List <MergableCustomMetaData1 > mergedCustomMetaDataType1 = Arrays .asList (firstCustomMetaDataType1 , secondCustomMetaDataType1 );
502+ List <MergableCustomMetaData2 > mergedCustomMetaDataType2 = Arrays .asList (firstCustomMetaDataType2 , secondCustomMetaDataType2 );
503+ Collections .sort (mergedCustomMetaDataType1 , (cm1 , cm2 ) -> cm2 .getData ().compareTo (cm1 .getData ()));
504+ Collections .sort (mergedCustomMetaDataType2 , (cm1 , cm2 ) -> cm2 .getData ().compareTo (cm1 .getData ()));
505+ try (Releasable tribeNode = startTribeNode ()) {
506+ assertNodes (ALL );
507+ // test putting multiple custom md types propagates to tribe
508+ putCustomMetaData (cluster1 , firstCustomMetaDataType1 );
509+ putCustomMetaData (cluster1 , firstCustomMetaDataType2 );
510+ assertCustomMetaDataUpdated (internalCluster (), firstCustomMetaDataType1 );
511+ assertCustomMetaDataUpdated (internalCluster (), firstCustomMetaDataType2 );
512+
513+ // test multiple same type custom md is merged and propagates to tribe
514+ putCustomMetaData (cluster2 , secondCustomMetaDataType1 );
515+ assertCustomMetaDataUpdated (internalCluster (), firstCustomMetaDataType2 );
516+ assertCustomMetaDataUpdated (internalCluster (), mergedCustomMetaDataType1 .get (0 ));
517+
518+ // test multiple same type custom md is merged and propagates to tribe
519+ putCustomMetaData (cluster2 , secondCustomMetaDataType2 );
520+ assertCustomMetaDataUpdated (internalCluster (), mergedCustomMetaDataType1 .get (0 ));
521+ assertCustomMetaDataUpdated (internalCluster (), mergedCustomMetaDataType2 .get (0 ));
522+ }
523+ }
524+
482525 private static void assertCustomMetaDataUpdated (InternalTestCluster cluster ,
483- MergableCustomMetaData1 expectedCustomMetaData ) throws Exception {
526+ TestCustomMetaData expectedCustomMetaData ) throws Exception {
484527 assertBusy (() -> {
485528 ClusterState tribeState = cluster .getInstance (ClusterService .class , cluster .getNodeNames ()[0 ]).state ();
486- MetaData .Custom custom = tribeState .metaData ().custom (MergableCustomMetaData1 . TYPE );
529+ MetaData .Custom custom = tribeState .metaData ().custom (expectedCustomMetaData . type () );
487530 assertNotNull (custom );
488531 assertThat (custom , equalTo (expectedCustomMetaData ));
489532 });
490533 }
491534
492- private static void updateCustomMetaData (InternalTestCluster cluster , final MergableCustomMetaData1 customMetaData ) {
535+ private void removeCustomMetaData (InternalTestCluster cluster , final String customMetaDataType ) {
536+ logger .info ("removing custom_md type [{}] from [{}]" , customMetaDataType , cluster .getClusterName ());
537+ updateMetaData (cluster , builder -> builder .removeCustom (customMetaDataType ));
538+ }
539+
540+ private void putCustomMetaData (InternalTestCluster cluster , final TestCustomMetaData customMetaData ) {
541+ logger .info ("putting custom_md type [{}] with data[{}] from [{}]" , customMetaData .type (),
542+ customMetaData .getData (), cluster .getClusterName ());
543+ updateMetaData (cluster , builder -> builder .putCustom (customMetaData .type (), customMetaData ));
544+ }
545+
546+ private static void updateMetaData (InternalTestCluster cluster , UnaryOperator <MetaData .Builder > addCustoms ) {
493547 ClusterService clusterService = cluster .getInstance (ClusterService .class , cluster .getMasterName ());
494548 final CountDownLatch latch = new CountDownLatch (1 );
495549 clusterService .submitStateUpdateTask ("update customMetaData" , new ClusterStateUpdateTask (Priority .IMMEDIATE ) {
@@ -501,11 +555,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
501555 @ Override
502556 public ClusterState execute (ClusterState currentState ) throws Exception {
503557 MetaData .Builder builder = MetaData .builder (currentState .metaData ());
504- if (customMetaData == null ) {
505- builder .removeCustom (MergableCustomMetaData1 .TYPE );
506- } else {
507- builder .putCustom (MergableCustomMetaData1 .TYPE , customMetaData );
508- }
558+ builder = addCustoms .apply (builder );
509559 return new ClusterState .Builder (currentState ).metaData (builder ).build ();
510560 }
511561
@@ -520,6 +570,7 @@ public void onFailure(String source, Exception e) {
520570 fail ("latch waiting on publishing custom md interrupted [" + e .getMessage () + "]" );
521571 }
522572 assertThat ("timed out trying to add custom metadata to " + cluster .getClusterName (), latch .getCount (), equalTo (0L ));
573+
523574 }
524575
525576 private void assertIndicesExist (Client client , String ... indices ) throws Exception {
0 commit comments