Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/88875.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 88875
summary: Fix renaming data streams with CCR replication
area: "Data streams"
type: bug
issues:
- 81751
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,14 @@ the new patterns.
more `leader_index_patterns` and one or more `leader_index_exclusion_patterns` won't be followed.

`follow_index_pattern`::
(Optional, string) The name of follower index. The template `{{leader_index}}`
can be used to derive the name of the follower index from the name of the
leader index. When following a data stream, use `{{leader_index}}`; {ccr-init}
does not support changes to the names of a follower data stream's backing
indices.
(Optional, string) The name of follower index. The template `{{leader_index}}` can be used to
derive the name of the follower index from the name of the leader index. When following a data
stream, the `follow_index_pattern` will be used for renaming not only the leader index, but also
the data stream containing the leader index. For example, a data stream called
`logs-mysql-default` with a backing index of `.ds-logs-mysql-default-2022-01-01-000001` and a
`follow_index_pattern` of `{{leader_index}}_copy` will replicate the data stream as
`logs-mysql-default_copy` and the backing index as
`.ds-logs-mysql-default_copy-2022-01-01-000001`.

include::../follow-request-body.asciidoc[]

Expand Down
20 changes: 20 additions & 0 deletions docs/reference/ccr/apis/follow/put-follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,26 @@ referenced leader index. When this API returns, the follower index exists, and
(Required, string) The <<remote-clusters,remote cluster>> containing
the leader index.

[[ccr-put-follow-request-body-data_stream_name]]`data_stream_name`::
(Optional, string) If the leader index is part of a <<data-streams,data stream>>, the name to
which the local data stream for the followed index should be renamed. For example, A request like:

[source,console]
--------------------------------------------------
PUT /.ds-logs-mysql-default_copy-2022-01-01-000001/_ccr/follow
{
"remote_cluster" : "remote_cluster",
"leader_index" : ".ds-logs-mysql-default-2022-01-01-000001",
"data_stream_name": "logs-mysql-default_copy"
}
--------------------------------------------------
// TEST[skip:no setup]

Replicates the leader index `.ds-logs-mysql-default-2022-01-01-000001` into the follower index
`.ds-logs-mysql-default_copy-2022-01-01-000001` and will do so using the data stream
`logs-mysql-default_copy`, as opposed to the original leader data stream name of
`logs-mysql-default`.

include::../follow-request-body.asciidoc[]

[[ccr-put-follow-examples]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void testDataStreams() throws Exception {
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
try {
// Create auto follow pattern
createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster");
createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", null);

// Create data stream and ensure that is is auto followed
try (RestClient leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -320,6 +320,121 @@ public void testDataStreams() throws Exception {
}
}

public void testDataStreamsRenameFollowDataStream() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

final int numDocs = 64;
final String dataStreamName = "logs-mysql-error";
final String dataStreamNameFollower = "logs-mysql-error_copy";
final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT);

int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
try {
// Create auto follow pattern
createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", "{{leader_index}}_copy");

// Create data stream and ensure that is is auto followed
try (RestClient leaderClient = buildLeaderClient()) {
for (int i = 0; i < numDocs; i++) {
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1));
verifyDocuments(leaderClient, dataStreamName, numDocs);
}
logger.info(
"--> checking {} with index {} has been auto followed to {} with backing index {}",
dataStreamName,
backingIndexName(dataStreamName, 1),
dataStreamNameFollower,
backingIndexName(dataStreamNameFollower, 1)
);
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamNameFollower, backingIndexName(dataStreamNameFollower, 1));
ensureYellow(dataStreamNameFollower);
verifyDocuments(client(), dataStreamNameFollower, numDocs);
});

// First rollover and ensure second backing index is replicated:
logger.info("--> rolling over");
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));

Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 1);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
verifyDataStream(
client(),
dataStreamNameFollower,
backingIndexName(dataStreamNameFollower, 1),
backingIndexName(dataStreamNameFollower, 2)
);
ensureYellow(dataStreamNameFollower);
verifyDocuments(client(), dataStreamNameFollower, numDocs + 1);
});

// Second rollover and ensure third backing index is replicated:
logger.info("--> rolling over");
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(
leaderClient,
dataStreamName,
backingIndexName(dataStreamName, 1),
backingIndexName(dataStreamName, 2),
backingIndexName(dataStreamName, 3)
);

Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 2);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3));
verifyDataStream(
client(),
dataStreamNameFollower,
backingIndexName(dataStreamNameFollower, 1),
backingIndexName(dataStreamNameFollower, 2),
backingIndexName(dataStreamNameFollower, 3)
);
ensureYellow(dataStreamNameFollower);
verifyDocuments(client(), dataStreamNameFollower, numDocs + 2);
});

} finally {
cleanUpFollower(
List.of(
backingIndexName(dataStreamNameFollower, 1),
backingIndexName(dataStreamNameFollower, 2),
backingIndexName(dataStreamNameFollower, 3)
),
List.of(dataStreamNameFollower),
List.of(autoFollowPatternName)
);
cleanUpLeader(
List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)),
List.of(dataStreamName),
List.of()
);
}
}

public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
Expand Down Expand Up @@ -353,7 +468,7 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception
}

// Create auto follow pattern
createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster");
createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster", null);

// Rollover and ensure only second backing index is replicated:
try (RestClient leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -410,7 +525,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception {
List<String> backingIndexNames = null;
try {
// Create auto follow pattern
createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster");
createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster", null);

// Create data stream and ensure that is is auto followed
try (var leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -531,7 +646,7 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception {
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
try {
// Create auto follow pattern
createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster");
createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster", null);

// Create leader index and write alias:
try (var leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -618,7 +733,7 @@ public void testDataStreamsBiDirectionalReplication() throws Exception {

try {
// Create auto follow pattern in follow cluster
createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster");
createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster", null);

// Create auto follow pattern in leader cluster:
try (var leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -658,7 +773,7 @@ public void testDataStreamsBiDirectionalReplication() throws Exception {
}
assertOK(leaderClient.performRequest(request));
// Then create the actual auto follow pattern:
createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster");
createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster", null);
}

var numDocs = 128;
Expand Down Expand Up @@ -832,7 +947,7 @@ public void testAutoFollowSearchableSnapshotsFails() throws Exception {
final String mountedIndex = testPrefix + "-mounted";

try {
createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster");
createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster", null);

// Create a regular index on leader
try (var leaderClient = buildLeaderClient()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,26 +180,6 @@ public void testFollowDataStreamFails() throws Exception {
assertThat(failure.getMessage(), containsString("cannot follow [logs-syslog-prod], because it is a DATA_STREAM"));
}

public void testChangeBackingIndexNameFails() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

final String dataStreamName = "logs-foobar-prod";
try (RestClient leaderClient = buildLeaderClient()) {
Request request = new Request("PUT", "/_data_stream/" + dataStreamName);
assertOK(leaderClient.performRequest(request));
verifyDataStream(leaderClient, dataStreamName, DataStream.getDefaultBackingIndexName("logs-foobar-prod", 1));
}

ResponseException failure = expectThrows(
ResponseException.class,
() -> followIndex(DataStream.getDefaultBackingIndexName("logs-foobar-prod", 1), ".ds-logs-barbaz-prod-000001")
);
assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(failure.getMessage(), containsString("a backing index name in the local and remote cluster must remain the same"));
}

public void testFollowSearchableSnapshotsFails() throws Exception {
final String testPrefix = getTestName().toLowerCase(Locale.ROOT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void testUnPromoteAndFollowDataStream() throws Exception {

// Setup
{
createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster");
createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster", null);
}
// Create data stream and ensure that it is auto followed
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,13 @@ protected static List<String> verifyDataStream(final RestClient client, final St
return List.copyOf(actualBackingIndices);
}

protected static void createAutoFollowPattern(RestClient client, String name, String pattern, String remoteCluster) throws IOException {
protected static void createAutoFollowPattern(
RestClient client,
String name,
String pattern,
String remoteCluster,
String followIndexPattern
) throws IOException {
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
Expand All @@ -345,6 +351,9 @@ protected static void createAutoFollowPattern(RestClient client, String name, St
bodyBuilder.value(pattern);
}
bodyBuilder.endArray();
if (followIndexPattern != null) {
bodyBuilder.field("follow_index_pattern", followIndexPattern);
}
bodyBuilder.field("remote_cluster", remoteCluster);
}
bodyBuilder.endObject();
Expand Down
Loading