Skip to content

Commit 074f7d2

Browse files
Async search status (#62947)
Introduce async search status API GET /_async_search/status/<id> The API is restricted to the monitoring_user role. For a running async search, the response is: ```js { "id" : <id>, "is_running" : true, "is_partial" : true, "start_time_in_millis" : 1583945890986, "expiration_time_in_millis" : 1584377890986, "_shards" : { "total" : 562, "successful" : 188, "skipped" : 0, "failed" : 0 } } ``` For a completed async search, an additional `completion_status` fields is added. ```js { "id" : <id>, "is_running" : false, "is_partial" : false, "start_time_in_millis" : 1583945890986, "expiration_time_in_millis" : 1584377890986, "_shards" : { "total" : 562, "successful" : 562, "skipped" : 0, "failed" : 0 }, "completion_status" : 200 } ``` Closes #57537
1 parent 4dc833f commit 074f7d2

File tree

15 files changed

+741
-5
lines changed

15 files changed

+741
-5
lines changed

docs/reference/search/async-search.asciidoc

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ set to `false`.
138138
==== Get async search
139139

140140
The get async search API retrieves the results of a previously submitted
141-
async search request given its id. If the {es} {security-features} are enabled.
141+
async search request given its id. If the {es} {security-features} are enabled,
142142
the access to the results of a specific async search is restricted to the user
143143
that submitted it in the first place.
144144

@@ -161,8 +161,8 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd
161161
"timed_out" : false,
162162
"num_reduce_phases" : 46, <4>
163163
"_shards" : {
164-
"total" : 562, <5>
165-
"successful" : 188,
164+
"total" : 562,
165+
"successful" : 188, <5>
166166
"skipped" : 0,
167167
"failed" : 0
168168
},
@@ -222,6 +222,87 @@ override such value and extend the validity of the request. When this period
222222
expires, the search, if still running, is cancelled. If the search is
223223
completed, its saved results are deleted.
224224

225+
226+
[[get-async-search-status]]
227+
==== Get async search status
228+
The get async search status API, without retrieving search results, shows
229+
only the status of a previously submitted async search request given its `id`.
230+
If the {es} {security-features} are enabled, the access to the get async
231+
search status API is restricted to the
232+
<<built-in-roles, monitoring_user role>>.
233+
234+
[source,console,id=get-async-search-status-example]
235+
--------------------------------------------------
236+
GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=
237+
--------------------------------------------------
238+
// TEST[continued s/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=/\${body.id}/]
239+
240+
[source,console-result]
241+
--------------------------------------------------
242+
{
243+
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
244+
"is_running" : true,
245+
"is_partial" : true,
246+
"start_time_in_millis" : 1583945890986,
247+
"expiration_time_in_millis" : 1584377890986,
248+
"_shards" : {
249+
"total" : 562,
250+
"successful" : 188, <1>
251+
"skipped" : 0,
252+
"failed" : 0
253+
}
254+
}
255+
--------------------------------------------------
256+
// TEST[skip: a sample output of a status of a running async search]
257+
258+
<1> Indicates how many shards have executed the query so far.
259+
260+
For an async search that has been completed, the status response has
261+
an additional `completion_status` field that shows the status
262+
code of the completed async search.
263+
[source,console-result]
264+
--------------------------------------------------
265+
{
266+
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
267+
"is_running" : false,
268+
"is_partial" : false,
269+
"start_time_in_millis" : 1583945890986,
270+
"expiration_time_in_millis" : 1584377890986,
271+
"_shards" : {
272+
"total" : 562,
273+
"successful" : 562,
274+
"skipped" : 0,
275+
"failed" : 0
276+
},
277+
"completion_status" : 200 <1>
278+
}
279+
--------------------------------------------------
280+
// TEST[skip: a sample output of a status of a completed async search]
281+
282+
<1> Indicates that the async search was successfully completed
283+
284+
285+
[source,console-result]
286+
--------------------------------------------------
287+
{
288+
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
289+
"is_running" : false,
290+
"is_partial" : true,
291+
"start_time_in_millis" : 1583945890986,
292+
"expiration_time_in_millis" : 1584377890986,
293+
"_shards" : {
294+
"total" : 562,
295+
"successful" : 450,
296+
"skipped" : 0,
297+
"failed" : 112
298+
},
299+
"completion_status" : 503 <1>
300+
}
301+
--------------------------------------------------
302+
// TEST[skip: a sample output of a status of a completed async search]
303+
304+
<1> Indicates that the async search was completed with an error
305+
225306
[[delete-async-search]]
226307
==== Delete async search
227308

server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,23 @@ public int length() {
4545
return array.length();
4646
}
4747

48+
/**
49+
* Returns the size of the expected results, excluding potential null values.
50+
* @return the number of non-null elements
51+
*/
52+
public int nonNullLength() {
53+
if (nonNullList != null) {
54+
return nonNullList.size();
55+
}
56+
int count = 0;
57+
for (int i = 0; i < array.length(); i++) {
58+
if (array.get(i) != null) {
59+
count++;
60+
}
61+
}
62+
return count;
63+
}
64+
4865
/**
4966
* Sets the element at position {@code i} to the given value.
5067
*

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
2222
import org.elasticsearch.xpack.core.XPackPlugin;
2323
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
24+
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
2425
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
2526

2627
import java.util.ArrayList;
@@ -188,10 +189,19 @@ public void testRestartAfterCompletion() throws Exception {
188189
}
189190
ensureTaskCompletion(initial.getId());
190191
restartTaskNode(initial.getId(), indexName);
192+
191193
AsyncSearchResponse response = getAsyncSearch(initial.getId());
192194
assertNotNull(response.getSearchResponse());
193195
assertFalse(response.isRunning());
194196
assertFalse(response.isPartial());
197+
198+
AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
199+
assertFalse(statusResponse.isRunning());
200+
assertFalse(statusResponse.isPartial());
201+
assertEquals(numShards, statusResponse.getTotalShards());
202+
assertEquals(numShards, statusResponse.getSuccessfulShards());
203+
assertEquals(RestStatus.OK, statusResponse.getCompletionStatus());
204+
195205
deleteAsyncSearch(response.getId());
196206
ensureTaskRemoval(response.getId());
197207
}
@@ -232,6 +242,15 @@ public void testCleanupOnFailure() throws Exception {
232242
assertTrue(response.isPartial());
233243
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
234244
assertThat(response.getSearchResponse().getShardFailures().length, equalTo(numShards));
245+
246+
AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
247+
assertFalse(statusResponse.isRunning());
248+
assertTrue(statusResponse.isPartial());
249+
assertEquals(numShards, statusResponse.getTotalShards());
250+
assertEquals(0, statusResponse.getSuccessfulShards());
251+
assertEquals(numShards, statusResponse.getFailedShards());
252+
assertThat(statusResponse.getCompletionStatus().getStatus(), greaterThanOrEqualTo(400));
253+
235254
deleteAsyncSearch(initial.getId());
236255
ensureTaskRemoval(initial.getId());
237256
}
@@ -248,6 +267,10 @@ public void testInvalidId() throws Exception {
248267
}
249268
assertFalse(response.isRunning());
250269
}
270+
271+
ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncStatus("invalid"));
272+
assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
273+
assertThat(exc.getMessage(), containsString("invalid id"));
251274
}
252275

253276
public void testNoIndex() throws Exception {
@@ -289,6 +312,13 @@ public void testCancellation() throws Exception {
289312
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
290313
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
291314

315+
AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
316+
assertTrue(statusResponse.isRunning());
317+
assertEquals(numShards, statusResponse.getTotalShards());
318+
assertEquals(0, statusResponse.getSuccessfulShards());
319+
assertEquals(0, statusResponse.getSkippedShards());
320+
assertEquals(0, statusResponse.getFailedShards());
321+
292322
deleteAsyncSearch(response.getId());
293323
ensureTaskRemoval(response.getId());
294324
}
@@ -323,6 +353,17 @@ public void testUpdateRunningKeepAlive() throws Exception {
323353
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
324354
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
325355

356+
AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
357+
assertTrue(statusResponse.isRunning());
358+
assertTrue(statusResponse.isPartial());
359+
assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime));
360+
assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime()));
361+
assertEquals(numShards, statusResponse.getTotalShards());
362+
assertEquals(0, statusResponse.getSuccessfulShards());
363+
assertEquals(0, statusResponse.getFailedShards());
364+
assertEquals(0, statusResponse.getSkippedShards());
365+
assertEquals(null, statusResponse.getCompletionStatus());
366+
326367
response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
327368
assertThat(response.getExpirationTime(), lessThan(expirationTime));
328369
ensureTaskNotRunning(response.getId());

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,13 @@
3636
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
3737
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
3838
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
39+
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
3940
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
41+
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
4042
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
4143
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
4244
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
45+
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
4346
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
4447
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
4548
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
@@ -154,6 +157,10 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr
154157
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get();
155158
}
156159

160+
protected AsyncStatusResponse getAsyncStatus(String id) throws ExecutionException, InterruptedException {
161+
return client().execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).get();
162+
}
163+
157164
protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
158165
return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
159166
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.rest.RestController;
2020
import org.elasticsearch.rest.RestHandler;
2121
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
22+
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
2223
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
2324

2425
import java.util.Arrays;
@@ -34,7 +35,8 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
3435
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
3536
return Arrays.asList(
3637
new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class),
37-
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class)
38+
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class),
39+
new ActionHandler<>(GetAsyncStatusAction.INSTANCE, TransportGetAsyncStatusAction.class)
3840
);
3941
}
4042

@@ -46,6 +48,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
4648
return Arrays.asList(
4749
new RestSubmitAsyncSearchAction(),
4850
new RestGetAsyncSearchAction(),
51+
new RestGetAsyncStatusAction(),
4952
new RestDeleteAsyncSearchAction()
5053
);
5154
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
3232
import org.elasticsearch.xpack.core.async.AsyncTask;
3333
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
34+
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
3435

3536
import java.util.ArrayList;
3637
import java.util.HashMap;
@@ -347,6 +348,15 @@ private synchronized void checkCancellation() {
347348
}
348349
}
349350

351+
/**
352+
* Returns the status of {@link AsyncSearchTask}
353+
*/
354+
public AsyncStatusResponse getStatusResponse() {
355+
MutableSearchResponse mutableSearchResponse = searchResponse.get();
356+
assert mutableSearchResponse != null;
357+
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
358+
}
359+
350360
class Listener extends SearchProgressActionListener {
351361
@Override
352362
protected void onQueryResult(int shardIndex) {

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.apache.lucene.search.TotalHits;
99
import org.elasticsearch.ElasticsearchException;
10+
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.action.search.SearchResponse;
1112
import org.elasticsearch.action.search.SearchResponse.Clusters;
1213
import org.elasticsearch.action.search.ShardSearchFailure;
@@ -17,6 +18,7 @@
1718
import org.elasticsearch.search.aggregations.InternalAggregations;
1819
import org.elasticsearch.search.internal.InternalSearchResponse;
1920
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
21+
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
2022

2123
import java.util.ArrayList;
2224
import java.util.List;
@@ -32,6 +34,7 @@
3234
* run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built.
3335
*/
3436
class MutableSearchResponse {
37+
private static final TotalHits EMPTY_TOTAL_HITS = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
3538
private final int totalShards;
3639
private final int skippedShards;
3740
private final Clusters clusters;
@@ -77,7 +80,7 @@ class MutableSearchResponse {
7780
this.queryFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
7881
this.isPartial = true;
7982
this.threadContext = threadContext;
80-
this.totalHits = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
83+
this.totalHits = EMPTY_TOTAL_HITS;
8184
}
8285

8386
/**
@@ -184,6 +187,58 @@ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
184187
failure, isPartial, frozen == false, task.getStartTime(), expirationTime);
185188
}
186189

190+
191+
/**
192+
* Creates an {@link AsyncStatusResponse} -- status of an async response.
193+
* Response is created based on the current state of the mutable response or based on {@code finalResponse} if it is available.
194+
* @param asyncExecutionId – id of async search request
195+
* @param startTime – start time of task
196+
* @param expirationTime – expiration time of async search request
197+
* @return response representing the status of async search
198+
*/
199+
synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) {
200+
if (finalResponse != null) {
201+
return new AsyncStatusResponse(
202+
asyncExecutionId,
203+
false,
204+
false,
205+
startTime,
206+
expirationTime,
207+
finalResponse.getTotalShards(),
208+
finalResponse.getSuccessfulShards(),
209+
finalResponse.getSkippedShards(),
210+
finalResponse.getShardFailures() != null ? finalResponse.getShardFailures().length : 0,
211+
finalResponse.status()
212+
);
213+
}
214+
if (failure != null) {
215+
return new AsyncStatusResponse(
216+
asyncExecutionId,
217+
false,
218+
true,
219+
startTime,
220+
expirationTime,
221+
totalShards,
222+
successfulShards,
223+
skippedShards,
224+
queryFailures == null ? 0 : queryFailures.nonNullLength(),
225+
ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure))
226+
);
227+
}
228+
return new AsyncStatusResponse(
229+
asyncExecutionId,
230+
true,
231+
true,
232+
startTime,
233+
expirationTime,
234+
totalShards,
235+
successfulShards,
236+
skippedShards,
237+
queryFailures == null ? 0 : queryFailures.nonNullLength(),
238+
null // for a still running search, completion status is null
239+
);
240+
}
241+
187242
synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
188243
long expirationTime,
189244
ElasticsearchException reduceException) {

0 commit comments

Comments
 (0)