@@ -167,12 +167,12 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
167167 throw new DelayRecoveryException ("source node does not have the shard listed in its state as allocated on the node" );
168168 }
169169 assert targetShardRouting .initializing () : "expected recovery target to be initializing but was " + targetShardRouting ;
170- retentionLeaseRef .set (softDeletesEnabled ? shard . getRetentionLeases (). get (
171- ReplicationTracker .getPeerRecoveryRetentionLeaseId (targetShardRouting )) : null );
170+ retentionLeaseRef .set (
171+ shard . getRetentionLeases (). get ( ReplicationTracker .getPeerRecoveryRetentionLeaseId (targetShardRouting )));
172172 }, shardId + " validating recovery target [" + request .targetAllocationId () + "] registered " ,
173173 shard , cancellableThreads , logger );
174174 final Engine .HistorySource historySource ;
175- if (shard .useRetentionLeasesInPeerRecovery () || retentionLeaseRef .get () != null ) {
175+ if (softDeletesEnabled && ( shard .useRetentionLeasesInPeerRecovery () || retentionLeaseRef .get () != null ) ) {
176176 historySource = Engine .HistorySource .INDEX ;
177177 } else {
178178 historySource = Engine .HistorySource .TRANSLOG ;
@@ -192,7 +192,7 @@ && isTargetSameHistory()
192192 // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
193193 // without having a complete history.
194194
195- if (isSequenceNumberBasedRecovery && retentionLeaseRef .get () != null ) {
195+ if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef .get () != null ) {
196196 // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
197197 retentionLock .close ();
198198 logger .trace ("history is retained by {}" , retentionLeaseRef .get ());
@@ -211,7 +211,7 @@ && isTargetSameHistory()
211211 if (isSequenceNumberBasedRecovery ) {
212212 logger .trace ("performing sequence numbers based recovery. starting at [{}]" , request .startingSeqNo ());
213213 startingSeqNo = request .startingSeqNo ();
214- if (softDeletesEnabled && retentionLeaseRef .get () == null ) {
214+ if (retentionLeaseRef .get () == null ) {
215215 createRetentionLease (startingSeqNo , ActionListener .map (sendFileStep , ignored -> SendFileResult .EMPTY ));
216216 } else {
217217 sendFileStep .onResponse (SendFileResult .EMPTY );
@@ -253,36 +253,24 @@ && isTargetSameHistory()
253253 });
254254
255255 final StepListener <ReplicationResponse > deleteRetentionLeaseStep = new StepListener <>();
256- if (softDeletesEnabled ) {
257- runUnderPrimaryPermit (() -> {
258- try {
259- // If the target previously had a copy of this shard then a file-based recovery might move its global
260- // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
261- // new one later on in the recovery.
262- shard .removePeerRecoveryRetentionLease (request .targetNode ().getId (),
263- new ThreadedActionListener <>(logger , shard .getThreadPool (), ThreadPool .Names .GENERIC ,
264- deleteRetentionLeaseStep , false ));
265- } catch (RetentionLeaseNotFoundException e ) {
266- logger .debug ("no peer-recovery retention lease for " + request .targetAllocationId ());
267- deleteRetentionLeaseStep .onResponse (null );
268- }
269- }, shardId + " removing retention leaes for [" + request .targetAllocationId () + "]" ,
270- shard , cancellableThreads , logger );
271- } else {
272- deleteRetentionLeaseStep .onResponse (null );
273- }
256+ runUnderPrimaryPermit (() -> {
257+ try {
258+ // If the target previously had a copy of this shard then a file-based recovery might move its global
259+ // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
260+ // new one later on in the recovery.
261+ shard .removePeerRecoveryRetentionLease (request .targetNode ().getId (),
262+ new ThreadedActionListener <>(logger , shard .getThreadPool (), ThreadPool .Names .GENERIC ,
263+ deleteRetentionLeaseStep , false ));
264+ } catch (RetentionLeaseNotFoundException e ) {
265+ logger .debug ("no peer-recovery retention lease for " + request .targetAllocationId ());
266+ deleteRetentionLeaseStep .onResponse (null );
267+ }
268+ }, shardId + " removing retention lease for [" + request .targetAllocationId () + "]" ,
269+ shard , cancellableThreads , logger );
274270
275271 deleteRetentionLeaseStep .whenComplete (ignored -> {
276272 assert Transports .assertNotTransportThread (RecoverySourceHandler .this + "[phase1]" );
277-
278- final Consumer <ActionListener <RetentionLease >> createRetentionLeaseAsync ;
279- if (softDeletesEnabled ) {
280- createRetentionLeaseAsync = l -> createRetentionLease (startingSeqNo , l );
281- } else {
282- createRetentionLeaseAsync = l -> l .onResponse (null );
283- }
284-
285- phase1 (safeCommitRef .getIndexCommit (), createRetentionLeaseAsync , () -> estimateNumOps , sendFileStep );
273+ phase1 (safeCommitRef .getIndexCommit (), startingSeqNo , () -> estimateNumOps , sendFileStep );
286274 }, onFailure );
287275
288276 } catch (final Exception e ) {
@@ -454,8 +442,7 @@ static final class SendFileResult {
454442 * segments that are missing. Only segments that have the same size and
455443 * checksum can be reused
456444 */
457- void phase1 (IndexCommit snapshot , Consumer <ActionListener <RetentionLease >> createRetentionLease ,
458- IntSupplier translogOps , ActionListener <SendFileResult > listener ) {
445+ void phase1 (IndexCommit snapshot , long startingSeqNo , IntSupplier translogOps , ActionListener <SendFileResult > listener ) {
459446 cancellableThreads .checkForCancel ();
460447 final Store store = shard .store ();
461448 try {
@@ -529,7 +516,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
529516 sendFileInfoStep .whenComplete (r ->
530517 sendFiles (store , phase1Files .toArray (new StoreFileMetaData [0 ]), translogOps , sendFilesStep ), listener ::onFailure );
531518
532- sendFilesStep .whenComplete (r -> createRetentionLease . accept ( createRetentionLeaseStep ), listener ::onFailure );
519+ sendFilesStep .whenComplete (r -> createRetentionLease ( startingSeqNo , createRetentionLeaseStep ), listener ::onFailure );
533520
534521 createRetentionLeaseStep .whenComplete (retentionLease ->
535522 {
@@ -557,7 +544,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
557544
558545 // but we must still create a retention lease
559546 final StepListener <RetentionLease > createRetentionLeaseStep = new StepListener <>();
560- createRetentionLease . accept ( createRetentionLeaseStep );
547+ createRetentionLease ( startingSeqNo , createRetentionLeaseStep );
561548 createRetentionLeaseStep .whenComplete (retentionLease -> {
562549 final TimeValue took = stopWatch .totalTime ();
563550 logger .trace ("recovery [phase1]: took [{}]" , took );
@@ -593,7 +580,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener<Reten
593580 // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before
594581 // 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for
595582 // recovery as a conservative estimate for the global checkpoint.
596- assert shard .indexSettings ().getIndexVersionCreated ().before (Version .V_7_4_0 );
583+ assert shard .indexSettings ().getIndexVersionCreated ().before (Version .V_7_4_0 )
584+ || shard .indexSettings ().isSoftDeleteEnabled () == false ;
597585 final StepListener <ReplicationResponse > addRetentionLeaseStep = new StepListener <>();
598586 final long estimatedGlobalCheckpoint = startingSeqNo - 1 ;
599587 final RetentionLease newLease = shard .addPeerRecoveryRetentionLease (request .targetNode ().getId (),
0 commit comments