Skip to content

Commit 3e3cd72

Browse files
authored
Decouple enrich processor factory from enrich policy (#45826)
This commit changes the enrich processor factory to read the required configuration from the current enrich index (from meta mapping field) in order to create the processor. Before this change the required config was read from the enrich policy in the cluster state. Enrich policies are going to be stored in an index (instead of the cluster state). In a processor factory there isn't a way to load something from an index, so with this change we read the required config / info from the enrich index (which is derived from the enrich policy), which then allows us to move enrich policies to an index. With this change it is required to execute a policy before creating a pipeline. Otherwise there is no enrich index and then there is no way to validate that a policy exist or retrieve its type and match field. Relates to #32789
1 parent f14874c commit 3e3cd72

File tree

6 files changed

+75
-49
lines changed

6 files changed

+75
-49
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.enrich;
77

8-
import java.util.Arrays;
9-
import java.util.List;
10-
import java.util.Map;
11-
import java.util.concurrent.Semaphore;
12-
138
import org.apache.logging.log4j.LogManager;
149
import org.apache.logging.log4j.Logger;
1510
import org.elasticsearch.action.ActionListener;
@@ -34,11 +29,16 @@
3429
import org.elasticsearch.threadpool.ThreadPool;
3530
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
3631

32+
import java.util.Arrays;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.concurrent.Semaphore;
36+
3737
public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
3838

3939
private static final Logger logger = LogManager.getLogger(EnrichPolicyMaintenanceService.class);
4040

41-
private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME;
41+
private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME;
4242
private static final IndicesOptions IGNORE_UNAVAILABLE = IndicesOptions.fromOptions(true, false, false, false);
4343

4444
private final Settings settings;

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.enrich;
77

8-
import java.io.IOException;
9-
import java.io.UncheckedIOException;
10-
import java.util.HashSet;
11-
import java.util.List;
12-
import java.util.Map;
13-
import java.util.Set;
14-
import java.util.function.LongSupplier;
15-
168
import org.apache.logging.log4j.LogManager;
179
import org.apache.logging.log4j.Logger;
1810
import org.elasticsearch.ElasticsearchException;
@@ -51,13 +43,21 @@
5143
import org.elasticsearch.search.builder.SearchSourceBuilder;
5244
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
5345

54-
import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME;
46+
import java.io.IOException;
47+
import java.io.UncheckedIOException;
48+
import java.util.HashSet;
49+
import java.util.List;
50+
import java.util.Map;
51+
import java.util.Set;
52+
import java.util.function.LongSupplier;
5553

5654
public class EnrichPolicyRunner implements Runnable {
5755

5856
private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);
5957

60-
static final String ENRICH_POLICY_FIELD_NAME = "enrich_policy";
58+
static final String ENRICH_POLICY_NAME_FIELD_NAME = "enrich_policy_name";
59+
static final String ENRICH_POLICY_TYPE_FIELD_NAME = "enrich_policy_type";
60+
static final String ENRICH_MATCH_FIELD_NAME = "enrich_match_field";
6161

6262
private final String policyName;
6363
private final EnrichPolicy policy;
@@ -216,8 +216,9 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
216216
.endObject()
217217
.endObject()
218218
.startObject("_meta")
219-
.field(ENRICH_POLICY_FIELD_NAME, policyName)
220-
.field(ENRICH_KEY_FIELD_NAME, policy.getMatchField())
219+
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)
220+
.field(ENRICH_MATCH_FIELD_NAME, policy.getMatchField())
221+
.field(ENRICH_POLICY_TYPE_FIELD_NAME, policy.getType())
221222
.endObject()
222223
.endObject()
223224
.endObject();

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
import org.elasticsearch.client.Client;
99
import org.elasticsearch.cluster.ClusterState;
10+
import org.elasticsearch.cluster.metadata.AliasOrIndex;
11+
import org.elasticsearch.cluster.metadata.IndexMetaData;
12+
import org.elasticsearch.cluster.metadata.MetaData;
13+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1014
import org.elasticsearch.ingest.ConfigurationUtils;
1115
import org.elasticsearch.ingest.Processor;
1216
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -18,7 +22,8 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
1822

1923
static final String TYPE = "enrich";
2024
private final Client client;
21-
volatile Map<String, EnrichPolicy> policies = Map.of();
25+
26+
volatile MetaData metaData;
2227

2328
EnrichProcessorFactory(Client client) {
2429
this.client = client;
@@ -27,36 +32,37 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
2732
@Override
2833
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) throws Exception {
2934
String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name");
30-
EnrichPolicy policy = policies.get(policyName);
31-
if (policy == null) {
32-
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
35+
String policyAlias = EnrichPolicy.getBaseName(policyName);
36+
AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(policyAlias);
37+
if (aliasOrIndex == null) {
38+
throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]");
3339
}
40+
assert aliasOrIndex.isAlias();
41+
assert aliasOrIndex.getIndices().size() == 1;
42+
IndexMetaData imd = aliasOrIndex.getIndices().get(0);
3443

3544
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
45+
Map<String, Object> mappingAsMap = imd.mapping().sourceAsMap();
46+
String policyType =
47+
(String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME, mappingAsMap);
48+
String matchField = (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_MATCH_FIELD_NAME, mappingAsMap);
49+
3650
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
3751
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
3852
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;
3953

40-
switch (policy.getType()) {
54+
switch (policyType) {
4155
case EnrichPolicy.EXACT_MATCH_TYPE:
42-
return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(),
56+
return new ExactMatchProcessor(tag, client, policyName, field, targetField, matchField,
4357
ignoreMissing, overrideEnabled);
4458
default:
45-
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
59+
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
4660
}
4761
}
4862

4963
@Override
5064
public void accept(ClusterState state) {
51-
final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE);
52-
if (enrichMetadata == null) {
53-
return;
54-
}
55-
if (policies.equals(enrichMetadata.getPolicies())) {
56-
return;
57-
}
58-
59-
policies = enrichMetadata.getPolicies();
65+
metaData = state.getMetaData();
6066
}
6167

6268
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ private String fakeRunPolicy(String forPolicy) throws IOException {
150150
.startObject()
151151
.startObject(MapperService.SINGLE_MAPPING_NAME)
152152
.startObject("_meta")
153-
.field(EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME, forPolicy)
153+
.field(EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME, forPolicy)
154154
.endObject()
155155
.endObject()
156156
.endObject()

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,38 +8,39 @@
88
import org.elasticsearch.ResourceAlreadyExistsException;
99
import org.elasticsearch.action.ingest.PutPipelineRequest;
1010
import org.elasticsearch.common.bytes.BytesArray;
11+
import org.elasticsearch.common.settings.Settings;
1112
import org.elasticsearch.common.xcontent.XContentType;
13+
import org.elasticsearch.index.reindex.ReindexPlugin;
1214
import org.elasticsearch.ingest.IngestService;
1315
import org.elasticsearch.ingest.Pipeline;
1416
import org.elasticsearch.plugins.Plugin;
1517
import org.elasticsearch.test.ESSingleNodeTestCase;
1618
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
19+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
1720
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
1821

1922
import java.util.Collection;
2023
import java.util.List;
2124

2225
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
23-
import static org.hamcrest.Matchers.equalTo;
2426
import static org.hamcrest.Matchers.instanceOf;
2527

2628
public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
2729

2830
@Override
2931
protected Collection<Class<? extends Plugin>> getPlugins() {
30-
return List.of(LocalStateEnrich.class);
32+
return List.of(LocalStateEnrich.class, ReindexPlugin.class);
3133
}
3234

3335
public void testUpdatePolicyOnly() {
3436
IngestService ingestService = getInstanceFromNode(IngestService.class);
35-
EnrichProcessorFactory enrichProcessorFactory =
36-
(EnrichProcessorFactory) ingestService.getProcessorFactories().get(EnrichProcessorFactory.TYPE);
37+
createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword");
3738

3839
EnrichPolicy instance1 =
3940
new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("index"), "key1", List.of("field1"));
4041
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
4142
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
42-
assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1));
43+
assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet());
4344

4445
String pipelineConfig =
4546
"{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,16 @@
66
package org.elasticsearch.xpack.enrich;
77

88
import org.elasticsearch.ElasticsearchParseException;
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.cluster.metadata.AliasMetaData;
11+
import org.elasticsearch.cluster.metadata.IndexMetaData;
12+
import org.elasticsearch.cluster.metadata.MetaData;
913
import org.elasticsearch.common.collect.Tuple;
14+
import org.elasticsearch.common.settings.Settings;
1015
import org.elasticsearch.test.ESTestCase;
1116
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1217

18+
import java.io.IOException;
1319
import java.util.ArrayList;
1420
import java.util.Collections;
1521
import java.util.HashMap;
@@ -27,7 +33,7 @@ public void testCreateProcessorInstance() throws Exception {
2733
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "my_key",
2834
enrichValues);
2935
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
30-
factory.policies = Map.of("majestic", policy);
36+
factory.metaData = createMetaData("majestic", policy);
3137

3238
Map<String, Object> config = new HashMap<>();
3339
config.put("policy_name", "majestic");
@@ -66,6 +72,7 @@ public void testCreateProcessorInstance() throws Exception {
6672
public void testPolicyDoesNotExist() {
6773
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
6874
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
75+
factory.metaData = MetaData.builder().build();
6976

7077
Map<String, Object> config = new HashMap<>();
7178
config.put("policy_name", "majestic");
@@ -88,15 +95,12 @@ public void testPolicyDoesNotExist() {
8895
config.put("set_from", valuesConfig);
8996

9097
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
91-
assertThat(e.getMessage(), equalTo("policy [majestic] does not exists"));
98+
assertThat(e.getMessage(), equalTo("no enrich index exists for policy with name [majestic]"));
9299
}
93100

94101
public void testPolicyNameMissing() {
95102
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
96-
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "my_key",
97-
enrichValues);
98103
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
99-
factory.policies = Map.of("_name", policy);
100104

101105
Map<String, Object> config = new HashMap<>();
102106
config.put("enrich_key", "host");
@@ -121,11 +125,11 @@ public void testPolicyNameMissing() {
121125
assertThat(e.getMessage(), equalTo("[policy_name] required property is missing"));
122126
}
123127

124-
public void testUnsupportedPolicy() {
128+
public void testUnsupportedPolicy() throws Exception {
125129
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
126130
EnrichPolicy policy = new EnrichPolicy("unsupported", null, List.of("source_index"), "my_key", enrichValues);
127131
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
128-
factory.policies = Map.of("majestic", policy);
132+
factory.metaData = createMetaData("majestic", policy);
129133

130134
Map<String, Object> config = new HashMap<>();
131135
config.put("policy_name", "majestic");
@@ -145,7 +149,7 @@ public void testCompactEnrichValuesFormat() throws Exception {
145149
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host",
146150
enrichValues);
147151
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
148-
factory.policies = Map.of("majestic", policy);
152+
factory.metaData = createMetaData("majestic", policy);
149153

150154
Map<String, Object> config = new HashMap<>();
151155
config.put("policy_name", "majestic");
@@ -164,7 +168,7 @@ public void testNoTargetField() throws Exception {
164168
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host",
165169
enrichValues);
166170
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
167-
factory.policies = Map.of("majestic", policy);
171+
factory.metaData = createMetaData("majestic", policy);
168172

169173
Map<String, Object> config1 = new HashMap<>();
170174
config1.put("policy_name", "majestic");
@@ -174,4 +178,18 @@ public void testNoTargetField() throws Exception {
174178
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
175179
}
176180

181+
static MetaData createMetaData(String name, EnrichPolicy policy) throws IOException {
182+
Settings settings = Settings.builder()
183+
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
184+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
185+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
186+
.build();
187+
IndexMetaData.Builder builder = IndexMetaData.builder(EnrichPolicy.getBaseName(name) + "-1");
188+
builder.settings(settings);
189+
builder.putMapping("_doc", "{\"_meta\": {\"enrich_match_field\": \"" + policy.getMatchField() +
190+
"\", \"enrich_policy_type\": \"" + policy.getType() + "\"}}");
191+
builder.putAlias(AliasMetaData.builder(EnrichPolicy.getBaseName(name)).build());
192+
return MetaData.builder().put(builder).build();
193+
}
194+
177195
}

0 commit comments

Comments
 (0)