Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.CustomPrototypeRegistry;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -359,10 +360,17 @@ public CreateIndexRequest source(byte[] source, int offset, int length) {
* Sets the settings and mappings as a single source.
*/
public CreateIndexRequest source(BytesReference source) {
return source(source, CustomPrototypeRegistry.EMPTY);
}

/**
* Sets the settings and mappings as a single source.
*/
public CreateIndexRequest source(BytesReference source, CustomPrototypeRegistry registry) {
XContentType xContentType = XContentFactory.xContentType(source);
if (xContentType != null) {
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(source)) {
source(parser.map());
source(parser.map(), registry);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse source for create index", e);
}
Expand All @@ -375,8 +383,15 @@ public CreateIndexRequest source(BytesReference source) {
/**
* Sets the settings and mappings as a single source.
*/
@SuppressWarnings("unchecked")
public CreateIndexRequest source(Map<String, ?> source) {
return source(source, CustomPrototypeRegistry.EMPTY);
}

/**
* Sets the settings and mappings as a single source.
*/
@SuppressWarnings("unchecked")
public CreateIndexRequest source(Map<String, ?> source, CustomPrototypeRegistry registry) {
boolean found = false;
for (Map.Entry<String, ?> entry : source.entrySet()) {
String name = entry.getKey();
Expand All @@ -393,8 +408,7 @@ public CreateIndexRequest source(Map<String, ?> source) {
found = true;
aliases((Map<String, Object>) entry.getValue());
} else {
// maybe custom?
IndexMetaData.Custom proto = IndexMetaData.lookupPrototype(name);
IndexMetaData.Custom proto = registry.getIndexMetadataPrototype(name);
if (proto != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should throw an exception if the proto doesn't resolve to anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You made a Safe method for this already I think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, we should do that here. I'll use the Safe variants for that.

found = true;
try {
Expand Down Expand Up @@ -475,7 +489,6 @@ public CreateIndexRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}


@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -489,9 +502,8 @@ public void readFrom(StreamInput in) throws IOException {
}
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
String type = in.readString();
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupPrototypeSafe(type).readFrom(in);
customs.put(type, customIndexMetaData);
IndexMetaData.Custom customIndexMetaData = in.readNamedWriteable(IndexMetaData.Custom.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change looks properly BWC to me. ++

customs.put(customIndexMetaData.type(), customIndexMetaData);
}
int aliasesSize = in.readVInt();
for (int i = 0; i < aliasesSize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.CustomPrototypeRegistry;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -281,7 +282,7 @@ public PutIndexTemplateRequest source(XContentBuilder templateBuilder) {
* The template source definition.
*/
@SuppressWarnings("unchecked")
public PutIndexTemplateRequest source(Map templateSource) {
public PutIndexTemplateRequest source(Map templateSource, CustomPrototypeRegistry registry) {
Map<String, Object> source = templateSource;
for (Map.Entry<String, Object> entry : source.entrySet()) {
String name = entry.getKey();
Expand Down Expand Up @@ -312,8 +313,7 @@ public PutIndexTemplateRequest source(Map templateSource) {
} else if (name.equals("aliases")) {
aliases((Map<String, Object>) entry.getValue());
} else {
// maybe custom?
IndexMetaData.Custom proto = IndexMetaData.lookupPrototype(name);
IndexMetaData.Custom proto = registry.getIndexMetadataPrototypeSafe(name);
if (proto != null) {
try {
customs.put(name, proto.fromMap((Map<String, Object>) entry.getValue()));
Expand All @@ -331,7 +331,7 @@ public PutIndexTemplateRequest source(Map templateSource) {
*/
public PutIndexTemplateRequest source(String templateSource) {
try (XContentParser parser = XContentFactory.xContent(templateSource).createParser(templateSource)) {
return source(parser.mapOrdered());
return source(parser.mapOrdered(), CustomPrototypeRegistry.EMPTY);
} catch (Exception e) {
throw new IllegalArgumentException("failed to parse template source [" + templateSource + "]", e);
}
Expand All @@ -349,7 +349,7 @@ public PutIndexTemplateRequest source(byte[] source) {
*/
public PutIndexTemplateRequest source(byte[] source, int offset, int length) {
try (XContentParser parser = XContentFactory.xContent(source, offset, length).createParser(source, offset, length)) {
return source(parser.mapOrdered());
return source(parser.mapOrdered(), CustomPrototypeRegistry.EMPTY);
} catch (IOException e) {
throw new IllegalArgumentException("failed to parse template source", e);
}
Expand All @@ -359,8 +359,15 @@ public PutIndexTemplateRequest source(byte[] source, int offset, int length) {
* The template source definition.
*/
public PutIndexTemplateRequest source(BytesReference source) {
return source(source, CustomPrototypeRegistry.EMPTY);
}

/**
* The template source definition.
*/
public PutIndexTemplateRequest source(BytesReference source, CustomPrototypeRegistry registry) {
try (XContentParser parser = XContentFactory.xContent(source).createParser(source)) {
return source(parser.mapOrdered());
return source(parser.mapOrdered(), registry);
} catch (IOException e) {
throw new IllegalArgumentException("failed to parse template source", e);
}
Expand Down Expand Up @@ -459,9 +466,8 @@ public void readFrom(StreamInput in) throws IOException {
}
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
String type = in.readString();
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupPrototypeSafe(type).readFrom(in);
customs.put(type, customIndexMetaData);
IndexMetaData.Custom customIndexMetaData = in.readNamedWriteable(IndexMetaData.Custom.class);
customs.put(customIndexMetaData.type(), customIndexMetaData);
}
int aliasesSize = in.readVInt();
for (int i = 0; i < aliasesSize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.CustomPrototypeRegistry;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -208,7 +209,7 @@ public PutIndexTemplateRequestBuilder setSource(XContentBuilder templateBuilder)
* The template source definition.
*/
public PutIndexTemplateRequestBuilder setSource(Map templateSource) {
request.source(templateSource);
request.source(templateSource, CustomPrototypeRegistry.EMPTY);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.CustomPrototypeRegistry;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector;
Expand All @@ -45,6 +47,7 @@
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
Expand Down Expand Up @@ -120,10 +123,13 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
additionalSettings.addAll(builder.getRegisteredSettings());
}
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
CustomPrototypeRegistry registry =
ClusterModule.createCustomPrototypeRegistry(pluginsService.filterPlugins(ClusterPlugin.class));

SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(NetworkModule.getNamedWriteables());
entries.addAll(registry.getNamedWriteables());
entries.addAll(searchModule.getNamedWriteables());
entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedWriteables().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Diff<T> diff(T previousState) {
}

@Override
public Diff<T> readDiffFrom(StreamInput in) throws IOException {
public Diff<T> readDiffFrom(StreamInput in, CustomPrototypeRegistry registry) throws IOException {
return new CompleteDiff<>(this, in);
}

Expand Down
54 changes: 54 additions & 0 deletions core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -58,7 +62,9 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.script.ScriptMetaData;
import org.elasticsearch.tasks.TaskResultsService;

import java.util.Collection;
Expand Down Expand Up @@ -86,6 +92,8 @@ public class ClusterModule extends AbstractModule {
final Collection<AllocationDecider> allocationDeciders;
final ShardsAllocator shardsAllocator;

private final CustomPrototypeRegistry registry;

// pkg private so tests can mock
Class<? extends ClusterInfoService> clusterInfoServiceImpl = InternalClusterInfoService.class;

Expand All @@ -95,6 +103,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
registry = createCustomPrototypeRegistry(clusterPlugins);
}

public IndexNameExpressionResolver getIndexNameExpressionResolver() {
Expand Down Expand Up @@ -157,6 +166,46 @@ private static ShardsAllocator createShardsAllocator(Settings settings, ClusterS
"ShardsAllocator factory for [" + allocatorName + "] returned null");
}

/**
* Creates a {@link CustomPrototypeRegistry} instance that registers built-in cluster state parts and custom
* cluster state parts provided by plugins
*/
public static CustomPrototypeRegistry createCustomPrototypeRegistry(Collection<ClusterPlugin> clusterPlugins) {
Map<String, ClusterState.Custom> customClusterStatePrototypes = new HashMap<>();
customClusterStatePrototypes.put(SnapshotsInProgress.TYPE, SnapshotsInProgress.PROTO);
customClusterStatePrototypes.put(RestoreInProgress.TYPE, RestoreInProgress.PROTO);

Map<String, MetaData.Custom> customMetadataPrototypes = new HashMap<>();
customMetadataPrototypes.put(RepositoriesMetaData.TYPE, RepositoriesMetaData.PROTO);
customMetadataPrototypes.put(IngestMetadata.TYPE, IngestMetadata.PROTO);
customMetadataPrototypes.put(ScriptMetaData.TYPE, ScriptMetaData.PROTO);
customMetadataPrototypes.put(IndexGraveyard.TYPE, IndexGraveyard.PROTO);

Map<String, IndexMetaData.Custom> customIndexMetadataPrototypes = new HashMap<>();

for (ClusterPlugin clusterPlugin : clusterPlugins) {
for (ClusterState.Custom custom : clusterPlugin.getCustomClusterState()) {
ClusterState.Custom previous = customClusterStatePrototypes.putIfAbsent(custom.type(), custom);
if (previous != null) {
throw new IllegalStateException("Custom cluster state [" + custom.type() + "] already declared");
}
}
for (MetaData.Custom custom : clusterPlugin.getCustomMetadata()) {
MetaData.Custom previous = customMetadataPrototypes.putIfAbsent(custom.type(), custom);
if (previous != null) {
throw new IllegalStateException("Custom metadata [" + custom.type() + "] already declared");
}
}
for (IndexMetaData.Custom custom : clusterPlugin.getCustomIndexMetadata()) {
IndexMetaData.Custom previous = customIndexMetadataPrototypes.putIfAbsent(custom.type(), custom);
if (previous != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

throw new IllegalStateException("Custom index metadata [" + custom.type() + "] already declared");
}
}
}
return new CustomPrototypeRegistry(customClusterStatePrototypes, customMetadataPrototypes, customIndexMetadataPrototypes);
}

@Override
protected void configure() {
bind(ClusterInfoService.class).to(clusterInfoServiceImpl).asEagerSingleton();
Expand All @@ -180,5 +229,10 @@ protected void configure() {
bind(TaskResultsService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(new AllocationDeciders(settings, allocationDeciders));
bind(ShardsAllocator.class).toInstance(shardsAllocator);
bind(CustomPrototypeRegistry.class).toInstance(registry);
}

public CustomPrototypeRegistry getRegistry() {
return registry;
}
}
Loading