Skip to content

Commit e8ffcd7

Browse files
authored
Add the cluster version to enrich policies (#45021)
Adds the Elasticsearch version as a field on the EnrichPolicy object
1 parent a18b587 commit e8ffcd7

File tree

3 files changed

+61
-13
lines changed

3 files changed

+61
-13
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.enrich;
77

8+
import org.elasticsearch.Version;
89
import org.elasticsearch.common.ParseField;
910
import org.elasticsearch.common.ParsingException;
1011
import org.elasticsearch.common.Strings;
@@ -13,6 +14,7 @@
1314
import org.elasticsearch.common.io.stream.StreamOutput;
1415
import org.elasticsearch.common.io.stream.Writeable;
1516
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
17+
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
1618
import org.elasticsearch.common.xcontent.ToXContent;
1719
import org.elasticsearch.common.xcontent.ToXContentFragment;
1820
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -40,6 +42,7 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
4042
private static final ParseField INDICES = new ParseField("indices");
4143
private static final ParseField MATCH_FIELD = new ParseField("match_field");
4244
private static final ParseField ENRICH_FIELDS = new ParseField("enrich_fields");
45+
private static final ParseField ELASTICSEARCH_VERSION = new ParseField("elasticsearch_version");
4346

4447
@SuppressWarnings("unchecked")
4548
private static final ConstructingObjectParser<EnrichPolicy, String> PARSER = new ConstructingObjectParser<>(
@@ -50,15 +53,16 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
5053
(QuerySource) args[0],
5154
(List<String>) args[1],
5255
(String) args[2],
53-
(List<String>) args[3]
56+
(List<String>) args[3],
57+
(Version) args[4]
5458
)
5559
);
5660

5761
static {
58-
declareParserOptions(PARSER);
62+
declareCommonConstructorParsingOptions(PARSER);
5963
}
6064

61-
private static void declareParserOptions(ConstructingObjectParser<?, ?> parser) {
65+
private static <T> void declareCommonConstructorParsingOptions(ConstructingObjectParser<T, ?> parser) {
6266
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
6367
XContentBuilder contentBuilder = XContentBuilder.builder(p.contentType().xContent());
6468
contentBuilder.generator().copyCurrentStructure(p);
@@ -67,6 +71,8 @@ private static void declareParserOptions(ConstructingObjectParser<?, ?> parser)
6771
parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES);
6872
parser.declareString(ConstructingObjectParser.constructorArg(), MATCH_FIELD);
6973
parser.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_FIELDS);
74+
parser.declareField(ConstructingObjectParser.optionalConstructorArg(), ((p, c) -> Version.fromString(p.text())),
75+
ELASTICSEARCH_VERSION, ValueType.STRING);
7076
}
7177

7278
public static EnrichPolicy fromXContent(XContentParser parser) throws IOException {
@@ -95,14 +101,16 @@ public static EnrichPolicy fromXContent(XContentParser parser) throws IOExceptio
95101
private final List<String> indices;
96102
private final String matchField;
97103
private final List<String> enrichFields;
104+
private final Version elasticsearchVersion;
98105

99106
public EnrichPolicy(StreamInput in) throws IOException {
100107
this(
101108
in.readString(),
102109
in.readOptionalWriteable(QuerySource::new),
103110
in.readStringList(),
104111
in.readString(),
105-
in.readStringList()
112+
in.readStringList(),
113+
Version.readVersion(in)
106114
);
107115
}
108116

@@ -111,11 +119,21 @@ public EnrichPolicy(String type,
111119
List<String> indices,
112120
String matchField,
113121
List<String> enrichFields) {
122+
this(type, query, indices, matchField, enrichFields, Version.CURRENT);
123+
}
124+
125+
public EnrichPolicy(String type,
126+
QuerySource query,
127+
List<String> indices,
128+
String matchField,
129+
List<String> enrichFields,
130+
Version elasticsearchVersion) {
114131
this.type = type;
115-
this.query= query;
132+
this.query = query;
116133
this.indices = indices;
117134
this.matchField = matchField;
118135
this.enrichFields = enrichFields;
136+
this.elasticsearchVersion = elasticsearchVersion != null ? elasticsearchVersion : Version.CURRENT;
119137
}
120138

121139
public String getType() {
@@ -138,6 +156,10 @@ public List<String> getEnrichFields() {
138156
return enrichFields;
139157
}
140158

159+
public Version getElasticsearchVersion() {
160+
return elasticsearchVersion;
161+
}
162+
141163
public static String getBaseName(String policyName) {
142164
return ENRICH_INDEX_NAME_BASE + policyName;
143165
}
@@ -149,25 +171,29 @@ public void writeTo(StreamOutput out) throws IOException {
149171
out.writeStringCollection(indices);
150172
out.writeString(matchField);
151173
out.writeStringCollection(enrichFields);
174+
Version.writeVersion(elasticsearchVersion, out);
152175
}
153176

154177
@Override
155178
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
156179
builder.startObject(type);
157180
{
158-
toInnerXContent(builder);
181+
toInnerXContent(builder, params);
159182
}
160183
builder.endObject();
161184
return builder;
162185
}
163186

164-
private void toInnerXContent(XContentBuilder builder) throws IOException {
187+
private void toInnerXContent(XContentBuilder builder, Params params) throws IOException {
165188
if (query != null) {
166189
builder.field(QUERY.getPreferredName(), query.getQueryAsMap());
167190
}
168191
builder.array(INDICES.getPreferredName(), indices.toArray(new String[0]));
169192
builder.field(MATCH_FIELD.getPreferredName(), matchField);
170193
builder.array(ENRICH_FIELDS.getPreferredName(), enrichFields.toArray(new String[0]));
194+
if (params.paramAsBoolean("include_version", false) && elasticsearchVersion != null) {
195+
builder.field(ELASTICSEARCH_VERSION.getPreferredName(), elasticsearchVersion.toString());
196+
}
171197
}
172198

173199
@Override
@@ -179,7 +205,8 @@ public boolean equals(Object o) {
179205
Objects.equals(query, policy.query) &&
180206
indices.equals(policy.indices) &&
181207
matchField.equals(policy.matchField) &&
182-
enrichFields.equals(policy.enrichFields);
208+
enrichFields.equals(policy.enrichFields) &&
209+
elasticsearchVersion.equals(policy.elasticsearchVersion);
183210
}
184211

185212
@Override
@@ -189,7 +216,8 @@ public int hashCode() {
189216
query,
190217
indices,
191218
matchField,
192-
enrichFields
219+
enrichFields,
220+
elasticsearchVersion
193221
);
194222
}
195223

@@ -257,13 +285,14 @@ public static class NamedPolicy implements Writeable, ToXContent {
257285
(QuerySource) args[1],
258286
(List<String>) args[2],
259287
(String) args[3],
260-
(List<String>) args[4])
288+
(List<String>) args[4],
289+
(Version) args[5])
261290
)
262291
);
263292

264293
static {
265294
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME);
266-
declareParserOptions(PARSER);
295+
declareCommonConstructorParsingOptions(PARSER);
267296
}
268297

269298
private final String name;
@@ -299,7 +328,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
299328
builder.startObject(policy.type);
300329
{
301330
builder.field(NAME.getPreferredName(), name);
302-
policy.toInnerXContent(builder);
331+
policy.toInnerXContent(builder, params);
303332
}
304333
builder.endObject();
305334
builder.endObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/PutEnrichPolicyAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.enrich.action;
77

8+
import org.elasticsearch.Version;
89
import org.elasticsearch.action.ActionRequestValidationException;
910
import org.elasticsearch.action.ActionType;
1011
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -37,6 +38,10 @@ public static class Request extends MasterNodeRequest<PutEnrichPolicyAction.Requ
3738

3839
public Request(String name, EnrichPolicy policy) {
3940
this.name = Objects.requireNonNull(name, "name cannot be null");
41+
if (!Version.CURRENT.equals(policy.getElasticsearchVersion())) {
42+
throw new IllegalArgumentException("Cannot set [version_created] field on enrich policy [" + name +
43+
"]. Found [" + policy.getElasticsearchVersion() + "]");
44+
}
4045
this.policy = policy;
4146
}
4247

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.ResourceAlreadyExistsException;
99
import org.elasticsearch.ResourceNotFoundException;
10+
import org.elasticsearch.Version;
1011
import org.elasticsearch.cluster.ClusterState;
1112
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1213
import org.elasticsearch.cluster.metadata.MetaData;
@@ -54,12 +55,25 @@ public static void putPolicy(String name, EnrichPolicy policy, ClusterService cl
5455
}
5556
// TODO: add policy validation
5657

58+
final EnrichPolicy finalPolicy;
59+
if (policy.getElasticsearchVersion() == null) {
60+
finalPolicy = new EnrichPolicy(
61+
policy.getType(),
62+
policy.getQuery(),
63+
policy.getIndices(),
64+
policy.getMatchField(),
65+
policy.getEnrichFields(),
66+
Version.CURRENT
67+
);
68+
} else {
69+
finalPolicy = policy;
70+
}
5771
updateClusterState(clusterService, handler, current -> {
5872
final Map<String, EnrichPolicy> policies = getPolicies(current);
5973
if (policies.get(name) != null) {
6074
throw new ResourceAlreadyExistsException("policy [{}] already exists", name);
6175
}
62-
policies.put(name, policy);
76+
policies.put(name, finalPolicy);
6377
return policies;
6478
});
6579
}

0 commit comments

Comments
 (0)