diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 2d03a057b583e..ca1ea11cf2151 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -145,10 +145,12 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -314,7 +316,12 @@ protected Node(final Environment environment, Collection final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool); clusterService.add(scriptModule.getScriptService()); resourcesToClose.add(clusterService); - final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId(), classpathPlugins); + Collection>> customMetaDataReducers = + pluginsService.filterPlugins(Plugin.class).stream() + .map(Plugin::getCustomMetaDataReducer) + .collect(Collectors.toList()); + final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId(), classpathPlugins, + getCustomMetaDataReducer(customMetaDataReducers)); resourcesToClose.add(tribeService); final IngestService ingestService = new IngestService(settings, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); @@ -860,4 +867,14 @@ private List getCustomNameResolvers(List> getCustomMetaDataReducer( + final Collection>> customMetaDataReducers) { + return (originalCustoms, newCustoms) -> { + for (BinaryOperator> customMetaDataReducer : customMetaDataReducers) { + originalCustoms = customMetaDataReducer.apply(originalCustoms, newCustoms); + } + return originalCustoms; + }; + } } diff --git a/core/src/main/java/org/elasticsearch/plugins/Plugin.java b/core/src/main/java/org/elasticsearch/plugins/Plugin.java index 1e39edc634187..6eb881ea8ba53 100644 --- a/core/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -46,6 +46,7 @@ import org.elasticsearch.watcher.ResourceWatcherService; import java.util.Map; +import java.util.function.BinaryOperator; import java.util.function.UnaryOperator; /** @@ -148,6 +149,18 @@ public UnaryOperator> getCustomMetaDataUpgrader() { return UnaryOperator.identity(); } + /** + * Provides a function to reduce a global custom meta data from multiple custom meta data in tribe node. + * The selected global custom meta data is stored in the tribe node in case of multiple underlying clusters + * have same global custom meta data type. + *

+ * Plugins should return the original custom map if no reduction is required. + * @return Never {@code null}. The original custom map or upgraded {@code MetaData.Custom} map. + */ + public BinaryOperator> getCustomMetaDataReducer() { + return (originalCustoms, newCustoms) -> originalCustoms; + } + /** * Provides the list of this plugin's custom thread pools, empty if * none. diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index fd697340cd7be..bf29f25a484d8 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -19,6 +19,8 @@ package org.elasticsearch.tribe; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.BytesRef; @@ -42,6 +44,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.hash.MurmurHash3; import org.elasticsearch.common.network.NetworkModule; @@ -71,6 +74,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BinaryOperator; import java.util.function.Function; import static java.util.Collections.unmodifiableMap; @@ -183,9 +187,11 @@ public static Settings processSettings(Settings settings) { private final Set droppedIndices = ConcurrentCollections.newConcurrentSet(); private final List nodes = new CopyOnWriteArrayList<>(); + private final BinaryOperator> metaDataReducer; public TribeService(Settings settings, ClusterService clusterService, final String tribeNodeId, - Collection> classpathPlugins) { + Collection> classpathPlugins, + BinaryOperator> metaDataReducer) { super(settings); this.clusterService = clusterService; Map nodesSettings = new HashMap<>(settings.getGroups("tribe", true)); @@ -210,6 +216,7 @@ public TribeService(Settings settings, ClusterService clusterService, final Stri } this.onConflict = ON_CONFLICT_SETTING.get(settings); + this.metaDataReducer = metaDataReducer; } // pkg private for testing @@ -463,6 +470,19 @@ private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent } } + ImmutableOpenMap reducedCustoms = reduceCustomMetaData( + currentState.metaData().customs(), tribeState.metaData().customs(), metaDataReducer); + + if (currentState.metaData().customs() != reducedCustoms) { + clusterStateChanged = true; + for (ObjectCursor cursor : currentState.metaData().customs().keys()) { + metaData.removeCustom(cursor.value); + } + for (ObjectObjectCursor reducedCustom : reducedCustoms) { + metaData.putCustom(reducedCustom.key, reducedCustom.value); + } + } + if (!clusterStateChanged) { return currentState; } else { @@ -494,4 +514,30 @@ private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, } } } + + // pkg-private for testing + static ImmutableOpenMap reduceCustomMetaData( + ImmutableOpenMap currentCustomMetaData, + ImmutableOpenMap newCustomMetaData, + BinaryOperator> metaDataReducer) { + + Map existingCustoms = new HashMap<>(); + for (ObjectObjectCursor customCursor : currentCustomMetaData) { + existingCustoms.put(customCursor.key, customCursor.value); + } + Map newCustoms = new HashMap<>(); + for (ObjectObjectCursor customCursor : newCustomMetaData) { + newCustoms.put(customCursor.key, customCursor.value); + } + Map reducedCustoms = metaDataReducer.apply(new HashMap<>(existingCustoms), newCustoms); + if (reducedCustoms.equals(existingCustoms) == false) { + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + for (Map.Entry customEntry : reducedCustoms.entrySet()) { + builder.put(customEntry.getKey(), customEntry.getValue()); + } + return builder.build(); + } else { + return currentCustomMetaData; + } + } } diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 440859dce445d..716e370346af0 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -22,14 +22,18 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -44,18 +48,25 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; +import org.elasticsearch.test.TestCustomMetaData; import org.elasticsearch.transport.Transport; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -102,6 +113,11 @@ public class TribeIT extends ESIntegTestCase { **/ private static final Predicate ALL = c -> true; + /** + * Custom meta data comparator for testing reducing custom metadata on tribe node + */ + private static final Comparator CUSTOM_META_DATA_COMPARATOR = (o1, o2) -> o1.getData().compareTo(o2.getData()); + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -113,7 +129,57 @@ protected Settings nodeSettings(int nodeOrdinal) { @Override protected Collection> nodePlugins() { - return getMockPlugins(); + List> nodePlugins = new ArrayList<>(); + nodePlugins.add(TestPluginWithCustomMetaData.class); + nodePlugins.addAll(getMockPlugins()); + return nodePlugins; + } + + + public static class TestPluginWithCustomMetaData extends Plugin { + static { + MetaData.registerPrototype(CustomMetaData1.TYPE, new CustomMetaData1("")); + } + @Override + public BinaryOperator> getCustomMetaDataReducer() { + return (existingCustoms, newCustoms) -> { + List customs = new ArrayList<>(); + if (existingCustoms.containsKey(CustomMetaData1.TYPE)) { + customs.add(((TestCustomMetaData) existingCustoms.get(CustomMetaData1.TYPE))); + } + if (newCustoms.containsKey(CustomMetaData1.TYPE)) { + customs.add(((TestCustomMetaData) newCustoms.get(CustomMetaData1.TYPE))); + } + if (customs.isEmpty() == false) { + Collections.sort(customs, CUSTOM_META_DATA_COMPARATOR); + existingCustoms.put(CustomMetaData1.TYPE, customs.get(0)); + } + return existingCustoms; + }; + } + } + + private static class CustomMetaData1 extends TestCustomMetaData { + public static final String TYPE = "custom_md_1"; + + protected CustomMetaData1(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData1(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } } @Before @@ -459,6 +525,53 @@ public void testClusterStateNodes() throws Exception { } } + public void testCustomMetaDataReduce() throws Exception { + CustomMetaData1 customMetaData1 = new CustomMetaData1(randomAsciiOfLength(10)); + CustomMetaData1 customMetaData2 = new CustomMetaData1(randomAsciiOfLength(10)); + List customMetaDatas = Arrays.asList(customMetaData1, customMetaData2); + Collections.sort(customMetaDatas, CUSTOM_META_DATA_COMPARATOR); + final CustomMetaData1 tribeNodeCustomMetaData = ((CustomMetaData1) customMetaDatas.get(0)); + try (Releasable tribeNode = startTribeNode()) { + CountDownLatch expectedCustomMetaDataFound = new CountDownLatch(1); + assertThat(internalCluster().getNodeNames().length, equalTo(1)); + internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0]).add(event -> { + ImmutableOpenMap customs = event.state().metaData().customs(); + MetaData.Custom custom = customs.get(CustomMetaData1.TYPE); + if (custom.equals(tribeNodeCustomMetaData)) { + expectedCustomMetaDataFound.countDown(); + } + }); + updateCustomMetaData(cluster1, customMetaData1); + updateCustomMetaData(cluster2, customMetaData2); + expectedCustomMetaDataFound.await(); + } + } + + private static void updateCustomMetaData(InternalTestCluster cluster, final CustomMetaData1 customMetaData) + throws InterruptedException { + ClusterService clusterService = cluster.getInstance(ClusterService.class, cluster.getMasterName()); + final CountDownLatch latch = new CountDownLatch(1); + clusterService.submitStateUpdateTask("update customMetaData", new ClusterStateUpdateTask(Priority.IMMEDIATE) { + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + latch.countDown(); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MetaData.Builder builder = MetaData.builder(currentState.metaData()); + builder.putCustom(CustomMetaData1.TYPE, customMetaData); + return new ClusterState.Builder(currentState).metaData(builder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + fail("failed to apply cluster state from [" + source + "] with " + e.getMessage()); + } + }); + latch.await(); + } + private void assertIndicesExist(Client client, String... indices) throws Exception { assertBusy(() -> { ClusterState state = client.admin().cluster().prepareState().setRoutingTable(true).setMetaData(true).get().getState(); diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java b/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java index 43ee8fee15192..0a9fb2980bf6b 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java @@ -19,9 +19,16 @@ package org.elasticsearch.tribe; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TestCustomMetaData; + +import java.util.EnumSet; public class TribeServiceTests extends ESTestCase { public void testMinimalSettings() { @@ -96,4 +103,106 @@ public void testPassthroughSettings() { assertEquals("7.7.7.7", clientSettings.get("transport.bind_host")); assertEquals("8.8.8.8", clientSettings.get("transport.publish_host")); } + + public void testReduceCustomMetaData() { + MetaData existingMetaData = randomMetaData(new CustomMetaData1("data1")); + MetaData newMetaData = randomMetaData(new CustomMetaData1("data2")); + ImmutableOpenMap reducedCustoms = + TribeService.reduceCustomMetaData(existingMetaData.customs(), newMetaData.customs(), + (existingCustoms, newCustoms) -> { + existingCustoms.put(CustomMetaData1.TYPE, newCustoms.get(CustomMetaData1.TYPE)); + return existingCustoms; + } + ); + assertTrue(reducedCustoms != existingMetaData.customs()); + assertEquals(((TestCustomMetaData) reducedCustoms.get(CustomMetaData1.TYPE)).getData(), "data2"); + } + + public void testNoopReduceCustomMetaData() { + MetaData existingMetaData = randomMetaData(new CustomMetaData1("data1")); + MetaData newMetaData = randomMetaData(new CustomMetaData1("data2")); + ImmutableOpenMap reducedCustoms = + TribeService.reduceCustomMetaData(existingMetaData.customs(), newMetaData.customs(), + (existingCustoms, newCustoms) -> existingCustoms + ); + assertTrue(reducedCustoms == existingMetaData.customs()); + assertEquals(((TestCustomMetaData) reducedCustoms.get(CustomMetaData1.TYPE)).getData(), "data1"); + } + + public void testReduceMultipleCustomMetaData() { + MetaData existingMetaData = randomMetaData(new CustomMetaData1("existing_data1"), new CustomMetaData2("existing_data2")); + MetaData newMetaData = randomMetaData(new CustomMetaData1("new_data1"), new CustomMetaData2("new_data2")); + ImmutableOpenMap reducedCustoms = + TribeService.reduceCustomMetaData(existingMetaData.customs(), newMetaData.customs(), + (existingCustoms, newCustoms) -> { + existingCustoms.put(CustomMetaData1.TYPE, newCustoms.get(CustomMetaData1.TYPE)); + existingCustoms.put(CustomMetaData2.TYPE, newCustoms.get(CustomMetaData2.TYPE)); + return existingCustoms; + } + ); + assertTrue(reducedCustoms != existingMetaData.customs()); + assertEquals(((TestCustomMetaData) reducedCustoms.get(CustomMetaData1.TYPE)).getData(), "new_data1"); + assertEquals(((TestCustomMetaData) reducedCustoms.get(CustomMetaData2.TYPE)).getData(), "new_data2"); + } + + private static class CustomMetaData1 extends TestCustomMetaData { + public static final String TYPE = "custom_md_1"; + + protected CustomMetaData1(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData1(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + } + + private static class CustomMetaData2 extends TestCustomMetaData { + public static final String TYPE = "custom_md_2"; + + protected CustomMetaData2(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData2(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + } + + private static MetaData randomMetaData(TestCustomMetaData... customMetaDatas) { + MetaData.Builder builder = MetaData.builder(); + for (TestCustomMetaData customMetaData : customMetaDatas) { + builder.putCustom(customMetaData.type(), customMetaData); + } + for (int i = 0; i < randomIntBetween(1, 5); i++) { + builder.put(IndexMetaData.builder(randomAsciiOfLength(10)) + .settings(settings(Version.CURRENT)) + .numberOfReplicas(randomIntBetween(0, 3)) + .numberOfShards(randomIntBetween(1, 5)) + ); + } + return builder.build(); + } }