From ef65f309bb2fc7c4c2551db36d9952d711636134 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 15 Jan 2019 05:33:59 -0500 Subject: [PATCH 1/7] Make recovery source send operations non-blocking Relates #37458 --- .../index/translog/Translog.java | 33 ++++ .../recovery/RecoverySourceHandler.java | 176 ++++++++++-------- .../recovery/RecoverySourceHandlerTests.java | 70 ++++++- 3 files changed, 198 insertions(+), 81 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index d8acba635f822..b25b1b78a00ae 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -954,6 +954,39 @@ default int overriddenOperations() { * Returns the next operation in the snapshot or null if we reached the end. */ Translog.Operation next() throws IOException; + + /** + * Wraps a given snapshot and returns a synchronized (thread-safe) snapshot. 
+ */ + static Snapshot synchronizedSnapshot(final Snapshot delegate) { + return new Snapshot() { + + @Override + public synchronized Operation next() throws IOException { + return delegate.next(); + } + + @Override + public synchronized void close() throws IOException { + delegate.close(); + } + + @Override + public synchronized int totalOperations() { + return delegate.totalOperations(); + } + + @Override + public synchronized int skippedOperations() { + return delegate.skippedOperations(); + } + + @Override + public synchronized int overriddenOperations() { + return delegate.overriddenOperations(); + } + }; + } } /** diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d2d03156271cd..53194e6967b93 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -33,7 +33,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.StopWatch; @@ -71,6 +70,7 @@ import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -514,97 +514,121 @@ TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int to */ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, 
ActionListener listener) throws IOException { - ActionListener.completeWith(listener, () -> sendSnapshotBlockingly( - startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes)); - } - - private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, - Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo + 1: "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); + try { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. + // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible. 
+ final SnapshotSender snapshotSender = new SnapshotSender(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, + maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, Translog.Snapshot.synchronizedSnapshot(snapshot)); + snapshotSender.sendSnapshot(listener); + } catch (Exception e) { + listener.onFailure(e); } + } - final StopWatch stopWatch = new StopWatch().start(); - - logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + - "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); - - int ops = 0; - long size = 0; - int skippedOps = 0; - int totalSentOps = 0; + final class SnapshotSender { + final long startingSeqNo; + final long requiredSeqNoRangeStart; + final long endingSeqNo; + final long maxSeenAutoIdTimestamp; + final long maxSeqNoOfUpdatesOrDeletes; + + final Translog.Snapshot snapshot; + final int expectedTotalOps; + final AtomicInteger skippedOps = new AtomicInteger(); + final AtomicInteger totalSentOps = new AtomicInteger(); + final LocalCheckpointTracker requiredOpsTracker; final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); - final List operations = new ArrayList<>(); - final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); - final int expectedTotalOps = snapshot.totalOperations(); - if (expectedTotalOps == 0) { - logger.trace("no translog operations to send"); + SnapshotSender(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, Translog.Snapshot snapshot) { + this.startingSeqNo = startingSeqNo; + this.requiredSeqNoRangeStart = requiredSeqNoRangeStart; + this.endingSeqNo = endingSeqNo; + this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp; + this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; + this.requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); + 
this.snapshot = snapshot; + this.expectedTotalOps = snapshot.totalOperations(); } - final CancellableThreads.IOInterruptible sendBatch = () -> { - // TODO: Make this non-blocking - final PlainActionFuture future = new PlainActionFuture<>(); - recoveryTarget.indexTranslogOperations( - operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future); - targetLocalCheckpoint.set(future.actionGet()); - }; - - // send operations in batches - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - cancellableThreads.checkForCancel(); - - final long seqNo = operation.seqNo(); - if (seqNo < startingSeqNo || seqNo > endingSeqNo) { - skippedOps++; - continue; - } - operations.add(operation); - ops++; - size += operation.estimateSize(); - totalSentOps++; - requiredOpsTracker.markSeqNoAsCompleted(seqNo); - - // check if this request is past bytes threshold, and if so, send it off - if (size >= chunkSizeInBytes) { - cancellableThreads.executeIO(sendBatch); - logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); - ops = 0; - size = 0; - operations.clear(); + void sendSnapshot(ActionListener listener) throws IOException { + logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + + "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); + if (expectedTotalOps == 0) { + logger.trace("no translog operations to send"); } + final StopWatch stopWatch = new StopWatch().start(); + sendBatch(true, ActionListener.wrap( + nullVal -> { + stopWatch.stop(); + final TimeValue tookTime = stopWatch.totalTime(); + logger.trace("recovery [phase2]: took [{}]", tookTime); + listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps.get(), tookTime)); + }, + listener::onFailure + )); } - 
if (!operations.isEmpty() || totalSentOps == 0) { - // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint - cancellableThreads.executeIO(sendBatch); + private void sendBatch(boolean firstBatch, ActionListener listener) throws IOException { + final Tuple, ByteSizeValue> batch = readBatch(); + final List ops = batch.v1(); + if (ops.isEmpty() == false || firstBatch) { + logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, batch.v2(), expectedTotalOps); + cancellableThreads.executeIO(() -> { + recoveryTarget.indexTranslogOperations(ops, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, + ActionListener.wrap(newCheckpoint -> { + targetLocalCheckpoint.updateAndGet(curr -> SequenceNumbers.max(curr, newCheckpoint)); + sendBatch(false, listener); + }, listener::onFailure)); + }); + } else { + assert expectedTotalOps == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() + : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", + expectedTotalOps, snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); + if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { + listener.onFailure( + new IllegalStateException("translog replay failed to cover required sequence numbers" + + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). 
first missing op is [" + + (requiredOpsTracker.getCheckpoint() + 1) + "]")); + } else { + listener.onResponse(null); + } + } } - assert expectedTotalOps == snapshot.skippedOperations() + skippedOps + totalSentOps - : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", - expectedTotalOps, snapshot.skippedOperations(), skippedOps, totalSentOps); - - if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { - throw new IllegalStateException("translog replay failed to cover required sequence numbers" + - " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" - + (requiredOpsTracker.getCheckpoint() + 1) + "]"); + private Tuple, ByteSizeValue> readBatch() throws IOException { + final List operations = new ArrayList<>(); + long batchSizeInBytes = 0L; + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + cancellableThreads.checkForCancel(); + final long seqNo = operation.seqNo(); + if (seqNo < startingSeqNo || seqNo > endingSeqNo) { + skippedOps.incrementAndGet(); + continue; + } + operations.add(operation); + batchSizeInBytes += operation.estimateSize(); + totalSentOps.incrementAndGet(); + requiredOpsTracker.markSeqNoAsCompleted(seqNo); + + // check if this request is past bytes threshold, and if so, send it off + if (batchSizeInBytes >= chunkSizeInBytes) { + break; + } + } + return Tuple.tuple(operations, new ByteSizeValue(batchSizeInBytes)); } - - logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); - - stopWatch.stop(); - final TimeValue tookTime = stopWatch.totalTime(); - logger.trace("recovery [phase2]: took [{}]", tookTime); - return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime); } void finalizeRecovery(final long targetLocalCheckpoint, 
final ActionListener listener) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 97f2cadfa3a5d..af58d136e02b7 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -76,6 +76,10 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.io.OutputStream; @@ -115,6 +119,18 @@ public class RecoverySourceHandlerTests extends ESTestCase { private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1); private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private ThreadPool threadPool; + + @Before + public void setUpThreadPool() { + threadPool = new TestThreadPool(getTestName()); + } + + @After + public void tearDownThreadPool() { + terminate(threadPool); + } + public void testSendFiles() throws Throwable { Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1). 
put("indices.recovery.concurrent_small_file_streams", 1).build(); @@ -198,18 +214,17 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException { } public void testSendSnapshotSendsOps() throws IOException { - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); - final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt(); + final int fileChunkSizeInBytes = between(1, 4096); final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); final List operations = new ArrayList<>(); - final int initialNumberOfDocs = randomIntBetween(16, 64); + final int initialNumberOfDocs = randomIntBetween(10, 1000); for (int i = 0; i < initialNumberOfDocs; i++) { final Engine.Index index = getIndex(Integer.toString(i)); operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, SequenceNumbers.UNASSIGNED_SEQ_NO, true))); } - final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64); + final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(10, 1000); for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) { final Engine.Index index = getIndex(Integer.toString(i)); operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true))); @@ -219,12 +234,14 @@ public void testSendSnapshotSendsOps() throws IOException { final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1); final List shippedOps = new ArrayList<>(); + final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @Override public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, ActionListener listener) { 
shippedOps.addAll(operations); - listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED); + checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE)); + maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get())); } }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); @@ -239,6 +256,7 @@ public void indexTranslogOperations(List operations, int tot for (int i = 0; i < shippedOps.size(); i++) { assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); } + assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get())); if (endingSeqNo >= requiredStartingSeqNo + 1) { // check that missing ops blows up List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker @@ -253,6 +271,40 @@ public void indexTranslogOperations(List operations, int tot } } + public void testSendSnapshotStopOnError() throws Exception { + final int fileChunkSizeInBytes = between(1, 10 * 1024); + final StartRecoveryRequest request = getStartRecoveryRequest(); + final IndexShard shard = mock(IndexShard.class); + when(shard.state()).thenReturn(IndexShardState.STARTED); + final List ops = new ArrayList<>(); + for (int numOps = between(1, 256), i = 0; i < numOps; i++) { + final Engine.Index index = getIndex(Integer.toString(i)); + ops.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i, true))); + } + final AtomicBoolean wasFailed = new AtomicBoolean(); + RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { + @Override + public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, + long msu, ActionListener listener) { + if (randomBoolean()) { + maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED)); + }else { + maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index"))); + 
wasFailed.set(true); + } + } + }; + RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); + PlainActionFuture future = new PlainActionFuture<>(); + final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); + final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); + handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()), + randomNonNegativeLong(), randomNonNegativeLong(), future); + if (wasFailed.get()) { + assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index")); + } + } + private Engine.Index getIndex(final String id) { final String type = "test"; final ParseContext.Document document = new ParseContext.Document(); @@ -717,4 +769,12 @@ public void close() { } }; } + + private void maybeExecuteAsync(Runnable runnable) { + if (randomBoolean()) { + threadPool.generic().execute(runnable); + } else { + runnable.run(); + } + } } From 5490c7acf703cad1f7aea98f3ece79fe7cfbdcf0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 15 Jan 2019 15:44:54 -0500 Subject: [PATCH 2/7] style --- .../indices/recovery/RecoverySourceHandlerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index af58d136e02b7..0cecc925b2488 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -288,7 +288,7 @@ public void indexTranslogOperations(List operations, int tot long msu, ActionListener listener) { if (randomBoolean()) { maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED)); - }else { + } else { 
maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index"))); wasFailed.set(true); } From 82ae1ae05b207b2fb92bf2082816fabf0af554fe Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 16 Jan 2019 00:06:57 -0500 Subject: [PATCH 3/7] simplify --- .../index/translog/Translog.java | 33 --- .../recovery/RecoverySourceHandler.java | 192 +++++++++--------- 2 files changed, 95 insertions(+), 130 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index b25b1b78a00ae..d8acba635f822 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -954,39 +954,6 @@ default int overriddenOperations() { * Returns the next operation in the snapshot or null if we reached the end. */ Translog.Operation next() throws IOException; - - /** - * Wraps a given snapshot and returns a synchronized (thread-safe) snapshot. 
- */ - static Snapshot synchronizedSnapshot(final Snapshot delegate) { - return new Snapshot() { - - @Override - public synchronized Operation next() throws IOException { - return delegate.next(); - } - - @Override - public synchronized void close() throws IOException { - delegate.close(); - } - - @Override - public synchronized int totalOperations() { - return delegate.totalOperations(); - } - - @Override - public synchronized int skippedOperations() { - return delegate.skippedOperations(); - } - - @Override - public synchronized int overriddenOperations() { - return delegate.overriddenOperations(); - } - }; - } } /** diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 53194e6967b93..43a364e3b5bb4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -71,7 +71,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -518,116 +517,115 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; - try { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. 
- // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible. - final SnapshotSender snapshotSender = new SnapshotSender(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, - maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, Translog.Snapshot.synchronizedSnapshot(snapshot)); - snapshotSender.sendSnapshot(listener); - } catch (Exception e) { - listener.onFailure(e); + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + + "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); + final int expectedTotalOps = snapshot.totalOperations(); + if (expectedTotalOps == 0) { + logger.trace("no translog operations to send"); } - } - - final class SnapshotSender { - final long startingSeqNo; - final long requiredSeqNoRangeStart; - final long endingSeqNo; - final long maxSeenAutoIdTimestamp; - final long maxSeqNoOfUpdatesOrDeletes; - final Translog.Snapshot snapshot; - final int expectedTotalOps; final AtomicInteger skippedOps = new AtomicInteger(); final AtomicInteger totalSentOps = new AtomicInteger(); - final LocalCheckpointTracker requiredOpsTracker; - final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); - - SnapshotSender(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, Translog.Snapshot snapshot) { - this.startingSeqNo = startingSeqNo; - this.requiredSeqNoRangeStart = requiredSeqNoRangeStart; - this.endingSeqNo = endingSeqNo; - this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp; - this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; - this.requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); - this.snapshot = snapshot; - this.expectedTotalOps = 
snapshot.totalOperations(); - } + final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); + + // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. + // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible. + final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { + @Override + public synchronized Translog.Operation next() throws IOException { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + final long seqNo = op.seqNo(); + if (seqNo < startingSeqNo || seqNo > endingSeqNo) { + skippedOps.incrementAndGet(); + } else { + totalSentOps.incrementAndGet(); + requiredOpsTracker.markSeqNoAsCompleted(seqNo); + return op; + } + } + return null; + } - void sendSnapshot(ActionListener listener) throws IOException { - logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + - "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); - if (expectedTotalOps == 0) { - logger.trace("no translog operations to send"); + @Override + public synchronized void close() throws IOException { + snapshot.close(); } - final StopWatch stopWatch = new StopWatch().start(); - sendBatch(true, ActionListener.wrap( - nullVal -> { - stopWatch.stop(); - final TimeValue tookTime = stopWatch.totalTime(); - logger.trace("recovery [phase2]: took [{}]", tookTime); - listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps.get(), tookTime)); - }, - listener::onFailure - )); - } - private void sendBatch(boolean firstBatch, ActionListener listener) throws IOException { - final Tuple, ByteSizeValue> batch = readBatch(); - final List ops = batch.v1(); - if (ops.isEmpty() == false || firstBatch) { - logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, batch.v2(), 
expectedTotalOps); - cancellableThreads.executeIO(() -> { - recoveryTarget.indexTranslogOperations(ops, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, - ActionListener.wrap(newCheckpoint -> { - targetLocalCheckpoint.updateAndGet(curr -> SequenceNumbers.max(curr, newCheckpoint)); - sendBatch(false, listener); - }, listener::onFailure)); - }); - } else { + @Override + public synchronized int totalOperations() { + return snapshot.totalOperations(); + } + + @Override + public synchronized int skippedOperations() { + return snapshot.skippedOperations(); + } + + @Override + public synchronized int overriddenOperations() { + return snapshot.overriddenOperations(); + } + }; + + final StopWatch stopWatch = new StopWatch().start(); + final ActionListener batchedListener = ActionListener.wrap( + targetLocalCheckpoint -> { assert expectedTotalOps == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", expectedTotalOps, snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { - listener.onFailure( - new IllegalStateException("translog replay failed to cover required sequence numbers" + - " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" - + (requiredOpsTracker.getCheckpoint() + 1) + "]")); - } else { - listener.onResponse(null); + throw new IllegalStateException("translog replay failed to cover required sequence numbers" + + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). 
first missing op is [" + + (requiredOpsTracker.getCheckpoint() + 1) + "]"); } - } - } + stopWatch.stop(); + final TimeValue tookTime = stopWatch.totalTime(); + logger.trace("recovery [phase2]: took [{}]", tookTime); + listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime)); + }, + listener::onFailure + ); + + sendBatch(wrappedSnapshot, true, SequenceNumbers.UNASSIGNED_SEQ_NO, expectedTotalOps, + maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener); + } - private Tuple, ByteSizeValue> readBatch() throws IOException { - final List operations = new ArrayList<>(); - long batchSizeInBytes = 0L; - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - cancellableThreads.checkForCancel(); - final long seqNo = operation.seqNo(); - if (seqNo < startingSeqNo || seqNo > endingSeqNo) { - skippedOps.incrementAndGet(); - continue; - } - operations.add(operation); - batchSizeInBytes += operation.estimateSize(); - totalSentOps.incrementAndGet(); - requiredOpsTracker.markSeqNoAsCompleted(seqNo); - - // check if this request is past bytes threshold, and if so, send it off - if (batchSizeInBytes >= chunkSizeInBytes) { - break; - } + private void sendBatch(Translog.Snapshot snapshot, boolean firstBatch, long targetLocalCheckpoint, int totalTranslogOps, + long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) throws IOException { + final List operations = new ArrayList<>(); + long size = 0; + Translog.Operation op; + while ((op = snapshot.next()) != null) { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); } - return Tuple.tuple(operations, new ByteSizeValue(batchSizeInBytes)); + cancellableThreads.checkForCancel(); + operations.add(op); + size += op.estimateSize(); + // check if this request is past bytes 
threshold, and if so, send it off + if (size >= chunkSizeInBytes) { + break; + } + } + // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint + if (operations.isEmpty() == false || firstBatch) { + logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", operations, new ByteSizeValue(size), totalTranslogOps); + cancellableThreads.execute(() -> { + recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, + ActionListener.wrap( + newCheckpoint -> { + sendBatch(snapshot, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), + totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener); + }, + listener::onFailure + )); + }); + } else { + listener.onResponse(targetLocalCheckpoint); } } From 321891a5fa0b5e4c4e8af12bc07337a494f24693 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 16 Jan 2019 00:09:44 -0500 Subject: [PATCH 4/7] fix comment --- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 43a364e3b5bb4..438c7fce85834 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -531,7 +531,7 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final AtomicInteger totalSentOps = new AtomicInteger(); final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); - // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. 
+ // Wrap translog snapshot to make it synchronized as it is accessed by different threads through sendBatch. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible. final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override From ebbb041d813c75c7f7ad52a200a8d911085eb184 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 16 Jan 2019 03:05:55 -0500 Subject: [PATCH 5/7] check for cancel before reading operation --- .../recovery/RecoverySourceHandler.java | 122 +++++++++--------- 1 file changed, 62 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 438c7fce85834..66c95fdcc64ac 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.StepListener; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Tuple; @@ -522,10 +523,6 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, } logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); - final int expectedTotalOps = snapshot.totalOperations(); - if (expectedTotalOps == 0) { - logger.trace("no translog operations to send"); - } final AtomicInteger skippedOps = new AtomicInteger(); final AtomicInteger totalSentOps = new AtomicInteger(); @@ -533,50 +530,40 @@ void phase2(long startingSeqNo, long 
requiredSeqNoRangeStart, long endingSeqNo, // Wrap translog snapshot to make it synchronized as it is accessed by different threads through sendBatch. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible. - final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { - @Override - public synchronized Translog.Operation next() throws IOException { - Translog.Operation op; - while ((op = snapshot.next()) != null) { - final long seqNo = op.seqNo(); - if (seqNo < startingSeqNo || seqNo > endingSeqNo) { - skippedOps.incrementAndGet(); - } else { - totalSentOps.incrementAndGet(); - requiredOpsTracker.markSeqNoAsCompleted(seqNo); - return op; - } + final Translog.Snapshot wrappedSnapshot = synchronizedSnapshot(snapshot); + final CheckedSupplier, IOException> readNextBatch = () -> { + final List operations = new ArrayList<>(); + long batchSizeInBytes = 0L; + Translog.Operation operation; + while ((operation = wrappedSnapshot.next()) != null) { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + cancellableThreads.checkForCancel(); + final long seqNo = operation.seqNo(); + if (seqNo < startingSeqNo || seqNo > endingSeqNo) { + skippedOps.incrementAndGet(); + continue; + } + operations.add(operation); + batchSizeInBytes += operation.estimateSize(); + totalSentOps.incrementAndGet(); + requiredOpsTracker.markSeqNoAsCompleted(seqNo); + + // check if this request is past bytes threshold, and if so, send it off + if (batchSizeInBytes >= chunkSizeInBytes) { + break; } - return null; - } - - @Override - public synchronized void close() throws IOException { - snapshot.close(); - } - - @Override - public synchronized int totalOperations() { - return snapshot.totalOperations(); - } - - @Override - public synchronized int skippedOperations() { - return snapshot.skippedOperations(); - } - - @Override - public synchronized int overriddenOperations() 
{ - return snapshot.overriddenOperations(); } + return operations; }; final StopWatch stopWatch = new StopWatch().start(); final ActionListener batchedListener = ActionListener.wrap( targetLocalCheckpoint -> { - assert expectedTotalOps == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() + assert wrappedSnapshot.totalOperations() == wrappedSnapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", - expectedTotalOps, snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); + wrappedSnapshot.totalOperations(), wrappedSnapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { throw new IllegalStateException("translog replay failed to cover required sequence numbers" + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" @@ -590,35 +577,21 @@ public synchronized int overriddenOperations() { listener::onFailure ); - sendBatch(wrappedSnapshot, true, SequenceNumbers.UNASSIGNED_SEQ_NO, expectedTotalOps, + sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, wrappedSnapshot.totalOperations(), maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener); } - private void sendBatch(Translog.Snapshot snapshot, boolean firstBatch, long targetLocalCheckpoint, int totalTranslogOps, - long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) throws IOException { - final List operations = new ArrayList<>(); - long size = 0; - Translog.Operation op; - while ((op = snapshot.next()) != null) { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - cancellableThreads.checkForCancel(); - operations.add(op); - size += op.estimateSize(); - // check if this request is past bytes threshold, and if so, send it off - if (size >= 
chunkSizeInBytes) { - break; - } - } + private void sendBatch(CheckedSupplier, IOException> nextBatch, boolean firstBatch, + long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) throws IOException { + final List operations = nextBatch.get(); // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint if (operations.isEmpty() == false || firstBatch) { - logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", operations, new ByteSizeValue(size), totalTranslogOps); cancellableThreads.execute(() -> { recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, ActionListener.wrap( newCheckpoint -> { - sendBatch(snapshot, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), + sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener); }, listener::onFailure @@ -629,6 +602,35 @@ private void sendBatch(Translog.Snapshot snapshot, boolean firstBatch, long targ } } + private Translog.Snapshot synchronizedSnapshot(Translog.Snapshot snapshot) { + return new Translog.Snapshot() { + @Override + public synchronized Translog.Operation next() throws IOException { + return snapshot.next(); + } + + @Override + public synchronized void close() throws IOException { + snapshot.close(); + } + + @Override + public synchronized int totalOperations() { + return snapshot.totalOperations(); + } + + @Override + public synchronized int skippedOperations() { + return snapshot.skippedOperations(); + } + + @Override + public synchronized int overriddenOperations() { + return snapshot.overriddenOperations(); + } + }; + } + void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener listener) throws IOException { if (shard.state() == 
IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); From 3511664624885f40fd4b9b2ac56488f095f7acf8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 16 Jan 2019 12:23:25 -0500 Subject: [PATCH 6/7] synchronize Snapshot#next only --- .../recovery/RecoverySourceHandler.java | 86 +++++++------------ 1 file changed, 29 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 66c95fdcc64ac..df4cc61e8af7f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -528,42 +528,43 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final AtomicInteger totalSentOps = new AtomicInteger(); final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); - // Wrap translog snapshot to make it synchronized as it is accessed by different threads through sendBatch. - // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible. 
- final Translog.Snapshot wrappedSnapshot = synchronizedSnapshot(snapshot); final CheckedSupplier, IOException> readNextBatch = () -> { - final List operations = new ArrayList<>(); - long batchSizeInBytes = 0L; - Translog.Operation operation; - while ((operation = wrappedSnapshot.next()) != null) { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - cancellableThreads.checkForCancel(); - final long seqNo = operation.seqNo(); - if (seqNo < startingSeqNo || seqNo > endingSeqNo) { - skippedOps.incrementAndGet(); - continue; - } - operations.add(operation); - batchSizeInBytes += operation.estimateSize(); - totalSentOps.incrementAndGet(); - requiredOpsTracker.markSeqNoAsCompleted(seqNo); - - // check if this request is past bytes threshold, and if so, send it off - if (batchSizeInBytes >= chunkSizeInBytes) { - break; + // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch. + // Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible. 
+ synchronized (snapshot) { + final List operations = new ArrayList<>(); + long batchSizeInBytes = 0L; + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + cancellableThreads.checkForCancel(); + final long seqNo = operation.seqNo(); + if (seqNo < startingSeqNo || seqNo > endingSeqNo) { + skippedOps.incrementAndGet(); + continue; + } + operations.add(operation); + batchSizeInBytes += operation.estimateSize(); + totalSentOps.incrementAndGet(); + requiredOpsTracker.markSeqNoAsCompleted(seqNo); + + // check if this request is past bytes threshold, and if so, send it off + if (batchSizeInBytes >= chunkSizeInBytes) { + break; + } } + return operations; } - return operations; }; final StopWatch stopWatch = new StopWatch().start(); final ActionListener batchedListener = ActionListener.wrap( targetLocalCheckpoint -> { - assert wrappedSnapshot.totalOperations() == wrappedSnapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() + assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", - wrappedSnapshot.totalOperations(), wrappedSnapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); + snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { throw new IllegalStateException("translog replay failed to cover required sequence numbers" + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). 
first missing op is [" @@ -577,7 +578,7 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, listener::onFailure ); - sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, wrappedSnapshot.totalOperations(), + sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, snapshot.totalOperations(), maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener); } @@ -602,35 +603,6 @@ private void sendBatch(CheckedSupplier, IOException> ne } } - private Translog.Snapshot synchronizedSnapshot(Translog.Snapshot snapshot) { - return new Translog.Snapshot() { - @Override - public synchronized Translog.Operation next() throws IOException { - return snapshot.next(); - } - - @Override - public synchronized void close() throws IOException { - snapshot.close(); - } - - @Override - public synchronized int totalOperations() { - return snapshot.totalOperations(); - } - - @Override - public synchronized int skippedOperations() { - return snapshot.skippedOperations(); - } - - @Override - public synchronized int overriddenOperations() { - return snapshot.overriddenOperations(); - } - }; - } - void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener listener) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); From 1a7e4022571226086f0d250347a4ec4371e6d9e3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 16 Jan 2019 12:25:11 -0500 Subject: [PATCH 7/7] use last batch count to estimate the next batch --- .../indices/recovery/RecoverySourceHandler.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index df4cc61e8af7f..34434f50b456f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ 
b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -527,12 +527,12 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final AtomicInteger skippedOps = new AtomicInteger(); final AtomicInteger totalSentOps = new AtomicInteger(); final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); - + final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch. final CheckedSupplier, IOException> readNextBatch = () -> { // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch. // Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible. synchronized (snapshot) { - final List operations = new ArrayList<>(); + final List ops = lastBatchCount.get() > 0 ? new ArrayList<>(lastBatchCount.get()) : new ArrayList<>(); long batchSizeInBytes = 0L; Translog.Operation operation; while ((operation = snapshot.next()) != null) { @@ -545,7 +545,7 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, skippedOps.incrementAndGet(); continue; } - operations.add(operation); + ops.add(operation); batchSizeInBytes += operation.estimateSize(); totalSentOps.incrementAndGet(); requiredOpsTracker.markSeqNoAsCompleted(seqNo); @@ -555,7 +555,8 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, break; } } - return operations; + lastBatchCount.set(ops.size()); + return ops; } };