Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -907,8 +907,8 @@ PUT _ingest/pipeline/user_lookup
{
"enrich" : {
"policy_name": "users-policy",
"enrich_key" : "email",
"targets": ["address", "city", "zip", "state"]
"field" : "email",
"target_field": "user"
}
}
]
Expand Down Expand Up @@ -937,10 +937,15 @@ Which returns:
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"zip": 70116,
"address": "6649 N Blue Gum St",
"city": "New Orleans",
"state": "LA",
"user": {
"email": "[email protected]",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"address": "6649 N Blue Gum St",
"city": "New Orleans",
"state": "LA"
},
"email": "[email protected]"
}
}
Expand Down
88 changes: 4 additions & 84 deletions docs/reference/ingest/processors/enrich.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,96 +6,16 @@
The `enrich` processor can enrich documents with data from another index.
See the <<ingest-enriching-data,enrich data>> section for more information about how to set this up, and
check out the <<enrich-processor-getting-started,getting started>> guide to get familiar with enrich policies and related APIs.
a

[[enrich-options]]
.Enrich Options
[options="header"]
|======
| Name | Required | Default | Description
| `policy_name` | yes | - | The name of the enrich policy to use.
| `enrich_key` | no | Policy enrich_key | The field to get the value from for the enrich lookup.
| `ignore_missing` | no | `false` | If `true` and `enrich_key` does not exist, the processor quietly exits without modifying the document
| `field` | yes | - | The field in the input document that matches the policy's `match_field`; its value is used to retrieve the enrichment data.
| `target_field` | yes | - | The field that will be used for the enrichment data.
| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `override` | no | true | Whether the processor updates fields that already hold a non-null value. When set to `false`, such fields will not be touched.
| `targets` | no 1) | - | Describes what fields should be added to the document being indexed from the lookup document
| `set_from` | no 1) | - | Same as `targets`, but allows fields from the lookup document to be added under a different name to the document being indexed
include::common-options.asciidoc[]
|======

1) Either `targets` or `set_from` must be specified.

[[enrich-processor-set-from]]
==== Enrich `set_from` option

This option should be used in the case that the field in the looked up document should be placed under
a different field in the document being ingested.

The `set_from` option accepts an array of objects, each with two fields:
* `source` - The name of the field in the lookup document
* `target` - The name of the field in the document being ingested that should hold the source field's value.

For example:

//////////////////////////

[source,js]
--------------------------------------------------
PUT /_enrich/policy/users-policy
{
"type": "exact_match",
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
}
--------------------------------------------------
// CONSOLE
// TEST

//////////////////////////

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/user_lookup
{
"description" : "Enriching user details to messages",
"processors" : [
{
"enrich" : {
"policy_name": "users-policy",
"enrich_key" : "email",
"set_from": [
{
"source": "address",
"target": "address-line-1"
},
{
"source": "city",
"target": "residence"
},
{
"source": "zip",
"target": "zipcode"
},
{
"source": "state",
"target": "us_state"
}
]
}
}
]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

//////////////////////////

[source,js]
--------------------------------------------------
DELETE /_ingest/pipeline/user_lookup
DELETE /_enrich/policy/users-policy
--------------------------------------------------
// CONSOLE
// TEST[continued]

//////////////////////////
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception
// Create pipeline
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline");
putPipelineRequest.setJsonEntity("{\"processors\":[" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
"]}}" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" +
"]}");
assertOK(client().performRequest(putPipelineRequest));

Expand All @@ -70,11 +67,12 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception
// Check if document has been enriched
Request getRequest = new Request("GET", "/my-index/_doc/1");
Map<String, Object> response = toMap(client().performRequest(getRequest));
Map<?, ?> _source = (Map<?, ?>) response.get("_source");
assertThat(_source.size(), equalTo(3));
Map<?, ?> _source = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
assertThat(_source.size(), equalTo(4));
assertThat(_source.get("host"), equalTo("elastic.co"));
assertThat(_source.get("global_rank"), equalTo(25));
assertThat(_source.get("tld_rank"), equalTo(7));
assertThat(_source.get("tld"), equalTo("co"));
assertThat(_source.get("globalRank"), equalTo(25));
assertThat(_source.get("tldRank"), equalTo(7));

if (deletePipeilne) {
// delete the pipeline so the policies can be deleted
Expand Down Expand Up @@ -113,10 +111,7 @@ public void testDeleteExistingPipeline() throws Exception {

Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline");
putPipelineRequest.setJsonEntity("{\"processors\":[" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
"]}}" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" +
"]}");
assertOK(client().performRequest(putPipelineRequest));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

final class EnrichProcessorFactory implements Processor.Factory, Consumer<ClusterState> {

Expand All @@ -34,42 +32,15 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
}

String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getMatchField());
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);

final List<EnrichSpecification> specifications;
final List<Map<?, ?>> setFromConfig = ConfigurationUtils.readOptionalList(TYPE, tag, config, "set_from");
if (setFromConfig != null) {
if (setFromConfig.isEmpty()) {
throw new IllegalArgumentException("provided set_from is empty");
}

// TODO: Add templating support in enrich_values source and target options
specifications = setFromConfig.stream()
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
.collect(Collectors.toList());
} else {
final List<String> targetsConfig = ConfigurationUtils.readList(TYPE, tag, config, "targets");
if (targetsConfig.isEmpty()) {
throw new IllegalArgumentException("provided targets is empty");
}

specifications = targetsConfig.stream()
.map(value -> new EnrichSpecification(value, value))
.collect(Collectors.toList());
}

for (EnrichSpecification specification : specifications) {
if (policy.getEnrichFields().contains(specification.sourceField) == false) {
throw new IllegalArgumentException("source field [" + specification.sourceField + "] does not exist in policy [" +
policyName + "]");
}
}
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;

switch (policy.getType()) {
case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, overrideEnabled, specifications);
return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(),
ignoreMissing, overrideEnabled);
default:
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
}
Expand All @@ -88,15 +59,4 @@ public void accept(ClusterState state) {
policies = enrichMetadata.getPolicies();
}

static final class EnrichSpecification {

final String sourceField;
final String targetField;

EnrichSpecification(String sourceField, String targetField) {
this.sourceField = sourceField;
this.targetField = targetField;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

Expand All @@ -28,61 +26,65 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";

private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String enrichKey;
private final String field;
private final String targetField;
private final String matchField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
private final List<EnrichSpecification> specifications;

ExactMatchProcessor(String tag,
Client client,
String policyName,
String enrichKey,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled,
List<EnrichSpecification> specifications) {
boolean overrideEnabled) {
this(
tag,
createSearchRunner(client),
policyName,
enrichKey,
ignoreMissing,
overrideEnabled,
specifications
field,
targetField,
matchField, ignoreMissing,
overrideEnabled
);
}

ExactMatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String enrichKey,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled,
List<EnrichSpecification> specifications) {
boolean overrideEnabled) {
super(tag, policyName);
this.searchRunner = searchRunner;
this.enrichKey = enrichKey;
this.field = field;
this.targetField = targetField;
this.matchField = matchField;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.specifications = specifications;
}

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
try {
// If a document does not have the enrich key, return the unchanged document
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
final String value = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
if (value == null) {
handler.accept(ingestDocument, null);
return;
}

TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value);
TermQueryBuilder termQuery = new TermQueryBuilder(matchField, value);
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.from(0);
searchBuilder.size(1);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null);
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);

SearchRequest req = new SearchRequest();
Expand All @@ -104,18 +106,15 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
handler.accept(ingestDocument, null);
return;
} else if (searchHits.length > 1) {
handler.accept(null, new IllegalStateException("more than one doc id matching for [" + enrichKey + "]"));
handler.accept(null, new IllegalStateException("more than one doc id matching for [" + matchField + "]"));
return;
}

// If a document is returned, add its fields to the document
Map<String, Object> enrichDocument = searchHits[0].getSourceAsMap();
assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length";
for (EnrichSpecification specification : specifications) {
Object enrichFieldValue = enrichDocument.get(specification.sourceField);
if (overrideEnabled || ingestDocument.hasField(specification.targetField) == false) {
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
}
assert enrichDocument != null : "enrich document for id [" + field + "] was empty despite non-zero search hits length";
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
ingestDocument.setFieldValue(targetField, enrichDocument);
}
handler.accept(ingestDocument, null);
});
Expand All @@ -134,8 +133,16 @@ public String getType() {
return EnrichProcessorFactory.TYPE;
}

String getEnrichKey() {
return enrichKey;
String getField() {
return field;
}

public String getTargetField() {
return targetField;
}

public String getMatchField() {
return matchField;
}

boolean isIgnoreMissing() {
Expand All @@ -146,10 +153,6 @@ boolean isOverrideEnabled() {
return overrideEnabled;
}

List<EnrichSpecification> getSpecifications() {
return specifications;
}

private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
Expand Down
Loading