Skip to content

Commit b6f06a4

Browse files
authored
Implement follower rate limiting for file restore (#37449)
This is related to #35975. This commit implements rate limiting on the follower side using a new class `CombinedRateLimiter`.
1 parent 381d035 commit b6f06a4

File tree

5 files changed

+189
-21
lines changed

5 files changed

+189
-21
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.util;
21+
22+
import org.apache.lucene.store.RateLimiter;
23+
import org.elasticsearch.common.unit.ByteSizeValue;
24+
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
/**
28+
* A rate limiter designed for multiple concurrent users.
29+
*/
30+
public class CombinedRateLimiter {
31+
32+
// TODO: This rate limiter has some concurrency issues between the two maybePause operations
33+
34+
private final AtomicLong bytesSinceLastPause = new AtomicLong();
35+
private final RateLimiter.SimpleRateLimiter rateLimiter;
36+
private volatile boolean rateLimit;
37+
38+
public CombinedRateLimiter(ByteSizeValue maxBytesPerSec) {
39+
rateLimit = maxBytesPerSec.getBytes() > 0;
40+
rateLimiter = new RateLimiter.SimpleRateLimiter(maxBytesPerSec.getMbFrac());
41+
}
42+
43+
public long maybePause(int bytes) {
44+
if (rateLimit) {
45+
long bytesSincePause = bytesSinceLastPause.addAndGet(bytes);
46+
if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) {
47+
// Time to pause
48+
bytesSinceLastPause.addAndGet(-bytesSincePause);
49+
return Math.max(rateLimiter.pause(bytesSincePause), 0);
50+
}
51+
}
52+
return 0;
53+
}
54+
55+
public void setMBPerSec(ByteSizeValue maxBytesPerSec) {
56+
rateLimit = maxBytesPerSec.getBytes() > 0;
57+
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
58+
}
59+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
117117
private final Settings settings;
118118
private final CcrLicenseChecker ccrLicenseChecker;
119119
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
120+
private final SetOnce<CcrSettings> ccrSettings = new SetOnce<>();
120121
private Client client;
121122

122123
/**
@@ -159,6 +160,8 @@ public Collection<Object> createComponents(
159160

160161
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService();
161162
this.restoreSourceService.set(restoreSourceService);
163+
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
164+
this.ccrSettings.set(ccrSettings);
162165
return Arrays.asList(
163166
ccrLicenseChecker,
164167
restoreSourceService,
@@ -291,7 +294,8 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
291294

292295
@Override
293296
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
294-
Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings);
297+
Repository.Factory repositoryFactory =
298+
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get());
295299
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
296300
}
297301

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55
*/
66
package org.elasticsearch.xpack.ccr;
77

8+
import org.elasticsearch.common.settings.ClusterSettings;
89
import org.elasticsearch.common.settings.Setting;
910
import org.elasticsearch.common.settings.Setting.Property;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.common.unit.ByteSizeUnit;
13+
import org.elasticsearch.common.unit.ByteSizeValue;
1014
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.common.util.CombinedRateLimiter;
1116
import org.elasticsearch.xpack.core.XPackSettings;
1217

1318
import java.util.Arrays;
@@ -18,11 +23,6 @@
1823
*/
1924
public final class CcrSettings {
2025

21-
// prevent construction
22-
private CcrSettings() {
23-
24-
}
25-
2626
/**
2727
* Index setting for a following index.
2828
*/
@@ -35,6 +35,14 @@ private CcrSettings() {
3535
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
3636
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);
3737

38+
39+
/**
40+
* Max bytes a node can recover per second.
41+
*/
42+
public static final Setting<ByteSizeValue> RECOVERY_MAX_BYTES_PER_SECOND =
43+
Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
44+
Setting.Property.Dynamic, Setting.Property.NodeScope);
45+
3846
/**
3947
* The settings defined by CCR.
4048
*
@@ -44,7 +52,23 @@ static List<Setting<?>> getSettings() {
4452
return Arrays.asList(
4553
XPackSettings.CCR_ENABLED_SETTING,
4654
CCR_FOLLOWING_INDEX_SETTING,
55+
RECOVERY_MAX_BYTES_PER_SECOND,
4756
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
4857
}
4958

59+
private final CombinedRateLimiter ccrRateLimiter;
60+
61+
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
62+
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
63+
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
64+
}
65+
66+
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
67+
ccrRateLimiter.setMBPerSec(maxBytesPerSec);
68+
}
69+
70+
public CombinedRateLimiter getRateLimiter() {
71+
return ccrRateLimiter;
72+
}
73+
5074
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.elasticsearch.common.collect.ImmutableOpenMap;
2525
import org.elasticsearch.common.component.AbstractLifecycleComponent;
2626
import org.elasticsearch.common.io.stream.StreamInput;
27+
import org.elasticsearch.common.metrics.CounterMetric;
2728
import org.elasticsearch.common.settings.Settings;
2829
import org.elasticsearch.common.unit.ByteSizeValue;
30+
import org.elasticsearch.common.util.CombinedRateLimiter;
2931
import org.elasticsearch.index.Index;
3032
import org.elasticsearch.index.IndexSettings;
3133
import org.elasticsearch.index.engine.EngineException;
@@ -49,6 +51,7 @@
4951
import org.elasticsearch.snapshots.SnapshotState;
5052
import org.elasticsearch.xpack.ccr.Ccr;
5153
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
54+
import org.elasticsearch.xpack.ccr.CcrSettings;
5255
import org.elasticsearch.xpack.ccr.action.CcrRequests;
5356
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
5457
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
@@ -66,6 +69,7 @@
6669
import java.util.List;
6770
import java.util.Map;
6871
import java.util.Set;
72+
import java.util.function.LongConsumer;
6973

7074
/**
7175
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
@@ -79,12 +83,17 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
7983
private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST);
8084

8185
private final RepositoryMetaData metadata;
86+
private final CcrSettings ccrSettings;
8287
private final String remoteClusterAlias;
8388
private final Client client;
8489
private final CcrLicenseChecker ccrLicenseChecker;
8590

86-
public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) {
91+
private final CounterMetric throttledTime = new CounterMetric();
92+
93+
public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
94+
CcrSettings ccrSettings) {
8795
this.metadata = metadata;
96+
this.ccrSettings = ccrSettings;
8897
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
8998
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
9099
this.ccrLicenseChecker = ccrLicenseChecker;
@@ -206,7 +215,7 @@ public long getSnapshotThrottleTimeInNanos() {
206215

207216
@Override
208217
public long getRestoreThrottleTimeInNanos() {
209-
return 0;
218+
return throttledTime.count();
210219
}
211220

212221
@Override
@@ -257,7 +266,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
257266
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
258267
// response, we should be able to retry by creating a new session.
259268
String name = metadata.name();
260-
try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
269+
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
261270
restoreSession.restoreFiles();
262271
} catch (Exception e) {
263272
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
@@ -285,6 +294,15 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index
285294
}
286295
}
287296

297+
private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
298+
RecoveryState recoveryState) {
299+
String sessionUUID = UUIDs.randomBase64UUID();
300+
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
301+
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
302+
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
303+
response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc);
304+
}
305+
288306
private static class RestoreSession extends FileRestoreContext implements Closeable {
289307

290308
private static final int BUFFER_SIZE = 1 << 16;
@@ -293,23 +311,19 @@ private static class RestoreSession extends FileRestoreContext implements Closea
293311
private final String sessionUUID;
294312
private final DiscoveryNode node;
295313
private final Store.MetadataSnapshot sourceMetaData;
314+
private final CombinedRateLimiter rateLimiter;
315+
private final LongConsumer throttleListener;
296316

297317
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
298-
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) {
318+
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter,
319+
LongConsumer throttleListener) {
299320
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
300321
this.remoteClient = remoteClient;
301322
this.sessionUUID = sessionUUID;
302323
this.node = node;
303324
this.sourceMetaData = sourceMetaData;
304-
}
305-
306-
static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
307-
RecoveryState recoveryState) {
308-
String sessionUUID = UUIDs.randomBase64UUID();
309-
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
310-
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
311-
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
312-
response.getStoreFileMetaData());
325+
this.rateLimiter = rateLimiter;
326+
this.throttleListener = throttleListener;
313327
}
314328

315329
void restoreFiles() throws IOException {
@@ -324,7 +338,7 @@ void restoreFiles() throws IOException {
324338

325339
@Override
326340
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
327-
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata());
341+
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener);
328342
}
329343

330344
@Override
@@ -341,14 +355,19 @@ private static class RestoreFileInputStream extends InputStream {
341355
private final String sessionUUID;
342356
private final DiscoveryNode node;
343357
private final StoreFileMetaData fileToRecover;
358+
private final CombinedRateLimiter rateLimiter;
359+
private final LongConsumer throttleListener;
344360

345361
private long pos = 0;
346362

347-
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) {
363+
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
364+
CombinedRateLimiter rateLimiter, LongConsumer throttleListener) {
348365
this.remoteClient = remoteClient;
349366
this.sessionUUID = sessionUUID;
350367
this.node = node;
351368
this.fileToRecover = fileToRecover;
369+
this.rateLimiter = rateLimiter;
370+
this.throttleListener = throttleListener;
352371
}
353372

354373

@@ -365,6 +384,10 @@ public int read(byte[] bytes, int off, int len) throws IOException {
365384
}
366385

367386
int bytesRequested = (int) Math.min(remainingBytes, len);
387+
388+
long nanosPaused = rateLimiter.maybePause(bytesRequested);
389+
throttleListener.accept(nanosPaused);
390+
368391
String fileName = fileToRecover.name();
369392
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
370393
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
@@ -388,5 +411,6 @@ public int read(byte[] bytes, int off, int len) throws IOException {
388411

389412
return bytesReceived;
390413
}
414+
391415
}
392416
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.collect.ImmutableOpenMap;
2424
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.unit.ByteSizeValue;
2526
import org.elasticsearch.common.unit.TimeValue;
2627
import org.elasticsearch.common.xcontent.XContentType;
2728
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -39,6 +40,8 @@
3940
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
4041

4142
import java.io.IOException;
43+
import java.util.ArrayList;
44+
import java.util.List;
4245
import java.util.Locale;
4346
import java.util.Map;
4447
import java.util.concurrent.TimeUnit;
@@ -234,6 +237,60 @@ public void testDocsAreRecovered() throws Exception {
234237
thread.join();
235238
}
236239

240+
public void testRateLimitingIsEmployed() throws Exception {
241+
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
242+
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
243+
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
244+
245+
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
246+
String leaderIndex = "index1";
247+
String followerIndex = "index2";
248+
249+
final int numberOfPrimaryShards = randomIntBetween(1, 3);
250+
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
251+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
252+
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
253+
ensureLeaderGreen(leaderIndex);
254+
255+
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
256+
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
257+
258+
List<CcrRepository> repositories = new ArrayList<>();
259+
260+
for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
261+
Repository repository = repositoriesService.repository(leaderClusterRepoName);
262+
repositories.add((CcrRepository) repository);
263+
}
264+
265+
logger.info("--> indexing some data");
266+
for (int i = 0; i < 100; i++) {
267+
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
268+
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
269+
}
270+
271+
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
272+
273+
Settings.Builder settingsBuilder = Settings.builder()
274+
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
275+
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
276+
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
277+
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
278+
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
279+
false, true, settingsBuilder.build(), new String[0],
280+
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");
281+
282+
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
283+
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
284+
future.actionGet();
285+
286+
assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));
287+
288+
settingsRequest = new ClusterUpdateSettingsRequest();
289+
ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
290+
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
291+
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
292+
}
293+
237294
public void testFollowerMappingIsUpdated() throws IOException {
238295
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
239296
String leaderIndex = "index1";

0 commit comments

Comments
 (0)