Skip to content

Commit 282fdf8

Browse files
committed
[CCR] Add ccr.auto_follow_coordinator.wait_for_timeout setting (#36714)
This setting controls the wait for timeout the autofollow coordinator should use when setting cluster state requests to a remote cluster.
1 parent cd54d41 commit 282fdf8

File tree

6 files changed

+48
-6
lines changed

6 files changed

+48
-6
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public Collection<Object> createComponents(
163163

164164
return Arrays.asList(
165165
ccrLicenseChecker,
166-
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
166+
new AutoFollowCoordinator(settings, client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
167167
);
168168
}
169169

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.common.settings.Setting;
99
import org.elasticsearch.common.settings.Setting.Property;
10+
import org.elasticsearch.common.unit.TimeValue;
1011
import org.elasticsearch.xpack.core.XPackSettings;
1112

1213
import java.util.Arrays;
@@ -28,6 +29,12 @@ private CcrSettings() {
2829
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
2930
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);
3031

32+
/**
33+
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using.
34+
*/
35+
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
36+
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);
37+
3138
/**
3239
* The settings defined by CCR.
3340
*
@@ -36,7 +43,8 @@ private CcrSettings() {
3643
static List<Setting<?>> getSettings() {
3744
return Arrays.asList(
3845
XPackSettings.CCR_ENABLED_SETTING,
39-
CCR_FOLLOWING_INDEX_SETTING);
46+
CCR_FOLLOWING_INDEX_SETTING,
47+
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
4048
}
4149

4250
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
2727
import org.elasticsearch.common.collect.Tuple;
2828
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.common.unit.TimeValue;
2930
import org.elasticsearch.common.util.concurrent.AtomicArray;
3031
import org.elasticsearch.common.util.concurrent.CountDown;
3132
import org.elasticsearch.index.Index;
3233
import org.elasticsearch.index.IndexSettings;
3334
import org.elasticsearch.license.LicenseUtils;
3435
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
36+
import org.elasticsearch.xpack.ccr.CcrSettings;
3537
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
3638
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
3739
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
@@ -72,6 +74,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
7274
private final CcrLicenseChecker ccrLicenseChecker;
7375
private final LongSupplier relativeMillisTimeProvider;
7476

77+
private volatile TimeValue waitForMetadataTimeOut;
7578
private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();
7679

7780
// The following fields are read and updated under a lock:
@@ -81,6 +84,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
8184
private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;
8285

8386
public AutoFollowCoordinator(
87+
Settings settings,
8488
Client client,
8589
ClusterService clusterService,
8690
CcrLicenseChecker ccrLicenseChecker,
@@ -97,6 +101,15 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
97101
return size() > MAX_AUTO_FOLLOW_ERRORS;
98102
}
99103
};
104+
105+
Consumer<TimeValue> updater = newWaitForTimeOut -> {
106+
if (newWaitForTimeOut.equals(waitForMetadataTimeOut) == false) {
107+
LOGGER.info("changing wait_for_metadata_timeout from [{}] to [{}]", waitForMetadataTimeOut, newWaitForTimeOut);
108+
waitForMetadataTimeOut = newWaitForTimeOut;
109+
}
110+
};
111+
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, updater);
112+
waitForMetadataTimeOut = CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.get(settings);
100113
}
101114

102115
public synchronized AutoFollowStats getStats() {
@@ -180,6 +193,7 @@ void getRemoteClusterState(final String remoteCluster,
180193
request.metaData(true);
181194
request.routingTable(true);
182195
request.waitForMetaDataVersion(metadataVersion);
196+
request.waitForTimeout(waitForMetadataTimeOut);
183197
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
184198
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
185199
client,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.elasticsearch.test.TestCluster;
6666
import org.elasticsearch.test.discovery.TestZenDiscovery;
6767
import org.elasticsearch.transport.TransportService;
68+
import org.elasticsearch.xpack.ccr.CcrSettings;
6869
import org.elasticsearch.xpack.ccr.LocalStateCcr;
6970
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
7071
import org.elasticsearch.xpack.core.XPackSettings;
@@ -209,6 +210,8 @@ private NodeConfigurationSource createNodeConfigurationSource() {
209210
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
210211
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
211212
builder.put(NetworkModule.HTTP_ENABLED.getKey(), false);
213+
// Let cluster state api return quickly in order to speed up auto follow tests:
214+
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
212215
return new NodeConfigurationSource() {
213216
@Override
214217
public Settings nodeSettings(int nodeOrdinal) {

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.plugins.Plugin;
1616
import org.elasticsearch.test.ESSingleNodeTestCase;
1717
import org.elasticsearch.transport.TransportService;
18+
import org.elasticsearch.xpack.ccr.CcrSettings;
1819
import org.elasticsearch.xpack.ccr.LocalStateCcr;
1920
import org.elasticsearch.xpack.core.XPackSettings;
2021
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@@ -42,6 +43,8 @@ protected Settings nodeSettings() {
4243
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
4344
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
4445
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
46+
// Let cluster state api return quickly in order to speed up auto follow tests:
47+
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
4548
return builder.build();
4649
}
4750

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import org.elasticsearch.cluster.routing.TestShardRouting;
2020
import org.elasticsearch.cluster.service.ClusterService;
2121
import org.elasticsearch.common.collect.Tuple;
22+
import org.elasticsearch.common.settings.ClusterSettings;
2223
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.index.Index;
2425
import org.elasticsearch.index.IndexSettings;
2526
import org.elasticsearch.test.ESTestCase;
2627
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
28+
import org.elasticsearch.xpack.ccr.CcrSettings;
2729
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
2830
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
2931
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
@@ -533,8 +535,9 @@ public void testGetFollowerIndexName() {
533535

534536
public void testStats() {
535537
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
538+
Settings.EMPTY,
536539
null,
537-
mock(ClusterService.class),
540+
mockClusterService(),
538541
new CcrLicenseChecker(() -> true, () -> false),
539542
() -> 1L);
540543

@@ -589,14 +592,15 @@ public void testStats() {
589592
}
590593

591594
public void testUpdateAutoFollowers() {
592-
ClusterService clusterService = mock(ClusterService.class);
595+
ClusterService clusterService = mockClusterService();
593596
// Return a cluster state with no patterns so that the auto followers never really execute:
594597
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
595598
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
596599
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())))
597600
.build();
598601
when(clusterService.state()).thenReturn(followerState);
599602
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
603+
Settings.EMPTY,
600604
null,
601605
clusterService,
602606
new CcrLicenseChecker(() -> true, () -> false),
@@ -651,8 +655,9 @@ public void testUpdateAutoFollowers() {
651655

652656
public void testUpdateAutoFollowersNoPatterns() {
653657
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
658+
Settings.EMPTY,
654659
null,
655-
mock(ClusterService.class),
660+
mockClusterService(),
656661
new CcrLicenseChecker(() -> true, () -> false),
657662
() -> 1L);
658663
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
@@ -665,8 +670,9 @@ public void testUpdateAutoFollowersNoPatterns() {
665670

666671
public void testUpdateAutoFollowersNoAutoFollowMetadata() {
667672
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
673+
Settings.EMPTY,
668674
null,
669-
mock(ClusterService.class),
675+
mockClusterService(),
670676
new CcrLicenseChecker(() -> true, () -> false),
671677
() -> 1L);
672678
ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build();
@@ -918,4 +924,12 @@ private static Supplier<ClusterState> localClusterStateSupplier(ClusterState...
918924
};
919925
}
920926

927+
private ClusterService mockClusterService() {
928+
ClusterService clusterService = mock(ClusterService.class);
929+
ClusterSettings clusterSettings =
930+
new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT));
931+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
932+
return clusterService;
933+
}
934+
921935
}

0 commit comments

Comments
 (0)