Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.elasticsearch.xpack.search.AsyncSearch.INDEX;
import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -84,7 +84,7 @@ private void testCase(String user, String other) throws Exception {
// other and user cannot access the result from direct get calls
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
for (String runAs : new String[] {user, other}) {
exc = expectThrows(ResponseException.class, () -> get(INDEX, searchId.getDocId(), runAs));
exc = expectThrows(ResponseException.class, () -> get(ASYNC_RESULTS_INDEX, searchId.getDocId(), runAs));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(exc.getMessage(), containsString("unauthorized"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;

Expand Down Expand Up @@ -371,7 +372,7 @@ public void testRemoveAsyncIndex() throws Exception {
assertThat(response.getExpirationTime(), greaterThan(now));

// remove the async search index
client().admin().indices().prepareDelete(AsyncSearch.INDEX).get();
client().admin().indices().prepareDelete(XPackPlugin.ASYNC_RESULTS_INDEX).get();

Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(response.getId()));
Throwable cause = exc instanceof ExecutionException ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;

Expand All @@ -45,7 +45,6 @@
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;

public final class AsyncSearch extends Plugin implements ActionPlugin {
public static final String INDEX = ".async-search";
private final Settings settings;

public AsyncSearch(Settings settings) {
Expand All @@ -56,8 +55,7 @@ public AsyncSearch(Settings settings) {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class),
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class),
new ActionHandler<>(DeleteAsyncSearchAction.INSTANCE, TransportDeleteAsyncSearchAction.class)
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class)
);
}

Expand Down Expand Up @@ -88,8 +86,8 @@ public Collection<Object> createComponents(Client client,
if (DiscoveryNode.isDataNode(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
AsyncTaskIndexService<AsyncSearchResponse> indexService =
new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN,
AsyncSearchResponse::new, namedWriteableRegistry);
new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, namedWriteableRegistry);
AsyncSearchMaintenanceService maintenanceService =
new AsyncSearchMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService);
return Collections.singletonList(maintenanceService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;

Expand All @@ -30,7 +31,7 @@ public class AsyncSearchMaintenanceService extends AsyncTaskMaintenanceService {
Settings nodeSettings,
ThreadPool threadPool,
AsyncTaskIndexService<?> indexService) {
super(clusterService, AsyncSearch.INDEX, localNodeId, threadPool, indexService,
super(clusterService, XPackPlugin.ASYNC_RESULTS_INDEX, localNodeId, threadPool, indexService,
ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
Expand Down Expand Up @@ -132,6 +133,11 @@ public void setExpirationTime(long expirationTimeMillis) {
this.expirationTimeMillis = expirationTimeMillis;
}

@Override
public void cancelTask(TaskManager taskManager, Runnable runnable) {
cancelTask(runnable);
}

/**
* Cancels the running task and its children.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestHandler.Route;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;


import java.io.IOException;
Expand All @@ -34,7 +34,7 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
DeleteAsyncSearchAction.Request delete = new DeleteAsyncSearchAction.Request(request.param("id"));
return channel -> client.execute(DeleteAsyncSearchAction.INSTANCE, delete, new RestToXContentListener<>(channel));
DeleteAsyncResultRequest delete = new DeleteAsyncResultRequest(request.param("id"));
return channel -> client.execute(DeleteAsyncResultAction.INSTANCE, delete, new RestToXContentListener<>(channel));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncResultsService;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
Expand All @@ -39,9 +40,17 @@ public TransportGetAsyncSearchAction(TransportService transportService,
ThreadPool threadPool) {
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new);
this.transportService = transportService;
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService,
this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool);
}

static AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> createResultsService(TransportService transportService,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool) {
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
resultsService = new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
return new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
transportService.getTaskManager(), clusterService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
Expand Down Expand Up @@ -69,7 +70,7 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService,
this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction();
this.searchAction = searchAction;
this.threadContext = transportService.getThreadPool().getThreadContext();
this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadContext, client,
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadContext, client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
Expand All @@ -48,7 +49,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.xpack.search.AsyncSearch.INDEX;
import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -125,7 +126,7 @@ protected void restartTaskNode(String id, String indexName) throws Exception {
stopMaintenanceService();
internalCluster().restartNode(node.getName(), new InternalTestCluster.RestartCallback() {});
startMaintenanceService();
ensureYellow(INDEX, indexName);
ensureYellow(ASYNC_RESULTS_INDEX, indexName);
}

protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request) throws ExecutionException, InterruptedException {
Expand All @@ -141,7 +142,7 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr
}

protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
return client().execute(DeleteAsyncSearchAction.INSTANCE, new DeleteAsyncSearchAction.Request(id)).get();
return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
}

/**
Expand All @@ -151,7 +152,7 @@ protected void ensureTaskRemoval(String id) throws Exception {
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
assertBusy(() -> {
GetResponse resp = client().prepareGet()
.setIndex(INDEX)
.setIndex(ASYNC_RESULTS_INDEX)
.setId(searchId.getDocId())
.get();
assertFalse(resp.isExists());
Expand Down
Loading