Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
package org.elasticsearch.xpack.enrich.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -37,17 +41,24 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction

private final EnrichPolicyLocks enrichPolicyLocks;
private final IngestService ingestService;
private final Client client;
// the most lenient we can get in order to not bomb out if no indices are found, which is a valid case
// where a user creates and deletes a policy before running execute
private static final IndicesOptions LENIENT_OPTIONS = IndicesOptions.fromOptions(true, true, true, true);


@Inject
public TransportDeleteEnrichPolicyAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client,
EnrichPolicyLocks enrichPolicyLocks,
IngestService ingestService) {
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
this.client = client;
this.enrichPolicyLocks = enrichPolicyLocks;
this.ingestService = ingestService;
}
Expand All @@ -69,36 +80,74 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
@Override
protected void masterOperation(Task task, DeleteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
if (policy == null) {
throw new ResourceNotFoundException("policy [{}] not found", request.getName());
}

enrichPolicyLocks.lockPolicy(request.getName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this line isn't from this PR but I'm wondering if it makes sense if we can get this to unlock from within a catch or finally block? I'm mostly paranoid about some benign exception being thrown anywhere that isn't expected and the policy becoming locked forever. Might require some restructuring here though and maybe not appropriate for this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be quite simple to do in this PR. I already had to add release logic to this PR, so i can just move it and remove a bit of code in the process.

List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
List<String> pipelinesWithProcessors = new ArrayList<>();

for (PipelineConfiguration pipelineConfiguration : pipelines) {
List<AbstractEnrichProcessor> enrichProcessors =
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
for (AbstractEnrichProcessor processor: enrichProcessors) {
if (processor.getPolicyName().equals(request.getName())) {
pipelinesWithProcessors.add(pipelineConfiguration.getId());
try {
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
List<String> pipelinesWithProcessors = new ArrayList<>();

for (PipelineConfiguration pipelineConfiguration : pipelines) {
List<AbstractEnrichProcessor> enrichProcessors =
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
for (AbstractEnrichProcessor processor : enrichProcessors) {
if (processor.getPolicyName().equals(request.getName())) {
pipelinesWithProcessors.add(pipelineConfiguration.getId());
}
}
}
}

if (pipelinesWithProcessors.isEmpty() == false) {
if (pipelinesWithProcessors.isEmpty() == false) {
throw new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors);
}
} catch (Exception e) {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onFailure(
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
listener.onFailure(e);
return;
}

EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
enrichPolicyLocks.releasePolicy(request.getName());
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
listener.onFailure(e);
}
deleteIndicesAndPolicy(request.getName(), ActionListener.wrap(
(response) -> {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onResponse(response);
},
(exc) -> {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onFailure(exc);
}
));
}

private void deleteIndicesAndPolicy(String name, ActionListener<AcknowledgedResponse> listener) {
// delete all enrich indices for this policy
DeleteIndexRequest deleteRequest = new DeleteIndexRequest()
.indices(EnrichPolicy.getBaseName(name) + "-*")
.indicesOptions(LENIENT_OPTIONS);

client.admin().indices().delete(deleteRequest, ActionListener.wrap(
(response) -> {
if (response.isAcknowledged() == false) {
listener.onFailure(new ElasticsearchStatusException("Could not fetch indices to delete during policy delete of [{}]",
RestStatus.INTERNAL_SERVER_ERROR, name));
} else {
deletePolicy(name, listener);
}
},
(error) -> listener.onFailure(error)
));
}

private void deletePolicy(String name, ActionListener<AcknowledgedResponse> listener) {
EnrichStore.deletePolicy(name, clusterService, e -> {
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
listener.onFailure(e);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected void masterOperation(Task task, GetEnrichPolicyAction.Request request,
} else {
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
if (policy == null) {
throw new ResourceNotFoundException("Policy [{}] was not found", request.getName());
throw new ResourceNotFoundException("Policy [{}] not found", request.getName());
}
policies = Map.of(request.getName(), policy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy
return error;
}

void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
protected void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.deletePolicy(name, clusterService, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@

package org.elasticsearch.xpack.enrich.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import org.elasticsearch.xpack.enrich.EnrichStore;
import org.junit.After;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -25,9 +29,56 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;

public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCase {
public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCase {

@After
private void cleanupPolicy() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";

try {
deleteEnrichPolicy(name, clusterService);
} catch (Exception e) {
// if the enrich policy does not exist, then just keep going
}

public void testDeleteIsNotLocked() throws InterruptedException {
// fail if the state of this is left locked
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
}

public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException {
String fakeId = "fake-id";
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1");
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2");

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
ActionTestUtils.execute(transportAction, null,
new DeleteEnrichPolicyAction.Request(fakeId),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail();
}

public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
latch.await();
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
assertThat(reference.get().getMessage(), equalTo("policy [fake-id] not found"));

// fail if the state of this is left locked
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
}

public void testDeleteWithoutIndex() throws Exception {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";
Expand All @@ -54,6 +105,56 @@ public void onFailure(final Exception e) {
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());

EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());

assertNull(EnrichStore.getPolicy(name, clusterService.state()));
}

public void testDeleteIsNotLocked() throws Exception {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";

AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());

createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");

client().admin().indices().prepareGetIndex().setIndices(
EnrichPolicy.getBaseName(name) + "-foo1",
EnrichPolicy.getBaseName(name) + "-foo2").get();

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
ActionTestUtils.execute(transportAction, null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}

public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());

expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices(
EnrichPolicy.getBaseName(name) + "-foo1",
EnrichPolicy.getBaseName(name) + "-foo2").get());

EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());

assertNull(EnrichStore.getPolicy(name, clusterService.state()));
}

public void testDeleteLocked() throws InterruptedException {
Expand All @@ -64,6 +165,9 @@ public void testDeleteLocked() throws InterruptedException {
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());

createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");

EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());

Expand Down Expand Up @@ -117,6 +221,8 @@ public void onFailure(final Exception e) {
assertTrue(reference.get().isAcknowledged());

assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());

assertNull(EnrichStore.getPolicy(name, clusterService.state()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import org.junit.After;

import java.util.concurrent.CountDownLatch;
Expand All @@ -31,6 +30,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase

@After
private void cleanupPolicies() throws InterruptedException {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
Expand All @@ -53,26 +54,16 @@ public void onFailure(final Exception e) {
GetEnrichPolicyAction.Response response = reference.get();

for (EnrichPolicy.NamedPolicy policy: response.getPolicies()) {
final CountDownLatch loopLatch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> loopReference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction deleteAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
ActionTestUtils.execute(deleteAction, null,
new DeleteEnrichPolicyAction.Request(policy.getName()),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
loopReference.set(acknowledgedResponse);
loopLatch.countDown();
}

public void onFailure(final Exception e) {
fail();
}
});
loopLatch.await();
assertNotNull(loopReference.get());
assertTrue(loopReference.get().isAcknowledged());
try {
deleteEnrichPolicy(policy.getName(), clusterService);
} catch (Exception e) {
// if the enrich policy does not exist, then just keep going
}
}

// fail if the state of this is left locked
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
}

public void testListPolicies() throws InterruptedException {
Expand Down Expand Up @@ -205,6 +196,6 @@ public void onFailure(final Exception e) {
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
assertThat(reference.get().getMessage(),
equalTo("Policy [non-exists] was not found"));
equalTo("Policy [non-exists] not found"));
}
}