diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 2cf1227151bf5..de77ede600ad6 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -907,8 +907,8 @@ PUT _ingest/pipeline/user_lookup { "enrich" : { "policy_name": "users-policy", - "enrich_key" : "email", - "targets": ["address", "city", "zip", "state"] + "field" : "email", + "target_field": "user" } } ] @@ -937,10 +937,15 @@ Which returns: "_seq_no": 55, "_primary_term": 1, "_source": { - "zip": 70116, - "address": "6649 N Blue Gum St", - "city": "New Orleans", - "state": "LA", + "user": { + "email": "mardy.brown@email.me", + "first_name": "Mardy", + "last_name": "Brown", + "zip": 70116, + "address": "6649 N Blue Gum St", + "city": "New Orleans", + "state": "LA" + }, "email": "mardy.brown@email.me" } } diff --git a/docs/reference/ingest/processors/enrich.asciidoc b/docs/reference/ingest/processors/enrich.asciidoc index b124149ef522b..94863c682498e 100644 --- a/docs/reference/ingest/processors/enrich.asciidoc +++ b/docs/reference/ingest/processors/enrich.asciidoc @@ -6,96 +6,16 @@ The `enrich` processor can enrich documents with data from another index. See <> section for more information how to set this up and check out the <> to get familiar with enrich policies and related APIs. -a + [[enrich-options]] .Enrich Options [options="header"] |====== | Name | Required | Default | Description | `policy_name` | yes | - | The name of the enrich policy to use. -| `enrich_key` | no | Policy enrich_key | The field to get the value from for the enrich lookup. -| `ignore_missing` | no | `false` | If `true` and `enrich_key` does not exist, the processor quietly exits without modifying the document +| `field` | yes | - | The field in the input document that matches the policies match_field used to retrieve the enrichment data. +| `target_field` | yes | - | The field that will be used for the enrichment data. +| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document | `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched. -| `targets` | no 1) | - | Describes what fields should be added to the document being indexed from the lookup document -| `set_from` | no 1) | - | Same as `targets`, but allows fields from the lookup document to added under a different name to the document being indexed include::common-options.asciidoc[] |====== - -1) Either `targets` or `set_from` must be specified. - -[[enrich-processor-set-from]] -==== Enrich `set_from` option - -This option should be used in the case that the field in the looked up document should be placed under -a different field in the document being ingested. - -The `set_from` accepts an array with two fields: -* `source` - The name of the field in the lookup document -* `target` - The name of the field in the document being ingested that should hold the source field's value. - -For example: - -////////////////////////// - -[source,js] --------------------------------------------------- -PUT /_enrich/policy/users-policy -{ - "type": "exact_match", - "indices": "users", - "match_field": "email", - "enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"] -} --------------------------------------------------- -// CONSOLE -// TEST - -////////////////////////// - -[source,js] --------------------------------------------------- -PUT _ingest/pipeline/user_lookup -{ - "description" : "Enriching user details to messages", - "processors" : [ - { - "enrich" : { - "policy_name": "users-policy", - "enrich_key" : "email", - "set_from": [ - { - "source": "address", - "target": "address-line-1" - }, - { - "source": "city", - "target": "residence" - }, - { - "source": "zip", - "target": "zipcode" - }, - { - "source": "state", - "target": "us_state" - } - ] - } - } - ] -} --------------------------------------------------- -// CONSOLE -// TEST[continued] - -////////////////////////// - -[source,js] --------------------------------------------------- -DELETE /_ingest/pipeline/user_lookup -DELETE /_enrich/policy/users-policy --------------------------------------------------- -// CONSOLE -// TEST[continued] - -////////////////////////// diff --git a/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java b/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java index de139d11add75..d84b02a3cc6c2 100644 --- a/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java +++ b/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java @@ -54,10 +54,7 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception // Create pipeline Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline"); putPipelineRequest.setJsonEntity("{\"processors\":[" + - "{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" + - "{\"source\":\"globalRank\",\"target\":\"global_rank\"}," + - "{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" + - "]}}" + + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" + "]}"); assertOK(client().performRequest(putPipelineRequest)); @@ -70,11 +67,12 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception // Check if document has been enriched Request getRequest = new Request("GET", "/my-index/_doc/1"); Map response = toMap(client().performRequest(getRequest)); - Map _source = (Map) response.get("_source"); - assertThat(_source.size(), equalTo(3)); + Map _source = (Map) ((Map) response.get("_source")).get("entry"); + assertThat(_source.size(), equalTo(4)); assertThat(_source.get("host"), equalTo("elastic.co")); - assertThat(_source.get("global_rank"), equalTo(25)); - assertThat(_source.get("tld_rank"), equalTo(7)); + assertThat(_source.get("tld"), equalTo("co")); + assertThat(_source.get("globalRank"), equalTo(25)); + assertThat(_source.get("tldRank"), equalTo(7)); if (deletePipeilne) { // delete the pipeline so the policies can be deleted @@ -113,10 +111,7 @@ public void testDeleteExistingPipeline() throws Exception { Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline"); putPipelineRequest.setJsonEntity("{\"processors\":[" + - "{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" + - "{\"source\":\"globalRank\",\"target\":\"global_rank\"}," + - "{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" + - "]}}" + + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" + "]}"); assertOK(client().performRequest(putPipelineRequest)); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index bea3a4fae8611..e31e8a7afeebb 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -11,10 +11,8 @@ import org.elasticsearch.ingest.Processor; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.stream.Collectors; final class EnrichProcessorFactory implements Processor.Factory, Consumer { @@ -34,42 +32,15 @@ public Processor create(Map processorFactories, Strin throw new IllegalArgumentException("policy [" + policyName + "] does not exists"); } - String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getMatchField()); + String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field"); boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false); boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true); - - final List specifications; - final List> setFromConfig = ConfigurationUtils.readOptionalList(TYPE, tag, config, "set_from"); - if (setFromConfig != null) { - if (setFromConfig.isEmpty()) { - throw new IllegalArgumentException("provided set_from is empty"); - } - - // TODO: Add templating support in enrich_values source and target options - specifications = setFromConfig.stream() - .map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target"))) - .collect(Collectors.toList()); - } else { - final List targetsConfig = ConfigurationUtils.readList(TYPE, tag, config, "targets"); - if (targetsConfig.isEmpty()) { - throw new IllegalArgumentException("provided targets is empty"); - } - - specifications = targetsConfig.stream() - .map(value -> new EnrichSpecification(value, value)) - .collect(Collectors.toList()); - } - - for (EnrichSpecification specification : specifications) { - if (policy.getEnrichFields().contains(specification.sourceField) == false) { - throw new IllegalArgumentException("source field [" + specification.sourceField + "] does not exist in policy [" + - policyName + "]"); - } - } + String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");; switch (policy.getType()) { case EnrichPolicy.EXACT_MATCH_TYPE: - return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, overrideEnabled, specifications); + return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(), + ignoreMissing, overrideEnabled); default: throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]"); } @@ -88,15 +59,4 @@ public void accept(ClusterState state) { policies = enrichMetadata.getPolicies(); } - static final class EnrichSpecification { - - final String sourceField; - final String targetField; - - EnrichSpecification(String sourceField, String targetField) { - this.sourceField = sourceField; - this.targetField = targetField; - } - } - } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java index 03a9db6e078df..e84e36aca9572 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java @@ -16,10 +16,8 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction; -import java.util.List; import java.util.Map; import java.util.function.BiConsumer; @@ -28,61 +26,65 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor { static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field"; private final BiConsumer> searchRunner; - private final String enrichKey; + private final String field; + private final String targetField; + private final String matchField; private final boolean ignoreMissing; private final boolean overrideEnabled; - private final List specifications; ExactMatchProcessor(String tag, Client client, String policyName, - String enrichKey, + String field, + String targetField, + String matchField, boolean ignoreMissing, - boolean overrideEnabled, - List specifications) { + boolean overrideEnabled) { this( tag, createSearchRunner(client), policyName, - enrichKey, - ignoreMissing, - overrideEnabled, - specifications + field, + targetField, + matchField, ignoreMissing, + overrideEnabled ); } ExactMatchProcessor(String tag, BiConsumer> searchRunner, String policyName, - String enrichKey, + String field, + String targetField, + String matchField, boolean ignoreMissing, - boolean overrideEnabled, - List specifications) { + boolean overrideEnabled) { super(tag, policyName); this.searchRunner = searchRunner; - this.enrichKey = enrichKey; + this.field = field; + this.targetField = targetField; + this.matchField = matchField; this.ignoreMissing = ignoreMissing; this.overrideEnabled = overrideEnabled; - this.specifications = specifications; } @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { try { // If a document does not have the enrich key, return the unchanged document - final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); + final String value = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (value == null) { handler.accept(ingestDocument, null); return; } - TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value); + TermQueryBuilder termQuery = new TermQueryBuilder(matchField, value); ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); searchBuilder.from(0); searchBuilder.size(1); searchBuilder.trackScores(false); - searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null); + searchBuilder.fetchSource(true); searchBuilder.query(constantScore); SearchRequest req = new SearchRequest(); @@ -104,18 +106,15 @@ public void execute(IngestDocument ingestDocument, BiConsumer 1) { - handler.accept(null, new IllegalStateException("more than one doc id matching for [" + enrichKey + "]")); + handler.accept(null, new IllegalStateException("more than one doc id matching for [" + matchField + "]")); return; } // If a document is returned, add its fields to the document Map enrichDocument = searchHits[0].getSourceAsMap(); - assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length"; - for (EnrichSpecification specification : specifications) { - Object enrichFieldValue = enrichDocument.get(specification.sourceField); - if (overrideEnabled || ingestDocument.hasField(specification.targetField) == false) { - ingestDocument.setFieldValue(specification.targetField, enrichFieldValue); - } + assert enrichDocument != null : "enrich document for id [" + field + "] was empty despite non-zero search hits length"; + if (overrideEnabled || ingestDocument.hasField(targetField) == false) { + ingestDocument.setFieldValue(targetField, enrichDocument); } handler.accept(ingestDocument, null); }); @@ -134,8 +133,16 @@ public String getType() { return EnrichProcessorFactory.TYPE; } - String getEnrichKey() { - return enrichKey; + String getField() { + return field; + } + + public String getTargetField() { + return targetField; + } + + public String getMatchField() { + return matchField; } boolean isIgnoreMissing() { @@ -146,10 +153,6 @@ boolean isOverrideEnabled() { return overrideEnabled; } - List getSpecifications() { - return specifications; - } - private static BiConsumer> createSearchRunner(Client client) { return (req, handler) -> { client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap( diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java index 2fd0cc59f4e90..10a6515767ec7 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java @@ -245,6 +245,10 @@ protected MultiSearchResponse shardOperation(Request request, ShardId shardId) t } private static BytesReference filterSource(FetchSourceContext fetchSourceContext, BytesReference source) throws IOException { + if (fetchSourceContext.includes().length == 0 && fetchSourceContext.excludes().length == 0) { + return source; + } + Set includes = Set.of(fetchSourceContext.includes()); Set excludes = Set.of(fetchSourceContext.excludes()); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index de932450e5225..80b0d2a235b87 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -28,10 +28,11 @@ import java.util.Set; import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.DECORATE_FIELDS; -import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.KEY_FIELD; +import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD; import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class BasicEnrichTests extends ESSingleNodeTestCase { @@ -46,17 +47,14 @@ public void testIngestDataWithEnrichProcessor() { String policyName = "my-policy"; EnrichPolicy enrichPolicy = - new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), KEY_FIELD, List.of(DECORATE_FIELDS)); + new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS)); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); String pipelineName = "my-pipeline"; String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName + - "\", \"set_from\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," + - "{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," + - "{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" + - "]}}]}"; + "\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"user\"}}]}"; PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); @@ -65,7 +63,7 @@ public void testIngestDataWithEnrichProcessor() { IndexRequest indexRequest = new IndexRequest(); indexRequest.id(Integer.toString(i)); indexRequest.setPipeline(pipelineName); - indexRequest.source(Map.of(KEY_FIELD, keys.get(i))); + indexRequest.source(Map.of(MATCH_FIELD, keys.get(i))); bulkRequest.add(indexRequest); } BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); @@ -74,11 +72,14 @@ public void testIngestDataWithEnrichProcessor() { for (int i = 0; i < numDocs; i++) { GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet(); Map source = getResponse.getSourceAsMap(); - assertThat(source.size(), equalTo(1 + DECORATE_FIELDS.length)); + Map userEntry = (Map) source.get("user"); + assertThat(userEntry, notNullValue()); + assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1)); for (int j = 0; j < 3; j++) { String field = DECORATE_FIELDS[j]; - assertThat(source.get(field), equalTo(keys.get(i) + j)); + assertThat(userEntry.get(field), equalTo(keys.get(i) + j)); } + assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true)); } } @@ -100,8 +101,7 @@ public void testMultiplePolicies() { String pipelineName = "pipeline" + i; String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName + - "\", \"set_from\": [{\"source\": \"value\", \"target\": \"value\"}" + - "]}}]}"; + "\", \"field\": \"key\", \"target_field\": \"target\"}}]}"; PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); } @@ -121,7 +121,7 @@ public void testMultiplePolicies() { GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet(); Map source = getResponse.getSourceAsMap(); assertThat(source.size(), equalTo(2)); - assertThat(source.get("value"), equalTo("val" + i)); + assertThat(source.get("target"), equalTo(Map.of("key", "key", "value", "val" + i))); } } @@ -136,7 +136,7 @@ private List createSourceIndex(int numDocs) { IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME); indexRequest.create(true); indexRequest.id(key); - indexRequest.source(Map.of(KEY_FIELD, key, DECORATE_FIELDS[0], key + "0", + indexRequest.source(Map.of(MATCH_FIELD, key, DECORATE_FIELDS[0], key + "0", DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2")); client().index(indexRequest).actionGet(); } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java index a2c17a2e6026b..00f8d1b25c9a9 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java @@ -44,7 +44,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase { static final String POLICY_NAME = "my-policy"; private static final String PIPELINE_NAME = "my-pipeline"; static final String SOURCE_INDEX_NAME = "users"; - static final String KEY_FIELD = "email"; + static final String MATCH_FIELD = "email"; static final String[] DECORATE_FIELDS = new String[]{"address", "city", "country"}; @Override @@ -61,7 +61,7 @@ public void testEnrichAPIs() { for (int i = 0; i < numPolicies; i++) { String policyName = POLICY_NAME + i; EnrichPolicy enrichPolicy = - new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), KEY_FIELD, List.of(DECORATE_FIELDS)); + new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS)); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); @@ -120,7 +120,7 @@ private static void enrich(List keys, String coordinatingNode) { IndexRequest indexRequest = new IndexRequest(); indexRequest.id(Integer.toString(i)); indexRequest.setPipeline(PIPELINE_NAME); - indexRequest.source(Map.of(KEY_FIELD, randomFrom(keys))); + indexRequest.source(Map.of(MATCH_FIELD, randomFrom(keys))); bulkRequest.add(indexRequest); } BulkResponse bulkResponse = client(coordinatingNode).bulk(bulkRequest).actionGet(); @@ -129,9 +129,11 @@ private static void enrich(List keys, String coordinatingNode) { for (int i = 0; i < numDocs; i++) { GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet(); Map source = getResponse.getSourceAsMap(); - assertThat(source.size(), equalTo(1 + DECORATE_FIELDS.length)); + Map userEntry = (Map) source.get("user"); + assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1)); + assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true)); for (String field : DECORATE_FIELDS) { - assertThat(source.get(field), notNullValue()); + assertThat(userEntry.get(field), notNullValue()); } } } @@ -147,7 +149,7 @@ private static List createSourceIndex(int numDocs) { IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME); indexRequest.create(true); indexRequest.id(key); - indexRequest.source(Map.of(KEY_FIELD, key, DECORATE_FIELDS[0], randomAlphaOfLength(4), + indexRequest.source(Map.of(MATCH_FIELD, key, DECORATE_FIELDS[0], randomAlphaOfLength(4), DECORATE_FIELDS[1], randomAlphaOfLength(4), DECORATE_FIELDS[2], randomAlphaOfLength(4))); client().index(indexRequest).actionGet(); } @@ -157,7 +159,7 @@ private static List createSourceIndex(int numDocs) { private static void createAndExecutePolicy() { EnrichPolicy enrichPolicy = - new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), KEY_FIELD, List.of(DECORATE_FIELDS)); + new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS)); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet(); @@ -165,10 +167,7 @@ private static void createAndExecutePolicy() { private static void createPipeline() { String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + POLICY_NAME + - "\", \"set_from\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," + - "{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," + - "{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" + - "]}}]}"; + "\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"user\"}}]}"; PutPipelineRequest request = new PutPipelineRequest(PIPELINE_NAME, new BytesArray(pipelineBody), XContentType.JSON); client().admin().cluster().putPipeline(request).actionGet(); } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java index 122f8d503f911..9688ddc0d1501 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -41,7 +41,8 @@ public void testUpdatePolicyOnly() { assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1)); - String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"targets\": [\"field1\"]}}]}"; + String pipelineConfig = + "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}"; PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", new BytesArray(pipelineConfig), XContentType.JSON); assertAcked(client().admin().cluster().putPipeline(putPipelineRequest).actionGet()); Pipeline pipelineInstance1 = ingestService.getPipeline("1"); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java index 5d0f86a1dba2c..5df148ed485ff 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; import java.util.ArrayList; import java.util.Collections; @@ -32,7 +31,8 @@ public void testCreateProcessorInstance() throws Exception { Map config = new HashMap<>(); config.put("policy_name", "majestic"); - config.put("enrich_key", "host"); + config.put("field", "host"); + config.put("target_field", "entry"); boolean keyIgnoreMissing = randomBoolean(); if (keyIgnoreMissing || randomBoolean()) { config.put("ignore_missing", keyIgnoreMissing); @@ -49,29 +49,18 @@ public void testCreateProcessorInstance() throws Exception { randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4))); } - List> valuesConfig = new ArrayList<>(numRandomValues); - for (Tuple tuple : randomValues) { - valuesConfig.add(Map.of("source", tuple.v1(), "target", tuple.v2())); - } - config.put("set_from", valuesConfig); - ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config); assertThat(result, notNullValue()); assertThat(result.getPolicyName(), equalTo("majestic")); - assertThat(result.getEnrichKey(), equalTo("host")); + assertThat(result.getField(), equalTo("host")); + assertThat(result.getTargetField(), equalTo("entry")); + assertThat(result.getMatchField(), equalTo("my_key")); assertThat(result.isIgnoreMissing(), is(keyIgnoreMissing)); if (overrideEnabled != null) { assertThat(result.isOverrideEnabled(), is(overrideEnabled)); } else { assertThat(result.isOverrideEnabled(), is(true)); } - assertThat(result.getSpecifications().size(), equalTo(numRandomValues)); - for (int i = 0; i < numRandomValues; i++) { - EnrichSpecification actual = result.getSpecifications().get(i); - Tuple expected = randomValues.get(i); - assertThat(actual.sourceField, equalTo(expected.v1())); - assertThat(actual.targetField, equalTo(expected.v2())); - } } public void testPolicyDoesNotExist() { @@ -140,45 +129,17 @@ public void testUnsupportedPolicy() { Map config = new HashMap<>(); config.put("policy_name", "majestic"); - config.put("enrich_key", "host"); + config.put("field", "host"); + config.put("target_field", "entry"); boolean keyIgnoreMissing = randomBoolean(); if (keyIgnoreMissing || randomBoolean()) { config.put("ignore_missing", keyIgnoreMissing); } - int numRandomValues = randomIntBetween(1, 8); - List> randomValues = new ArrayList<>(numRandomValues); - for (int i = 0; i < numRandomValues; i++) { - randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4))); - } - - List> valuesConfig = new ArrayList<>(numRandomValues); - for (Tuple tuple : randomValues) { - valuesConfig.add(Map.of("source", tuple.v1(), "target", tuple.v2())); - } - config.put("set_from", valuesConfig); - Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config)); assertThat(e.getMessage(), equalTo("unsupported policy type [unsupported]")); } - public void testNonExistingDecorateField() { - List enrichValues = List.of("globalRank", "tldRank", "tld"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "my_key", - enrichValues); - EnrichProcessorFactory factory = new EnrichProcessorFactory(null); - factory.policies = Map.of("majestic", policy); - - Map config = new HashMap<>(); - config.put("policy_name", "majestic"); - config.put("enrich_key", "host"); - List> valuesConfig = List.of(Map.of("source", "rank", "target", "rank")); - config.put("set_from", valuesConfig); - - Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config)); - assertThat(e.getMessage(), equalTo("source field [rank] does not exist in policy [majestic]")); - } - public void testCompactEnrichValuesFormat() throws Exception { List enrichValues = List.of("globalRank", "tldRank", "tld"); EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host", @@ -188,22 +149,17 @@ public void testCompactEnrichValuesFormat() throws Exception { Map config = new HashMap<>(); config.put("policy_name", "majestic"); - config.put("targets", enrichValues); + config.put("field", "host"); + config.put("target_field", "entry"); ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config); assertThat(result, notNullValue()); assertThat(result.getPolicyName(), equalTo("majestic")); - assertThat(result.getEnrichKey(), equalTo("host")); - assertThat(result.getSpecifications().size(), equalTo(enrichValues.size())); - for (int i = 0; i < enrichValues.size(); i++) { - EnrichSpecification actual = result.getSpecifications().get(i); - String expected = enrichValues.get(i); - assertThat(actual.sourceField, equalTo(expected)); - assertThat(actual.targetField, equalTo(expected)); - } + assertThat(result.getField(), equalTo("host")); + assertThat(result.getTargetField(), equalTo("entry")); } - public void testNoEnrichValues() throws Exception { + public void testNoTargetField() throws Exception { List enrichValues = List.of("globalRank", "tldRank", "tld"); EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host", enrichValues); @@ -212,16 +168,10 @@ public void testNoEnrichValues() throws Exception { Map config1 = new HashMap<>(); config1.put("policy_name", "majestic"); - config1.put("set_from", List.of()); - - Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config1)); - assertThat(e.getMessage(), equalTo("provided set_from is empty")); + config1.put("field", "host"); - Map config2 = new HashMap<>(); - config2.put("policy_name", "majestic"); - config2.put("targets", List.of()); - e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config2)); - assertThat(e.getMessage(), equalTo("provided targets is empty")); + Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(Collections.emptyMap(), "_tag", config1)); + assertThat(e.getMessage(), equalTo("[target_field] required property is missing")); } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java index 3790f843ceaae..e2869c4d93b78 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java @@ -17,7 +17,7 @@ import java.util.Optional; import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.DECORATE_FIELDS; -import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.KEY_FIELD; +import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD; import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.POLICY_NAME; import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME; import static org.hamcrest.Matchers.equalTo; @@ -36,7 +36,7 @@ public void testRestart() throws Exception { internalCluster().startNode(); EnrichPolicy enrichPolicy = - new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), KEY_FIELD, List.of(DECORATE_FIELDS)); + new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS)); for (int i = 0; i < numPolicies; i++) { String policyName = POLICY_NAME + i; PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java index 839aead0c142a..4fb5b033fecbe 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java @@ -27,18 +27,16 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.function.BiConsumer; -import static org.hamcrest.Matchers.arrayContainingInAnyOrder; +import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; @@ -48,8 +46,7 @@ public class ExactMatchProcessorTests extends ESTestCase { public void testBasics() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true, - List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.co")); // Run @@ -64,21 +61,24 @@ public void testBasics() throws Exception { assertThat(request.source().size(), equalTo(1)); assertThat(request.source().trackScores(), equalTo(false)); assertThat(request.source().fetchSource().fetchSource(), equalTo(true)); - assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld")); + assertThat(request.source().fetchSource().excludes(), emptyArray()); + assertThat(request.source().fetchSource().includes(), emptyArray()); assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class)); assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class)); TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); assertThat(termQueryBuilder.fieldName(), equalTo("domain")); assertThat(termQueryBuilder.value(), equalTo("elastic.co")); // Check result - assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(23)); - assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("co")); + Map entry = ingestDocument.getFieldValue("entry", Map.class); + assertThat(entry.size(), equalTo(3)); + assertThat(entry.get("globalRank"), equalTo(451)); + assertThat(entry.get("tldRank"), equalTo(23)); + assertThat(entry.get("tld"), equalTo("co")); } public void testNoMatch() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(); - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true, - List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.com")); int numProperties = ingestDocument.getSourceAndMetadata().size(); @@ -94,7 +94,8 @@ public void testNoMatch() throws Exception { assertThat(request.source().size(), equalTo(1)); assertThat(request.source().trackScores(), equalTo(false)); assertThat(request.source().fetchSource().fetchSource(), equalTo(true)); - assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld")); + assertThat(request.source().fetchSource().includes(), emptyArray()); + assertThat(request.source().fetchSource().excludes(), emptyArray()); assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class)); assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class)); TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); @@ -107,8 +108,7 @@ public void testNoMatch() throws Exception { public void testSearchFailure() throws Exception { String indexName = ".enrich-_name"; MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName)); - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true, - List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.com")); // Run @@ -129,7 +129,8 @@ public void testSearchFailure() throws Exception { assertThat(request.source().size(), equalTo(1)); assertThat(request.source().trackScores(), equalTo(false)); assertThat(request.source().fetchSource().fetchSource(), equalTo(true)); - assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld")); + assertThat(request.source().fetchSource().includes(), emptyArray()); + assertThat(request.source().fetchSource().excludes(), emptyArray()); assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class)); assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class)); TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); @@ -141,8 +142,8 @@ public void testSearchFailure() throws Exception { public void testIgnoreKeyMissing() throws Exception { { - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", - true, true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + ExactMatchProcessor processor = + new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); @@ -152,8 +153,8 @@ public void testIgnoreKeyMissing() throws Exception { assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); } { - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", - false, true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + ExactMatchProcessor processor = + new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); IngestDocument[] resultHolder = new IngestDocument[1]; Exception[] exceptionHolder = new Exception[1]; @@ -169,8 +170,7 @@ public void testIgnoreKeyMissing() throws Exception { public void testExistingFieldWithOverrideDisabled() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false, - List.of(new EnrichSpecification("tld", "tld"))); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false); IngestDocument ingestDocument = new IngestDocument(new HashMap<>(Map.of("domain", "elastic.co", "tld", "tld")), Map.of()); IngestDocument[] resultHolder = new IngestDocument[1]; @@ -186,8 +186,7 @@ public void testExistingFieldWithOverrideDisabled() throws Exception { public void testExistingNullFieldWithOverrideDisabled() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false, - List.of(new EnrichSpecification("tld", "tld"))); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false); Map source = new HashMap<>(); source.put("domain", "elastic.co");