Skip to content

Commit 6067065

Browse files
committed
Decouple enrich processor factory from enrich policy (#45826)
This commit changes the enrich processor factory to read the required configuration from the current enrich index (from meta mapping field) in order to create the processor. Before this change the required config was read from the enrich policy in the cluster state. Enrich policies are going to be stored in an index (instead of the cluster state). In a processor factory there isn't a way to load something from an index, so with this change we read the required config / info from the enrich index (which is derived from the enrich policy), which then allows us to move enrich policies to an index. With this change it is required to execute a policy before creating a pipeline. Otherwise there is no enrich index and then there is no way to validate that a policy exist or retrieve its type and match field. Relates to #32789
1 parent cb42e19 commit 6067065

File tree

6 files changed

+75
-49
lines changed

6 files changed

+75
-49
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.enrich;
77

8-
import java.util.Arrays;
9-
import java.util.List;
10-
import java.util.Map;
11-
import java.util.concurrent.Semaphore;
12-
138
import org.apache.logging.log4j.LogManager;
149
import org.apache.logging.log4j.Logger;
1510
import org.elasticsearch.action.ActionListener;
@@ -34,11 +29,16 @@
3429
import org.elasticsearch.threadpool.ThreadPool;
3530
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
3631

32+
import java.util.Arrays;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.concurrent.Semaphore;
36+
3737
public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
3838

3939
private static final Logger logger = LogManager.getLogger(EnrichPolicyMaintenanceService.class);
4040

41-
private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME;
41+
private static final String MAPPING_POLICY_FIELD_PATH = "_meta." + EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME;
4242
private static final IndicesOptions IGNORE_UNAVAILABLE = IndicesOptions.fromOptions(true, false, false, false);
4343

4444
private final Settings settings;

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.enrich;
77

8-
import java.io.IOException;
9-
import java.io.UncheckedIOException;
10-
import java.util.HashSet;
11-
import java.util.List;
12-
import java.util.Map;
13-
import java.util.Set;
14-
import java.util.function.LongSupplier;
15-
168
import org.apache.logging.log4j.LogManager;
179
import org.apache.logging.log4j.Logger;
1810
import org.elasticsearch.ElasticsearchException;
@@ -51,13 +43,21 @@
5143
import org.elasticsearch.search.builder.SearchSourceBuilder;
5244
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
5345

54-
import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME;
46+
import java.io.IOException;
47+
import java.io.UncheckedIOException;
48+
import java.util.HashSet;
49+
import java.util.List;
50+
import java.util.Map;
51+
import java.util.Set;
52+
import java.util.function.LongSupplier;
5553

5654
public class EnrichPolicyRunner implements Runnable {
5755

5856
private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);
5957

60-
static final String ENRICH_POLICY_FIELD_NAME = "enrich_policy";
58+
static final String ENRICH_POLICY_NAME_FIELD_NAME = "enrich_policy_name";
59+
static final String ENRICH_POLICY_TYPE_FIELD_NAME = "enrich_policy_type";
60+
static final String ENRICH_MATCH_FIELD_NAME = "enrich_match_field";
6161

6262
private final String policyName;
6363
private final EnrichPolicy policy;
@@ -216,8 +216,9 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
216216
.endObject()
217217
.endObject()
218218
.startObject("_meta")
219-
.field(ENRICH_POLICY_FIELD_NAME, policyName)
220-
.field(ENRICH_KEY_FIELD_NAME, policy.getMatchField())
219+
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)
220+
.field(ENRICH_MATCH_FIELD_NAME, policy.getMatchField())
221+
.field(ENRICH_POLICY_TYPE_FIELD_NAME, policy.getType())
221222
.endObject()
222223
.endObject()
223224
.endObject();

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
import org.elasticsearch.client.Client;
99
import org.elasticsearch.cluster.ClusterState;
10+
import org.elasticsearch.cluster.metadata.AliasOrIndex;
11+
import org.elasticsearch.cluster.metadata.IndexMetaData;
12+
import org.elasticsearch.cluster.metadata.MetaData;
13+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1014
import org.elasticsearch.ingest.ConfigurationUtils;
1115
import org.elasticsearch.ingest.Processor;
1216
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -19,7 +23,8 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
1923

2024
static final String TYPE = "enrich";
2125
private final Client client;
22-
volatile Map<String, EnrichPolicy> policies = Collections.emptyMap();
26+
27+
volatile MetaData metaData;
2328

2429
EnrichProcessorFactory(Client client) {
2530
this.client = client;
@@ -28,36 +33,37 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
2833
@Override
2934
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) throws Exception {
3035
String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name");
31-
EnrichPolicy policy = policies.get(policyName);
32-
if (policy == null) {
33-
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
36+
String policyAlias = EnrichPolicy.getBaseName(policyName);
37+
AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(policyAlias);
38+
if (aliasOrIndex == null) {
39+
throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]");
3440
}
41+
assert aliasOrIndex.isAlias();
42+
assert aliasOrIndex.getIndices().size() == 1;
43+
IndexMetaData imd = aliasOrIndex.getIndices().get(0);
3544

3645
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
46+
Map<String, Object> mappingAsMap = imd.mapping().sourceAsMap();
47+
String policyType =
48+
(String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME, mappingAsMap);
49+
String matchField = (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_MATCH_FIELD_NAME, mappingAsMap);
50+
3751
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
3852
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
3953
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;
4054

41-
switch (policy.getType()) {
55+
switch (policyType) {
4256
case EnrichPolicy.EXACT_MATCH_TYPE:
43-
return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(),
57+
return new ExactMatchProcessor(tag, client, policyName, field, targetField, matchField,
4458
ignoreMissing, overrideEnabled);
4559
default:
46-
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
60+
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
4761
}
4862
}
4963

5064
@Override
5165
public void accept(ClusterState state) {
52-
final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE);
53-
if (enrichMetadata == null) {
54-
return;
55-
}
56-
if (policies.equals(enrichMetadata.getPolicies())) {
57-
return;
58-
}
59-
60-
policies = enrichMetadata.getPolicies();
66+
metaData = state.getMetaData();
6167
}
6268

6369
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ private String fakeRunPolicy(String forPolicy) throws IOException {
152152
.startObject()
153153
.startObject(MapperService.SINGLE_MAPPING_NAME)
154154
.startObject("_meta")
155-
.field(EnrichPolicyRunner.ENRICH_POLICY_FIELD_NAME, forPolicy)
155+
.field(EnrichPolicyRunner.ENRICH_POLICY_NAME_FIELD_NAME, forPolicy)
156156
.endObject()
157157
.endObject()
158158
.endObject()

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,38 +8,39 @@
88
import org.elasticsearch.ResourceAlreadyExistsException;
99
import org.elasticsearch.action.ingest.PutPipelineRequest;
1010
import org.elasticsearch.common.bytes.BytesArray;
11+
import org.elasticsearch.common.settings.Settings;
1112
import org.elasticsearch.common.xcontent.XContentType;
13+
import org.elasticsearch.index.reindex.ReindexPlugin;
1214
import org.elasticsearch.ingest.IngestService;
1315
import org.elasticsearch.ingest.Pipeline;
1416
import org.elasticsearch.plugins.Plugin;
1517
import org.elasticsearch.test.ESSingleNodeTestCase;
1618
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
19+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
1720
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
1821

1922
import java.util.Collection;
2023
import java.util.Collections;
2124

2225
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
23-
import static org.hamcrest.Matchers.equalTo;
2426
import static org.hamcrest.Matchers.instanceOf;
2527

2628
public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
2729

2830
@Override
2931
protected Collection<Class<? extends Plugin>> getPlugins() {
30-
return Collections.singleton(LocalStateEnrich.class);
32+
return Collections.singleton(LocalStateEnrich.class, ReindexPlugin.class);
3133
}
3234

3335
public void testUpdatePolicyOnly() {
3436
IngestService ingestService = getInstanceFromNode(IngestService.class);
35-
EnrichProcessorFactory enrichProcessorFactory =
36-
(EnrichProcessorFactory) ingestService.getProcessorFactories().get(EnrichProcessorFactory.TYPE);
37+
createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword");
3738

3839
EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("index"),
3940
"key1", Collections.singletonList("field1"));
4041
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
4142
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
42-
assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1));
43+
assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet());
4344

4445
String pipelineConfig =
4546
"{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,16 @@
66
package org.elasticsearch.xpack.enrich;
77

88
import org.elasticsearch.ElasticsearchParseException;
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.cluster.metadata.AliasMetaData;
11+
import org.elasticsearch.cluster.metadata.IndexMetaData;
12+
import org.elasticsearch.cluster.metadata.MetaData;
913
import org.elasticsearch.common.collect.Tuple;
14+
import org.elasticsearch.common.settings.Settings;
1015
import org.elasticsearch.test.ESTestCase;
1116
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1217

18+
import java.io.IOException;
1319
import java.util.ArrayList;
1420
import java.util.Arrays;
1521
import java.util.Collections;
@@ -28,7 +34,7 @@ public void testCreateProcessorInstance() throws Exception {
2834
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
2935
enrichValues);
3036
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
31-
factory.policies = Collections.singletonMap("majestic", policy);
37+
factory.metaData = createMetaData("majestic", policy);
3238

3339
Map<String, Object> config = new HashMap<>();
3440
config.put("policy_name", "majestic");
@@ -67,6 +73,7 @@ public void testCreateProcessorInstance() throws Exception {
6773
public void testPolicyDoesNotExist() {
6874
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
6975
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
76+
factory.metaData = MetaData.builder().build();
7077

7178
Map<String, Object> config = new HashMap<>();
7279
config.put("policy_name", "majestic");
@@ -92,15 +99,12 @@ public void testPolicyDoesNotExist() {
9299
config.put("set_from", valuesConfig);
93100

94101
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
95-
assertThat(e.getMessage(), equalTo("policy [majestic] does not exists"));
102+
assertThat(e.getMessage(), equalTo("no enrich index exists for policy with name [majestic]"));
96103
}
97104

98105
public void testPolicyNameMissing() {
99106
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
100-
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
101-
enrichValues);
102107
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
103-
factory.policies = Collections.singletonMap("_name", policy);
104108

105109
Map<String, Object> config = new HashMap<>();
106110
config.put("enrich_key", "host");
@@ -128,12 +132,12 @@ public void testPolicyNameMissing() {
128132
assertThat(e.getMessage(), equalTo("[policy_name] required property is missing"));
129133
}
130134

131-
public void testUnsupportedPolicy() {
135+
public void testUnsupportedPolicy() throws Exception {
132136
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
133137
EnrichPolicy policy =
134138
new EnrichPolicy("unsupported", null, Collections.singletonList("source_index"), "my_key", enrichValues);
135139
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
136-
factory.policies = Collections.singletonMap("majestic", policy);
140+
factory.metaData = createMetaData("majestic", policy);
137141

138142
Map<String, Object> config = new HashMap<>();
139143
config.put("policy_name", "majestic");
@@ -153,7 +157,7 @@ public void testCompactEnrichValuesFormat() throws Exception {
153157
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
154158
Collections.singletonList("source_index"), "host", enrichValues);
155159
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
156-
factory.policies = Collections.singletonMap("majestic", policy);
160+
factory.metaData = createMetaData("majestic", policy);
157161

158162
Map<String, Object> config = new HashMap<>();
159163
config.put("policy_name", "majestic");
@@ -172,7 +176,7 @@ public void testNoTargetField() throws Exception {
172176
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
173177
Collections.singletonList("source_index"), "host", enrichValues);
174178
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
175-
factory.policies = Collections.singletonMap("majestic", policy);
179+
factory.metaData = createMetaData("majestic", policy);
176180

177181
Map<String, Object> config1 = new HashMap<>();
178182
config1.put("policy_name", "majestic");
@@ -182,4 +186,18 @@ public void testNoTargetField() throws Exception {
182186
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
183187
}
184188

189+
static MetaData createMetaData(String name, EnrichPolicy policy) throws IOException {
190+
Settings settings = Settings.builder()
191+
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
192+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
193+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
194+
.build();
195+
IndexMetaData.Builder builder = IndexMetaData.builder(EnrichPolicy.getBaseName(name) + "-1");
196+
builder.settings(settings);
197+
builder.putMapping("_doc", "{\"_meta\": {\"enrich_match_field\": \"" + policy.getMatchField() +
198+
"\", \"enrich_policy_type\": \"" + policy.getType() + "\"}}");
199+
builder.putAlias(AliasMetaData.builder(EnrichPolicy.getBaseName(name)).build());
200+
return MetaData.builder().put(builder).build();
201+
}
202+
185203
}

0 commit comments

Comments
 (0)