Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsyncResultRequest, AsyncSearchResponse> {
private final AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> resultsService;
private final AsyncResultsService<AsyncSearchTask, AsyncSearchResponse, AsyncStatusResponse> resultsService;
private final TransportService transportService;

@Inject
Expand All @@ -43,12 +44,11 @@ public TransportGetAsyncSearchAction(TransportService transportService,
this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool);
}

static AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> createResultsService(TransportService transportService,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool) {
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
static AsyncResultsService<AsyncSearchTask, AsyncSearchResponse, AsyncStatusResponse> createResultsService(
TransportService transportService, ClusterService clusterService, NamedWriteableRegistry registry,
Client client, ThreadPool threadPool) {
AsyncTaskIndexService<AsyncSearchResponse, AsyncStatusResponse> store = new AsyncTaskIndexService<>(
XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
return new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
transportService.getTaskManager(), clusterService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsyncStatusRequest, AsyncStatusResponse> {
private final TransportService transportService;
private final ClusterService clusterService;
private final AsyncTaskIndexService<AsyncSearchResponse> store;
private final AsyncTaskIndexService<AsyncSearchResponse, AsyncStatusResponse> store;

@Inject
public TransportGetAsyncStatusAction(TransportService transportService,
Expand All @@ -60,7 +60,8 @@ protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListene
taskManager,
AsyncSearchTask.class,
AsyncSearchTask::getStatusResponse,
AsyncStatusResponse::getStatusFromStoredSearch,
AsyncStatusResponse::getStatusFromSearchResponse,
AsyncStatusResponse::new,
listener
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;

Expand All @@ -54,7 +55,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
private final Function<SearchRequest, InternalAggregation.ReduceContext> requestToAggReduceContextBuilder;
private final TransportSearchAction searchAction;
private final ThreadContext threadContext;
private final AsyncTaskIndexService<AsyncSearchResponse> store;
private final AsyncTaskIndexService<AsyncSearchResponse, AsyncStatusResponse> store;

@Inject
public TransportSubmitAsyncSearchAction(ClusterService clusterService,
Expand Down Expand Up @@ -87,11 +88,11 @@ public void onResponse(AsyncSearchResponse searchResponse) {
// the task is still running and the user cannot wait more so we create
// a document for further retrieval
try {
final String docId = searchTask.getExecutionId().getDocId();
// creates the fallback response if the node crashes/restarts in the middle of the request
// TODO: store intermediate results ?
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
store.createResponse(docId, searchTask.getOriginHeaders(), initialResp,
store.createResponse(searchTask.getExecutionId(), searchTask.getOriginHeaders(), initialResp,
AsyncStatusResponse::getStatusFromSearchResponse,
new ActionListener<>() {
@Override
public void onResponse(IndexResponse r) {
Expand Down Expand Up @@ -142,7 +143,10 @@ private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, Task
SearchRequest searchRequest = new SearchRequest(request.getSearchRequest()) {
@Override
public AsyncSearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> taskHeaders) {
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
AsyncExecutionId searchId = new AsyncExecutionId(
docID,
new TaskId(nodeClient.getLocalNodeId(), id),
AsyncTaskIndexService.getIndexVersion());
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
return new AsyncSearchTask(id, type, action, parentTaskId, this::buildDescription, keepAlive,
Expand Down Expand Up @@ -175,7 +179,8 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul
private void onFinalResponse(AsyncSearchTask searchTask,
AsyncSearchResponse response,
Runnable nextAction) {
store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
store.updateResponse(searchTask.getExecutionId(), threadContext.getResponseHeaders(), response,
AsyncStatusResponse::getStatusFromSearchResponse,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
Expand Down Expand Up @@ -73,23 +74,24 @@ public void afterTest() {

private AsyncSearchTask createAsyncSearchTask() {
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1), Version.CURRENT),
new NoOpClient(threadPool), threadPool, null);
}

public void testTaskDescription() {
SearchRequest searchRequest = new SearchRequest("index1", "index2").source(
new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value")));
AsyncSearchTask asyncSearchTask = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), searchRequest::buildDescription,
TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(),
new AsyncExecutionId("0", new TaskId("node1", 1), Version.CURRENT),
new NoOpClient(threadPool), threadPool, null);
assertEquals("async_search{indices[index1,index2], search_type[QUERY_THEN_FETCH], " +
"source[{\"query\":{\"term\":{\"field\":{\"value\":\"value\",\"boost\":1.0}}}}]}", asyncSearchTask.getDescription());
}

public void testWaitForInit() throws InterruptedException {
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1), Version.CURRENT),
new NoOpClient(threadPool), threadPool, null);
int numShards = randomIntBetween(0, 10);
List<SearchShard> shards = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
import org.junit.Before;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -41,7 +40,7 @@ public void setUpAction() {
* no parameters are specified on the rest request itself.
*/
@SuppressWarnings("unchecked")
public void testRequestParameterDefaults() throws IOException {
public void testRequestParameterDefaults() {
SetOnce<Boolean> executeCalled = new SetOnce<>();
verifyingClient.setExecuteLocallyVerifier((actionType, request) -> {
assertThat(request, instanceOf(SubmitAsyncSearchRequest.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -75,7 +76,7 @@ public Collection<Object> createComponents(
List<Object> components = new ArrayList<>();
if (DiscoveryNode.canContainData(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
AsyncTaskIndexService<AsyncSearchResponse> indexService = new AsyncTaskIndexService<>(
AsyncTaskIndexService<AsyncSearchResponse, AsyncStatusResponse> indexService = new AsyncTaskIndexService<>(
XPackPlugin.ASYNC_RESULTS_INDEX,
clusterService,
threadPool.getThreadContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,40 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.Version;


import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Objects;

import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.VERSION_SEPARATE_STATUS_FIELD;


/**
* A class that contains all information related to a submitted async execution.
*/
public final class AsyncExecutionId {
private final String docId;
private final TaskId taskId;
private final Version version;
private final String encoded;

public AsyncExecutionId(String docId, TaskId taskId) {
this(docId, taskId, encode(docId, taskId));
public AsyncExecutionId(String docId, TaskId taskId, Version version) {
this(docId,
taskId,
// we store versions for async searches created starting from VERSION_SEPARATE_STATUS_FIELD,
// before that we assume "version" field to be empty
version.onOrAfter(VERSION_SEPARATE_STATUS_FIELD) ? version : Version.V_EMPTY,
encode(docId, taskId, version)
);
}

private AsyncExecutionId(String docId, TaskId taskId, String encoded) {
private AsyncExecutionId(String docId, TaskId taskId, Version version, String encoded) {
this.docId = docId;
this.taskId = taskId;
this.version = version;
this.encoded = encoded;
}

Expand All @@ -49,43 +62,60 @@ public TaskId getTaskId() {
return taskId;
}

/**
* The {@link Version} of the .async-search index at the moment
* when this async search was created
*/
public Version getVersion() {
return version;
}

/**
* Gets the encoded string that represents this execution.
*/
public String getEncoded() {
return encoded;
}

public boolean versionOnOrAfterSeparateStatusField() {
return version.onOrAfter(VERSION_SEPARATE_STATUS_FIELD);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AsyncExecutionId searchId = (AsyncExecutionId) o;
return docId.equals(searchId.docId) &&
taskId.equals(searchId.taskId);
taskId.equals(searchId.taskId) &&
version.equals(searchId.version);
}

@Override
public int hashCode() {
return Objects.hash(docId, taskId);
return Objects.hash(docId, taskId, version);
}

@Override
public String toString() {
return "AsyncExecutionId{" +
"docId='" + docId + '\'' +
", taskId=" + taskId +
", version=" + version.toString() +
'}';
}

/**
* Encodes the informations needed to retrieve a async response
* in a base64 encoded string.
*/
public static String encode(String docId, TaskId taskId) {
public static String encode(String docId, TaskId taskId, Version version) {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeString(docId);
out.writeString(taskId.toString());
if (version.onOrAfter(VERSION_SEPARATE_STATUS_FIELD)) {
Version.writeVersion(version, out);
}
return Base64.getUrlEncoder().encodeToString(BytesReference.toBytes(out.bytes()));
} catch (IOException e) {
throw new IllegalArgumentException(e);
Expand All @@ -105,15 +135,19 @@ public static AsyncExecutionId decode(String id) {
}
String docId;
String taskId;
Version version = Version.V_EMPTY;
try (StreamInput in = new ByteBufferStreamInput(byteBuffer)) {
docId = in.readString();
taskId = in.readString();
if (in.available() > 0) {
throw new IllegalArgumentException("invalid id: [" + id + "]");
version = Version.readVersion(in);
if (version.before(VERSION_SEPARATE_STATUS_FIELD) || in.available() > 0) {
throw new IllegalArgumentException("invalid id: [" + id + "]");
}
}
} catch (IOException e) {
throw new IllegalArgumentException("invalid id: [" + id + "]", e);
}
return new AsyncExecutionId(docId, new TaskId(taskId), id);
return new AsyncExecutionId(docId, new TaskId(taskId), version, id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.core.search.action.SearchStatusResponse;

import java.util.Objects;

/**
* Service that is capable of retrieving and cleaning up AsyncTasks regardless of their state. It works with the TaskManager, if a task
* is still running and AsyncTaskIndexService if task results already stored there.
*/
public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncResponse<Response>> {
public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncResponse<Response>,
StatusResponse extends SearchStatusResponse> {
private final Logger logger = LogManager.getLogger(AsyncResultsService.class);
private final Class<? extends Task> asyncTaskClass;
private final TaskManager taskManager;
private final ClusterService clusterService;
private final AsyncTaskIndexService<Response> store;
private final AsyncTaskIndexService<Response, StatusResponse> store;
private final boolean updateInitialResultsInStore;
private final TriConsumer<Task, ActionListener<Response>, TimeValue> addCompletionListener;

Expand All @@ -45,7 +47,7 @@ public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncR
* @param taskManager task manager
* @param clusterService cluster service
*/
public AsyncResultsService(AsyncTaskIndexService<Response> store,
public AsyncResultsService(AsyncTaskIndexService<Response, StatusResponse> store,
boolean updateInitialResultsInStore,
Class<? extends Task> asyncTaskClass,
TriConsumer<Task, ActionListener<Response>, TimeValue> addCompletionListener,
Expand Down
Loading