Skip to content

Commit a82d24b

Browse files
committed
Remove enrich indices on delete policy (#45870)
When a policy is deleted, the enrich indices that are backing the policy alias should also be deleted. This commit does that work and cleans up the transport action a bit so that the lock release is easier to see, as well as to ensure that any action carried out, regardless of exception, unlocks the policy.
1 parent a38e685 commit a82d24b

File tree

5 files changed

+194
-48
lines changed

5 files changed

+194
-48
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
package org.elasticsearch.xpack.enrich.action;
77

88
import org.elasticsearch.ElasticsearchStatusException;
9+
import org.elasticsearch.ResourceNotFoundException;
910
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1012
import org.elasticsearch.action.support.ActionFilters;
13+
import org.elasticsearch.action.support.IndicesOptions;
1114
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1215
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
16+
import org.elasticsearch.client.Client;
1317
import org.elasticsearch.cluster.ClusterState;
1418
import org.elasticsearch.cluster.block.ClusterBlockException;
1519
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -36,17 +40,24 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
3640

3741
private final EnrichPolicyLocks enrichPolicyLocks;
3842
private final IngestService ingestService;
43+
private final Client client;
44+
// the most lenient we can get in order to not bomb out if no indices are found, which is a valid case
45+
// where a user creates and deletes a policy before running execute
46+
private static final IndicesOptions LENIENT_OPTIONS = IndicesOptions.fromOptions(true, true, true, true);
47+
3948

4049
@Inject
4150
public TransportDeleteEnrichPolicyAction(TransportService transportService,
4251
ClusterService clusterService,
4352
ThreadPool threadPool,
4453
ActionFilters actionFilters,
4554
IndexNameExpressionResolver indexNameExpressionResolver,
55+
Client client,
4656
EnrichPolicyLocks enrichPolicyLocks,
4757
IngestService ingestService) {
4858
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
4959
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
60+
this.client = client;
5061
this.enrichPolicyLocks = enrichPolicyLocks;
5162
this.ingestService = ingestService;
5263
}
@@ -68,36 +79,74 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
6879
@Override
6980
protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state,
7081
ActionListener<AcknowledgedResponse> listener) throws Exception {
82+
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
83+
if (policy == null) {
84+
throw new ResourceNotFoundException("policy [{}] not found", request.getName());
85+
}
86+
7187
enrichPolicyLocks.lockPolicy(request.getName());
72-
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
73-
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
74-
List<String> pipelinesWithProcessors = new ArrayList<>();
75-
76-
for (PipelineConfiguration pipelineConfiguration : pipelines) {
77-
List<AbstractEnrichProcessor> enrichProcessors =
78-
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
79-
for (AbstractEnrichProcessor processor: enrichProcessors) {
80-
if (processor.getPolicyName().equals(request.getName())) {
81-
pipelinesWithProcessors.add(pipelineConfiguration.getId());
88+
try {
89+
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
90+
List<String> pipelinesWithProcessors = new ArrayList<>();
91+
92+
for (PipelineConfiguration pipelineConfiguration : pipelines) {
93+
List<AbstractEnrichProcessor> enrichProcessors =
94+
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
95+
for (AbstractEnrichProcessor processor : enrichProcessors) {
96+
if (processor.getPolicyName().equals(request.getName())) {
97+
pipelinesWithProcessors.add(pipelineConfiguration.getId());
98+
}
8299
}
83100
}
84-
}
85101

86-
if (pipelinesWithProcessors.isEmpty() == false) {
102+
if (pipelinesWithProcessors.isEmpty() == false) {
103+
throw new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
104+
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors);
105+
}
106+
} catch (Exception e) {
87107
enrichPolicyLocks.releasePolicy(request.getName());
88-
listener.onFailure(
89-
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
90-
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
108+
listener.onFailure(e);
91109
return;
92110
}
93111

94-
EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
95-
enrichPolicyLocks.releasePolicy(request.getName());
96-
if (e == null) {
97-
listener.onResponse(new AcknowledgedResponse(true));
98-
} else {
99-
listener.onFailure(e);
100-
}
112+
deleteIndicesAndPolicy(request.getName(), ActionListener.wrap(
113+
(response) -> {
114+
enrichPolicyLocks.releasePolicy(request.getName());
115+
listener.onResponse(response);
116+
},
117+
(exc) -> {
118+
enrichPolicyLocks.releasePolicy(request.getName());
119+
listener.onFailure(exc);
120+
}
121+
));
122+
}
123+
124+
private void deleteIndicesAndPolicy(String name, ActionListener<AcknowledgedResponse> listener) {
125+
// delete all enrich indices for this policy
126+
DeleteIndexRequest deleteRequest = new DeleteIndexRequest()
127+
.indices(EnrichPolicy.getBaseName(name) + "-*")
128+
.indicesOptions(LENIENT_OPTIONS);
129+
130+
client.admin().indices().delete(deleteRequest, ActionListener.wrap(
131+
(response) -> {
132+
if (response.isAcknowledged() == false) {
133+
listener.onFailure(new ElasticsearchStatusException("Could not fetch indices to delete during policy delete of [{}]",
134+
RestStatus.INTERNAL_SERVER_ERROR, name));
135+
} else {
136+
deletePolicy(name, listener);
137+
}
138+
},
139+
(error) -> listener.onFailure(error)
140+
));
141+
}
142+
143+
private void deletePolicy(String name, ActionListener<AcknowledgedResponse> listener) {
144+
EnrichStore.deletePolicy(name, clusterService, e -> {
145+
if (e == null) {
146+
listener.onResponse(new AcknowledgedResponse(true));
147+
} else {
148+
listener.onFailure(e);
149+
}
101150
});
102151
}
103152

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ protected void masterOperation(GetEnrichPolicyAction.Request request,
6363
} else {
6464
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
6565
if (policy == null) {
66-
throw new ResourceNotFoundException("Policy [{}] was not found", request.getName());
66+
throw new ResourceNotFoundException("Policy [{}] not found", request.getName());
6767
}
6868
policies = Collections.singletonMap(request.getName(), policy);
6969

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/AbstractEnrichTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy
3434
return error;
3535
}
3636

37-
void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
37+
protected void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
3838
CountDownLatch latch = new CountDownLatch(1);
3939
AtomicReference<Exception> error = new AtomicReference<>();
4040
EnrichStore.deletePolicy(name, clusterService, e -> {
Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@
66

77
package org.elasticsearch.xpack.enrich.action;
88

9+
import org.elasticsearch.ResourceNotFoundException;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1112
import org.elasticsearch.cluster.service.ClusterService;
1213
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1314
import org.elasticsearch.common.xcontent.XContentType;
15+
import org.elasticsearch.index.IndexNotFoundException;
1416
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1517
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
1618
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
1719
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
20+
import org.elasticsearch.xpack.enrich.EnrichStore;
21+
import org.junit.After;
1822

1923
import java.util.concurrent.CountDownLatch;
2024
import java.util.concurrent.atomic.AtomicReference;
@@ -24,9 +28,56 @@
2428
import static org.hamcrest.Matchers.nullValue;
2529
import static org.hamcrest.core.IsInstanceOf.instanceOf;
2630

27-
public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCase {
31+
public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCase {
2832

29-
public void testDeleteIsNotLocked() throws InterruptedException {
33+
@After
34+
private void cleanupPolicy() {
35+
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
36+
String name = "my-policy";
37+
38+
try {
39+
deleteEnrichPolicy(name, clusterService);
40+
} catch (Exception e) {
41+
// if the enrich policy does not exist, then just keep going
42+
}
43+
44+
// fail if the state of this is left locked
45+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
46+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
47+
}
48+
49+
public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException {
50+
String fakeId = "fake-id";
51+
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1");
52+
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2");
53+
54+
final CountDownLatch latch = new CountDownLatch(1);
55+
final AtomicReference<Exception> reference = new AtomicReference<>();
56+
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
57+
transportAction.execute(null,
58+
new DeleteEnrichPolicyAction.Request(fakeId),
59+
new ActionListener<AcknowledgedResponse>() {
60+
@Override
61+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
62+
fail();
63+
}
64+
65+
public void onFailure(final Exception e) {
66+
reference.set(e);
67+
latch.countDown();
68+
}
69+
});
70+
latch.await();
71+
assertNotNull(reference.get());
72+
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
73+
assertThat(reference.get().getMessage(), equalTo("policy [fake-id] not found"));
74+
75+
// fail if the state of this is left locked
76+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
77+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
78+
}
79+
80+
public void testDeleteWithoutIndex() throws Exception {
3081
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
3182
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
3283
String name = "my-policy";
@@ -53,6 +104,56 @@ public void onFailure(final Exception e) {
53104
latch.await();
54105
assertNotNull(reference.get());
55106
assertTrue(reference.get().isAcknowledged());
107+
108+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
109+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
110+
111+
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
112+
}
113+
114+
public void testDeleteIsNotLocked() throws Exception {
115+
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
116+
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
117+
String name = "my-policy";
118+
119+
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
120+
assertThat(error.get(), nullValue());
121+
122+
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
123+
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
124+
125+
client().admin().indices().prepareGetIndex().setIndices(
126+
EnrichPolicy.getBaseName(name) + "-foo1",
127+
EnrichPolicy.getBaseName(name) + "-foo2").get();
128+
129+
final CountDownLatch latch = new CountDownLatch(1);
130+
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
131+
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
132+
transportAction.execute(null,
133+
new DeleteEnrichPolicyAction.Request(name),
134+
new ActionListener<AcknowledgedResponse>() {
135+
@Override
136+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
137+
reference.set(acknowledgedResponse);
138+
latch.countDown();
139+
}
140+
141+
public void onFailure(final Exception e) {
142+
fail();
143+
}
144+
});
145+
latch.await();
146+
assertNotNull(reference.get());
147+
assertTrue(reference.get().isAcknowledged());
148+
149+
expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices(
150+
EnrichPolicy.getBaseName(name) + "-foo1",
151+
EnrichPolicy.getBaseName(name) + "-foo2").get());
152+
153+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
154+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
155+
156+
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
56157
}
57158

58159
public void testDeleteLocked() throws InterruptedException {
@@ -63,6 +164,9 @@ public void testDeleteLocked() throws InterruptedException {
63164
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
64165
assertThat(error.get(), nullValue());
65166

167+
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
168+
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
169+
66170
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
67171
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
68172

@@ -116,6 +220,8 @@ public void onFailure(final Exception e) {
116220
assertTrue(reference.get().isAcknowledged());
117221

118222
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
223+
224+
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
119225
}
120226
}
121227
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,12 @@
88

99
import org.elasticsearch.ResourceNotFoundException;
1010
import org.elasticsearch.action.ActionListener;
11-
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1211
import org.elasticsearch.cluster.service.ClusterService;
1312
import org.elasticsearch.common.xcontent.XContentType;
1413
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
15-
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
1614
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
1715
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
16+
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
1817
import org.junit.After;
1918

2019
import java.util.concurrent.CountDownLatch;
@@ -30,6 +29,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
3029

3130
@After
3231
private void cleanupPolicies() throws InterruptedException {
32+
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
33+
3334
final CountDownLatch latch = new CountDownLatch(1);
3435
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
3536
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
@@ -52,26 +53,16 @@ public void onFailure(final Exception e) {
5253
GetEnrichPolicyAction.Response response = reference.get();
5354

5455
for (EnrichPolicy.NamedPolicy policy: response.getPolicies()) {
55-
final CountDownLatch loopLatch = new CountDownLatch(1);
56-
final AtomicReference<AcknowledgedResponse> loopReference = new AtomicReference<>();
57-
final TransportDeleteEnrichPolicyAction deleteAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
58-
deleteAction.execute(null,
59-
new DeleteEnrichPolicyAction.Request(policy.getName()),
60-
new ActionListener<AcknowledgedResponse>() {
61-
@Override
62-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
63-
loopReference.set(acknowledgedResponse);
64-
loopLatch.countDown();
65-
}
66-
67-
public void onFailure(final Exception e) {
68-
fail();
69-
}
70-
});
71-
loopLatch.await();
72-
assertNotNull(loopReference.get());
73-
assertTrue(loopReference.get().isAcknowledged());
56+
try {
57+
deleteEnrichPolicy(policy.getName(), clusterService);
58+
} catch (Exception e) {
59+
// if the enrich policy does not exist, then just keep going
60+
}
7461
}
62+
63+
// fail if the state of this is left locked
64+
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
65+
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
7566
}
7667

7768
public void testListPolicies() throws InterruptedException {
@@ -204,6 +195,6 @@ public void onFailure(final Exception e) {
204195
assertNotNull(reference.get());
205196
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
206197
assertThat(reference.get().getMessage(),
207-
equalTo("Policy [non-exists] was not found"));
198+
equalTo("Policy [non-exists] not found"));
208199
}
209200
}

0 commit comments

Comments
 (0)