Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -314,7 +316,12 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.add(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId(), classpathPlugins);
Collection<BinaryOperator<Map<String, MetaData.Custom>>> customMetaDataReducers =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getCustomMetaDataReducer)
.collect(Collectors.toList());
final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId(), classpathPlugins,
getCustomMetaDataReducer(customMetaDataReducers));
resourcesToClose.add(tribeService);
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
Expand Down Expand Up @@ -860,4 +867,14 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
}
return customNameResolvers;
}

private BinaryOperator<Map<String, MetaData.Custom>> getCustomMetaDataReducer(
final Collection<BinaryOperator<Map<String, MetaData.Custom>>> customMetaDataReducers) {
return (originalCustoms, newCustoms) -> {
for (BinaryOperator<Map<String, MetaData.Custom>> customMetaDataReducer : customMetaDataReducers) {
originalCustoms = customMetaDataReducer.apply(originalCustoms, newCustoms);
}
return originalCustoms;
};
}
}
13 changes: 13 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

/**
Expand Down Expand Up @@ -148,6 +149,18 @@ public UnaryOperator<Map<String, MetaData.Custom>> getCustomMetaDataUpgrader() {
return UnaryOperator.identity();
}

/**
* Provides a function to reduce a global custom meta data from multiple custom meta data in tribe node.
* The selected global custom meta data is stored in the tribe node in case of multiple underlying clusters
* have same global custom meta data type.
* <p>
* Plugins should return the original custom map if no reduction is required.
* @return Never {@code null}. The original custom map or upgraded {@code MetaData.Custom} map.
*/
public BinaryOperator<Map<String, MetaData.Custom>> getCustomMetaDataReducer() {
return (originalCustoms, newCustoms) -> originalCustoms;
}

/**
* Provides the list of this plugin's custom thread pools, empty if
* none.
Expand Down
48 changes: 47 additions & 1 deletion core/src/main/java/org/elasticsearch/tribe/TribeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.tribe;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.BytesRef;
Expand All @@ -42,6 +44,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.hash.MurmurHash3;
import org.elasticsearch.common.network.NetworkModule;
Expand Down Expand Up @@ -71,6 +74,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BinaryOperator;
import java.util.function.Function;

import static java.util.Collections.unmodifiableMap;
Expand Down Expand Up @@ -183,9 +187,11 @@ public static Settings processSettings(Settings settings) {
private final Set<String> droppedIndices = ConcurrentCollections.newConcurrentSet();

private final List<Node> nodes = new CopyOnWriteArrayList<>();
private final BinaryOperator<Map<String, MetaData.Custom>> metaDataReducer;

public TribeService(Settings settings, ClusterService clusterService, final String tribeNodeId,
Collection<Class<? extends Plugin>> classpathPlugins) {
Collection<Class<? extends Plugin>> classpathPlugins,
BinaryOperator<Map<String, MetaData.Custom>> metaDataReducer) {
super(settings);
this.clusterService = clusterService;
Map<String, Settings> nodesSettings = new HashMap<>(settings.getGroups("tribe", true));
Expand All @@ -210,6 +216,7 @@ public TribeService(Settings settings, ClusterService clusterService, final Stri
}

this.onConflict = ON_CONFLICT_SETTING.get(settings);
this.metaDataReducer = metaDataReducer;
}

// pkg private for testing
Expand Down Expand Up @@ -463,6 +470,19 @@ private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent
}
}

ImmutableOpenMap<String, MetaData.Custom> reducedCustoms = reduceCustomMetaData(
currentState.metaData().customs(), tribeState.metaData().customs(), metaDataReducer);

if (currentState.metaData().customs() != reducedCustoms) {
clusterStateChanged = true;
for (ObjectCursor<String> cursor : currentState.metaData().customs().keys()) {
metaData.removeCustom(cursor.value);
}
for (ObjectObjectCursor<String, MetaData.Custom> reducedCustom : reducedCustoms) {
metaData.putCustom(reducedCustom.key, reducedCustom.value);
}
}

if (!clusterStateChanged) {
return currentState;
} else {
Expand Down Expand Up @@ -494,4 +514,30 @@ private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks,
}
}
}

// pkg-private for testing
static ImmutableOpenMap<String, MetaData.Custom> reduceCustomMetaData(
ImmutableOpenMap<String, MetaData.Custom> currentCustomMetaData,
ImmutableOpenMap<String, MetaData.Custom> newCustomMetaData,
BinaryOperator<Map<String, MetaData.Custom>> metaDataReducer) {

Map<String, MetaData.Custom> existingCustoms = new HashMap<>();
for (ObjectObjectCursor<String, MetaData.Custom> customCursor : currentCustomMetaData) {
existingCustoms.put(customCursor.key, customCursor.value);
}
Map<String, MetaData.Custom> newCustoms = new HashMap<>();
for (ObjectObjectCursor<String, MetaData.Custom> customCursor : newCustomMetaData) {
newCustoms.put(customCursor.key, customCursor.value);
}
Map<String, MetaData.Custom> reducedCustoms = metaDataReducer.apply(new HashMap<>(existingCustoms), newCustoms);
if (reducedCustoms.equals(existingCustoms) == false) {
ImmutableOpenMap.Builder<String, MetaData.Custom> builder = ImmutableOpenMap.builder();
for (Map.Entry<String, MetaData.Custom> customEntry : reducedCustoms.entrySet()) {
builder.put(customEntry.getKey(), customEntry.getValue());
}
return builder.build();
} else {
return currentCustomMetaData;
}
}
}
115 changes: 114 additions & 1 deletion core/src/test/java/org/elasticsearch/tribe/TribeIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -44,18 +48,25 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.TestCustomMetaData;
import org.elasticsearch.transport.Transport;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -102,6 +113,11 @@ public class TribeIT extends ESIntegTestCase {
**/
private static final Predicate<InternalTestCluster> ALL = c -> true;

/**
* Custom meta data comparator for testing reducing custom metadata on tribe node
*/
private static final Comparator<TestCustomMetaData> CUSTOM_META_DATA_COMPARATOR = (o1, o2) -> o1.getData().compareTo(o2.getData());

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
Expand All @@ -113,7 +129,57 @@ protected Settings nodeSettings(int nodeOrdinal) {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return getMockPlugins();
List<Class<? extends Plugin>> nodePlugins = new ArrayList<>();
nodePlugins.add(TestPluginWithCustomMetaData.class);
nodePlugins.addAll(getMockPlugins());
return nodePlugins;
}


public static class TestPluginWithCustomMetaData extends Plugin {
static {
MetaData.registerPrototype(CustomMetaData1.TYPE, new CustomMetaData1(""));
}
@Override
public BinaryOperator<Map<String, MetaData.Custom>> getCustomMetaDataReducer() {
return (existingCustoms, newCustoms) -> {
List<TestCustomMetaData> customs = new ArrayList<>();
if (existingCustoms.containsKey(CustomMetaData1.TYPE)) {
customs.add(((TestCustomMetaData) existingCustoms.get(CustomMetaData1.TYPE)));
}
if (newCustoms.containsKey(CustomMetaData1.TYPE)) {
customs.add(((TestCustomMetaData) newCustoms.get(CustomMetaData1.TYPE)));
}
if (customs.isEmpty() == false) {
Collections.sort(customs, CUSTOM_META_DATA_COMPARATOR);
existingCustoms.put(CustomMetaData1.TYPE, customs.get(0));
}
return existingCustoms;
};
}
}

private static class CustomMetaData1 extends TestCustomMetaData {
public static final String TYPE = "custom_md_1";

protected CustomMetaData1(String data) {
super(data);
}

@Override
protected TestCustomMetaData newTestCustomMetaData(String data) {
return new CustomMetaData1(data);
}

@Override
public String type() {
return TYPE;
}

@Override
public EnumSet<MetaData.XContentContext> context() {
return EnumSet.of(MetaData.XContentContext.GATEWAY);
}
}

@Before
Expand Down Expand Up @@ -459,6 +525,53 @@ public void testClusterStateNodes() throws Exception {
}
}

public void testCustomMetaDataReduce() throws Exception {
CustomMetaData1 customMetaData1 = new CustomMetaData1(randomAsciiOfLength(10));
CustomMetaData1 customMetaData2 = new CustomMetaData1(randomAsciiOfLength(10));
List<TestCustomMetaData> customMetaDatas = Arrays.asList(customMetaData1, customMetaData2);
Collections.sort(customMetaDatas, CUSTOM_META_DATA_COMPARATOR);
final CustomMetaData1 tribeNodeCustomMetaData = ((CustomMetaData1) customMetaDatas.get(0));
try (Releasable tribeNode = startTribeNode()) {
CountDownLatch expectedCustomMetaDataFound = new CountDownLatch(1);
assertThat(internalCluster().getNodeNames().length, equalTo(1));
internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0]).add(event -> {
ImmutableOpenMap<String, MetaData.Custom> customs = event.state().metaData().customs();
MetaData.Custom custom = customs.get(CustomMetaData1.TYPE);
if (custom.equals(tribeNodeCustomMetaData)) {
expectedCustomMetaDataFound.countDown();
}
});
updateCustomMetaData(cluster1, customMetaData1);
updateCustomMetaData(cluster2, customMetaData2);
expectedCustomMetaDataFound.await();
}
}

private static void updateCustomMetaData(InternalTestCluster cluster, final CustomMetaData1 customMetaData)
throws InterruptedException {
ClusterService clusterService = cluster.getInstance(ClusterService.class, cluster.getMasterName());
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("update customMetaData", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
latch.countDown();
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MetaData.Builder builder = MetaData.builder(currentState.metaData());
builder.putCustom(CustomMetaData1.TYPE, customMetaData);
return new ClusterState.Builder(currentState).metaData(builder).build();
}

@Override
public void onFailure(String source, Exception e) {
fail("failed to apply cluster state from [" + source + "] with " + e.getMessage());
}
});
latch.await();
}

private void assertIndicesExist(Client client, String... indices) throws Exception {
assertBusy(() -> {
ClusterState state = client.admin().cluster().prepareState().setRoutingTable(true).setMetaData(true).get().getState();
Expand Down
Loading