Skip to content

Commit 6521d2a

Browse files
Introduce eql search status API (#68065)
Introduce eql search status API, that reports the status of eql stored or async search. GET _eql/search/status/<id> The API is restricted to the monitoring_user role. For a running eql search, a response has the following format: { "id" : <id>, "is_running" : true, "is_partial" : true, "start_time_in_millis" : 1611690235000, "expiration_time_in_millis" : 1611690295000 } For a completed eql search, a response has the following format: { "id" : <id>, "is_running" : false, "is_partial" : false, "expiration_time_in_millis" : 1611690295000, "completion_status" : 200 } Closes #66955
1 parent 61257c6 commit 6521d2a

File tree

20 files changed

+897
-77
lines changed

20 files changed

+897
-77
lines changed

docs/reference/eql/eql-search-api.asciidoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,9 @@ This search ID is only provided if one of the following conditions is met:
338338
parameter is `true`.
339339

340340
You can use this ID with the <<get-async-eql-search-api,get async EQL search
341-
API>> to get the current status and available results for the search.
341+
API>> to get the current status and available results for the search or
342+
<<get-async-eql-status-api,get async EQL status API>> to get only
343+
the current status.
342344
--
343345

344346
`is_partial`::

docs/reference/eql/eql.asciidoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,28 @@ complete.
567567
// TESTRESPONSE[s/"took": 2000/"took": $body.took/]
568568
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
569569

570+
Another more lightweight way to check the progress of an async search is to use
571+
the <<get-async-eql-status-api,get async EQL status API>> with the search ID.
572+
573+
[source,console]
574+
----
575+
GET /_eql/search/status/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=
576+
----
577+
// TEST[skip: no access to search ID]
578+
579+
[source,console-result]
580+
----
581+
{
582+
"id": "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=",
583+
"is_running": false,
584+
"is_partial": false,
585+
"expiration_time_in_millis" : 1611690295000,
586+
"completion_status": 200
587+
}
588+
----
589+
// TESTRESPONSE[s/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=/$body.id/]
590+
// TESTRESPONSE[s/"expiration_time_in_millis": 1611690295000/"expiration_time_in_millis": $body.expiration_time_in_millis/]
591+
570592
[discrete]
571593
[[eql-search-store-async-eql-search]]
572594
=== Change the search retention period
@@ -660,6 +682,9 @@ GET /_eql/search/FjlmbndxNmJjU0RPdExBTGg0elNOOEEaQk9xSjJBQzBRMldZa1VVQ2pPa01YUTo
660682
Saved synchronous searches are still subject to the `keep_alive` parameter's
661683
retention period. When this period ends, the search and its results are deleted.
662684

685+
You can also check only the status of the saved synchronous search without
686+
results by using <<get-async-eql-status-api,get async EQL status API>>.
687+
663688
You can also manually delete saved synchronous searches using the
664689
<<delete-async-eql-search-api,delete async EQL search API>>.
665690

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
[role="xpack"]
2+
[testenv="basic"]
3+
4+
[[get-async-eql-status-api]]
5+
=== Get async EQL status API
6+
++++
7+
<titleabbrev>Get async EQL search status</titleabbrev>
8+
++++
9+
Returns the current status for an <<eql-search-async,async EQL search>> or
10+
a <<eql-search-store-sync-eql-search,stored synchronous EQL search>>
11+
without returning results. This is a more lightweight API than
12+
<<get-async-eql-search-api,get async EQL search API>> as it doesn't return
13+
search results, and reports only the status.
14+
15+
If the {es} {security-features} are enabled, the access to the get async
16+
eql status API is restricted to the <<built-in-roles, monitoring_user role>>.
17+
18+
[source,console]
19+
----
20+
GET /_eql/search/status/FkpMRkJGS1gzVDRlM3g4ZzMyRGlLbkEaTXlJZHdNT09TU2VTZVBoNDM3cFZMUToxMDM=
21+
----
22+
// TEST[skip: no access to search ID]
23+
24+
[[get-async-eql-status-api-request]]
25+
==== {api-request-title}
26+
27+
`GET /_eql/search/status/<search_id>`
28+
29+
30+
[[get-async-eql-status-api-path-params]]
31+
==== {api-path-parms-title}
32+
33+
`<search_id>`::
34+
(Required, string)
35+
Identifier for the search.
36+
+
37+
A search ID is provided in the <<eql-search-api,EQL search API>>'s response for
38+
an <<eql-search-async,async search>>. A search ID is also provided if the
39+
request's <<eql-search-api-keep-on-completion,`keep_on_completion`>> parameter
40+
is `true`.
41+
42+
[role="child_attributes"]
43+
[[get-async-eql-status-api-response-body]]
44+
==== {api-response-body-title}
45+
46+
`id`::
47+
(string)
48+
Identifier for the search.
49+
50+
`is_running`::
51+
(boolean)
52+
If `true`, the search request is still executing.
53+
If `false`, the search is completed.
54+
55+
`is_partial`::
56+
(boolean)
57+
If `true`, the response does not contain complete search results.
58+
This could be because either the search is still running
59+
(`is_running` status is `false`), or because it is already completed
60+
(`is_running` status is `true`) and results are partial due to
61+
failures or timeouts.
62+
63+
`start_time_in_millis`::
64+
(Long)
65+
For a running search shows a timestamp when the eql search
66+
started, in milliseconds since the Unix epoch.
67+
68+
`expiration_time_in_millis`::
69+
(long)
70+
Shows a timestamp when the eql search will be expired, in milliseconds
71+
since the Unix epoch. When this time is reached, the search and its results
72+
are deleted, even if the search is still ongoing.
73+
74+
`completion_status`::
75+
(Integer)
76+
For a completed search shows the http status code of the completed
77+
search.
78+
79+
80+
[[eql-status-api-example]]
81+
==== {api-examples-title}
82+
83+
[source,console]
84+
----
85+
GET /_eql/search/status/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=?keep_alive=5d
86+
----
87+
// TEST[skip: no access to search ID]
88+
89+
If the search is still running, the status response has the following form:
90+
91+
[source,console-result]
92+
--------------------------------------------------
93+
{
94+
"id" : "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=",
95+
"is_running" : true,
96+
"is_partial" : true,
97+
"start_time_in_millis" : 1611690235000,
98+
"expiration_time_in_millis" : 1611690295000
99+
100+
}
101+
--------------------------------------------------
102+
// TEST[skip: no access to search ID]
103+
104+
If the search is completed the status response doesn't have
105+
`start_time_in_millis`, but has an additional `completion_status`
106+
field that shows the status code of the completed eql search:
107+
108+
[source,console-result]
109+
--------------------------------------------------
110+
{
111+
"id" : "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=",
112+
"is_running" : false,
113+
"is_partial" : false,
114+
"expiration_time_in_millis" : 1611690295000,
115+
"completion_status" : 200 <1>
116+
}
117+
--------------------------------------------------
118+
// TEST[skip: no access to search ID]
119+
120+
<1> Indicates that the eql search was successfully completed

docs/reference/search.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ For an overview of EQL and related tutorials, see <<eql>>.
4545

4646
* <<eql-search-api>>
4747
* <<get-async-eql-search-api>>
48+
* <<get-async-eql-status-api>>
4849
* <<delete-async-eql-search-api>>
4950

5051

@@ -70,6 +71,8 @@ include::eql/eql-search-api.asciidoc[]
7071

7172
include::eql/get-async-eql-search-api.asciidoc[]
7273

74+
include::eql/get-async-eql-status-api.asciidoc[]
75+
7376
include::eql/delete-async-eql-search-api.asciidoc[]
7477

7578
include::search/count.asciidoc[]

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -350,12 +350,16 @@ private synchronized void checkCancellation() {
350350
}
351351

352352
/**
353-
* Returns the status of {@link AsyncSearchTask}
353+
* Returns the status from {@link AsyncSearchTask}
354354
*/
355-
public AsyncStatusResponse getStatusResponse() {
356-
MutableSearchResponse mutableSearchResponse = searchResponse.get();
355+
public static AsyncStatusResponse getStatusResponse(AsyncSearchTask asyncTask) {
356+
MutableSearchResponse mutableSearchResponse = asyncTask.searchResponse.get();
357357
assert mutableSearchResponse != null;
358-
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
358+
return mutableSearchResponse.toStatusResponse(
359+
asyncTask.searchId.getEncoded(),
360+
asyncTask.getStartTime(),
361+
asyncTask.expirationTimeMillis
362+
);
359363
}
360364

361365
class Listener extends SearchProgressActionListener {

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java

Lines changed: 10 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
package org.elasticsearch.xpack.search;
88

9-
import org.elasticsearch.ResourceNotFoundException;
109
import org.elasticsearch.action.ActionListener;
1110
import org.elasticsearch.action.ActionListenerResponseHandler;
1211
import org.elasticsearch.action.support.ActionFilters;
@@ -21,7 +20,6 @@
2120
import org.elasticsearch.transport.TransportService;
2221
import org.elasticsearch.xpack.core.XPackPlugin;
2322
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
24-
import org.elasticsearch.xpack.core.async.AsyncTask;
2523
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
2624
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
2725
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
@@ -55,56 +53,19 @@ public TransportGetAsyncStatusAction(TransportService transportService,
5553
protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
5654
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
5755
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
58-
if (node == null || Objects.equals(node, clusterService.localNode())) {
59-
retrieveStatus(request, listener);
56+
DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode();
57+
if (node == null || Objects.equals(node, localNode)) {
58+
store.retrieveStatus(
59+
request,
60+
taskManager,
61+
AsyncSearchTask.class,
62+
AsyncSearchTask::getStatusResponse,
63+
AsyncStatusResponse::getStatusFromStoredSearch,
64+
listener
65+
);
6066
} else {
6167
transportService.sendRequest(node, GetAsyncStatusAction.NAME, request,
6268
new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, ThreadPool.Names.SAME));
6369
}
6470
}
65-
66-
private void retrieveStatus(GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
67-
long nowInMillis = System.currentTimeMillis();
68-
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
69-
try {
70-
AsyncTask task = (AsyncTask) taskManager.getTask(searchId.getTaskId().getId());
71-
if ((task instanceof AsyncSearchTask) && (task.getExecutionId().equals(searchId))) {
72-
AsyncStatusResponse response = ((AsyncSearchTask) task).getStatusResponse();
73-
sendFinalResponse(request, response, nowInMillis, listener);
74-
} else {
75-
getStatusResponseFromIndex(searchId, request, nowInMillis, listener);
76-
}
77-
} catch (Exception exc) {
78-
listener.onFailure(exc);
79-
}
80-
}
81-
82-
/**
83-
* Get a status response from index
84-
*/
85-
private void getStatusResponseFromIndex(AsyncExecutionId searchId,
86-
GetAsyncStatusRequest request, long nowInMillis, ActionListener<AsyncStatusResponse> listener) {
87-
store.getStatusResponse(searchId, AsyncStatusResponse::getStatusFromAsyncSearchResponseWithExpirationTime,
88-
new ActionListener<>() {
89-
@Override
90-
public void onResponse(AsyncStatusResponse asyncStatusResponse) {
91-
sendFinalResponse(request, asyncStatusResponse, nowInMillis, listener);
92-
}
93-
94-
@Override
95-
public void onFailure(Exception e) {
96-
listener.onFailure(e);
97-
}
98-
}
99-
);
100-
}
101-
102-
private static void sendFinalResponse(GetAsyncStatusRequest request,
103-
AsyncStatusResponse response, long nowInMillis, ActionListener<AsyncStatusResponse> listener) {
104-
if (response.getExpirationTime() < nowInMillis) { // check if the result has expired
105-
listener.onFailure(new ResourceNotFoundException(request.getId()));
106-
} else {
107-
listener.onResponse(response);
108-
}
109-
}
11071
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.search;
9+
10+
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.common.xcontent.ToXContent;
13+
import org.elasticsearch.common.xcontent.XContentBuilder;
14+
import org.elasticsearch.common.xcontent.XContentType;
15+
import org.elasticsearch.rest.RestStatus;
16+
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
17+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
18+
19+
import java.io.IOException;
20+
import java.util.Date;
21+
import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId;
22+
23+
public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<AsyncStatusResponse> {
24+
25+
@Override
26+
protected AsyncStatusResponse createTestInstance() {
27+
String id = randomSearchId();
28+
boolean isRunning = randomBoolean();
29+
boolean isPartial = isRunning ? randomBoolean() : false;
30+
long startTimeMillis = (new Date(randomLongBetween(0, 3000000000000L))).getTime();
31+
long expirationTimeMillis = startTimeMillis + 3600000L;
32+
int totalShards = randomIntBetween(10, 150);
33+
int successfulShards = randomIntBetween(0, totalShards - 5);
34+
int skippedShards = randomIntBetween(0, 5);
35+
int failedShards = totalShards - successfulShards - skippedShards;
36+
RestStatus completionStatus = isRunning ? null : randomBoolean() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE;
37+
return new AsyncStatusResponse(
38+
id,
39+
isRunning,
40+
isPartial,
41+
startTimeMillis,
42+
expirationTimeMillis,
43+
totalShards,
44+
successfulShards,
45+
skippedShards,
46+
failedShards,
47+
completionStatus
48+
);
49+
}
50+
51+
@Override
52+
protected Writeable.Reader<AsyncStatusResponse> instanceReader() {
53+
return AsyncStatusResponse::new;
54+
}
55+
56+
@Override
57+
protected AsyncStatusResponse mutateInstance(AsyncStatusResponse instance) {
58+
// return a response with the opposite running status
59+
boolean isRunning = instance.isRunning() == false;
60+
boolean isPartial = isRunning ? randomBoolean() : false;
61+
RestStatus completionStatus = isRunning ? null : randomBoolean() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE;
62+
return new AsyncStatusResponse(
63+
instance.getId(),
64+
isRunning,
65+
isPartial,
66+
instance.getStartTime(),
67+
instance.getExpirationTime(),
68+
instance.getTotalShards(),
69+
instance.getSuccessfulShards(),
70+
instance.getSkippedShards(),
71+
instance.getFailedShards(),
72+
completionStatus
73+
);
74+
}
75+
76+
public void testToXContent() throws IOException {
77+
AsyncStatusResponse response = createTestInstance();
78+
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
79+
String expectedJson = "{\n" +
80+
" \"id\" : \"" + response.getId() + "\",\n" +
81+
" \"is_running\" : " + response.isRunning() + ",\n" +
82+
" \"is_partial\" : " + response.isPartial() + ",\n" +
83+
" \"start_time_in_millis\" : " + response.getStartTime() + ",\n" +
84+
" \"expiration_time_in_millis\" : " + response.getExpirationTime() + ",\n" +
85+
" \"_shards\" : {\n" +
86+
" \"total\" : " + response.getTotalShards() + ",\n" +
87+
" \"successful\" : " + response.getSuccessfulShards() + ",\n" +
88+
" \"skipped\" : " + response.getSkippedShards() + ",\n" +
89+
" \"failed\" : " + response.getFailedShards() + "\n";
90+
if (response.getCompletionStatus() == null) {
91+
expectedJson = expectedJson +
92+
" }\n" +
93+
"}";
94+
} else {
95+
expectedJson = expectedJson +
96+
" },\n" +
97+
" \"completion_status\" : " + response.getCompletionStatus().getStatus() + "\n" +
98+
"}";
99+
}
100+
builder.prettyPrint();
101+
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
102+
assertEquals(expectedJson, Strings.toString(builder));
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)