Skip to content

Commit ae4c137

Browse files
committed
cleanup merging custom md logic in tribe service
1 parent 16d6500 commit ae4c137

File tree

3 files changed

+30
-47
lines changed

3 files changed

+30
-47
lines changed

core/src/main/java/org/elasticsearch/tribe/TribeService.java

Lines changed: 18 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import org.elasticsearch.rest.RestStatus;
6262
import org.elasticsearch.transport.TransportSettings;
6363

64-
import java.util.ArrayList;
6564
import java.util.Arrays;
6665
import java.util.Collection;
6766
import java.util.Collections;
@@ -372,16 +371,12 @@ public String describeTasks(List<ClusterChangedEvent> tasks) {
372371

373372
@Override
374373
public BatchResult<ClusterChangedEvent> execute(ClusterState currentState, List<ClusterChangedEvent> tasks) throws Exception {
375-
ClusterState accumulator = ClusterState.builder(currentState).build();
376374
BatchResult.Builder<ClusterChangedEvent> builder = BatchResult.builder();
377-
ClusterState.Builder newState = ClusterState.builder(accumulator).incrementVersion();
378-
boolean clusterStateChanged = updateNodes(accumulator, tasks, newState);
379-
clusterStateChanged |= updateIndicesAndMetaData(accumulator, tasks, newState);
380-
if (clusterStateChanged) {
381-
accumulator = newState.build();
382-
}
375+
ClusterState.Builder newState = ClusterState.builder(currentState).incrementVersion();
376+
boolean clusterStateChanged = updateNodes(currentState, tasks, newState);
377+
clusterStateChanged |= updateIndicesAndMetaData(currentState, tasks, newState);
383378
builder.successes(tasks);
384-
return builder.build(accumulator);
379+
return builder.build(clusterStateChanged ? newState.build() : currentState);
385380
}
386381

387382
private boolean updateNodes(ClusterState currentState, List<ClusterChangedEvent> tasks, ClusterState.Builder newState) {
@@ -506,34 +501,19 @@ private boolean updateIndicesAndMetaData(ClusterState currentState, List<Cluster
506501

507502
private boolean updateCustoms(ClusterState currentState, List<ClusterChangedEvent> tasks, MetaData.Builder metaData) {
508503
boolean clusterStateChanged = false;
509-
ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1);
510-
final ClusterState tribeClientState = latestTask.state();
511504
Set<String> changedCustomMetaDataTypeSet = tasks.stream()
512505
.map(ClusterChangedEvent::changedCustomMetaDataSet)
513506
.flatMap(Collection::stream)
514507
.collect(Collectors.toSet());
515508
final List<Node> tribeClientNodes = TribeService.this.nodes;
516509
Map<String, MetaData.Custom> mergedCustomMetaDataMap = mergeChangedCustomMetaData(changedCustomMetaDataTypeSet,
517-
customMetaDataType -> {
518-
List<MetaData.Custom> tribeCustomMetaDataList = new ArrayList<>(tribeClientNodes.size());
519-
for (Node tribeClientNode : tribeClientNodes) {
520-
String currentTribeName = TRIBE_NAME_SETTING.get(tribeClientNode.settings());
521-
final MetaData.Custom custom;
522-
if (currentTribeName.equals(tribeName)) {
523-
// tribe client node that triggered the cluster change event
524-
// use the latest changed state instead of getting the state
525-
// from the tribe client's cluster service
526-
custom = tribeClientState.metaData().custom(customMetaDataType);
527-
} else {
528-
ClusterState currentTribeClientState = getClusterService(tribeClientNode).state();
529-
custom = currentTribeClientState.metaData().custom(customMetaDataType);
530-
}
531-
if (custom != null && custom instanceof MergableCustomMetaData) {
532-
tribeCustomMetaDataList.add(custom);
533-
}
534-
}
535-
return tribeCustomMetaDataList;
536-
}
510+
customMetaDataType -> tribeClientNodes.stream()
511+
.map(TribeService::getClusterService).map(ClusterService::state)
512+
.map(ClusterState::metaData)
513+
.map(clusterMetaData -> ((MetaData.Custom) clusterMetaData.custom(customMetaDataType)))
514+
.filter(custom1 -> custom1 != null && custom1 instanceof MergableCustomMetaData)
515+
.map(custom2 -> (MergableCustomMetaData) custom2)
516+
.collect(Collectors.toList())
537517
);
538518
for (String changedCustomMetaDataType : changedCustomMetaDataTypeSet) {
539519
MetaData.Custom mergedCustomMetaData = mergedCustomMetaDataMap.get(changedCustomMetaDataType);
@@ -585,20 +565,15 @@ private static ClusterService getClusterService(Node node) {
585565

586566
// pkg-private for testing
587567
static Map<String, MetaData.Custom> mergeChangedCustomMetaData(Set<String> changedCustomMetaDataTypeSet,
588-
Function<String, List<MetaData.Custom>> customMetaDataByTribeNode) {
568+
Function<String, List<MergableCustomMetaData>> customMetaDataByTribeNode) {
569+
589570
Map<String, MetaData.Custom> changedCustomMetaDataMap = new HashMap<>(changedCustomMetaDataTypeSet.size());
590571
for (String customMetaDataType : changedCustomMetaDataTypeSet) {
591-
List<MetaData.Custom> tribeCustomMetaDataList = customMetaDataByTribeNode.apply(customMetaDataType);
592-
if (tribeCustomMetaDataList.isEmpty() == false) {
593-
MetaData.Custom mergedCustomMetaData = tribeCustomMetaDataList.get(0);
594-
assert mergedCustomMetaData instanceof MergableCustomMetaData
595-
: "expected to merge only MergableCustomMetaData";
596-
for (int i = 1; i < tribeCustomMetaDataList.size(); i++) {
597-
MetaData.Custom currentCustom = tribeCustomMetaDataList.get(i);
598-
mergedCustomMetaData = ((MergableCustomMetaData) mergedCustomMetaData).merge(currentCustom);
599-
}
600-
changedCustomMetaDataMap.put(customMetaDataType, mergedCustomMetaData);
601-
}
572+
customMetaDataByTribeNode.apply(customMetaDataType).stream()
573+
.reduce((mergableCustomMD, mergableCustomMD2) ->
574+
((MergableCustomMetaData) mergableCustomMD.merge((MetaData.Custom) mergableCustomMD2)))
575+
.ifPresent(mergedCustomMetaData ->
576+
changedCustomMetaDataMap.put(customMetaDataType, ((MetaData.Custom) mergedCustomMetaData)));
602577
}
603578
return changedCustomMetaDataMap;
604579
}

core/src/test/java/org/elasticsearch/tribe/TribeIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,14 @@ public void testMergingMultipleCustomMetaData() throws Exception {
519519
putCustomMetaData(cluster2, secondCustomMetaDataType2);
520520
assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0));
521521
assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0));
522+
523+
// test removing custom md is propagates to tribe
524+
removeCustomMetaData(cluster2, secondCustomMetaDataType1.type());
525+
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1);
526+
assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0));
527+
removeCustomMetaData(cluster2, secondCustomMetaDataType2.type());
528+
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1);
529+
assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2);
522530
}
523531
}
524532

core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void testMergeCustomMetaData() {
131131
}
132132

133133
public void testMergeMultipleCustomMetaData() {
134-
Map<String, List<MetaData.Custom>> inputMap = new HashMap<>();
134+
Map<String, List<TribeService.MergableCustomMetaData>> inputMap = new HashMap<>();
135135
inputMap.put(MergableCustomMetaData1.TYPE,
136136
Arrays.asList(new MergableCustomMetaData1("data10"), new MergableCustomMetaData1("data11")));
137137
inputMap.put(MergableCustomMetaData2.TYPE,
@@ -149,15 +149,15 @@ public void testMergeMultipleCustomMetaData() {
149149
}
150150

151151
public void testMergeCustomMetaDataFromMany() {
152-
Map<String, List<MetaData.Custom>> inputMap = new HashMap<>();
152+
Map<String, List<TribeService.MergableCustomMetaData>> inputMap = new HashMap<>();
153153
int n = randomIntBetween(3, 5);
154-
List<MetaData.Custom> customList1 = new ArrayList<>();
154+
List<TribeService.MergableCustomMetaData> customList1 = new ArrayList<>();
155155
for (int i = 0; i <= n; i++) {
156156
customList1.add(new MergableCustomMetaData1("data1"+String.valueOf(i)));
157157
}
158158
Collections.shuffle(customList1, random());
159159
inputMap.put(MergableCustomMetaData1.TYPE, customList1);
160-
List<MetaData.Custom> customList2 = new ArrayList<>();
160+
List<TribeService.MergableCustomMetaData> customList2 = new ArrayList<>();
161161
for (int i = 0; i <= n; i++) {
162162
customList2.add(new MergableCustomMetaData2("data2"+String.valueOf(i)));
163163
}

0 commit comments

Comments
 (0)