From 7d5ee4309beb1cd99811556aee491d047cf2d99f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 29 Jan 2019 12:29:06 -0700 Subject: [PATCH] Add timeout for ccr recovery action (#37840) This is related to #35975. It adds a action timeout setting that allows timeouts to be applied to the individual transport actions that are used during a ccr recovery. --- .../transport/StubbableConnectionManager.java | 2 + .../test/transport/StubbableTransport.java | 2 + .../elasticsearch/xpack/ccr/CcrSettings.java | 19 +++++ .../xpack/ccr/repository/CcrRepository.java | 41 ++++++---- .../elasticsearch/xpack/CcrIntegTestCase.java | 3 +- .../xpack/ccr/CcrRepositoryIT.java | 77 +++++++++++++++++++ .../CcrRestoreSourceServiceTests.java | 2 +- 7 files changed, 127 insertions(+), 19 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index 994037be07a92..41ac87f0af576 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -69,7 +69,9 @@ public boolean setDefaultNodeConnectedBehavior(NodeConnectedBehavior behavior) { } public void clearBehaviors() { + defaultGetConnectionBehavior = ConnectionManager::getConnection; getConnectionBehaviors.clear(); + defaultNodeConnectedBehavior = ConnectionManager::nodeConnected; nodeConnectedBehaviors.clear(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 0014e1225c595..673ed49387570 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -76,7 +76,9 @@ boolean addConnectBehavior(TransportAddress transportAddress, OpenConnectionBeha } void clearBehaviors() { + this.defaultSendRequest = null; sendBehaviors.clear(); + this.defaultConnectBehavior = null; connectBehaviors.clear(); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index d6262a7711dad..625429dc0abc6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -57,6 +57,13 @@ public final class CcrSettings { Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60), Setting.Property.Dynamic, Setting.Property.NodeScope); + /** + * The timeout value to use for requests made as part of ccr recovery process. + * */ + public static final Setting INDICES_RECOVERY_ACTION_TIMEOUT_SETTING = + Setting.positiveTimeSetting("ccr.indices.recovery.internal_action_timeout", TimeValue.timeValueSeconds(60), + Property.Dynamic, Property.NodeScope); + /** * The settings defined by CCR. * @@ -67,6 +74,7 @@ static List> getSettings() { XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, RECOVERY_MAX_BYTES_PER_SECOND, + INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, CCR_WAIT_FOR_METADATA_TIMEOUT); @@ -74,12 +82,15 @@ static List> getSettings() { private final CombinedRateLimiter ccrRateLimiter; private volatile TimeValue recoveryActivityTimeout; + private volatile TimeValue recoveryActionTimeout; public CcrSettings(Settings settings, ClusterSettings clusterSettings) { this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); + this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings); this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings)); clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout); } private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { @@ -90,6 +101,10 @@ private void setRecoveryActivityTimeout(TimeValue recoveryActivityTimeout) { this.recoveryActivityTimeout = recoveryActivityTimeout; } + private void setRecoveryActionTimeout(TimeValue recoveryActionTimeout) { + this.recoveryActionTimeout = recoveryActionTimeout; + } + public CombinedRateLimiter getRateLimiter() { return ccrRateLimiter; } @@ -97,4 +112,8 @@ public CombinedRateLimiter getRateLimiter() { public TimeValue getRecoveryActivityTimeout() { return recoveryActivityTimeout; } + + public TimeValue getRecoveryActionTimeout() { + return recoveryActionTimeout; + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index a2cc22ef6ed63..539c19ca8501f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -124,7 +124,8 @@ public RepositoryMetaData getMetadata() { public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true).get(); + ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true) + .get(ccrSettings.getRecoveryActionTimeout()); ImmutableOpenMap indicesMap = response.getState().metaData().indices(); ArrayList indices = new ArrayList<>(indicesMap.size()); indicesMap.keysIt().forEachRemaining(indices::add); @@ -138,7 +139,8 @@ public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); // We set a single dummy index name to avoid fetching all the index data ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name"); - ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest) + .actionGet(ccrSettings.getRecoveryActionTimeout()); return clusterState.getState().metaData(); } @@ -149,13 +151,14 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex); - ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest) + .actionGet(ccrSettings.getRecoveryActionTimeout()); // Validates whether the leader cluster has been configured properly: PlainActionFuture future = PlainActionFuture.newFuture(); IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex); ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse); - String[] leaderHistoryUUIDs = future.actionGet(); + String[] leaderHistoryUUIDs = future.actionGet(ccrSettings.getRecoveryActionTimeout()); IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndexMetaData); // Adding the leader index uuid for each shard as custom metadata: @@ -172,7 +175,8 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind @Override public RepositoryData getRepositoryData() { Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).get(); + ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true) + .get(ccrSettings.getRecoveryActionTimeout()); MetaData remoteMetaData = response.getState().getMetaData(); Map copiedSnapshotIds = new HashMap<>(); @@ -282,7 +286,8 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) { ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); - ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest) + .actionGet(ccrSettings.getRecoveryActionTimeout()); IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); long leaderMappingVersion = leaderIndexMetadata.getMappingVersion(); @@ -292,7 +297,7 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndexMetadata.getMappings().size() + "]"; MappingMetaData mappingMetaData = leaderIndexMetadata.getMappings().iterator().next().value; PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); - localClient.admin().indices().putMapping(putMappingRequest).actionGet(); + localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); } } @@ -300,9 +305,9 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S RecoveryState recoveryState) { String sessionUUID = UUIDs.randomBase64UUID(); PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, - new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(); + new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout()); return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, - response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc); + response.getStoreFileMetaData(), ccrSettings, throttledTime::inc); } private static class RestoreSession extends FileRestoreContext implements Closeable { @@ -313,18 +318,18 @@ private static class RestoreSession extends FileRestoreContext implements Closea private final String sessionUUID; private final DiscoveryNode node; private final Store.MetadataSnapshot sourceMetaData; - private final CombinedRateLimiter rateLimiter; + private final CcrSettings ccrSettings; private final LongConsumer throttleListener; RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, - RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter, + RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings, LongConsumer throttleListener) { super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; this.sourceMetaData = sourceMetaData; - this.rateLimiter = rateLimiter; + this.ccrSettings = ccrSettings; this.throttleListener = throttleListener; } @@ -340,14 +345,14 @@ void restoreFiles() throws IOException { @Override protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener); + return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), ccrSettings, throttleListener); } @Override public void close() { ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = - remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); + remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); } } @@ -358,17 +363,19 @@ private static class RestoreFileInputStream extends InputStream { private final DiscoveryNode node; private final StoreFileMetaData fileToRecover; private final CombinedRateLimiter rateLimiter; + private final CcrSettings ccrSettings; private final LongConsumer throttleListener; private long pos = 0; private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover, - CombinedRateLimiter rateLimiter, LongConsumer throttleListener) { + CcrSettings ccrSettings, LongConsumer throttleListener) { this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; this.fileToRecover = fileToRecover; - this.rateLimiter = rateLimiter; + this.ccrSettings = ccrSettings; + this.rateLimiter = ccrSettings.getRateLimiter(); this.throttleListener = throttleListener; } @@ -393,7 +400,7 @@ public int read(byte[] bytes, int off, int len) throws IOException { String fileName = fileToRecover.name(); GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested); GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response = - remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(); + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout()); BytesReference fileChunk = response.getChunk(); int bytesReceived = fileChunk.length(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index add19f5926a8a..771ee2e0338d2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -63,6 +63,7 @@ import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.discovery.TestZenDiscovery; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; @@ -123,7 +124,7 @@ public final void startClusters() throws Exception { stopClusters(); Collection> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class, - TestZenDiscovery.TestPlugin.class, getTestTransportPlugin()); + TestZenDiscovery.TestPlugin.class, getTestTransportPlugin(), MockTransportService.TestPlugin.class); InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, false, "leader", diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index ee02241978cfe..f22857939e0d1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; @@ -34,8 +36,10 @@ import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; @@ -51,6 +55,7 @@ import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; // TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work // TODO: is completed. @@ -308,6 +313,78 @@ public void testRateLimitingIsEmployed() throws Exception { } } + public void testIndividualActionsTimeout() throws Exception { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + TimeValue timeValue = TimeValue.timeValueMillis(100); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), timeValue)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + String leaderIndex = "index1"; + String followerIndex = "index2"; + + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + List transportServices = new ArrayList<>(); + + for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + transportServices.add(mockTransportService); + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false) { + connection.sendRequest(requestId, action, request, options); + } + }); + } + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) + .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$") + .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)) + .indexSettings(settingsBuilder); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + + // Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching + // metadata this will throw an exception. If it times-out when restoring a shard, the shard will + // be marked as failed. Either one is a success for the purpose of this test. + try { + RestoreInfo restoreInfo = future.actionGet(); + assertEquals(0, restoreInfo.successfulShards()); + assertEquals(numberOfPrimaryShards, restoreInfo.failedShards()); + } catch (Exception e) { + assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class)); + } + + + for (MockTransportService transportService : transportServices) { + transportService.clearAllRules(); + } + + settingsRequest = new ClusterUpdateSettingsRequest(); + TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), + defaultValue)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37887") public void testFollowerMappingIsUpdated() throws IOException { String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 5f352788d9597..3035b96b5bcac 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -40,7 +40,7 @@ public void setUp() throws Exception { Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); taskQueue = new DeterministicTaskQueue(settings, random()); Set> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, - CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND); + CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings); restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings)); }