From adae8e80af8537e010f28e4978e13f7d4d17ade1 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 14 Jan 2019 17:48:52 -0700 Subject: [PATCH 1/4] Implement follower rate limiting for file restore This is related to #35975. This commit implements rate limiting on the follower side using the `RateLimitingInputStream`. --- .../xpack/ccr/repository/CcrRepository.java | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 025892f80d834..a8a310ffcd5c1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.repository; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.store.RateLimiter; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -25,6 +26,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -35,6 +37,7 @@ import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream; import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -82,6 +85,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private final String remoteClusterAlias; private final Client client; private final CcrLicenseChecker ccrLicenseChecker; + private final RateLimiter.SimpleRateLimiter rateLimiter; public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) { super(settings); @@ -90,6 +94,7 @@ public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseCheck this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1]; this.ccrLicenseChecker = ccrLicenseChecker; this.client = client; + this.rateLimiter = new RateLimiter.SimpleRateLimiter(new ByteSizeValue(40, ByteSizeUnit.MB).getMbFrac()); } @Override @@ -258,7 +263,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v // TODO: There should be some local timeout. And if the remote cluster returns an unknown session // response, we should be able to retry by creating a new session. String name = metadata.name(); - try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) { + try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) { restoreSession.restoreFiles(); } catch (Exception e) { throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); @@ -286,7 +291,16 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index } } - private static class RestoreSession extends FileRestoreContext implements Closeable { + private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard, + RecoveryState recoveryState) { + String sessionUUID = UUIDs.randomBase64UUID(); + PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, + new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(); + return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, + response.getStoreFileMetaData()); + } + + private class RestoreSession extends FileRestoreContext implements Closeable { private static final int BUFFER_SIZE = 1 << 16; @@ -304,15 +318,6 @@ private static class RestoreSession extends FileRestoreContext implements Closea this.sourceMetaData = sourceMetaData; } - static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard, - RecoveryState recoveryState) { - String sessionUUID = UUIDs.randomBase64UUID(); - PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, - new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(); - return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, - response.getStoreFileMetaData()); - } - void restoreFiles() throws IOException { ArrayList fileInfos = new ArrayList<>(); for (StoreFileMetaData fileMetaData : sourceMetaData) { @@ -325,7 +330,8 @@ void restoreFiles() throws IOException { @Override protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata()); + RestoreFileInputStream restoreInputStream = new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata()); + return new RateLimitingInputStream(restoreInputStream, rateLimiter, (n) -> {}); } @Override @@ -336,7 +342,7 @@ public void close() { } } - private static class RestoreFileInputStream extends InputStream { + private class RestoreFileInputStream extends InputStream { private final Client remoteClient; private final String sessionUUID; From f26e0e6f2fcf6421cdb5e47f32899aa57bff2a5a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 15 Jan 2019 12:40:39 -0700 Subject: [PATCH 2/4] Changes --- .../elasticsearch/xpack/ccr/CcrSettings.java | 11 ++++ .../xpack/ccr/repository/CcrRepository.java | 31 ++++++++-- .../elasticsearch/xpack/CcrIntegTestCase.java | 20 +++++-- .../xpack/ccr/CcrRepositoryIT.java | 57 +++++++++++++++++++ 4 files changed, 108 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index d7495dec8c2cf..fde8a11f5e3ee 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,6 +7,8 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.XPackSettings; @@ -35,6 +37,14 @@ private CcrSettings() { public static final Setting CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( "ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + + /** + * Max bytes a follower node can recover per second. + */ + public static final Setting FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND = + Setting.byteSizeSetting("ccr.follower.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), + Setting.Property.Dynamic, Setting.Property.NodeScope); + /** * The settings defined by CCR. * @@ -44,6 +54,7 @@ static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, + FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index a8a310ffcd5c1..273d8a611b680 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -25,8 +25,8 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -37,7 +37,6 @@ import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; -import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream; import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -69,6 +68,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.xpack.ccr.CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND; /** * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to @@ -85,7 +87,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private final String remoteClusterAlias; private final Client client; private final CcrLicenseChecker ccrLicenseChecker; + private final RateLimiter.SimpleRateLimiter rateLimiter; + private final CounterMetric throttledTime = new CounterMetric(); + private final AtomicLong bytesSinceLastPause = new AtomicLong(); public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) { super(settings); @@ -94,7 +99,7 @@ public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseCheck this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1]; this.ccrLicenseChecker = ccrLicenseChecker; this.client = client; - this.rateLimiter = new RateLimiter.SimpleRateLimiter(new ByteSizeValue(40, ByteSizeUnit.MB).getMbFrac()); + this.rateLimiter = new RateLimiter.SimpleRateLimiter(FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.get(settings).getMbFrac()); } @Override @@ -212,7 +217,7 @@ public long getSnapshotThrottleTimeInNanos() { @Override public long getRestoreThrottleTimeInNanos() { - return 0; + return throttledTime.count(); } @Override @@ -330,8 +335,7 @@ void restoreFiles() throws IOException { @Override protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - RestoreFileInputStream restoreInputStream = new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata()); - return new RateLimitingInputStream(restoreInputStream, rateLimiter, (n) -> {}); + return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata()); } @Override @@ -372,6 +376,9 @@ public int read(byte[] bytes, int off, int len) throws IOException { } int bytesRequested = (int) Math.min(remainingBytes, len); + + maybePause(bytesRequested); + String fileName = fileToRecover.name(); GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested); GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response = @@ -395,5 +402,17 @@ public int read(byte[] bytes, int off, int len) throws IOException { return bytesReceived; } + + private void maybePause(int bytesRequested) { + long bytesSincePause = bytesSinceLastPause.addAndGet(bytesRequested); + if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) { + // Time to pause + bytesSinceLastPause.addAndGet(-bytesSincePause); + long throttleTimeInNanos = rateLimiter.pause(bytesSincePause); + if (throttleTimeInNanos > 0) { + throttledTime.inc(throttleTimeInNanos); + } + } + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 7af3d690e3a99..94ebd23f15575 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -111,6 +111,10 @@ public abstract class CcrIntegTestCase extends ESTestCase { @Before public final void startClusters() throws Exception { + startClusters(Settings.EMPTY); + } + + private void startClusters(Settings additionalSettings) throws Exception { if (clusterGroup != null && reuseClusters()) { clusterGroup.leaderCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster()); clusterGroup.followerCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster()); @@ -122,8 +126,8 @@ public final void startClusters() throws Exception { TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin()); InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins, - Function.identity()); + numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null, additionalSettings), 0, + "leader", mockPlugins, Function.identity()); leaderCluster.beforeTest(random(), 0.0D); leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster()); assertBusy(() -> { @@ -133,8 +137,8 @@ public final void startClusters() throws Exception { String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address), 0, "follower", - mockPlugins, Function.identity()); + numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address, additionalSettings), 0, + "follower", mockPlugins, Function.identity()); clusterGroup = new ClusterGroup(leaderCluster, followerCluster); followerCluster.beforeTest(random(), 0.0D); @@ -145,6 +149,11 @@ public final void startClusters() throws Exception { }); } + protected void restartClustersWithSettings(Settings settings) throws Exception { + stopClusters(); + startClusters(settings); + } + /** * Follower indices don't get all the settings from leader, for example 'index.unassigned.node_left.delayed_timeout' * is not replicated and if tests kill nodes, we have to wait 60s by default... @@ -180,7 +189,7 @@ public void afterTest() throws Exception { } } - private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) { + private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress, Settings settings) { Settings.Builder builder = Settings.builder(); builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE); // Default the watermarks to absurdly low to prevent the tests @@ -202,6 +211,7 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + builder.put(settings); if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) { builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 36e1027dc5f87..2411cc00200e2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -38,6 +39,8 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -235,6 +238,60 @@ public void testDocsAreRecovered() throws Exception { thread.join(); } + public void testRateLimitingIsEmployed() throws Exception { + restartClustersWithSettings(Settings.builder().put(CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.getKey(), + new ByteSizeValue(500)).build()); + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + String leaderIndex = "index1"; + String followerIndex = "index2"; + + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + List repositories = new ArrayList<>(); + try { + for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) { + Repository repository = repositoriesService.repository(leaderClusterRepoName); + repositories.add((CcrRepository) repository); + } + } catch (RepositoryMissingException e) { + fail("need repository"); + } + + final int firstBatchNumDocs = 10; + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + + try { + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, + CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, + "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, + false, true, settingsBuilder.build(), new String[0], + "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); + + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + + assertBusy(() -> assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0))); + } finally { + restartClustersWithSettings(Settings.EMPTY); + } + } + public void testFollowerMappingIsUpdated() throws IOException { String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; From 425fa92663f2d236129809c99082c25af707d538 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 15 Jan 2019 18:09:58 -0700 Subject: [PATCH 3/4] Changes --- .../common/util/CombinedRateLimiter.java | 59 +++++++++++++++++++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 6 +- .../elasticsearch/xpack/ccr/CcrSettings.java | 31 +++++++--- .../xpack/ccr/repository/CcrRepository.java | 27 ++++----- .../elasticsearch/xpack/CcrIntegTestCase.java | 20 ++----- .../xpack/ccr/CcrRepositoryIT.java | 49 +++++++-------- 6 files changed, 128 insertions(+), 64 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java diff --git a/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java b/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java new file mode 100644 index 0000000000000..bd7b02a7a7948 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + +import org.apache.lucene.store.RateLimiter; +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A rate limiter designed for multiple concurrent users. + */ +public class CombinedRateLimiter { + + // TODO: This rate limiter has some concurrency issues between the two add operations + + private final AtomicLong bytesSinceLastPause = new AtomicLong(); + private final RateLimiter.SimpleRateLimiter rateLimiter; + private volatile boolean rateLimit; + + public CombinedRateLimiter(ByteSizeValue maxBytesPerSec) { + rateLimit = maxBytesPerSec.getBytes() > 0; + rateLimiter = new RateLimiter.SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + } + + public long maybePause(int bytes) { + if (rateLimit) { + long bytesSincePause = bytesSinceLastPause.addAndGet(bytes); + if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) { + // Time to pause + bytesSinceLastPause.addAndGet(-bytesSincePause); + return rateLimiter.pause(bytesSincePause); + } + } + return 0; + } + + public void setMBPerSec(ByteSizeValue maxBytesPerSec) { + rateLimit = maxBytesPerSec.getBytes() > 0; + rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac()); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 370d017a4bde7..078d1109f1044 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -117,6 +117,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; private final SetOnce restoreSourceService = new SetOnce<>(); + private final SetOnce ccrSettings = new SetOnce<>(); private Client client; /** @@ -159,6 +160,8 @@ public Collection createComponents( CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings); this.restoreSourceService.set(restoreSourceService); + CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings()); + this.ccrSettings.set(ccrSettings); return Arrays.asList( ccrLicenseChecker, restoreSourceService, @@ -291,7 +294,8 @@ public List> getExecutorBuilders(Settings settings) { @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings); + Repository.Factory repositoryFactory = + (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get()); return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index fde8a11f5e3ee..fe0eb7853e3ce 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -5,11 +5,14 @@ */ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CombinedRateLimiter; import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; @@ -20,11 +23,6 @@ */ public final class CcrSettings { - // prevent construction - private CcrSettings() { - - } - /** * Index setting for a following index. */ @@ -39,10 +37,10 @@ private CcrSettings() { /** - * Max bytes a follower node can recover per second. + * Max bytes a node can recover per second. */ - public static final Setting FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND = - Setting.byteSizeSetting("ccr.follower.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), + public static final Setting RECOVERY_MAX_BYTES_PER_SECOND = + Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), Setting.Property.Dynamic, Setting.Property.NodeScope); /** @@ -54,8 +52,23 @@ static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, - FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND, + RECOVERY_MAX_BYTES_PER_SECOND, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); } + private final CombinedRateLimiter ccrRateLimiter; + + public CcrSettings(Settings settings, ClusterSettings clusterSettings) { + this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings)); + clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec); + } + + private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { + ccrRateLimiter.setMBPerSec(maxBytesPerSec); + } + + public CombinedRateLimiter getRateLimiter() { + return ccrRateLimiter; + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 273d8a611b680..1a3a7e80850bf 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ccr.repository; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.store.RateLimiter; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -28,6 +27,7 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.CombinedRateLimiter; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineException; @@ -51,6 +51,7 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; @@ -68,9 +69,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import static org.elasticsearch.xpack.ccr.CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND; /** * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to @@ -84,22 +82,22 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST); private final RepositoryMetaData metadata; + private final CcrSettings ccrSettings; private final String remoteClusterAlias; private final Client client; private final CcrLicenseChecker ccrLicenseChecker; - private final RateLimiter.SimpleRateLimiter rateLimiter; private final CounterMetric throttledTime = new CounterMetric(); - private final AtomicLong bytesSinceLastPause = new AtomicLong(); - public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) { + public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings, + CcrSettings ccrSettings) { super(settings); this.metadata = metadata; + this.ccrSettings = ccrSettings; assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX; this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1]; this.ccrLicenseChecker = ccrLicenseChecker; this.client = client; - this.rateLimiter = new RateLimiter.SimpleRateLimiter(FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.get(settings).getMbFrac()); } @Override @@ -352,6 +350,7 @@ private class RestoreFileInputStream extends InputStream { private final String sessionUUID; private final DiscoveryNode node; private final StoreFileMetaData fileToRecover; + private final CombinedRateLimiter rateLimiter; private long pos = 0; @@ -360,6 +359,7 @@ private RestoreFileInputStream(Client remoteClient, String sessionUUID, Discover this.sessionUUID = sessionUUID; this.node = node; this.fileToRecover = fileToRecover; + this.rateLimiter = ccrSettings.getRateLimiter(); } @@ -404,14 +404,9 @@ public int read(byte[] bytes, int off, int len) throws IOException { } private void maybePause(int bytesRequested) { - long bytesSincePause = bytesSinceLastPause.addAndGet(bytesRequested); - if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) { - // Time to pause - bytesSinceLastPause.addAndGet(-bytesSincePause); - long throttleTimeInNanos = rateLimiter.pause(bytesSincePause); - if (throttleTimeInNanos > 0) { - throttledTime.inc(throttleTimeInNanos); - } + long throttleTimeInNanos = rateLimiter.maybePause(bytesRequested); + if (throttleTimeInNanos > 0) { + throttledTime.inc(throttleTimeInNanos); } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 94ebd23f15575..7af3d690e3a99 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -111,10 +111,6 @@ public abstract class CcrIntegTestCase extends ESTestCase { @Before public final void startClusters() throws Exception { - startClusters(Settings.EMPTY); - } - - private void startClusters(Settings additionalSettings) throws Exception { if (clusterGroup != null && reuseClusters()) { clusterGroup.leaderCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster()); clusterGroup.followerCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster()); @@ -126,8 +122,8 @@ private void startClusters(Settings additionalSettings) throws Exception { TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin()); InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null, additionalSettings), 0, - "leader", mockPlugins, Function.identity()); + numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins, + Function.identity()); leaderCluster.beforeTest(random(), 0.0D); leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster()); assertBusy(() -> { @@ -137,8 +133,8 @@ private void startClusters(Settings additionalSettings) throws Exception { String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address, additionalSettings), 0, - "follower", mockPlugins, Function.identity()); + numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address), 0, "follower", + mockPlugins, Function.identity()); clusterGroup = new ClusterGroup(leaderCluster, followerCluster); followerCluster.beforeTest(random(), 0.0D); @@ -149,11 +145,6 @@ private void startClusters(Settings additionalSettings) throws Exception { }); } - protected void restartClustersWithSettings(Settings settings) throws Exception { - stopClusters(); - startClusters(settings); - } - /** * Follower indices don't get all the settings from leader, for example 'index.unassigned.node_left.delayed_timeout' * is not replicated and if tests kill nodes, we have to wait 60s by default... @@ -189,7 +180,7 @@ public void afterTest() throws Exception { } } - private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress, Settings settings) { + private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) { Settings.Builder builder = Settings.builder(); builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE); // Default the watermarks to absurdly low to prevent the tests @@ -211,7 +202,6 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); - builder.put(settings); if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) { builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 2411cc00200e2..220e7d3a4ea41 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -239,8 +239,10 @@ public void testDocsAreRecovered() throws Exception { } public void testRateLimitingIsEmployed() throws Exception { - restartClustersWithSettings(Settings.builder().put(CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.getKey(), - new ByteSizeValue(500)).build()); + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K")); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; String followerIndex = "index2"; @@ -256,7 +258,7 @@ public void testRateLimitingIsEmployed() throws Exception { List repositories = new ArrayList<>(); try { - for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) { + for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) { Repository repository = repositoriesService.repository(leaderClusterRepoName); repositories.add((CcrRepository) repository); } @@ -264,32 +266,33 @@ public void testRateLimitingIsEmployed() throws Exception { fail("need repository"); } - final int firstBatchNumDocs = 10; - logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); - for (int i = 0; i < firstBatchNumDocs; i++) { + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); - try { - Settings.Builder settingsBuilder = Settings.builder() - .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) - .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); - RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, - CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, - "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, - false, true, settingsBuilder.build(), new String[0], - "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); - - PlainActionFuture future = PlainActionFuture.newFuture(); - restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - - assertBusy(() -> assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0))); - } finally { - restartClustersWithSettings(Settings.EMPTY); - } + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, + CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, + "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, + false, true, settingsBuilder.build(), new String[0], + "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); + + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + future.actionGet(); + + assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0)); + + settingsRequest = new ClusterUpdateSettingsRequest(); + ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); } public void testFollowerMappingIsUpdated() throws IOException { From b529abae2a3151d125f014707b7eacf01d95d304 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 16 Jan 2019 16:00:18 -0700 Subject: [PATCH 4/4] Changes --- .../common/util/CombinedRateLimiter.java | 4 +-- .../xpack/ccr/repository/CcrRepository.java | 32 +++++++++++-------- .../xpack/ccr/CcrRepositoryIT.java | 11 +++---- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java b/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java index bd7b02a7a7948..23324cbe00b04 100644 --- a/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java +++ b/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java @@ -29,7 +29,7 @@ */ public class CombinedRateLimiter { - // TODO: This rate limiter has some concurrency issues between the two add operations + // TODO: This rate limiter has some concurrency issues between the two maybePause operations private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final RateLimiter.SimpleRateLimiter rateLimiter; @@ -46,7 +46,7 @@ public long maybePause(int bytes) { if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) { // Time to pause bytesSinceLastPause.addAndGet(-bytesSincePause); - return rateLimiter.pause(bytesSincePause); + return Math.max(rateLimiter.pause(bytesSincePause), 0); } } return 0; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 76aa75cb062fd..33a8c64c96138 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -69,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.LongConsumer; /** * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to @@ -299,10 +300,10 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(); return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, - response.getStoreFileMetaData()); + response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc); } - private class RestoreSession extends FileRestoreContext implements Closeable { + private static class RestoreSession extends FileRestoreContext implements Closeable { private static final int BUFFER_SIZE = 1 << 16; @@ -310,14 +311,19 @@ private class RestoreSession extends FileRestoreContext implements Closeable { private final String sessionUUID; private final DiscoveryNode node; private final Store.MetadataSnapshot sourceMetaData; + private final CombinedRateLimiter rateLimiter; + private final LongConsumer throttleListener; RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, - RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) { + RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter, + LongConsumer throttleListener) { super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; this.sourceMetaData = sourceMetaData; + this.rateLimiter = rateLimiter; + this.throttleListener = throttleListener; } void restoreFiles() throws IOException { @@ -332,7 +338,7 @@ void restoreFiles() throws IOException { @Override protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata()); + return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener); } @Override @@ -343,22 +349,25 @@ public void close() { } } - private class RestoreFileInputStream extends InputStream { + private static class RestoreFileInputStream extends InputStream { private final Client remoteClient; private final String sessionUUID; private final DiscoveryNode node; private final StoreFileMetaData fileToRecover; private final CombinedRateLimiter rateLimiter; + private final LongConsumer throttleListener; private long pos = 0; - private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) { + private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover, + CombinedRateLimiter rateLimiter, LongConsumer throttleListener) { this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; this.fileToRecover = fileToRecover; - this.rateLimiter = ccrSettings.getRateLimiter(); + this.rateLimiter = rateLimiter; + this.throttleListener = throttleListener; } @@ -376,7 +385,8 @@ public int read(byte[] bytes, int off, int len) throws IOException { int bytesRequested = (int) Math.min(remainingBytes, len); - maybePause(bytesRequested); + long nanosPaused = rateLimiter.maybePause(bytesRequested); + throttleListener.accept(nanosPaused); String fileName = fileToRecover.name(); GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested); @@ -402,11 +412,5 @@ public int read(byte[] bytes, int off, int len) throws IOException { return bytesReceived; } - private void maybePause(int bytesRequested) { - long throttleTimeInNanos = rateLimiter.maybePause(bytesRequested); - if (throttleTimeInNanos > 0) { - throttledTime.inc(throttleTimeInNanos); - } - } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 220e7d3a4ea41..faaef8b407c92 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -257,13 +257,10 @@ public void testRateLimitingIsEmployed() throws Exception { final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); List repositories = new ArrayList<>(); - try { - for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) { - Repository repository = repositoriesService.repository(leaderClusterRepoName); - repositories.add((CcrRepository) repository); - } - } catch (RepositoryMissingException e) { - fail("need repository"); + + for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) { + Repository repository = repositoriesService.repository(leaderClusterRepoName); + repositories.add((CcrRepository) repository); } logger.info("--> indexing some data");