Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
*/
package org.elasticsearch.xpack.enrich;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
Expand All @@ -34,11 +29,16 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {

private static final Logger logger = LogManager.getLogger(EnrichPolicyMaintenanceService.class);

private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME;
private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME;
private static final IndicesOptions IGNORE_UNAVAILABLE = IndicesOptions.fromOptions(true, false, false, false);

private final Settings settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@
*/
package org.elasticsearch.xpack.enrich;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -51,13 +43,21 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

public class EnrichPolicyRunner implements Runnable {

private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);

static final String ENRICH_POLICY_FIELD_NAME = "enrich_policy";
static final String ENRICH_POLICY_NAME_FIELD_NAME = "enrich_policy_name";
static final String ENRICH_POLICY_TYPE_FIELD_NAME = "enrich_policy_type";
static final String ENRICH_MATCH_FIELD_NAME = "enrich_match_field";

private final String policyName;
private final EnrichPolicy policy;
Expand Down Expand Up @@ -216,8 +216,9 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
.endObject()
.endObject()
.startObject("_meta")
.field(ENRICH_POLICY_FIELD_NAME, policyName)
.field(ENRICH_KEY_FIELD_NAME, policy.getMatchField())
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)
.field(ENRICH_MATCH_FIELD_NAME, policy.getMatchField())
.field(ENRICH_POLICY_TYPE_FIELD_NAME, policy.getType())
.endObject()
.endObject()
.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
Expand All @@ -18,7 +22,8 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste

static final String TYPE = "enrich";
private final Client client;
volatile Map<String, EnrichPolicy> policies = Map.of();

volatile MetaData metaData;

EnrichProcessorFactory(Client client) {
this.client = client;
Expand All @@ -27,36 +32,37 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
@Override
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) throws Exception {
String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name");
EnrichPolicy policy = policies.get(policyName);
if (policy == null) {
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
String policyAlias = EnrichPolicy.getBaseName(policyName);
AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(policyAlias);
if (aliasOrIndex == null) {
throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]");
}
assert aliasOrIndex.isAlias();
assert aliasOrIndex.getIndices().size() == 1;
IndexMetaData imd = aliasOrIndex.getIndices().get(0);

String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
Map<String, Object> mappingAsMap = imd.mapping().sourceAsMap();
String policyType =
(String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME, mappingAsMap);
String matchField = (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_MATCH_FIELD_NAME, mappingAsMap);

boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;

switch (policy.getType()) {
switch (policyType) {
case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(),
return new ExactMatchProcessor(tag, client, policyName, field, targetField, matchField,
ignoreMissing, overrideEnabled);
default:
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
}
}

@Override
public void accept(ClusterState state) {
final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE);
if (enrichMetadata == null) {
return;
}
if (policies.equals(enrichMetadata.getPolicies())) {
return;
}

policies = enrichMetadata.getPolicies();
metaData = state.getMetaData();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private String fakeRunPolicy(String forPolicy) throws IOException {
.startObject()
.startObject(MapperService.SINGLE_MAPPING_NAME)
.startObject("_meta")
.field(EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME, forPolicy)
.field(EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME, forPolicy)
.endObject()
.endObject()
.endObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,39 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Collection;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(LocalStateEnrich.class);
return List.of(LocalStateEnrich.class, ReindexPlugin.class);
}

public void testUpdatePolicyOnly() {
IngestService ingestService = getInstanceFromNode(IngestService.class);
EnrichProcessorFactory enrichProcessorFactory =
(EnrichProcessorFactory) ingestService.getProcessorFactories().get(EnrichProcessorFactory.TYPE);
createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword");

EnrichPolicy instance1 =
new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("index"), "key1", List.of("field1"));
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1));
assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet());

String pipelineConfig =
"{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -27,7 +33,7 @@ public void testCreateProcessorInstance() throws Exception {
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "my_key",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Map.of("majestic", policy);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
Expand Down Expand Up @@ -66,6 +72,7 @@ public void testCreateProcessorInstance() throws Exception {
public void testPolicyDoesNotExist() {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = MetaData.builder().build();

Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
Expand All @@ -88,15 +95,12 @@ public void testPolicyDoesNotExist() {
config.put("set_from", valuesConfig);

Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("policy [majestic] does not exists"));
assertThat(e.getMessage(), equalTo("no enrich index exists for policy with name [majestic]"));
}

public void testPolicyNameMissing() {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "my_key",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Map.of("_name", policy);

Map<String, Object> config = new HashMap<>();
config.put("enrich_key", "host");
Expand All @@ -121,11 +125,11 @@ public void testPolicyNameMissing() {
assertThat(e.getMessage(), equalTo("[policy_name] required property is missing"));
}

public void testUnsupportedPolicy() {
public void testUnsupportedPolicy() throws Exception {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy("unsupported", null, List.of("source_index"), "my_key", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Map.of("majestic", policy);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
Expand All @@ -145,7 +149,7 @@ public void testCompactEnrichValuesFormat() throws Exception {
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Map.of("majestic", policy);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
Expand All @@ -164,7 +168,7 @@ public void testNoTargetField() throws Exception {
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Map.of("majestic", policy);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config1 = new HashMap<>();
config1.put("policy_name", "majestic");
Expand All @@ -174,4 +178,18 @@ public void testNoTargetField() throws Exception {
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
}

static MetaData createMetaData(String name, EnrichPolicy policy) throws IOException {
Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
IndexMetaData.Builder builder = IndexMetaData.builder(EnrichPolicy.getBaseName(name) + "-1");
builder.settings(settings);
builder.putMapping("_doc", "{\"_meta\": {\"enrich_match_field\": \"" + policy.getMatchField() +
"\", \"enrich_policy_type\": \"" + policy.getType() + "\"}}");
builder.putAlias(AliasMetaData.builder(EnrichPolicy.getBaseName(name)).build());
return MetaData.builder().put(builder).build();
}

}