Skip to content

Commit 3397242

Browse files
committed
Enrich processor configuration changes (#45466)
Enrich processor configuration changes: * Renamed `enrich_key` option to `field` option. * Replaced `set_from` and `targets` options with `target_field`. The `target_field` option behaves differently from how `set_from` and `targets` worked. The `target_field` is the field that will contain the looked-up document. Relates to #32789
1 parent 7f2ba91 commit 3397242

File tree

12 files changed

+144
-327
lines changed

12 files changed

+144
-327
lines changed

docs/reference/ingest/ingest-node.asciidoc

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -906,8 +906,8 @@ PUT _ingest/pipeline/user_lookup
906906
{
907907
"enrich" : {
908908
"policy_name": "users-policy",
909-
"enrich_key" : "email",
910-
"targets": ["address", "city", "zip", "state"]
909+
"field" : "email",
910+
"target_field": "user"
911911
}
912912
}
913913
]
@@ -936,10 +936,15 @@ Which returns:
936936
"_seq_no": 55,
937937
"_primary_term": 1,
938938
"_source": {
939-
"zip": 70116,
940-
"address": "6649 N Blue Gum St",
941-
"city": "New Orleans",
942-
"state": "LA",
939+
"user": {
940+
"email": "[email protected]",
941+
"first_name": "Mardy",
942+
"last_name": "Brown",
943+
"zip": 70116,
944+
"address": "6649 N Blue Gum St",
945+
"city": "New Orleans",
946+
"state": "LA"
947+
},
943948
"email": "[email protected]"
944949
}
945950
}

docs/reference/ingest/processors/enrich.asciidoc

Lines changed: 4 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -6,96 +6,16 @@
66
The `enrich` processor can enrich documents with data from another index.
77
See the <<ingest-enriching-data,enrich data>> section for more information about how to set this up and
88
check out the <<enrich-processor-getting-started,getting started>> to get familiar with enrich policies and related APIs.
9-
a
9+
1010
[[enrich-options]]
1111
.Enrich Options
1212
[options="header"]
1313
|======
1414
| Name | Required | Default | Description
1515
| `policy_name` | yes | - | The name of the enrich policy to use.
16-
| `enrich_key` | no | Policy enrich_key | The field to get the value from for the enrich lookup.
17-
| `ignore_missing` | no | `false` | If `true` and `enrich_key` does not exist, the processor quietly exits without modifying the document
16+
| `field` | yes | - | The field in the input document that matches the policy's `match_field`, used to retrieve the enrichment data.
17+
| `target_field` | yes | - | The field that will contain the enrichment data (the looked-up document).
18+
| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document.
1819
| `override` | no | true | If `true`, the processor will update fields that already have a non-null value. When set to `false`, such fields will not be touched.
19-
| `targets` | no 1) | - | Describes what fields should be added to the document being indexed from the lookup document
20-
| `set_from` | no 1) | - | Same as `targets`, but allows fields from the lookup document to added under a different name to the document being indexed
2120
include::common-options.asciidoc[]
2221
|======
23-
24-
1) Either `targets` or `set_from` must be specified.
25-
26-
[[enrich-processor-set-from]]
27-
==== Enrich `set_from` option
28-
29-
This option should be used in the case that the field in the looked up document should be placed under
30-
a different field in the document being ingested.
31-
32-
The `set_from` accepts an array with two fields:
33-
* `source` - The name of the field in the lookup document
34-
* `target` - The name of the field in the document being ingested that should hold the source field's value.
35-
36-
For example:
37-
38-
//////////////////////////
39-
40-
[source,js]
41-
--------------------------------------------------
42-
PUT /_enrich/policy/users-policy
43-
{
44-
"type": "exact_match",
45-
"indices": "users",
46-
"match_field": "email",
47-
"enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
48-
}
49-
--------------------------------------------------
50-
// CONSOLE
51-
// TEST
52-
53-
//////////////////////////
54-
55-
[source,js]
56-
--------------------------------------------------
57-
PUT _ingest/pipeline/user_lookup
58-
{
59-
"description" : "Enriching user details to messages",
60-
"processors" : [
61-
{
62-
"enrich" : {
63-
"policy_name": "users-policy",
64-
"enrich_key" : "email",
65-
"set_from": [
66-
{
67-
"source": "address",
68-
"target": "address-line-1"
69-
},
70-
{
71-
"source": "city",
72-
"target": "residence"
73-
},
74-
{
75-
"source": "zip",
76-
"target": "zipcode"
77-
},
78-
{
79-
"source": "state",
80-
"target": "us_state"
81-
}
82-
]
83-
}
84-
}
85-
]
86-
}
87-
--------------------------------------------------
88-
// CONSOLE
89-
// TEST[continued]
90-
91-
//////////////////////////
92-
93-
[source,js]
94-
--------------------------------------------------
95-
DELETE /_ingest/pipeline/user_lookup
96-
DELETE /_enrich/policy/users-policy
97-
--------------------------------------------------
98-
// CONSOLE
99-
// TEST[continued]
100-
101-
//////////////////////////

x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,7 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception
5454
// Create pipeline
5555
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline");
5656
putPipelineRequest.setJsonEntity("{\"processors\":[" +
57-
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
58-
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
59-
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
60-
"]}}" +
57+
"{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" +
6158
"]}");
6259
assertOK(client().performRequest(putPipelineRequest));
6360

@@ -70,11 +67,12 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception
7067
// Check if document has been enriched
7168
Request getRequest = new Request("GET", "/my-index/_doc/1");
7269
Map<String, Object> response = toMap(client().performRequest(getRequest));
73-
Map<?, ?> _source = (Map<?, ?>) response.get("_source");
74-
assertThat(_source.size(), equalTo(3));
70+
Map<?, ?> _source = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
71+
assertThat(_source.size(), equalTo(4));
7572
assertThat(_source.get("host"), equalTo("elastic.co"));
76-
assertThat(_source.get("global_rank"), equalTo(25));
77-
assertThat(_source.get("tld_rank"), equalTo(7));
73+
assertThat(_source.get("tld"), equalTo("co"));
74+
assertThat(_source.get("globalRank"), equalTo(25));
75+
assertThat(_source.get("tldRank"), equalTo(7));
7876

7977
if (deletePipeilne) {
8078
// delete the pipeline so the policies can be deleted
@@ -113,10 +111,7 @@ public void testDeleteExistingPipeline() throws Exception {
113111

114112
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline");
115113
putPipelineRequest.setJsonEntity("{\"processors\":[" +
116-
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
117-
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
118-
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
119-
"]}}" +
114+
"{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" +
120115
"]}");
121116
assertOK(client().performRequest(putPipelineRequest));
122117

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@
1212
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1313

1414
import java.util.Collections;
15-
import java.util.List;
1615
import java.util.Map;
1716
import java.util.function.Consumer;
18-
import java.util.stream.Collectors;
1917

2018
final class EnrichProcessorFactory implements Processor.Factory, Consumer<ClusterState> {
2119

@@ -35,42 +33,15 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
3533
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
3634
}
3735

38-
String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getMatchField());
36+
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
3937
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
4038
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
41-
42-
final List<EnrichSpecification> specifications;
43-
final List<Map<?, ?>> setFromConfig = ConfigurationUtils.readOptionalList(TYPE, tag, config, "set_from");
44-
if (setFromConfig != null) {
45-
if (setFromConfig.isEmpty()) {
46-
throw new IllegalArgumentException("provided set_from is empty");
47-
}
48-
49-
// TODO: Add templating support in enrich_values source and target options
50-
specifications = setFromConfig.stream()
51-
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
52-
.collect(Collectors.toList());
53-
} else {
54-
final List<String> targetsConfig = ConfigurationUtils.readList(TYPE, tag, config, "targets");
55-
if (targetsConfig.isEmpty()) {
56-
throw new IllegalArgumentException("provided targets is empty");
57-
}
58-
59-
specifications = targetsConfig.stream()
60-
.map(value -> new EnrichSpecification(value, value))
61-
.collect(Collectors.toList());
62-
}
63-
64-
for (EnrichSpecification specification : specifications) {
65-
if (policy.getEnrichFields().contains(specification.sourceField) == false) {
66-
throw new IllegalArgumentException("source field [" + specification.sourceField + "] does not exist in policy [" +
67-
policyName + "]");
68-
}
69-
}
39+
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;
7040

7141
switch (policy.getType()) {
7242
case EnrichPolicy.EXACT_MATCH_TYPE:
73-
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, overrideEnabled, specifications);
43+
return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(),
44+
ignoreMissing, overrideEnabled);
7445
default:
7546
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
7647
}
@@ -89,15 +60,4 @@ public void accept(ClusterState state) {
8960
policies = enrichMetadata.getPolicies();
9061
}
9162

92-
static final class EnrichSpecification {
93-
94-
final String sourceField;
95-
final String targetField;
96-
97-
EnrichSpecification(String sourceField, String targetField) {
98-
this.sourceField = sourceField;
99-
this.targetField = targetField;
100-
}
101-
}
102-
10363
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
import org.elasticsearch.search.SearchHit;
1717
import org.elasticsearch.search.builder.SearchSourceBuilder;
1818
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
19-
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
2019
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
2120

22-
import java.util.List;
2321
import java.util.Map;
2422
import java.util.function.BiConsumer;
2523

@@ -28,61 +26,65 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
2826
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
2927

3028
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
31-
private final String enrichKey;
29+
private final String field;
30+
private final String targetField;
31+
private final String matchField;
3232
private final boolean ignoreMissing;
3333
private final boolean overrideEnabled;
34-
private final List<EnrichSpecification> specifications;
3534

3635
ExactMatchProcessor(String tag,
3736
Client client,
3837
String policyName,
39-
String enrichKey,
38+
String field,
39+
String targetField,
40+
String matchField,
4041
boolean ignoreMissing,
41-
boolean overrideEnabled,
42-
List<EnrichSpecification> specifications) {
42+
boolean overrideEnabled) {
4343
this(
4444
tag,
4545
createSearchRunner(client),
4646
policyName,
47-
enrichKey,
48-
ignoreMissing,
49-
overrideEnabled,
50-
specifications
47+
field,
48+
targetField,
49+
matchField, ignoreMissing,
50+
overrideEnabled
5151
);
5252
}
5353

5454
ExactMatchProcessor(String tag,
5555
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
5656
String policyName,
57-
String enrichKey,
57+
String field,
58+
String targetField,
59+
String matchField,
5860
boolean ignoreMissing,
59-
boolean overrideEnabled,
60-
List<EnrichSpecification> specifications) {
61+
boolean overrideEnabled) {
6162
super(tag, policyName);
6263
this.searchRunner = searchRunner;
63-
this.enrichKey = enrichKey;
64+
this.field = field;
65+
this.targetField = targetField;
66+
this.matchField = matchField;
6467
this.ignoreMissing = ignoreMissing;
6568
this.overrideEnabled = overrideEnabled;
66-
this.specifications = specifications;
6769
}
6870

6971
@Override
7072
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
7173
try {
7274
// If a document does not have the enrich key, return the unchanged document
73-
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
75+
final String value = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
7476
if (value == null) {
7577
handler.accept(ingestDocument, null);
7678
return;
7779
}
7880

79-
TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value);
81+
TermQueryBuilder termQuery = new TermQueryBuilder(matchField, value);
8082
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
8183
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
8284
searchBuilder.from(0);
8385
searchBuilder.size(1);
8486
searchBuilder.trackScores(false);
85-
searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null);
87+
searchBuilder.fetchSource(true);
8688
searchBuilder.query(constantScore);
8789

8890
SearchRequest req = new SearchRequest();
@@ -104,18 +106,15 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
104106
handler.accept(ingestDocument, null);
105107
return;
106108
} else if (searchHits.length > 1) {
107-
handler.accept(null, new IllegalStateException("more than one doc id matching for [" + enrichKey + "]"));
109+
handler.accept(null, new IllegalStateException("more than one doc id matching for [" + matchField + "]"));
108110
return;
109111
}
110112

111113
// If a document is returned, add its fields to the document
112114
Map<String, Object> enrichDocument = searchHits[0].getSourceAsMap();
113-
assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length";
114-
for (EnrichSpecification specification : specifications) {
115-
Object enrichFieldValue = enrichDocument.get(specification.sourceField);
116-
if (overrideEnabled || ingestDocument.hasField(specification.targetField) == false) {
117-
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
118-
}
115+
assert enrichDocument != null : "enrich document for id [" + field + "] was empty despite non-zero search hits length";
116+
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
117+
ingestDocument.setFieldValue(targetField, enrichDocument);
119118
}
120119
handler.accept(ingestDocument, null);
121120
});
@@ -134,8 +133,16 @@ public String getType() {
134133
return EnrichProcessorFactory.TYPE;
135134
}
136135

137-
String getEnrichKey() {
138-
return enrichKey;
136+
String getField() {
137+
return field;
138+
}
139+
140+
public String getTargetField() {
141+
return targetField;
142+
}
143+
144+
public String getMatchField() {
145+
return matchField;
139146
}
140147

141148
boolean isIgnoreMissing() {
@@ -146,10 +153,6 @@ boolean isOverrideEnabled() {
146153
return overrideEnabled;
147154
}
148155

149-
List<EnrichSpecification> getSpecifications() {
150-
return specifications;
151-
}
152-
153156
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
154157
return (req, handler) -> {
155158
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(

0 commit comments

Comments
 (0)