@@ -148,23 +148,26 @@ public RecoveryResponse recoverToTarget() throws IOException {
148148 final Translog translog = shard .getTranslog ();
149149
150150 final long startingSeqNo ;
151+ final long requiredSeqNoRangeStart ;
151152 final boolean isSequenceNumberBasedRecoveryPossible = request .startingSeqNo () != SequenceNumbers .UNASSIGNED_SEQ_NO &&
152153 isTargetSameHistory () && isTranslogReadyForSequenceNumberBasedRecovery ();
153-
154154 if (isSequenceNumberBasedRecoveryPossible ) {
155155 logger .trace ("performing sequence numbers based recovery. starting at [{}]" , request .startingSeqNo ());
156156 startingSeqNo = request .startingSeqNo ();
157+ requiredSeqNoRangeStart = startingSeqNo ;
157158 } else {
158159 final Engine .IndexCommitRef phase1Snapshot ;
159160 try {
160161 phase1Snapshot = shard .acquireIndexCommit (false );
161162 } catch (final Exception e ) {
162163 throw new RecoveryEngineException (shard .shardId (), 1 , "snapshot failed" , e );
163164 }
164- // we set this to unassigned to create a translog roughly according to the retention policy
165- // on the target
166- startingSeqNo = SequenceNumbers .UNASSIGNED_SEQ_NO ;
167-
165+ // we set this to 0 to create a translog roughly according to the retention policy
166+ // on the target. Note that it will still filter out legacy operations with no sequence numbers
167+ startingSeqNo = 0 ;
168+ // but we must have everything above the local checkpoint in the commit
169+ requiredSeqNoRangeStart =
170+ Long .parseLong (phase1Snapshot .getIndexCommit ().getUserData ().get (SequenceNumbers .LOCAL_CHECKPOINT_KEY )) + 1 ;
168171 try {
169172 phase1 (phase1Snapshot .getIndexCommit (), translog ::totalOperations );
170173 } catch (final Exception e ) {
@@ -177,6 +180,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
177180 }
178181 }
179182 }
183+ assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo ;
184+ assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
185+ + startingSeqNo + "]" ;
180186
181187 runUnderPrimaryPermit (() -> shard .initiateTracking (request .targetAllocationId ()));
182188
@@ -186,10 +192,19 @@ public RecoveryResponse recoverToTarget() throws IOException {
186192 throw new RecoveryEngineException (shard .shardId (), 1 , "prepare target for translog failed" , e );
187193 }
188194
195+ final long endingSeqNo = shard .seqNoStats ().getMaxSeqNo ();
196+ /*
197+ * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
198+ * operations in the required range will be available for replaying from the translog of the source.
199+ */
200+ cancellableThreads .execute (() -> shard .waitForOpsToComplete (endingSeqNo ));
201+
202+ logger .trace ("all operations up to [{}] completed, which will be used as an ending sequence number" , endingSeqNo );
203+
189204 logger .trace ("snapshot translog for recovery; current size is [{}]" , translog .estimateTotalOperationsFromMinSeq (startingSeqNo ));
190205 final long targetLocalCheckpoint ;
191206 try (Translog .Snapshot snapshot = translog .newSnapshotFromMinSeqNo (startingSeqNo )) {
192- targetLocalCheckpoint = phase2 (startingSeqNo , snapshot );
207+ targetLocalCheckpoint = phase2 (startingSeqNo , requiredSeqNoRangeStart , endingSeqNo , snapshot );
193208 } catch (Exception e ) {
194209 throw new RecoveryEngineException (shard .shardId (), 2 , "phase2 failed" , e );
195210 }
@@ -223,26 +238,19 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
223238
224239 /**
225240 * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
226- * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
241+ * translog contains all operations above the local checkpoint on the target. We already know the that translog contains or will contain
242+ * all ops above the source local checkpoint, so we can stop check there.
227243 *
228244 * @return {@code true} if the source is ready for a sequence-number-based recovery
229245 * @throws IOException if an I/O exception occurred reading the translog snapshot
230246 */
231247 boolean isTranslogReadyForSequenceNumberBasedRecovery () throws IOException {
232248 final long startingSeqNo = request .startingSeqNo ();
233249 assert startingSeqNo >= 0 ;
234- final long endingSeqNo = shard .seqNoStats (). getMaxSeqNo ();
235- logger .trace ("testing sequence numbers in range: [{}, {}]" , startingSeqNo , endingSeqNo );
250+ final long localCheckpoint = shard .getLocalCheckpoint ();
251+ logger .trace ("testing sequence numbers in range: [{}, {}]" , startingSeqNo , localCheckpoint );
236252 // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
237- if (startingSeqNo - 1 <= endingSeqNo ) {
238- /*
239- * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
240- * operations in the required range will be available for replaying from the translog of the source.
241- */
242- cancellableThreads .execute (() -> shard .waitForOpsToComplete (endingSeqNo ));
243-
244- logger .trace ("all operations up to [{}] completed, checking translog content" , endingSeqNo );
245-
253+ if (startingSeqNo - 1 <= localCheckpoint ) {
246254 final LocalCheckpointTracker tracker = new LocalCheckpointTracker (startingSeqNo , startingSeqNo - 1 );
247255 try (Translog .Snapshot snapshot = shard .getTranslog ().newSnapshotFromMinSeqNo (startingSeqNo )) {
248256 Translog .Operation operation ;
@@ -252,7 +260,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
252260 }
253261 }
254262 }
255- return tracker .getCheckpoint () >= endingSeqNo ;
263+ return tracker .getCheckpoint () >= localCheckpoint ;
256264 } else {
257265 return false ;
258266 }
@@ -432,24 +440,27 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
432440 * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
433441 * shard.
434442 *
435- * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
436- * ops should be sent
437- * @param snapshot a snapshot of the translog
438- *
443+ * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
444+ * ops should be sent
445+ * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
446+ * @param endingSeqNo the highest sequence number that should be sent
447+ * @param snapshot a snapshot of the translog
439448 * @return the local checkpoint on the target
440449 */
441- long phase2 (final long startingSeqNo , final Translog .Snapshot snapshot ) throws IOException {
450+ long phase2 (final long startingSeqNo , long requiredSeqNoRangeStart , long endingSeqNo , final Translog .Snapshot snapshot )
451+ throws IOException {
442452 if (shard .state () == IndexShardState .CLOSED ) {
443453 throw new IndexShardClosedException (request .shardId ());
444454 }
445455 cancellableThreads .checkForCancel ();
446456
447457 final StopWatch stopWatch = new StopWatch ().start ();
448458
449- logger .trace ("recovery [phase2]: sending transaction log operations" );
459+ logger .trace ("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
460+ "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]" );
450461
451462 // send all the snapshot's translog operations to the target
452- final SendSnapshotResult result = sendSnapshot (startingSeqNo , snapshot );
463+ final SendSnapshotResult result = sendSnapshot (startingSeqNo , requiredSeqNoRangeStart , endingSeqNo , snapshot );
453464
454465 stopWatch .stop ();
455466 logger .trace ("recovery [phase2]: took [{}]" , stopWatch .totalTime ());
@@ -510,18 +521,26 @@ static class SendSnapshotResult {
510521 * <p>
511522 * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
512523 *
513- * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
514- * @param snapshot the translog snapshot to replay operations from
515- * @return the local checkpoint on the target and the total number of operations sent
524+ * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
525+ * @param requiredSeqNoRangeStart the lower sequence number of the required range
526+ * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
527+ * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
528+ * total number of operations sent
516529 * @throws IOException if an I/O exception occurred reading the translog snapshot
517530 */
518- protected SendSnapshotResult sendSnapshot (final long startingSeqNo , final Translog .Snapshot snapshot ) throws IOException {
531+ protected SendSnapshotResult sendSnapshot (final long startingSeqNo , long requiredSeqNoRangeStart , long endingSeqNo ,
532+ final Translog .Snapshot snapshot ) throws IOException {
533+ assert requiredSeqNoRangeStart <= endingSeqNo + 1 :
534+ "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo ;
535+ assert startingSeqNo <= requiredSeqNoRangeStart :
536+ "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart ;
519537 int ops = 0 ;
520538 long size = 0 ;
521539 int skippedOps = 0 ;
522540 int totalSentOps = 0 ;
523541 final AtomicLong targetLocalCheckpoint = new AtomicLong (SequenceNumbers .UNASSIGNED_SEQ_NO );
524542 final List <Translog .Operation > operations = new ArrayList <>();
543+ final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker (endingSeqNo , requiredSeqNoRangeStart - 1 );
525544
526545 final int expectedTotalOps = snapshot .totalOperations ();
527546 if (expectedTotalOps == 0 ) {
@@ -538,19 +557,17 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
538557 throw new IndexShardClosedException (request .shardId ());
539558 }
540559 cancellableThreads .checkForCancel ();
541- /*
542- * If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
543- * any ops before the starting sequence number.
544- */
560+
545561 final long seqNo = operation .seqNo ();
546- if (startingSeqNo >= 0 && ( seqNo == SequenceNumbers . UNASSIGNED_SEQ_NO || seqNo < startingSeqNo ) ) {
562+ if (seqNo < startingSeqNo || seqNo > endingSeqNo ) {
547563 skippedOps ++;
548564 continue ;
549565 }
550566 operations .add (operation );
551567 ops ++;
552568 size += operation .estimateSize ();
553569 totalSentOps ++;
570+ requiredOpsTracker .markSeqNoAsCompleted (seqNo );
554571
555572 // check if this request is past bytes threshold, and if so, send it off
556573 if (size >= chunkSizeInBytes ) {
@@ -567,6 +584,12 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
567584 cancellableThreads .executeIO (sendBatch );
568585 }
569586
587+ if (requiredOpsTracker .getCheckpoint () < endingSeqNo ) {
588+ throw new IllegalStateException ("translog replay failed to cover required sequence numbers" +
589+ " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
590+ + (requiredOpsTracker .getCheckpoint () + 1 ) + "]" );
591+ }
592+
570593 assert expectedTotalOps == skippedOps + totalSentOps
571594 : "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]" ;
572595
0 commit comments