From 5211fcb55201f132a6caac1fb3fe0f178b814872 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Mon, 14 Nov 2016 14:17:11 -0500 Subject: [PATCH 1/4] Add support for merging custom meta data in tribe node Currently, when any underlying cluster has custom metadata (via plugin), tribe node does not store custom meta data in its cluster state. This is because the tribe node has no idea how to select the appropriate custom metadata from one or many custom metadata (corresponding to the number of underlying clusters). This change adds an interface that custom metadata implementations can extend to add support for merging mulitple custom metadata of the same type for storing in the tribe state. Relates to #20544 Supersedes #20791 --- .../cluster/ClusterChangedEvent.java | 31 ++++ .../org/elasticsearch/tribe/TribeService.java | 98 +++++++++++- .../cluster/ClusterChangedEventTests.java | 140 ++++++++++++++++++ .../java/org/elasticsearch/tribe/TribeIT.java | 76 ++++++++++ .../tribe/TribeServiceTests.java | 137 +++++++++++++++++ 5 files changed, 477 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index e3164eacdbb6b..701656db9ce8f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -20,17 +20,21 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; /** @@ -143,6 +147,33 @@ public boolean metaDataChanged() { return state.metaData() != previousState.metaData(); } + /** + * Returns a set of custom meta data types when any custom metadata for the cluster has changed + * between the previous cluster state and the new cluster state. custom meta data types are + * returned iff they have been added, updated or removed between the previous and the current state + */ + public Set changedCustomMetaDataSet() { + Set result = new HashSet<>(); + ImmutableOpenMap currentCustoms = state.metaData().customs(); + ImmutableOpenMap previousCustoms = previousState.metaData().customs(); + if (currentCustoms.equals(previousCustoms) == false) { + for (ObjectObjectCursor currentCustomMetaData : currentCustoms) { + // new custom md added or existing custom md changed + if (previousCustoms.containsKey(currentCustomMetaData.key) == false + || currentCustomMetaData.value.equals(previousCustoms.get(currentCustomMetaData.key)) == false) { + result.add(currentCustomMetaData.key); + } + } + // existing custom md deleted + for (ObjectObjectCursor previousCustomMetaData : previousCustoms) { + if (currentCustoms.containsKey(previousCustomMetaData.key) == false) { + result.add(previousCustomMetaData.key); + } + } + } + return result; + } + /** * Returns true iff the {@link IndexMetaData} for a given index * has changed between the previous cluster state and the new cluster state. diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index 69ad77fc91e6b..754730ba69837 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -58,10 +58,10 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.node.Node; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.TransportSettings; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -72,6 +72,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.unmodifiableMap; @@ -134,6 +135,28 @@ public static Settings processSettings(Settings settings) { return sb.build(); } + /** + * Interface to allow merging {@link org.elasticsearch.cluster.metadata.MetaData.Custom} in tribe node + * When multiple Mergable Custom metadata of the same type is found (from underlying clusters), the + * Custom metadata will be merged using {@link #merge(MetaData.Custom)} and the result will be stored + * in the tribe cluster state + * + * @param type of custom meta data + */ + interface MergableCustomMetaData { + + /** + * Merges this custom metadata with other, returning either this or other custom metadata + * for tribe cluster state + * + * @param other custom meta data + * @return the same instance or other custom metadata based on implementation + * if both the instances are considered equal, implementations should return this + * instance to avoid redundant cluster state changes. + */ + T merge(T other); + } + // internal settings only public static final Setting TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", Property.NodeScope); private final ClusterService clusterService; @@ -352,8 +375,7 @@ public BatchResult execute(ClusterState currentState, List< BatchResult.Builder builder = BatchResult.builder(); try { - // we only need to apply the latest cluster state update - accumulator = applyUpdate(accumulator, tasks.get(tasks.size() - 1)); + accumulator = applyUpdate(accumulator, tasks); builder.successes(tasks); } catch (Exception e) { builder.failures(tasks, e); @@ -362,9 +384,11 @@ public BatchResult execute(ClusterState currentState, List< return builder.build(accumulator); } - private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent task) { + private ClusterState applyUpdate(ClusterState currentState, List tasks) { boolean clusterStateChanged = false; - ClusterState tribeState = task.state(); + // we only need to apply the latest cluster state update + ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1); + ClusterState tribeState = latestTask.state(); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes()); // -- merge nodes // go over existing nodes, and see if they need to be removed @@ -463,6 +487,44 @@ private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent } } + Set changedCustomMetaDataTypeSet = tasks.stream() + .map(ClusterChangedEvent::changedCustomMetaDataSet) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + final List tribeNodes = TribeService.this.nodes; + Map mergedCustomMetaDataMap = mergeChangedCustomMetaData(changedCustomMetaDataTypeSet, + customMetaDataType -> { + List tribeCustomMetaDataList = new ArrayList<>(tribeNodes.size()); + for (Node tribeNode : tribeNodes) { + String currentTribeName = TRIBE_NAME_SETTING.get(tribeNode.settings()); + final MetaData.Custom custom; + if (currentTribeName.equals(tribeName)) { + // current tribe node + custom = tribeState.metaData().custom(customMetaDataType); + } else { + ClusterState currentTribeState = tribeNode.injector().getInstance(ClusterService.class).state(); + custom = currentTribeState.metaData().custom(customMetaDataType); + } + if (custom != null && custom instanceof MergableCustomMetaData) { + tribeCustomMetaDataList.add(custom); + } + } + return tribeCustomMetaDataList; + }); + for (String changedCustomMetaDataType : changedCustomMetaDataTypeSet) { + MetaData.Custom tribeCustomMetaData = currentState.metaData().custom(changedCustomMetaDataType); + MetaData.Custom mergedCustomMetaData = mergedCustomMetaDataMap.get(changedCustomMetaDataType); + if (mergedCustomMetaData == null) { + // custom md has been removed + clusterStateChanged = true; + metaData.removeCustom(changedCustomMetaDataType); + } else if (mergedCustomMetaData.equals(tribeCustomMetaData) == false) { + // custom md has been changed + clusterStateChanged = true; + metaData.putCustom(changedCustomMetaDataType, mergedCustomMetaData); + } // else merged custom md is the same as the existing custom md in tribe state + } + if (!clusterStateChanged) { return currentState; } else { @@ -494,4 +556,30 @@ private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, } } } + + // pkg-private for testing + static Map mergeChangedCustomMetaData(Set changedCustomMetaDataTypeSet, + Function> customMetaDataByTribeNode) { + Map changedCustomMetaDataMap = new HashMap<>(changedCustomMetaDataTypeSet.size()); + for (String customMetaDataType : changedCustomMetaDataTypeSet) { + List tribeCustomMetaDataList = customMetaDataByTribeNode.apply(customMetaDataType); + if (tribeCustomMetaDataList.isEmpty() == false) { + final MetaData.Custom mergedCustomMetaData; + if (tribeCustomMetaDataList.size() == 1) { + mergedCustomMetaData = tribeCustomMetaDataList.get(0); + } else { + MetaData.Custom tempMergedCustomMetaData = tribeCustomMetaDataList.get(0); + assert tempMergedCustomMetaData instanceof MergableCustomMetaData + : "expected to merge only MergableCustomMetaData"; + for (int i = 1; i < tribeCustomMetaDataList.size(); i++) { + MetaData.Custom currentCustom = tribeCustomMetaDataList.get(i); + tempMergedCustomMetaData = ((MergableCustomMetaData) tempMergedCustomMetaData).merge(currentCustom); + } + mergedCustomMetaData = tempMergedCustomMetaData; + } + changedCustomMetaDataMap.put(customMetaDataType, mergedCustomMetaData); + } + } + return changedCustomMetaDataMap; + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index 939954c456020..63d34f683de0c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexGraveyard; @@ -33,6 +34,7 @@ import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TestCustomMetaData; import java.util.ArrayList; import java.util.Arrays; @@ -222,6 +224,128 @@ public void testRoutingTableChanges() { assertTrue("index routing table should not be the same object", event.indexRoutingTableChanged(initialIndices.get(0).getName())); } + /** + * Test custom metadata change checks + */ + public void testChangedCustomMetaDataSet() { + final int numNodesInCluster = 3; + + final ClusterState originalState = createState(numNodesInCluster, randomBoolean(), initialIndices); + CustomMetaData1 customMetaData1 = new CustomMetaData1("data"); + final ClusterState stateWithCustomMetaData = nextState(originalState, Collections.singletonList(customMetaData1)); + + // no custom metadata present in any state + ClusterState nextState = ClusterState.builder(originalState).build(); + ClusterChangedEvent event = new ClusterChangedEvent("_na_", originalState, nextState); + assertTrue(event.changedCustomMetaDataSet().isEmpty()); + + // next state has new custom metadata + nextState = nextState(originalState, Collections.singletonList(customMetaData1)); + event = new ClusterChangedEvent("_na_", originalState, nextState); + Set changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 1); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + + // next state has same custom metadata + nextState = nextState(originalState, Collections.singletonList(customMetaData1)); + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.isEmpty()); + + // next state has equivalent custom metadata + nextState = nextState(originalState, Collections.singletonList(new CustomMetaData1("data"))); + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.isEmpty()); + + // next state removes custom metadata + nextState = originalState; + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 1); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + + // next state updates custom metadata + nextState = nextState(stateWithCustomMetaData, Collections.singletonList(new CustomMetaData1("data1"))); + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 1); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + + // next state adds new custom metadata type + CustomMetaData2 customMetaData2 = new CustomMetaData2("data2"); + nextState = nextState(stateWithCustomMetaData, Arrays.asList(customMetaData1, customMetaData2)); + event = new ClusterChangedEvent("_na_", stateWithCustomMetaData, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 1); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type())); + + // next state adds two custom metadata type + nextState = nextState(originalState, Arrays.asList(customMetaData1, customMetaData2)); + event = new ClusterChangedEvent("_na_", originalState, nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 2); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type())); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + + // next state removes two custom metadata type + nextState = originalState; + event = new ClusterChangedEvent("_na_", + nextState(originalState, Arrays.asList(customMetaData1, customMetaData2)), nextState); + changedCustomMetaDataTypeSet = event.changedCustomMetaDataSet(); + assertTrue(changedCustomMetaDataTypeSet.size() == 2); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData2.type())); + assertTrue(changedCustomMetaDataTypeSet.contains(customMetaData1.type())); + } + + private static class CustomMetaData2 extends TestCustomMetaData { + static { + MetaData.registerPrototype("2", new CustomMetaData2("")); + } + protected CustomMetaData2(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData2(data); + } + + @Override + public String type() { + return "2"; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + } + + private static class CustomMetaData1 extends TestCustomMetaData { + static { + MetaData.registerPrototype("1", new CustomMetaData1("")); + } + protected CustomMetaData1(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData1(data); + } + + @Override + public String type() { + return "1"; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + } + private static ClusterState createSimpleClusterState() { return ClusterState.builder(TEST_CLUSTER_NAME).build(); } @@ -244,6 +368,22 @@ private static ClusterState createNonInitializedState(final int numNodes, final .build(); } + private static ClusterState nextState(final ClusterState previousState, List customMetaDataList) { + final ClusterState.Builder builder = ClusterState.builder(previousState); + builder.stateUUID(UUIDs.randomBase64UUID()); + MetaData.Builder metaDataBuilder = new MetaData.Builder(previousState.metaData()); + for (ObjectObjectCursor customMetaData : previousState.metaData().customs()) { + if (customMetaData.value instanceof TestCustomMetaData) { + metaDataBuilder.removeCustom(customMetaData.key); + } + } + for (TestCustomMetaData testCustomMetaData : customMetaDataList) { + metaDataBuilder.putCustom(testCustomMetaData.type(), testCustomMetaData); + } + builder.metaData(metaDataBuilder); + return builder.build(); + } + // Create a modified cluster state from another one, but with some number of indices added and deleted. private static ClusterState nextState(final ClusterState previousState, final boolean changeClusterUUID, final List addedIndices, final List deletedIndices, final int numNodesToRemove) { diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index cf4fe03893f7b..b2f7d7c1501ca 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -22,12 +22,15 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lease.Releasable; @@ -42,6 +45,7 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.transport.MockTcpTransportPlugin; +import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData1; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -52,6 +56,8 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -446,6 +452,76 @@ public void testClusterStateNodes() throws Exception { } } + public void testMergingRemovedCustomMetaData() throws Exception { + MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1("")); + MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1("a"); + MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1("b"); + try (Releasable tribeNode = startTribeNode()) { + updateCustomMetaData(cluster1, customMetaData1); + updateCustomMetaData(cluster2, customMetaData2); + assertCustomMetaDataUpdated(internalCluster(), customMetaData2); + updateCustomMetaData(cluster2, null); + assertCustomMetaDataUpdated(internalCluster(), customMetaData1); + } + } + + public void testMergingCustomMetaData() throws Exception { + MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1("")); + MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1(randomAsciiOfLength(10)); + MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1(randomAsciiOfLength(10)); + List customMetaDatas = Arrays.asList(customMetaData1, customMetaData2); + Collections.sort(customMetaDatas, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); + final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas.get(0); + try (Releasable tribeNode = startTribeNode()) { + updateCustomMetaData(cluster1, customMetaData1); + updateCustomMetaData(cluster2, customMetaData2); + assertCustomMetaDataUpdated(internalCluster(), tribeNodeCustomMetaData); + } + } + + private static void assertCustomMetaDataUpdated(InternalTestCluster cluster, + MergableCustomMetaData1 expectedCustomMetaData) throws Exception { + assertBusy(() -> { + ClusterState tribeState = cluster.getInstance(ClusterService.class, cluster.getNodeNames()[0]).state(); + MetaData.Custom custom = tribeState.metaData().custom(MergableCustomMetaData1.TYPE); + assertNotNull(custom); + assertThat(custom, equalTo(expectedCustomMetaData)); + }); + } + + private static void updateCustomMetaData(InternalTestCluster cluster, final MergableCustomMetaData1 customMetaData) { + ClusterService clusterService = cluster.getInstance(ClusterService.class, cluster.getMasterName()); + final CountDownLatch latch = new CountDownLatch(1); + clusterService.submitStateUpdateTask("update customMetaData", new ClusterStateUpdateTask(Priority.IMMEDIATE) { + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + latch.countDown(); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MetaData.Builder builder = MetaData.builder(currentState.metaData()); + if (customMetaData == null) { + builder.removeCustom(MergableCustomMetaData1.TYPE); + } else { + builder.putCustom(MergableCustomMetaData1.TYPE, customMetaData); + } + return new ClusterState.Builder(currentState).metaData(builder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + fail("failed to apply cluster state from [" + source + "] with " + e.getMessage()); + } + }); + try { + latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + fail("latch waiting on publishing custom md interrupted [" + e.getMessage() + "]"); + } + assertThat("timed out trying to add custom metadata to " + cluster.getClusterName(), latch.getCount(), equalTo(0L)); + } + private void assertIndicesExist(Client client, String... indices) throws Exception { assertBusy(() -> { ClusterState state = client.admin().cluster().prepareState().setRoutingTable(true).setMetaData(true).get().getState(); diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java b/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java index 43ee8fee15192..c691cdcaaffac 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java @@ -19,9 +19,22 @@ package org.elasticsearch.tribe; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TestCustomMetaData; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.instanceOf; public class TribeServiceTests extends ESTestCase { public void testMinimalSettings() { @@ -96,4 +109,128 @@ public void testPassthroughSettings() { assertEquals("7.7.7.7", clientSettings.get("transport.bind_host")); assertEquals("8.8.8.8", clientSettings.get("transport.publish_host")); } + + public void testMergeCustomMetaDataSimple() { + Map mergedCustoms = + TribeService.mergeChangedCustomMetaData(Collections.singleton(MergableCustomMetaData1.TYPE), + s -> Collections.singletonList(new MergableCustomMetaData1("data1"))); + TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class)); + assertNotNull(mergedCustom); + assertEquals(mergedCustom.getData(), "data1"); + } + + public void testMergeCustomMetaData() { + Map mergedCustoms = + TribeService.mergeChangedCustomMetaData(Collections.singleton(MergableCustomMetaData1.TYPE), + s -> Arrays.asList(new MergableCustomMetaData1("data1"), new MergableCustomMetaData1("data2"))); + TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class)); + assertNotNull(mergedCustom); + assertEquals(mergedCustom.getData(), "data2"); + } + + public void testMergeMultipleCustomMetaData() { + Map> inputMap = new HashMap<>(); + inputMap.put(MergableCustomMetaData1.TYPE, + Arrays.asList(new MergableCustomMetaData1("data10"), new MergableCustomMetaData1("data11"))); + inputMap.put(MergableCustomMetaData2.TYPE, + Arrays.asList(new MergableCustomMetaData2("data21"), new MergableCustomMetaData2("data20"))); + Map mergedCustoms = TribeService.mergeChangedCustomMetaData( + Sets.newHashSet(MergableCustomMetaData1.TYPE, MergableCustomMetaData2.TYPE), inputMap::get); + TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE); + assertNotNull(mergedCustom); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class)); + assertEquals(mergedCustom.getData(), "data11"); + mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData2.TYPE); + assertNotNull(mergedCustom); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData2.class)); + assertEquals(mergedCustom.getData(), "data21"); + } + + public void testMergeCustomMetaDataFromMany() { + Map> inputMap = new HashMap<>(); + int n = randomIntBetween(3, 5); + List customList1 = new ArrayList<>(); + for (int i = 0; i <= n; i++) { + customList1.add(new MergableCustomMetaData1("data1"+String.valueOf(i))); + } + Collections.shuffle(customList1, random()); + inputMap.put(MergableCustomMetaData1.TYPE, customList1); + List customList2 = new ArrayList<>(); + for (int i = 0; i <= n; i++) { + customList2.add(new MergableCustomMetaData2("data2"+String.valueOf(i))); + } + Collections.shuffle(customList2, random()); + inputMap.put(MergableCustomMetaData2.TYPE, customList2); + + Map mergedCustoms = TribeService.mergeChangedCustomMetaData( + Sets.newHashSet(MergableCustomMetaData1.TYPE, MergableCustomMetaData2.TYPE), inputMap::get); + TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE); + assertNotNull(mergedCustom); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class)); + assertEquals(mergedCustom.getData(), "data1"+String.valueOf(n)); + mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData2.TYPE); + assertNotNull(mergedCustom); + assertThat(mergedCustom, instanceOf(MergableCustomMetaData2.class)); + assertEquals(mergedCustom.getData(), "data2"+String.valueOf(n)); + } + + static class MergableCustomMetaData1 extends TestCustomMetaData + implements TribeService.MergableCustomMetaData { + public static final String TYPE = "custom_md_1"; + + protected MergableCustomMetaData1(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new MergableCustomMetaData1(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + + @Override + public MergableCustomMetaData1 merge(MergableCustomMetaData1 other) { + return (getData().compareTo(other.getData()) >= 0) ? this : other; + } + } + + static class MergableCustomMetaData2 extends TestCustomMetaData + implements TribeService.MergableCustomMetaData { + public static final String TYPE = "custom_md_2"; + + protected MergableCustomMetaData2(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new MergableCustomMetaData2(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + + @Override + public MergableCustomMetaData2 merge(MergableCustomMetaData2 other) { + return (getData().compareTo(other.getData()) >= 0) ? this : other; + } + } } From cec9960dd8b931c0f4074e5cdd0b747066b94b79 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 17 Nov 2016 14:58:03 -0500 Subject: [PATCH 2/4] Simplify updating tribe state --- .../org/elasticsearch/tribe/TribeService.java | 103 ++++++++++-------- 1 file changed, 59 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index 754730ba69837..3ec35f91c52b3 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -147,7 +147,8 @@ interface MergableCustomMetaData { /** * Merges this custom metadata with other, returning either this or other custom metadata - * for tribe cluster state + * for tribe cluster state. This method should not mutate either this or the + * other custom metadata. * * @param other custom meta data * @return the same instance or other custom metadata based on implementation @@ -293,7 +294,7 @@ protected void doStart() { public void startNodes() { for (Node node : nodes) { try { - node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node)); + getClusterService(node).add(new TribeClusterStateListener(node)); node.start(); } catch (Exception e) { // calling close is safe for non started nodes, we can just iterate over all @@ -373,18 +374,17 @@ public String describeTasks(List tasks) { public BatchResult execute(ClusterState currentState, List tasks) throws Exception { ClusterState accumulator = ClusterState.builder(currentState).build(); BatchResult.Builder builder = BatchResult.builder(); - - try { - accumulator = applyUpdate(accumulator, tasks); - builder.successes(tasks); - } catch (Exception e) { - builder.failures(tasks, e); + ClusterState.Builder newState = ClusterState.builder(accumulator).incrementVersion(); + boolean clusterStateChanged = updateNodes(accumulator, tasks, newState); + clusterStateChanged |= updateIndicesAndMetaData(accumulator, tasks, newState); + if (clusterStateChanged) { + accumulator = newState.build(); } - + builder.successes(tasks); return builder.build(accumulator); } - private ClusterState applyUpdate(ClusterState currentState, List tasks) { + private boolean updateNodes(ClusterState currentState, List tasks, ClusterState.Builder newState) { boolean clusterStateChanged = false; // we only need to apply the latest cluster state update ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1); @@ -409,16 +409,25 @@ private ClusterState applyUpdate(ClusterState currentState, List tribeAttr = new HashMap<>(tribe.getAttributes()); tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName); DiscoveryNode discoNode = new DiscoveryNode(tribe.getName(), tribe.getId(), tribe.getEphemeralId(), - tribe.getHostName(), tribe.getHostAddress(), tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(), - tribe.getVersion()); + tribe.getHostName(), tribe.getHostAddress(), tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(), + tribe.getVersion()); clusterStateChanged = true; logger.info("[{}] adding node [{}]", tribeName, discoNode); nodes.remove(tribe.getId()); // remove any existing node with the same id but different ephemeral id nodes.add(discoNode); } } + if (clusterStateChanged) { + newState.nodes(nodes); + } + return clusterStateChanged; + } - // -- merge metadata + private boolean updateIndicesAndMetaData(ClusterState currentState, List tasks, ClusterState.Builder newState) { + // we only need to apply the latest cluster state update + ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1); + ClusterState tribeState = latestTask.state(); + boolean clusterStateChanged = false; ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); MetaData.Builder metaData = MetaData.builder(currentState.metaData()); RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); @@ -486,51 +495,59 @@ private ClusterState applyUpdate(ClusterState currentState, List tasks, MetaData.Builder metaData) { + boolean clusterStateChanged = false; + ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1); + final ClusterState tribeClientState = latestTask.state(); Set changedCustomMetaDataTypeSet = tasks.stream() .map(ClusterChangedEvent::changedCustomMetaDataSet) .flatMap(Collection::stream) .collect(Collectors.toSet()); - final List tribeNodes = TribeService.this.nodes; + final List tribeClientNodes = TribeService.this.nodes; Map mergedCustomMetaDataMap = mergeChangedCustomMetaData(changedCustomMetaDataTypeSet, customMetaDataType -> { - List tribeCustomMetaDataList = new ArrayList<>(tribeNodes.size()); - for (Node tribeNode : tribeNodes) { - String currentTribeName = TRIBE_NAME_SETTING.get(tribeNode.settings()); + List tribeCustomMetaDataList = new ArrayList<>(tribeClientNodes.size()); + for (Node tribeClientNode : tribeClientNodes) { + String currentTribeName = TRIBE_NAME_SETTING.get(tribeClientNode.settings()); final MetaData.Custom custom; if (currentTribeName.equals(tribeName)) { - // current tribe node - custom = tribeState.metaData().custom(customMetaDataType); + // tribe client node that triggered the cluster change event + // use the latest changed state instead of getting the state + // from the tribe client's cluster service + custom = tribeClientState.metaData().custom(customMetaDataType); } else { - ClusterState currentTribeState = tribeNode.injector().getInstance(ClusterService.class).state(); - custom = currentTribeState.metaData().custom(customMetaDataType); + ClusterState currentTribeClientState = getClusterService(tribeClientNode).state(); + custom = currentTribeClientState.metaData().custom(customMetaDataType); } if (custom != null && custom instanceof MergableCustomMetaData) { tribeCustomMetaDataList.add(custom); } } return tribeCustomMetaDataList; - }); + } + ); for (String changedCustomMetaDataType : changedCustomMetaDataTypeSet) { - MetaData.Custom tribeCustomMetaData = currentState.metaData().custom(changedCustomMetaDataType); MetaData.Custom mergedCustomMetaData = mergedCustomMetaDataMap.get(changedCustomMetaDataType); if (mergedCustomMetaData == null) { // custom md has been removed clusterStateChanged = true; metaData.removeCustom(changedCustomMetaDataType); - } else if (mergedCustomMetaData.equals(tribeCustomMetaData) == false) { + } else { // custom md has been changed clusterStateChanged = true; metaData.putCustom(changedCustomMetaDataType, mergedCustomMetaData); - } // else merged custom md is the same as the existing custom md in tribe state - } - - if (!clusterStateChanged) { - return currentState; - } else { - return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData) - .routingTable(routingTable.build()).build(); + } } + return clusterStateChanged; } private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, @@ -557,6 +574,10 @@ private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, } } + private static ClusterService getClusterService(Node node) { + return node.injector().getInstance(ClusterService.class); + } + // pkg-private for testing static Map mergeChangedCustomMetaData(Set changedCustomMetaDataTypeSet, Function> customMetaDataByTribeNode) { @@ -564,18 +585,12 @@ static Map mergeChangedCustomMetaData(Set chang for (String customMetaDataType : changedCustomMetaDataTypeSet) { List tribeCustomMetaDataList = customMetaDataByTribeNode.apply(customMetaDataType); if (tribeCustomMetaDataList.isEmpty() == false) { - final MetaData.Custom mergedCustomMetaData; - if (tribeCustomMetaDataList.size() == 1) { - mergedCustomMetaData = tribeCustomMetaDataList.get(0); - } else { - MetaData.Custom tempMergedCustomMetaData = tribeCustomMetaDataList.get(0); - assert tempMergedCustomMetaData instanceof MergableCustomMetaData - : "expected to merge only MergableCustomMetaData"; - for (int i = 1; i < tribeCustomMetaDataList.size(); i++) { - MetaData.Custom currentCustom = tribeCustomMetaDataList.get(i); - tempMergedCustomMetaData = ((MergableCustomMetaData) tempMergedCustomMetaData).merge(currentCustom); - } - mergedCustomMetaData = tempMergedCustomMetaData; + MetaData.Custom mergedCustomMetaData = tribeCustomMetaDataList.get(0); + assert mergedCustomMetaData instanceof MergableCustomMetaData + : "expected to merge only MergableCustomMetaData"; + for (int i = 1; i < tribeCustomMetaDataList.size(); i++) { + MetaData.Custom currentCustom = tribeCustomMetaDataList.get(i); + mergedCustomMetaData = ((MergableCustomMetaData) mergedCustomMetaData).merge(currentCustom); } changedCustomMetaDataMap.put(customMetaDataType, mergedCustomMetaData); } From 16d650043935a79bffdd53b9ca989de09f57f4fc Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 17 Nov 2016 16:29:34 -0500 Subject: [PATCH 3/4] Add tests for merging multiple custom metadata types in tribe node --- .../org/elasticsearch/tribe/TribeService.java | 15 ++-- .../java/org/elasticsearch/tribe/TribeIT.java | 77 +++++++++++++++---- .../test/TestCustomMetaData.java | 5 ++ 3 files changed, 79 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index 3ec35f91c52b3..0a59f6f1903b6 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -495,7 +495,7 @@ private boolean updateIndicesAndMetaData(ClusterState currentState, List tasks, MetaData.Builder metaData) { + private boolean updateCustoms(ClusterState currentState, List tasks, MetaData.Builder metaData) { boolean clusterStateChanged = false; ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1); final ClusterState tribeClientState = latestTask.state(); @@ -538,12 +538,17 @@ private boolean updateCustoms(List tasks, MetaData.Builder for (String changedCustomMetaDataType : changedCustomMetaDataTypeSet) { MetaData.Custom mergedCustomMetaData = mergedCustomMetaDataMap.get(changedCustomMetaDataType); if (mergedCustomMetaData == null) { - // custom md has been removed - clusterStateChanged = true; - metaData.removeCustom(changedCustomMetaDataType); + // we ignore merging custom md which doesn't implement MergableCustomMetaData interface + if (currentState.metaData().custom(changedCustomMetaDataType) instanceof MergableCustomMetaData) { + // custom md has been removed + clusterStateChanged = true; + logger.info("[{}] removing custom meta data type [{}]", tribeName, changedCustomMetaDataType); + metaData.removeCustom(changedCustomMetaDataType); + } } else { // custom md has been changed clusterStateChanged = true; + logger.info("[{}] updating custom meta data type [{}] data [{}]", tribeName, changedCustomMetaDataType, mergedCustomMetaData); metaData.putCustom(changedCustomMetaDataType, mergedCustomMetaData); } } diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index b2f7d7c1501ca..137d877f8dd4f 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -44,8 +44,10 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; +import org.elasticsearch.test.TestCustomMetaData; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData1; +import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData2; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -61,6 +63,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -454,42 +457,93 @@ public void testClusterStateNodes() throws Exception { public void testMergingRemovedCustomMetaData() throws Exception { MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1("")); + removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE); + removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE); MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1("a"); MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1("b"); try (Releasable tribeNode = startTribeNode()) { - updateCustomMetaData(cluster1, customMetaData1); - updateCustomMetaData(cluster2, customMetaData2); + assertNodes(ALL); + putCustomMetaData(cluster1, customMetaData1); + putCustomMetaData(cluster2, customMetaData2); assertCustomMetaDataUpdated(internalCluster(), customMetaData2); - updateCustomMetaData(cluster2, null); + removeCustomMetaData(cluster2, customMetaData2.type()); assertCustomMetaDataUpdated(internalCluster(), customMetaData1); } } public void testMergingCustomMetaData() throws Exception { MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1("")); + removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE); + removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE); MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1(randomAsciiOfLength(10)); MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1(randomAsciiOfLength(10)); List customMetaDatas = Arrays.asList(customMetaData1, customMetaData2); Collections.sort(customMetaDatas, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas.get(0); try (Releasable tribeNode = startTribeNode()) { - updateCustomMetaData(cluster1, customMetaData1); - updateCustomMetaData(cluster2, customMetaData2); + assertNodes(ALL); + putCustomMetaData(cluster1, customMetaData1); + assertCustomMetaDataUpdated(internalCluster(), customMetaData1); + putCustomMetaData(cluster2, customMetaData2); assertCustomMetaDataUpdated(internalCluster(), tribeNodeCustomMetaData); } } + public void testMergingMultipleCustomMetaData() throws Exception { + MetaData.registerPrototype(MergableCustomMetaData1.TYPE, new MergableCustomMetaData1("")); + MetaData.registerPrototype(MergableCustomMetaData2.TYPE, new MergableCustomMetaData2("")); + removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE); + removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE); + MergableCustomMetaData1 firstCustomMetaDataType1 = new MergableCustomMetaData1(randomAsciiOfLength(10)); + MergableCustomMetaData1 secondCustomMetaDataType1 = new MergableCustomMetaData1(randomAsciiOfLength(10)); + MergableCustomMetaData2 firstCustomMetaDataType2 = new MergableCustomMetaData2(randomAsciiOfLength(10)); + MergableCustomMetaData2 secondCustomMetaDataType2 = new MergableCustomMetaData2(randomAsciiOfLength(10)); + List mergedCustomMetaDataType1 = Arrays.asList(firstCustomMetaDataType1, secondCustomMetaDataType1); + List mergedCustomMetaDataType2 = Arrays.asList(firstCustomMetaDataType2, secondCustomMetaDataType2); + Collections.sort(mergedCustomMetaDataType1, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); + Collections.sort(mergedCustomMetaDataType2, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); + try (Releasable tribeNode = startTribeNode()) { + assertNodes(ALL); + // test putting multiple custom md types propagates to tribe + putCustomMetaData(cluster1, firstCustomMetaDataType1); + putCustomMetaData(cluster1, firstCustomMetaDataType2); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2); + + // test multiple same type custom md is merged and propagates to tribe + putCustomMetaData(cluster2, secondCustomMetaDataType1); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2); + assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0)); + + // test multiple same type custom md is merged and propagates to tribe + putCustomMetaData(cluster2, secondCustomMetaDataType2); + assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0)); + assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0)); + } + } + private static void assertCustomMetaDataUpdated(InternalTestCluster cluster, - MergableCustomMetaData1 expectedCustomMetaData) throws Exception { + TestCustomMetaData expectedCustomMetaData) throws Exception { assertBusy(() -> { ClusterState tribeState = cluster.getInstance(ClusterService.class, cluster.getNodeNames()[0]).state(); - MetaData.Custom custom = tribeState.metaData().custom(MergableCustomMetaData1.TYPE); + MetaData.Custom custom = tribeState.metaData().custom(expectedCustomMetaData.type()); assertNotNull(custom); assertThat(custom, equalTo(expectedCustomMetaData)); }); } - private static void updateCustomMetaData(InternalTestCluster cluster, final MergableCustomMetaData1 customMetaData) { + private void removeCustomMetaData(InternalTestCluster cluster, final String customMetaDataType) { + logger.info("removing custom_md type [{}] from [{}]", customMetaDataType, cluster.getClusterName()); + updateMetaData(cluster, builder -> builder.removeCustom(customMetaDataType)); + } + + private void putCustomMetaData(InternalTestCluster cluster, final TestCustomMetaData customMetaData) { + logger.info("putting custom_md type [{}] with data[{}] from [{}]", customMetaData.type(), + customMetaData.getData(), cluster.getClusterName()); + updateMetaData(cluster, builder -> builder.putCustom(customMetaData.type(), customMetaData)); + } + + private static void updateMetaData(InternalTestCluster cluster, UnaryOperator addCustoms) { ClusterService clusterService = cluster.getInstance(ClusterService.class, cluster.getMasterName()); final CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("update customMetaData", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @@ -501,11 +555,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { @Override public ClusterState execute(ClusterState currentState) throws Exception { MetaData.Builder builder = MetaData.builder(currentState.metaData()); - if (customMetaData == null) { - builder.removeCustom(MergableCustomMetaData1.TYPE); - } else { - builder.putCustom(MergableCustomMetaData1.TYPE, customMetaData); - } + builder = addCustoms.apply(builder); return new ClusterState.Builder(currentState).metaData(builder).build(); } @@ -520,6 +570,7 @@ public void onFailure(String source, Exception e) { fail("latch waiting on publishing custom md interrupted [" + e.getMessage() + "]"); } assertThat("timed out trying to add custom metadata to " + cluster.getClusterName(), latch.getCount(), equalTo(0L)); + } private void assertIndicesExist(Client client, String... indices) throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestCustomMetaData.java b/test/framework/src/main/java/org/elasticsearch/test/TestCustomMetaData.java index 92d5b95cfac69..a655f17facad3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestCustomMetaData.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestCustomMetaData.java @@ -99,4 +99,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("data", getData()); return builder; } + + @Override + public String toString() { + return "[" + type() + "][" + data +"]"; + } } From ae4c137808216a7c5172e806697ac0d440e88790 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Fri, 18 Nov 2016 16:09:28 -0500 Subject: [PATCH 4/4] cleanup merging custom md logic in tribe service --- .../org/elasticsearch/tribe/TribeService.java | 61 ++++++------------- .../java/org/elasticsearch/tribe/TribeIT.java | 8 +++ .../tribe/TribeServiceTests.java | 8 +-- 3 files changed, 30 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index 0a59f6f1903b6..06d0fffd75e16 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -61,7 +61,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.TransportSettings; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -372,16 +371,12 @@ public String describeTasks(List tasks) { @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterState accumulator = ClusterState.builder(currentState).build(); BatchResult.Builder builder = BatchResult.builder(); - ClusterState.Builder newState = ClusterState.builder(accumulator).incrementVersion(); - boolean clusterStateChanged = updateNodes(accumulator, tasks, newState); - clusterStateChanged |= updateIndicesAndMetaData(accumulator, tasks, newState); - if (clusterStateChanged) { - accumulator = newState.build(); - } + ClusterState.Builder newState = ClusterState.builder(currentState).incrementVersion(); + boolean clusterStateChanged = updateNodes(currentState, tasks, newState); + clusterStateChanged |= updateIndicesAndMetaData(currentState, tasks, newState); builder.successes(tasks); - return builder.build(accumulator); + return builder.build(clusterStateChanged ? newState.build() : currentState); } private boolean updateNodes(ClusterState currentState, List tasks, ClusterState.Builder newState) { @@ -506,34 +501,19 @@ private boolean updateIndicesAndMetaData(ClusterState currentState, List tasks, MetaData.Builder metaData) { boolean clusterStateChanged = false; - ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1); - final ClusterState tribeClientState = latestTask.state(); Set changedCustomMetaDataTypeSet = tasks.stream() .map(ClusterChangedEvent::changedCustomMetaDataSet) .flatMap(Collection::stream) .collect(Collectors.toSet()); final List tribeClientNodes = TribeService.this.nodes; Map mergedCustomMetaDataMap = mergeChangedCustomMetaData(changedCustomMetaDataTypeSet, - customMetaDataType -> { - List tribeCustomMetaDataList = new ArrayList<>(tribeClientNodes.size()); - for (Node tribeClientNode : tribeClientNodes) { - String currentTribeName = TRIBE_NAME_SETTING.get(tribeClientNode.settings()); - final MetaData.Custom custom; - if (currentTribeName.equals(tribeName)) { - // tribe client node that triggered the cluster change event - // use the latest changed state instead of getting the state - // from the tribe client's cluster service - custom = tribeClientState.metaData().custom(customMetaDataType); - } else { - ClusterState currentTribeClientState = getClusterService(tribeClientNode).state(); - custom = currentTribeClientState.metaData().custom(customMetaDataType); - } - if (custom != null && custom instanceof MergableCustomMetaData) { - tribeCustomMetaDataList.add(custom); - } - } - return tribeCustomMetaDataList; - } + customMetaDataType -> tribeClientNodes.stream() + .map(TribeService::getClusterService).map(ClusterService::state) + .map(ClusterState::metaData) + .map(clusterMetaData -> ((MetaData.Custom) clusterMetaData.custom(customMetaDataType))) + .filter(custom1 -> custom1 != null && custom1 instanceof MergableCustomMetaData) + .map(custom2 -> (MergableCustomMetaData) custom2) + .collect(Collectors.toList()) ); for (String changedCustomMetaDataType : changedCustomMetaDataTypeSet) { MetaData.Custom mergedCustomMetaData = mergedCustomMetaDataMap.get(changedCustomMetaDataType); @@ -585,20 +565,15 @@ private static ClusterService getClusterService(Node node) { // pkg-private for testing static Map mergeChangedCustomMetaData(Set changedCustomMetaDataTypeSet, - Function> customMetaDataByTribeNode) { + Function> customMetaDataByTribeNode) { + Map changedCustomMetaDataMap = new HashMap<>(changedCustomMetaDataTypeSet.size()); for (String customMetaDataType : changedCustomMetaDataTypeSet) { - List tribeCustomMetaDataList = customMetaDataByTribeNode.apply(customMetaDataType); - if (tribeCustomMetaDataList.isEmpty() == false) { - MetaData.Custom mergedCustomMetaData = tribeCustomMetaDataList.get(0); - assert mergedCustomMetaData instanceof MergableCustomMetaData - : "expected to merge only MergableCustomMetaData"; - for (int i = 1; i < tribeCustomMetaDataList.size(); i++) { - MetaData.Custom currentCustom = tribeCustomMetaDataList.get(i); - mergedCustomMetaData = ((MergableCustomMetaData) mergedCustomMetaData).merge(currentCustom); - } - changedCustomMetaDataMap.put(customMetaDataType, mergedCustomMetaData); - } + customMetaDataByTribeNode.apply(customMetaDataType).stream() + .reduce((mergableCustomMD, mergableCustomMD2) -> + ((MergableCustomMetaData) mergableCustomMD.merge((MetaData.Custom) mergableCustomMD2))) + .ifPresent(mergedCustomMetaData -> + changedCustomMetaDataMap.put(customMetaDataType, ((MetaData.Custom) mergedCustomMetaData))); } return changedCustomMetaDataMap; } diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 137d877f8dd4f..179d977ea5dc5 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -519,6 +519,14 @@ public void testMergingMultipleCustomMetaData() throws Exception { putCustomMetaData(cluster2, secondCustomMetaDataType2); assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0)); assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0)); + + // test removing custom md is propagates to tribe + removeCustomMetaData(cluster2, secondCustomMetaDataType1.type()); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1); + assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0)); + removeCustomMetaData(cluster2, secondCustomMetaDataType2.type()); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1); + assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2); } } diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java b/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java index c691cdcaaffac..7aea02c552bc3 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java @@ -131,7 +131,7 @@ public void testMergeCustomMetaData() { } public void testMergeMultipleCustomMetaData() { - Map> inputMap = new HashMap<>(); + Map> inputMap = new HashMap<>(); inputMap.put(MergableCustomMetaData1.TYPE, Arrays.asList(new MergableCustomMetaData1("data10"), new MergableCustomMetaData1("data11"))); inputMap.put(MergableCustomMetaData2.TYPE, @@ -149,15 +149,15 @@ public void testMergeMultipleCustomMetaData() { } public void testMergeCustomMetaDataFromMany() { - Map> inputMap = new HashMap<>(); + Map> inputMap = new HashMap<>(); int n = randomIntBetween(3, 5); - List customList1 = new ArrayList<>(); + List customList1 = new ArrayList<>(); for (int i = 0; i <= n; i++) { customList1.add(new MergableCustomMetaData1("data1"+String.valueOf(i))); } Collections.shuffle(customList1, random()); inputMap.put(MergableCustomMetaData1.TYPE, customList1); - List customList2 = new ArrayList<>(); + List customList2 = new ArrayList<>(); for (int i = 0; i <= n; i++) { customList2.add(new MergableCustomMetaData2("data2"+String.valueOf(i))); }