Skip to content

Commit 398ebef

Browse files
authored
EQL: Add the ability for EQL to perform CCSes (#74399)
This introduces the ability for EQL to perform searches on a remote cluster, leveraging ES's CCS capabilities. The remote cluster needs to be on the same version as the local cluster.
1 parent c943110 commit 398ebef

File tree

36 files changed

+605
-91
lines changed

36 files changed

+605
-91
lines changed

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/BaseEqlSpecTestCase.java

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.client.RequestOptions;
1515
import org.elasticsearch.client.ResponseException;
1616
import org.elasticsearch.client.RestClient;
17-
import org.elasticsearch.client.RestClientBuilder;
1817
import org.elasticsearch.client.RestHighLevelClient;
1918
import org.elasticsearch.client.eql.EqlSearchRequest;
2019
import org.elasticsearch.client.eql.EqlSearchResponse;
@@ -24,21 +23,18 @@
2423
import org.elasticsearch.common.Strings;
2524
import org.elasticsearch.common.logging.LoggerMessageFormat;
2625
import org.elasticsearch.common.settings.Settings;
27-
import org.elasticsearch.core.TimeValue;
28-
import org.elasticsearch.test.rest.ESRestTestCase;
2926
import org.junit.AfterClass;
3027
import org.junit.Before;
3128

3229
import java.io.IOException;
3330
import java.util.ArrayList;
3431
import java.util.Arrays;
35-
import java.util.Collections;
3632
import java.util.List;
3733
import java.util.StringJoiner;
3834

3935
import static java.util.stream.Collectors.toList;
4036

41-
public abstract class BaseEqlSpecTestCase extends ESRestTestCase {
37+
public abstract class BaseEqlSpecTestCase extends RemoteClusterAwareEqlRestTestCase {
4238

4339
protected static final String PARAM_FORMATTING = "%2$s";
4440

@@ -51,15 +47,16 @@ public abstract class BaseEqlSpecTestCase extends ESRestTestCase {
5147

5248
@Before
5349
public void setup() throws Exception {
54-
if (client().performRequest(new Request("HEAD", "/" + index)).getStatusLine().getStatusCode() == 404) {
55-
DataLoader.loadDatasetIntoEs(highLevelClient(), this::createParser);
50+
RestClient provisioningClient = provisioningClient();
51+
if (provisioningClient.performRequest(new Request("HEAD", "/" + unqualifiedIndexName())).getStatusLine().getStatusCode() == 404) {
52+
DataLoader.loadDatasetIntoEs(highLevelClient(provisioningClient), this::createParser);
5653
}
5754
}
5855

5956
@AfterClass
6057
public static void wipeTestData() throws IOException {
6158
try {
62-
adminClient().performRequest(new Request("DELETE", "/*"));
59+
provisioningAdminClient().performRequest(new Request("DELETE", "/*"));
6360
} catch (ResponseException e) {
6461
// 404 here just means we had no indexes
6562
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
@@ -144,12 +141,7 @@ protected EqlClient eqlClient() {
144141

145142
private RestHighLevelClient highLevelClient() {
146143
if (highLevelClient == null) {
147-
highLevelClient = new RestHighLevelClient(
148-
client(),
149-
ignore -> {
150-
},
151-
Collections.emptyList()) {
152-
};
144+
highLevelClient = highLevelClient(client());
153145
}
154146
return highLevelClient;
155147
}
@@ -204,17 +196,7 @@ protected boolean preserveClusterUponCompletion() {
204196

205197
@Override
206198
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
207-
RestClientBuilder builder = RestClient.builder(hosts);
208-
configureClient(builder, settings);
209-
210-
int timeout = Math.toIntExact(timeout().millis());
211-
builder.setRequestConfigCallback(
212-
requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(timeout)
213-
.setConnectionRequestTimeout(timeout)
214-
.setSocketTimeout(timeout)
215-
);
216-
builder.setStrictDeprecationMode(true);
217-
return builder.build();
199+
return clientBuilder(settings, hosts);
218200
}
219201

220202
protected String timestamp() {
@@ -240,7 +222,9 @@ protected String requestResultPosition() {
240222
return randomBoolean() ? "head" : "tail";
241223
}
242224

243-
protected TimeValue timeout() {
244-
return TimeValue.timeValueSeconds(10);
225+
// strip any qualification from the received index string
226+
private String unqualifiedIndexName() {
227+
int offset = index.indexOf(':');
228+
return offset >= 0 ? index.substring(offset + 1) : index;
245229
}
246230
}

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlDateNanosSpecTestCase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,14 @@ public static List<Object[]> readTestSpecs() throws Exception {
2121
return asArray(EqlSpecLoader.load("/test_queries_date_nanos.toml", new HashSet<>()));
2222
}
2323

24+
// constructor for "local" rest tests
2425
public EqlDateNanosSpecTestCase(String query, String name, long[] eventIds) {
25-
super(DATE_NANOS_INDEX, query, name, eventIds);
26+
this(DATE_NANOS_INDEX, query, name, eventIds);
27+
}
28+
29+
// constructor for multi-cluster tests
30+
public EqlDateNanosSpecTestCase(String index, String query, String name, long[] eventIds) {
31+
super(index, query, name, eventIds);
2632
}
2733

2834
@Override

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlExtraSpecTestCase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,14 @@ public static List<Object[]> readTestSpecs() throws Exception {
2121
return asArray(EqlSpecLoader.load("/test_extra.toml", new HashSet<>()));
2222
}
2323

24+
// constructor for "local" rest tests
2425
public EqlExtraSpecTestCase(String query, String name, long[] eventIds) {
25-
super(TEST_EXTRA_INDEX, query, name, eventIds);
26+
this(TEST_EXTRA_INDEX, query, name, eventIds);
27+
}
28+
29+
// constructor for multi-cluster tests
30+
public EqlExtraSpecTestCase(String index, String query, String name, long[] eventIds) {
31+
super(index, query, name, eventIds);
2632
}
2733

2834
@Override

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlRestTestCase.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@
1010
import org.elasticsearch.client.Request;
1111
import org.elasticsearch.client.Response;
1212
import org.elasticsearch.client.ResponseException;
13-
import org.elasticsearch.common.settings.Settings;
1413
import org.elasticsearch.common.xcontent.XContentHelper;
1514
import org.elasticsearch.common.xcontent.json.JsonXContent;
16-
import org.elasticsearch.test.rest.ESRestTestCase;
1715
import org.junit.After;
1816

1917
import java.io.IOException;
@@ -27,7 +25,7 @@
2725
import static org.hamcrest.Matchers.is;
2826
import static org.hamcrest.Matchers.not;
2927

30-
public abstract class EqlRestTestCase extends ESRestTestCase {
28+
public abstract class EqlRestTestCase extends RemoteClusterAwareEqlRestTestCase {
3129

3230
private static final String defaultValidationIndexName = "eql_search_validation_test";
3331
private static final String validQuery = "process where user = \\\"SYSTEM\\\"";
@@ -49,11 +47,11 @@ public void checkSearchContent() throws Exception {
4947
};
5048

5149
public void testBadRequests() throws Exception {
52-
createIndex(defaultValidationIndexName, Settings.EMPTY);
50+
createIndex(defaultValidationIndexName, (String) null);
5351

5452
final String contentType = "application/json";
5553
for (String[] test : testBadRequests) {
56-
final String endpoint = "/" + defaultValidationIndexName + "/_eql/search";
54+
final String endpoint = "/" + indexPattern(defaultValidationIndexName) + "/_eql/search";
5755
Request request = new Request("GET", endpoint);
5856
request.setJsonEntity(test[0]);
5957

@@ -70,8 +68,8 @@ public void testBadRequests() throws Exception {
7068

7169
@SuppressWarnings("unchecked")
7270
public void testIndexWildcardPatterns() throws Exception {
73-
createIndex("test1", Settings.EMPTY, null, "\"my_alias\" : {}, \"test_alias\" : {}");
74-
createIndex("test2", Settings.EMPTY, null, "\"my_alias\" : {}");
71+
createIndex("test1", "\"my_alias\" : {}, \"test_alias\" : {}");
72+
createIndex("test2", "\"my_alias\" : {}");
7573

7674
StringBuilder bulk = new StringBuilder();
7775
bulk.append("{\"index\": {\"_index\": \"test1\", \"_id\": 1}}\n");
@@ -86,7 +84,7 @@ public void testIndexWildcardPatterns() throws Exception {
8684
};
8785

8886
for (String indexPattern : wildcardRequests) {
89-
String endpoint = "/" + indexPattern + "/_eql/search";
87+
String endpoint = "/" + indexPattern(indexPattern) + "/_eql/search";
9088
Request request = new Request("GET", endpoint);
9189
request.setJsonEntity("{\"query\":\"process where true\"}");
9290
Response response = client().performRequest(request);
@@ -108,7 +106,7 @@ public void testIndexWildcardPatterns() throws Exception {
108106

109107
@SuppressWarnings("unchecked")
110108
public void testUnicodeChars() throws Exception {
111-
createIndex("test", Settings.EMPTY, null, null);
109+
createIndex("test", (String) null);
112110

113111
StringBuilder bulk = new StringBuilder();
114112
bulk.append("{\"index\": {\"_index\": \"test\", \"_id\": 1}}\n");
@@ -117,7 +115,7 @@ public void testUnicodeChars() throws Exception {
117115
bulk.append("{\"event\":{\"category\":\"process\"},\"@timestamp\":\"2020-09-05T12:34:57Z\",\"log\" : \"prefix_𖠋_suffix\"}\n");
118116
bulkIndex(bulk.toString());
119117

120-
String endpoint = "/test/_eql/search";
118+
String endpoint = "/" + indexPattern("test") + "/_eql/search";
121119
Request request = new Request("GET", endpoint);
122120
request.setJsonEntity("{\"query\":\"process where log==\\\"prefix_\\\\u{0eb}_suffix\\\"\"}");
123121
Response response = client().performRequest(request);
@@ -150,9 +148,13 @@ private void bulkIndex(String bulk) throws IOException {
150148
bulkRequest.setJsonEntity(bulk);
151149
bulkRequest.addParameter("refresh", "true");
152150

153-
Response response = client().performRequest(bulkRequest);
151+
Response response = provisioningClient().performRequest(bulkRequest);
154152
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
155153
String bulkResponse = EntityUtils.toString(response.getEntity());
156154
assertThat(bulkResponse, not(containsString("\"errors\": true")));
157155
}
156+
157+
protected String indexPattern(String pattern) {
158+
return pattern;
159+
}
158160
}

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlRestValidationTestCase.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
import org.elasticsearch.client.Response;
1212
import org.elasticsearch.client.ResponseException;
1313
import org.elasticsearch.common.Strings;
14-
import org.elasticsearch.common.settings.Settings;
1514
import org.elasticsearch.common.xcontent.XContentBuilder;
1615
import org.elasticsearch.common.xcontent.json.JsonXContent;
17-
import org.elasticsearch.test.rest.ESRestTestCase;
1816
import org.junit.Before;
1917

2018
import java.io.IOException;
@@ -24,7 +22,7 @@
2422
import static org.hamcrest.Matchers.containsString;
2523
import static org.hamcrest.Matchers.equalTo;
2624

27-
public abstract class EqlRestValidationTestCase extends ESRestTestCase {
25+
public abstract class EqlRestValidationTestCase extends RemoteClusterAwareEqlRestTestCase {
2826

2927
private static final String indexName = "test_eql";
3028
protected static final String[] existentIndexWithWildcard = new String[] {indexName + ",inexistent*", indexName + "*,inexistent*",
@@ -35,8 +33,8 @@ public abstract class EqlRestValidationTestCase extends ESRestTestCase {
3533

3634
@Before
3735
public void prepareIndices() throws IOException {
38-
if (client().performRequest(new Request("HEAD", "/" + indexName)).getStatusLine().getStatusCode() == 404) {
39-
createIndex(indexName, Settings.EMPTY);
36+
if (provisioningClient().performRequest(new Request("HEAD", "/" + indexName)).getStatusLine().getStatusCode() == 404) {
37+
createIndex(indexName, (String) null);
4038
}
4139

4240
Object[] fieldsAndValues = new Object[] {"event_type", "my_event", "@timestamp", "2020-10-08T12:35:48Z", "val", 0};
@@ -47,9 +45,9 @@ public void prepareIndices() throws IOException {
4745
document.endObject();
4846
final Request request = new Request("POST", "/" + indexName + "/_doc/" + 0);
4947
request.setJsonEntity(Strings.toString(document));
50-
assertOK(client().performRequest(request));
48+
assertOK(provisioningClient().performRequest(request));
5149

52-
assertOK(adminClient().performRequest(new Request("POST", "/" + indexName + "/_refresh")));
50+
assertOK(provisioningAdminClient().performRequest(new Request("POST", "/" + indexName + "/_refresh")));
5351
}
5452

5553
protected abstract String getInexistentIndexErrorMessage();
@@ -82,7 +80,7 @@ public void testAllowNoIndicesOption() throws IOException {
8280

8381
protected void assertErrorMessages(String[] indices, String reqParameter, String errorMessage) throws IOException {
8482
for (String indexName : indices) {
85-
assertErrorMessage(indexName, reqParameter, errorMessage + "[" + indexName + "]");
83+
assertErrorMessage(indexName, reqParameter, errorMessage + "[" + indexPattern(indexName) + "]");
8684
}
8785
}
8886

@@ -95,7 +93,7 @@ protected void assertErrorMessage(String indexName, String reqParameter, String
9593
}
9694

9795
private Request createRequest(String indexName, String reqParameter) throws IOException {
98-
final Request request = new Request("POST", "/" + indexName + "/_eql/search" + reqParameter);
96+
final Request request = new Request("POST", "/" + indexPattern(indexName) + "/_eql/search" + reqParameter);
9997
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder()
10098
.startObject()
10199
.field("event_category_field", "event_type")
@@ -112,4 +110,8 @@ private void assertValidRequestOnIndices(String[] indices, String reqParameter)
112110
assertOK(response);
113111
}
114112
}
113+
114+
protected String indexPattern(String index) {
115+
return index;
116+
}
115117
}

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlSpecTestCase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,13 @@ protected String tiebreaker() {
3434
return "serial_event_id";
3535
}
3636

37+
// constructor for "local" rest tests
3738
public EqlSpecTestCase(String query, String name, long[] eventIds) {
38-
super(TEST_INDEX, query, name, eventIds);
39+
this(TEST_INDEX, query, name, eventIds);
40+
}
41+
42+
// constructor for multi-cluster tests
43+
public EqlSpecTestCase(String index, String query, String name, long[] eventIds) {
44+
super(index, query, name, eventIds);
3945
}
4046
}

0 commit comments

Comments
 (0)