Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/reference/ccr/apis/get-ccr-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ The API returns the following results:
"number_of_failed_follow_indices" : 0,
"number_of_failed_remote_cluster_state_requests" : 0,
"number_of_successful_follow_indices" : 1,
"recent_auto_follow_errors" : []
"recent_auto_follow_errors" : [],
"tracking_remote_clusters" : []
},
"follow_stats" : {
"indices" : [
Expand Down Expand Up @@ -151,6 +152,7 @@ The API returns the following results:
// TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/]
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/]
// TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/]
// TESTRESPONSE[s/"tracking_remote_clusters" : \[\]/"tracking_remote_clusters" : $body.auto_follow_stats.tracking_remote_clusters/]
// TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/]
// TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/]
// TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/]
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/ccr/qa/chain/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ followClusterTestCluster {
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.middle_cluster.seeds', "\"${-> middleClusterTest.nodes.get(0).transportUri()}\""
setting 'node.name', 'follow'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;

import java.util.Comparator;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class ChainIT extends ESCCRRestTestCase {

public void testFollowIndex() throws Exception {
Expand Down Expand Up @@ -71,4 +79,68 @@ public void testFollowIndex() throws Exception {
}
}

public void testAutoFollowPatterns() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
assertOK(client().performRequest(putPatternRequest));

putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}");
assertOK(client().performRequest(putPatternRequest));

try (RestClient leaderClient = buildLeaderClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20190101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(leaderClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
}
}

try (RestClient middleClient = buildMiddleClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20200101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(middleClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true");
}
}

assertBusy(() -> {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Map<?, ?> autoFollowStats = (Map<?, ?>) response.get("auto_follow_stats");
assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2));

@SuppressWarnings("unchecked")
List<Map<String, ?>> trackingRemoteClusters =
(List<Map<String, ?>>) autoFollowStats.get("tracking_remote_clusters");
trackingRemoteClusters.sort(Comparator.comparing(o -> ((String) o.get("cluster_name"))));
assertThat(trackingRemoteClusters.size(), equalTo(2));
assertThat(trackingRemoteClusters.get(0).get("cluster_name"), equalTo("leader_cluster"));
assertThat(trackingRemoteClusters.get(1).get("cluster_name"), equalTo("middle_cluster"));

ensureYellow("logs-20190101");
ensureYellow("logs-20200101");
verifyDocuments("logs-20190101", 5, "filtered_field:true");
verifyDocuments("logs-20200101", 5, "filtered_field:true");
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, System::nanoTime)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client.getRemoteClusterClient(clusterAlias),
request,
onFailure,
leaderClusterState -> {
leaderClusterStateResponse -> {
ClusterState leaderClusterState = leaderClusterStateResponse.getState();
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
if (leaderIndexMetaData == null) {
onFailure.accept(new IndexNotFoundException(leaderIndex));
Expand Down Expand Up @@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
clusterAlias,
Expand Down Expand Up @@ -192,7 +193,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
final Client remoteClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
final Function<Exception, ElasticsearchStatusException> unknownLicense) {
// we have to check the license on the remote cluster
Expand All @@ -204,7 +205,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(request, clusterStateListener);
} else {
Expand All @@ -228,9 +229,7 @@ public void onFailure(final Exception e) {
* @param onFailure the failure consumer
* @param historyUUIDConsumer the leader index history uuid and consumer
*/
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs
// in case of following a local or a remote cluster.
public void fetchLeaderHistoryUUIDs(
private void fetchLeaderHistoryUUIDs(
final Client remoteClient,
final IndexMetaData leaderIndexMetaData,
final Consumer<Exception> onFailure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
Expand All @@ -29,12 +28,6 @@ private CcrSettings() {
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);

/**
* The settings defined by CCR.
*
Expand All @@ -43,8 +36,8 @@ private CcrSettings() {
static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_POLL_INTERVAL);
CCR_FOLLOWING_INDEX_SETTING
);
}

}
Loading