Skip to content

Commit 4c860c0

Browse files
authored
Add support for overwrite parameter in the enrich processor. (#45029)
Similar to how it is supported in the set processor: https://www.elastic.co/guide/en/elasticsearch/reference/current/set-processor.html Relates to #32789
1 parent da3c34c commit 4c860c0

File tree

4 files changed

+67
-7
lines changed

4 files changed

+67
-7
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
3636

3737
String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey());
3838
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
39+
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
3940

4041
final List<EnrichSpecification> specifications;
4142
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
@@ -53,7 +54,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
5354

5455
switch (policy.getType()) {
5556
case EnrichPolicy.EXACT_MATCH_TYPE:
56-
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, specifications);
57+
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, overrideEnabled, specifications);
5758
default:
5859
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
5960
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,23 @@ final class ExactMatchProcessor extends AbstractProcessor {
3232
private final String policyName;
3333
private final String enrichKey;
3434
private final boolean ignoreMissing;
35+
private final boolean overrideEnabled;
3536
private final List<EnrichSpecification> specifications;
3637

3738
ExactMatchProcessor(String tag,
3839
Client client,
3940
String policyName,
4041
String enrichKey,
4142
boolean ignoreMissing,
43+
boolean overrideEnabled,
4244
List<EnrichSpecification> specifications) {
4345
this(
4446
tag,
4547
createSearchRunner(client),
4648
policyName,
4749
enrichKey,
4850
ignoreMissing,
51+
overrideEnabled,
4952
specifications
5053
);
5154
}
@@ -55,12 +58,14 @@ final class ExactMatchProcessor extends AbstractProcessor {
5558
String policyName,
5659
String enrichKey,
5760
boolean ignoreMissing,
61+
boolean overrideEnabled,
5862
List<EnrichSpecification> specifications) {
5963
super(tag);
6064
this.searchRunner = searchRunner;
6165
this.policyName = policyName;
6266
this.enrichKey = enrichKey;
6367
this.ignoreMissing = ignoreMissing;
68+
this.overrideEnabled = overrideEnabled;
6469
this.specifications = specifications;
6570
}
6671

@@ -111,7 +116,9 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
111116
assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length";
112117
for (EnrichSpecification specification : specifications) {
113118
Object enrichFieldValue = enrichDocument.get(specification.sourceField);
114-
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
119+
if (overrideEnabled || ingestDocument.hasField(specification.targetField) == false) {
120+
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
121+
}
115122
}
116123
handler.accept(ingestDocument, null);
117124
});
@@ -142,6 +149,10 @@ boolean isIgnoreMissing() {
142149
return ignoreMissing;
143150
}
144151

152+
boolean isOverrideEnabled() {
153+
return overrideEnabled;
154+
}
155+
145156
List<EnrichSpecification> getSpecifications() {
146157
return specifications;
147158
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public void testCreateProcessorInstance() throws Exception {
3838
config.put("ignore_missing", keyIgnoreMissing);
3939
}
4040

41+
Boolean overrideEnabled = randomBoolean() ? null : randomBoolean();
42+
if (overrideEnabled != null) {
43+
config.put("override", overrideEnabled);
44+
}
45+
4146
int numRandomValues = randomIntBetween(1, 8);
4247
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
4348
for (int i = 0; i < numRandomValues; i++) {
@@ -55,6 +60,11 @@ public void testCreateProcessorInstance() throws Exception {
5560
assertThat(result.getPolicyName(), equalTo("majestic"));
5661
assertThat(result.getEnrichKey(), equalTo("host"));
5762
assertThat(result.isIgnoreMissing(), is(keyIgnoreMissing));
63+
if (overrideEnabled != null) {
64+
assertThat(result.isOverrideEnabled(), is(overrideEnabled));
65+
} else {
66+
assertThat(result.isOverrideEnabled(), is(true));
67+
}
5868
assertThat(result.getSpecifications().size(), equalTo(numRandomValues));
5969
for (int i = 0; i < numRandomValues; i++) {
6070
EnrichSpecification actual = result.getSpecifications().get(i);

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.IOException;
3434
import java.io.UncheckedIOException;
3535
import java.util.Collections;
36+
import java.util.HashMap;
3637
import java.util.List;
3738
import java.util.Map;
3839
import java.util.function.BiConsumer;
@@ -47,7 +48,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
4748

4849
public void testBasics() throws Exception {
4950
MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
50-
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
51+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
5152
List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
5253
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
5354
Map.of("domain", "elastic.co"));
@@ -76,7 +77,7 @@ public void testBasics() throws Exception {
7677

7778
public void testNoMatch() throws Exception {
7879
MockSearchFunction mockSearch = mockedSearchFunction();
79-
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
80+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
8081
List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
8182
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
8283
Map.of("domain", "elastic.com"));
@@ -106,7 +107,7 @@ public void testNoMatch() throws Exception {
106107
public void testSearchFailure() throws Exception {
107108
String indexName = ".enrich-_name";
108109
MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
109-
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
110+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
110111
List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
111112
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
112113
Map.of("domain", "elastic.com"));
@@ -141,7 +142,7 @@ public void testSearchFailure() throws Exception {
141142
public void testIgnoreKeyMissing() throws Exception {
142143
{
143144
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
144-
true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
145+
true, true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
145146
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of());
146147

147148
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
@@ -152,7 +153,7 @@ public void testIgnoreKeyMissing() throws Exception {
152153
}
153154
{
154155
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
155-
false, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
156+
false, true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
156157
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of());
157158
IngestDocument[] resultHolder = new IngestDocument[1];
158159
Exception[] exceptionHolder = new Exception[1];
@@ -166,6 +167,43 @@ public void testIgnoreKeyMissing() throws Exception {
166167
}
167168
}
168169

170+
public void testExistingFieldWithOverrideDisabled() throws Exception {
171+
MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
172+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
173+
List.of(new EnrichSpecification("tld", "tld")));
174+
175+
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(Map.of("domain", "elastic.co", "tld", "tld")), Map.of());
176+
IngestDocument[] resultHolder = new IngestDocument[1];
177+
Exception[] exceptionHolder = new Exception[1];
178+
processor.execute(ingestDocument, (result, e) -> {
179+
resultHolder[0] = result;
180+
exceptionHolder[0] = e;
181+
});
182+
assertThat(exceptionHolder[0], nullValue());
183+
assertThat(resultHolder[0].hasField("tld"), equalTo(true));
184+
assertThat(resultHolder[0].getFieldValue("tld", Object.class), equalTo("tld"));
185+
}
186+
187+
public void testExistingNullFieldWithOverrideDisabled() throws Exception {
188+
MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
189+
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
190+
List.of(new EnrichSpecification("tld", "tld")));
191+
192+
Map<String, Object> source = new HashMap<>();
193+
source.put("domain", "elastic.co");
194+
source.put("tld", null);
195+
IngestDocument ingestDocument = new IngestDocument(source, Map.of());
196+
IngestDocument[] resultHolder = new IngestDocument[1];
197+
Exception[] exceptionHolder = new Exception[1];
198+
processor.execute(ingestDocument, (result, e) -> {
199+
resultHolder[0] = result;
200+
exceptionHolder[0] = e;
201+
});
202+
assertThat(exceptionHolder[0], nullValue());
203+
assertThat(resultHolder[0].hasField("tld"), equalTo(true));
204+
assertThat(resultHolder[0].getFieldValue("tld", Object.class), equalTo(null));
205+
}
206+
169207
private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
170208
private final SearchResponse mockResponse;
171209
private final SetOnce<SearchRequest> capturedRequest;

0 commit comments

Comments
 (0)