Skip to content

Commit ed9c4ef

Browse files
committed
Review comments
1 parent 8a8b13d commit ed9c4ef

File tree

15 files changed

+215
-179
lines changed

15 files changed

+215
-179
lines changed

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/SnapshotBasedIndexRecoveryIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080

8181
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
8282
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS;
83-
import static org.elasticsearch.indices.recovery.RecoverySnapshotFileDownloadsThrottler.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE;
83+
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE;
8484
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
8585
import static org.hamcrest.Matchers.empty;
8686
import static org.hamcrest.Matchers.equalTo;
@@ -1010,8 +1010,8 @@ public void testRecoveryReEstablishKeepsTheGrantedSnapshotFileDownloadPermit() t
10101010
}
10111011

10121012
private void executeRecoveryWithSnapshotFileDownloadThrottled(SnapshotBasedRecoveryThrottlingTestCase testCase) throws Exception {
1013-
updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), "1");
10141013
updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), "1");
1014+
updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(), "1");
10151015

10161016
try {
10171017
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
import org.elasticsearch.indices.breaker.BreakerSettings;
7878
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
7979
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
80-
import org.elasticsearch.indices.recovery.RecoverySnapshotFileDownloadsThrottler;
8180
import org.elasticsearch.indices.recovery.RecoverySettings;
8281
import org.elasticsearch.indices.store.IndicesStore;
8382
import org.elasticsearch.monitor.fs.FsHealthService;
@@ -217,7 +216,7 @@ public void apply(Settings value, Settings current, Settings previous) {
217216
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
218217
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
219218
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
220-
RecoverySnapshotFileDownloadsThrottler.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
219+
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
221220
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
222221
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
223222
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
326326
recoverySettings.getMaxConcurrentFileChunks(),
327327
recoverySettings.getMaxConcurrentOperations(),
328328
recoverySettings.getMaxConcurrentSnapshotFileDownloads(),
329-
recoverySettings.getUseSnapshotsDuringRecovery(),
330329
recoveryPlannerService);
331330
return Tuple.tuple(handler, recoveryTarget);
332331
}

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,14 @@
6060
import org.elasticsearch.transport.TransportService;
6161

6262
import java.io.IOException;
63+
import java.util.Locale;
6364
import java.util.concurrent.atomic.AtomicLong;
6465
import java.util.function.Consumer;
6566

67+
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
6668
import static org.elasticsearch.core.TimeValue.timeValueMillis;
6769
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
70+
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS;
6871

6972
/**
7073
* The recovery target handles recoveries of peer shards of the shard+node to recover to.
@@ -96,21 +99,18 @@ public static class Actions {
9699
private final SnapshotFilesProvider snapshotFilesProvider;
97100

98101
private final RecoveriesCollection onGoingRecoveries;
99-
private final RecoverySnapshotFileDownloadsThrottler recoverySnapshotFileDownloadsThrottler;
100102

101103
public PeerRecoveryTargetService(ThreadPool threadPool,
102104
TransportService transportService,
103105
RecoverySettings recoverySettings,
104106
ClusterService clusterService,
105-
SnapshotFilesProvider snapshotFilesProvider,
106-
RecoverySnapshotFileDownloadsThrottler recoverySnapshotFileDownloadsThrottler) {
107+
SnapshotFilesProvider snapshotFilesProvider) {
107108
this.threadPool = threadPool;
108109
this.transportService = transportService;
109110
this.recoverySettings = recoverySettings;
110111
this.clusterService = clusterService;
111112
this.snapshotFilesProvider = snapshotFilesProvider;
112113
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
113-
this.recoverySnapshotFileDownloadsThrottler = recoverySnapshotFileDownloadsThrottler;
114114

115115
transportService.registerRequestHandler(Actions.FILES_INFO, ThreadPool.Names.GENERIC, RecoveryFilesInfoRequest::new,
116116
new FilesInfoRequestHandler());
@@ -141,8 +141,17 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
141141
}
142142

143143
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
144-
final Releasable snapshotFileDownloadsPermit =
145-
recoverySnapshotFileDownloadsThrottler.tryAcquire(recoverySettings.getMaxConcurrentSnapshotFileDownloads());
144+
final Releasable snapshotFileDownloadsPermit = recoverySettings.tryAcquireSnapshotDownloadPermits();
145+
if (snapshotFileDownloadsPermit == null) {
146+
logger.warn(String.format(Locale.ROOT,
147+
"Unable to acquire permit to use snapshot files during recovery, this recovery will recover from the source node. " +
148+
"[%s] should have the same value as [%s]/[%s]",
149+
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(),
150+
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(),
151+
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey()
152+
)
153+
);
154+
}
146155
// create a new recovery status, and process...
147156
final long recoveryId = onGoingRecoveries.startRecovery(
148157
indexShard,

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,25 @@
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.common.unit.ByteSizeUnit;
2222
import org.elasticsearch.common.unit.ByteSizeValue;
23+
import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
24+
import org.elasticsearch.core.Nullable;
25+
import org.elasticsearch.core.Releasable;
26+
import org.elasticsearch.core.Releasables;
2327
import org.elasticsearch.core.TimeValue;
2428
import org.elasticsearch.jdk.JavaVersion;
2529
import org.elasticsearch.monitor.os.OsProbe;
2630
import org.elasticsearch.node.NodeRoleSettings;
2731

32+
import java.util.Collection;
33+
import java.util.Collections;
34+
import java.util.Iterator;
2835
import java.util.List;
36+
import java.util.Locale;
37+
import java.util.Map;
2938
import java.util.stream.Collectors;
3039

40+
import static org.elasticsearch.common.settings.Setting.parseInt;
41+
3142
public class RecoverySettings {
3243
public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
3344
public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0;
@@ -135,7 +146,7 @@ public class RecoverySettings {
135146

136147
/**
137148
* recoveries would try to use files from available snapshots instead of sending them from the source node.
138-
* defaults to `false`
149+
* defaults to `true`
139150
*/
140151
public static final Setting<Boolean> INDICES_RECOVERY_USE_SNAPSHOTS_SETTING =
141152
Setting.boolSetting("indices.recovery.use_snapshots", true, Property.Dynamic, Property.NodeScope);
@@ -149,6 +160,43 @@ public class RecoverySettings {
149160
Property.NodeScope
150161
);
151162

163+
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE = new Setting<>(
164+
"indices.recovery.max_concurrent_snapshot_file_downloads_per_node",
165+
"25",
166+
(s) -> parseInt(s, 1, 25, "indices.recovery.max_concurrent_snapshot_file_downloads_per_node", false),
167+
new Setting.Validator<>() {
168+
private final Collection<Setting<?>> dependencies =
169+
Collections.singletonList(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS);
170+
@Override
171+
public void validate(Integer value) {
172+
// ignore
173+
}
174+
175+
@Override
176+
public void validate(Integer maxConcurrentSnapshotFileDownloadsPerNode, Map<Setting<?>, Object> settings) {
177+
int maxConcurrentSnapshotFileDownloads = (int) settings.get(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS);
178+
if (maxConcurrentSnapshotFileDownloadsPerNode < maxConcurrentSnapshotFileDownloads) {
179+
throw new IllegalArgumentException(
180+
String.format(Locale.ROOT,
181+
"[%s]=%d is less than [%s]=%d",
182+
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.getKey(),
183+
maxConcurrentSnapshotFileDownloadsPerNode,
184+
INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(),
185+
maxConcurrentSnapshotFileDownloads
186+
)
187+
);
188+
}
189+
}
190+
191+
@Override
192+
public Iterator<Setting<?>> settings() {
193+
return dependencies.iterator();
194+
}
195+
},
196+
Setting.Property.Dynamic,
197+
Setting.Property.NodeScope
198+
);
199+
152200
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
153201

154202
private volatile ByteSizeValue maxBytesPerSec;
@@ -162,7 +210,9 @@ public class RecoverySettings {
162210
private volatile TimeValue internalActionRetryTimeout;
163211
private volatile TimeValue internalActionLongTimeout;
164212
private volatile boolean useSnapshotsDuringRecovery;
165-
private volatile int getMaxConcurrentSnapshotFileDownloads;
213+
private volatile int maxConcurrentSnapshotFileDownloads;
214+
215+
private final AdjustableSemaphore maxSnapshotFileDownloadsPerNodeSemaphore;
166216

167217
private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
168218

@@ -186,7 +236,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
186236
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
187237
}
188238
this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings);
189-
this.getMaxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
239+
this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
190240

191241
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
192242

@@ -202,7 +252,12 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
202252
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
203253
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, this::setUseSnapshotsDuringRecovery);
204254
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
205-
this::setGetMaxConcurrentSnapshotFileDownloads);
255+
this::setMaxConcurrentSnapshotFileDownloads);
256+
257+
int maxSnapshotFileDownloadsPerNode = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings);
258+
this.maxSnapshotFileDownloadsPerNodeSemaphore = new AdjustableSemaphore(maxSnapshotFileDownloadsPerNode, true);
259+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
260+
this::setMaxConcurrentSnapshotFileDownloadsPerNode);
206261
}
207262

208263
public RateLimiter rateLimiter() {
@@ -298,10 +353,25 @@ private void setUseSnapshotsDuringRecovery(boolean useSnapshotsDuringRecovery) {
298353
}
299354

300355
public int getMaxConcurrentSnapshotFileDownloads() {
301-
return getMaxConcurrentSnapshotFileDownloads;
356+
return maxConcurrentSnapshotFileDownloads;
302357
}
303358

304-
public void setGetMaxConcurrentSnapshotFileDownloads(int getMaxConcurrentSnapshotFileDownloads) {
305-
this.getMaxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads;
359+
public void setMaxConcurrentSnapshotFileDownloads(int getMaxConcurrentSnapshotFileDownloads) {
360+
this.maxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads;
361+
}
362+
363+
private void setMaxConcurrentSnapshotFileDownloadsPerNode(int maxConcurrentSnapshotFileDownloadsPerNode) {
364+
maxSnapshotFileDownloadsPerNodeSemaphore.setMaxPermits(maxConcurrentSnapshotFileDownloadsPerNode);
365+
}
366+
367+
@Nullable
368+
Releasable tryAcquireSnapshotDownloadPermits() {
369+
final int maxConcurrentSnapshotFileDownloads = getMaxConcurrentSnapshotFileDownloads();
370+
if (getUseSnapshotsDuringRecovery() == false ||
371+
maxSnapshotFileDownloadsPerNodeSemaphore.tryAcquire(maxConcurrentSnapshotFileDownloads) == false) {
372+
return null;
373+
}
374+
375+
return Releasables.releaseOnce(() -> maxSnapshotFileDownloadsPerNodeSemaphore.release(maxConcurrentSnapshotFileDownloads));
306376
}
307377
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySnapshotFileDownloadsThrottler.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ public class RecoverySourceHandler {
118118
private final int maxConcurrentFileChunks;
119119
private final int maxConcurrentOperations;
120120
private final int maxConcurrentSnapshotFileDownloads;
121-
private final boolean useSnapshots;
122121
private final ThreadPool threadPool;
123122
private final RecoveryPlannerService recoveryPlannerService;
124123
private final CancellableThreads cancellableThreads = new CancellableThreads();
@@ -127,7 +126,7 @@ public class RecoverySourceHandler {
127126

128127
public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool,
129128
StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks,
130-
int maxConcurrentOperations, int maxConcurrentSnapshotFileDownloads, boolean useSnapshots,
129+
int maxConcurrentOperations, int maxConcurrentSnapshotFileDownloads,
131130
RecoveryPlannerService recoveryPlannerService) {
132131
this.shard = shard;
133132
this.recoveryTarget = recoveryTarget;
@@ -140,7 +139,6 @@ public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTar
140139
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
141140
this.maxConcurrentOperations = maxConcurrentOperations;
142141
this.maxConcurrentSnapshotFileDownloads = maxConcurrentSnapshotFileDownloads;
143-
this.useSnapshots = useSnapshots;
144142
}
145143

146144
public StartRecoveryRequest getRequest() {
@@ -486,15 +484,14 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
486484
}
487485
if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
488486
cancellableThreads.checkForCancel();
489-
boolean canUseSnapshots = useSnapshots && request.hasPermitsToDownloadSnapshotFiles();
490487
recoveryPlannerService.computeRecoveryPlan(shard.shardId(),
491488
shardStateIdentifier,
492489
recoverySourceMetadata,
493490
request.metadataSnapshot(),
494491
startingSeqNo,
495492
translogOps.getAsInt(),
496493
getRequest().targetNode().getVersion(),
497-
canUseSnapshots,
494+
request.canDownloadSnapshotFiles(),
498495
ActionListener.wrap(plan ->
499496
recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, listener), listener::onFailure)
500497
);

0 commit comments

Comments
 (0)