Skip to content

Commit d82b9b4

Browse files
committed
Reduce recovery time with compress or secure transport.
Backport of elastic/elasticsearch#36981
1 parent 1676811 commit d82b9b4

File tree

18 files changed

+731
-151
lines changed

18 files changed

+731
-151
lines changed

app/src/main/dist/config/crate.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,9 @@ auth:
387387
# that are expected to take a long time.
388388
#indices.recovery.retry_internal_long_action_timeout: 30m
389389

390+
# Controls the number of file chunk requests that can be sent in parallel per recovery.
391+
# indices.recovery.max_concurrent_file_chunks: 2
392+
390393

391394
################################# Discovery ##################################
392395

blackbox/docs/appendices/release-notes/unreleased.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ Breaking Changes
5656
Changes
5757
=======
5858

59+
- Reduced recovery time by sending file-chunks concurrently. It apllies
60+
only for when transport communication is secured or compressed. The number of
61+
chunks is controlled by the :ref:`indices.recovery.max_concurrent_file_chunks
62+
<indices.recovery.max_concurrent_file_chunks>` setting.
63+
5964
- Allow user to control how table data is stored and accessed on a disk
6065
via the :ref:`store.type <table_parameter.store_type>` table parameter and
6166
:ref:`node.store.allow_mmap <node.store_allow_mmap>` node setting.

blackbox/docs/config/cluster.rst

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -879,6 +879,22 @@ Recovery
879879
fail. Defaults to :ref:`internal_action_long_timeout
880880
<indices.recovery.internal_action_long_timeout>`.
881881

882+
.. _indices.recovery.max_concurrent_file_chunks:
883+
884+
**indices.recovery.max_concurrent_file_chunks**
885+
| *Default:* ``2``
886+
| *Runtime:* ``yes``
887+
888+
Controls the number of file chunk requests that can be sent in parallel
889+
per recovery. As multiple recoveries are already running in parallel,
890+
controlled by :ref:`cluster.routing.allocation.node_concurrent_recoveries
891+
<cluster.routing.allocation.node_concurrent_recoveries>`. Increasing this
892+
expert-level setting might only help in situations where peer recovery of
893+
a single shard is not reaching the total inbound and outbound peer recovery
894+
traffic as configured by :ref:`indices.recovery.max_bytes_per_sec
895+
<indices.recovery.max_bytes_per_sec>`. but is CPU-bound instead, typically
896+
when using transport-level security or compression.
897+
882898
Query circuit breaker
883899
---------------------
884900

blob/src/main/java/io/crate/blob/BlobService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,12 @@
3737
import org.elasticsearch.cluster.service.ClusterService;
3838
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3939
import org.elasticsearch.common.inject.Inject;
40+
import org.elasticsearch.common.settings.Settings;
4041
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
4142
import org.elasticsearch.transport.TransportService;
4243

44+
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING;
45+
4346
public class BlobService extends AbstractLifecycleComponent {
4447

4548
private final BlobIndicesService blobIndicesService;
@@ -50,6 +53,7 @@ public class BlobService extends AbstractLifecycleComponent {
5053
private final BlobTransferTarget blobTransferTarget;
5154
private final Client client;
5255
private final PipelineRegistry pipelineRegistry;
56+
private final Settings settings;
5357

5458
@Inject
5559
public BlobService(ClusterService clusterService,
@@ -59,7 +63,8 @@ public BlobService(ClusterService clusterService,
5963
TransportService transportService,
6064
BlobTransferTarget blobTransferTarget,
6165
Client client,
62-
PipelineRegistry pipelineRegistry) {
66+
PipelineRegistry pipelineRegistry,
67+
Settings settings) {
6368
this.clusterService = clusterService;
6469
this.blobIndicesService = blobIndicesService;
6570
this.blobHeadRequestHandler = blobHeadRequestHandler;
@@ -68,6 +73,7 @@ public BlobService(ClusterService clusterService,
6873
this.blobTransferTarget = blobTransferTarget;
6974
this.client = client;
7075
this.pipelineRegistry = pipelineRegistry;
76+
this.settings = settings;
7177
}
7278

7379
public RemoteDigestBlob newBlob(String index, String digest) {
@@ -92,6 +98,7 @@ protected void doStart() throws ElasticsearchException {
9298
recoveryTarget,
9399
request,
94100
fileChunkSizeInBytes,
101+
INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings),
95102
transportService,
96103
blobTransferTarget,
97104
blobIndicesService

blob/src/main/java/io/crate/blob/recovery/BlobRecoveryHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,11 @@ public BlobRecoveryHandler(IndexShard shard,
7979
RecoveryTargetHandler recoveryTarget,
8080
StartRecoveryRequest request,
8181
int fileChunkSizeInBytes,
82+
int maxConcurrentFileChunks,
8283
final TransportService transportService,
8384
BlobTransferTarget blobTransferTarget,
8485
BlobIndicesService blobIndicesService) {
85-
super(shard, recoveryTarget, request, fileChunkSizeInBytes);
86+
super(shard, recoveryTarget, request, fileChunkSizeInBytes, maxConcurrentFileChunks);
8687
assert BlobIndex.isBlobIndex(shard.shardId().getIndexName()) : "Shard must belong to a blob index";
8788
this.blobShard = blobIndicesService.blobShardSafe(request.shardId());
8889
this.request = request;

es/es-server/src/main/java/org/elasticsearch/common/Numbers.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,24 @@ public static long bytesToLong(BytesRef bytes) {
4242
return (((long) high) << 32) | (low & 0x0ffffffffL);
4343
}
4444

45+
/**
46+
* Converts a long to a byte array.
47+
*
48+
* @param val The long to convert to a byte array
49+
* @return The byte array converted
50+
*/
51+
public static byte[] longToBytes(long val) {
52+
byte[] arr = new byte[8];
53+
arr[0] = (byte) (val >>> 56);
54+
arr[1] = (byte) (val >>> 48);
55+
arr[2] = (byte) (val >>> 40);
56+
arr[3] = (byte) (val >>> 32);
57+
arr[4] = (byte) (val >>> 24);
58+
arr[5] = (byte) (val >>> 16);
59+
arr[6] = (byte) (val >>> 8);
60+
arr[7] = (byte) (val);
61+
return arr;
62+
}
4563

4664
/** Return the long that {@code n} stores, or throws an exception if the
4765
* stored value cannot be converted to a long that stores the exact same

es/es-server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ public void apply(Settings value, Settings current, Settings previous) {
194194
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
195195
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
196196
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
197+
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
197198
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
198199
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
199200
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,

es/es-server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,13 @@ synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request,
176176

177177
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
178178
RecoverySourceHandler handler;
179-
final RemoteRecoveryTargetHandler recoveryTarget =
180-
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
181-
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
179+
final RemoteRecoveryTargetHandler recoveryTarget = new RemoteRecoveryTargetHandler(
180+
request.recoveryId(),
181+
request.shardId(),
182+
transportService,
183+
request.targetNode(),
184+
recoverySettings,
185+
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
182186

183187
// CRATE_PATCH: used to inject BlobRecoveryHandler
184188
int recoveryChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
@@ -191,17 +195,23 @@ private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest r
191195

192196
if (handler != null){
193197
return handler;
198+
} else {
199+
return new RecoverySourceHandler(
200+
shard,
201+
recoveryTarget,
202+
request,
203+
recoveryChunkSizeInBytes,
204+
recoverySettings.getMaxConcurrentFileChunks());
194205
}
195-
return new RecoverySourceHandler(shard, recoveryTarget, request, recoveryChunkSizeInBytes);
196206
}
197207
}
198208
}
199209

200210
@Nullable
201-
RecoverySourceHandler getCustomRecoverySourceHandler(IndexShard shard,
202-
RemoteRecoveryTargetHandler recoveryTarget,
203-
StartRecoveryRequest request,
204-
int recoveryChunkSizeInBytes) {
211+
private RecoverySourceHandler getCustomRecoverySourceHandler(IndexShard shard,
212+
RemoteRecoveryTargetHandler recoveryTarget,
213+
StartRecoveryRequest request,
214+
int recoveryChunkSizeInBytes) {
205215
for (RecoverySourceHandlerProvider recoverySourceHandlerProvider : recoverySourceHandlerProviders) {
206216
RecoverySourceHandler handler = recoverySourceHandlerProvider.get(
207217
shard, request, recoveryTarget, recoveryChunkSizeInBytes);

es/es-server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.elasticsearch.ElasticsearchException;
3030
import org.elasticsearch.ElasticsearchTimeoutException;
3131
import org.elasticsearch.ExceptionsHelper;
32+
import org.elasticsearch.action.ActionListener;
33+
import org.elasticsearch.action.support.HandledTransportAction;
3234
import org.elasticsearch.action.support.PlainActionFuture;
3335
import org.elasticsearch.cluster.ClusterState;
3436
import org.elasticsearch.cluster.ClusterStateObserver;
@@ -571,8 +573,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler<Recove
571573

572574
@Override
573575
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
574-
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
575-
)) {
576+
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
576577
final RecoveryTarget recoveryTarget = recoveryRef.target();
577578
final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
578579
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
@@ -591,8 +592,17 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
591592
}
592593
}
593594

594-
recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(),
595-
request.lastChunk(), request.totalTranslogOps()
595+
final ActionListener<TransportResponse> listener =
596+
new HandledTransportAction.ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
597+
recoveryTarget.writeFileChunk(
598+
request.metadata(),
599+
request.position(),
600+
request.content(),
601+
request.lastChunk(),
602+
request.totalTranslogOps(),
603+
ActionListener.wrap(
604+
nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE),
605+
listener::onFailure)
596606
);
597607
}
598608
channel.sendResponse(TransportResponse.Empty.INSTANCE);

es/es-server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ public class RecoverySettings {
3939
Setting.byteSizeSetting("indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
4040
Property.Dynamic, Property.NodeScope);
4141

42+
/**
43+
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
44+
*/
45+
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
46+
Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope);
47+
4248
/**
4349
* how long to wait before retrying after issues cause by cluster state syncing between nodes
4450
* i.e., local node is not yet known on remote node, remote shard not yet started etc.
@@ -78,6 +84,7 @@ public class RecoverySettings {
7884
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
7985

8086
private volatile ByteSizeValue maxBytesPerSec;
87+
private volatile int maxConcurrentFileChunks;
8188
private volatile SimpleRateLimiter rateLimiter;
8289
private volatile TimeValue retryDelayStateSync;
8390
private volatile TimeValue retryDelayNetwork;
@@ -92,6 +99,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
9299
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
93100
// doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
94101
// and we want to give the master time to remove a faulty node
102+
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
95103
this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);
96104

97105
this.internalActionTimeout = INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.get(settings);
@@ -109,6 +117,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
109117
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
110118

111119
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
120+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
112121
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
113122
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
114123
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout);
@@ -180,4 +189,12 @@ private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
180189
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
181190
}
182191
}
192+
193+
public int getMaxConcurrentFileChunks() {
194+
return maxConcurrentFileChunks;
195+
}
196+
197+
private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
198+
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
199+
}
183200
}

0 commit comments

Comments
 (0)