5757import java .io .OutputStream ;
5858import java .util .ArrayList ;
5959import java .util .List ;
60- import java .util .Locale ;
6160import java .util .function .Function ;
6261import java .util .function .Supplier ;
6362import java .util .stream .StreamSupport ;
@@ -131,64 +130,35 @@ public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recov
131130 public RecoveryResponse recoverToTarget () throws IOException {
132131 try (final Translog .View translogView = shard .acquireTranslogView ()) {
133132 logger .trace ("{} captured translog id [{}] for recovery" , shard .shardId (), translogView .minTranslogGeneration ());
134- final IndexCommit phase1Snapshot ;
135- try {
136- phase1Snapshot = shard .acquireIndexCommit (false );
137- } catch (final Exception e ) {
138- IOUtils .closeWhileHandlingException (translogView );
139- throw new RecoveryEngineException (shard .shardId (), 1 , "snapshot failed" , e );
140- }
141133
142- final long startingSeqNo ;
143- final long endingSeqNo ;
144- if ( request . startingSeqNo () == SequenceNumbersService . UNASSIGNED_SEQ_NO ) {
145- startingSeqNo = SequenceNumbersService . UNASSIGNED_SEQ_NO ;
146- endingSeqNo = SequenceNumbersService . UNASSIGNED_SEQ_NO ;
134+ boolean isSequenceNumberBasedRecoveryPossible = request . startingSeqNo () != SequenceNumbersService . UNASSIGNED_SEQ_NO &&
135+ isTranslogReadyForSequenceNumberBasedRecovery ( translogView ) ;
136+
137+ if (! isSequenceNumberBasedRecoveryPossible ) {
138+ final IndexCommit phase1Snapshot ;
147139 try {
148- try {
149- phase1 (phase1Snapshot , translogView );
150- } catch (final Exception e ) {
151- throw new RecoveryEngineException (shard .shardId (), 1 , "phase1 failed" , e );
152- }
153- try {
154- prepareTargetForTranslog (translogView .totalOperations ());
155- } catch (final Exception e ) {
156- throw new RecoveryEngineException (shard .shardId (), 1 , "prepare target for translog failed" , e );
157- }
140+ phase1Snapshot = shard .acquireIndexCommit (false );
141+ } catch (final Exception e ) {
142+ IOUtils .closeWhileHandlingException (translogView );
143+ throw new RecoveryEngineException (shard .shardId (), 1 , "snapshot failed" , e );
144+ }
145+ try {
146+ phase1 (phase1Snapshot , translogView );
147+ } catch (final Exception e ) {
148+ throw new RecoveryEngineException (shard .shardId (), 1 , "phase1 failed" , e );
158149 } finally {
159150 try {
160151 shard .releaseIndexCommit (phase1Snapshot );
161152 } catch (final IOException ex ) {
162153 logger .warn ("releasing snapshot caused exception" , ex );
163154 }
164155 }
165- } else {
166- startingSeqNo = request .startingSeqNo ();
167- endingSeqNo = shard .seqNoStats ().getMaxSeqNo ();
168- if (endingSeqNo < startingSeqNo ) {
169- final String message = String .format (
170- Locale .ROOT ,
171- "requested starting operation [%d] higher than source operation maximum [%d]" ,
172- startingSeqNo ,
173- endingSeqNo );
174- logger .debug ("{} {}" , shard .shardId (), message );
175- final ElasticsearchException ex = new ElasticsearchException (message );
176- ex .addHeader (SEQUENCE_NUMBER_BASED_RECOVERY_FAILED , message );
177- throw ex ;
178- }
179- try {
180- prepareTargetForTranslog (translogView .totalOperations ());
181- } catch (final Exception e ) {
182- throw new RecoveryEngineException (shard .shardId (), 1 , "prepare target for translog failed" , e );
183- }
184- // we need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all operations
185- // in the required range will be available for replaying from the translog of the source
186- logger .trace (
187- "{} waiting for all operations in the range [{}, {}] to complete" ,
188- shard .shardId (),
189- startingSeqNo ,
190- endingSeqNo );
191- cancellableThreads .execute (() -> shard .waitForOpsToComplete (endingSeqNo ));
156+ }
157+
158+ try {
159+ prepareTargetForTranslog (translogView .totalOperations ());
160+ } catch (final Exception e ) {
161+ throw new RecoveryEngineException (shard .shardId (), 1 , "prepare target for translog failed" , e );
192162 }
193163
194164 // engine was just started at the end of phase1
@@ -211,7 +181,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
211181
212182 logger .trace ("{} snapshot translog for recovery; current size is [{}]" , shard .shardId (), translogView .totalOperations ());
213183 try {
214- phase2 (translogView .snapshot (), startingSeqNo , endingSeqNo );
184+ phase2 (translogView .snapshot ());
215185 } catch (Exception e ) {
216186 throw new RecoveryEngineException (shard .shardId (), 2 , "phase2 failed" , e );
217187 }
@@ -221,6 +191,30 @@ public RecoveryResponse recoverToTarget() throws IOException {
221191 return response ;
222192 }
223193
194+ boolean isTranslogReadyForSequenceNumberBasedRecovery (final Translog .View translogView ) {
195+ final long startingSeqNo = request .startingSeqNo ();
196+ final long endingSeqNo = shard .seqNoStats ().getMaxSeqNo ();
197+ if (startingSeqNo <= endingSeqNo ) {
198+ // we need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
199+ // operations in the required range will be available for replaying from the translog of the source
200+ logger .trace (
201+ "{} waiting for all operations in the range [{}, {}] to complete" ,
202+ shard .shardId (),
203+ startingSeqNo ,
204+ endingSeqNo );
205+ cancellableThreads .execute (() -> shard .waitForOpsToComplete (endingSeqNo ));
206+
207+ final LocalCheckpointTracker tracker = new LocalCheckpointTracker (shard .indexSettings (), startingSeqNo , startingSeqNo - 1 );
208+ final Translog .Snapshot snapshot = translogView .snapshot ();
209+ Translog .Operation operation ;
210+ while ((operation = getNextOperationFromSnapshot (snapshot )) != null ) {
211+ tracker .markSeqNoAsCompleted (operation .seqNo ());
212+ }
213+ return tracker .getCheckpoint () >= endingSeqNo ;
214+ }
215+ return false ;
216+ }
217+
224218 /**
225219 * Perform phase1 of the recovery operations. Once this {@link IndexCommit}
226220 * snapshot has been performed no commit operations (files being fsync'd)
@@ -231,7 +225,6 @@ public RecoveryResponse recoverToTarget() throws IOException {
231225 * checksum can be reused
232226 */
233227 public void phase1 (final IndexCommit snapshot , final Translog .View translogView ) {
234- assert request .startingSeqNo () == SequenceNumbersService .UNASSIGNED_SEQ_NO ;
235228 cancellableThreads .checkForCancel ();
236229 // Total size of segment files that are recovered
237230 long totalSize = 0 ;
@@ -402,44 +395,24 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
402395 /**
403396 * Perform phase two of the recovery process.
404397 * <p>
405- * Phase two takes a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is
398+ * Phase two uses a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is
406399 * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
407400 * shard.
401+ *
402+ * @param snapshot a snapshot of the translog
408403 */
409- void phase2 (final Translog .Snapshot snapshot , final long startingSeqNo , final long endingSeqNo ) {
404+ void phase2 (final Translog .Snapshot snapshot ) {
410405 if (shard .state () == IndexShardState .CLOSED ) {
411406 throw new IndexShardClosedException (request .shardId ());
412407 }
413408 cancellableThreads .checkForCancel ();
414409
415- final LocalCheckpointTracker tracker ;
416- if (startingSeqNo != SequenceNumbersService .UNASSIGNED_SEQ_NO ) {
417- tracker = new LocalCheckpointTracker (shard .indexSettings (), startingSeqNo , startingSeqNo - 1 );
418- } else {
419- tracker = new LocalCheckpointTracker (shard .indexSettings (), Long .MAX_VALUE , Long .MAX_VALUE );
420- }
421-
422410 final StopWatch stopWatch = new StopWatch ().start ();
423411
424412 logger .trace ("{} recovery [phase2] to {}: sending transaction log operations" , request .shardId (), request .targetNode ());
425413
426414 // send all the snapshot's translog operations to the target
427- final int totalOperations = sendSnapshot (snapshot , tracker );
428-
429- // check to see if all operations in the required range were sent to the target
430- if (tracker .getCheckpoint () < endingSeqNo ) {
431- final String message = String .format (
432- Locale .ROOT ,
433- "sequence number-based recovery failed due to missing ops in range [%d, %d]; first missed op [%d]" ,
434- startingSeqNo ,
435- endingSeqNo ,
436- tracker .getCheckpoint () + 1 );
437- logger .debug ("{} {}" , shard .shardId (), message );
438- final ElasticsearchException ex = new ElasticsearchException (message );
439- ex .setShard (shard .shardId ());
440- ex .addHeader (SEQUENCE_NUMBER_BASED_RECOVERY_FAILED , message );
441- throw ex ;
442- }
415+ final int totalOperations = sendSnapshot (snapshot );
443416
444417 stopWatch .stop ();
445418 logger .trace ("{} recovery [phase2] to {}: took [{}]" , request .shardId (), request .targetNode (), stopWatch .totalTime ());
@@ -492,10 +465,9 @@ public void finalizeRecovery() {
492465 * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
493466 *
494467 * @param snapshot the translog snapshot to replay operations from
495- * @param tracker tracks the replayed operations
496468 * @return the total number of translog operations that were sent
497469 */
498- protected int sendSnapshot (final Translog .Snapshot snapshot , final LocalCheckpointTracker tracker ) {
470+ protected int sendSnapshot (final Translog .Snapshot snapshot ) {
499471 int ops = 0 ;
500472 long size = 0 ;
501473 int totalOperations = 0 ;
@@ -516,7 +488,6 @@ protected int sendSnapshot(final Translog.Snapshot snapshot, final LocalCheckpoi
516488 ops ++;
517489 size += operation .estimateSize ();
518490 totalOperations ++;
519- tracker .markSeqNoAsCompleted (operation .seqNo ());
520491
521492 // check if this request is past bytes threshold, and if so, send it off
522493 if (size >= chunkSizeInBytes ) {
0 commit comments