Skip to content

Commit 52a094b

Browse files
committed
Fail delete policy if pipeline exists (#44438)
If a pipeline that refrences the policy exists, we should not allow the policy to be deleted. The user will need to remove the processor from the pipeline before deleting the policy. This commit adds a check to ensure that the policy cannot be deleted if it is referenced by any pipeline in the system.
1 parent 43b8ab6 commit 52a094b

File tree

6 files changed

+104
-12
lines changed

6 files changed

+104
-12
lines changed

docs/reference/ingest/ingest-node.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,7 @@ Which returns:
950950
951951
[source,js]
952952
--------------------------------------------------
953+
DELETE /_ingest/pipeline/user_lookup
953954
DELETE /_enrich/policy/users-policy
954955
--------------------------------------------------
955956
// CONSOLE

docs/reference/ingest/processors/enrich.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ PUT _ingest/pipeline/user_lookup
9292
9393
[source,js]
9494
--------------------------------------------------
95+
DELETE /_ingest/pipeline/user_lookup
9596
DELETE /_enrich/policy/users-policy
9697
--------------------------------------------------
9798
// CONSOLE

x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private void deletePolicies() throws Exception {
3333
}
3434
}
3535

36-
public void testBasicFlow() throws Exception {
36+
private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception {
3737
// Create the policy:
3838
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
3939
putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"indices\": [\"my-source-index\"], \"match_field\": \"host\", " +
@@ -75,6 +75,15 @@ public void testBasicFlow() throws Exception {
7575
assertThat(_source.get("host"), equalTo("elastic.co"));
7676
assertThat(_source.get("global_rank"), equalTo(25));
7777
assertThat(_source.get("tld_rank"), equalTo(7));
78+
79+
if (deletePipeilne) {
80+
// delete the pipeline so the policies can be deleted
81+
client().performRequest(new Request("DELETE", "/_ingest/pipeline/my_pipeline"));
82+
}
83+
}
84+
85+
public void testBasicFlow() throws Exception {
86+
setupGenericLifecycleTest(true);
7887
}
7988

8089
public void testImmutablePolicy() throws IOException {
@@ -87,6 +96,40 @@ public void testImmutablePolicy() throws IOException {
8796
assertTrue(exc.getMessage().contains("policy [my_policy] already exists"));
8897
}
8998

99+
public void testDeleteIsCaseSensitive() throws Exception {
100+
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
101+
putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"indices\": [\"my-source-index\"], \"enrich_key\": \"host\", " +
102+
"\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"]}");
103+
assertOK(client().performRequest(putPolicyRequest));
104+
105+
ResponseException exc = expectThrows(ResponseException.class,
106+
() -> client().performRequest(new Request("DELETE", "/_enrich/policy/MY_POLICY")));
107+
assertTrue(exc.getMessage().contains("policy [MY_POLICY] not found"));
108+
}
109+
110+
public void testDeleteExistingPipeline() throws Exception {
111+
// lets not delete the pipeline at first, to test the failure
112+
setupGenericLifecycleTest(false);
113+
114+
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline");
115+
putPipelineRequest.setJsonEntity("{\"processors\":[" +
116+
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
117+
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
118+
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
119+
"]}}" +
120+
"]}");
121+
assertOK(client().performRequest(putPipelineRequest));
122+
123+
ResponseException exc = expectThrows(ResponseException.class,
124+
() -> client().performRequest(new Request("DELETE", "/_enrich/policy/my_policy")));
125+
assertTrue(exc.getMessage().contains("Could not delete policy [my_policy] because" +
126+
" a pipeline is referencing it [my_pipeline, another_pipeline]"));
127+
128+
// delete the pipelines so the policies can be deleted
129+
client().performRequest(new Request("DELETE", "/_ingest/pipeline/my_pipeline"));
130+
client().performRequest(new Request("DELETE", "/_ingest/pipeline/another_pipeline"));
131+
}
132+
90133
private static Map<String, Object> toMap(Response response) throws IOException {
91134
return toMap(EntityUtils.toString(response.getEntity()));
92135
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.elasticsearch.ingest.AbstractProcessor;
9+
10+
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
11+
12+
private final String policyName;
13+
14+
protected AbstractEnrichProcessor(String tag, String policyName) {
15+
super(tag);
16+
this.policyName = policyName;
17+
}
18+
19+
public String getPolicyName() {
20+
return policyName;
21+
}
22+
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.cluster.routing.Preference;
1313
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
1414
import org.elasticsearch.index.query.TermQueryBuilder;
15-
import org.elasticsearch.ingest.AbstractProcessor;
1615
import org.elasticsearch.ingest.IngestDocument;
1716
import org.elasticsearch.search.SearchHit;
1817
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -24,12 +23,11 @@
2423
import java.util.Map;
2524
import java.util.function.BiConsumer;
2625

27-
final class ExactMatchProcessor extends AbstractProcessor {
26+
public final class ExactMatchProcessor extends AbstractEnrichProcessor {
2827

2928
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
3029

3130
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
32-
private final String policyName;
3331
private final String enrichKey;
3432
private final boolean ignoreMissing;
3533
private final boolean overrideEnabled;
@@ -60,9 +58,8 @@ final class ExactMatchProcessor extends AbstractProcessor {
6058
boolean ignoreMissing,
6159
boolean overrideEnabled,
6260
List<EnrichSpecification> specifications) {
63-
super(tag);
61+
super(tag, policyName);
6462
this.searchRunner = searchRunner;
65-
this.policyName = policyName;
6663
this.enrichKey = enrichKey;
6764
this.ignoreMissing = ignoreMissing;
6865
this.overrideEnabled = overrideEnabled;
@@ -89,7 +86,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
8986
searchBuilder.query(constantScore);
9087

9188
SearchRequest req = new SearchRequest();
92-
req.indices(EnrichPolicy.getBaseName(policyName));
89+
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
9390
req.preference(Preference.LOCAL.type());
9491
req.source(searchBuilder);
9592

@@ -137,10 +134,6 @@ public String getType() {
137134
return EnrichProcessorFactory.TYPE;
138135
}
139136

140-
String getPolicyName() {
141-
return policyName;
142-
}
143-
144137
String getEnrichKey() {
145138
return enrichKey;
146139
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.enrich.action;
77

8+
import org.elasticsearch.ElasticsearchStatusException;
89
import org.elasticsearch.action.ActionListener;
910
import org.elasticsearch.action.support.ActionFilters;
1011
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -16,23 +17,34 @@
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.inject.Inject;
1819
import org.elasticsearch.common.io.stream.StreamInput;
20+
import org.elasticsearch.ingest.IngestService;
21+
import org.elasticsearch.ingest.PipelineConfiguration;
22+
import org.elasticsearch.rest.RestStatus;
1923
import org.elasticsearch.threadpool.ThreadPool;
2024
import org.elasticsearch.transport.TransportService;
25+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2126
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
27+
import org.elasticsearch.xpack.enrich.AbstractEnrichProcessor;
2228
import org.elasticsearch.xpack.enrich.EnrichStore;
2329

2430
import java.io.IOException;
31+
import java.util.ArrayList;
32+
import java.util.List;
2533

2634
public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction<DeleteEnrichPolicyAction.Request, AcknowledgedResponse> {
2735

36+
private final IngestService ingestService;
37+
2838
@Inject
2939
public TransportDeleteEnrichPolicyAction(TransportService transportService,
3040
ClusterService clusterService,
3141
ThreadPool threadPool,
3242
ActionFilters actionFilters,
33-
IndexNameExpressionResolver indexNameExpressionResolver) {
43+
IndexNameExpressionResolver indexNameExpressionResolver,
44+
IngestService ingestService) {
3445
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
3546
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
47+
this.ingestService = ingestService;
3648
}
3749

3850
@Override
@@ -52,6 +64,26 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
5264
@Override
5365
protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state,
5466
ActionListener<AcknowledgedResponse> listener) throws Exception {
67+
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
68+
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
69+
List<String> pipelinesWithProcessors = new ArrayList<>();
70+
71+
for (PipelineConfiguration pipelineConfiguration : pipelines) {
72+
List<AbstractEnrichProcessor> enrichProcessors =
73+
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
74+
for (AbstractEnrichProcessor processor: enrichProcessors) {
75+
if (processor.getPolicyName().equals(request.getName())) {
76+
pipelinesWithProcessors.add(pipelineConfiguration.getId());
77+
}
78+
}
79+
}
80+
81+
if (pipelinesWithProcessors.isEmpty() == false) {
82+
listener.onFailure(
83+
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
84+
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
85+
}
86+
5587
EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
5688
if (e == null) {
5789
listener.onResponse(new AcknowledgedResponse(true));

0 commit comments

Comments
 (0)