Skip to content

Commit cb90491

Browse files
dnhatnywelsch
andcommitted
Do not wait for advancement of checkpoint in recovery (#39006)
With this change, we won't wait for the local checkpoint to advance to the max_seq_no before starting phase2 of peer-recovery. We also remove the sequence number range check in peer-recovery. We can safely do these thanks to Yannick's finding. The replication group to be used is currently sampled after indexing into the primary (see `ReplicationOperation` class). This means that when initiating tracking of a new replica, we have to consider the following two cases: - There are operations for which the replication group has not been sampled yet. As we initiated the new replica as tracking, we know that those operations will be replicated to the new replica and follow the typical replication group semantics (e.g. marked as stale when unavailable). - There are operations for which the replication group has already been sampled. These operations will not be sent to the new replica. However, we know that those operations are already indexed into Lucene and the translog on the primary, as the sampling is happening after that. This means that by taking a snapshot of Lucene or the translog, we will be getting those ops as well. What we cannot guarantee anymore is that all ops up to `endingSeqNo` are available in the snapshot (i.e. also see comment in `RecoverySourceHandler` saying `We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all operations in the required range will be available for replaying from the translog of the source.`). This is not needed, though, as we can no longer guarantee that max seq no == local checkpoint. Relates #39000 Closes #38949 Co-authored-by: Yannick Welsch <[email protected]>
1 parent 10a8572 commit cb90491

File tree

9 files changed

+39
-100
lines changed

9 files changed

+39
-100
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -808,14 +808,6 @@ public final CommitStats commitStats() {
808808
*/
809809
public abstract long getLocalCheckpoint();
810810

811-
/**
812-
* Waits for all operations up to the provided sequence number to complete.
813-
*
814-
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
815-
* @throws InterruptedException if the thread was interrupted while blocking on the condition
816-
*/
817-
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;
818-
819811
/**
820812
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
821813
*/

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2510,11 +2510,6 @@ public long getLocalCheckpoint() {
25102510
return localCheckpointTracker.getCheckpoint();
25112511
}
25122512

2513-
@Override
2514-
public void waitForOpsToComplete(long seqNo) throws InterruptedException {
2515-
localCheckpointTracker.waitForOpsToComplete(seqNo);
2516-
}
2517-
25182513
/**
25192514
* Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
25202515
*/

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,6 @@ public long getLocalCheckpoint() {
321321
return seqNoStats.getLocalCheckpoint();
322322
}
323323

324-
@Override
325-
public void waitForOpsToComplete(long seqNo) {
326-
}
327-
328324
@Override
329325
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
330326
return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2077,16 +2077,6 @@ public void syncRetentionLeases() {
20772077
}
20782078
}
20792079

2080-
/**
2081-
* Waits for all operations up to the provided sequence number to complete.
2082-
*
2083-
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
2084-
* @throws InterruptedException if the thread was interrupted while blocking on the condition
2085-
*/
2086-
public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
2087-
getEngine().waitForOpsToComplete(seqNo);
2088-
}
2089-
20902080
/**
20912081
* Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
20922082
* have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,12 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
158158
final Closeable retentionLock = shard.acquireRetentionLock();
159159
resources.add(retentionLock);
160160
final long startingSeqNo;
161-
final long requiredSeqNoRangeStart;
162161
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
163162
isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
164163
final SendFileResult sendFileResult;
165164
if (isSequenceNumberBasedRecovery) {
166165
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
167166
startingSeqNo = request.startingSeqNo();
168-
requiredSeqNoRangeStart = startingSeqNo;
169167
sendFileResult = SendFileResult.EMPTY;
170168
} else {
171169
final Engine.IndexCommitRef phase1Snapshot;
@@ -174,13 +172,15 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
174172
} catch (final Exception e) {
175173
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
176174
}
177-
// We must have everything above the local checkpoint in the commit
178-
requiredSeqNoRangeStart =
179-
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
180175
// If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
181176
// the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
182177
// according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
183-
startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
178+
if (shard.indexSettings().isSoftDeleteEnabled()) {
179+
startingSeqNo = Long.parseLong(
180+
phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
181+
} else {
182+
startingSeqNo = 0;
183+
}
184184
try {
185185
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
186186
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
@@ -195,8 +195,6 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
195195
}
196196
}
197197
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
198-
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
199-
+ startingSeqNo + "]";
200198

201199
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
202200
// For a sequence based recovery, the target can keep its local translog
@@ -214,13 +212,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
214212
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
215213

216214
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
217-
/*
218-
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
219-
* operations in the required range will be available for replaying from the translog of the source.
220-
*/
221-
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
222215
if (logger.isTraceEnabled()) {
223-
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
224216
logger.trace("snapshot translog for recovery; current size is [{}]",
225217
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
226218
}
@@ -233,15 +225,8 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
233225
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
234226
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
235227
final RetentionLeases retentionLeases = shard.getRetentionLeases();
236-
phase2(
237-
startingSeqNo,
238-
requiredSeqNoRangeStart,
239-
endingSeqNo,
240-
phase2Snapshot,
241-
maxSeenAutoIdTimestamp,
242-
maxSeqNoOfUpdatesOrDeletes,
243-
retentionLeases,
244-
sendSnapshotStep);
228+
phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
229+
retentionLeases, sendSnapshotStep);
245230
sendSnapshotStep.whenComplete(
246231
r -> IOUtils.close(phase2Snapshot),
247232
e -> {
@@ -519,7 +504,6 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A
519504
*
520505
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
521506
* ops should be sent
522-
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
523507
* @param endingSeqNo the highest sequence number that should be sent
524508
* @param snapshot a snapshot of the translog
525509
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
@@ -528,26 +512,19 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A
528512
*/
529513
void phase2(
530514
final long startingSeqNo,
531-
final long requiredSeqNoRangeStart,
532515
final long endingSeqNo,
533516
final Translog.Snapshot snapshot,
534517
final long maxSeenAutoIdTimestamp,
535518
final long maxSeqNoOfUpdatesOrDeletes,
536519
final RetentionLeases retentionLeases,
537520
final ActionListener<SendSnapshotResult> listener) throws IOException {
538-
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
539-
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
540-
assert startingSeqNo <= requiredSeqNoRangeStart :
541-
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
542521
if (shard.state() == IndexShardState.CLOSED) {
543522
throw new IndexShardClosedException(request.shardId());
544523
}
545-
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
546-
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
524+
logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");
547525

548526
final AtomicInteger skippedOps = new AtomicInteger();
549527
final AtomicInteger totalSentOps = new AtomicInteger();
550-
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
551528
final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
552529
final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
553530
// We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.
@@ -569,7 +546,6 @@ void phase2(
569546
ops.add(operation);
570547
batchSizeInBytes += operation.estimateSize();
571548
totalSentOps.incrementAndGet();
572-
requiredOpsTracker.markSeqNoAsCompleted(seqNo);
573549

574550
// check if this request is past bytes threshold, and if so, send it off
575551
if (batchSizeInBytes >= chunkSizeInBytes) {
@@ -587,11 +563,6 @@ void phase2(
587563
assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
588564
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
589565
snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
590-
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
591-
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
592-
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
593-
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
594-
}
595566
stopWatch.stop();
596567
final TimeValue tookTime = stopWatch.totalTime();
597568
logger.trace("recovery [phase2]: took [{}]", tookTime);

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@
9797
import java.util.concurrent.atomic.AtomicReference;
9898
import java.util.function.IntSupplier;
9999
import java.util.function.Supplier;
100-
import java.util.stream.Collectors;
101100
import java.util.zip.CRC32;
102101

103102
import static java.util.Collections.emptyMap;
@@ -231,8 +230,7 @@ public void testSendSnapshotSendsOps() throws IOException {
231230
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
232231
}
233232
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
234-
final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
235-
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
233+
final long endingSeqNo = randomLongBetween(startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
236234

237235
final List<Translog.Operation> shippedOps = new ArrayList<>();
238236
final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
@@ -247,7 +245,7 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
247245
};
248246
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
249247
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
250-
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
248+
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
251249
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
252250
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
253251
RecoverySourceHandler.SendSnapshotResult result = future.actionGet();
@@ -258,18 +256,6 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
258256
assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
259257
}
260258
assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get()));
261-
if (endingSeqNo >= requiredStartingSeqNo + 1) {
262-
// check that missing ops blows up
263-
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
264-
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
265-
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
266-
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> failedFuture = new PlainActionFuture<>();
267-
expectThrows(IllegalStateException.class, () -> {
268-
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip),
269-
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture);
270-
failedFuture.actionGet();
271-
});
272-
}
273259
}
274260

275261
public void testSendSnapshotStopOnError() throws Exception {
@@ -299,7 +285,7 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
299285
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
300286
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
301287
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
302-
handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
288+
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
303289
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
304290
if (wasFailed.get()) {
305291
assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));
@@ -498,11 +484,11 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A
498484
}
499485

500486
@Override
501-
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
487+
void phase2(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot,
502488
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases,
503489
ActionListener<SendSnapshotResult> listener) throws IOException {
504490
phase2Called.set(true);
505-
super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
491+
super.phase2(startingSeqNo, endingSeqNo, snapshot,
506492
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener);
507493
}
508494

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,6 +1121,16 @@ public static Translog getTranslog(Engine engine) {
11211121
return internalEngine.getTranslog();
11221122
}
11231123

1124+
/**
1125+
* Waits for all operations up to the provided sequence number to complete in the given internal engine.
1126+
*
1127+
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
1128+
* @throws InterruptedException if the thread was interrupted while blocking on the condition
1129+
*/
1130+
public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throws InterruptedException {
1131+
engine.getLocalCheckpointTracker().waitForOpsToComplete(seqNo);
1132+
}
1133+
11241134
public static boolean hasSnapshottedCommits(Engine engine) {
11251135
assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
11261136
InternalEngine internalEngine = (InternalEngine) engine;

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ public void testFollowIndex() throws Exception {
220220
}
221221
}
222222

223-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949")
224223
public void testFollowIndexWithConcurrentMappingChanges() throws Exception {
225224
final int numberOfPrimaryShards = randomIntBetween(1, 3);
226225
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),

0 commit comments

Comments
 (0)