From fce6a02f8b2505737ecfdde77144c89b1b2b0645 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 27 May 2020 16:06:50 -0400 Subject: [PATCH 1/3] EQL: Adds delete async EQL search result action (#56852) Adds support for deleting async EQL search results to eql search API. Relates to #49638 --- .../search/RestDeleteAsyncSearchAction.java | 4 +- .../TransportDeleteAsyncSearchAction.java | 87 +++---------------- .../search/TransportGetAsyncSearchAction.java | 12 ++- .../search/AsyncSearchIntegTestCase.java | 3 +- .../search/DeleteAsyncSearchRequestTests.java | 12 +-- .../xpack/core/async/AsyncResultsService.java | 55 ++++++++++++ .../core/async/DeleteAsyncResultRequest.java | 56 ++++++++++++ .../action/DeleteAsyncSearchAction.java | 47 ---------- .../core/async/AsyncResultsServiceTests.java | 15 +++- .../rest-api-spec/test/eql/10_basic.yml | 9 ++ .../xpack/eql/AsyncEqlSecurityIT.java | 9 +- .../eql/action/AsyncEqlSearchActionIT.java | 53 ++++++++++- .../plugin/EqlAsyncDeleteResultAction.java | 24 +++++ .../xpack/eql/plugin/EqlPlugin.java | 4 +- .../RestEqlDeleteAsyncResultAction.java | 34 ++++++++ .../TransportEqlAsyncDeleteResultAction.java | 76 ++++++++++++++++ .../TransportEqlAsyncGetResultAction.java | 12 +-- .../AsyncTaskManagementServiceTests.java | 1 + .../rest-api-spec/api/eql.delete.json | 25 ++++++ 19 files changed, 387 insertions(+), 151 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultRequest.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncDeleteResultAction.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlDeleteAsyncResultAction.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncDeleteResultAction.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/eql.delete.json diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java index faab51dc3af73..5250cb60e380d 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java @@ -7,9 +7,9 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.RestHandler.Route; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; @@ -34,7 +34,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - DeleteAsyncSearchAction.Request delete = new DeleteAsyncSearchAction.Request(request.param("id")); + DeleteAsyncResultRequest delete = new DeleteAsyncResultRequest(request.param("id")); return channel -> client.execute(DeleteAsyncSearchAction.INSTANCE, delete, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java index ae628edc28e6a..53ecf18273952 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java @@ -5,14 +5,8 @@ */ package org.elasticsearch.xpack.search; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -21,26 +15,20 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.async.AsyncResultsService; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; -import java.io.IOException; +import static org.elasticsearch.xpack.search.TransportGetAsyncSearchAction.createResultsService; -import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; - -public class TransportDeleteAsyncSearchAction extends HandledTransportAction { - private static final Logger logger = LogManager.getLogger(TransportDeleteAsyncSearchAction.class); - - private final ClusterService clusterService; +public class TransportDeleteAsyncSearchAction extends HandledTransportAction { + private final AsyncResultsService resultsService; private final TransportService transportService; - private final AsyncTaskIndexService store; @Inject public TransportDeleteAsyncSearchAction(TransportService transportService, @@ -49,67 +37,20 @@ public TransportDeleteAsyncSearchAction(TransportService transportService, ThreadPool threadPool, NamedWriteableRegistry registry, Client client) { - super(DeleteAsyncSearchAction.NAME, transportService, actionFilters, DeleteAsyncSearchAction.Request::new); - this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, - ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); - this.clusterService = clusterService; + super(DeleteAsyncSearchAction.NAME, transportService, actionFilters, DeleteAsyncResultRequest::new); this.transportService = transportService; + this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); } @Override - protected void doExecute(Task task, DeleteAsyncSearchAction.Request request, ActionListener listener) { - try { - AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); - DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); - if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) { - cancelTaskAndDeleteResult(searchId, listener); - } else { - TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); - transportService.sendRequest(node, DeleteAsyncSearchAction.NAME, request, builder.build(), - new ActionListenerResponseHandler<>(listener, AcknowledgedResponse::new, ThreadPool.Names.SAME)); - } - } catch (Exception exc) { - listener.onFailure(exc); - } - } - - void cancelTaskAndDeleteResult(AsyncExecutionId searchId, ActionListener listener) throws IOException { - AsyncSearchTask task = store.getTask(taskManager, searchId, AsyncSearchTask.class); - if (task != null) { - //the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways. - task.cancelTask(() -> store.deleteResponse(searchId, - ActionListener.wrap( - r -> listener.onResponse(new AcknowledgedResponse(true)), - exc -> { - RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); - //the index may not be there (no initial async search response stored yet?): we still want to return 200 - //note that index missing comes back as 200 hence it's handled in the onResponse callback - if (status == RestStatus.NOT_FOUND) { - listener.onResponse(new AcknowledgedResponse(true)); - } else { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); - listener.onFailure(exc); - } - }))); + protected void doExecute(Task task, DeleteAsyncResultRequest request, ActionListener listener) { + DiscoveryNode node = resultsService.getNode(request.getId()); + if (node == null || resultsService.isLocalNode(node)) { + resultsService.deleteResult(request, listener); } else { - // the task was not found (already cancelled, already completed, or invalid id?) - // we fail if the response is not found in the index - ActionListener deleteListener = ActionListener.wrap( - resp -> { - if (resp.status() == RestStatus.NOT_FOUND) { - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); - } else { - listener.onResponse(new AcknowledgedResponse(true)); - } - }, - exc -> { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); - listener.onFailure(exc); - } - ); - //we get before deleting to verify that the user is authorized - store.getResponse(searchId, false, - ActionListener.wrap(res -> store.deleteResponse(searchId, deleteListener), listener::onFailure)); + TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); + transportService.sendRequest(node, DeleteAsyncSearchAction.NAME, request, builder.build(), + new ActionListenerResponseHandler<>(listener, AcknowledgedResponse::new, ThreadPool.Names.SAME)); } } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index 2f8d3a5cd386e..d07f18c532f39 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -39,10 +39,18 @@ public TransportGetAsyncSearchAction(TransportService transportService, ThreadPool threadPool) { super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new); this.transportService = transportService; + this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); + } + + static AsyncResultsService createResultsService(TransportService transportService, + ClusterService clusterService, + NamedWriteableRegistry registry, + Client client, + ThreadPool threadPool) { AsyncTaskIndexService store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); - resultsService = new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener, - transportService.getTaskManager(), clusterService); + return new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener, + (task, listener) -> task.cancelTask(() -> listener.onResponse(null)), transportService.getTaskManager(), clusterService); } @Override diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 1aa22778e2d2d..36470612f6baa 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; @@ -141,7 +142,7 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr } protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException { - return client().execute(DeleteAsyncSearchAction.INSTANCE, new DeleteAsyncSearchAction.Request(id)).get(); + return client().execute(DeleteAsyncSearchAction.INSTANCE, new DeleteAsyncResultRequest(id)).get(); } /** diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java index 7e1b220b51ca1..b92d300da45ac 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/DeleteAsyncSearchRequestTests.java @@ -7,18 +7,18 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; -import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId; -public class DeleteAsyncSearchRequestTests extends AbstractWireSerializingTestCase { +public class DeleteAsyncSearchRequestTests extends AbstractWireSerializingTestCase { @Override - protected Writeable.Reader instanceReader() { - return DeleteAsyncSearchAction.Request::new; + protected Writeable.Reader instanceReader() { + return DeleteAsyncResultRequest::new; } @Override - protected DeleteAsyncSearchAction.Request createTestInstance() { - return new DeleteAsyncSearchAction.Request(randomSearchId()); + protected DeleteAsyncResultRequest createTestInstance() { + return new DeleteAsyncResultRequest(randomSearchId()); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java index 81ad3fad44983..7e05d4dd0ba71 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -12,6 +12,8 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriConsumer; @@ -20,6 +22,7 @@ import org.elasticsearch.tasks.TaskManager; import java.util.Objects; +import java.util.function.BiConsumer; /** * Service that is capable of retrieving and cleaning up AsyncTasks regardless of their state. It works with the TaskManager, if a task @@ -33,6 +36,7 @@ public class AsyncResultsService store; private final boolean updateInitialResultsInStore; private final TriConsumer, TimeValue> addCompletionListener; + private final BiConsumer> cancelTask; /** * Creates async results service @@ -48,11 +52,13 @@ public AsyncResultsService(AsyncTaskIndexService store, boolean updateInitialResultsInStore, Class asyncTaskClass, TriConsumer, TimeValue> addCompletionListener, + BiConsumer> cancelTask, TaskManager taskManager, ClusterService clusterService) { this.updateInitialResultsInStore = updateInitialResultsInStore; this.asyncTaskClass = asyncTaskClass; this.addCompletionListener = addCompletionListener; + this.cancelTask = cancelTask; this.taskManager = taskManager; this.clusterService = clusterService; this.store = store; @@ -103,6 +109,55 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener listener) { + try { + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); + Task task = store.getTask(taskManager, searchId, asyncTaskClass); + if (task != null) { + //the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways. + cancelTask.accept(task, ActionListener.wrap( + (ignore) -> store.deleteResponse(searchId, + ActionListener.wrap( + r -> listener.onResponse(new AcknowledgedResponse(true)), + exc -> { + RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); + //the index may not be there (no initial async search response stored yet?): we still want to return 200 + //note that index missing comes back as 200 hence it's handled in the onResponse callback + if (status == RestStatus.NOT_FOUND) { + listener.onResponse(new AcknowledgedResponse(true)); + } else { + logger.error(() -> new ParameterizedMessage("failed to clean async result [{}]", + searchId.getEncoded()), exc); + listener.onFailure(exc); + } + })), + listener::onFailure) + ); + } else { + // the task was not found (already cancelled, already completed, or invalid id?) + // we fail if the response is not found in the index + ActionListener deleteListener = ActionListener.wrap( + resp -> { + if (resp.status() == RestStatus.NOT_FOUND) { + listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + } else { + listener.onResponse(new AcknowledgedResponse(true)); + } + }, + exc -> { + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); + listener.onFailure(exc); + } + ); + //we get before deleting to verify that the user is authorized + store.getResponse(searchId, false, + ActionListener.wrap(res -> store.deleteResponse(searchId, deleteListener), listener::onFailure)); + } + } catch (Exception exc) { + listener.onFailure(exc); + } + } + private void getSearchResponseFromTask(AsyncExecutionId searchId, GetAsyncResultRequest request, long nowInMillis, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultRequest.java new file mode 100644 index 0000000000000..63158e07c4f16 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultRequest.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class DeleteAsyncResultRequest extends ActionRequest { + private final String id; + + public DeleteAsyncResultRequest(String id) { + this.id = id; + } + + public DeleteAsyncResultRequest(StreamInput in) throws IOException { + super(in); + this.id = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getId() { + return id; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteAsyncResultRequest request = (DeleteAsyncResultRequest) o; + return id.equals(request.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/DeleteAsyncSearchAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/DeleteAsyncSearchAction.java index d69de80d2293e..4ac268c6f794d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/DeleteAsyncSearchAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/DeleteAsyncSearchAction.java @@ -5,17 +5,10 @@ */ package org.elasticsearch.xpack.core.search.action; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import java.io.IOException; -import java.util.Objects; - public class DeleteAsyncSearchAction extends ActionType { public static final DeleteAsyncSearchAction INSTANCE = new DeleteAsyncSearchAction(); public static final String NAME = "indices:data/read/async_search/delete"; @@ -29,44 +22,4 @@ public Writeable.Reader getResponseReader() { return AcknowledgedResponse::new; } - public static class Request extends ActionRequest { - private final String id; - - public Request(String id) { - this.id = id; - } - - public Request(StreamInput in) throws IOException { - super(in); - this.id = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(id); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - public String getId() { - return id; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Request request = (Request) o; - return id.equals(request.id); - } - - @Override - public int hashCode() { - return Objects.hash(id); - } - } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index 5599d81f2cac2..3d512954e5c35 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; @@ -123,8 +124,8 @@ public void setup() { } private AsyncResultsService createResultsService(boolean updateInitialResultsInStore) { - return new AsyncResultsService<>(indexService, updateInitialResultsInStore, TestTask.class, - TestTask::addListener, taskManager, clusterService); + return new AsyncResultsService<>(indexService, updateInitialResultsInStore, TestTask.class, TestTask::addListener, + (task, listener) ->taskManager.cancelTaskAndDescendants(task, "deletion", true, listener), taskManager, clusterService); } public void testRecordNotFound() { @@ -132,6 +133,9 @@ public void testRecordNotFound() { PlainActionFuture listener = new PlainActionFuture<>(); service.retrieveResult(new GetAsyncResultRequest(randomAsyncId().getEncoded()), listener); assertFutureThrows(listener, ResourceNotFoundException.class); + PlainActionFuture deleteListener = new PlainActionFuture<>(); + service.deleteResult(new DeleteAsyncResultRequest(randomAsyncId().getEncoded()), deleteListener); + assertFutureThrows(listener, ResourceNotFoundException.class); } public void testRetrieveFromMemoryWithExpiration() throws Exception { @@ -250,5 +254,12 @@ public void testRetrieveFromDisk() throws Exception { TestAsyncResponse response = listener.actionGet(TimeValue.timeValueSeconds(10)); assertThat(response.test, equalTo("final_response")); + PlainActionFuture deleteListener = new PlainActionFuture<>(); + service.deleteResult(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); + assertThat(deleteListener.actionGet().isAcknowledged(), equalTo(true)); + + deleteListener = new PlainActionFuture<>(); + service.deleteResult(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); + assertFutureThrows(deleteListener, ResourceNotFoundException.class); } } diff --git a/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml b/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml index fa94799009255..ef233b286a881 100644 --- a/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml +++ b/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml @@ -52,3 +52,12 @@ setup: - match: {hits.total.relation: "eq"} - match: {hits.events.0._source.user: "SYSTEM"} + - do: + eql.delete: + id: $id + - match: {acknowledged: true} + + - do: + catch: missing + eql.delete: + id: $id diff --git a/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java b/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java index 80fa5dca30dcc..0ac4f68148a36 100644 --- a/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java +++ b/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java @@ -79,8 +79,7 @@ private void testCase(String user, String other) throws Exception { // other cannot delete the result exc = expectThrows(ResponseException.class, () -> deleteAsyncEqlSearch(id, other)); - // TODO: This is not implemented yet, should return 404 when it is done - assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(405)); + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(404)); // other and user cannot access the result from direct get calls AsyncExecutionId searchId = AsyncExecutionId.decode(id); @@ -89,9 +88,9 @@ private void testCase(String user, String other) throws Exception { assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403)); assertThat(exc.getMessage(), containsString("unauthorized")); } - // TODO: Deletion is not implemented yet - // Response delResp = deleteAsyncEqlSearch(id, user); - // assertOK(delResp); + + Response delResp = deleteAsyncEqlSearch(id, user); + assertOK(delResp); } ResponseException exc = expectThrows(ResponseException.class, () -> submitAsyncEqlSearch("index-" + other, "*", TimeValue.timeValueSeconds(10), user)); diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java index 110f39260419e..226f9e86a89f3 100644 --- a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java @@ -6,11 +6,13 @@ package org.elasticsearch.xpack.eql.action; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; @@ -25,8 +27,10 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; +import org.elasticsearch.xpack.eql.plugin.EqlAsyncDeleteResultAction; import org.elasticsearch.xpack.eql.plugin.EqlAsyncGetResultAction; import org.elasticsearch.xpack.eql.plugin.EqlPlugin; import org.hamcrest.BaseMatcher; @@ -48,6 +52,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -136,6 +141,10 @@ public void testBasicAsyncExecution() throws Exception { Exception ex = expectThrows(Exception.class, future::actionGet); assertThat(ex.getCause().getMessage(), containsString("by zero")); } + + AcknowledgedResponse deleteResponse = + client().execute(EqlAsyncDeleteResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())).actionGet(); + assertThat(deleteResponse.isAcknowledged(), equalTo(true)); } public void testGoingAsync() throws Exception { @@ -191,6 +200,44 @@ public void testGoingAsync() throws Exception { } } + public void testAsyncCancellation() throws Exception { + prepareIndex(); + + boolean success = randomBoolean(); + String query = success ? "my_event where i=1" : "my_event where 10/i=1"; + EqlSearchRequest request = new EqlSearchRequest().indices("test").query(query).eventCategoryField("event_type") + .waitForCompletionTimeout(TimeValue.timeValueMillis(1)); + + boolean customKeepAlive = randomBoolean(); + final TimeValue keepAliveValue; + if (customKeepAlive) { + keepAliveValue = TimeValue.parseTimeValue(randomTimeValue(1, 5, "d"), "test"); + request.keepAlive(keepAliveValue); + } + + List plugins = initBlockFactory(true, false); + + String opaqueId = randomAlphaOfLength(10); + logger.trace("Starting async search"); + EqlSearchResponse response = client().filterWithHeader(Collections.singletonMap(Task.X_OPAQUE_ID, opaqueId)) + .execute(EqlSearchAction.INSTANCE, request).get(); + assertThat(response.isRunning(), is(true)); + assertThat(response.isPartial(), is(true)); + assertThat(response.id(), notNullValue()); + + logger.trace("Waiting for block to be established"); + awaitForBlockedSearches(plugins, "test"); + logger.trace("Block is established"); + + ActionFuture deleteResponse = + client().execute(EqlAsyncDeleteResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())); + disableBlocks(plugins); + assertThat(deleteResponse.actionGet().isAcknowledged(), equalTo(true)); + + deleteResponse = client().execute(EqlAsyncDeleteResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())); + assertFutureThrows(deleteResponse, ResourceNotFoundException.class); + } + public void testFinishingBeforeTimeout() throws Exception { prepareIndex(); @@ -215,11 +262,13 @@ public void testFinishingBeforeTimeout() throws Exception { assertThat(doc.getException(), nullValue()); assertThat(doc.getResponse(), notNullValue()); assertThat(doc.getResponse().hits().events().size(), equalTo(1)); - } - if (keepOnCompletion) { EqlSearchResponse storedResponse = client().execute(EqlAsyncGetResultAction.INSTANCE, new GetAsyncResultRequest(response.id())).actionGet(); assertThat(storedResponse, equalTo(response)); + + AcknowledgedResponse deleteResponse = + client().execute(EqlAsyncDeleteResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())).actionGet(); + assertThat(deleteResponse.isAcknowledged(), equalTo(true)); } } else { Exception ex = expectThrows(Exception.class, diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncDeleteResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncDeleteResultAction.java new file mode 100644 index 0000000000000..8a136e7e19cdf --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncDeleteResultAction.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; + +public class EqlAsyncDeleteResultAction extends ActionType { + public static final EqlAsyncDeleteResultAction INSTANCE = new EqlAsyncDeleteResultAction(); + + private EqlAsyncDeleteResultAction() { + super(EqlAsyncActionNames.EQL_ASYNC_DELETE_RESULT_ACTION_NAME, AcknowledgedResponse::new); + } + + @Override + public Writeable.Reader getResponseReader() { + return AcknowledgedResponse::new; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java index c4dea0d03d84e..30164c0d3bf8d 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java @@ -114,6 +114,7 @@ public List> getSettings() { new ActionHandler<>(EqlSearchAction.INSTANCE, TransportEqlSearchAction.class), new ActionHandler<>(EqlStatsAction.INSTANCE, TransportEqlStatsAction.class), new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultAction.class), + new ActionHandler<>(EqlAsyncDeleteResultAction.INSTANCE, TransportEqlAsyncDeleteResultAction.class), new ActionHandler<>(XPackUsageFeatureAction.EQL, EqlUsageTransportAction.class), new ActionHandler<>(XPackInfoFeatureAction.EQL, EqlInfoTransportAction.class) ); @@ -146,7 +147,8 @@ public List getRestHandlers(Settings settings, return List.of( new RestEqlSearchAction(), new RestEqlStatsAction(), - new RestEqlGetAsyncResultAction() + new RestEqlGetAsyncResultAction(), + new RestEqlDeleteAsyncResultAction() ); } return List.of(); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlDeleteAsyncResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlDeleteAsyncResultAction.java new file mode 100644 index 0000000000000..f6f68912c1838 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlDeleteAsyncResultAction.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; + +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.DELETE; + +public class RestEqlDeleteAsyncResultAction extends BaseRestHandler { + @Override + public List routes() { + return List.of(new Route(DELETE, "/_eql/search/{id}")); + } + + @Override + public String getName() { + return "eql_delete_async_result"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + DeleteAsyncResultRequest delete = new DeleteAsyncResultRequest(request.param("id")); + return channel -> client.execute(EqlAsyncDeleteResultAction.INSTANCE, delete, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncDeleteResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncDeleteResultAction.java new file mode 100644 index 0000000000000..bb1871403c9f5 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncDeleteResultAction.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.async.AsyncResultsService; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; +import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse; +import org.elasticsearch.xpack.eql.action.EqlSearchTask; +import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; + +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; +import static org.elasticsearch.xpack.eql.async.AsyncTaskManagementService.addCompletionListener; + +public class TransportEqlAsyncDeleteResultAction extends HandledTransportAction { + private final AsyncResultsService> resultsService; + private final TransportService transportService; + + @Inject + public TransportEqlAsyncDeleteResultAction(TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + NamedWriteableRegistry registry, + Client client, + ThreadPool threadPool) { + super(EqlAsyncActionNames.EQL_ASYNC_DELETE_RESULT_ACTION_NAME, transportService, actionFilters, DeleteAsyncResultRequest::new); + this.transportService = transportService; + this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); + } + + static AsyncResultsService> createResultsService( + TransportService transportService, + ClusterService clusterService, + NamedWriteableRegistry registry, + Client client, + ThreadPool threadPool) { + Writeable.Reader> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in); + AsyncTaskIndexService> store = new AsyncTaskIndexService<>(EqlPlugin.INDEX, clusterService, + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry); + return new AsyncResultsService<>(store, true, EqlSearchTask.class, + (task, listener, timeout) -> addCompletionListener(threadPool, task, listener, timeout), + (task, listener) -> transportService.getTaskManager().cancelTaskAndDescendants(task, "async result is deleted", true, listener), + transportService.getTaskManager(), clusterService); + } + + @Override + protected void doExecute(Task task, DeleteAsyncResultRequest request, ActionListener listener) { + DiscoveryNode node = resultsService.getNode(request.getId()); + if (node == null || resultsService.isLocalNode(node)) { + resultsService.deleteResult(request, listener); + } else { + TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); + transportService.sendRequest(node, EqlAsyncActionNames.EQL_ASYNC_DELETE_RESULT_ACTION_NAME, request, builder.build(), + new ActionListenerResponseHandler<>(listener, AcknowledgedResponse::new, ThreadPool.Names.SAME)); + } + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java index 8d1c637270ad8..933937118c7f8 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java @@ -14,21 +14,18 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncResultsService; -import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; import org.elasticsearch.xpack.eql.action.EqlSearchResponse; import org.elasticsearch.xpack.eql.action.EqlSearchTask; import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; -import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; -import static org.elasticsearch.xpack.eql.async.AsyncTaskManagementService.addCompletionListener; +import static org.elasticsearch.xpack.eql.plugin.TransportEqlAsyncDeleteResultAction.createResultsService; public class TransportEqlAsyncGetResultAction extends HandledTransportAction { private final AsyncResultsService> resultsService; @@ -43,12 +40,7 @@ public TransportEqlAsyncGetResultAction(TransportService transportService, ThreadPool threadPool) { super(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME, transportService, actionFilters, GetAsyncResultRequest::new); this.transportService = transportService; - Writeable.Reader> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in); - AsyncTaskIndexService> store = new AsyncTaskIndexService<>(EqlPlugin.INDEX, clusterService, - threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry); - resultsService = new AsyncResultsService<>(store, true, EqlSearchTask.class, - (task, listener, timeout) -> addCompletionListener(threadPool, task, listener, timeout), - transportService.getTaskManager(), clusterService); + this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); } @Override diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java index 83dfe78f91c9c..9eb7cb0e71026 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java @@ -133,6 +133,7 @@ public void setup() { in -> new StoredAsyncResponse<>(TestResponse::new, in), writableRegistry()); results = new AsyncResultsService<>(store, true, TestTask.class, (task, listener, timeout) -> addCompletionListener(transportService.getThreadPool(), task, listener, timeout), + (task, listener) -> transportService.getTaskManager().cancelTaskAndDescendants(task, "deletion", true, listener), transportService.getTaskManager(), clusterService); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.delete.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.delete.json new file mode 100644 index 0000000000000..47b3990adcb0a --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.delete.json @@ -0,0 +1,25 @@ +{ + "eql.delete":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/eql-search-api.html", + "description": "Deletes an async EQL search by ID. If the search is still running, the search request will be cancelled. Otherwise, the saved search results are deleted." + }, + "stability":"beta", + "url":{ + "paths":[ + { + "path":"/_eql/search/{id}", + "methods":[ + "DELETE" + ], + "parts":{ + "id":{ + "type":"string", + "description":"The async search ID" + } + } + } + ] + } + } +} From fd236217d23ef2f27fdc43135ca62db86a0999ab Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 29 May 2020 14:11:57 -0400 Subject: [PATCH 2/3] Move delete async result action to x-pack core --- .../xpack/search/AsyncSearchSecurityIT.java | 4 +- .../xpack/search/AsyncSearchActionIT.java | 3 +- .../xpack/search/AsyncSearch.java | 10 +-- .../search/AsyncSearchMaintenanceService.java | 3 +- .../xpack/search/AsyncSearchTask.java | 6 ++ .../search/RestDeleteAsyncSearchAction.java | 4 +- .../search/TransportGetAsyncSearchAction.java | 5 +- .../TransportSubmitAsyncSearchAction.java | 3 +- .../search/AsyncSearchIntegTestCase.java | 10 +-- .../xpack/core/XPackClientPlugin.java | 4 +- .../elasticsearch/xpack/core/XPackPlugin.java | 4 + .../xpack/core/async/AsyncResultsService.java | 55 ------------ .../xpack/core/async/AsyncTask.java | 7 ++ .../core/async/AsyncTaskIndexService.java | 43 ++++++--- .../DeleteAsyncResultAction.java} | 10 +-- .../core/async/DeleteAsyncResultsService.java | 88 +++++++++++++++++++ .../TransportDeleteAsyncResultAction.java} | 38 ++++---- .../xpack/core/eql/EqlAsyncActionNames.java | 1 - .../core/async/AsyncResultsServiceTests.java | 19 +++- .../xpack/eql/AsyncEqlSecurityIT.java | 4 +- .../eql/action/AsyncEqlSearchActionIT.java | 14 +-- .../xpack/eql/async/StoredAsyncTask.java | 5 ++ .../plugin/EqlAsyncDeleteResultAction.java | 24 ----- .../xpack/eql/plugin/EqlPlugin.java | 3 - .../RestEqlDeleteAsyncResultAction.java | 3 +- .../TransportEqlAsyncDeleteResultAction.java | 76 ---------------- .../TransportEqlAsyncGetResultAction.java | 20 ++++- .../eql/plugin/TransportEqlSearchAction.java | 5 +- .../AsyncTaskManagementServiceTests.java | 1 - .../xpack/security/authz/RBACEngine.java | 7 +- 30 files changed, 244 insertions(+), 235 deletions(-) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{search/action/DeleteAsyncSearchAction.java => async/DeleteAsyncResultAction.java} (65%) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultsService.java rename x-pack/plugin/{async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java => core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java} (57%) delete mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncDeleteResultAction.java delete mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncDeleteResultAction.java diff --git a/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java b/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java index 60102798f6fee..9db013e7efda1 100644 --- a/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java +++ b/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java @@ -29,7 +29,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; -import static org.elasticsearch.xpack.search.AsyncSearch.INDEX; +import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -84,7 +84,7 @@ private void testCase(String user, String other) throws Exception { // other and user cannot access the result from direct get calls AsyncExecutionId searchId = AsyncExecutionId.decode(id); for (String runAs : new String[] {user, other}) { - exc = expectThrows(ResponseException.class, () -> get(INDEX, searchId.getDocId(), runAs)); + exc = expectThrows(ResponseException.class, () -> get(ASYNC_RESULTS_INDEX, searchId.getDocId(), runAs)); assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403)); assertThat(exc.getMessage(), containsString("unauthorized")); } diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index d3a768daed238..05170876e599a 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase; import org.elasticsearch.test.junit.annotations.TestIssueLogging; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; @@ -371,7 +372,7 @@ public void testRemoveAsyncIndex() throws Exception { assertThat(response.getExpirationTime(), greaterThan(now)); // remove the async search index - client().admin().indices().prepareDelete(AsyncSearch.INDEX).get(); + client().admin().indices().prepareDelete(XPackPlugin.ASYNC_RESULTS_INDEX).get(); Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(response.getId())); Throwable cause = exc instanceof ExecutionException ? diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java index eda9c84b9a71f..1c7dc44eb68bc 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java @@ -29,9 +29,9 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; -import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; @@ -45,7 +45,6 @@ import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; public final class AsyncSearch extends Plugin implements ActionPlugin { - public static final String INDEX = ".async-search"; private final Settings settings; public AsyncSearch(Settings settings) { @@ -56,8 +55,7 @@ public AsyncSearch(Settings settings) { public List> getActions() { return Arrays.asList( new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class), - new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class), - new ActionHandler<>(DeleteAsyncSearchAction.INSTANCE, TransportDeleteAsyncSearchAction.class) + new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class) ); } @@ -88,8 +86,8 @@ public Collection createComponents(Client client, if (DiscoveryNode.isDataNode(environment.settings())) { // only data nodes should be eligible to run the maintenance service. AsyncTaskIndexService indexService = - new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, - AsyncSearchResponse::new, namedWriteableRegistry); + new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client, + ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, namedWriteableRegistry); AsyncSearchMaintenanceService maintenanceService = new AsyncSearchMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService); return Collections.singletonList(maintenanceService); diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java index 65be6b6ba1471..e23cacd9bbddd 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService; @@ -30,7 +31,7 @@ public class AsyncSearchMaintenanceService extends AsyncTaskMaintenanceService { Settings nodeSettings, ThreadPool threadPool, AsyncTaskIndexService indexService) { - super(clusterService, AsyncSearch.INDEX, localNodeId, threadPool, indexService, + super(clusterService, XPackPlugin.ASYNC_RESULTS_INDEX, localNodeId, threadPool, indexService, ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings)); } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 4c05273c720e0..fa21f212f3979 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.async.AsyncExecutionId; @@ -132,6 +133,11 @@ public void setExpirationTime(long expirationTimeMillis) { this.expirationTimeMillis = expirationTimeMillis; } + @Override + public void cancelTask(TaskManager taskManager, Runnable runnable) { + cancelTask(runnable); + } + /** * Cancels the running task and its children. */ diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java index 5250cb60e380d..96ac890345bc1 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestDeleteAsyncSearchAction.java @@ -10,7 +10,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; -import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import java.io.IOException; @@ -35,6 +35,6 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { DeleteAsyncResultRequest delete = new DeleteAsyncResultRequest(request.param("id")); - return channel -> client.execute(DeleteAsyncSearchAction.INSTANCE, delete, new RestToXContentListener<>(channel)); + return channel -> client.execute(DeleteAsyncResultAction.INSTANCE, delete, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index d07f18c532f39..14b01b5259f0f 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncResultsService; import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; @@ -47,10 +48,10 @@ static AsyncResultsService createResultsSe NamedWriteableRegistry registry, Client client, ThreadPool threadPool) { - AsyncTaskIndexService store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, + AsyncTaskIndexService store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); return new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener, - (task, listener) -> task.cancelTask(() -> listener.onResponse(null)), transportService.getTaskManager(), clusterService); + transportService.getTaskManager(), clusterService); } @Override diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 1b3238ea9d528..b1ece50be62ca 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; @@ -69,7 +70,7 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService, this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction(); this.searchAction = searchAction; this.threadContext = transportService.getThreadPool().getThreadContext(); - this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadContext, client, + this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadContext, client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 36470612f6baa..0a99c57c689ae 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -33,7 +33,7 @@ import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; -import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; @@ -49,7 +49,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; -import static org.elasticsearch.xpack.search.AsyncSearch.INDEX; +import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX; import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -126,7 +126,7 @@ protected void restartTaskNode(String id, String indexName) throws Exception { stopMaintenanceService(); internalCluster().restartNode(node.getName(), new InternalTestCluster.RestartCallback() {}); startMaintenanceService(); - ensureYellow(INDEX, indexName); + ensureYellow(ASYNC_RESULTS_INDEX, indexName); } protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request) throws ExecutionException, InterruptedException { @@ -142,7 +142,7 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr } protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException { - return client().execute(DeleteAsyncSearchAction.INSTANCE, new DeleteAsyncResultRequest(id)).get(); + return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get(); } /** @@ -152,7 +152,7 @@ protected void ensureTaskRemoval(String id) throws Exception { AsyncExecutionId searchId = AsyncExecutionId.decode(id); assertBusy(() -> { GetResponse resp = client().prepareGet() - .setIndex(INDEX) + .setIndex(ASYNC_RESULTS_INDEX) .setId(searchId.getDocId()) .get(); assertFalse(resp.isExists()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 1180018bcfdc2..13a96ec8e2f19 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -147,7 +147,7 @@ import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; -import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.security.SecurityFeatureSetUsage; @@ -402,7 +402,7 @@ public List> getClientActions() { // Async Search SubmitAsyncSearchAction.INSTANCE, GetAsyncSearchAction.INSTANCE, - DeleteAsyncSearchAction.INSTANCE + DeleteAsyncResultAction.INSTANCE ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index efeedaa2a0e6b..021ee6a35d9d8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -61,6 +61,8 @@ import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackUsageAction; import org.elasticsearch.xpack.core.action.XPackUsageResponse; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; +import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.rest.action.RestReloadAnalyzersAction; import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction; @@ -91,6 +93,7 @@ public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, private static final Logger logger = LogManager.getLogger(XPackPlugin.class); private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger); + public static final String ASYNC_RESULTS_INDEX = ".async-search"; public static final String XPACK_INSTALLED_NODE_ATTR = "xpack.installed"; // TODO: clean up this library to not ask for write access to all system properties! @@ -255,6 +258,7 @@ public Collection createComponents(Client client, ClusterService cluster actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, getUsageAction())); actions.addAll(licensing.getActions()); actions.add(new ActionHandler<>(ReloadAnalyzerAction.INSTANCE, TransportReloadAnalyzersAction.class)); + actions.add(new ActionHandler<>(DeleteAsyncResultAction.INSTANCE, TransportDeleteAsyncResultAction.class)); return actions; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java index 7e05d4dd0ba71..81ad3fad44983 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -12,8 +12,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriConsumer; @@ -22,7 +20,6 @@ import org.elasticsearch.tasks.TaskManager; import java.util.Objects; -import java.util.function.BiConsumer; /** * Service that is capable of retrieving and cleaning up AsyncTasks regardless of their state. It works with the TaskManager, if a task @@ -36,7 +33,6 @@ public class AsyncResultsService store; private final boolean updateInitialResultsInStore; private final TriConsumer, TimeValue> addCompletionListener; - private final BiConsumer> cancelTask; /** * Creates async results service @@ -52,13 +48,11 @@ public AsyncResultsService(AsyncTaskIndexService store, boolean updateInitialResultsInStore, Class asyncTaskClass, TriConsumer, TimeValue> addCompletionListener, - BiConsumer> cancelTask, TaskManager taskManager, ClusterService clusterService) { this.updateInitialResultsInStore = updateInitialResultsInStore; this.asyncTaskClass = asyncTaskClass; this.addCompletionListener = addCompletionListener; - this.cancelTask = cancelTask; this.taskManager = taskManager; this.clusterService = clusterService; this.store = store; @@ -109,55 +103,6 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener listener) { - try { - AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); - Task task = store.getTask(taskManager, searchId, asyncTaskClass); - if (task != null) { - //the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways. - cancelTask.accept(task, ActionListener.wrap( - (ignore) -> store.deleteResponse(searchId, - ActionListener.wrap( - r -> listener.onResponse(new AcknowledgedResponse(true)), - exc -> { - RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); - //the index may not be there (no initial async search response stored yet?): we still want to return 200 - //note that index missing comes back as 200 hence it's handled in the onResponse callback - if (status == RestStatus.NOT_FOUND) { - listener.onResponse(new AcknowledgedResponse(true)); - } else { - logger.error(() -> new ParameterizedMessage("failed to clean async result [{}]", - searchId.getEncoded()), exc); - listener.onFailure(exc); - } - })), - listener::onFailure) - ); - } else { - // the task was not found (already cancelled, already completed, or invalid id?) - // we fail if the response is not found in the index - ActionListener deleteListener = ActionListener.wrap( - resp -> { - if (resp.status() == RestStatus.NOT_FOUND) { - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); - } else { - listener.onResponse(new AcknowledgedResponse(true)); - } - }, - exc -> { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); - listener.onFailure(exc); - } - ); - //we get before deleting to verify that the user is authorized - store.getResponse(searchId, false, - ActionListener.wrap(res -> store.deleteResponse(searchId, deleteListener), listener::onFailure)); - } - } catch (Exception exc) { - listener.onFailure(exc); - } - } - private void getSearchResponseFromTask(AsyncExecutionId searchId, GetAsyncResultRequest request, long nowInMillis, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java index e2a8ce6b134e4..865277d9ceb76 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.core.async; +import org.elasticsearch.tasks.TaskManager; + import java.util.Map; /** @@ -31,4 +33,9 @@ public interface AsyncTask { * Update the expiration time of the (partial) response. */ void setExpirationTime(long expirationTimeMillis); + + /** + * Performs necessary checks, cancels the task and calls the runnable upon completion + */ + void cancelTask(TaskManager taskManager, Runnable runnable); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 607d967548c91..eddc5bd7d329c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; @@ -243,15 +244,9 @@ public T getTask(TaskManager taskManager, AsyncExecutionId return asyncTask; } - /** - * Gets the response from the index if present, or delegate a {@link ResourceNotFoundException} - * failure to the provided listener if not. - * When the provided restoreResponseHeaders is true, this method also restores the - * response headers of the original request in the current thread context. - */ - public void getResponse(AsyncExecutionId asyncExecutionId, - boolean restoreResponseHeaders, - ActionListener listener) { + private void getEncodedResponse(AsyncExecutionId asyncExecutionId, + boolean restoreResponseHeaders, + ActionListener> listener) { final Authentication current = securityContext.getAuthentication(); GetRequest internalGet = new GetRequest(index) .preference(asyncExecutionId.getEncoded()) @@ -280,7 +275,7 @@ public void getResponse(AsyncExecutionId asyncExecutionId, long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD); String encoded = (String) get.getSource().get(RESULT_FIELD); if (encoded != null) { - listener.onResponse(decodeResponse(encoded).withExpirationTime(expirationTime)); + listener.onResponse(new Tuple<>(encoded, expirationTime)); } else { listener.onResponse(null); } @@ -289,6 +284,34 @@ public void getResponse(AsyncExecutionId asyncExecutionId, )); } + /** + * Gets the response from the index if present, or delegate a {@link ResourceNotFoundException} + * failure to the provided listener if not. + * When the provided restoreResponseHeaders is true, this method also restores the + * response headers of the original request in the current thread context. + */ + public void getResponse(AsyncExecutionId asyncExecutionId, + boolean restoreResponseHeaders, + ActionListener listener) { + getEncodedResponse(asyncExecutionId, restoreResponseHeaders, ActionListener.wrap( + (t) -> listener.onResponse(decodeResponse(t.v1()).withExpirationTime(t.v2())), + listener::onFailure + )); + } + + /** + * Ensures that the current user can read the specified response without actually reading it + */ + public void authorizeResponse(AsyncExecutionId asyncExecutionId, + boolean restoreResponseHeaders, + ActionListener listener) { + getEncodedResponse(asyncExecutionId, restoreResponseHeaders, ActionListener.wrap( + (t) -> listener.onResponse(null), + listener::onFailure + )); + } + + /** * Extracts the authentication from the original headers and checks that it matches * the current user. This function returns always true if the provided diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/DeleteAsyncSearchAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultAction.java similarity index 65% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/DeleteAsyncSearchAction.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultAction.java index 4ac268c6f794d..f5ff159c42dfb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/DeleteAsyncSearchAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultAction.java @@ -3,17 +3,17 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.core.search.action; +package org.elasticsearch.xpack.core.async; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.Writeable; -public class DeleteAsyncSearchAction extends ActionType { - public static final DeleteAsyncSearchAction INSTANCE = new DeleteAsyncSearchAction(); - public static final String NAME = "indices:data/read/async_search/delete"; +public class DeleteAsyncResultAction extends ActionType { + public static final DeleteAsyncResultAction INSTANCE = new DeleteAsyncResultAction(); + public static final String NAME = "indices:data/read/async_result/delete"; - private DeleteAsyncSearchAction() { + private DeleteAsyncResultAction() { super(NAME, AcknowledgedResponse::new); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultsService.java new file mode 100644 index 0000000000000..f6ee0e59e408f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultsService.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.async; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskManager; + +/** + * Service that is capable of retrieving and cleaning up AsyncTasks regardless of their state. It works with the TaskManager, if a task + * is still running and AsyncTaskIndexService if task results already stored there. + */ +public class DeleteAsyncResultsService { + private final Logger logger = LogManager.getLogger(DeleteAsyncResultsService.class); + private final TaskManager taskManager; + private final AsyncTaskIndexService> store; + + /** + * Creates async results service + * + * @param store AsyncTaskIndexService for the response we are working with + * @param taskManager task manager + */ + public DeleteAsyncResultsService(AsyncTaskIndexService> store, + TaskManager taskManager) { + this.taskManager = taskManager; + this.store = store; + + } + + public void deleteResult(DeleteAsyncResultRequest request, ActionListener listener) { + try { + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); + AsyncTask task = store.getTask(taskManager, searchId, AsyncTask.class); + if (task != null) { + //the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways. + task.cancelTask(taskManager, () -> store.deleteResponse(searchId, + ActionListener.wrap( + r -> listener.onResponse(new AcknowledgedResponse(true)), + exc -> { + RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); + //the index may not be there (no initial async search response stored yet?): we still want to return 200 + //note that index missing comes back as 200 hence it's handled in the onResponse callback + if (status == RestStatus.NOT_FOUND) { + listener.onResponse(new AcknowledgedResponse(true)); + } else { + logger.error(() -> new ParameterizedMessage("failed to clean async result [{}]", + searchId.getEncoded()), exc); + listener.onFailure(exc); + } + })) + ); + } else { + // the task was not found (already cancelled, already completed, or invalid id?) + // we fail if the response is not found in the index + ActionListener deleteListener = ActionListener.wrap( + resp -> { + if (resp.status() == RestStatus.NOT_FOUND) { + listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + } else { + listener.onResponse(new AcknowledgedResponse(true)); + } + }, + exc -> { + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); + listener.onFailure(exc); + } + ); + //we get before deleting to verify that the user is authorized + store.authorizeResponse(searchId, false, + ActionListener.wrap(res -> store.deleteResponse(searchId, deleteListener), listener::onFailure)); + } + } catch (Exception exc) { + listener.onFailure(exc); + } + } +} diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java similarity index 57% rename from x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java index 53ecf18273952..0448d210fdd94 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.search; +package org.elasticsearch.xpack.core.async; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -19,37 +19,41 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.async.AsyncResultsService; -import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; -import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; -import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; +import org.elasticsearch.xpack.core.XPackPlugin; -import static org.elasticsearch.xpack.search.TransportGetAsyncSearchAction.createResultsService; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; -public class TransportDeleteAsyncSearchAction extends HandledTransportAction { - private final AsyncResultsService resultsService; +public class TransportDeleteAsyncResultAction extends HandledTransportAction { + private final DeleteAsyncResultsService deleteResultsService; + private final ClusterService clusterService; private final TransportService transportService; @Inject - public TransportDeleteAsyncSearchAction(TransportService transportService, + public TransportDeleteAsyncResultAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService, - ThreadPool threadPool, NamedWriteableRegistry registry, - Client client) { - super(DeleteAsyncSearchAction.NAME, transportService, actionFilters, DeleteAsyncResultRequest::new); + Client client, + ThreadPool threadPool) { + super(DeleteAsyncResultAction.NAME, transportService, actionFilters, DeleteAsyncResultRequest::new); this.transportService = transportService; - this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); + this.clusterService = clusterService; + AsyncTaskIndexService store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, + (in) -> {throw new UnsupportedOperationException("Reading is not supported during deletion");}, registry); + this.deleteResultsService = new DeleteAsyncResultsService(store, transportService.getTaskManager()); } + @Override protected void doExecute(Task task, DeleteAsyncResultRequest request, ActionListener listener) { - DiscoveryNode node = resultsService.getNode(request.getId()); - if (node == null || resultsService.isLocalNode(node)) { - resultsService.deleteResult(request, listener); + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); + DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); + if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) { + deleteResultsService.deleteResult(request, listener); } else { TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); - transportService.sendRequest(node, DeleteAsyncSearchAction.NAME, request, builder.build(), + transportService.sendRequest(node, DeleteAsyncResultAction.NAME, request, builder.build(), new ActionListenerResponseHandler<>(listener, AcknowledgedResponse::new, ThreadPool.Names.SAME)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/eql/EqlAsyncActionNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/eql/EqlAsyncActionNames.java index c6ad58472052e..e84655052451c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/eql/EqlAsyncActionNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/eql/EqlAsyncActionNames.java @@ -11,5 +11,4 @@ */ public final class EqlAsyncActionNames { public static final String EQL_ASYNC_GET_RESULT_ACTION_NAME = "indices:data/read/eql/async/get"; - public static final String EQL_ASYNC_DELETE_RESULT_ACTION_NAME = "indices:data/read/eql/async/delete"; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index 3d512954e5c35..ec1a49c117d9a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -72,6 +72,11 @@ public void setExpirationTime(long expirationTimeMillis) { this.expirationTimeMillis = expirationTimeMillis; } + @Override + public void cancelTask(TaskManager taskManager, Runnable runnable) { + taskManager.cancelTaskAndDescendants(this, "test", true, ActionListener.wrap(runnable)); + } + public long getExpirationTime() { return this.expirationTimeMillis; } @@ -125,16 +130,21 @@ public void setup() { private AsyncResultsService createResultsService(boolean updateInitialResultsInStore) { return new AsyncResultsService<>(indexService, updateInitialResultsInStore, TestTask.class, TestTask::addListener, - (task, listener) ->taskManager.cancelTaskAndDescendants(task, "deletion", true, listener), taskManager, clusterService); + taskManager, clusterService); + } + + private DeleteAsyncResultsService createDeleteResultsService() { + return new DeleteAsyncResultsService(indexService, taskManager); } public void testRecordNotFound() { AsyncResultsService service = createResultsService(randomBoolean()); + DeleteAsyncResultsService deleteService = createDeleteResultsService(); PlainActionFuture listener = new PlainActionFuture<>(); service.retrieveResult(new GetAsyncResultRequest(randomAsyncId().getEncoded()), listener); assertFutureThrows(listener, ResourceNotFoundException.class); PlainActionFuture deleteListener = new PlainActionFuture<>(); - service.deleteResult(new DeleteAsyncResultRequest(randomAsyncId().getEncoded()), deleteListener); + deleteService.deleteResult(new DeleteAsyncResultRequest(randomAsyncId().getEncoded()), deleteListener); assertFutureThrows(listener, ResourceNotFoundException.class); } @@ -220,6 +230,7 @@ public void testAssertExpirationPropagation() throws Exception { public void testRetrieveFromDisk() throws Exception { boolean updateInitialResultsInStore = randomBoolean(); AsyncResultsService service = createResultsService(updateInitialResultsInStore); + DeleteAsyncResultsService deleteService = createDeleteResultsService(); TestRequest request = new TestRequest("test request"); TestTask task = (TestTask) taskManager.register("test", "test", request); try { @@ -255,11 +266,11 @@ public void testRetrieveFromDisk() throws Exception { assertThat(response.test, equalTo("final_response")); PlainActionFuture deleteListener = new PlainActionFuture<>(); - service.deleteResult(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); + deleteService.deleteResult(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); assertThat(deleteListener.actionGet().isAcknowledged(), equalTo(true)); deleteListener = new PlainActionFuture<>(); - service.deleteResult(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); + deleteService.deleteResult(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); assertFutureThrows(deleteListener, ResourceNotFoundException.class); } } diff --git a/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java b/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java index 0ac4f68148a36..40db5f99354b1 100644 --- a/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java +++ b/x-pack/plugin/eql/qa/security/src/test/java/org/elasticsearch/xpack/eql/AsyncEqlSecurityIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.junit.Before; @@ -29,7 +30,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; -import static org.elasticsearch.xpack.eql.plugin.EqlPlugin.INDEX; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -84,7 +84,7 @@ private void testCase(String user, String other) throws Exception { // other and user cannot access the result from direct get calls AsyncExecutionId searchId = AsyncExecutionId.decode(id); for (String runAs : new String[] {user, other}) { - exc = expectThrows(ResponseException.class, () -> get(INDEX, searchId.getDocId(), runAs)); + exc = expectThrows(ResponseException.class, () -> get(XPackPlugin.ASYNC_RESULTS_INDEX, searchId.getDocId(), runAs)); assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403)); assertThat(exc.getMessage(), containsString("unauthorized")); } diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java index 226f9e86a89f3..76448592d99f2 100644 --- a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java @@ -26,13 +26,13 @@ import org.elasticsearch.search.SearchModule; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; -import org.elasticsearch.xpack.eql.plugin.EqlAsyncDeleteResultAction; import org.elasticsearch.xpack.eql.plugin.EqlAsyncGetResultAction; -import org.elasticsearch.xpack.eql.plugin.EqlPlugin; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.junit.After; @@ -143,7 +143,7 @@ public void testBasicAsyncExecution() throws Exception { } AcknowledgedResponse deleteResponse = - client().execute(EqlAsyncDeleteResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())).actionGet(); + client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())).actionGet(); assertThat(deleteResponse.isAcknowledged(), equalTo(true)); } @@ -230,11 +230,11 @@ public void testAsyncCancellation() throws Exception { logger.trace("Block is established"); ActionFuture deleteResponse = - client().execute(EqlAsyncDeleteResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())); + client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())); disableBlocks(plugins); assertThat(deleteResponse.actionGet().isAcknowledged(), equalTo(true)); - deleteResponse = client().execute(EqlAsyncDeleteResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())); + deleteResponse = client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())); assertFutureThrows(deleteResponse, ResourceNotFoundException.class); } @@ -267,7 +267,7 @@ public void testFinishingBeforeTimeout() throws Exception { assertThat(storedResponse, equalTo(response)); AcknowledgedResponse deleteResponse = - client().execute(EqlAsyncDeleteResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())).actionGet(); + client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(response.id())).actionGet(); assertThat(deleteResponse.isAcknowledged(), equalTo(true)); } } else { @@ -279,7 +279,7 @@ public void testFinishingBeforeTimeout() throws Exception { public StoredAsyncResponse getStoredRecord(String id) throws Exception { try { - GetResponse doc = client().prepareGet(EqlPlugin.INDEX, AsyncExecutionId.decode(id).getDocId()).get(); + GetResponse doc = client().prepareGet(XPackPlugin.ASYNC_RESULTS_INDEX, AsyncExecutionId.decode(id).getDocId()).get(); if (doc.isExists()) { String value = doc.getSource().get("result").toString(); try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java index df01e3e9064b0..833fd141173b4 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/StoredAsyncTask.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.AsyncTask; @@ -94,4 +95,8 @@ protected synchronized void onFailure(Exception e) { */ protected abstract Response getCurrentResult(); + @Override + public void cancelTask(TaskManager taskManager, Runnable runnable) { + taskManager.cancelTaskAndDescendants(this, "task deleted", true, ActionListener.wrap(runnable)); + } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncDeleteResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncDeleteResultAction.java deleted file mode 100644 index 8a136e7e19cdf..0000000000000 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncDeleteResultAction.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.eql.plugin; - -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; - -public class EqlAsyncDeleteResultAction extends ActionType { - public static final EqlAsyncDeleteResultAction INSTANCE = new EqlAsyncDeleteResultAction(); - - private EqlAsyncDeleteResultAction() { - super(EqlAsyncActionNames.EQL_ASYNC_DELETE_RESULT_ACTION_NAME, AcknowledgedResponse::new); - } - - @Override - public Writeable.Reader getResponseReader() { - return AcknowledgedResponse::new; - } -} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java index 30164c0d3bf8d..e7d89ed4fcd50 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java @@ -46,8 +46,6 @@ import java.util.function.Supplier; public class EqlPlugin extends Plugin implements ActionPlugin { - // We are going to reuse the same index as normal async search until system indices are implemented - public static final String INDEX = ".async-search"; private final boolean enabled; @@ -114,7 +112,6 @@ public List> getSettings() { new ActionHandler<>(EqlSearchAction.INSTANCE, TransportEqlSearchAction.class), new ActionHandler<>(EqlStatsAction.INSTANCE, TransportEqlStatsAction.class), new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultAction.class), - new ActionHandler<>(EqlAsyncDeleteResultAction.INSTANCE, TransportEqlAsyncDeleteResultAction.class), new ActionHandler<>(XPackUsageFeatureAction.EQL, EqlUsageTransportAction.class), new ActionHandler<>(XPackInfoFeatureAction.EQL, EqlInfoTransportAction.class) ); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlDeleteAsyncResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlDeleteAsyncResultAction.java index f6f68912c1838..3fd654c8d1f8a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlDeleteAsyncResultAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlDeleteAsyncResultAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import java.util.List; @@ -29,6 +30,6 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { DeleteAsyncResultRequest delete = new DeleteAsyncResultRequest(request.param("id")); - return channel -> client.execute(EqlAsyncDeleteResultAction.INSTANCE, delete, new RestToXContentListener<>(channel)); + return channel -> client.execute(DeleteAsyncResultAction.INSTANCE, delete, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncDeleteResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncDeleteResultAction.java deleted file mode 100644 index bb1871403c9f5..0000000000000 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncDeleteResultAction.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.eql.plugin; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionListenerResponseHandler; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.async.AsyncResultsService; -import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; -import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; -import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; -import org.elasticsearch.xpack.eql.action.EqlSearchResponse; -import org.elasticsearch.xpack.eql.action.EqlSearchTask; -import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; - -import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; -import static org.elasticsearch.xpack.eql.async.AsyncTaskManagementService.addCompletionListener; - -public class TransportEqlAsyncDeleteResultAction extends HandledTransportAction { - private final AsyncResultsService> resultsService; - private final TransportService transportService; - - @Inject - public TransportEqlAsyncDeleteResultAction(TransportService transportService, - ActionFilters actionFilters, - ClusterService clusterService, - NamedWriteableRegistry registry, - Client client, - ThreadPool threadPool) { - super(EqlAsyncActionNames.EQL_ASYNC_DELETE_RESULT_ACTION_NAME, transportService, actionFilters, DeleteAsyncResultRequest::new); - this.transportService = transportService; - this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); - } - - static AsyncResultsService> createResultsService( - TransportService transportService, - ClusterService clusterService, - NamedWriteableRegistry registry, - Client client, - ThreadPool threadPool) { - Writeable.Reader> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in); - AsyncTaskIndexService> store = new AsyncTaskIndexService<>(EqlPlugin.INDEX, clusterService, - threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry); - return new AsyncResultsService<>(store, true, EqlSearchTask.class, - (task, listener, timeout) -> addCompletionListener(threadPool, task, listener, timeout), - (task, listener) -> transportService.getTaskManager().cancelTaskAndDescendants(task, "async result is deleted", true, listener), - transportService.getTaskManager(), clusterService); - } - - @Override - protected void doExecute(Task task, DeleteAsyncResultRequest request, ActionListener listener) { - DiscoveryNode node = resultsService.getNode(request.getId()); - if (node == null || resultsService.isLocalNode(node)) { - resultsService.deleteResult(request, listener); - } else { - TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); - transportService.sendRequest(node, EqlAsyncActionNames.EQL_ASYNC_DELETE_RESULT_ACTION_NAME, request, builder.build(), - new ActionListenerResponseHandler<>(listener, AcknowledgedResponse::new, ThreadPool.Names.SAME)); - } - } -} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java index 933937118c7f8..f484d8e036fd0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java @@ -14,18 +14,22 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncResultsService; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; import org.elasticsearch.xpack.eql.action.EqlSearchResponse; import org.elasticsearch.xpack.eql.action.EqlSearchTask; +import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService; import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; -import static org.elasticsearch.xpack.eql.plugin.TransportEqlAsyncDeleteResultAction.createResultsService; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; public class TransportEqlAsyncGetResultAction extends HandledTransportAction { private final AsyncResultsService> resultsService; @@ -43,6 +47,20 @@ public TransportEqlAsyncGetResultAction(TransportService transportService, this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); } + static AsyncResultsService> createResultsService( + TransportService transportService, + ClusterService clusterService, + NamedWriteableRegistry registry, + Client client, + ThreadPool threadPool) { + Writeable.Reader> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in); + AsyncTaskIndexService> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, + clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry); + return new AsyncResultsService<>(store, true, EqlSearchTask.class, + (task, listener, timeout) -> AsyncTaskManagementService.addCompletionListener(threadPool, task, listener, timeout), + transportService.getTaskManager(), clusterService); + } + @Override protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener listener) { DiscoveryNode node = resultsService.getNode(request.getId()); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index ba3da64973446..7d36a92fc228b 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService; @@ -62,8 +63,8 @@ public TransportEqlSearchAction(Settings settings, ClusterService clusterService this.planExecutor = planExecutor; this.threadPool = threadPool; - this.asyncTaskManagementService = new AsyncTaskManagementService<>(EqlPlugin.INDEX, client, ASYNC_SEARCH_ORIGIN, registry, - taskManager, EqlSearchAction.INSTANCE.name(), this, EqlSearchTask.class, clusterService, threadPool); + this.asyncTaskManagementService = new AsyncTaskManagementService<>(XPackPlugin.ASYNC_RESULTS_INDEX, client, ASYNC_SEARCH_ORIGIN, + registry, taskManager, EqlSearchAction.INSTANCE.name(), this, EqlSearchTask.class, clusterService, threadPool); } @Override diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java index 9eb7cb0e71026..83dfe78f91c9c 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java @@ -133,7 +133,6 @@ public void setup() { in -> new StoredAsyncResponse<>(TestResponse::new, in), writableRegistry()); results = new AsyncResultsService<>(store, true, TestTask.class, (task, listener, timeout) -> addCompletionListener(transportService.getThreadPool(), task, listener, timeout), - (task, listener) -> transportService.getTaskManager().cancelTaskAndDescendants(task, "deletion", true, listener), transportService.getTaskManager(), clusterService); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 745345aeee3c8..8098235c0ff33 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -33,7 +33,7 @@ import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.eql.EqlAsyncActionNames; -import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.security.action.GetApiKeyAction; @@ -591,8 +591,7 @@ private static boolean isScrollRelatedAction(String action) { private static boolean isAsyncRelatedAction(String action) { return action.equals(SubmitAsyncSearchAction.NAME) || action.equals(GetAsyncSearchAction.NAME) || - action.equals(DeleteAsyncSearchAction.NAME) || - action.equals(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME) || - action.equals(EqlAsyncActionNames.EQL_ASYNC_DELETE_RESULT_ACTION_NAME); + action.equals(DeleteAsyncResultAction.NAME) || + action.equals(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME); } } From 527a9a1f4dce57d83513c9a15758d6a28b70afb4 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 29 May 2020 15:57:09 -0400 Subject: [PATCH 3/3] Shouldn't change transport action to preserve BWC --- .../elasticsearch/xpack/core/async/DeleteAsyncResultAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultAction.java index f5ff159c42dfb..86986630f031f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/DeleteAsyncResultAction.java @@ -11,7 +11,7 @@ public class DeleteAsyncResultAction extends ActionType { public static final DeleteAsyncResultAction INSTANCE = new DeleteAsyncResultAction(); - public static final String NAME = "indices:data/read/async_result/delete"; + public static final String NAME = "indices:data/read/async_search/delete"; private DeleteAsyncResultAction() { super(NAME, AcknowledgedResponse::new);