Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
03298b6
Add data stream support to CCR
martijnvg Sep 4, 2020
ca215b1
Merge remote-tracking branch 'es/master' into ccr_data_stream_support
martijnvg Sep 7, 2020
206fbf0
move the check that checks parent data stream after the check whether…
martijnvg Sep 7, 2020
addf40f
delete auto follow pattern after test
martijnvg Sep 7, 2020
4023672
Merge remote-tracking branch 'es/master' into ccr_data_stream_support
martijnvg Sep 21, 2020
606da3d
added tests and docs
martijnvg Sep 21, 2020
551adc8
updated jdocs
martijnvg Sep 21, 2020
f81cc8b
iter
martijnvg Sep 21, 2020
2e9cd8f
Merge branch 'master' into ccr_data_stream_support
elasticmachine Sep 23, 2020
715dfe3
Merge remote-tracking branch 'es/master' into ccr_data_stream_support
martijnvg Sep 28, 2020
c5ec9a6
Applying @lockewritesdocs' doc suggestion.
martijnvg Oct 19, 2020
520b8c1
Merge remote-tracking branch 'es/master' into ccr_data_stream_support
martijnvg Oct 19, 2020
71e2932
Make use of builtin composable index template for logs-* and
martijnvg Oct 20, 2020
8d4cf01
Ensure the ordering of the backing indices in the follow cluster take…
martijnvg Oct 20, 2020
1d70562
removed unused imports
martijnvg Oct 20, 2020
abe7094
Throw a specific error when attempting to follow a data stream or alias.
martijnvg Oct 20, 2020
261ce1c
Prohibit changing a backing index name when during follow index.
martijnvg Oct 20, 2020
d042d00
Merge remote-tracking branch 'es/master' into ccr_data_stream_support
martijnvg Oct 21, 2020
498a1ff
Merge remote-tracking branch 'es/master' into ccr_data_stream_support
martijnvg Oct 28, 2020
efac2e4
fixed compile errors after merging in master
martijnvg Oct 28, 2020
94827c3
Merge remote-tracking branch 'es/master' into ccr_data_stream_support
martijnvg Nov 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/reference/ccr/auto-follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ each new index in the series is replicated automatically. Whenever the name of
a new index on the remote cluster matches the auto-follow pattern, a
corresponding follower index is added to the local cluster.

You can also create auto-follow patterns for data streams. When a new backing
index is generated on a remote cluster, that index and its data stream are
automatically followed if the data stream name matches an auto-follow
pattern. If you create a data stream after creating the auto-follow pattern,
all backing indices are followed automatically.


Auto-follow patterns are especially useful with
<<index-lifecycle-management,{ilm-cap}>>, which might continually create
new indices on the cluster containing the leader index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -37,6 +39,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -149,9 +152,21 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request,
mdBuilder.version(currentState.metadata().version());
String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request);
for (String filteredIndex : indices) {
IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex);
if (indexMetadata != null) {
mdBuilder.put(indexMetadata, false);
// If the requested index is part of a data stream then that data stream should also be included:
IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(filteredIndex);
if (indexAbstraction.getParentDataStream() != null) {
DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
mdBuilder.put(dataStream);
// Also the IMD of other backing indices need to be included, otherwise the cluster state api
// can't create a valid cluster state instance:
for (Index backingIndex : dataStream.getIndices()) {
mdBuilder.put(currentState.metadata().index(backingIndex), false);
}
} else {
IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex);
if (indexMetadata != null) {
mdBuilder.put(indexMetadata, false);
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -184,6 +185,20 @@ public RestoreService(ClusterService clusterService, RepositoriesService reposit
* @param listener restore listener
*/
public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionListener<RestoreCompletionResponse> listener) {
restoreSnapshot(request, listener, (clusterState, builder) -> {});
}

/**
* Restores snapshot specified in the restore request.
*
* @param request restore request
* @param listener restore listener
* @param updater handler that allows callers to make modifications to {@link Metadata}
* in the same cluster state update as the restore operation
*/
public void restoreSnapshot(final RestoreSnapshotRequest request,
final ActionListener<RestoreCompletionResponse> listener,
final BiConsumer<ClusterState, Metadata.Builder> updater) {
try {
// Read snapshot info and metadata from the repository
final String repositoryName = request.repository();
Expand Down Expand Up @@ -455,6 +470,7 @@ restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
}

RoutingTable rt = rtBuilder.build();
updater.accept(currentState, mdBuilder);
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,27 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

public class AutoFollowIT extends ESCCRRestTestCase {

private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss", Locale.ROOT);

public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" );
Expand Down Expand Up @@ -64,6 +73,7 @@ public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
verifyDocuments("logs-20190101", 5, "filtered_field:true");
verifyDocuments("logs-20200101", 5, "filtered_field:true");
});
deleteAutoFollowPattern("leader_cluster_pattern");
}

public void testAutoFollowPatterns() throws Exception {
Expand Down Expand Up @@ -122,6 +132,7 @@ public void testAutoFollowPatterns() throws Exception {
verifyCcrMonitoring("metrics-20210101", "metrics-20210101");
verifyAutoFollowMonitoring();
}, 30, TimeUnit.SECONDS);
deleteAutoFollowPattern("test_pattern");
}

public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
Expand Down Expand Up @@ -163,12 +174,218 @@ public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws
);
}

public void testDataStreams() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

final int numDocs = 64;
final String dataStreamName = "logs-mysql-error";

int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();

// Create auto follow pattern
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("logs-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client().performRequest(request));

// Create data stream and ensure that is is auto followed
{
try (RestClient leaderClient = buildLeaderClient()) {
for (int i = 0; i < numDocs; i++) {
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001");
verifyDocuments(leaderClient, dataStreamName, numDocs);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs);
});
}

// First rollover and ensure second backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002");

Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 1);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs + 1);
});
}

// Second rollover and ensure third backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002", "" +
".ds-logs-mysql-error-000003");

Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, numDocs + 2);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3));
verifyDataStream(client(), dataStreamName, ".ds-logs-mysql-error-000001", ".ds-logs-mysql-error-000002",
".ds-logs-mysql-error-000003");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, numDocs + 2);
});
}
// Cleanup:
{
deleteAutoFollowPattern("test_pattern");
deleteDataStream(dataStreamName);
}
}

public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

final int initialNumDocs = 16;
final String dataStreamName = "logs-syslog-prod";
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
// Initialize data stream prior to auto following
{
try (RestClient leaderClient = buildLeaderClient()) {
for (int i = 0; i < initialNumDocs; i++) {
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001");
verifyDocuments(leaderClient, dataStreamName, initialNumDocs);
}
}
// Create auto follow pattern
{
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("logs-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client().performRequest(request));
}
// Rollover and ensure only second backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002");

Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
assertOK(leaderClient.performRequest(indexRequest));
verifyDocuments(leaderClient, dataStreamName, initialNumDocs + 1);
}
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000002");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, 1);
});
}
// Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly:
{
followIndex(".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000001");
assertBusy(() -> {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
verifyDataStream(client(), dataStreamName, ".ds-logs-syslog-prod-000001", ".ds-logs-syslog-prod-000002");
ensureYellow(dataStreamName);
verifyDocuments(client(), dataStreamName, initialNumDocs + 1);
});
}
// Cleanup:
{
deleteAutoFollowPattern("test_pattern");
deleteDataStream(dataStreamName);
}
}

private int getNumberOfSuccessfulFollowedIndices() throws IOException {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
response = (Map<?, ?>) response.get("auto_follow_stats");
return (Integer) response.get("number_of_successful_follow_indices");
}

private static void verifyDocuments(final RestClient client,
final String index,
final int expectedNumDocs) throws IOException {
final Request request = new Request("GET", "/" + index + "/_search");
request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
Map<String, ?> response = toMap(client.performRequest(request));

int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(index, numDocs, equalTo(expectedNumDocs));
}

static void verifyDataStream(final RestClient client,
final String name,
final String... expectedBackingIndices) throws IOException {
Request request = new Request("GET", "/_data_stream/" + name);
Map<String, ?> response = toMap(client.performRequest(request));
List<?> retrievedDataStreams = (List<?>) response.get("data_streams");
assertThat(retrievedDataStreams, hasSize(1));
List<?> actualBackingIndices = (List<?>) ((Map<?, ?>) retrievedDataStreams.get(0)).get("indices");
assertThat(actualBackingIndices, hasSize(expectedBackingIndices.length));
for (int i = 0; i < expectedBackingIndices.length; i++) {
Map<?, ?> actualBackingIndex = (Map<?, ?>) actualBackingIndices.get(i);
String expectedBackingIndex = expectedBackingIndices[i];
assertThat(actualBackingIndex.get("index_name"), equalTo(expectedBackingIndex));
}
}

private void deleteDataStream(String name) throws IOException {
try (RestClient leaderClient = buildLeaderClient()) {
Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name);
assertOK(leaderClient.performRequest(deleteTemplateRequest));
}
}

}
Loading