Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void testDataStreams() throws Exception {
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
try {
// Create auto follow pattern
createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster");
createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", null);

// Create data stream and ensure that is is auto followed
try (RestClient leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -322,6 +322,112 @@ public void testDataStreams() throws Exception {
}
}

public void testDataStreamsRenameFollowDataStream() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

final int numDocs = 64;
final String dataStreamName = "logs-mysql-error";
final String dataStreamNameFollower = "logs-mysql-error_copy";
final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT);

int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
try {
// Create auto follow pattern
createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", "{{leader_index}}_copy");

// Create data stream and ensure that is is auto followed
try (RestClient leaderClient = buildLeaderClient()) {
for (int i = 0; i < numDocs; i++) {
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1));
verifyDocuments(leaderClient, dataStreamName, numDocs);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamNameFollower, backingIndexName(dataStreamName, 1));
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamNameFollower, numDocs);
});

// First rollover and ensure second backing index is replicated:
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));

Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 1);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
verifyDataStream(
client(),
dataStreamNameFollower,
backingIndexName(dataStreamNameFollower, 1),
backingIndexName(dataStreamNameFollower, 2)
);
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamNameFollower, numDocs + 1);
});

// Second rollover and ensure third backing index is replicated:
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(
leaderClient,
dataStreamName,
backingIndexName(dataStreamName, 1),
backingIndexName(dataStreamName, 2),
backingIndexName(dataStreamName, 3)
);

Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 2);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3));
verifyDataStream(
client(),
dataStreamNameFollower,
backingIndexName(dataStreamNameFollower, 1),
backingIndexName(dataStreamNameFollower, 2),
backingIndexName(dataStreamNameFollower, 3)
);
ensureYellow(dataStreamNameFollower);
verifyDocuments(client(), dataStreamNameFollower, numDocs + 2);
});

} finally {
cleanUpFollower(
List.of(
backingIndexName(dataStreamNameFollower, 1),
backingIndexName(dataStreamNameFollower, 2),
backingIndexName(dataStreamNameFollower, 3)
),
List.of(dataStreamNameFollower),
List.of(autoFollowPatternName)
);
cleanUpLeader(
List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)),
List.of(dataStreamName),
List.of()
);
}
}

public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
Expand Down Expand Up @@ -355,7 +461,7 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception
}

// Create auto follow pattern
createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster");
createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster", null);

// Rollover and ensure only second backing index is replicated:
try (RestClient leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -412,7 +518,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception {
List<String> backingIndexNames = null;
try {
// Create auto follow pattern
createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster");
createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster", null);

// Create data stream and ensure that is is auto followed
try (var leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -533,7 +639,7 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception {
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
try {
// Create auto follow pattern
createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster");
createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster", null);

// Create leader index and write alias:
try (var leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -620,7 +726,7 @@ public void testDataStreamsBiDirectionalReplication() throws Exception {

try {
// Create auto follow pattern in follow cluster
createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster");
createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster", null);

// Create auto follow pattern in leader cluster:
try (var leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -660,7 +766,7 @@ public void testDataStreamsBiDirectionalReplication() throws Exception {
}
assertOK(leaderClient.performRequest(request));
// Then create the actual auto follow pattern:
createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster");
createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster", null);
}

var numDocs = 128;
Expand Down Expand Up @@ -834,7 +940,7 @@ public void testAutoFollowSearchableSnapshotsFails() throws Exception {
final String mountedIndex = testPrefix + "-mounted";

try {
createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster");
createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster", null);

// Create a regular index on leader
try (var leaderClient = buildLeaderClient()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public void testUnPromoteAndFollowDataStream() throws Exception {

// Setup
{
createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster");
createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster", null);
}
// Create data stream and ensure that is is auto followed
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,13 @@ protected static List<String> verifyDataStream(final RestClient client, final St
return List.copyOf(actualBackingIndices);
}

protected static void createAutoFollowPattern(RestClient client, String name, String pattern, String remoteCluster) throws IOException {
protected static void createAutoFollowPattern(
RestClient client,
String name,
String pattern,
String remoteCluster,
String followIndexPattern
) throws IOException {
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
Expand All @@ -341,6 +347,9 @@ protected static void createAutoFollowPattern(RestClient client, String name, St
bodyBuilder.value(pattern);
}
bodyBuilder.endArray();
if (followIndexPattern != null) {
bodyBuilder.field("follow_index_pattern", followIndexPattern);
}
bodyBuilder.field("remote_cluster", remoteCluster);
}
bodyBuilder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private void createFollowerIndex(
}

if (remoteDataStream != null) {
// when following a backing index then the names of the backing index must be remain the same in the local
// when following a backing index then the names of the backing index must remain the same in the local
// and remote cluster.
if (request.getLeaderIndex().equals(request.getFollowerIndex()) == false) {
listener.onFailure(
Expand Down