Skip to content

Commit 1c4ffd3

Browse files
authored
Remove enrich indices on delete policy (#45870)
When a policy is deleted, the enrich indices that are backing the policy alias should also be deleted. This commit does that work and cleans up the transport action a bit so that the lock release is easier to see, as well as to ensure that any action carried out, regardless of exception, unlocks the policy.
1 parent 3e3cd72 commit 1c4ffd3

File tree

5 files changed

+195
-49
lines changed

5 files changed

+195
-49
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
package org.elasticsearch.xpack.enrich.action;
77

88
import org.elasticsearch.ElasticsearchStatusException;
9+
import org.elasticsearch.ResourceNotFoundException;
910
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1012
import org.elasticsearch.action.support.ActionFilters;
13+
import org.elasticsearch.action.support.IndicesOptions;
1114
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1215
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
16+
import org.elasticsearch.client.Client;
1317
import org.elasticsearch.cluster.ClusterState;
1418
import org.elasticsearch.cluster.block.ClusterBlockException;
1519
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -37,17 +41,24 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
3741

3842
private final EnrichPolicyLocks enrichPolicyLocks;
3943
private final IngestService ingestService;
44+
private final Client client;
45+
// the most lenient we can get in order to not bomb out if no indices are found, which is a valid case
46+
// where a user creates and deletes a policy before running execute
47+
private static final IndicesOptions LENIENT_OPTIONS = IndicesOptions.fromOptions(true, true, true, true);
48+
4049

4150
@Inject
4251
public TransportDeleteEnrichPolicyAction(TransportService transportService,
4352
ClusterService clusterService,
4453
ThreadPool threadPool,
4554
ActionFilters actionFilters,
4655
IndexNameExpressionResolver indexNameExpressionResolver,
56+
Client client,
4757
EnrichPolicyLocks enrichPolicyLocks,
4858
IngestService ingestService) {
4959
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
5060
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
61+
this.client = client;
5162
this.enrichPolicyLocks = enrichPolicyLocks;
5263
this.ingestService = ingestService;
5364
}
@@ -69,36 +80,74 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
6980
@Override
7081
protected void masterOperation(Task task, DeleteEnrichPolicyAction.Request request, ClusterState state,
7182
ActionListener<AcknowledgedResponse> listener) throws Exception {
83+
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
84+
if (policy == null) {
85+
throw new ResourceNotFoundException("policy [{}] not found", request.getName());
86+
}
87+
7288
enrichPolicyLocks.lockPolicy(request.getName());
73-
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
74-
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
75-
List<String> pipelinesWithProcessors = new ArrayList<>();
76-
77-
for (PipelineConfiguration pipelineConfiguration : pipelines) {
78-
List<AbstractEnrichProcessor> enrichProcessors =
79-
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
80-
for (AbstractEnrichProcessor processor: enrichProcessors) {
81-
if (processor.getPolicyName().equals(request.getName())) {
82-
pipelinesWithProcessors.add(pipelineConfiguration.getId());
89+
try {
90+
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
91+
List<String> pipelinesWithProcessors = new ArrayList<>();
92+
93+
for (PipelineConfiguration pipelineConfiguration : pipelines) {
94+
List<AbstractEnrichProcessor> enrichProcessors =
95+
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
96+
for (AbstractEnrichProcessor processor : enrichProcessors) {
97+
if (processor.getPolicyName().equals(request.getName())) {
98+
pipelinesWithProcessors.add(pipelineConfiguration.getId());
99+
}
83100
}
84101
}
85-
}
86102

87-
if (pipelinesWithProcessors.isEmpty() == false) {
103+
if (pipelinesWithProcessors.isEmpty() == false) {
104+
throw new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
105+
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors);
106+
}
107+
} catch (Exception e) {
88108
enrichPolicyLocks.releasePolicy(request.getName());
89-
listener.onFailure(
90-
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
91-
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
109+
listener.onFailure(e);
92110
return;
93111
}
94112

95-
EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
96-
enrichPolicyLocks.releasePolicy(request.getName());
97-
if (e == null) {
98-
listener.onResponse(new AcknowledgedResponse(true));
99-
} else {
100-
listener.onFailure(e);
101-
}
113+
deleteIndicesAndPolicy(request.getName(), ActionListener.wrap(
114+
(response) -> {
115+
enrichPolicyLocks.releasePolicy(request.getName());
116+
listener.onResponse(response);
117+
},
118+
(exc) -> {
119+
enrichPolicyLocks.releasePolicy(request.getName());
120+
listener.onFailure(exc);
121+
}
122+
));
123+
}
124+
125+
private void deleteIndicesAndPolicy(String name, ActionListener<AcknowledgedResponse> listener) {
126+
// delete all enrich indices for this policy
127+
DeleteIndexRequest deleteRequest = new DeleteIndexRequest()
128+
.indices(EnrichPolicy.getBaseName(name) + "-*")
129+
.indicesOptions(LENIENT_OPTIONS);
130+
131+
client.admin().indices().delete(deleteRequest, ActionListener.wrap(
132+
(response) -> {
133+
if (response.isAcknowledged() == false) {
134+
listener.onFailure(new ElasticsearchStatusException("Could not fetch indices to delete during policy delete of [{}]",
135+
RestStatus.INTERNAL_SERVER_ERROR, name));
136+
} else {
137+
deletePolicy(name, listener);
138+
}
139+
},
140+
(error) -> listener.onFailure(error)
141+
));
142+
}
143+
144+
private void deletePolicy(String name, ActionListener<AcknowledgedResponse> listener) {
145+
EnrichStore.deletePolicy(name, clusterService, e -> {
146+
if (e == null) {
147+
listener.onResponse(new AcknowledgedResponse(true));
148+
} else {
149+
listener.onFailure(e);
150+
}
102151
});
103152
}
104153

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ protected void masterOperation(Task task, GetEnrichPolicyAction.Request request,
6363
} else {
6464
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
6565
if (policy == null) {
66-
throw new ResourceNotFoundException("Policy [{}] was not found", request.getName());
66+
throw new ResourceNotFoundException("Policy [{}] not found", request.getName());
6767
}
6868
policies = Map.of(request.getName(), policy);
6969

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/AbstractEnrichTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy
3434
return error;
3535
}
3636

37-
void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
37+
protected void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
3838
CountDownLatch latch = new CountDownLatch(1);
3939
AtomicReference<Exception> error = new AtomicReference<>();
4040
EnrichStore.deletePolicy(name, clusterService, e -> {
Lines changed: 109 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,20 @@
66

77
package org.elasticsearch.xpack.enrich.action;
88

9+
import org.elasticsearch.ResourceNotFoundException;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.support.ActionTestUtils;
1112
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1213
import org.elasticsearch.cluster.service.ClusterService;
1314
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1415
import org.elasticsearch.common.xcontent.XContentType;
16+
import org.elasticsearch.index.IndexNotFoundException;
1517
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1618
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
17-
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
1819
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
20+
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
21+
import org.elasticsearch.xpack.enrich.EnrichStore;
22+
import org.junit.After;
1923

2024
import java.util.concurrent.CountDownLatch;
2125
import java.util.concurrent.atomic.AtomicReference;
@@ -25,9 +29,56 @@
2529
import static org.hamcrest.Matchers.nullValue;
2630
import static org.hamcrest.core.IsInstanceOf.instanceOf;
2731

28-
public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCase {
32+
public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCase {
33+
34+
@After
35+
private void cleanupPolicy() {
36+
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
37+
String name = "my-policy";
38+
39+
try {
40+
deleteEnrichPolicy(name, clusterService);
41+
} catch (Exception e) {
42+
// if the enrich policy does not exist, then just keep going
43+
}
2944

30-
public void testDeleteIsNotLocked() throws InterruptedException {
45+
// fail if the state of this is left locked
46+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
47+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
48+
}
49+
50+
public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException {
51+
String fakeId = "fake-id";
52+
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1");
53+
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2");
54+
55+
final CountDownLatch latch = new CountDownLatch(1);
56+
final AtomicReference<Exception> reference = new AtomicReference<>();
57+
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
58+
ActionTestUtils.execute(transportAction, null,
59+
new DeleteEnrichPolicyAction.Request(fakeId),
60+
new ActionListener<AcknowledgedResponse>() {
61+
@Override
62+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
63+
fail();
64+
}
65+
66+
public void onFailure(final Exception e) {
67+
reference.set(e);
68+
latch.countDown();
69+
}
70+
});
71+
latch.await();
72+
assertNotNull(reference.get());
73+
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
74+
assertThat(reference.get().getMessage(), equalTo("policy [fake-id] not found"));
75+
76+
// fail if the state of this is left locked
77+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
78+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
79+
}
80+
81+
public void testDeleteWithoutIndex() throws Exception {
3182
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
3283
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
3384
String name = "my-policy";
@@ -54,6 +105,56 @@ public void onFailure(final Exception e) {
54105
latch.await();
55106
assertNotNull(reference.get());
56107
assertTrue(reference.get().isAcknowledged());
108+
109+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
110+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
111+
112+
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
113+
}
114+
115+
public void testDeleteIsNotLocked() throws Exception {
116+
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
117+
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
118+
String name = "my-policy";
119+
120+
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
121+
assertThat(error.get(), nullValue());
122+
123+
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
124+
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
125+
126+
client().admin().indices().prepareGetIndex().setIndices(
127+
EnrichPolicy.getBaseName(name) + "-foo1",
128+
EnrichPolicy.getBaseName(name) + "-foo2").get();
129+
130+
final CountDownLatch latch = new CountDownLatch(1);
131+
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
132+
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
133+
ActionTestUtils.execute(transportAction, null,
134+
new DeleteEnrichPolicyAction.Request(name),
135+
new ActionListener<AcknowledgedResponse>() {
136+
@Override
137+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
138+
reference.set(acknowledgedResponse);
139+
latch.countDown();
140+
}
141+
142+
public void onFailure(final Exception e) {
143+
fail();
144+
}
145+
});
146+
latch.await();
147+
assertNotNull(reference.get());
148+
assertTrue(reference.get().isAcknowledged());
149+
150+
expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices(
151+
EnrichPolicy.getBaseName(name) + "-foo1",
152+
EnrichPolicy.getBaseName(name) + "-foo2").get());
153+
154+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
155+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
156+
157+
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
57158
}
58159

59160
public void testDeleteLocked() throws InterruptedException {
@@ -64,6 +165,9 @@ public void testDeleteLocked() throws InterruptedException {
64165
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
65166
assertThat(error.get(), nullValue());
66167

168+
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
169+
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
170+
67171
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
68172
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
69173

@@ -117,6 +221,8 @@ public void onFailure(final Exception e) {
117221
assertTrue(reference.get().isAcknowledged());
118222

119223
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
224+
225+
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
120226
}
121227
}
122228
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99
import org.elasticsearch.ResourceNotFoundException;
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.ActionTestUtils;
12-
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1312
import org.elasticsearch.cluster.service.ClusterService;
1413
import org.elasticsearch.common.xcontent.XContentType;
1514
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
16-
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
1715
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
1816
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
17+
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
1918
import org.junit.After;
2019

2120
import java.util.concurrent.CountDownLatch;
@@ -31,6 +30,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
3130

3231
@After
3332
private void cleanupPolicies() throws InterruptedException {
33+
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
34+
3435
final CountDownLatch latch = new CountDownLatch(1);
3536
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
3637
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
@@ -53,26 +54,16 @@ public void onFailure(final Exception e) {
5354
GetEnrichPolicyAction.Response response = reference.get();
5455

5556
for (EnrichPolicy.NamedPolicy policy: response.getPolicies()) {
56-
final CountDownLatch loopLatch = new CountDownLatch(1);
57-
final AtomicReference<AcknowledgedResponse> loopReference = new AtomicReference<>();
58-
final TransportDeleteEnrichPolicyAction deleteAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
59-
ActionTestUtils.execute(deleteAction, null,
60-
new DeleteEnrichPolicyAction.Request(policy.getName()),
61-
new ActionListener<>() {
62-
@Override
63-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
64-
loopReference.set(acknowledgedResponse);
65-
loopLatch.countDown();
66-
}
67-
68-
public void onFailure(final Exception e) {
69-
fail();
70-
}
71-
});
72-
loopLatch.await();
73-
assertNotNull(loopReference.get());
74-
assertTrue(loopReference.get().isAcknowledged());
57+
try {
58+
deleteEnrichPolicy(policy.getName(), clusterService);
59+
} catch (Exception e) {
60+
// if the enrich policy does not exist, then just keep going
61+
}
7562
}
63+
64+
// fail if the state of this is left locked
65+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
66+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
7667
}
7768

7869
public void testListPolicies() throws InterruptedException {
@@ -205,6 +196,6 @@ public void onFailure(final Exception e) {
205196
assertNotNull(reference.get());
206197
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
207198
assertThat(reference.get().getMessage(),
208-
equalTo("Policy [non-exists] was not found"));
199+
equalTo("Policy [non-exists] not found"));
209200
}
210201
}

0 commit comments

Comments
 (0)