Skip to content

Commit e3fd1e6

Browse files
committed
Add support for overwrite parameter in the enrich processor. (#45029)
Similar to how it is supported in the set processor: https://www.elastic.co/guide/en/elasticsearch/reference/current/set-processor.html Relates to #32789
1 parent 39f2803 commit e3fd1e6

File tree

4 files changed

+91
-7
lines changed

4 files changed

+91
-7
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
3737

3838
String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey());
3939
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
40+
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
4041

4142
final List<EnrichSpecification> specifications;
4243
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
@@ -54,7 +55,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
5455

5556
switch (policy.getType()) {
5657
case EnrichPolicy.EXACT_MATCH_TYPE:
57-
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, specifications);
58+
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, overrideEnabled, specifications);
5859
default:
5960
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
6061
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,23 @@ final class ExactMatchProcessor extends AbstractProcessor {
3232
private final String policyName;
3333
private final String enrichKey;
3434
private final boolean ignoreMissing;
35+
private final boolean overrideEnabled;
3536
private final List<EnrichSpecification> specifications;
3637

3738
ExactMatchProcessor(String tag,
3839
Client client,
3940
String policyName,
4041
String enrichKey,
4142
boolean ignoreMissing,
43+
boolean overrideEnabled,
4244
List<EnrichSpecification> specifications) {
4345
this(
4446
tag,
4547
createSearchRunner(client),
4648
policyName,
4749
enrichKey,
4850
ignoreMissing,
51+
overrideEnabled,
4952
specifications
5053
);
5154
}
@@ -55,12 +58,14 @@ final class ExactMatchProcessor extends AbstractProcessor {
5558
String policyName,
5659
String enrichKey,
5760
boolean ignoreMissing,
61+
boolean overrideEnabled,
5862
List<EnrichSpecification> specifications) {
5963
super(tag);
6064
this.searchRunner = searchRunner;
6165
this.policyName = policyName;
6266
this.enrichKey = enrichKey;
6367
this.ignoreMissing = ignoreMissing;
68+
this.overrideEnabled = overrideEnabled;
6469
this.specifications = specifications;
6570
}
6671

@@ -111,7 +116,9 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
111116
assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length";
112117
for (EnrichSpecification specification : specifications) {
113118
Object enrichFieldValue = enrichDocument.get(specification.sourceField);
114-
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
119+
if (overrideEnabled || ingestDocument.hasField(specification.targetField) == false) {
120+
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
121+
}
115122
}
116123
handler.accept(ingestDocument, null);
117124
});
@@ -142,6 +149,10 @@ boolean isIgnoreMissing() {
142149
return ignoreMissing;
143150
}
144151

152+
boolean isOverrideEnabled() {
153+
return overrideEnabled;
154+
}
155+
145156
List<EnrichSpecification> getSpecifications() {
146157
return specifications;
147158
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public void testCreateProcessorInstance() throws Exception {
3939
config.put("ignore_missing", keyIgnoreMissing);
4040
}
4141

42+
Boolean overrideEnabled = randomBoolean() ? null : randomBoolean();
43+
if (overrideEnabled != null) {
44+
config.put("override", overrideEnabled);
45+
}
46+
4247
int numRandomValues = randomIntBetween(1, 8);
4348
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
4449
for (int i = 0; i < numRandomValues; i++) {
@@ -59,6 +64,11 @@ public void testCreateProcessorInstance() throws Exception {
5964
assertThat(result.getPolicyName(), equalTo("majestic"));
6065
assertThat(result.getEnrichKey(), equalTo("host"));
6166
assertThat(result.isIgnoreMissing(), is(keyIgnoreMissing));
67+
if (overrideEnabled != null) {
68+
assertThat(result.isOverrideEnabled(), is(overrideEnabled));
69+
} else {
70+
assertThat(result.isOverrideEnabled(), is(true));
71+
}
6272
assertThat(result.getSpecifications().size(), equalTo(numRandomValues));
6373
for (int i = 0; i < numRandomValues; i++) {
6474
EnrichSpecification actual = result.getSpecifications().get(i);

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void testBasics() throws Exception {
5656
documents.put("elastic.co", document1);
5757
}
5858
MockSearchFunction mockSearch = mockedSearchFunction(documents);
59-
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
59+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
6060
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
6161
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
6262
Collections.singletonMap("domain", "elastic.co"));
@@ -85,7 +85,7 @@ public void testBasics() throws Exception {
8585

8686
public void testNoMatch() throws Exception {
8787
MockSearchFunction mockSearch = mockedSearchFunction();
88-
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
88+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
8989
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
9090
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
9191
Collections.singletonMap("domain", "elastic.com"));
@@ -115,7 +115,7 @@ public void testNoMatch() throws Exception {
115115
public void testSearchFailure() throws Exception {
116116
String indexName = ".enrich-_name";
117117
MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
118-
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
118+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
119119
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
120120
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
121121
Collections.singletonMap("domain", "elastic.com"));
@@ -150,7 +150,7 @@ public void testSearchFailure() throws Exception {
150150
public void testIgnoreKeyMissing() throws Exception {
151151
{
152152
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
153-
true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
153+
true, true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
154154
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
155155
Collections.emptyMap());
156156

@@ -162,7 +162,7 @@ public void testIgnoreKeyMissing() throws Exception {
162162
}
163163
{
164164
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
165-
false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
165+
false, true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
166166
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
167167
Collections.emptyMap());
168168
IngestDocument[] resultHolder = new IngestDocument[1];
@@ -177,6 +177,43 @@ public void testIgnoreKeyMissing() throws Exception {
177177
}
178178
}
179179

180+
public void testExistingFieldWithOverrideDisabled() throws Exception {
181+
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
182+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
183+
Collections.singletonList(new EnrichSpecification("tld", "tld")));
184+
185+
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(mapOf("domain", "elastic.co", "tld", "tld")), mapOf());
186+
IngestDocument[] resultHolder = new IngestDocument[1];
187+
Exception[] exceptionHolder = new Exception[1];
188+
processor.execute(ingestDocument, (result, e) -> {
189+
resultHolder[0] = result;
190+
exceptionHolder[0] = e;
191+
});
192+
assertThat(exceptionHolder[0], nullValue());
193+
assertThat(resultHolder[0].hasField("tld"), equalTo(true));
194+
assertThat(resultHolder[0].getFieldValue("tld", Object.class), equalTo("tld"));
195+
}
196+
197+
public void testExistingNullFieldWithOverrideDisabled() throws Exception {
198+
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
199+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
200+
Collections.singletonList(new EnrichSpecification("tld", "tld")));
201+
202+
Map<String, Object> source = new HashMap<>();
203+
source.put("domain", "elastic.co");
204+
source.put("tld", null);
205+
IngestDocument ingestDocument = new IngestDocument(source, mapOf());
206+
IngestDocument[] resultHolder = new IngestDocument[1];
207+
Exception[] exceptionHolder = new Exception[1];
208+
processor.execute(ingestDocument, (result, e) -> {
209+
resultHolder[0] = result;
210+
exceptionHolder[0] = e;
211+
});
212+
assertThat(exceptionHolder[0], nullValue());
213+
assertThat(resultHolder[0].hasField("tld"), equalTo(true));
214+
assertThat(resultHolder[0].getFieldValue("tld", Object.class), equalTo(null));
215+
}
216+
180217
private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
181218
private final SearchResponse mockResponse;
182219
private final SetOnce<SearchRequest> capturedRequest;
@@ -240,4 +277,29 @@ public SearchResponse mockResponse(Map<String, Map<String, ?>> documents) {
240277
new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()),
241278
false, false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0));
242279
}
280+
281+
static <K, V> Map<K, V> mapOf() {
282+
return Collections.emptyMap();
283+
}
284+
285+
static <K, V> Map<K, V> mapOf(K key1, V value1) {
286+
Map<K, V> map = new HashMap<>();
287+
map.put(key1, value1);
288+
return map;
289+
}
290+
291+
static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2) {
292+
Map<K, V> map = new HashMap<>();
293+
map.put(key1, value1);
294+
map.put(key2, value2);
295+
return map;
296+
}
297+
298+
static Map<String, ?> mapOf(String key1, Object value1, String key2, Object value2, String key3, Object value3) {
299+
Map<String, Object> map = new HashMap<>();
300+
map.put(key1, value1);
301+
map.put(key2, value2);
302+
map.put(key3, value3);
303+
return map;
304+
}
243305
}

0 commit comments

Comments
 (0)