3434import org .elasticsearch .action .ActionListener ;
3535import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
3636import org .elasticsearch .cluster .routing .ShardRouting ;
37- import org .elasticsearch .common .Nullable ;
3837import org .elasticsearch .common .StopWatch ;
3938import org .elasticsearch .common .bytes .BytesArray ;
4039import org .elasticsearch .common .lease .Releasable ;
7069import java .util .List ;
7170import java .util .Locale ;
7271import java .util .concurrent .CompletableFuture ;
72+ import java .util .concurrent .CopyOnWriteArrayList ;
7373import java .util .concurrent .atomic .AtomicLong ;
7474import java .util .function .Function ;
7575import java .util .function .Supplier ;
@@ -96,22 +96,7 @@ public class RecoverySourceHandler {
9696 private final StartRecoveryRequest request ;
9797 private final int chunkSizeInBytes ;
9898 private final RecoveryTargetHandler recoveryTarget ;
99-
100- private final CancellableThreads cancellableThreads = new CancellableThreads () {
101- @ Override
102- protected void onCancel (String reason , @ Nullable Exception suppressedException ) {
103- RuntimeException e ;
104- if (shard .state () == IndexShardState .CLOSED ) { // check if the shard got closed on us
105- e = new IndexShardClosedException (shard .shardId (), "shard is closed and recovery was canceled reason [" + reason + "]" );
106- } else {
107- e = new ExecutionCancelledException ("recovery was canceled reason [" + reason + "]" );
108- }
109- if (suppressedException != null ) {
110- e .addSuppressed (suppressedException );
111- }
112- throw e ;
113- }
114- };
99+ private final CancellableThreads cancellableThreads = new CancellableThreads ();
115100
116101 public RecoverySourceHandler (final IndexShard shard , RecoveryTargetHandler recoveryTarget ,
117102 final StartRecoveryRequest request ,
@@ -131,19 +116,37 @@ public StartRecoveryRequest getRequest() {
131116 /**
132117 * performs the recovery from the local engine to the target
133118 */
134- public RecoveryResponse recoverToTarget () throws IOException {
135- runUnderPrimaryPermit (() -> {
136- final IndexShardRoutingTable routingTable = shard .getReplicationGroup ().getRoutingTable ();
137- ShardRouting targetShardRouting = routingTable .getByAllocationId (request .targetAllocationId ());
138- if (targetShardRouting == null ) {
139- logger .debug ("delaying recovery of {} as it is not listed as assigned to target node {}" , request .shardId (),
140- request .targetNode ());
141- throw new DelayRecoveryException ("source node does not have the shard listed in its state as allocated on the node" );
142- }
143- assert targetShardRouting .initializing () : "expected recovery target to be initializing but was " + targetShardRouting ;
144- }, shardId + " validating recovery target [" + request .targetAllocationId () + "] registered " , shard , cancellableThreads , logger );
145-
146- try (Closeable ignored = shard .acquireRetentionLockForPeerRecovery ()) {
119+ public void recoverToTarget (ActionListener <RecoveryResponse > listener ) {
120+ final List <Closeable > resources = new CopyOnWriteArrayList <>();
121+ final Closeable releaseResources = () -> IOUtils .close (resources );
122+ final ActionListener <RecoveryResponse > wrappedListener = ActionListener .notifyOnce (listener );
123+ try {
124+ cancellableThreads .setOnCancel ((reason , beforeCancelEx ) -> {
125+ final RuntimeException e ;
126+ if (shard .state () == IndexShardState .CLOSED ) { // check if the shard got closed on us
127+ e = new IndexShardClosedException (shard .shardId (), "shard is closed and recovery was canceled reason [" + reason + "]" );
128+ } else {
129+ e = new CancellableThreads .ExecutionCancelledException ("recovery was canceled reason [" + reason + "]" );
130+ }
131+ if (beforeCancelEx != null ) {
132+ e .addSuppressed (beforeCancelEx );
133+ }
134+ IOUtils .closeWhileHandlingException (releaseResources , () -> wrappedListener .onFailure (e ));
135+ throw e ;
136+ });
137+ runUnderPrimaryPermit (() -> {
138+ final IndexShardRoutingTable routingTable = shard .getReplicationGroup ().getRoutingTable ();
139+ ShardRouting targetShardRouting = routingTable .getByAllocationId (request .targetAllocationId ());
140+ if (targetShardRouting == null ) {
141+ logger .debug ("delaying recovery of {} as it is not listed as assigned to target node {}" , request .shardId (),
142+ request .targetNode ());
143+ throw new DelayRecoveryException ("source node does not have the shard listed in its state as allocated on the node" );
144+ }
145+ assert targetShardRouting .initializing () : "expected recovery target to be initializing but was " + targetShardRouting ;
146+ }, shardId + " validating recovery target [" + request .targetAllocationId () + "] registered " ,
147+ shard , cancellableThreads , logger );
148+ final Closeable retentionLock = shard .acquireRetentionLockForPeerRecovery ();
149+ resources .add (retentionLock );
147150 final long startingSeqNo ;
148151 final long requiredSeqNoRangeStart ;
149152 final boolean isSequenceNumberBasedRecovery = request .startingSeqNo () != SequenceNumbers .UNASSIGNED_SEQ_NO &&
@@ -217,6 +220,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
217220 }
218221 final SendSnapshotResult sendSnapshotResult ;
219222 try (Translog .Snapshot snapshot = shard .getHistoryOperations ("peer-recovery" , startingSeqNo )) {
223+ // we can release the retention lock here because the snapshot itself will retain the required operations.
224+ IOUtils .close (retentionLock , () -> resources .remove (retentionLock ));
220225 // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
221226 // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
222227 final long maxSeenAutoIdTimestamp = shard .getMaxSeenAutoIdTimestamp ();
@@ -229,10 +234,16 @@ public RecoveryResponse recoverToTarget() throws IOException {
229234
230235 finalizeRecovery (sendSnapshotResult .targetLocalCheckpoint );
231236 final long phase1ThrottlingWaitTime = 0L ; // TODO: return the actual throttle time
232- return new RecoveryResponse (sendFileResult .phase1FileNames , sendFileResult .phase1FileSizes ,
233- sendFileResult .phase1ExistingFileNames , sendFileResult .phase1ExistingFileSizes , sendFileResult .totalSize ,
234- sendFileResult .existingTotalSize , sendFileResult .took .millis (), phase1ThrottlingWaitTime , prepareEngineTime .millis (),
235- sendSnapshotResult .totalOperations , sendSnapshotResult .tookTime .millis ());
237+ assert resources .isEmpty () : "not every resource is released [" + resources + "]" ;
238+ IOUtils .close (resources );
239+ wrappedListener .onResponse (
240+ new RecoveryResponse (sendFileResult .phase1FileNames , sendFileResult .phase1FileSizes ,
241+ sendFileResult .phase1ExistingFileNames , sendFileResult .phase1ExistingFileSizes , sendFileResult .totalSize ,
242+ sendFileResult .existingTotalSize , sendFileResult .took .millis (), phase1ThrottlingWaitTime , prepareEngineTime .millis (),
243+ sendSnapshotResult .totalOperations , sendSnapshotResult .tookTime .millis ())
244+ );
245+ } catch (Exception e ) {
246+ IOUtils .closeWhileHandlingException (releaseResources , () -> wrappedListener .onFailure (e ));
236247 }
237248 }
238249
0 commit comments