Skip to content

Commit 20ed3dd

Browse files
authored
Make recovery source send operations non-blocking (#37503)
Relates #37458
1 parent 4351a5e commit 20ed3dd

File tree

2 files changed

+139
-82
lines changed

2 files changed

+139
-82
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 74 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@
3333
import org.elasticsearch.Version;
3434
import org.elasticsearch.action.ActionListener;
3535
import org.elasticsearch.action.StepListener;
36-
import org.elasticsearch.action.support.PlainActionFuture;
3736
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3837
import org.elasticsearch.cluster.routing.ShardRouting;
38+
import org.elasticsearch.common.CheckedSupplier;
3939
import org.elasticsearch.common.StopWatch;
4040
import org.elasticsearch.common.bytes.BytesArray;
4141
import org.elasticsearch.common.collect.Tuple;
@@ -71,7 +71,7 @@
7171
import java.util.Locale;
7272
import java.util.concurrent.CompletableFuture;
7373
import java.util.concurrent.CopyOnWriteArrayList;
74-
import java.util.concurrent.atomic.AtomicLong;
74+
import java.util.concurrent.atomic.AtomicInteger;
7575
import java.util.concurrent.atomic.AtomicReference;
7676
import java.util.function.Consumer;
7777
import java.util.function.Supplier;
@@ -514,97 +514,94 @@ TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int to
514514
*/
515515
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
516516
long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
517-
ActionListener.completeWith(listener, () -> sendSnapshotBlockingly(
518-
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes));
519-
}
520-
521-
private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
522-
Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
523-
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
524517
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
525518
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
526519
assert startingSeqNo <= requiredSeqNoRangeStart :
527520
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
528521
if (shard.state() == IndexShardState.CLOSED) {
529522
throw new IndexShardClosedException(request.shardId());
530523
}
531-
532-
final StopWatch stopWatch = new StopWatch().start();
533-
534524
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
535525
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
536526

537-
int ops = 0;
538-
long size = 0;
539-
int skippedOps = 0;
540-
int totalSentOps = 0;
541-
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
542-
final List<Translog.Operation> operations = new ArrayList<>();
527+
final AtomicInteger skippedOps = new AtomicInteger();
528+
final AtomicInteger totalSentOps = new AtomicInteger();
543529
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
530+
final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
531+
final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
532+
// We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.
533+
// Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible.
534+
synchronized (snapshot) {
535+
final List<Translog.Operation> ops = lastBatchCount.get() > 0 ? new ArrayList<>(lastBatchCount.get()) : new ArrayList<>();
536+
long batchSizeInBytes = 0L;
537+
Translog.Operation operation;
538+
while ((operation = snapshot.next()) != null) {
539+
if (shard.state() == IndexShardState.CLOSED) {
540+
throw new IndexShardClosedException(request.shardId());
541+
}
542+
cancellableThreads.checkForCancel();
543+
final long seqNo = operation.seqNo();
544+
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
545+
skippedOps.incrementAndGet();
546+
continue;
547+
}
548+
ops.add(operation);
549+
batchSizeInBytes += operation.estimateSize();
550+
totalSentOps.incrementAndGet();
551+
requiredOpsTracker.markSeqNoAsCompleted(seqNo);
544552

545-
final int expectedTotalOps = snapshot.totalOperations();
546-
if (expectedTotalOps == 0) {
547-
logger.trace("no translog operations to send");
548-
}
549-
550-
final CancellableThreads.IOInterruptible sendBatch = () -> {
551-
// TODO: Make this non-blocking
552-
final PlainActionFuture<Long> future = new PlainActionFuture<>();
553-
recoveryTarget.indexTranslogOperations(
554-
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future);
555-
targetLocalCheckpoint.set(future.actionGet());
556-
};
557-
558-
// send operations in batches
559-
Translog.Operation operation;
560-
while ((operation = snapshot.next()) != null) {
561-
if (shard.state() == IndexShardState.CLOSED) {
562-
throw new IndexShardClosedException(request.shardId());
563-
}
564-
cancellableThreads.checkForCancel();
565-
566-
final long seqNo = operation.seqNo();
567-
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
568-
skippedOps++;
569-
continue;
570-
}
571-
operations.add(operation);
572-
ops++;
573-
size += operation.estimateSize();
574-
totalSentOps++;
575-
requiredOpsTracker.markSeqNoAsCompleted(seqNo);
576-
577-
// check if this request is past bytes threshold, and if so, send it off
578-
if (size >= chunkSizeInBytes) {
579-
cancellableThreads.executeIO(sendBatch);
580-
logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
581-
ops = 0;
582-
size = 0;
583-
operations.clear();
553+
// check if this request is past bytes threshold, and if so, send it off
554+
if (batchSizeInBytes >= chunkSizeInBytes) {
555+
break;
556+
}
557+
}
558+
lastBatchCount.set(ops.size());
559+
return ops;
584560
}
585-
}
586-
587-
if (!operations.isEmpty() || totalSentOps == 0) {
588-
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
589-
cancellableThreads.executeIO(sendBatch);
590-
}
561+
};
591562

592-
assert expectedTotalOps == snapshot.skippedOperations() + skippedOps + totalSentOps
593-
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
594-
expectedTotalOps, snapshot.skippedOperations(), skippedOps, totalSentOps);
563+
final StopWatch stopWatch = new StopWatch().start();
564+
final ActionListener<Long> batchedListener = ActionListener.wrap(
565+
targetLocalCheckpoint -> {
566+
assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
567+
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
568+
snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
569+
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
570+
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
571+
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
572+
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
573+
}
574+
stopWatch.stop();
575+
final TimeValue tookTime = stopWatch.totalTime();
576+
logger.trace("recovery [phase2]: took [{}]", tookTime);
577+
listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime));
578+
},
579+
listener::onFailure
580+
);
581+
582+
sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, snapshot.totalOperations(),
583+
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener);
584+
}
595585

596-
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
597-
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
598-
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
599-
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
586+
private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch,
587+
long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp,
588+
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener) throws IOException {
589+
final List<Translog.Operation> operations = nextBatch.get();
590+
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
591+
if (operations.isEmpty() == false || firstBatch) {
592+
cancellableThreads.execute(() -> {
593+
recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
594+
ActionListener.wrap(
595+
newCheckpoint -> {
596+
sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
597+
totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener);
598+
},
599+
listener::onFailure
600+
));
601+
});
602+
} else {
603+
listener.onResponse(targetLocalCheckpoint);
600604
}
601-
602-
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
603-
604-
stopWatch.stop();
605-
final TimeValue tookTime = stopWatch.totalTime();
606-
logger.trace("recovery [phase2]: took [{}]", tookTime);
607-
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime);
608605
}
609606

610607
void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@
7676
import org.elasticsearch.test.DummyShardLock;
7777
import org.elasticsearch.test.ESTestCase;
7878
import org.elasticsearch.test.IndexSettingsModule;
79+
import org.elasticsearch.threadpool.TestThreadPool;
80+
import org.elasticsearch.threadpool.ThreadPool;
81+
import org.junit.After;
82+
import org.junit.Before;
7983

8084
import java.io.IOException;
8185
import java.io.OutputStream;
@@ -115,6 +119,18 @@ public class RecoverySourceHandlerTests extends ESTestCase {
115119
private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1);
116120
private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
117121

122+
private ThreadPool threadPool;
123+
124+
@Before
125+
public void setUpThreadPool() {
126+
threadPool = new TestThreadPool(getTestName());
127+
}
128+
129+
@After
130+
public void tearDownThreadPool() {
131+
terminate(threadPool);
132+
}
133+
118134
public void testSendFiles() throws Throwable {
119135
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
120136
put("indices.recovery.concurrent_small_file_streams", 1).build();
@@ -198,18 +214,17 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
198214
}
199215

200216
public void testSendSnapshotSendsOps() throws IOException {
201-
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
202-
final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
217+
final int fileChunkSizeInBytes = between(1, 4096);
203218
final StartRecoveryRequest request = getStartRecoveryRequest();
204219
final IndexShard shard = mock(IndexShard.class);
205220
when(shard.state()).thenReturn(IndexShardState.STARTED);
206221
final List<Translog.Operation> operations = new ArrayList<>();
207-
final int initialNumberOfDocs = randomIntBetween(16, 64);
222+
final int initialNumberOfDocs = randomIntBetween(10, 1000);
208223
for (int i = 0; i < initialNumberOfDocs; i++) {
209224
final Engine.Index index = getIndex(Integer.toString(i));
210225
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, SequenceNumbers.UNASSIGNED_SEQ_NO, true)));
211226
}
212-
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64);
227+
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(10, 1000);
213228
for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) {
214229
final Engine.Index index = getIndex(Integer.toString(i));
215230
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
@@ -219,12 +234,14 @@ public void testSendSnapshotSendsOps() throws IOException {
219234
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
220235

221236
final List<Translog.Operation> shippedOps = new ArrayList<>();
237+
final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
222238
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
223239
@Override
224240
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
225241
ActionListener<Long> listener) {
226242
shippedOps.addAll(operations);
227-
listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED);
243+
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
244+
maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
228245
}
229246
};
230247
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
@@ -239,6 +256,7 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
239256
for (int i = 0; i < shippedOps.size(); i++) {
240257
assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
241258
}
259+
assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get()));
242260
if (endingSeqNo >= requiredStartingSeqNo + 1) {
243261
// check that missing ops blows up
244262
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
@@ -253,6 +271,40 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
253271
}
254272
}
255273

274+
public void testSendSnapshotStopOnError() throws Exception {
275+
final int fileChunkSizeInBytes = between(1, 10 * 1024);
276+
final StartRecoveryRequest request = getStartRecoveryRequest();
277+
final IndexShard shard = mock(IndexShard.class);
278+
when(shard.state()).thenReturn(IndexShardState.STARTED);
279+
final List<Translog.Operation> ops = new ArrayList<>();
280+
for (int numOps = between(1, 256), i = 0; i < numOps; i++) {
281+
final Engine.Index index = getIndex(Integer.toString(i));
282+
ops.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i, true)));
283+
}
284+
final AtomicBoolean wasFailed = new AtomicBoolean();
285+
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
286+
@Override
287+
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
288+
long msu, ActionListener<Long> listener) {
289+
if (randomBoolean()) {
290+
maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
291+
} else {
292+
maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index")));
293+
wasFailed.set(true);
294+
}
295+
}
296+
};
297+
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
298+
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
299+
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
300+
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
301+
handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
302+
randomNonNegativeLong(), randomNonNegativeLong(), future);
303+
if (wasFailed.get()) {
304+
assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));
305+
}
306+
}
307+
256308
private Engine.Index getIndex(final String id) {
257309
final String type = "test";
258310
final ParseContext.Document document = new ParseContext.Document();
@@ -717,4 +769,12 @@ public void close() {
717769
}
718770
};
719771
}
772+
773+
private void maybeExecuteAsync(Runnable runnable) {
774+
if (randomBoolean()) {
775+
threadPool.generic().execute(runnable);
776+
} else {
777+
runnable.run();
778+
}
779+
}
720780
}

0 commit comments

Comments
 (0)