From 801decd8d148f352e1e0029268daad50d37fb6d2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 14 Mar 2022 10:19:24 +0100 Subject: [PATCH 1/7] Add failing test for data stream renaming through CCR Relates to #81751 --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 120 +++++++++++++++++- .../xpack/ccr/FollowIndexSecurityIT.java | 2 +- .../xpack/ccr/ESCCRRestTestCase.java | 11 +- .../ccr/action/TransportPutFollowAction.java | 2 +- 4 files changed, 125 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 35fac474d86f3..28a975b3cc23e 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -236,7 +236,7 @@ public void testDataStreams() throws Exception { int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); try { // Create auto follow pattern - createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster"); + createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", null); // Create data stream and ensure that is is auto followed try (RestClient leaderClient = buildLeaderClient()) { @@ -320,6 +320,112 @@ public void testDataStreams() throws Exception { } } + public void testDataStreamsRenameFollowDataStream() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int numDocs = 64; + final String dataStreamName = "logs-mysql-error"; + final String dataStreamNameFollower = "logs-mysql-error_copy"; + final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT); + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + try { + // Create auto follow pattern + createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", "{{leader_index}}_copy"); + + // Create data stream and ensure that is is auto followed + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); + verifyDocuments(leaderClient, dataStreamName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamNameFollower, backingIndexName(dataStreamName, 1)); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamNameFollower, numDocs); + }); + + // First rollover and ensure second backing index is replicated: + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyDataStream( + client(), + dataStreamNameFollower, + backingIndexName(dataStreamNameFollower, 1), + backingIndexName(dataStreamNameFollower, 2) + ); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamNameFollower, numDocs + 1); + }); + + // Second rollover and ensure third backing index is replicated: + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream( + leaderClient, + dataStreamName, + backingIndexName(dataStreamName, 1), + backingIndexName(dataStreamName, 2), + backingIndexName(dataStreamName, 3) + ); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 2); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3)); + verifyDataStream( + client(), + dataStreamNameFollower, + backingIndexName(dataStreamNameFollower, 1), + backingIndexName(dataStreamNameFollower, 2), + backingIndexName(dataStreamNameFollower, 3) + ); + ensureYellow(dataStreamNameFollower); + verifyDocuments(client(), dataStreamNameFollower, numDocs + 2); + }); + + } finally { + cleanUpFollower( + List.of( + backingIndexName(dataStreamNameFollower, 1), + backingIndexName(dataStreamNameFollower, 2), + backingIndexName(dataStreamNameFollower, 3) + ), + List.of(dataStreamNameFollower), + List.of(autoFollowPatternName) + ); + cleanUpLeader( + List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)), + List.of(dataStreamName), + List.of() + ); + } + } + public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception { if ("follow".equals(targetCluster) == false) { return; @@ -353,7 +459,7 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception } // Create auto follow pattern - createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster"); + createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster", null); // Rollover and ensure only second backing index is replicated: try (RestClient leaderClient = buildLeaderClient()) { @@ -410,7 +516,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception { List backingIndexNames = null; try { // Create auto follow pattern - createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster"); + createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster", null); // Create data stream and ensure that is is auto followed try (var leaderClient = buildLeaderClient()) { @@ -531,7 +637,7 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception { int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); try { // Create auto follow pattern - createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster"); + createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster", null); // Create leader index and write alias: try (var leaderClient = buildLeaderClient()) { @@ -618,7 +724,7 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { try { // Create auto follow pattern in follow cluster - createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster"); + createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster", null); // Create auto follow pattern in leader cluster: try (var leaderClient = buildLeaderClient()) { @@ -658,7 +764,7 @@ public void testDataStreamsBiDirectionalReplication() throws Exception { } assertOK(leaderClient.performRequest(request)); // Then create the actual auto follow pattern: - createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster"); + createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster", null); } var numDocs = 128; @@ -832,7 +938,7 @@ public void testAutoFollowSearchableSnapshotsFails() throws Exception { final String mountedIndex = testPrefix + "-mounted"; try { - createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster"); + createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster", null); // Create a regular index on leader try (var leaderClient = buildLeaderClient()) { diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 24eb234716c4e..c2210af7e0a13 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -281,7 +281,7 @@ public void testUnPromoteAndFollowDataStream() throws Exception { // Setup { - createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster"); + createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster", null); } // Create data stream and ensure that it is auto followed { diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index f7df63db15f97..b95d9f60c62d9 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -335,7 +335,13 @@ protected static List verifyDataStream(final RestClient client, final St return List.copyOf(actualBackingIndices); } - protected static void createAutoFollowPattern(RestClient client, String name, String pattern, String remoteCluster) throws IOException { + protected static void createAutoFollowPattern( + RestClient client, + String name, + String pattern, + String remoteCluster, + String followIndexPattern + ) throws IOException { Request request = new Request("PUT", "/_ccr/auto_follow/" + name); try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { bodyBuilder.startObject(); @@ -345,6 +351,9 @@ protected static void createAutoFollowPattern(RestClient client, String name, St bodyBuilder.value(pattern); } bodyBuilder.endArray(); + if (followIndexPattern != null) { + bodyBuilder.field("follow_index_pattern", followIndexPattern); + } bodyBuilder.field("remote_cluster", remoteCluster); } bodyBuilder.endObject(); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 88301c49c2101..0b834b56e63ea 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -170,7 +170,7 @@ private void createFollowerIndex( } if (remoteDataStream != null) { - // when following a backing index then the names of the backing index must be remain the same in the local + // when following a backing index then the names of the backing index must remain the same in the local // and remote cluster. if (request.getLeaderIndex().equals(request.getFollowerIndex()) == false) { listener.onFailure( From e1d83caa0e48ae53f22019c86e6f7a43f1871796 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 26 Jul 2022 15:12:19 -0600 Subject: [PATCH 2/7] Fix renaming data streams with CCR replication This commit fixes the situation where a user wants to use CCR to replicate indices that are part of a data stream while renaming the data stream. For example, assume a user has an auto-follow request that looks like this: ``` PUT /_ccr/auto_follow/my-auto-follow-pattern { "remote_cluster" : "other-cluster", "leader_index_patterns" : ["logs-*"], "follow_index_pattern" : "{{leader_index}}_copy" } ``` And then the data stream `logs-mysql-error` was created, creating the backing index `.ds-logs-mysql-error-2022-07-29-000001`. Prior to this commit, replicating this data stream means that the backing index would be renamed to `.ds-logs-mysql-error-2022-07-29-000001_copy` and the data stream would *not* be renamed. This caused a check to trip in `TransportPutLifecycleAction` asserting that a backing index was not renamed for a data stream during following. After this commit, there are a couple of changes: First, the data stream will also be renamed. This means that the `logs-mysql-error` becomes `logs-mysql-error_copy` when created on the follower cluster. Because of the way that CCR works, this means we need to support renaming a data stream for a regular "create follower" request, so a new parameter has been added: `data_stream_name`. It works like this: ``` PUT /mynewindex/_ccr/follow { "remote_cluster": "other-cluster", "leader_index": "myotherindex", "data_stream_name": "new_ds" } ``` Second, the backing index for a data stream must be renamed in a way that does not break the parsing of a data stream backing pattern, whereas previously the index `.ds-logs-mysql-error-2022-07-29-000001` would be renamed to `.ds-logs-mysql-error-2022-07-29-000001_copy` (an illegal name since it doesn't end with the rollover digit), after this commit it will be renamed to `.ds-logs-mysql-error_copy-2022-07-29-000001` to match the renamed data stream. This means that for the given `follow_index_pattern` of `{{leader_index}}_copy` the index changes look like: | Leader Cluster | Follower Cluster | |--------------|-----------| | `logs-mysql-error` (data stream) | `logs-mysql-error_copy` (data stream) | | `.ds-logs-mysql-error-2022-07-29-000001` | `.ds-logs-mysql-error_copy-2022-07-29-000001` | Which internally means the auto-follow request turned into the create follower request of: ``` PUT /.ds-logs-mysql-error_copy-2022-07-29-000001/_ccr/follow { "remote_cluster": "other-cluster", "leader_index": ".ds-logs-mysql-error-2022-07-29-000001", "data_stream_name": "logs-mysql-error_copy" } ``` Relates to #84940 (cherry-picked the commit for a test) Relates to #61993 (where data stream support was first introduced for CCR) Resolves #81751 --- .../put-auto-follow-pattern.asciidoc | 13 +- .../ccr/apis/follow/put-follow.asciidoc | 20 ++ .../elasticsearch/xpack/ccr/AutoFollowIT.java | 15 +- .../xpack/ccr/FollowIndexIT.java | 20 -- .../ccr/action/AutoFollowCoordinator.java | 128 +++++++- .../ccr/action/TransportPutFollowAction.java | 57 ++-- .../action/AutoFollowCoordinatorTests.java | 275 ++++++++++++++++++ .../ccr/action/FollowParametersTests.java | 5 + .../action/PutFollowActionRequestTests.java | 37 +++ .../action/TransportPutFollowActionTests.java | 28 +- .../core/ccr/action/PutFollowAction.java | 37 ++- 11 files changed, 573 insertions(+), 62 deletions(-) diff --git a/docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc b/docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc index ed377e72fce49..3876cab007d90 100644 --- a/docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc +++ b/docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc @@ -85,11 +85,14 @@ the new patterns. more `leader_index_patterns` and one or more `leader_index_exclusion_patterns` won't be followed. `follow_index_pattern`:: - (Optional, string) The name of follower index. The template `{{leader_index}}` - can be used to derive the name of the follower index from the name of the - leader index. When following a data stream, use `{{leader_index}}`; {ccr-init} - does not support changes to the names of a follower data stream's backing - indices. + (Optional, string) The name of follower index. The template `{{leader_index}}` can be used to + derive the name of the follower index from the name of the leader index. When following a data + stream, the `follow_index_pattern` will be used for renaming not only the leader index, but also + the data stream containing the leader index. For example, a data stream called + `logs-mysql-default` with a backing index of `.ds-logs-mysql-default-2022-01-01-000001` and a + `follow_index_pattern` of `{{leader_index}}_copy` will replicate the data stream as + `logs-mysql-default_copy` and the backing index as + `.ds-logs-mysql-default_copy-2022-01-01-000001`. include::../follow-request-body.asciidoc[] diff --git a/docs/reference/ccr/apis/follow/put-follow.asciidoc b/docs/reference/ccr/apis/follow/put-follow.asciidoc index d09eb51534042..93e8a710751a8 100644 --- a/docs/reference/ccr/apis/follow/put-follow.asciidoc +++ b/docs/reference/ccr/apis/follow/put-follow.asciidoc @@ -76,6 +76,26 @@ referenced leader index. When this API returns, the follower index exists, and (Required, string) The <> containing the leader index. +[[ccr-put-follow-request-body-data_stream_name]]`data_stream_name`:: + (Optional, string) If the leader index is part of a <>, the name to + which the local data stream for the followed index should be renamed. For example, A request like: + +[source,console] +-------------------------------------------------- +PUT /.ds-logs-mysql-default_copy-2022-01-01-000001/_ccr/follow +{ + "remote_cluster" : "remote_cluster", + "leader_index" : ".ds-logs-mysql-default-2022-01-01-000001", + "data_stream_name": "logs-mysql-default_copy" +} +-------------------------------------------------- +// TEST[skip:no setup] + +Replicates the leader index `.ds-logs-mysql-default-2022-01-01-000001` into the follower index +`.ds-logs-mysql-default_copy-2022-01-01-000001` and will do so using the data stream +`logs-mysql-default_copy`, as opposed to the original leader data stream name of +`logs-mysql-default`. + include::../follow-request-body.asciidoc[] [[ccr-put-follow-examples]] diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 28a975b3cc23e..ffdd40a1bd844 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -346,14 +346,22 @@ public void testDataStreamsRenameFollowDataStream() throws Exception { verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); verifyDocuments(leaderClient, dataStreamName, numDocs); } + logger.info( + "--> checking {} with index {} has been auto followed to {} with backing index {}", + dataStreamName, + backingIndexName(dataStreamName, 1), + dataStreamNameFollower, + backingIndexName(dataStreamNameFollower, 1) + ); assertBusy(() -> { assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamNameFollower, backingIndexName(dataStreamName, 1)); - ensureYellow(dataStreamName); + verifyDataStream(client(), dataStreamNameFollower, backingIndexName(dataStreamNameFollower, 1)); + ensureYellow(dataStreamNameFollower); verifyDocuments(client(), dataStreamNameFollower, numDocs); }); // First rollover and ensure second backing index is replicated: + logger.info("--> rolling over"); try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); @@ -373,11 +381,12 @@ public void testDataStreamsRenameFollowDataStream() throws Exception { backingIndexName(dataStreamNameFollower, 1), backingIndexName(dataStreamNameFollower, 2) ); - ensureYellow(dataStreamName); + ensureYellow(dataStreamNameFollower); verifyDocuments(client(), dataStreamNameFollower, numDocs + 1); }); // Second rollover and ensure third backing index is replicated: + logger.info("--> rolling over"); try (RestClient leaderClient = buildLeaderClient()) { Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); assertOK(leaderClient.performRequest(rolloverRequest)); diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 38132b53ed300..db8562bac62ef 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -180,26 +180,6 @@ public void testFollowDataStreamFails() throws Exception { assertThat(failure.getMessage(), containsString("cannot follow [logs-syslog-prod], because it is a DATA_STREAM")); } - public void testChangeBackingIndexNameFails() throws Exception { - if ("follow".equals(targetCluster) == false) { - return; - } - - final String dataStreamName = "logs-foobar-prod"; - try (RestClient leaderClient = buildLeaderClient()) { - Request request = new Request("PUT", "/_data_stream/" + dataStreamName); - assertOK(leaderClient.performRequest(request)); - verifyDataStream(leaderClient, dataStreamName, DataStream.getDefaultBackingIndexName("logs-foobar-prod", 1)); - } - - ResponseException failure = expectThrows( - ResponseException.class, - () -> followIndex(DataStream.getDefaultBackingIndexName("logs-foobar-prod", 1), ".ds-logs-barbaz-prod-000001") - ); - assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400)); - assertThat(failure.getMessage(), containsString("a backing index name in the local and remote cluster must remain the same")); - } - public void testFollowSearchableSnapshotsFails() throws Exception { final String testPrefix = getTestName().toLowerCase(Locale.ROOT); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index a53ea9dc69039..f09bce85f89b7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -61,6 +62,8 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; @@ -72,9 +75,24 @@ */ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements ClusterStateListener { + /** + * This is the string that will be replaced by the leader index name for a backing index or data + * stream. It allows auto-following to automatically rename an index or data stream when + * automatically followed. For example, using "{{leader_index}}_copy" for the follow pattern + * means that a data stream called "logs-foo-bar" would be renamed "logs-foo-bar_copy" when + * replicated, and a backing index called ".ds-logs-foo-bar-2022-02-02-000001" would be renamed + * to ".ds-logs-foo-bar_copy-2022-02-02-000001". + * See {@link AutoFollower#getFollowerIndexName} for the entire usage. + */ + public static final String AUTO_FOLLOW_PATTERN_REPLACEMENT = "{{leader_index}}"; + private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); private static final int MAX_AUTO_FOLLOW_ERRORS = 256; + private static final Pattern DS_BACKING_PATTERN = Pattern.compile( + "^(.*?" + DataStream.BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)?$" + ); + private final Client client; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; @@ -563,6 +581,12 @@ private void autoFollowIndices( cleanFollowedRemoteIndices(remoteClusterState, patterns); } + /** + * Go through all the leader indices that need to be followed, ensuring that they are + * auto-followed by only a single pattern, have soft-deletes enabled, are not + * searchable snapshots, and are not already followed. If all of those conditions are met, + * then follow the indices. + */ private void checkAutoFollowPattern( String autoFollowPattenName, String remoteClusterString, @@ -582,8 +606,13 @@ private void checkAutoFollowPattern( leaderIndicesToFollow.size() ); + // Loop through all the as-of-yet-unfollowed indices from the leader for (final Index indexToFollow : leaderIndicesToFollow) { + // Look up the abstraction for the given index, e.g., an index ".ds-foo" could look + // up the Data Stream "foo" IndexAbstraction indexAbstraction = remoteMetadata.getIndicesLookup().get(indexToFollow.getName()); + // Ensure that the remote cluster doesn't have other patterns + // that would follow the index, there can be only one. List otherMatchingPatterns = patternsForTheSameRemoteCluster.stream() .filter(otherPattern -> otherPattern.v2().match(indexAbstraction)) .map(Tuple::v1) @@ -605,6 +634,7 @@ private void checkAutoFollowPattern( ); } else { final IndexMetadata leaderIndexMetadata = remoteMetadata.getIndexSafe(indexToFollow); + // First ensure that the index on the leader that we want to follow has soft-deletes enabled if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(leaderIndexMetadata.getSettings()) == false) { String message = String.format( Locale.ROOT, @@ -639,10 +669,12 @@ private void checkAutoFollowPattern( error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error)) ); } else { + // Finally, if there are no reasons why we cannot follow the leader index, perform the follow. followLeaderIndex( autoFollowPattenName, remoteClusterString, indexToFollow, + indexAbstraction, autoFollowPattern, headers, error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error)) @@ -669,22 +701,32 @@ private static boolean leaderIndexAlreadyFollowed(AutoFollowPattern autoFollowPa return false; } - private void followLeaderIndex( - String autoFollowPattenName, - String remoteClusterString, + /** + * Given a remote cluster, index that will be followed (and its abstraction), as well as an + * {@link AutoFollowPattern}, generate the internal follow request for following the index. + */ + static PutFollowAction.Request generateRequest( + String remoteCluster, Index indexToFollow, - AutoFollowPattern pattern, - Map headers, - Consumer onResult + IndexAbstraction indexAbstraction, + AutoFollowPattern pattern ) { final String leaderIndexName = indexToFollow.getName(); final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); PutFollowAction.Request request = new PutFollowAction.Request(); - request.setRemoteCluster(remoteClusterString); + request.setRemoteCluster(remoteCluster); request.setLeaderIndex(indexToFollow.getName()); request.setFollowerIndex(followIndexName); request.setSettings(pattern.getSettings()); + // If there was a pattern specified for renaming the backing index, and this index is + // part of a data stream, then send the new data stream name as part of the request. + if (pattern.getFollowIndexPattern() != null && indexAbstraction.getParentDataStream() != null) { + String dataStreamName = indexAbstraction.getParentDataStream().getDataStream().getName(); + // Send the follow index pattern as the data stream pattern, so that data streams can be + // renamed accordingly (not only the backing indices) + request.setDataStreamName(pattern.getFollowIndexPattern().replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, dataStreamName)); + } request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount()); request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize()); request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests()); @@ -697,9 +739,23 @@ private void followLeaderIndex( request.getParameters().setReadPollTimeout(pattern.getReadPollTimeout()); request.masterNodeTimeout(TimeValue.MAX_VALUE); + return request; + } + + private void followLeaderIndex( + String autoFollowPattenName, + String remoteClusterString, + Index indexToFollow, + IndexAbstraction indexAbstraction, + AutoFollowPattern pattern, + Map headers, + Consumer onResult + ) { + PutFollowAction.Request request = generateRequest(remoteClusterString, indexToFollow, indexAbstraction, pattern); + // Execute if the create and follow api call succeeds: Runnable successHandler = () -> { - LOGGER.info("auto followed leader index [{}] as follow index [{}]", indexToFollow, followIndexName); + LOGGER.info("auto followed leader index [{}] as follow index [{}]", indexToFollow, request.getFollowerIndex()); // This function updates the auto follow metadata in the cluster to record that the leader index has been followed: // (so that we do not try to follow it in subsequent auto follow runs) @@ -731,6 +787,22 @@ private void finalise(int slot, AutoFollowResult result, final Thread thread) { } } + /** + * Given an auto following pattern for a set of indices and the cluster state from a remote + * cluster, return the list of indices that need to be followed. The list of followed index + * UUIDs contains indices that have already been followed, so the returned list will only + * contain "new" indices from the leader that need to be followed. + * + * When looking up the name of the index to see if it matches one of the patterns, the index + * abstraction ({@link IndexAbstraction}) of the index is used for comparison, this means + * that if an index named ".ds-foo" was part of a data stream "foo", then an auto-follow + * pattern of "f*" would allow the ".ds-foo" index to be returned. + * + * @param autoFollowPattern pattern to check indices that may need to be followed + * @param remoteClusterState state from the remote ES cluster + * @param followedIndexUUIDs a collection of UUIDs of indices already being followed + * @return any new indices on the leader that need to be followed + */ static List getLeaderIndicesToFollow( AutoFollowPattern autoFollowPattern, ClusterState remoteClusterState, @@ -760,9 +832,45 @@ static List getLeaderIndicesToFollow( return leaderIndicesToFollow; } + /** + * Returns the new name for the follower index. If the auto-follow configuration includes a + * follow index pattern, the text "{@code {{leader_index}}}" is replaced with the original + * index name, so a leader index called "foo" and a pattern of "{{leader_index}}_copy" + * becomes a new follower index called "foo_copy". + */ static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String leaderIndexName) { - if (autoFollowPattern.getFollowIndexPattern() != null) { - return autoFollowPattern.getFollowIndexPattern().replace("{{leader_index}}", leaderIndexName); + final String followPattern = autoFollowPattern.getFollowIndexPattern(); + if (followPattern != null) { + if (leaderIndexName.contains(DataStream.BACKING_INDEX_PREFIX)) { + // The index being replicated is a data stream backing index, so it's something + // like: .ds--20XX-mm-dd-NNNNNN + // + // However, we cannot just replace the name with the proposed follow index + // pattern, or else we'll end up with something like ".ds-logs-foo-bar-2022-02-02-000001_copy" + // for "{{leader_index}}_copy", which will cause problems because it doesn't + // follow a parseable pattern. Instead it would be better to rename it as though + // the data stream name was the leader index name, ending up with + // ".ds-logs-foo-bar_copy-2022-02-02-000001" as the final index name. + Matcher m = DS_BACKING_PATTERN.matcher(leaderIndexName); + if (m.find()) { + return m.group(1) + // Prefix including ".ds-" + followPattern.replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, m.group(2)) + // Data stream name changed + "-" + // Hyphen separator + m.group(3) + // Date math + m.group(4); + } else { + throw new IllegalArgumentException( + "unable to determine follower index name from leader index name [" + + leaderIndexName + + "] and follow index pattern: [" + + followPattern + + "]" + ); + } + } else { + // If the index does nat contain a `.ds-`, then rename it as usual. + return followPattern.replace("{{leader_index}}", leaderIndexName); + } } else { return leaderIndexName; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 0b834b56e63ea..cd7b970bef2d7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -169,17 +170,6 @@ private void createFollowerIndex( return; } - if (remoteDataStream != null) { - // when following a backing index then the names of the backing index must remain the same in the local - // and remote cluster. - if (request.getLeaderIndex().equals(request.getFollowerIndex()) == false) { - listener.onFailure( - new IllegalArgumentException("a backing index name in the local and remote cluster must remain the same") - ); - return; - } - } - final Settings overrideSettings = Settings.builder() .put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex()) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) @@ -215,15 +205,37 @@ protected void doRun() { (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response) ); if (remoteDataStream == null) { + // If the index we're following is not part of a data stream, start the + // restoration of the index normally. restoreService.restoreSnapshot(restoreRequest, delegatelistener); } else { String followerIndexName = request.getFollowerIndex(); + // This method is used to update the metadata in the same cluster state + // update as the snapshot is restored. BiConsumer updater = (currentState, mdBuilder) -> { - DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(remoteDataStream.getName()); - Index followerIndex = mdBuilder.get(followerIndexName).getIndex(); - assert followerIndex != null; + final String localDataStreamName; + + // If we have been given a data stream name, use that name for the local + // data stream. See the javadoc for AUTO_FOLLOW_PATTERN_REPLACEMENT + // for more info. + final String dsName = request.getDataStreamName(); + if (Strings.hasText(dsName)) { + localDataStreamName = dsName; + } else { + // There was no specified name, use the original data stream name. + localDataStreamName = remoteDataStream.getName(); + } + final DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(localDataStreamName); + final Index followerIndex = mdBuilder.get(followerIndexName).getIndex(); + assert followerIndex != null + : "expected followerIndex " + followerIndexName + " to exist in the state, but it did not"; - DataStream updatedDataStream = updateLocalDataStream(followerIndex, localDataStream, remoteDataStream); + final DataStream updatedDataStream = updateLocalDataStream( + followerIndex, + localDataStream, + localDataStreamName, + remoteDataStream + ); mdBuilder.put(updatedDataStream); }; restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater); @@ -303,12 +315,23 @@ private void initiateFollowing( ); } - static DataStream updateLocalDataStream(Index backingIndexToFollow, DataStream localDataStream, DataStream remoteDataStream) { + /** + * Given the backing index that the follower is going to follower, the local data stream (if it + * exists) and the remote data stream, return the new local data stream for the local cluster + * (the follower) updated with whichever information is necessary to restore the new + * soon-to-be-followed index. + */ + static DataStream updateLocalDataStream( + Index backingIndexToFollow, + DataStream localDataStream, + String localDataStreamName, + DataStream remoteDataStream + ) { if (localDataStream == null) { // The data stream and the backing indices have been created and validated in the remote cluster, // just copying the data stream is in this case safe. return new DataStream( - remoteDataStream.getName(), + localDataStreamName, List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 825c2abeb95ac..28d5b7921ac26 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -32,6 +33,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -1001,6 +1003,279 @@ public void testGetFollowerIndexName() { null ); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); + + // Test that index of data stream type name works correctly: + autoFollowPattern = new AutoFollowPattern( + "remote", + List.of("logs-*"), + List.of(), + "{{leader_index}}_copy", + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + assertThat( + AutoFollower.getFollowerIndexName(autoFollowPattern, ".ds-logs-foo-bar-2022-02-01-123456"), + equalTo(".ds-logs-foo-bar_copy-2022-02-01-123456") + ); + + autoFollowPattern = new AutoFollowPattern( + "remote", + List.of("logs-*"), + List.of(), + "prepend_{{leader_index}}", + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + assertThat( + AutoFollower.getFollowerIndexName(autoFollowPattern, ".ds-logs-foo-bar-2022-02-01-123456"), + equalTo(".ds-prepend_logs-foo-bar-2022-02-01-123456") + ); + + } + + public void testGenerateRequest() { + // Renaming with a suffix and normal pattern backing indices + { + AutoFollowPattern pattern = new AutoFollowPattern( + "remote", + List.of("logs-*"), + List.of(), + "{{leader_index}}_copy", + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + Index index = new Index(".ds-logs-foo-bar-2022-02-01-123456", "uuid"); + IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex( + IndexMetadata.builder(index.getName()) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build() + ) + .build(), + new IndexAbstraction.DataStream( + new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD) + ) + ); + + PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern); + assertThat(request.getRemoteCluster(), equalTo("remote")); + assertThat(request.getFollowerIndex(), equalTo(".ds-logs-foo-bar_copy-2022-02-01-123456")); + assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456")); + assertThat(request.getDataStreamName(), equalTo("logs-foo-bar_copy")); + } + + // Renaming with a prefix and normal pattern backing indices + { + AutoFollowPattern pattern = new AutoFollowPattern( + "remote", + List.of("logs-*"), + List.of(), + "copy_{{leader_index}}", + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + Index index = new Index(".ds-logs-foo-bar-2022-02-01-123456", "uuid"); + IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex( + IndexMetadata.builder(index.getName()) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build() + ) + .build(), + new IndexAbstraction.DataStream( + new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD) + ) + ); + + PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern); + assertThat(request.getRemoteCluster(), equalTo("remote")); + assertThat(request.getFollowerIndex(), equalTo(".ds-copy_logs-foo-bar-2022-02-01-123456")); + assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456")); + assertThat(request.getDataStreamName(), equalTo("copy_logs-foo-bar")); + } + + // Renaming with a suffix and irregular pattern backing indices + { + AutoFollowPattern pattern = new AutoFollowPattern( + "remote", + List.of("logs-*"), + List.of(), + "{{leader_index}}_copy", + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + Index index = new Index("my-backing-index", "uuid"); + IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex( + IndexMetadata.builder(index.getName()) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build() + ) + .build(), + new IndexAbstraction.DataStream( + new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD) + ) + ); + + PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern); + assertThat(request.getRemoteCluster(), equalTo("remote")); + assertThat(request.getFollowerIndex(), equalTo("my-backing-index_copy")); + assertThat(request.getLeaderIndex(), equalTo("my-backing-index")); + assertThat(request.getDataStreamName(), equalTo("logs-foo-bar_copy")); + } + + // Renaming with a suffix but not part of a data stream + { + AutoFollowPattern pattern = new AutoFollowPattern( + "remote", + List.of("logs-*"), + List.of(), + "{{leader_index}}_copy", + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + Index index = new Index(".ds-logs-foo-bar-2022-02-01-123456", "uuid"); + IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex( + IndexMetadata.builder(index.getName()) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build() + ) + .build(), + null + ); + + PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern); + assertThat(request.getRemoteCluster(), equalTo("remote")); + assertThat(request.getFollowerIndex(), equalTo(".ds-logs-foo-bar_copy-2022-02-01-123456")); + assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456")); + assertThat(request.getDataStreamName(), equalTo(null)); + } + + // Regular backing index, but no renaming + { + AutoFollowPattern pattern = new AutoFollowPattern( + "remote", + List.of("logs-*"), + List.of(), + null, + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + Index index = new Index(".ds-logs-foo-bar-2022-02-01-123456", "uuid"); + IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex( + IndexMetadata.builder(index.getName()) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build() + ) + .build(), + new IndexAbstraction.DataStream( + new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD) + ) + ); + + PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern); + assertThat(request.getRemoteCluster(), equalTo("remote")); + assertThat(request.getFollowerIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456")); + assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456")); + assertThat(request.getDataStreamName(), equalTo(null)); + } } public void testStats() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java index fd92bc3ecff99..93879f2dfb842 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java @@ -38,6 +38,11 @@ protected Writeable.Reader instanceReader() { return FollowParameters::new; } + @Override + protected FollowParameters mutateInstance(FollowParameters instance) { + return randomInstance(); + } + static FollowParameters randomInstance() { FollowParameters followParameters = new FollowParameters(); followParameters.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java index 50fe5ce87182e..ab84ca9fd9ca7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; @@ -38,6 +39,7 @@ protected PutFollowAction.Request createTestInstance() { Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build() ); ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters()); + request.setDataStreamName(randomAlphaOfLength(4)); return request; } @@ -53,6 +55,7 @@ protected PutFollowAction.Request createXContextTestInstance(XContentType xConte ); request.setFollowerIndex("followerIndex"); ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters()); + request.setDataStreamName(randomAlphaOfLength(4)); return request; } @@ -61,6 +64,40 @@ protected PutFollowAction.Request doParseInstance(XContentParser parser) throws return PutFollowAction.Request.fromXContent(parser, "followerIndex", ActiveShardCount.DEFAULT); } + @Override + protected PutFollowAction.Request mutateInstance(PutFollowAction.Request instance) throws IOException { + PutFollowAction.Request request = new PutFollowAction.Request(); + request.setFollowerIndex(instance.getFollowerIndex()); + request.waitForActiveShards(instance.waitForActiveShards()); + request.setRemoteCluster(instance.getRemoteCluster()); + request.setLeaderIndex(instance.getLeaderIndex()); + request.setSettings(instance.getSettings()); + request.setParameters(instance.getParameters()); + request.setDataStreamName(instance.getDataStreamName()); + + switch (randomIntBetween(0, 6)) { + case 0 -> request.setFollowerIndex(randomAlphaOfLength(5)); + case 1 -> request.waitForActiveShards(new ActiveShardCount(randomIntBetween(3, 5))); + case 2 -> request.setRemoteCluster(randomAlphaOfLength(5)); + case 3 -> request.setLeaderIndex(randomAlphaOfLength(5)); + case 4 -> request.setSettings( + Settings.builder() + .put( + IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), + randomValueOtherThan( + IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(request.getSettings()), + ESTestCase::randomInt + ) + ) + .build() + ); + case 5 -> request.setParameters(FollowParametersTests.randomInstance()); + case 6 -> request.setDataStreamName(randomAlphaOfLength(5)); + default -> throw new AssertionError("failed branch"); + } + return request; + } + @Override protected boolean supportsUnknownFields() { return false; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java index 955623bdda743..61050b4172119 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java @@ -24,7 +24,12 @@ public class TransportPutFollowActionTests extends ESTestCase { public void testCreateNewLocalDataStream() { DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false); Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); - DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream); + DataStream result = TransportPutFollowAction.updateLocalDataStream( + backingIndexToFollow, + null, + remoteDataStream.getName(), + remoteDataStream + ); assertThat(result.getName(), equalTo(remoteDataStream.getName())); assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); @@ -36,7 +41,12 @@ public void testUpdateLocalDataStream_followNewBackingIndex() { DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false); DataStream localDataStream = generateDataSteam("logs-foobar", 2, true); Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); - DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + DataStream result = TransportPutFollowAction.updateLocalDataStream( + backingIndexToFollow, + localDataStream, + remoteDataStream.getName(), + remoteDataStream + ); assertThat(result.getName(), equalTo(remoteDataStream.getName())); assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); @@ -51,7 +61,12 @@ public void testUpdateLocalDataStream_followOlderBackingIndex() { DataStream remoteDataStream = generateDataSteam("logs-foobar", 5, false); DataStream localDataStream = generateDataSteam("logs-foobar", 5, true, DataStream.getDefaultBackingIndexName("logs-foobar", 5)); Index backingIndexToFollow = remoteDataStream.getIndices().get(0); - DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + DataStream result = TransportPutFollowAction.updateLocalDataStream( + backingIndexToFollow, + localDataStream, + remoteDataStream.getName(), + remoteDataStream + ); assertThat(result.getName(), equalTo(remoteDataStream.getName())); assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); @@ -62,7 +77,12 @@ public void testUpdateLocalDataStream_followOlderBackingIndex() { // follow second last backing index: localDataStream = result; backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 2); - result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + result = TransportPutFollowAction.updateLocalDataStream( + backingIndexToFollow, + localDataStream, + remoteDataStream.getName(), + remoteDataStream + ); assertThat(result.getName(), equalTo(remoteDataStream.getName())); assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java index 910cf956c5dac..1b340a27bac2e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java @@ -15,9 +15,11 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -43,6 +45,7 @@ public static class Request extends AcknowledgedRequest implements Indi private static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index"); private static final ParseField SETTINGS_FIELD = new ParseField("settings"); + private static final ParseField DATA_STREAM_NAME = new ParseField("data_stream_name"); // Note that Request should be the Value class here for this parser with a 'parameters' field that maps to // PutFollowParameters class. But since two minor version are already released with duplicate follow parameters @@ -52,6 +55,7 @@ public static class Request extends AcknowledgedRequest implements Indi static { PARSER.declareString((putFollowParameters, value) -> putFollowParameters.remoteCluster = value, REMOTE_CLUSTER_FIELD); PARSER.declareString((putFollowParameters, value) -> putFollowParameters.leaderIndex = value, LEADER_INDEX_FIELD); + PARSER.declareString((putFollowParameters, value) -> putFollowParameters.dataStreamName = value, DATA_STREAM_NAME); PARSER.declareObject( (putFollowParameters, value) -> putFollowParameters.settings = value, (p, c) -> Settings.fromXContent(p), @@ -69,6 +73,7 @@ public static Request fromXContent(final XContentParser parser, final String fol request.setFollowerIndex(followerIndex); request.setRemoteCluster(parameters.remoteCluster); request.setLeaderIndex(parameters.leaderIndex); + request.setDataStreamName(parameters.dataStreamName); request.setSettings(parameters.settings); request.setParameters(parameters); return request; @@ -76,8 +81,10 @@ public static Request fromXContent(final XContentParser parser, final String fol private String remoteCluster; private String leaderIndex; - private Settings settings = Settings.EMPTY; private String followerIndex; + @Nullable + private String dataStreamName; + private Settings settings = Settings.EMPTY; private FollowParameters parameters = new FollowParameters(); private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE; @@ -123,6 +130,15 @@ public void setParameters(FollowParameters parameters) { this.parameters = parameters; } + @Nullable + public String getDataStreamName() { + return dataStreamName; + } + + public void setDataStreamName(String dataStreamName) { + this.dataStreamName = dataStreamName; + } + public ActiveShardCount waitForActiveShards() { return waitForActiveShards; } @@ -156,6 +172,9 @@ public ActionRequestValidationException validate() { if (followerIndex == null) { e = addValidationError("follower_index is missing", e); } + if (dataStreamName != null && Strings.hasText(dataStreamName) == false) { + e = addValidationError("data stream name must contain text if present", e); + } return e; } @@ -179,6 +198,9 @@ public Request(StreamInput in) throws IOException { } this.parameters = new FollowParameters(in); waitForActiveShards(ActiveShardCount.readFrom(in)); + if (in.getVersion().onOrAfter(Version.V_8_5_0)) { + this.dataStreamName = in.readOptionalString(); + } } @Override @@ -192,6 +214,9 @@ public void writeTo(StreamOutput out) throws IOException { } parameters.writeTo(out); waitForActiveShards.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_5_0)) { + out.writeOptionalString(this.dataStreamName); + } } @Override @@ -200,6 +225,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws { builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); + if (dataStreamName != null) { + builder.field(DATA_STREAM_NAME.getPreferredName(), dataStreamName); + } if (settings.isEmpty() == false) { builder.startObject(SETTINGS_FIELD.getPreferredName()); { @@ -222,12 +250,14 @@ public boolean equals(Object o) { && Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followerIndex, request.followerIndex) && Objects.equals(parameters, request.parameters) - && Objects.equals(waitForActiveShards, request.waitForActiveShards); + && Objects.equals(waitForActiveShards, request.waitForActiveShards) + && Objects.equals(dataStreamName, request.dataStreamName) + && Objects.equals(settings, request.settings); } @Override public int hashCode() { - return Objects.hash(remoteCluster, leaderIndex, followerIndex, parameters, waitForActiveShards); + return Objects.hash(remoteCluster, leaderIndex, followerIndex, parameters, settings, waitForActiveShards, dataStreamName); } // This class only exists for reuse of the FollowParameters class, see comment above the parser field. @@ -235,6 +265,7 @@ private static class PutFollowParameters extends FollowParameters { private String remoteCluster; private String leaderIndex; + private String dataStreamName; private Settings settings = Settings.EMPTY; } From 9ee7adf2d5c90f0c1f7ed6e4e67d41e8e1c44860 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 29 Jul 2022 11:01:23 -0600 Subject: [PATCH 3/7] Update docs/changelog/88875.yaml --- docs/changelog/88875.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/88875.yaml diff --git a/docs/changelog/88875.yaml b/docs/changelog/88875.yaml new file mode 100644 index 0000000000000..e6f549047a219 --- /dev/null +++ b/docs/changelog/88875.yaml @@ -0,0 +1,6 @@ +pr: 88875 +summary: Fix renaming data streams with CCR replication +area: "CCR, Data streams" +type: bug +issues: + - 81751 From 1d3a7f5b0c2290820275d3d150120df6432a5291 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 29 Jul 2022 11:06:55 -0600 Subject: [PATCH 4/7] Fix changelog --- docs/changelog/88875.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/88875.yaml b/docs/changelog/88875.yaml index e6f549047a219..0643e86a6dfe7 100644 --- a/docs/changelog/88875.yaml +++ b/docs/changelog/88875.yaml @@ -1,6 +1,6 @@ pr: 88875 summary: Fix renaming data streams with CCR replication -area: "CCR, Data streams" +area: "Data streams" type: bug issues: - 81751 From e3001d50afaa2537ad51d58cbd9761811ffcd4f7 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 1 Aug 2022 08:09:00 -0600 Subject: [PATCH 5/7] Add a test for an unparseable data stream backing index --- .../ccr/action/AutoFollowCoordinator.java | 2 +- .../action/AutoFollowCoordinatorTests.java | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index f09bce85f89b7..b11fafd01f6b9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -864,7 +864,7 @@ static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String l + leaderIndexName + "] and follow index pattern: [" + followPattern - + "]" + + "], index appears to follow a regular data stream backing pattern, but could not be parsed" ); } } else { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 28d5b7921ac26..2566af17410b8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -76,6 +76,7 @@ import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -1276,6 +1277,58 @@ public void testGenerateRequest() { assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456")); assertThat(request.getDataStreamName(), equalTo(null)); } + + // Renaming with a suffix and just the worst named backing indices + { + AutoFollowPattern pattern = new AutoFollowPattern( + "remote", + List.of("logs-*"), + List.of(), + "{{leader_index}}_copy", + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + Index index = new Index("my-.ds-backing-index", "uuid"); + IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex( + IndexMetadata.builder(index.getName()) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build() + ) + .build(), + new IndexAbstraction.DataStream( + new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD) + ) + ); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> AutoFollower.generateRequest("remote", index, indexAbstraction, pattern) + ); + assertThat( + e.getMessage(), + containsString( + "unable to determine follower index name from leader index name " + + "[my-.ds-backing-index] and follow index pattern: [{{leader_index}}_copy]," + + ", index appears to follow a regular data stream backing pattern, but could not be parsed" + ) + ); + } } public void testStats() { From 27896002044b3946e1e85aecb3afd537783ad849 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 1 Aug 2022 08:10:28 -0600 Subject: [PATCH 6/7] Fix a typo in the docs Co-authored-by: Mary Gouseti --- .../xpack/ccr/action/TransportPutFollowAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index cd7b970bef2d7..b95e03eb09f58 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -316,7 +316,7 @@ private void initiateFollowing( } /** - * Given the backing index that the follower is going to follower, the local data stream (if it + * Given the backing index that the follower is going to follow, the local data stream (if it * exists) and the remote data stream, return the new local data stream for the local cluster * (the follower) updated with whichever information is necessary to restore the new * soon-to-be-followed index. From 00bdaba1e4b9d51ceebe85147bce238214829ebc Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 1 Aug 2022 08:26:10 -0600 Subject: [PATCH 7/7] Whoops, typo in test --- .../xpack/ccr/action/AutoFollowCoordinatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 2566af17410b8..f8cca99ce5e8e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -1324,7 +1324,7 @@ public void testGenerateRequest() { e.getMessage(), containsString( "unable to determine follower index name from leader index name " - + "[my-.ds-backing-index] and follow index pattern: [{{leader_index}}_copy]," + + "[my-.ds-backing-index] and follow index pattern: [{{leader_index}}_copy]" + ", index appears to follow a regular data stream backing pattern, but could not be parsed" ) );