5454import org .elasticsearch .cluster .routing .RecoverySource .SnapshotRecoverySource ;
5555import org .elasticsearch .cluster .routing .ShardRouting ;
5656import org .elasticsearch .common .Booleans ;
57+ import org .elasticsearch .common .CheckedConsumer ;
5758import org .elasticsearch .common .CheckedFunction ;
5859import org .elasticsearch .common .CheckedRunnable ;
5960import org .elasticsearch .common .Nullable ;
@@ -1821,34 +1822,38 @@ public ShardPath shardPath() {
18211822 return path ;
18221823 }
18231824
1824- public boolean recoverFromLocalShards (BiConsumer <String , MappingMetaData > mappingUpdateConsumer ,
1825- List < IndexShard > localShards ) throws IOException {
1825+ public void recoverFromLocalShards (BiConsumer <String , MappingMetaData > mappingUpdateConsumer , List < IndexShard > localShards ,
1826+ ActionListener < Boolean > listener ) throws IOException {
18261827 assert shardRouting .primary () : "recover from local shards only makes sense if the shard is a primary shard" ;
18271828 assert recoveryState .getRecoverySource ().getType () == RecoverySource .Type .LOCAL_SHARDS : "invalid recovery type: " +
18281829 recoveryState .getRecoverySource ();
18291830 final List <LocalShardSnapshot > snapshots = new ArrayList <>();
1831+ final ActionListener <Boolean > recoveryListener = ActionListener .runBefore (listener , () -> IOUtils .close (snapshots ));
1832+ boolean success = false ;
18301833 try {
18311834 for (IndexShard shard : localShards ) {
18321835 snapshots .add (new LocalShardSnapshot (shard ));
18331836 }
1834-
18351837 // we are the first primary, recover from the gateway
18361838 // if its post api allocation, the index should exists
18371839 assert shardRouting .primary () : "recover from local shards only makes sense if the shard is a primary shard" ;
18381840 StoreRecovery storeRecovery = new StoreRecovery (shardId , logger );
1839- return storeRecovery .recoverFromLocalShards (mappingUpdateConsumer , this , snapshots );
1841+ storeRecovery .recoverFromLocalShards (mappingUpdateConsumer , this , snapshots , recoveryListener );
1842+ success = true ;
18401843 } finally {
1841- IOUtils .close (snapshots );
1844+ if (success == false ) {
1845+ IOUtils .close (snapshots );
1846+ }
18421847 }
18431848 }
18441849
1845- public boolean recoverFromStore () {
1850+ public void recoverFromStore (ActionListener < Boolean > listener ) {
18461851 // we are the first primary, recover from the gateway
18471852 // if its post api allocation, the index should exists
18481853 assert shardRouting .primary () : "recover from store only makes sense if the shard is a primary shard" ;
18491854 assert shardRouting .initializing () : "can only start recovery on initializing shard" ;
18501855 StoreRecovery storeRecovery = new StoreRecovery (shardId , logger );
1851- return storeRecovery .recoverFromStore (this );
1856+ storeRecovery .recoverFromStore (this , listener );
18521857 }
18531858
18541859 public void restoreFromRepository (Repository repository , ActionListener <Boolean > listener ) {
@@ -2520,17 +2525,7 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
25202525 switch (recoveryState .getRecoverySource ().getType ()) {
25212526 case EMPTY_STORE :
25222527 case EXISTING_STORE :
2523- markAsRecovering ("from store" , recoveryState ); // mark the shard as recovering on the cluster state thread
2524- threadPool .generic ().execute (() -> {
2525- try {
2526- if (recoverFromStore ()) {
2527- recoveryListener .onRecoveryDone (recoveryState );
2528- }
2529- } catch (Exception e ) {
2530- recoveryListener .onRecoveryFailure (recoveryState ,
2531- new RecoveryFailedException (recoveryState , null , e ), true );
2532- }
2533- });
2528+ executeRecovery ("from store" , recoveryState , recoveryListener , this ::recoverFromStore );
25342529 break ;
25352530 case PEER :
25362531 try {
@@ -2543,17 +2538,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
25432538 }
25442539 break ;
25452540 case SNAPSHOT :
2546- markAsRecovering ("from snapshot" , recoveryState ); // mark the shard as recovering on the cluster state thread
2547- SnapshotRecoverySource recoverySource = (SnapshotRecoverySource ) recoveryState .getRecoverySource ();
2548- threadPool .generic ().execute (
2549- ActionRunnable .<Boolean >wrap (ActionListener .wrap (r -> {
2550- if (r ) {
2551- recoveryListener .onRecoveryDone (recoveryState );
2552- }
2553- },
2554- e -> recoveryListener .onRecoveryFailure (recoveryState , new RecoveryFailedException (recoveryState , null , e ), true )),
2555- restoreListener -> restoreFromRepository (
2556- repositoriesService .repository (recoverySource .snapshot ().getRepository ()), restoreListener )));
2541+ final String repo = ((SnapshotRecoverySource ) recoveryState .getRecoverySource ()).snapshot ().getRepository ();
2542+ executeRecovery ("from snapshot" ,
2543+ recoveryState , recoveryListener , l -> restoreFromRepository (repositoriesService .repository (repo ), l ));
25572544 break ;
25582545 case LOCAL_SHARDS :
25592546 final IndexMetaData indexMetaData = indexSettings ().getIndexMetaData ();
@@ -2578,18 +2565,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
25782565
25792566 if (numShards == startedShards .size ()) {
25802567 assert requiredShards .isEmpty () == false ;
2581- markAsRecovering ("from local shards" , recoveryState ); // mark the shard as recovering on the cluster state thread
2582- threadPool .generic ().execute (() -> {
2583- try {
2584- if (recoverFromLocalShards (mappingUpdateConsumer , startedShards .stream ()
2585- .filter ((s ) -> requiredShards .contains (s .shardId ())).collect (Collectors .toList ()))) {
2586- recoveryListener .onRecoveryDone (recoveryState );
2587- }
2588- } catch (Exception e ) {
2589- recoveryListener .onRecoveryFailure (recoveryState ,
2590- new RecoveryFailedException (recoveryState , null , e ), true );
2591- }
2592- });
2568+ executeRecovery ("from local shards" , recoveryState , recoveryListener ,
2569+ l -> recoverFromLocalShards (mappingUpdateConsumer ,
2570+ startedShards .stream ().filter ((s ) -> requiredShards .contains (s .shardId ())).collect (Collectors .toList ()), l ));
25932571 } else {
25942572 final RuntimeException e ;
25952573 if (numShards == -1 ) {
@@ -2607,6 +2585,17 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
26072585 }
26082586 }
26092587
2588+ private void executeRecovery (String reason , RecoveryState recoveryState , PeerRecoveryTargetService .RecoveryListener recoveryListener ,
2589+ CheckedConsumer <ActionListener <Boolean >, Exception > action ) {
2590+ markAsRecovering (reason , recoveryState ); // mark the shard as recovering on the cluster state thread
2591+ threadPool .generic ().execute (ActionRunnable .wrap (ActionListener .wrap (r -> {
2592+ if (r ) {
2593+ recoveryListener .onRecoveryDone (recoveryState );
2594+ }
2595+ },
2596+ e -> recoveryListener .onRecoveryFailure (recoveryState , new RecoveryFailedException (recoveryState , null , e ), true )), action ));
2597+ }
2598+
26102599 /**
26112600 * Returns whether the shard is a relocated primary, i.e. not in charge anymore of replicating changes (see {@link ReplicationTracker}).
26122601 */
0 commit comments