diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java index 3dfe46510623e..c11bd6e9a68c9 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java @@ -5,11 +5,6 @@ */ package org.elasticsearch.xpack.enrich; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Semaphore; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -34,11 +29,16 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Semaphore; + public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener { private static final Logger logger = LogManager.getLogger(EnrichPolicyMaintenanceService.class); - private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME; + private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME; private static final IndicesOptions IGNORE_UNAVAILABLE = IndicesOptions.fromOptions(true, false, false, false); private final Settings settings; diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 9783024e10a0f..2d066f68d4598 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -5,14 +5,6 @@ */ package org.elasticsearch.xpack.enrich; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.LongSupplier; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; @@ -51,13 +43,21 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.LongSupplier; public class EnrichPolicyRunner implements Runnable { private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class); - static final String ENRICH_POLICY_FIELD_NAME = "enrich_policy"; + static final String ENRICH_POLICY_NAME_FIELD_NAME = "enrich_policy_name"; + static final String ENRICH_POLICY_TYPE_FIELD_NAME = "enrich_policy_type"; + static final String ENRICH_MATCH_FIELD_NAME = "enrich_match_field"; private final String policyName; private final EnrichPolicy policy; @@ -216,8 +216,9 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { .endObject() .endObject() .startObject("_meta") - .field(ENRICH_POLICY_FIELD_NAME, policyName) - .field(ENRICH_KEY_FIELD_NAME, policy.getMatchField()) + .field(ENRICH_POLICY_NAME_FIELD_NAME, policyName) + .field(ENRICH_MATCH_FIELD_NAME, policy.getMatchField()) + .field(ENRICH_POLICY_TYPE_FIELD_NAME, policy.getType()) .endObject() .endObject() .endObject(); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index e31e8a7afeebb..5194d6a34c469 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -7,6 +7,10 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.Processor; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -18,7 +22,8 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer policies = Map.of(); + + volatile MetaData metaData; EnrichProcessorFactory(Client client) { this.client = client; @@ -27,36 +32,37 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer processorFactories, String tag, Map config) throws Exception { String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name"); - EnrichPolicy policy = policies.get(policyName); - if (policy == null) { - throw new IllegalArgumentException("policy [" + policyName + "] does not exists"); + String policyAlias = EnrichPolicy.getBaseName(policyName); + AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(policyAlias); + if (aliasOrIndex == null) { + throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]"); } + assert aliasOrIndex.isAlias(); + assert aliasOrIndex.getIndices().size() == 1; + IndexMetaData imd = aliasOrIndex.getIndices().get(0); String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field"); + Map mappingAsMap = imd.mapping().sourceAsMap(); + String policyType = + (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME, mappingAsMap); + String matchField = (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_MATCH_FIELD_NAME, mappingAsMap); + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false); boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true); String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");; - switch (policy.getType()) { + switch (policyType) { case EnrichPolicy.EXACT_MATCH_TYPE: - return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(), + return new ExactMatchProcessor(tag, client, policyName, field, targetField, matchField, ignoreMissing, overrideEnabled); default: - throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]"); + throw new IllegalArgumentException("unsupported policy type [" + policyType + "]"); } } @Override public void accept(ClusterState state) { - final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE); - if (enrichMetadata == null) { - return; - } - if (policies.equals(enrichMetadata.getPolicies())) { - return; - } - - policies = enrichMetadata.getPolicies(); + metaData = state.getMetaData(); } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java index 62aa7ec8778b6..c16cb729022f0 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java @@ -150,7 +150,7 @@ private String fakeRunPolicy(String forPolicy) throws IOException { .startObject() .startObject(MapperService.SINGLE_MAPPING_NAME) .startObject("_meta") - .field(EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME, forPolicy) + .field(EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME, forPolicy) .endObject() .endObject() .endObject() diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java index 9688ddc0d1501..6f21b2f9025b2 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -8,38 +8,39 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import java.util.Collection; import java.util.List; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return List.of(LocalStateEnrich.class); + return List.of(LocalStateEnrich.class, ReindexPlugin.class); } public void testUpdatePolicyOnly() { IngestService ingestService = getInstanceFromNode(IngestService.class); - EnrichProcessorFactory enrichProcessorFactory = - (EnrichProcessorFactory) ingestService.getProcessorFactories().get(EnrichProcessorFactory.TYPE); + createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword"); EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("index"), "key1", List.of("field1")); PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1); assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); - assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1)); + assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet()); String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}"; diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java index 5df148ed485ff..0c03905445791 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java @@ -6,10 +6,16 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -27,7 +33,7 @@ public void testCreateProcessorInstance() throws Exception { EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues); EnrichProcessorFactory factory = new EnrichProcessorFactory(null); - factory.policies = Map.of("majestic", policy); + factory.metaData = createMetaData("majestic", policy); Map config = new HashMap<>(); config.put("policy_name", "majestic"); @@ -66,6 +72,7 @@ public void testCreateProcessorInstance() throws Exception { public void testPolicyDoesNotExist() { List enrichValues = List.of("globalRank", "tldRank", "tld"); EnrichProcessorFactory factory = new EnrichProcessorFactory(null); + factory.metaData = MetaData.builder().build(); Map config = new HashMap<>(); config.put("policy_name", "majestic"); @@ -88,15 +95,12 @@ public void testPolicyDoesNotExist() { config.put("set_from", valuesConfig); Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config)); - assertThat(e.getMessage(), equalTo("policy [majestic] does not exists")); + assertThat(e.getMessage(), equalTo("no enrich index exists for policy with name [majestic]")); } public void testPolicyNameMissing() { List enrichValues = List.of("globalRank", "tldRank", "tld"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "my_key", - enrichValues); EnrichProcessorFactory factory = new EnrichProcessorFactory(null); - factory.policies = Map.of("_name", policy); Map config = new HashMap<>(); config.put("enrich_key", "host"); @@ -121,11 +125,11 @@ public void testPolicyNameMissing() { assertThat(e.getMessage(), equalTo("[policy_name] required property is missing")); } - public void testUnsupportedPolicy() { + public void testUnsupportedPolicy() throws Exception { List enrichValues = List.of("globalRank", "tldRank", "tld"); EnrichPolicy policy = new EnrichPolicy("unsupported", null, List.of("source_index"), "my_key", enrichValues); EnrichProcessorFactory factory = new EnrichProcessorFactory(null); - factory.policies = Map.of("majestic", policy); + factory.metaData = createMetaData("majestic", policy); Map config = new HashMap<>(); config.put("policy_name", "majestic"); @@ -145,7 +149,7 @@ public void testCompactEnrichValuesFormat() throws Exception { EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host", enrichValues); EnrichProcessorFactory factory = new EnrichProcessorFactory(null); - factory.policies = Map.of("majestic", policy); + factory.metaData = createMetaData("majestic", policy); Map config = new HashMap<>(); config.put("policy_name", "majestic"); @@ -164,7 +168,7 @@ public void testNoTargetField() throws Exception { EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host", enrichValues); EnrichProcessorFactory factory = new EnrichProcessorFactory(null); - factory.policies = Map.of("majestic", policy); + factory.metaData = createMetaData("majestic", policy); Map config1 = new HashMap<>(); config1.put("policy_name", "majestic"); @@ -174,4 +178,18 @@ public void testNoTargetField() throws Exception { assertThat(e.getMessage(), equalTo("[target_field] required property is missing")); } + static MetaData createMetaData(String name, EnrichPolicy policy) throws IOException { + Settings settings = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + IndexMetaData.Builder builder = IndexMetaData.builder(EnrichPolicy.getBaseName(name) + "-1"); + builder.settings(settings); + builder.putMapping("_doc", "{\"_meta\": {\"enrich_match_field\": \"" + policy.getMatchField() + + "\", \"enrich_policy_type\": \"" + policy.getType() + "\"}}"); + builder.putAlias(AliasMetaData.builder(EnrichPolicy.getBaseName(name)).build()); + return MetaData.builder().put(builder).build(); + } + }