Skip to content

Commit 8a8b13d

Browse files
committed
Limit the number of concurrent snapshot file restores per node during recoveries
Today we limit the max number of concurrent snapshot file restores per recovery. This works well when the default node_concurrent_recoveries is used (which is 2). When this limit is increased, it is possible to exahust the underlying repository connection pool, affecting other workloads. This commit adds a new setting `indices.recovery.max_concurrent_snapshot_file_downloads_per_node` that allows to limit the max number of snapshot file downloads per node during recoveries. When a recovery starts in the target node it tries to acquire a permit that allows it to download snapshot files when it is granted. This is communicated to the source node in the StartRecoveryRequest. This is a rather conservative approach since it is possible that a recovery that gets a permit to use snapshot files doesn't recover any snapshot file while there's a concurrent recovery that doesn't get a permit could take advantage of recovering from a snapshot. Closes #79044
1 parent fceacfe commit 8a8b13d

29 files changed

+587
-86
lines changed

docs/reference/modules/indices/recovery.asciidoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,10 @@ sent in parallel to the target node for each recovery. Defaults to `5`.
100100
+
101101
Do not increase this setting without carefully verifying that your cluster has
102102
the resources available to handle the extra load that will result.
103+
104+
`indices.recovery.max_concurrent_snapshot_file_downloads_per_node`::
105+
(<<cluster-update-settings,Dynamic>>, Expert) Number of snapshot file downloads requests
106+
execyted in parallel in the target node for all recoveries. Defaults to `25`.
107+
+
108+
Do not increase this setting without carefully verifying that your cluster has
109+
the resources available to handle the extra load that will result.

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java

Lines changed: 269 additions & 3 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@
1919
import org.elasticsearch.common.settings.Setting;
2020
import org.elasticsearch.common.settings.Setting.Property;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
2223
import org.elasticsearch.core.TimeValue;
2324
import org.elasticsearch.common.util.concurrent.RunOnce;
2425
import org.elasticsearch.xcontent.XContentType;
2526
import org.elasticsearch.index.Index;
2627
import org.elasticsearch.index.mapper.Mapping;
2728

28-
import java.util.concurrent.Semaphore;
29-
3029
/**
3130
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
3231
* in the cluster state meta data (and broadcast to all members).
@@ -106,30 +105,4 @@ protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListe
106105
client.execute(AutoPutMappingAction.INSTANCE, putMappingRequest,
107106
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
108107
}
109-
110-
static class AdjustableSemaphore extends Semaphore {
111-
112-
private final Object maxPermitsMutex = new Object();
113-
private int maxPermits;
114-
115-
AdjustableSemaphore(int maxPermits, boolean fair) {
116-
super(maxPermits, fair);
117-
this.maxPermits = maxPermits;
118-
}
119-
120-
void setMaxPermits(int permits) {
121-
synchronized (maxPermitsMutex) {
122-
final int diff = Math.subtractExact(permits, maxPermits);
123-
if (diff > 0) {
124-
// add permits
125-
release(diff);
126-
} else if (diff < 0) {
127-
// remove permits
128-
reducePermits(Math.negateExact(diff));
129-
}
130-
131-
maxPermits = permits;
132-
}
133-
}
134-
}
135108
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.elasticsearch.indices.breaker.BreakerSettings;
7878
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
7979
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
80+
import org.elasticsearch.indices.recovery.RecoverySnapshotFileDownloadsThrottler;
8081
import org.elasticsearch.indices.recovery.RecoverySettings;
8182
import org.elasticsearch.indices.store.IndicesStore;
8283
import org.elasticsearch.monitor.fs.FsHealthService;
@@ -216,6 +217,7 @@ public void apply(Settings value, Settings current, Settings previous) {
216217
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
217218
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
218219
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
220+
RecoverySnapshotFileDownloadsThrottler.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
219221
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
220222
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
221223
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.common.util.concurrent;
10+
11+
import java.util.concurrent.Semaphore;
12+
13+
public class AdjustableSemaphore extends Semaphore {
14+
15+
private final Object maxPermitsMutex = new Object();
16+
private int maxPermits;
17+
18+
public AdjustableSemaphore(int maxPermits, boolean fair) {
19+
super(maxPermits, fair);
20+
this.maxPermits = maxPermits;
21+
}
22+
23+
public void setMaxPermits(int permits) {
24+
synchronized (maxPermitsMutex) {
25+
final int diff = Math.subtractExact(permits, maxPermits);
26+
if (diff > 0) {
27+
// add permits
28+
release(diff);
29+
} else if (diff < 0) {
30+
// remove permits
31+
reducePermits(Math.negateExact(diff));
32+
}
33+
34+
maxPermits = permits;
35+
}
36+
}
37+
}

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,18 +96,21 @@ public static class Actions {
9696
private final SnapshotFilesProvider snapshotFilesProvider;
9797

9898
private final RecoveriesCollection onGoingRecoveries;
99+
private final RecoverySnapshotFileDownloadsThrottler recoverySnapshotFileDownloadsThrottler;
99100

100101
public PeerRecoveryTargetService(ThreadPool threadPool,
101102
TransportService transportService,
102103
RecoverySettings recoverySettings,
103104
ClusterService clusterService,
104-
SnapshotFilesProvider snapshotFilesProvider) {
105+
SnapshotFilesProvider snapshotFilesProvider,
106+
RecoverySnapshotFileDownloadsThrottler recoverySnapshotFileDownloadsThrottler) {
105107
this.threadPool = threadPool;
106108
this.transportService = transportService;
107109
this.recoverySettings = recoverySettings;
108110
this.clusterService = clusterService;
109111
this.snapshotFilesProvider = snapshotFilesProvider;
110112
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
113+
this.recoverySnapshotFileDownloadsThrottler = recoverySnapshotFileDownloadsThrottler;
111114

112115
transportService.registerRequestHandler(Actions.FILES_INFO, ThreadPool.Names.GENERIC, RecoveryFilesInfoRequest::new,
113116
new FilesInfoRequestHandler());
@@ -138,9 +141,17 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
138141
}
139142

140143
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
144+
final Releasable snapshotFileDownloadsPermit =
145+
recoverySnapshotFileDownloadsThrottler.tryAcquire(recoverySettings.getMaxConcurrentSnapshotFileDownloads());
141146
// create a new recovery status, and process...
142-
final long recoveryId =
143-
onGoingRecoveries.startRecovery(indexShard, sourceNode, snapshotFilesProvider, listener, recoverySettings.activityTimeout());
147+
final long recoveryId = onGoingRecoveries.startRecovery(
148+
indexShard,
149+
sourceNode,
150+
snapshotFilesProvider,
151+
listener,
152+
recoverySettings.activityTimeout(),
153+
snapshotFileDownloadsPermit
154+
);
144155
// we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
145156
// assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
146157
threadPool.generic().execute(new RecoveryRunner(recoveryId));
@@ -267,7 +278,9 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov
267278
metadataSnapshot,
268279
recoveryTarget.state().getPrimary(),
269280
recoveryTarget.recoveryId(),
270-
startingSeqNo);
281+
startingSeqNo,
282+
recoveryTarget.hasPermitToDownloadSnapshotFiles()
283+
);
271284
return request;
272285
}
273286

server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.elasticsearch.cluster.node.DiscoveryNode;
1515
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1616
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
17+
import org.elasticsearch.core.Nullable;
18+
import org.elasticsearch.core.Releasable;
1719
import org.elasticsearch.core.TimeValue;
1820
import org.elasticsearch.index.shard.IndexShard;
1921
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -54,8 +56,10 @@ public long startRecovery(IndexShard indexShard,
5456
DiscoveryNode sourceNode,
5557
SnapshotFilesProvider snapshotFilesProvider,
5658
PeerRecoveryTargetService.RecoveryListener listener,
57-
TimeValue activityTimeout) {
58-
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, listener);
59+
TimeValue activityTimeout,
60+
@Nullable Releasable snapshotFileDownloadsPermit) {
61+
RecoveryTarget recoveryTarget =
62+
new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermit, listener);
5963
startRecoveryInternal(recoveryTarget, activityTimeout);
6064
return recoveryTarget.recoveryId();
6165
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
public class RecoverySettings {
3232
public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
3333
public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0;
34+
public static final Version SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION = Version.V_8_0_0;
3435

3536
private static final Logger logger = LogManager.getLogger(RecoverySettings.class);
3637

@@ -161,7 +162,7 @@ public class RecoverySettings {
161162
private volatile TimeValue internalActionRetryTimeout;
162163
private volatile TimeValue internalActionLongTimeout;
163164
private volatile boolean useSnapshotsDuringRecovery;
164-
private volatile int maxConcurrentSnapshotFileDownloads;
165+
private volatile int getMaxConcurrentSnapshotFileDownloads;
165166

166167
private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
167168

@@ -185,7 +186,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
185186
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
186187
}
187188
this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings);
188-
this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
189+
this.getMaxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
189190

190191
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
191192

@@ -201,7 +202,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
201202
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
202203
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, this::setUseSnapshotsDuringRecovery);
203204
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
204-
this::setMaxConcurrentSnapshotFileDownloads);
205+
this::setGetMaxConcurrentSnapshotFileDownloads);
205206
}
206207

207208
public RateLimiter rateLimiter() {
@@ -297,10 +298,10 @@ private void setUseSnapshotsDuringRecovery(boolean useSnapshotsDuringRecovery) {
297298
}
298299

299300
public int getMaxConcurrentSnapshotFileDownloads() {
300-
return maxConcurrentSnapshotFileDownloads;
301+
return getMaxConcurrentSnapshotFileDownloads;
301302
}
302303

303-
public void setMaxConcurrentSnapshotFileDownloads(int maxConcurrentSnapshotFileDownloads) {
304-
this.maxConcurrentSnapshotFileDownloads = maxConcurrentSnapshotFileDownloads;
304+
public void setGetMaxConcurrentSnapshotFileDownloads(int getMaxConcurrentSnapshotFileDownloads) {
305+
this.getMaxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads;
305306
}
306307
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.indices.recovery;
10+
11+
import org.elasticsearch.common.settings.ClusterSettings;
12+
import org.elasticsearch.common.settings.Setting;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
15+
import org.elasticsearch.core.Nullable;
16+
import org.elasticsearch.core.Releasable;
17+
import org.elasticsearch.core.Releasables;
18+
19+
public class RecoverySnapshotFileDownloadsThrottler {
20+
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE =
21+
Setting.intSetting("indices.recovery.max_concurrent_snapshot_file_downloads_per_node",
22+
25,
23+
1,
24+
25,
25+
Setting.Property.Dynamic,
26+
Setting.Property.NodeScope
27+
);
28+
29+
private final AdjustableSemaphore semaphore;
30+
31+
public RecoverySnapshotFileDownloadsThrottler(Settings settings, ClusterSettings clusterSettings) {
32+
int maxSnapshotFileDownloadsPerNode = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings);
33+
this.semaphore = new AdjustableSemaphore(maxSnapshotFileDownloadsPerNode, true);
34+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE, this::updateMaxPermits);
35+
}
36+
37+
@Nullable
38+
public Releasable tryAcquire(int count) {
39+
if (semaphore.tryAcquire(count)) {
40+
return Releasables.releaseOnce(() -> semaphore.release(count));
41+
}
42+
43+
return null;
44+
}
45+
46+
private void updateMaxPermits(int updatedMaxConcurrentSnapshotFileDownloadsPerNode) {
47+
semaphore.setMaxPermits(updatedMaxConcurrentSnapshotFileDownloadsPerNode);
48+
}
49+
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,14 +486,15 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
486486
}
487487
if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
488488
cancellableThreads.checkForCancel();
489+
boolean canUseSnapshots = useSnapshots && request.hasPermitsToDownloadSnapshotFiles();
489490
recoveryPlannerService.computeRecoveryPlan(shard.shardId(),
490491
shardStateIdentifier,
491492
recoverySourceMetadata,
492493
request.metadataSnapshot(),
493494
startingSeqNo,
494495
translogOps.getAsInt(),
495496
getRequest().targetNode().getVersion(),
496-
useSnapshots,
497+
canUseSnapshots,
497498
ActionListener.wrap(plan ->
498499
recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, listener), listener::onFailure)
499500
);

0 commit comments

Comments
 (0)