Skip to content

Commit 364c0fe

Browse files
authored
EQL: Make EQL search task cancellable (#54598)
First step towards async search execution. At the moment we don't try to cancel the underlying search requests, and just check if the task is canceled before performing network operation (such as field caps and search) Relates to #49638
1 parent f9ca4d9 commit 364c0fe

File tree

8 files changed

+213
-14
lines changed

8 files changed

+213
-14
lines changed

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.IndicesRequest;
1111
import org.elasticsearch.action.support.IndicesOptions;
1212
import org.elasticsearch.common.ParseField;
13+
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.io.stream.StreamInput;
1415
import org.elasticsearch.common.io.stream.StreamOutput;
1516
import org.elasticsearch.common.xcontent.ObjectParser;
@@ -19,9 +20,12 @@
1920
import org.elasticsearch.index.query.AbstractQueryBuilder;
2021
import org.elasticsearch.index.query.QueryBuilder;
2122
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
23+
import org.elasticsearch.tasks.Task;
24+
import org.elasticsearch.tasks.TaskId;
2225

2326
import java.io.IOException;
2427
import java.util.Arrays;
28+
import java.util.Map;
2529
import java.util.Objects;
2630
import java.util.function.Supplier;
2731

@@ -287,4 +291,16 @@ public EqlSearchRequest indicesOptions(IndicesOptions indicesOptions) {
287291
public IndicesOptions indicesOptions() {
288292
return indicesOptions;
289293
}
294+
295+
@Override
296+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
297+
return new EqlSearchTask(id, type, action, () -> {
298+
StringBuilder sb = new StringBuilder();
299+
sb.append("indices[");
300+
Strings.arrayToDelimitedString(indices, ",", sb);
301+
sb.append("], ");
302+
sb.append(query);
303+
return sb.toString();
304+
}, parentTaskId, headers);
305+
}
290306
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.eql.action;
8+
9+
import org.elasticsearch.tasks.CancellableTask;
10+
import org.elasticsearch.tasks.TaskId;
11+
12+
import java.util.Map;
13+
import java.util.function.Supplier;
14+
15+
public class EqlSearchTask extends CancellableTask {
16+
private final Supplier<String> descriptionSupplier;
17+
18+
public EqlSearchTask(long id, String type, String action, Supplier<String> descriptionSupplier, TaskId parentTaskId,
19+
Map<String, String> headers) {
20+
super(id, type, action, null, parentTaskId, headers);
21+
this.descriptionSupplier = descriptionSupplier;
22+
}
23+
24+
@Override
25+
public boolean shouldCancelChildrenOnCancellation() {
26+
return false;
27+
}
28+
29+
@Override
30+
public String getDescription() {
31+
return descriptionSupplier.get();
32+
}
33+
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Querier.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.index.query.QueryBuilder;
1818
import org.elasticsearch.search.aggregations.Aggregation;
1919
import org.elasticsearch.search.builder.SearchSourceBuilder;
20+
import org.elasticsearch.tasks.TaskCancelledException;
2021
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
2122
import org.elasticsearch.xpack.eql.session.Configuration;
2223
import org.elasticsearch.xpack.eql.session.EqlSession;
@@ -56,7 +57,9 @@ public void query(List<Attribute> output, QueryContainer container, String index
5657
if (log.isTraceEnabled()) {
5758
log.trace("About to execute query {} on {}", StringUtils.toString(sourceBuilder), index);
5859
}
59-
60+
if (cfg.isCancelled()) {
61+
throw new TaskCancelledException("cancelled");
62+
}
6063
SearchRequest search = prepareRequest(client, sourceBuilder, cfg.requestTimeout(), false,
6164
Strings.commaDelimitedListToStringArray(index));
6265

@@ -93,4 +96,4 @@ protected static void logSearchResponse(SearchResponse response, Logger logger)
9396
response.getHits().getTotalHits().value, aggs.size(), aggsNames, response.getFailedShards(), response.getSkippedShards(),
9497
response.getSuccessfulShards(), response.getTotalShards(), response.getTook(), response.isTimedOut());
9598
}
96-
}
99+
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.eql.action.EqlSearchAction;
2323
import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
2424
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
25+
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
2526
import org.elasticsearch.xpack.eql.execution.PlanExecutor;
2627
import org.elasticsearch.xpack.eql.parser.ParserParams;
2728
import org.elasticsearch.xpack.eql.session.Configuration;
@@ -49,10 +50,10 @@ public TransportEqlSearchAction(Settings settings, ClusterService clusterService
4950

5051
@Override
5152
protected void doExecute(Task task, EqlSearchRequest request, ActionListener<EqlSearchResponse> listener) {
52-
operation(planExecutor, request, username(securityContext), clusterName(clusterService), listener);
53+
operation(planExecutor, (EqlSearchTask) task, request, username(securityContext), clusterName(clusterService), listener);
5354
}
5455

55-
public static void operation(PlanExecutor planExecutor, EqlSearchRequest request, String username,
56+
public static void operation(PlanExecutor planExecutor, EqlSearchTask task, EqlSearchRequest request, String username,
5657
String clusterName, ActionListener<EqlSearchResponse> listener) {
5758
// TODO: these should be sent by the client
5859
ZoneId zoneId = DateUtils.of("Z");
@@ -67,7 +68,7 @@ public static void operation(PlanExecutor planExecutor, EqlSearchRequest request
6768
.implicitJoinKey(request.implicitJoinKeyField());
6869

6970
Configuration cfg = new Configuration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(),
70-
includeFrozen, clientId);
71+
includeFrozen, clientId, task);
7172
planExecutor.eql(cfg, request.query(), params, wrap(r -> listener.onResponse(createResponse(r)), listener::onFailure));
7273
}
7374

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Configuration.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,24 @@
99
import org.elasticsearch.common.Nullable;
1010
import org.elasticsearch.common.unit.TimeValue;
1111
import org.elasticsearch.index.query.QueryBuilder;
12+
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
1213

1314
import java.time.ZoneId;
1415

1516
public class Configuration extends org.elasticsearch.xpack.ql.session.Configuration {
16-
17+
1718
private final String[] indices;
1819
private final TimeValue requestTimeout;
1920
private final int size;
2021
private final String clientId;
2122
private final boolean includeFrozenIndices;
23+
private final EqlSearchTask task;
2224

2325
@Nullable
2426
private QueryBuilder filter;
2527

2628
public Configuration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter, TimeValue requestTimeout,
27-
int size, boolean includeFrozen, String clientId) {
29+
int size, boolean includeFrozen, String clientId, EqlSearchTask task) {
2830

2931
super(zi, username, clusterName);
3032

@@ -34,6 +36,7 @@ public Configuration(String[] indices, ZoneId zi, String username, String cluste
3436
this.size = size;
3537
this.clientId = clientId;
3638
this.includeFrozenIndices = includeFrozen;
39+
this.task = task;
3740
}
3841

3942
public String[] indices() {
@@ -59,4 +62,8 @@ public String clientId() {
5962
public boolean includeFrozen() {
6063
return includeFrozenIndices;
6164
}
62-
}
65+
66+
public boolean isCancelled() {
67+
return task.isCancelled();
68+
}
69+
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.client.Client;
1111
import org.elasticsearch.common.Strings;
12+
import org.elasticsearch.tasks.TaskCancelledException;
1213
import org.elasticsearch.xpack.eql.analysis.Analyzer;
1314
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
1415
import org.elasticsearch.xpack.eql.execution.PlanExecutor;
@@ -35,7 +36,7 @@ public class EqlSession {
3536

3637
public EqlSession(Client client, Configuration cfg, IndexResolver indexResolver, PreAnalyzer preAnalyzer, Analyzer analyzer,
3738
Optimizer optimizer, Planner planner, PlanExecutor planExecutor) {
38-
39+
3940
this.client = client;
4041
this.configuration = cfg;
4142
this.indexResolver = indexResolver;
@@ -60,7 +61,7 @@ public Configuration configuration() {
6061
public void eql(String eql, ParserParams params, ActionListener<Results> listener) {
6162
eqlExecutable(eql, params, wrap(e -> e.execute(this, listener), listener::onFailure));
6263
}
63-
64+
6465
public void eqlExecutable(String eql, ParserParams params, ActionListener<PhysicalPlan> listener) {
6566
try {
6667
physicalPlan(doParse(eql, params), listener);
@@ -88,7 +89,9 @@ public void analyzedPlan(LogicalPlan parsed, ActionListener<LogicalPlan> listene
8889

8990
private <T> void preAnalyze(LogicalPlan parsed, ActionListener<LogicalPlan> listener) {
9091
String indexWildcard = Strings.arrayToCommaDelimitedString(configuration.indices());
91-
92+
if(configuration.isCancelled()){
93+
throw new TaskCancelledException("cancelled");
94+
}
9295
indexResolver.resolveAsMergedMapping(indexWildcard, null, configuration.includeFrozen(), wrap(r -> {
9396
listener.onResponse(preAnalyzer.preAnalyze(parsed, r));
9497
}, listener::onFailure));
@@ -97,4 +100,4 @@ private <T> void preAnalyze(LogicalPlan parsed, ActionListener<LogicalPlan> list
97100
private LogicalPlan doParse(String eql, ParserParams params) {
98101
return new EqlParser().createStatement(eql, params);
99102
}
100-
}
103+
}

x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,16 @@
77
package org.elasticsearch.xpack.eql;
88

99
import org.elasticsearch.common.unit.TimeValue;
10+
import org.elasticsearch.xpack.eql.action.EqlSearchAction;
11+
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
1012
import org.elasticsearch.xpack.eql.session.Configuration;
1113

14+
import java.util.Collections;
15+
1216
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
1317
import static org.elasticsearch.test.ESTestCase.randomBoolean;
1418
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
19+
import static org.elasticsearch.test.ESTestCase.randomLong;
1520
import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong;
1621
import static org.elasticsearch.test.ESTestCase.randomZone;
1722

@@ -21,7 +26,8 @@ private EqlTestUtils() {
2126
}
2227

2328
public static final Configuration TEST_CFG = new Configuration(new String[]{"none"}, org.elasticsearch.xpack.ql.util.DateUtils.UTC,
24-
"nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, "");
29+
"nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, "",
30+
new EqlSearchTask(-1, "", EqlSearchAction.NAME, () -> "", null, Collections.emptyMap()));
2531

2632
public static Configuration randomConfiguration() {
2733
return new Configuration(new String[]{randomAlphaOfLength(16)},
@@ -32,6 +38,11 @@ public static Configuration randomConfiguration() {
3238
new TimeValue(randomNonNegativeLong()),
3339
randomIntBetween(5, 100),
3440
randomBoolean(),
35-
randomAlphaOfLength(16));
41+
randomAlphaOfLength(16),
42+
randomTask());
43+
}
44+
45+
public static EqlSearchTask randomTask() {
46+
return new EqlSearchTask(randomLong(), "transport", EqlSearchAction.NAME, () -> "", null, Collections.emptyMap());
3647
}
3748
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.eql.analysis;
7+
8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
10+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
11+
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
13+
import org.elasticsearch.tasks.TaskCancelledException;
14+
import org.elasticsearch.test.ESTestCase;
15+
import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
16+
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
17+
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
18+
import org.elasticsearch.xpack.eql.execution.PlanExecutor;
19+
import org.elasticsearch.xpack.eql.plugin.TransportEqlSearchAction;
20+
import org.elasticsearch.xpack.ql.index.IndexResolver;
21+
import org.elasticsearch.xpack.ql.type.DefaultDataTypeRegistry;
22+
import org.mockito.stubbing.Answer;
23+
24+
import java.util.Collections;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
30+
import static java.util.Collections.emptyMap;
31+
import static java.util.Collections.singletonMap;
32+
import static org.hamcrest.Matchers.instanceOf;
33+
import static org.mockito.Matchers.any;
34+
import static org.mockito.Mockito.doAnswer;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.times;
37+
import static org.mockito.Mockito.verify;
38+
import static org.mockito.Mockito.verifyNoMoreInteractions;
39+
import static org.mockito.Mockito.when;
40+
41+
public class CancellationTests extends ESTestCase {
42+
43+
public void testCancellationBeforeFieldCaps() throws InterruptedException {
44+
Client client = mock(Client.class);
45+
EqlSearchTask task = mock(EqlSearchTask.class);
46+
when(task.isCancelled()).thenReturn(true);
47+
48+
IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
49+
PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList()));
50+
CountDownLatch countDownLatch = new CountDownLatch(1);
51+
TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().query("foo where blah"), "", "",
52+
new ActionListener<>() {
53+
@Override
54+
public void onResponse(EqlSearchResponse eqlSearchResponse) {
55+
fail("Shouldn't be here");
56+
countDownLatch.countDown();
57+
}
58+
59+
@Override
60+
public void onFailure(Exception e) {
61+
assertThat(e, instanceOf(TaskCancelledException.class));
62+
countDownLatch.countDown();
63+
}
64+
});
65+
countDownLatch.await();
66+
verify(task, times(1)).isCancelled();
67+
verifyNoMoreInteractions(client, task);
68+
}
69+
70+
public void testCancellationBeforeSearch() throws InterruptedException {
71+
Client client = mock(Client.class);
72+
73+
AtomicBoolean cancelled = new AtomicBoolean(false);
74+
EqlSearchTask task = mock(EqlSearchTask.class);
75+
when(task.isCancelled()).then(invocationOnMock -> cancelled.get());
76+
77+
String[] indices = new String[]{"endgame"};
78+
79+
FieldCapabilities fooField =
80+
new FieldCapabilities("foo", "integer", true, true, indices, null, null, emptyMap());
81+
FieldCapabilities categoryField =
82+
new FieldCapabilities("event.category", "keyword", true, true, indices, null, null, emptyMap());
83+
FieldCapabilities timestampField =
84+
new FieldCapabilities("@timestamp", "date", true, true, indices, null, null, emptyMap());
85+
Map<String, Map<String, FieldCapabilities>> fields = new HashMap<>();
86+
fields.put(fooField.getName(), singletonMap(fooField.getName(), fooField));
87+
fields.put(categoryField.getName(), singletonMap(categoryField.getName(), categoryField));
88+
fields.put(timestampField.getName(), singletonMap(timestampField.getName(), timestampField));
89+
90+
FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
91+
when(fieldCapabilitiesResponse.getIndices()).thenReturn(indices);
92+
when(fieldCapabilitiesResponse.get()).thenReturn(fields);
93+
doAnswer((Answer<Void>) invocation -> {
94+
@SuppressWarnings("unchecked")
95+
ActionListener<FieldCapabilitiesResponse> listener = (ActionListener<FieldCapabilitiesResponse>) invocation.getArguments()[1];
96+
assertFalse(cancelled.getAndSet(true));
97+
listener.onResponse(fieldCapabilitiesResponse);
98+
return null;
99+
}).when(client).fieldCaps(any(), any());
100+
101+
102+
IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
103+
PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList()));
104+
CountDownLatch countDownLatch = new CountDownLatch(1);
105+
TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().indices("endgame")
106+
.query("process where foo==3"), "", "", new ActionListener<>() {
107+
@Override
108+
public void onResponse(EqlSearchResponse eqlSearchResponse) {
109+
fail("Shouldn't be here");
110+
countDownLatch.countDown();
111+
}
112+
113+
@Override
114+
public void onFailure(Exception e) {
115+
assertThat(e, instanceOf(TaskCancelledException.class));
116+
countDownLatch.countDown();
117+
}
118+
});
119+
countDownLatch.await();
120+
verify(client).fieldCaps(any(), any());
121+
verify(task, times(2)).isCancelled();
122+
verifyNoMoreInteractions(client, task);
123+
}
124+
125+
}

0 commit comments

Comments
 (0)