Skip to content

Commit f3f9cab

Browse files
authored
Add timeout for ccr recovery action (#37840)
This is related to #35975. It adds an action timeout setting that allows timeouts to be applied to the individual transport actions that are used during a ccr recovery.
1 parent e933233 commit f3f9cab

File tree

8 files changed

+145
-19
lines changed

8 files changed

+145
-19
lines changed

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,16 @@ public boolean addSendBehavior(TransportAddress transportAddress, StubbableTrans
406406
return transport().addSendBehavior(transportAddress, sendBehavior);
407407
}
408408

409+
/**
410+
* Adds a send behavior that is the default send behavior.
411+
*
412+
* @return {@code true} if no default send behavior was registered
413+
*/
414+
public boolean addSendBehavior(StubbableTransport.SendRequestBehavior behavior) {
415+
return transport().setDefaultSendBehavior(behavior);
416+
}
417+
418+
409419
/**
410420
* Adds a new connect behavior that is used for creating connections with the given delegate service.
411421
*

test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public boolean setDefaultNodeConnectedBehavior(NodeConnectedBehavior behavior) {
6969
}
7070

7171
public void clearBehaviors() {
72+
defaultGetConnectionBehavior = ConnectionManager::getConnection;
7273
getConnectionBehaviors.clear();
74+
defaultNodeConnectedBehavior = ConnectionManager::nodeConnected;
7375
nodeConnectedBehaviors.clear();
7476
}
7577

test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ public StubbableTransport(Transport transport) {
5555
this.delegate = transport;
5656
}
5757

58+
boolean setDefaultSendBehavior(SendRequestBehavior sendBehavior) {
59+
SendRequestBehavior prior = defaultSendRequest;
60+
defaultSendRequest = sendBehavior;
61+
return prior == null;
62+
}
63+
5864
public boolean setDefaultConnectBehavior(OpenConnectionBehavior openConnectionBehavior) {
5965
OpenConnectionBehavior prior = this.defaultConnectBehavior;
6066
this.defaultConnectBehavior = openConnectionBehavior;
@@ -70,7 +76,9 @@ boolean addConnectBehavior(TransportAddress transportAddress, OpenConnectionBeha
7076
}
7177

7278
void clearBehaviors() {
79+
this.defaultSendRequest = null;
7380
sendBehaviors.clear();
81+
this.defaultConnectBehavior = null;
7482
connectBehaviors.clear();
7583
}
7684

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ public final class CcrSettings {
5757
Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60),
5858
Setting.Property.Dynamic, Setting.Property.NodeScope);
5959

60+
/**
61+
* The timeout value to use for requests made as part of the ccr recovery process.
62+
* */
63+
public static final Setting<TimeValue> INDICES_RECOVERY_ACTION_TIMEOUT_SETTING =
64+
Setting.positiveTimeSetting("ccr.indices.recovery.internal_action_timeout", TimeValue.timeValueSeconds(60),
65+
Property.Dynamic, Property.NodeScope);
66+
6067
/**
6168
* The settings defined by CCR.
6269
*
@@ -67,19 +74,23 @@ static List<Setting<?>> getSettings() {
6774
XPackSettings.CCR_ENABLED_SETTING,
6875
CCR_FOLLOWING_INDEX_SETTING,
6976
RECOVERY_MAX_BYTES_PER_SECOND,
77+
INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
7078
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
7179
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
7280
CCR_WAIT_FOR_METADATA_TIMEOUT);
7381
}
7482

7583
private final CombinedRateLimiter ccrRateLimiter;
7684
private volatile TimeValue recoveryActivityTimeout;
85+
private volatile TimeValue recoveryActionTimeout;
7786

7887
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
7988
this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
89+
this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);
8090
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
8191
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
8292
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
93+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
8394
}
8495

8596
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
@@ -90,11 +101,19 @@ private void setRecoveryActivityTimeout(TimeValue recoveryActivityTimeout) {
90101
this.recoveryActivityTimeout = recoveryActivityTimeout;
91102
}
92103

104+
private void setRecoveryActionTimeout(TimeValue recoveryActionTimeout) {
105+
this.recoveryActionTimeout = recoveryActionTimeout;
106+
}
107+
93108
public CombinedRateLimiter getRateLimiter() {
94109
return ccrRateLimiter;
95110
}
96111

97112
public TimeValue getRecoveryActivityTimeout() {
98113
return recoveryActivityTimeout;
99114
}
115+
116+
public TimeValue getRecoveryActionTimeout() {
117+
return recoveryActionTimeout;
118+
}
100119
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public RepositoryMetaData getMetadata() {
127127
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
128128
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
129129
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
130-
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true).get();
130+
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true)
131+
.get(ccrSettings.getRecoveryActionTimeout());
131132
ImmutableOpenMap<String, IndexMetaData> indicesMap = response.getState().metaData().indices();
132133
ArrayList<String> indices = new ArrayList<>(indicesMap.size());
133134
indicesMap.keysIt().forEachRemaining(indices::add);
@@ -141,7 +142,8 @@ public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
141142
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
142143
// We set a single dummy index name to avoid fetching all the index data
143144
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
144-
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
145+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
146+
.actionGet(ccrSettings.getRecoveryActionTimeout());
145147
return clusterState.getState().metaData();
146148
}
147149

@@ -152,13 +154,14 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
152154
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
153155

154156
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
155-
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
157+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
158+
.actionGet(ccrSettings.getRecoveryActionTimeout());
156159

157160
// Validates whether the leader cluster has been configured properly:
158161
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
159162
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
160163
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
161-
String[] leaderHistoryUUIDs = future.actionGet();
164+
String[] leaderHistoryUUIDs = future.actionGet(ccrSettings.getRecoveryActionTimeout());
162165

163166
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndex);
164167
// Adding the leader index uuid for each shard as custom metadata:
@@ -188,7 +191,8 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
188191
@Override
189192
public RepositoryData getRepositoryData() {
190193
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
191-
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).get();
194+
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
195+
.get(ccrSettings.getRecoveryActionTimeout());
192196
MetaData remoteMetaData = response.getState().getMetaData();
193197

194198
Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
@@ -298,25 +302,26 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve
298302

299303
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
300304
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
301-
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
305+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
306+
.actionGet(ccrSettings.getRecoveryActionTimeout());
302307
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
303308
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();
304309

305310
if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
306311
Index followerIndex = followerIndexSettings.getIndex();
307312
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
308313
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
309-
localClient.admin().indices().putMapping(putMappingRequest).actionGet();
314+
localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
310315
}
311316
}
312317

313318
private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
314319
RecoveryState recoveryState) {
315320
String sessionUUID = UUIDs.randomBase64UUID();
316321
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
317-
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
322+
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
318323
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
319-
response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc);
324+
response.getStoreFileMetaData(), ccrSettings, throttledTime::inc);
320325
}
321326

322327
private static class RestoreSession extends FileRestoreContext implements Closeable {
@@ -327,18 +332,18 @@ private static class RestoreSession extends FileRestoreContext implements Closea
327332
private final String sessionUUID;
328333
private final DiscoveryNode node;
329334
private final Store.MetadataSnapshot sourceMetaData;
330-
private final CombinedRateLimiter rateLimiter;
335+
private final CcrSettings ccrSettings;
331336
private final LongConsumer throttleListener;
332337

333338
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
334-
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter,
339+
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings,
335340
LongConsumer throttleListener) {
336341
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
337342
this.remoteClient = remoteClient;
338343
this.sessionUUID = sessionUUID;
339344
this.node = node;
340345
this.sourceMetaData = sourceMetaData;
341-
this.rateLimiter = rateLimiter;
346+
this.ccrSettings = ccrSettings;
342347
this.throttleListener = throttleListener;
343348
}
344349

@@ -354,14 +359,14 @@ void restoreFiles() throws IOException {
354359

355360
@Override
356361
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
357-
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener);
362+
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), ccrSettings, throttleListener);
358363
}
359364

360365
@Override
361366
public void close() {
362367
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
363368
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
364-
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
369+
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
365370
}
366371
}
367372

@@ -372,17 +377,19 @@ private static class RestoreFileInputStream extends InputStream {
372377
private final DiscoveryNode node;
373378
private final StoreFileMetaData fileToRecover;
374379
private final CombinedRateLimiter rateLimiter;
380+
private final CcrSettings ccrSettings;
375381
private final LongConsumer throttleListener;
376382

377383
private long pos = 0;
378384

379385
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
380-
CombinedRateLimiter rateLimiter, LongConsumer throttleListener) {
386+
CcrSettings ccrSettings, LongConsumer throttleListener) {
381387
this.remoteClient = remoteClient;
382388
this.sessionUUID = sessionUUID;
383389
this.node = node;
384390
this.fileToRecover = fileToRecover;
385-
this.rateLimiter = rateLimiter;
391+
this.ccrSettings = ccrSettings;
392+
this.rateLimiter = ccrSettings.getRateLimiter();
386393
this.throttleListener = throttleListener;
387394
}
388395

@@ -407,7 +414,7 @@ public int read(byte[] bytes, int off, int len) throws IOException {
407414
String fileName = fileToRecover.name();
408415
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
409416
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
410-
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet();
417+
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout());
411418
BytesReference fileChunk = response.getChunk();
412419

413420
int bytesReceived = fileChunk.length();

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@
6565
import org.elasticsearch.test.NodeConfigurationSource;
6666
import org.elasticsearch.test.TestCluster;
6767
import org.elasticsearch.test.discovery.TestZenDiscovery;
68+
import org.elasticsearch.test.transport.MockTransportService;
6869
import org.elasticsearch.transport.TransportService;
70+
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
6971
import org.elasticsearch.xpack.ccr.CcrSettings;
7072
import org.elasticsearch.xpack.ccr.LocalStateCcr;
7173
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
@@ -120,7 +122,8 @@ public final void startClusters() throws Exception {
120122

121123
stopClusters();
122124
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
123-
TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin());
125+
TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, MockTransportService.TestPlugin.class,
126+
MockNioTransportPlugin.class);
124127

125128
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
126129
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins,

0 commit comments

Comments
 (0)