@@ -149,23 +149,26 @@ public RecoveryResponse recoverToTarget() throws IOException {
149149 final Translog translog = shard .getTranslog ();
150150
151151 final long startingSeqNo ;
152+ final long requiredSeqNoRangeStart ;
152153 final boolean isSequenceNumberBasedRecoveryPossible = request .startingSeqNo () != SequenceNumbers .UNASSIGNED_SEQ_NO &&
153154 isTargetSameHistory () && isTranslogReadyForSequenceNumberBasedRecovery ();
154-
155155 if (isSequenceNumberBasedRecoveryPossible ) {
156156 logger .trace ("performing sequence numbers based recovery. starting at [{}]" , request .startingSeqNo ());
157157 startingSeqNo = request .startingSeqNo ();
158+ requiredSeqNoRangeStart = startingSeqNo ;
158159 } else {
159160 final Engine .IndexCommitRef phase1Snapshot ;
160161 try {
161162 phase1Snapshot = shard .acquireIndexCommit (false );
162163 } catch (final Exception e ) {
163164 throw new RecoveryEngineException (shard .shardId (), 1 , "snapshot failed" , e );
164165 }
165- // we set this to unassigned to create a translog roughly according to the retention policy
166- // on the target
167- startingSeqNo = SequenceNumbers .UNASSIGNED_SEQ_NO ;
168-
166+ // we set this to 0 to create a translog roughly according to the retention policy
167+ // on the target. Note that it will still filter out legacy operations with no sequence numbers
168+ startingSeqNo = 0 ;
169+ // but we must have everything above the local checkpoint in the commit
170+ requiredSeqNoRangeStart =
171+ Long .parseLong (phase1Snapshot .getIndexCommit ().getUserData ().get (SequenceNumbers .LOCAL_CHECKPOINT_KEY )) + 1 ;
169172 try {
170173 phase1 (phase1Snapshot .getIndexCommit (), translog ::totalOperations );
171174 } catch (final Exception e ) {
@@ -178,6 +181,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
178181 }
179182 }
180183 }
184+ assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo ;
185+ assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
186+ + startingSeqNo + "]" ;
181187
182188 runUnderPrimaryPermit (() -> shard .initiateTracking (request .targetAllocationId ()));
183189
@@ -187,10 +193,19 @@ public RecoveryResponse recoverToTarget() throws IOException {
187193 throw new RecoveryEngineException (shard .shardId (), 1 , "prepare target for translog failed" , e );
188194 }
189195
196+ final long endingSeqNo = shard .seqNoStats ().getMaxSeqNo ();
197+ /*
198+ * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
199+ * operations in the required range will be available for replaying from the translog of the source.
200+ */
201+ cancellableThreads .execute (() -> shard .waitForOpsToComplete (endingSeqNo ));
202+
203+ logger .trace ("all operations up to [{}] completed, which will be used as an ending sequence number" , endingSeqNo );
204+
190205 logger .trace ("snapshot translog for recovery; current size is [{}]" , translog .estimateTotalOperationsFromMinSeq (startingSeqNo ));
191206 final long targetLocalCheckpoint ;
192207 try (Translog .Snapshot snapshot = translog .newSnapshotFromMinSeqNo (startingSeqNo )) {
193- targetLocalCheckpoint = phase2 (startingSeqNo , snapshot );
208+ targetLocalCheckpoint = phase2 (startingSeqNo , requiredSeqNoRangeStart , endingSeqNo , snapshot );
194209 } catch (Exception e ) {
195210 throw new RecoveryEngineException (shard .shardId (), 2 , "phase2 failed" , e );
196211 }
@@ -224,26 +239,19 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
224239
225240 /**
226241 * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
227- * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
242+ * translog contains all operations above the local checkpoint on the target. We already know the that translog contains or will contain
243+ * all ops above the source local checkpoint, so we can stop check there.
228244 *
229245 * @return {@code true} if the source is ready for a sequence-number-based recovery
230246 * @throws IOException if an I/O exception occurred reading the translog snapshot
231247 */
232248 boolean isTranslogReadyForSequenceNumberBasedRecovery () throws IOException {
233249 final long startingSeqNo = request .startingSeqNo ();
234250 assert startingSeqNo >= 0 ;
235- final long endingSeqNo = shard .seqNoStats (). getMaxSeqNo ();
236- logger .trace ("testing sequence numbers in range: [{}, {}]" , startingSeqNo , endingSeqNo );
251+ final long localCheckpoint = shard .getLocalCheckpoint ();
252+ logger .trace ("testing sequence numbers in range: [{}, {}]" , startingSeqNo , localCheckpoint );
237253 // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
238- if (startingSeqNo - 1 <= endingSeqNo ) {
239- /*
240- * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
241- * operations in the required range will be available for replaying from the translog of the source.
242- */
243- cancellableThreads .execute (() -> shard .waitForOpsToComplete (endingSeqNo ));
244-
245- logger .trace ("all operations up to [{}] completed, checking translog content" , endingSeqNo );
246-
254+ if (startingSeqNo - 1 <= localCheckpoint ) {
247255 final LocalCheckpointTracker tracker = new LocalCheckpointTracker (startingSeqNo , startingSeqNo - 1 );
248256 try (Translog .Snapshot snapshot = shard .getTranslog ().newSnapshotFromMinSeqNo (startingSeqNo )) {
249257 Translog .Operation operation ;
@@ -253,7 +261,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
253261 }
254262 }
255263 }
256- return tracker .getCheckpoint () >= endingSeqNo ;
264+ return tracker .getCheckpoint () >= localCheckpoint ;
257265 } else {
258266 return false ;
259267 }
@@ -433,24 +441,27 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
433441 * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
434442 * shard.
435443 *
436- * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
437- * ops should be sent
438- * @param snapshot a snapshot of the translog
439- *
444+ * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
445+ * ops should be sent
446+ * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
447+ * @param endingSeqNo the highest sequence number that should be sent
448+ * @param snapshot a snapshot of the translog
440449 * @return the local checkpoint on the target
441450 */
442- long phase2 (final long startingSeqNo , final Translog .Snapshot snapshot ) throws IOException {
451+ long phase2 (final long startingSeqNo , long requiredSeqNoRangeStart , long endingSeqNo , final Translog .Snapshot snapshot )
452+ throws IOException {
443453 if (shard .state () == IndexShardState .CLOSED ) {
444454 throw new IndexShardClosedException (request .shardId ());
445455 }
446456 cancellableThreads .checkForCancel ();
447457
448458 final StopWatch stopWatch = new StopWatch ().start ();
449459
450- logger .trace ("recovery [phase2]: sending transaction log operations" );
460+ logger .trace ("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
461+ "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]" );
451462
452463 // send all the snapshot's translog operations to the target
453- final SendSnapshotResult result = sendSnapshot (startingSeqNo , snapshot );
464+ final SendSnapshotResult result = sendSnapshot (startingSeqNo , requiredSeqNoRangeStart , endingSeqNo , snapshot );
454465
455466 stopWatch .stop ();
456467 logger .trace ("recovery [phase2]: took [{}]" , stopWatch .totalTime ());
@@ -511,18 +522,26 @@ static class SendSnapshotResult {
511522 * <p>
512523 * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
513524 *
514- * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
515- * @param snapshot the translog snapshot to replay operations from
516- * @return the local checkpoint on the target and the total number of operations sent
525+ * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
526+ * @param requiredSeqNoRangeStart the lower sequence number of the required range
527+ * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
528+ * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
529+ * total number of operations sent
517530 * @throws IOException if an I/O exception occurred reading the translog snapshot
518531 */
519- protected SendSnapshotResult sendSnapshot (final long startingSeqNo , final Translog .Snapshot snapshot ) throws IOException {
532+ protected SendSnapshotResult sendSnapshot (final long startingSeqNo , long requiredSeqNoRangeStart , long endingSeqNo ,
533+ final Translog .Snapshot snapshot ) throws IOException {
534+ assert requiredSeqNoRangeStart <= endingSeqNo + 1 :
535+ "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo ;
536+ assert startingSeqNo <= requiredSeqNoRangeStart :
537+ "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart ;
520538 int ops = 0 ;
521539 long size = 0 ;
522540 int skippedOps = 0 ;
523541 int totalSentOps = 0 ;
524542 final AtomicLong targetLocalCheckpoint = new AtomicLong (SequenceNumbers .UNASSIGNED_SEQ_NO );
525543 final List <Translog .Operation > operations = new ArrayList <>();
544+ final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker (endingSeqNo , requiredSeqNoRangeStart - 1 );
526545
527546 final int expectedTotalOps = snapshot .totalOperations ();
528547 if (expectedTotalOps == 0 ) {
@@ -539,19 +558,17 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
539558 throw new IndexShardClosedException (request .shardId ());
540559 }
541560 cancellableThreads .checkForCancel ();
542- /*
543- * If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
544- * any ops before the starting sequence number.
545- */
561+
546562 final long seqNo = operation .seqNo ();
547- if (startingSeqNo >= 0 && ( seqNo == SequenceNumbers . UNASSIGNED_SEQ_NO || seqNo < startingSeqNo ) ) {
563+ if (seqNo < startingSeqNo || seqNo > endingSeqNo ) {
548564 skippedOps ++;
549565 continue ;
550566 }
551567 operations .add (operation );
552568 ops ++;
553569 size += operation .estimateSize ();
554570 totalSentOps ++;
571+ requiredOpsTracker .markSeqNoAsCompleted (seqNo );
555572
556573 // check if this request is past bytes threshold, and if so, send it off
557574 if (size >= chunkSizeInBytes ) {
@@ -569,8 +586,14 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
569586 }
570587
571588 assert expectedTotalOps == snapshot .overriddenOperations () + skippedOps + totalSentOps
572- : String .format (Locale .ROOT , "expected total [%d], overridden [%d], skipped [%d], total sent [%d]" ,
573- expectedTotalOps , snapshot .overriddenOperations (), skippedOps , totalSentOps );
589+ : String .format (Locale .ROOT , "expected total [%d], overridden [%d], skipped [%d], total sent [%d]" ,
590+ expectedTotalOps , snapshot .overriddenOperations (), skippedOps , totalSentOps );
591+
592+ if (requiredOpsTracker .getCheckpoint () < endingSeqNo ) {
593+ throw new IllegalStateException ("translog replay failed to cover required sequence numbers" +
594+ " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
595+ + (requiredOpsTracker .getCheckpoint () + 1 ) + "]" );
596+ }
574597
575598 logger .trace ("sent final batch of [{}][{}] (total: [{}]) translog operations" , ops , new ByteSizeValue (size ), expectedTotalOps );
576599
0 commit comments