Skip to content

Commit 9ddfb52

Browse files
authored
Add timeout for ccr recovery action (#37840) (#38056)
This is related to #35975. It adds an action timeout setting that allows timeouts to be applied to the individual transport actions that are used during a CCR recovery.
1 parent 288bdf8 commit 9ddfb52

File tree

7 files changed

+127
-19
lines changed

7 files changed

+127
-19
lines changed

test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public boolean setDefaultNodeConnectedBehavior(NodeConnectedBehavior behavior) {
6969
}
7070

7171
public void clearBehaviors() {
72+
defaultGetConnectionBehavior = ConnectionManager::getConnection;
7273
getConnectionBehaviors.clear();
74+
defaultNodeConnectedBehavior = ConnectionManager::nodeConnected;
7375
nodeConnectedBehaviors.clear();
7476
}
7577

test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ boolean addConnectBehavior(TransportAddress transportAddress, OpenConnectionBeha
7676
}
7777

7878
void clearBehaviors() {
79+
this.defaultSendRequest = null;
7980
sendBehaviors.clear();
81+
this.defaultConnectBehavior = null;
8082
connectBehaviors.clear();
8183
}
8284

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ public final class CcrSettings {
5757
Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60),
5858
Setting.Property.Dynamic, Setting.Property.NodeScope);
5959

60+
/**
61+
* The timeout value to use for requests made as part of the CCR recovery process.
62+
* */
63+
public static final Setting<TimeValue> INDICES_RECOVERY_ACTION_TIMEOUT_SETTING =
64+
Setting.positiveTimeSetting("ccr.indices.recovery.internal_action_timeout", TimeValue.timeValueSeconds(60),
65+
Property.Dynamic, Property.NodeScope);
66+
6067
/**
6168
* The settings defined by CCR.
6269
*
@@ -67,19 +74,23 @@ static List<Setting<?>> getSettings() {
6774
XPackSettings.CCR_ENABLED_SETTING,
6875
CCR_FOLLOWING_INDEX_SETTING,
6976
RECOVERY_MAX_BYTES_PER_SECOND,
77+
INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
7078
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
7179
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
7280
CCR_WAIT_FOR_METADATA_TIMEOUT);
7381
}
7482

7583
private final CombinedRateLimiter ccrRateLimiter;
7684
private volatile TimeValue recoveryActivityTimeout;
85+
private volatile TimeValue recoveryActionTimeout;
7786

7887
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
7988
this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
89+
this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);
8090
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
8191
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
8292
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
93+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
8394
}
8495

8596
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
@@ -90,11 +101,19 @@ private void setRecoveryActivityTimeout(TimeValue recoveryActivityTimeout) {
90101
this.recoveryActivityTimeout = recoveryActivityTimeout;
91102
}
92103

104+
private void setRecoveryActionTimeout(TimeValue recoveryActionTimeout) {
105+
this.recoveryActionTimeout = recoveryActionTimeout;
106+
}
107+
93108
public CombinedRateLimiter getRateLimiter() {
94109
return ccrRateLimiter;
95110
}
96111

97112
public TimeValue getRecoveryActivityTimeout() {
98113
return recoveryActivityTimeout;
99114
}
115+
116+
public TimeValue getRecoveryActionTimeout() {
117+
return recoveryActionTimeout;
118+
}
100119
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public RepositoryMetaData getMetadata() {
124124
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
125125
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
126126
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
127-
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true).get();
127+
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true)
128+
.get(ccrSettings.getRecoveryActionTimeout());
128129
ImmutableOpenMap<String, IndexMetaData> indicesMap = response.getState().metaData().indices();
129130
ArrayList<String> indices = new ArrayList<>(indicesMap.size());
130131
indicesMap.keysIt().forEachRemaining(indices::add);
@@ -138,7 +139,8 @@ public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
138139
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
139140
// We set a single dummy index name to avoid fetching all the index data
140141
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
141-
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
142+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
143+
.actionGet(ccrSettings.getRecoveryActionTimeout());
142144
return clusterState.getState().metaData();
143145
}
144146

@@ -149,13 +151,14 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
149151
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
150152

151153
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
152-
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
154+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
155+
.actionGet(ccrSettings.getRecoveryActionTimeout());
153156

154157
// Validates whether the leader cluster has been configured properly:
155158
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
156159
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
157160
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
158-
String[] leaderHistoryUUIDs = future.actionGet();
161+
String[] leaderHistoryUUIDs = future.actionGet(ccrSettings.getRecoveryActionTimeout());
159162

160163
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndexMetaData);
161164
// Adding the leader index uuid for each shard as custom metadata:
@@ -172,7 +175,8 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
172175
@Override
173176
public RepositoryData getRepositoryData() {
174177
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
175-
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).get();
178+
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
179+
.get(ccrSettings.getRecoveryActionTimeout());
176180
MetaData remoteMetaData = response.getState().getMetaData();
177181

178182
Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
@@ -282,7 +286,8 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve
282286

283287
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
284288
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
285-
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
289+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
290+
.actionGet(ccrSettings.getRecoveryActionTimeout());
286291
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
287292
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();
288293

@@ -292,17 +297,17 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index
292297
leaderIndexMetadata.getMappings().size() + "]";
293298
MappingMetaData mappingMetaData = leaderIndexMetadata.getMappings().iterator().next().value;
294299
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
295-
localClient.admin().indices().putMapping(putMappingRequest).actionGet();
300+
localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
296301
}
297302
}
298303

299304
private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
300305
RecoveryState recoveryState) {
301306
String sessionUUID = UUIDs.randomBase64UUID();
302307
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
303-
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
308+
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
304309
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
305-
response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc);
310+
response.getStoreFileMetaData(), ccrSettings, throttledTime::inc);
306311
}
307312

308313
private static class RestoreSession extends FileRestoreContext implements Closeable {
@@ -313,18 +318,18 @@ private static class RestoreSession extends FileRestoreContext implements Closea
313318
private final String sessionUUID;
314319
private final DiscoveryNode node;
315320
private final Store.MetadataSnapshot sourceMetaData;
316-
private final CombinedRateLimiter rateLimiter;
321+
private final CcrSettings ccrSettings;
317322
private final LongConsumer throttleListener;
318323

319324
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
320-
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter,
325+
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings,
321326
LongConsumer throttleListener) {
322327
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
323328
this.remoteClient = remoteClient;
324329
this.sessionUUID = sessionUUID;
325330
this.node = node;
326331
this.sourceMetaData = sourceMetaData;
327-
this.rateLimiter = rateLimiter;
332+
this.ccrSettings = ccrSettings;
328333
this.throttleListener = throttleListener;
329334
}
330335

@@ -340,14 +345,14 @@ void restoreFiles() throws IOException {
340345

341346
@Override
342347
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
343-
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener);
348+
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), ccrSettings, throttleListener);
344349
}
345350

346351
@Override
347352
public void close() {
348353
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
349354
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
350-
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
355+
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
351356
}
352357
}
353358

@@ -358,17 +363,19 @@ private static class RestoreFileInputStream extends InputStream {
358363
private final DiscoveryNode node;
359364
private final StoreFileMetaData fileToRecover;
360365
private final CombinedRateLimiter rateLimiter;
366+
private final CcrSettings ccrSettings;
361367
private final LongConsumer throttleListener;
362368

363369
private long pos = 0;
364370

365371
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
366-
CombinedRateLimiter rateLimiter, LongConsumer throttleListener) {
372+
CcrSettings ccrSettings, LongConsumer throttleListener) {
367373
this.remoteClient = remoteClient;
368374
this.sessionUUID = sessionUUID;
369375
this.node = node;
370376
this.fileToRecover = fileToRecover;
371-
this.rateLimiter = rateLimiter;
377+
this.ccrSettings = ccrSettings;
378+
this.rateLimiter = ccrSettings.getRateLimiter();
372379
this.throttleListener = throttleListener;
373380
}
374381

@@ -393,7 +400,7 @@ public int read(byte[] bytes, int off, int len) throws IOException {
393400
String fileName = fileToRecover.name();
394401
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
395402
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
396-
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet();
403+
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout());
397404
BytesReference fileChunk = response.getChunk();
398405

399406
int bytesReceived = fileChunk.length();

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.elasticsearch.test.NodeConfigurationSource;
6464
import org.elasticsearch.test.TestCluster;
6565
import org.elasticsearch.test.discovery.TestZenDiscovery;
66+
import org.elasticsearch.test.transport.MockTransportService;
6667
import org.elasticsearch.transport.TransportService;
6768
import org.elasticsearch.xpack.ccr.CcrSettings;
6869
import org.elasticsearch.xpack.ccr.LocalStateCcr;
@@ -123,7 +124,7 @@ public final void startClusters() throws Exception {
123124

124125
stopClusters();
125126
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
126-
TestZenDiscovery.TestPlugin.class, getTestTransportPlugin());
127+
TestZenDiscovery.TestPlugin.class, getTestTransportPlugin(), MockTransportService.TestPlugin.class);
127128

128129
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
129130
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, false, "leader",

0 commit comments

Comments
 (0)