Skip to content

Commit 6d55248

Browse files
committed
Add the ability to the follow index to follow an index in a remote cluster.
The follow index api completely reuses CCS infrastructure that was exposed via: #29495 This means that the leader index parameter support the same ccs index to indicate that an index resides in a different cluster. I also added a qa module that smoke tests the cross cluster nature of ccr. The idea is that this test just verifies that ccr can read data from a remote leader index and that is it, no crazy randomization or indirectly testing other features.
1 parent e1e453c commit 6d55248

File tree

9 files changed

+377
-120
lines changed

9 files changed

+377
-120
lines changed

x-pack/plugin/ccr/build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ task internalClusterTest(type: RandomizedTestingTask,
3232
include '**/*IT.class'
3333
systemProperty 'es.set.netty.runtime.available.processors', 'false'
3434
}
35-
check.dependsOn internalClusterTest
35+
36+
check {
37+
dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest']
38+
}
39+
3640
internalClusterTest.mustRunAfter test
3741

3842
dependencies {

x-pack/plugin/ccr/qa/build.gradle

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
2+
/* Remove assemble on all qa projects because we don't need to publish
3+
* artifacts for them. */
4+
gradle.projectsEvaluated {
5+
subprojects {
6+
Task assemble = project.tasks.findByName('assemble')
7+
if (assemble) {
8+
project.tasks.remove(assemble)
9+
project.build.dependsOn.remove('assemble')
10+
}
11+
}
12+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import org.elasticsearch.gradle.test.RestIntegTestTask
2+
3+
apply plugin: 'elasticsearch.standalone-test'
4+
5+
dependencies {
6+
testCompile project(path: xpackModule('core'), configuration: 'runtime')
7+
testCompile project(path: xpackModule('ccr'), configuration: 'runtime')
8+
}
9+
10+
task leaderClusterTest(type: RestIntegTestTask) {
11+
mustRunAfter(precommit)
12+
}
13+
14+
leaderClusterTestCluster {
15+
numNodes = 1
16+
clusterName = 'leader-cluster'
17+
plugin xpackProject('plugin').path
18+
}
19+
20+
leaderClusterTestRunner {
21+
systemProperty 'tests.is_leader_cluster', 'true'
22+
}
23+
24+
task followClusterTest(type: RestIntegTestTask) {}
25+
26+
followClusterTestCluster {
27+
dependsOn leaderClusterTestRunner
28+
numNodes = 1
29+
clusterName = 'follow-cluster'
30+
plugin xpackProject('plugin').path
31+
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
32+
}
33+
34+
followClusterTestRunner {
35+
systemProperty 'tests.is_leader_cluster', 'false'
36+
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
37+
finalizedBy 'leaderClusterTestCluster#stop'
38+
}
39+
40+
test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr;
7+
8+
import org.apache.http.HttpHost;
9+
import org.apache.http.entity.ContentType;
10+
import org.apache.http.entity.StringEntity;
11+
import org.apache.http.util.EntityUtils;
12+
import org.elasticsearch.client.Response;
13+
import org.elasticsearch.client.RestClient;
14+
import org.elasticsearch.common.Booleans;
15+
import org.elasticsearch.common.Strings;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.xcontent.XContentBuilder;
18+
import org.elasticsearch.common.xcontent.XContentHelper;
19+
import org.elasticsearch.common.xcontent.json.JsonXContent;
20+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
23+
import java.io.IOException;
24+
import java.util.Collections;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
29+
import static java.util.Collections.emptyMap;
30+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
31+
import static org.hamcrest.Matchers.equalTo;
32+
33+
public class FollowIndexIT extends ESRestTestCase {
34+
35+
private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster"));
36+
37+
@Override
38+
protected boolean preserveClusterUponCompletion() {
39+
return true;
40+
}
41+
42+
public void testFollowIndex() throws Exception {
43+
final int numDocs = 128;
44+
final String leaderIndexName = "test_index1";
45+
if (runningAgainstLeaderCluster) {
46+
logger.info("Running against leader cluster");
47+
for (int i = 0; i < numDocs; i++) {
48+
logger.info("Indexing doc [{}]", i);
49+
index(client(), leaderIndexName, Integer.toString(i), "field", i);
50+
}
51+
refresh(leaderIndexName);
52+
verifyDocuments(leaderIndexName, numDocs);
53+
} else {
54+
logger.info("Running against follow cluster");
55+
final String followIndexName = "test_index2";
56+
Settings indexSettings = Settings.builder()
57+
.put("index.xpack.ccr.following_index", true)
58+
.build();
59+
// TODO: remove mapping here when ccr syncs mappings too
60+
createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}");
61+
ensureYellow(followIndexName);
62+
63+
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
64+
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
65+
66+
try (RestClient leaderClient = buildLeaderClient()) {
67+
int id = numDocs;
68+
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id);
69+
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1);
70+
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2);
71+
}
72+
73+
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
74+
}
75+
}
76+
77+
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
78+
XContentBuilder document = jsonBuilder().startObject();
79+
for (int i = 0; i < fields.length; i += 2) {
80+
document.field((String) fields[i], fields[i + 1]);
81+
}
82+
document.endObject();
83+
assertOK(client.performRequest("POST", "/" + index + "/doc/" + id, emptyMap(),
84+
new StringEntity(Strings.toString(document), ContentType.APPLICATION_JSON)));
85+
}
86+
87+
private static void refresh(String index) throws IOException {
88+
assertOK(client().performRequest("POST", "/" + index + "/_refresh"));
89+
}
90+
91+
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
92+
Map<String, String> params = Collections.singletonMap("leader_index", leaderIndex);
93+
assertOK(client().performRequest("POST", "/_xpack/ccr/" + followIndex + "/_follow", params));
94+
}
95+
96+
private static void verifyDocuments(String index, int expectedNumDocs) throws IOException {
97+
Map<String, String> params = new HashMap<>();
98+
params.put("size", Integer.toString(expectedNumDocs));
99+
params.put("sort", "field:asc");
100+
Map<String, ?> response = toMap(client().performRequest("GET", "/" + index + "/_search", params));
101+
102+
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
103+
assertThat(numDocs, equalTo(expectedNumDocs));
104+
105+
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
106+
assertThat(hits.size(), equalTo(expectedNumDocs));
107+
for (int i = 0; i < expectedNumDocs; i++) {
108+
int value = (int) XContentMapValues.extractValue("_source.field", (Map<?, ?>) hits.get(i));
109+
assertThat(i, equalTo(value));
110+
}
111+
}
112+
113+
private static Map<String, Object> toMap(Response response) throws IOException {
114+
return toMap(EntityUtils.toString(response.getEntity()));
115+
}
116+
117+
private static Map<String, Object> toMap(String response) {
118+
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
119+
}
120+
121+
private static void ensureYellow(String index) throws IOException {
122+
Map<String, String> params = new HashMap<>();
123+
params.put("wait_for_status", "yellow");
124+
params.put("wait_for_no_relocating_shards", "true");
125+
params.put("timeout", "30s");
126+
params.put("level", "shards");
127+
assertOK(client().performRequest("GET", "_cluster/health/" + index, params));
128+
}
129+
130+
private RestClient buildLeaderClient() throws IOException {
131+
assert runningAgainstLeaderCluster == false;
132+
String leaderUrl = System.getProperty("tests.leader_host");
133+
int portSeparator = leaderUrl.lastIndexOf(':');
134+
HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator),
135+
Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol());
136+
return buildClient(Settings.EMPTY, new HttpHost[]{httpHost});
137+
}
138+
139+
}

0 commit comments

Comments
 (0)