From 03298b6ec83da4e8a10bc2f25e2689c46246d1e8 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 4 Sep 2020 15:39:17 +0200 Subject: [PATCH 01/13] Add data stream support to CCR This commit adds support data stream support to CCR's auto following by making the following changes: * When the auto follow coordinator iterates over the candidate indices to follow, the auto follow coordinator also checks whether the index is part of a data stream and if the name of data stream also matches with the auto follow pattern then the index will be auto followed. * When following an index, the put follow api also checks whether that index is part of a data stream and if so then also replicates the data stream definition to the local cluster. * In order for the follow index api to determine whether an index is part of a data stream, the cluster state api was modified to also fetch the data stream definition of the cluster state if only the state is queried for specific indices. When a data stream is auto followed, only new backing indices are auto followed. This is in line with how time based indices patterns are replicated today. This means that the data stream isn't copied 1 to 1 into the local cluster. The local cluster's data stream definition contains the same name, timestamp field and generation, but the list of backing indices may be different (depending on when a data stream was auto followed). Closes #56259 --- .../state/TransportClusterStateAction.java | 21 +++- .../snapshots/RestoreService.java | 8 ++ .../elasticsearch/xpack/ccr/AutoFollowIT.java | 107 ++++++++++++++++++ .../xpack/ccr/CcrLicenseChecker.java | 20 ++-- .../ccr/action/AutoFollowCoordinator.java | 11 +- .../TransportPutAutoFollowPatternAction.java | 4 +- .../ccr/action/TransportPutFollowAction.java | 39 ++++++- .../action/TransportResumeFollowAction.java | 2 +- .../xpack/core/ccr/AutoFollowMetadata.java | 15 ++- 9 files changed, 202 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 04d89d3ca8e9c..cf7e571d2c682 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -29,6 +29,8 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -38,6 +40,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -161,9 +164,21 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, mdBuilder.version(currentState.metadata().version()); String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request); for (String filteredIndex : indices) { - IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex); - if (indexMetadata != null) { - mdBuilder.put(indexMetadata, false); + // If the requested index is part of a data stream then that data stream should also be included: + IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(filteredIndex); + if (indexAbstraction.getParentDataStream() != null) { + DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream(); + mdBuilder.put(dataStream); + // Also the IMD of other backing indices need to be included, otherwise the cluster state api + // can't create a valid cluster state instance: + for (Index backingIndex : dataStream.getIndices()) { + mdBuilder.put(currentState.metadata().index(backingIndex), false); + } + } else { + IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex); + if (indexMetadata != null) { + mdBuilder.put(indexMetadata, false); + } } } } else { diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index a02b3a833f34e..971b317aa2f3f 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -86,6 +86,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -184,6 +185,12 @@ public RestoreService(ClusterService clusterService, RepositoriesService reposit * @param listener restore listener */ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionListener listener) { + restoreSnapshot(request, listener, (clusterState, builder) -> {}); + } + + public void restoreSnapshot(final RestoreSnapshotRequest request, + final ActionListener listener, + final BiConsumer updater) { try { // Read snapshot info and metadata from the repository final String repositoryName = request.repository(); @@ -454,6 +461,7 @@ restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards), } RoutingTable rt = rtBuilder.build(); + updater.accept(currentState, mdBuilder); ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build(); return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]"); } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 2a2fcca41c51c..6fb6a1ceb1e2b 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -13,14 +13,21 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; public class AutoFollowIT extends ESCCRRestTestCase { @@ -163,6 +170,80 @@ public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws ); } + public void testDataStreams() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int numDocs = 64; + final String dataStreamName = "logs-mysql-error"; + final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT); + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startArray("leader_index_patterns"); + { + bodyBuilder.value("logs-*"); + } + bodyBuilder.endArray(); + bodyBuilder.field("remote_cluster", "leader_cluster"); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(client().performRequest(request)); + + try (RestClient leaderClient = buildLeaderClient()) { + Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/mysql-error"); + putComposableIndexTemplateRequest.setJsonEntity( + "{" + + "\"index_patterns\":[\"logs-mysql-*\"]," + + "\"priority\":200," + + "\"composed_of\":[\"logs-mappings\",\"logs-settings\"]," + + "\"data_stream\":{}" + + "}" + ); + assertOK(leaderClient.performRequest(putComposableIndexTemplateRequest)); + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001"); + verifyDocuments(leaderClient, dataStreamName, numDocs); + } + + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs); + }); + + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 1); + } + + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs + 1); + }); + } + private int getNumberOfSuccessfulFollowedIndices() throws IOException { Request statsRequest = new Request("GET", "/_ccr/stats"); Map response = toMap(client().performRequest(statsRequest)); @@ -170,5 +251,31 @@ private int getNumberOfSuccessfulFollowedIndices() throws IOException { return (Integer) response.get("number_of_successful_follow_indices"); } + private static void verifyDocuments(final RestClient client, + final String index, + final int expectedNumDocs) throws IOException { + final Request request = new Request("GET", "/" + index + "/_search"); + request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); + Map response = toMap(client.performRequest(request)); + + int numDocs = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(index, numDocs, equalTo(expectedNumDocs)); + } + + private static void verifyDataStream(final RestClient client, + final String name, + final String... expectedBackingIndices) throws IOException { + Request request = new Request("GET", "/_data_stream/" + name); + Map response = toMap(client.performRequest(request)); + List retrievedDataStreams = (List) response.get("data_streams"); + assertThat(retrievedDataStreams, hasSize(1)); + List actualBackingIndices = (List) ((Map) retrievedDataStreams.get(0)).get("indices"); + assertThat(actualBackingIndices, hasSize(expectedBackingIndices.length)); + for (int i = 0; i < expectedBackingIndices.length; i++) { + Map actualBackingIndex = (Map) actualBackingIndices.get(i); + String expectedBackingIndex = expectedBackingIndices[i]; + assertThat(actualBackingIndex.get("index_name"), equalTo(expectedBackingIndex)); + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 6073a618ea69f..96029c951f3ce 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -23,9 +23,12 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; @@ -109,11 +112,11 @@ public boolean isCcrAllowed() { * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards */ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( - final Client client, - final String clusterAlias, - final String leaderIndex, - final Consumer onFailure, - final BiConsumer consumer) { + final Client client, + final String clusterAlias, + final String leaderIndex, + final Consumer onFailure, + final BiConsumer> consumer) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); @@ -127,7 +130,10 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( onFailure, remoteClusterStateResponse -> { ClusterState remoteClusterState = remoteClusterStateResponse.getState(); - IndexMetadata leaderIndexMetadata = remoteClusterState.getMetadata().index(leaderIndex); + final IndexMetadata leaderIndexMetadata = remoteClusterState.getMetadata().index(leaderIndex); + IndexAbstraction indexAbstraction = remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndex); + final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null ? + indexAbstraction.getParentDataStream().getDataStream() : null; if (leaderIndexMetadata == null) { onFailure.accept(new IndexNotFoundException(leaderIndex)); return; @@ -140,7 +146,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> { if (e == null) { fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetadata, onFailure, historyUUIDs -> - consumer.accept(historyUUIDs, leaderIndexMetadata)); + consumer.accept(historyUUIDs, Tuple.tuple(leaderIndexMetadata, remoteDataStream))); } else { onFailure.accept(e); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 3cdd6dc973bcd..edcea9e1beb16 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -105,7 +106,7 @@ public AutoFollowCoordinator( this.relativeMillisTimeProvider = relativeMillisTimeProvider; this.absoluteMillisTimeProvider = absoluteMillisTimeProvider; this.executor = Objects.requireNonNull(executor); - this.recentAutoFollowErrors = new LinkedHashMap>() { + this.recentAutoFollowErrors = new LinkedHashMap<>() { @Override protected boolean removeEldestEntry(final Map.Entry> eldest) { return size() > MAX_AUTO_FOLLOW_ERRORS; @@ -496,8 +497,9 @@ private void checkAutoFollowPattern(String autoFollowPattenName, leaderIndicesToFollow.size()); for (final Index indexToFollow : leaderIndicesToFollow) { + IndexAbstraction indexAbstraction = remoteMetadata.getIndicesLookup().get(indexToFollow.getName()); List otherMatchingPatterns = patternsForTheSameRemoteCluster.stream() - .filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName())) + .filter(otherPattern -> otherPattern.v2().match(indexAbstraction)) .map(Tuple::v1) .collect(Collectors.toList()); if (otherMatchingPatterns.size() != 0) { @@ -615,7 +617,9 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, if (leaderIndexMetadata.getState() != IndexMetadata.State.OPEN) { continue; } - if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetadata.getIndex().getName())) { + IndexAbstraction indexAbstraction = + remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndexMetadata.getIndex().getName()); + if (autoFollowPattern.isActive() && autoFollowPattern.match(indexAbstraction)) { IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetadata.getIndex()); if (indexRoutingTable != null && // Leader indices can be in the cluster state, but not all primary shards may be ready yet. @@ -624,7 +628,6 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, // this index will be auto followed. indexRoutingTable.allPrimaryShardsActive() && followedIndexUUIDs.contains(leaderIndexMetadata.getIndex().getUUID()) == false) { - leaderIndicesToFollow.add(leaderIndexMetadata.getIndex()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index feb7ec69c778a..b34483dfd6b70 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -213,7 +214,8 @@ private static void markExistingIndicesAsAutoFollowed( List followedIndexUUIDS) { for (final IndexMetadata indexMetadata : leaderMetadata) { - if (AutoFollowPattern.match(patterns, indexMetadata.getIndex().getName())) { + IndexAbstraction indexAbstraction = leaderMetadata.getIndicesLookup().get(indexMetadata.getIndex().getName()); + if (AutoFollowPattern.match(patterns, indexAbstraction)) { followedIndexUUIDS.add(indexMetadata.getIndexUUID()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 04c440708241a..b346d8fa6ede1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -20,13 +20,16 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.snapshots.RestoreInfo; @@ -42,8 +45,11 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Objects; +import java.util.function.BiConsumer; public final class TransportPutFollowAction extends TransportMasterNodeAction { @@ -108,11 +114,12 @@ protected void masterOperation( remoteCluster, leaderIndex, listener::onFailure, - (historyUUID, leaderIndexMetadata) -> createFollowerIndex(leaderIndexMetadata, request, listener)); + (historyUUID, tuple) -> createFollowerIndex(tuple.v1(), tuple.v2(), request, listener)); } private void createFollowerIndex( final IndexMetadata leaderIndexMetadata, + final DataStream remoteDataStream, final PutFollowAction.Request request, final ActionListener listener) { if (leaderIndexMetadata == null) { @@ -158,9 +165,31 @@ public void onFailure(Exception e) { @Override protected void doRun() { - restoreService.restoreSnapshot(restoreRequest, - ActionListener.delegateFailure(listener, - (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response))); + ActionListener delegatelistener = ActionListener.delegateFailure( + listener, + (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response) + ); + if (remoteDataStream == null) { + restoreService.restoreSnapshot(restoreRequest, delegatelistener); + } else { + BiConsumer updater = (currentState, mdBuilder) -> { + Index followerIndex = mdBuilder.get(request.getFollowerIndex()).getIndex(); + // The data stream and the backing indices have been created and validated in the remote cluster, + // just copying the data stream is in this case safe. + DataStream dataStream = currentState.getMetadata().dataStreams().get(remoteDataStream.getName()); + if (dataStream == null) { + dataStream = new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(), + List.of(followerIndex), remoteDataStream.getGeneration()); + } else { + List backingIndices = new ArrayList<>(dataStream.getIndices()); + backingIndices.add(followerIndex); + dataStream = new DataStream(dataStream.getName(), dataStream.getTimeStampField(), backingIndices, + remoteDataStream.getGeneration()); + } + mdBuilder.put(dataStream); + }; + restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater); + } } }); } @@ -171,7 +200,7 @@ private void afterRestoreStarted(Client clientWithHeaders, PutFollowAction.Reque final ActionListener listener; if (ActiveShardCount.NONE.equals(request.waitForActiveShards())) { originalListener.onResponse(new PutFollowAction.Response(true, false, false)); - listener = new ActionListener() { + listener = new ActionListener<>() { @Override public void onResponse(PutFollowAction.Response response) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 143aa66c087f4..c1086359faaac 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -143,7 +143,7 @@ protected void masterOperation(Task task, final ResumeFollowAction.Request reque listener::onFailure, (leaderHistoryUUID, leaderIndexMetadata) -> { try { - start(request, leaderCluster, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); + start(request, leaderCluster, leaderIndexMetadata.v1(), followerIndexMetadata, leaderHistoryUUID, listener); } catch (final IOException e) { listener.onFailure(e); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index f9d5a97779390..d685301e04816 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -275,12 +276,18 @@ private AutoFollowPattern(String remoteCluster, List leaderIndexPatterns } } - public boolean match(String indexName) { - return match(leaderIndexPatterns, indexName); + public boolean match(IndexAbstraction indexAbstraction) { + return match(leaderIndexPatterns, indexAbstraction); } - public static boolean match(List leaderIndexPatterns, String indexName) { - return Regex.simpleMatch(leaderIndexPatterns, indexName); + public static boolean match(List leaderIndexPatterns, IndexAbstraction indexAbstraction) { + boolean matches = Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getName()); + if (matches) { + return true; + } else { + return indexAbstraction.getParentDataStream() != null && + Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName()); + } } public String getRemoteCluster() { From 206fbf05782519f7da1f3735e30e0d967575680c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 Sep 2020 16:11:42 +0200 Subject: [PATCH 02/13] move the check that checks parent data stream after the check whether index actually exists --- .../java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 96029c951f3ce..541a9b2fe9638 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -131,9 +131,6 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( remoteClusterStateResponse -> { ClusterState remoteClusterState = remoteClusterStateResponse.getState(); final IndexMetadata leaderIndexMetadata = remoteClusterState.getMetadata().index(leaderIndex); - IndexAbstraction indexAbstraction = remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndex); - final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null ? - indexAbstraction.getParentDataStream().getDataStream() : null; if (leaderIndexMetadata == null) { onFailure.accept(new IndexNotFoundException(leaderIndex)); return; @@ -142,6 +139,9 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( onFailure.accept(new IndexClosedException(leaderIndexMetadata.getIndex())); return; } + IndexAbstraction indexAbstraction = remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndex); + final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null ? + indexAbstraction.getParentDataStream().getDataStream() : null; final Client remoteClient = client.getRemoteClusterClient(clusterAlias); hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> { if (e == null) { From addf40f637e1c4ba0e14239501e0ec92d9d77335 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 Sep 2020 17:26:26 +0200 Subject: [PATCH 03/13] delete auto follow pattern after test --- .../src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 6fb6a1ceb1e2b..1f804b948f263 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -129,6 +129,7 @@ public void testAutoFollowPatterns() throws Exception { verifyCcrMonitoring("metrics-20210101", "metrics-20210101"); verifyAutoFollowMonitoring(); }, 30, TimeUnit.SECONDS); + deleteAutoFollowPattern("test_pattern"); } public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException { @@ -242,6 +243,7 @@ public void testDataStreams() throws Exception { ensureYellow(dataStreamName); verifyDocuments(client(), dataStreamName, numDocs + 1); }); + deleteAutoFollowPattern("test_pattern"); } private int getNumberOfSuccessfulFollowedIndices() throws IOException { From 606da3dd85f1f8b0ffe99070d8b9ed290113eba3 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 21 Sep 2020 14:05:08 +0200 Subject: [PATCH 04/13] added tests and docs --- docs/reference/ccr/auto-follow.asciidoc | 7 ++ .../ccr/action/TransportPutFollowAction.java | 36 +++--- .../action/AutoFollowCoordinatorTests.java | 114 ++++++++++++++++++ .../datastreams/DataStreamIT.java | 14 +++ 4 files changed, 157 insertions(+), 14 deletions(-) diff --git a/docs/reference/ccr/auto-follow.asciidoc b/docs/reference/ccr/auto-follow.asciidoc index d072dd8022b12..3eb9b215cf2b4 100644 --- a/docs/reference/ccr/auto-follow.asciidoc +++ b/docs/reference/ccr/auto-follow.asciidoc @@ -7,6 +7,13 @@ each new index in the series is replicated automatically. Whenever the name of a new index on the remote cluster matches the auto-follow pattern, a corresponding follower index is added to the local cluster. +Data streams can also be auto followed. If a new backing index's data stream name +on the remote cluster matches with the auto-follow pattern then that index and +data stream is also auto followed. If a data stream is created on or after the +moment the auto follow pattern is created then all backing indices are auto +followed, otherwise only the backing indices on or after the creation of +the auto follow pattern are auto followed. + Auto-follow patterns are especially useful with <>, which might continually create new indices on the cluster containing the leader index. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index b346d8fa6ede1..d55ccbe48f989 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -172,21 +172,14 @@ protected void doRun() { if (remoteDataStream == null) { restoreService.restoreSnapshot(restoreRequest, delegatelistener); } else { + String followerIndexName = request.getFollowerIndex(); BiConsumer updater = (currentState, mdBuilder) -> { - Index followerIndex = mdBuilder.get(request.getFollowerIndex()).getIndex(); - // The data stream and the backing indices have been created and validated in the remote cluster, - // just copying the data stream is in this case safe. - DataStream dataStream = currentState.getMetadata().dataStreams().get(remoteDataStream.getName()); - if (dataStream == null) { - dataStream = new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(), - List.of(followerIndex), remoteDataStream.getGeneration()); - } else { - List backingIndices = new ArrayList<>(dataStream.getIndices()); - backingIndices.add(followerIndex); - dataStream = new DataStream(dataStream.getName(), dataStream.getTimeStampField(), backingIndices, - remoteDataStream.getGeneration()); - } - mdBuilder.put(dataStream); + DataStream localDataStream = currentState.getMetadata().dataStreams().get(remoteDataStream.getName()); + Index followerIndex = mdBuilder.get(followerIndexName).getIndex(); + assert followerIndex != null; + + DataStream updatedDataStream = updateLocalDataStream(followerIndex, localDataStream, remoteDataStream); + mdBuilder.put(updatedDataStream); }; restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater); } @@ -250,6 +243,21 @@ private void initiateFollowing( )); } + static DataStream updateLocalDataStream(Index backingIndexToFollow, + DataStream localDataStream, + DataStream remoteDataStream) { + if (localDataStream == null) { + // The data stream and the backing indices have been created and validated in the remote cluster, + // just copying the data stream is in this case safe. + return new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(), + List.of(backingIndexToFollow), remoteDataStream.getGeneration()); + } else { + List backingIndices = new ArrayList<>(localDataStream.getIndices()); + backingIndices.add(backingIndexToFollow); + return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices); + } + } + @Override protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) { return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowerIndex()); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index d45f980e8fb67..7e8a83d03fe3a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -171,6 +172,93 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa assertThat(invoked[0], is(true)); } + public void testAutoFollower_dataStream() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar"); + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern( + "remote", + Collections.singletonList("logs-*"), + null, + Settings.EMPTY, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Map.of("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); + + ClusterState currentState = ClusterState.builder(new ClusterName("name")) + .metadata(Metadata.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + boolean[] invoked = new boolean[]{false}; + Consumer> handler = results -> { + invoked[0] = true; + + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo(".ds-logs-foobar-000001")); + assertThat(entries.get(0).getValue(), nullValue()); + }; + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L, Runnable::run) { + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + assertThat(headers, equalTo(autoFollowHeaders.get("remote"))); + assertThat(followRequest.getRemoteCluster(), equalTo("remote")); + assertThat(followRequest.getLeaderIndex(), equalTo(".ds-logs-foobar-000001")); + assertThat(followRequest.getFollowerIndex(), equalTo(".ds-logs-foobar-000001")); + assertThat(followRequest.masterNodeTimeout(), equalTo(TimeValue.MAX_VALUE)); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + ClusterState resultCs = updateFunction.apply(currentState); + AutoFollowMetadata result = resultCs.metadata().custom(AutoFollowMetadata.TYPE); + assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1)); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + autoFollower.start(); + assertThat(invoked[0], is(true)); + } + public void testAutoFollowerClusterStateApiFailure() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); @@ -2009,4 +2097,30 @@ private ClusterService mockClusterService() { return clusterService; } + private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) { + Settings.Builder indexSettings = settings(Version.CURRENT); + indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())); + indexSettings.put("index.hidden", true); + + IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1)) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + DataStream dataStream = new DataStream(dataStreamName, new DataStream.TimestampField("@timestamp"), + List.of(indexMetadata.getIndex())); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) + .metadata(Metadata.builder() + .put(indexMetadata, true) + .put(dataStream) + .version(0L)); + + ShardRouting shardRouting = + TestShardRouting.newShardRouting(dataStreamName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build(); + csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + + return csBuilder.build(); + } + } diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index e39c0b1eb109c..431ddac65282f 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; @@ -33,6 +34,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; @@ -932,6 +934,18 @@ public void testAutoCreateV1TemplateNoDataStream() { assertThat(getIndexResponse.getSettings().get("logs-foobar").get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS), equalTo("0")); } + public void testClusterStateIncludeDataStream() throws Exception { + putComposableIndexTemplate("id1", List.of("metrics-foo*")); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + // when querying a backing index then the data stream should be included as well. + ClusterStateRequest request = new ClusterStateRequest().indices(".ds-metrics-foo-000001"); + ClusterState state = client().admin().cluster().state(request).get().getState(); + assertThat(state.metadata().dataStreams().size(), equalTo(1)); + assertThat(state.metadata().dataStreams().get("metrics-foo").getName(), equalTo("metrics-foo")); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } From 551adc88f55afd843389a7de5cbb47b5999020a0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 21 Sep 2020 14:18:19 +0200 Subject: [PATCH 05/13] updated jdocs --- .../java/org/elasticsearch/snapshots/RestoreService.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 8d786917cd444..7fa7aa374b26b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -187,6 +187,14 @@ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionLi restoreSnapshot(request, listener, (clusterState, builder) -> {}); } + /** + * Restores snapshot specified in the restore request. + * + * @param request restore request + * @param listener restore listener + * @param updater handler that allows callers to make modifications to {@link Metadata} + * in the same cluster state update as the restore operation + */ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionListener listener, final BiConsumer updater) { From f81cc8b0e4a5586507aeca096c4b1e1a7cced1d7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 21 Sep 2020 15:10:58 +0200 Subject: [PATCH 06/13] iter --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 104 +++++++++++------- 1 file changed, 67 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 1f804b948f263..887ad9ffb1d87 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -71,6 +71,7 @@ public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception { verifyDocuments("logs-20190101", 5, "filtered_field:true"); verifyDocuments("logs-20200101", 5, "filtered_field:true"); }); + deleteAutoFollowPattern("leader_cluster_pattern"); } public void testAutoFollowPatterns() throws Exception { @@ -181,6 +182,8 @@ public void testDataStreams() throws Exception { final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT); int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + + // Create auto follow pattern Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { bodyBuilder.startObject(); @@ -197,52 +200,79 @@ public void testDataStreams() throws Exception { } assertOK(client().performRequest(request)); - try (RestClient leaderClient = buildLeaderClient()) { - Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/mysql-error"); - putComposableIndexTemplateRequest.setJsonEntity( - "{" - + "\"index_patterns\":[\"logs-mysql-*\"]," - + "\"priority\":200," - + "\"composed_of\":[\"logs-mappings\",\"logs-settings\"]," - + "\"data_stream\":{}" - + "}" - ); - assertOK(leaderClient.performRequest(putComposableIndexTemplateRequest)); - for (int i = 0; i < numDocs; i++) { + // Create data stream and ensure that is is auto followed + { + try (RestClient leaderClient = buildLeaderClient()) { + Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/mysql-error"); + putComposableIndexTemplateRequest.setJsonEntity( + "{" + + "\"index_patterns\":[\"logs-mysql-*\"]," + + "\"priority\":200," + + "\"composed_of\":[\"logs-mappings\",\"logs-settings\"]," + + "\"data_stream\":{}" + + "}" + ); + assertOK(leaderClient.performRequest(putComposableIndexTemplateRequest)); + for (int i = 0; i < numDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001"); + verifyDocuments(leaderClient, dataStreamName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs); + }); + } + + // First rollover and ensure second backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 1); } - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001"); - verifyDocuments(leaderClient, dataStreamName, numDocs); + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs + 1); + }); } - assertBusy(() -> { - assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); - verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001"); - ensureYellow(dataStreamName); - verifyDocuments(client(), dataStreamName, numDocs); - }); + // Second rollover and ensure third backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", "" + + ".ds-logs-mysql-error-000003"); - try (RestClient leaderClient = buildLeaderClient()) { - Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); - assertOK(leaderClient.performRequest(rolloverRequest)); - verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); - - Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); - indexRequest.addParameter("refresh", "true"); - indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); - assertOK(leaderClient.performRequest(indexRequest)); - verifyDocuments(leaderClient, dataStreamName, numDocs + 1); + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 2); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3)); + verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", + ".ds-logs-mysql-error-000003"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs + 2); + }); } - - assertBusy(() -> { - assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); - verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002"); - ensureYellow(dataStreamName); - verifyDocuments(client(), dataStreamName, numDocs + 1); - }); deleteAutoFollowPattern("test_pattern"); } From c5ec9a68625c98ab9dfe353d77e7a0d7d3978b0b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 19 Oct 2020 15:12:08 +0200 Subject: [PATCH 07/13] Applying @lockewritesdocs' doc suggestion. Co-authored-by: Adam Locke --- docs/reference/ccr/auto-follow.asciidoc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/reference/ccr/auto-follow.asciidoc b/docs/reference/ccr/auto-follow.asciidoc index 3eb9b215cf2b4..333bb708cf1da 100644 --- a/docs/reference/ccr/auto-follow.asciidoc +++ b/docs/reference/ccr/auto-follow.asciidoc @@ -7,12 +7,12 @@ each new index in the series is replicated automatically. Whenever the name of a new index on the remote cluster matches the auto-follow pattern, a corresponding follower index is added to the local cluster. -Data streams can also be auto followed. If a new backing index's data stream name -on the remote cluster matches with the auto-follow pattern then that index and -data stream is also auto followed. If a data stream is created on or after the -moment the auto follow pattern is created then all backing indices are auto -followed, otherwise only the backing indices on or after the creation of -the auto follow pattern are auto followed. +You can also create auto-follow patterns for data streams. When a new backing +index is generated on a remote cluster, that index and its data stream are +automatically followed if the data stream name matches an auto-follow +pattern. If you create a data stream after creating the auto-follow pattern, +all backing indices are followed automatically. + Auto-follow patterns are especially useful with <>, which might continually create From 71e2932a40b22792b5a2294f6d662c5169161bfd Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 20 Oct 2020 11:21:08 +0200 Subject: [PATCH 08/13] Make use of builtin composable index template for logs-* and added test that follows a data stream after its creation. --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 108 +++++++++++++++--- 1 file changed, 93 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 887ad9ffb1d87..18d7af819ccd5 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -32,6 +32,8 @@ public class AutoFollowIT extends ESCCRRestTestCase { + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT); + public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception { if ("follow".equals(targetCluster) == false) { logger.info("skipping test, waiting for target cluster [follow]" ); @@ -179,7 +181,6 @@ public void testDataStreams() throws Exception { final int numDocs = 64; final String dataStreamName = "logs-mysql-error"; - final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT); int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); @@ -203,20 +204,10 @@ public void testDataStreams() throws Exception { // Create data stream and ensure that is is auto followed { try (RestClient leaderClient = buildLeaderClient()) { - Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/mysql-error"); - putComposableIndexTemplateRequest.setJsonEntity( - "{" - + "\"index_patterns\":[\"logs-mysql-*\"]," - + "\"priority\":200," - + "\"composed_of\":[\"logs-mappings\",\"logs-settings\"]," - + "\"data_stream\":{}" - + "}" - ); - assertOK(leaderClient.performRequest(putComposableIndexTemplateRequest)); for (int i = 0; i < numDocs; i++) { Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); - indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); } verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001"); @@ -239,7 +230,7 @@ public void testDataStreams() throws Exception { Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); - indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); verifyDocuments(leaderClient, dataStreamName, numDocs + 1); } @@ -261,7 +252,7 @@ public void testDataStreams() throws Exception { Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); indexRequest.addParameter("refresh", "true"); - indexRequest.setJsonEntity("{\"@timestamp\": \"" + format.format(new Date()) + "\",\"message\":\"abc\"}"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); assertOK(leaderClient.performRequest(indexRequest)); verifyDocuments(leaderClient, dataStreamName, numDocs + 2); } @@ -273,7 +264,87 @@ public void testDataStreams() throws Exception { verifyDocuments(client(), dataStreamName, numDocs + 2); }); } - deleteAutoFollowPattern("test_pattern"); + // Cleanup: + { + deleteAutoFollowPattern("test_pattern"); + deleteDataStream(dataStreamName); + } + } + + public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final int initialNumDocs = 16; + final String dataStreamName = "logs-syslog-prod"; + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + // Initialize data stream prior to auto following + { + try (RestClient leaderClient = buildLeaderClient()) { + for (int i = 0; i < initialNumDocs; i++) { + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001"); + verifyDocuments(leaderClient, dataStreamName, initialNumDocs); + } + } + // Create auto follow pattern + { + Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); + try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) { + bodyBuilder.startObject(); + { + bodyBuilder.startArray("leader_index_patterns"); + { + bodyBuilder.value("logs-*"); + } + bodyBuilder.endArray(); + bodyBuilder.field("remote_cluster", "leader_cluster"); + } + bodyBuilder.endObject(); + request.setJsonEntity(Strings.toString(bodyBuilder)); + } + assertOK(client().performRequest(request)); + } + // Rollover and ensure only second backing index is replicated: + { + try (RestClient leaderClient = buildLeaderClient()) { + Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002"); + + Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, initialNumDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000002"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, 1); + }); + } + // Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly: + { + followIndex(".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000001"); + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002"); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, initialNumDocs + 1); + }); + } + // Cleanup: + { + deleteAutoFollowPattern("test_pattern"); + deleteDataStream(dataStreamName); + } } private int getNumberOfSuccessfulFollowedIndices() throws IOException { @@ -310,4 +381,11 @@ private static void verifyDataStream(final RestClient client, } } + private void deleteDataStream(String name) throws IOException { + try (RestClient leaderClient = buildLeaderClient()) { + Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name); + assertOK(leaderClient.performRequest(deleteTemplateRequest)); + } + } + } From 8d4cf01b58c667de57c67880aba669d1ee7c1ef7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 20 Oct 2020 13:15:07 +0200 Subject: [PATCH 09/13] Ensure the ordering of the backing indices in the follow cluster takes the generation into account. --- .../ccr/action/TransportPutFollowAction.java | 13 ++- .../action/TransportPutFollowActionTests.java | 89 +++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index c47cd04a4a386..c9c07d0f755a7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -44,10 +44,13 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.function.BiConsumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public final class TransportPutFollowAction extends TransportMasterNodeAction { @@ -244,7 +247,15 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, } else { List backingIndices = new ArrayList<>(localDataStream.getIndices()); backingIndices.add(backingIndexToFollow); - return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices); + + // When following an older backing index it should be positioned before the newer backing indices. + // Currently the assumption is that the newest index (highest generation) is the write index. + // (just appending an older backing index to the list of backing indices would break that assumption) + // (string sorting works because of the naming backing index naming scheme) + backingIndices.sort(Comparator.comparing(Index::getName)); + + return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices, + remoteDataStream.getGeneration()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java new file mode 100644 index 0000000000000..4f19822751c73 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStream.TimestampField; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; + +public class TransportPutFollowActionTests extends ESTestCase { + + public void testCreateNewLocalDataStream() { + DataStream remoteDataStream = generateDataSteam("logs-foobar", 3); + Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); + DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream); + assertThat(result.getName(), equalTo(remoteDataStream.getName())); + assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); + assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); + assertThat(result.getIndices().size(), equalTo(1)); + assertThat(result.getIndices().get(0), equalTo(backingIndexToFollow)); + } + + public void testUpdateLocalDataStream_followNewBackingIndex() { + DataStream remoteDataStream = generateDataSteam("logs-foobar", 3); + DataStream localDataStream = generateDataSteam("logs-foobar", 2); + Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1); + DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + assertThat(result.getName(), equalTo(remoteDataStream.getName())); + assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); + assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); + assertThat(result.getIndices().size(), equalTo(3)); + assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 2))); + assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 3))); + } + + public void testUpdateLocalDataStream_followOlderBackingIndex() { + // follow first backing index: + DataStream remoteDataStream = generateDataSteam("logs-foobar", 5); + DataStream localDataStream = generateDataSteam("logs-foobar", 5, DataStream.getDefaultBackingIndexName("logs-foobar", 5)); + Index backingIndexToFollow = remoteDataStream.getIndices().get(0); + DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + assertThat(result.getName(), equalTo(remoteDataStream.getName())); + assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); + assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); + assertThat(result.getIndices().size(), equalTo(2)); + assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5))); + + // follow second last backing index: + localDataStream = result; + backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 2); + result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream); + assertThat(result.getName(), equalTo(remoteDataStream.getName())); + assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField())); + assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration())); + assertThat(result.getIndices().size(), equalTo(3)); + assertThat(result.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + assertThat(result.getIndices().get(1).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 4))); + assertThat(result.getIndices().get(2).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 5))); + } + + static DataStream generateDataSteam(String name, int numBackingIndices) { + List backingIndices = IntStream.range(1, numBackingIndices + 1) + .mapToObj(value -> DataStream.getDefaultBackingIndexName(name, value)) + .map(value -> new Index(value, "uuid")) + .collect(Collectors.toList()); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices); + } + + static DataStream generateDataSteam(String name, int generation, String... backingIndexNames) { + List backingIndices = Arrays.stream(backingIndexNames) + .map(value -> new Index(value, "uuid")) + .collect(Collectors.toList()); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation); + } + +} From 1d705625937484c0878799820a1d377eb7df4b3d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 20 Oct 2020 14:15:12 +0200 Subject: [PATCH 10/13] removed unused imports --- .../xpack/ccr/action/TransportPutFollowAction.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index c9c07d0f755a7..5e236411991a2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -49,8 +49,6 @@ import java.util.Locale; import java.util.Objects; import java.util.function.BiConsumer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; public final class TransportPutFollowAction extends TransportMasterNodeAction { From abe709406e3fb2e274f9490467d9e787bc9adb30 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 20 Oct 2020 14:45:53 +0200 Subject: [PATCH 11/13] Throw a specific error when attempting to follow a data stream or alias. --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 2 +- .../elasticsearch/xpack/ccr/FollowIndexIT.java | 18 ++++++++++++++++++ .../xpack/ccr/CcrLicenseChecker.java | 14 +++++++++++++- 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 18d7af819ccd5..b84a13333058e 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -365,7 +365,7 @@ private static void verifyDocuments(final RestClient client, assertThat(index, numDocs, equalTo(expectedNumDocs)); } - private static void verifyDataStream(final RestClient client, + static void verifyDataStream(final RestClient client, final String name, final String... expectedBackingIndices) throws IOException { Request request = new Request("GET", "/_data_stream/" + name); diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 3880632d36f62..ac64d9982f569 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.ccr.AutoFollowIT.verifyDataStream; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; @@ -123,4 +124,21 @@ public void testFollowNonExistingLeaderIndex() throws Exception { assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); } + public void testFollowDataStreamFails() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final String dataStreamName = "logs-syslog-prod"; + try (RestClient leaderClient = buildLeaderClient()) { + Request request = new Request("PUT", "/_data_stream/" + dataStreamName); + assertOK(leaderClient.performRequest(request)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001"); + } + + ResponseException failure = expectThrows(ResponseException.class, () -> followIndex(dataStreamName, dataStreamName)); + assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(failure.getMessage(), containsString("cannot follow [logs-syslog-prod], because it is a DATA_STREAM")); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 541a9b2fe9638..04b2347eb7001 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -132,7 +132,19 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( ClusterState remoteClusterState = remoteClusterStateResponse.getState(); final IndexMetadata leaderIndexMetadata = remoteClusterState.getMetadata().index(leaderIndex); if (leaderIndexMetadata == null) { - onFailure.accept(new IndexNotFoundException(leaderIndex)); + final IndexAbstraction indexAbstraction = remoteClusterState.getMetadata().getIndicesLookup().get(leaderIndex); + final Exception failure; + if (indexAbstraction == null) { + failure = new IndexNotFoundException(leaderIndex); + } else { + // provided name may be an alias or data stream and in that case throw a specific error: + String message = String.format(Locale.ROOT, + "cannot follow [%s], because it is a %s", + leaderIndex, indexAbstraction.getType() + ); + failure = new IllegalArgumentException(message); + } + onFailure.accept(failure); return; } if (leaderIndexMetadata.getState() == IndexMetadata.State.CLOSE) { From 261ce1c5865468d4e6697ec07b8539c9b198bd08 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 20 Oct 2020 15:01:49 +0200 Subject: [PATCH 12/13] Prohibit changing a backing index name when during follow index. --- .../elasticsearch/xpack/ccr/FollowIndexIT.java | 18 ++++++++++++++++++ .../ccr/action/TransportPutFollowAction.java | 10 ++++++++++ 2 files changed, 28 insertions(+) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index ac64d9982f569..1acd0d4dc7ca8 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -141,4 +141,22 @@ public void testFollowDataStreamFails() throws Exception { assertThat(failure.getMessage(), containsString("cannot follow [logs-syslog-prod], because it is a DATA_STREAM")); } + public void testChangeBackingIndexNameFails() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + final String dataStreamName = "logs-foobar-prod"; + try (RestClient leaderClient = buildLeaderClient()) { + Request request = new Request("PUT", "/_data_stream/" + dataStreamName); + assertOK(leaderClient.performRequest(request)); + verifyDataStream(leaderClient, dataStreamName, ".ds-logs-foobar-prod-000001"); + } + + ResponseException failure = expectThrows(ResponseException.class, + () -> followIndex(".ds-logs-foobar-prod-000001", ".ds-logs-barbaz-prod-000001")); + assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(failure.getMessage(), containsString("a backing index name in the local and remote cluster must remain the same")); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 5e236411991a2..aa642ce021c17 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -134,6 +134,16 @@ private void createFollowerIndex( return; } + if (remoteDataStream != null) { + // when following a backing index then the names of the backing index must be remain the same in the local + // and remote cluster. + if (request.getLeaderIndex().equals(request.getFollowerIndex()) == false) { + listener.onFailure( + new IllegalArgumentException("a backing index name in the local and remote cluster must remain the same")); + return; + } + } + final Settings overrideSettings = Settings.builder() .put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex()) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) From efac2e45687d828f0cf7fb587289ed53f6ff9a82 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 28 Oct 2020 10:08:17 +0100 Subject: [PATCH 13/13] fixed compile errors after merging in master --- .../xpack/ccr/action/TransportPutFollowAction.java | 4 ++-- .../xpack/ccr/action/TransportPutFollowActionTests.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index aa642ce021c17..aa632f91722f4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -251,7 +251,7 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, // The data stream and the backing indices have been created and validated in the remote cluster, // just copying the data stream is in this case safe. return new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(), - List.of(backingIndexToFollow), remoteDataStream.getGeneration()); + List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata()); } else { List backingIndices = new ArrayList<>(localDataStream.getIndices()); backingIndices.add(backingIndexToFollow); @@ -263,7 +263,7 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, backingIndices.sort(Comparator.comparing(Index::getName)); return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices, - remoteDataStream.getGeneration()); + remoteDataStream.getGeneration(), remoteDataStream.getMetadata()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java index 4f19822751c73..585a8f62ace1e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java @@ -13,6 +13,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -83,7 +84,7 @@ static DataStream generateDataSteam(String name, int generation, String... backi List backingIndices = Arrays.stream(backingIndexNames) .map(value -> new Index(value, "uuid")) .collect(Collectors.toList()); - return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation); + return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of()); } }