Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.enrich;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
Expand All @@ -13,6 +14,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -40,6 +42,7 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
private static final ParseField INDICES = new ParseField("indices");
private static final ParseField MATCH_FIELD = new ParseField("match_field");
private static final ParseField ENRICH_FIELDS = new ParseField("enrich_fields");
private static final ParseField ELASTICSEARCH_VERSION = new ParseField("elasticsearch_version");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<EnrichPolicy, String> PARSER = new ConstructingObjectParser<>(
Expand All @@ -50,15 +53,16 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
(QuerySource) args[0],
(List<String>) args[1],
(String) args[2],
(List<String>) args[3]
(List<String>) args[3],
(Version) args[4]
)
);

static {
declareParserOptions(PARSER);
declareCommonConstructorParsingOptions(PARSER);
}

private static void declareParserOptions(ConstructingObjectParser<?, ?> parser) {
private static <T> void declareCommonConstructorParsingOptions(ConstructingObjectParser<T, ?> parser) {
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
XContentBuilder contentBuilder = XContentBuilder.builder(p.contentType().xContent());
contentBuilder.generator().copyCurrentStructure(p);
Expand All @@ -67,6 +71,8 @@ private static void declareParserOptions(ConstructingObjectParser<?, ?> parser)
parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES);
parser.declareString(ConstructingObjectParser.constructorArg(), MATCH_FIELD);
parser.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_FIELDS);
parser.declareField(ConstructingObjectParser.optionalConstructorArg(), ((p, c) -> Version.fromString(p.text())),
ELASTICSEARCH_VERSION, ValueType.STRING);
}

public static EnrichPolicy fromXContent(XContentParser parser) throws IOException {
Expand Down Expand Up @@ -95,14 +101,16 @@ public static EnrichPolicy fromXContent(XContentParser parser) throws IOExceptio
private final List<String> indices;
private final String matchField;
private final List<String> enrichFields;
private final Version elasticsearchVersion;

public EnrichPolicy(StreamInput in) throws IOException {
this(
in.readString(),
in.readOptionalWriteable(QuerySource::new),
in.readStringList(),
in.readString(),
in.readStringList()
in.readStringList(),
Version.readVersion(in)
);
}

Expand All @@ -111,11 +119,21 @@ public EnrichPolicy(String type,
List<String> indices,
String matchField,
List<String> enrichFields) {
this(type, query, indices, matchField, enrichFields, Version.CURRENT);
}

public EnrichPolicy(String type,
QuerySource query,
List<String> indices,
String matchField,
List<String> enrichFields,
Version elasticsearchVersion) {
this.type = type;
this.query= query;
this.query = query;
this.indices = indices;
this.matchField = matchField;
this.enrichFields = enrichFields;
this.elasticsearchVersion = elasticsearchVersion != null ? elasticsearchVersion : Version.CURRENT;
}

public String getType() {
Expand All @@ -138,6 +156,10 @@ public List<String> getEnrichFields() {
return enrichFields;
}

public Version getElasticsearchVersion() {
return elasticsearchVersion;
}

public static String getBaseName(String policyName) {
return ENRICH_INDEX_NAME_BASE + policyName;
}
Expand All @@ -149,25 +171,29 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(indices);
out.writeString(matchField);
out.writeStringCollection(enrichFields);
Version.writeVersion(elasticsearchVersion, out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(type);
{
toInnerXContent(builder);
toInnerXContent(builder, params);
}
builder.endObject();
return builder;
}

private void toInnerXContent(XContentBuilder builder) throws IOException {
private void toInnerXContent(XContentBuilder builder, Params params) throws IOException {
if (query != null) {
builder.field(QUERY.getPreferredName(), query.getQueryAsMap());
}
builder.array(INDICES.getPreferredName(), indices.toArray(new String[0]));
builder.field(MATCH_FIELD.getPreferredName(), matchField);
builder.array(ENRICH_FIELDS.getPreferredName(), enrichFields.toArray(new String[0]));
if (params.paramAsBoolean("include_version", false) && elasticsearchVersion != null) {
builder.field(ELASTICSEARCH_VERSION.getPreferredName(), elasticsearchVersion.toString());
}
}

@Override
Expand All @@ -179,7 +205,8 @@ public boolean equals(Object o) {
Objects.equals(query, policy.query) &&
indices.equals(policy.indices) &&
matchField.equals(policy.matchField) &&
enrichFields.equals(policy.enrichFields);
enrichFields.equals(policy.enrichFields) &&
elasticsearchVersion.equals(policy.elasticsearchVersion);
}

@Override
Expand All @@ -189,7 +216,8 @@ public int hashCode() {
query,
indices,
matchField,
enrichFields
enrichFields,
elasticsearchVersion
);
}

Expand Down Expand Up @@ -257,13 +285,14 @@ public static class NamedPolicy implements Writeable, ToXContent {
(QuerySource) args[1],
(List<String>) args[2],
(String) args[3],
(List<String>) args[4])
(List<String>) args[4],
(Version) args[5])
)
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME);
declareParserOptions(PARSER);
declareCommonConstructorParsingOptions(PARSER);
}

private final String name;
Expand Down Expand Up @@ -299,7 +328,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(policy.type);
{
builder.field(NAME.getPreferredName(), name);
policy.toInnerXContent(builder);
policy.toInnerXContent(builder, params);
}
builder.endObject();
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.enrich.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -37,6 +38,10 @@ public static class Request extends MasterNodeRequest<PutEnrichPolicyAction.Requ

public Request(String name, EnrichPolicy policy) {
this.name = Objects.requireNonNull(name, "name cannot be null");
if (!Version.CURRENT.equals(policy.getElasticsearchVersion())) {
throw new IllegalArgumentException("Cannot set [version_created] field on enrich policy [" + name +
"]. Found [" + policy.getElasticsearchVersion() + "]");
}
this.policy = policy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -54,12 +55,25 @@ public static void putPolicy(String name, EnrichPolicy policy, ClusterService cl
}
// TODO: add policy validation

final EnrichPolicy finalPolicy;
if (policy.getElasticsearchVersion() == null) {
finalPolicy = new EnrichPolicy(
policy.getType(),
policy.getQuery(),
policy.getIndices(),
policy.getMatchField(),
policy.getEnrichFields(),
Version.CURRENT
);
} else {
finalPolicy = policy;
}
updateClusterState(clusterService, handler, current -> {
final Map<String, EnrichPolicy> policies = getPolicies(current);
if (policies.get(name) != null) {
throw new ResourceAlreadyExistsException("policy [{}] already exists", name);
}
policies.put(name, policy);
policies.put(name, finalPolicy);
return policies;
});
}
Expand Down