Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions x-pack/qa/rolling-upgrade-multi-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ for (Version version : bwcVersions.wireCompatible) {

Task leaderClusterTestRunner = tasks.getByName("${taskPrefix}#leader#clusterTestRunner")
leaderClusterTestRunner.configure {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
systemProperty 'tests.rest.upgrade_state', 'none'
systemProperty 'tests.rest.cluster_name', 'leader'

Expand Down Expand Up @@ -71,6 +72,7 @@ for (Version version : bwcVersions.wireCompatible) {

Task followerClusterTestRunner = tasks.getByName("${taskPrefix}#follower#clusterTestRunner")
followerClusterTestRunner.configure {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
systemProperty 'tests.rest.upgrade_state', 'none'
systemProperty 'tests.rest.cluster_name', 'follower'

Expand Down Expand Up @@ -115,6 +117,7 @@ for (Version version : bwcVersions.wireCompatible) {

Task followerOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#oneThirdUpgradedTestRunner")
followerOneThirdUpgradedTestRunner.configure {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
systemProperty 'tests.rest.upgrade_state', 'one_third'
systemProperty 'tests.rest.cluster_name', 'follower'

Expand All @@ -135,6 +138,7 @@ for (Version version : bwcVersions.wireCompatible) {

Task followerTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#twoThirdsUpgradedTestRunner")
followerTwoThirdsUpgradedTestRunner.configure {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
systemProperty 'tests.rest.upgrade_state', 'two_third'
systemProperty 'tests.rest.cluster_name', 'follower'

Expand All @@ -155,6 +159,7 @@ for (Version version : bwcVersions.wireCompatible) {

Task followerUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#follower#upgradedClusterTestRunner")
followerUpgradedClusterTestRunner.configure {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
systemProperty 'tests.rest.upgrade_state', 'all'
systemProperty 'tests.rest.cluster_name', 'follower'

Expand All @@ -181,6 +186,7 @@ for (Version version : bwcVersions.wireCompatible) {

Task leaderOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#oneThirdUpgradedTestRunner")
leaderOneThirdUpgradedTestRunner.configure {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
systemProperty 'tests.rest.upgrade_state', 'one_third'
systemProperty 'tests.rest.cluster_name', 'leader'

Expand All @@ -201,6 +207,7 @@ for (Version version : bwcVersions.wireCompatible) {

Task leaderTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#twoThirdsUpgradedTestRunner")
leaderTwoThirdsUpgradedTestRunner.configure {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
systemProperty 'tests.rest.upgrade_state', 'two_third'
systemProperty 'tests.rest.cluster_name', 'leader'

Expand All @@ -221,6 +228,7 @@ for (Version version : bwcVersions.wireCompatible) {

Task leaderUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#leader#upgradedClusterTestRunner")
leaderUpgradedClusterTestRunner.configure {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
systemProperty 'tests.rest.upgrade_state', 'all'
systemProperty 'tests.rest.cluster_name', 'leader'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
Expand Down Expand Up @@ -71,6 +72,9 @@ public static ClusterName parse(String value) {

protected final ClusterName clusterName = ClusterName.parse(System.getProperty("tests.rest.cluster_name"));

protected static final Version UPGRADE_FROM_VERSION =
Version.fromString(System.getProperty("tests.upgrade_from_version"));

private static RestClient leaderClient;
private static RestClient followerClient;
private static boolean initialized = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.support.XContentMapValues;

import java.io.IOException;
Expand Down Expand Up @@ -88,6 +89,128 @@ public void testIndexFollowing() throws Exception {
}
}

public void testAutoFollowing() throws Exception {
final Settings indexSettings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.put("index.number_of_shards", 1)
.build();

String leaderIndex1 = "logs-20200101";
String leaderIndex2 = "logs-20200102";
String leaderIndex3 = "logs-20200103";

if (clusterName == ClusterName.LEADER) {
switch (upgradeState) {
case NONE:
case ONE_THIRD:
case TWO_THIRD:
break;
case ALL:
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertTotalHitCount(followerIndex, 320, followerClient());
});
index(leaderClient(), leaderIndex2, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertTotalHitCount(followerIndex, 256, followerClient());
});
index(leaderClient(), leaderIndex3, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex3;
assertTotalHitCount(followerIndex, 192, followerClient());
});

deleteAutoFollowPattern(followerClient(), "test_pattern");
stopIndexFollowing(followerClient(), "copy-" + leaderIndex1);
stopIndexFollowing(followerClient(), "copy-" + leaderIndex2);
stopIndexFollowing(followerClient(), "copy-" + leaderIndex3);
break;
default:
throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]");
}
} else if (clusterName == ClusterName.FOLLOWER) {
switch (upgradeState) {
case NONE:
putAutoFollowPattern(followerClient(), "test_pattern", "leader", "logs-*");
createIndex(leaderIndex1, indexSettings);
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(1));
assertTotalHitCount(followerIndex, 64, followerClient());
});
break;
case ONE_THIRD:
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertTotalHitCount(followerIndex, 128, followerClient());
});
// Auto follow stats are kept in-memory on master elected node
// and if this node get updated then auto follow stats are reset
{
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
createIndex(leaderIndex2, indexSettings);
index(leaderClient(), leaderIndex2, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
assertTotalHitCount(followerIndex, 64, followerClient());
});
}
break;
case TWO_THIRD:
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertTotalHitCount(followerIndex, 192, followerClient());
});
index(leaderClient(), leaderIndex2, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertTotalHitCount(followerIndex, 128, followerClient());
});

// Auto follow stats are kept in-memory on master elected node
// and if this node get updated then auto follow stats are reset
{
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
createIndex(leaderIndex3, indexSettings);
index(leaderClient(), leaderIndex3, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex3;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
assertTotalHitCount(followerIndex, 64, followerClient());
});
}
break;
case ALL:
index(leaderClient(), leaderIndex1, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertTotalHitCount(followerIndex, 256, followerClient());
});
index(leaderClient(), leaderIndex2, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertTotalHitCount(followerIndex, 192, followerClient());
});
index(leaderClient(), leaderIndex3, 64);
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex3;
assertTotalHitCount(followerIndex, 128, followerClient());
});
break;
default:
throw new UnsupportedOperationException("unexpected upgrade state [" + upgradeState + "]");
}
} else {
throw new AssertionError("unexpected cluster_name [" + clusterName + "]");
}
}

public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
assumeTrue("Tests only runs with upgrade_state [all]", upgradeState == UpgradeState.ALL);

Expand Down Expand Up @@ -134,6 +257,29 @@ private static void followIndex(RestClient client, String leaderCluster, String
assertOK(client.performRequest(request));
}

private static void putAutoFollowPattern(RestClient client, String name, String remoteCluster, String pattern) throws IOException {
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
request.setJsonEntity("{\"leader_index_patterns\": [\"" + pattern + "\"], \"remote_cluster\": \"" + remoteCluster + "\"," +
"\"follow_index_pattern\": \"copy-{{leader_index}}\", \"read_poll_timeout\": \"10ms\"}");
assertOK(client.performRequest(request));
}

private static void deleteAutoFollowPattern(RestClient client, String patternName) throws IOException {
Request request = new Request("DELETE", "/_ccr/auto_follow/" + patternName);
assertOK(client.performRequest(request));
}

private int getNumberOfSuccessfulFollowedIndices() throws IOException {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Integer actualSuccessfulFollowedIndices = ObjectPath.eval("auto_follow_stats.number_of_successful_follow_indices", response);
if (actualSuccessfulFollowedIndices != null) {
return actualSuccessfulFollowedIndices;
} else {
return -1;
}
}

private static void index(RestClient client, String index, int numDocs) throws IOException {
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/");
Expand Down Expand Up @@ -162,4 +308,10 @@ private static void verifyTotalHitCount(final String index,
assertThat(totalHits, equalTo(expectedTotalHits));
}

private static void stopIndexFollowing(RestClient client, String followerIndex) throws IOException {
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/pause_follow")));
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_close")));
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/unfollow")));
}

}
Loading