From cf769f741d3ceb3e0bc54451f59566ceb38a322e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 17 Dec 2018 15:31:56 +0100 Subject: [PATCH 1/3] [CCR] Add `ccr.auto_follow_coordinator.wait_for_timeout` setting This setting controls the wait for timeout the autofollow coordinator should use when setting cluster state requests to a remote cluster. --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../elasticsearch/xpack/ccr/CcrSettings.java | 10 ++++++++- .../ccr/action/AutoFollowCoordinator.java | 14 ++++++++++++ .../elasticsearch/xpack/CcrIntegTestCase.java | 3 +++ .../xpack/CcrSingleNodeTestCase.java | 3 +++ .../action/AutoFollowCoordinatorTests.java | 22 +++++++++++++++---- 6 files changed, 48 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 70d4905d94375..7f7a0e4a5e0f1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -156,7 +156,7 @@ public Collection createComponents( return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis) + new AutoFollowCoordinator(settings, client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 544a45792e070..d90215fe6ef43 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,6 +7,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; @@ -28,6 +29,12 @@ private CcrSettings() { public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex); + /** + * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using. + */ + public static final Setting CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT = Setting.timeSetting( + "ccr.auto_follow_coordinator.wait_for_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + /** * The settings defined by CCR. * @@ -36,7 +43,8 @@ private CcrSettings() { static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, - CCR_FOLLOWING_INDEX_SETTING); + CCR_FOLLOWING_INDEX_SETTING, + CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 4888b0367fd20..b683acfdf6b36 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -27,12 +27,14 @@ import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; @@ -72,6 +74,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { private final CcrLicenseChecker ccrLicenseChecker; private final LongSupplier relativeMillisTimeProvider; + private volatile TimeValue waitForTimeOut; private volatile Map autoFollowers = Collections.emptyMap(); // The following fields are read and updated under a lock: @@ -81,6 +84,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { private final LinkedHashMap recentAutoFollowErrors; public AutoFollowCoordinator( + Settings settings, Client client, ClusterService clusterService, CcrLicenseChecker ccrLicenseChecker, @@ -97,6 +101,15 @@ protected boolean removeEldestEntry(final Map.Entry MAX_AUTO_FOLLOW_ERRORS; } }; + + Consumer updater = newWaitForTimeOut -> { + if (newWaitForTimeOut.equals(waitForTimeOut) == false) { + LOGGER.info("changing wait_for_timeout from [{}] to [{}]", waitForTimeOut, newWaitForTimeOut); + waitForTimeOut = newWaitForTimeOut; + } + }; + clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT, updater); + waitForTimeOut = CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.get(settings); } public synchronized AutoFollowStats getStats() { @@ -180,6 +193,7 @@ void getRemoteClusterState(final String remoteCluster, request.metaData(true); request.routingTable(true); request.waitForMetaDataVersion(metadataVersion); + request.waitForTimeout(waitForTimeOut); // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( client, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 5abe852ca5ff0..6d2079e5206df 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -65,6 +65,7 @@ import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.core.XPackSettings; @@ -195,6 +196,8 @@ private NodeConfigurationSource createNodeConfigurationSource() { builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); + // Let cluster state api return quickly in order to speed up auto follow tests: + builder.put(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); return new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 417de7cd985c5..5a662cd91b1a3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -14,6 +14,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -41,6 +42,8 @@ protected Settings nodeSettings() { builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); + // Let cluster state api return quickly in order to speed up auto follow tests: + builder.put(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); return builder.build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 7228acaacf1a9..a2dac38ca13d2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -19,11 +19,13 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; @@ -530,8 +532,9 @@ public void testGetFollowerIndexName() { public void testStats() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, null, - mock(ClusterService.class), + mockClusterService(), new CcrLicenseChecker(() -> true, () -> false), () -> 1L); @@ -586,7 +589,7 @@ public void testStats() { } public void testUpdateAutoFollowers() { - ClusterService clusterService = mock(ClusterService.class); + ClusterService clusterService = mockClusterService(); // Return a cluster state with no patterns so that the auto followers never really execute: ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -594,6 +597,7 @@ public void testUpdateAutoFollowers() { .build(); when(clusterService.state()).thenReturn(followerState); AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, null, clusterService, new CcrLicenseChecker(() -> true, () -> false), @@ -648,8 +652,9 @@ public void testUpdateAutoFollowers() { public void testUpdateAutoFollowersNoPatterns() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, null, - mock(ClusterService.class), + mockClusterService(), new CcrLicenseChecker(() -> true, () -> false), () -> 1L); ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) @@ -662,8 +667,9 @@ public void testUpdateAutoFollowersNoPatterns() { public void testUpdateAutoFollowersNoAutoFollowMetadata() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, null, - mock(ClusterService.class), + mockClusterService(), new CcrLicenseChecker(() -> true, () -> false), () -> 1L); ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build(); @@ -840,4 +846,12 @@ private static Supplier localClusterStateSupplier(ClusterState... }; } + private ClusterService mockClusterService() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = + new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + return clusterService; + } + } From 0870d99d40d9a8a9c5ef09fb3fbf267d307d9f79 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 19 Dec 2018 10:22:20 +0100 Subject: [PATCH 2/3] rename --- .../org/elasticsearch/xpack/ccr/CcrSettings.java | 6 +++--- .../xpack/ccr/action/AutoFollowCoordinator.java | 14 +++++++------- .../org/elasticsearch/xpack/CcrIntegTestCase.java | 2 +- .../elasticsearch/xpack/CcrSingleNodeTestCase.java | 2 +- .../ccr/action/AutoFollowCoordinatorTests.java | 2 +- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index d90215fe6ef43..cac462ae40d38 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -32,8 +32,8 @@ private CcrSettings() { /** * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using. */ - public static final Setting CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT = Setting.timeSetting( - "ccr.auto_follow_coordinator.wait_for_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + public static final Setting CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( + "ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); /** * The settings defined by CCR. @@ -44,7 +44,7 @@ static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, - CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT); + CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index b683acfdf6b36..dbaf83a775a03 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -74,7 +74,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { private final CcrLicenseChecker ccrLicenseChecker; private final LongSupplier relativeMillisTimeProvider; - private volatile TimeValue waitForTimeOut; + private volatile TimeValue waitForMetadataTimeOut; private volatile Map autoFollowers = Collections.emptyMap(); // The following fields are read and updated under a lock: @@ -103,13 +103,13 @@ protected boolean removeEldestEntry(final Map.Entry updater = newWaitForTimeOut -> { - if (newWaitForTimeOut.equals(waitForTimeOut) == false) { - LOGGER.info("changing wait_for_timeout from [{}] to [{}]", waitForTimeOut, newWaitForTimeOut); - waitForTimeOut = newWaitForTimeOut; + if (newWaitForTimeOut.equals(waitForMetadataTimeOut) == false) { + LOGGER.info("changing wait_for_metadata_timeout from [{}] to [{}]", waitForMetadataTimeOut, newWaitForTimeOut); + waitForMetadataTimeOut = newWaitForTimeOut; } }; - clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT, updater); - waitForTimeOut = CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, updater); + waitForMetadataTimeOut = CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.get(settings); } public synchronized AutoFollowStats getStats() { @@ -193,7 +193,7 @@ void getRemoteClusterState(final String remoteCluster, request.metaData(true); request.routingTable(true); request.waitForMetaDataVersion(metadataVersion); - request.waitForTimeout(waitForTimeOut); + request.waitForTimeout(waitForMetadataTimeOut); // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( client, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index e152b73c2badb..aa75a25eac2a8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -192,7 +192,7 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: - builder.put(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); if (leaderSeedAddress != null) { builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index cf90be24528eb..0ae6c9651aacb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -43,7 +43,7 @@ protected Settings nodeSettings() { builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: - builder.put(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); return builder.build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index a2dac38ca13d2..3f7f09c95f5b0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -849,7 +849,7 @@ private static Supplier localClusterStateSupplier(ClusterState... private ClusterService mockClusterService() { ClusterService clusterService = mock(ClusterService.class); ClusterSettings clusterSettings = - new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_COORDINATOR_WAIT_FOR_TIMEOUT)); + new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT)); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); return clusterService; } From 134eb6f3ee8d2d2c5ea16827a5d9469906f2d871 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 20 Dec 2018 07:50:59 +0100 Subject: [PATCH 3/3] fixed formatting --- .../src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index cac462ae40d38..d7495dec8c2cf 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -44,7 +44,7 @@ static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, - CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); + CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); } }