@@ -794,22 +794,26 @@ public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
794794 MockTransportService redTransportService =
795795 (MockTransportService ) internalCluster ().getInstance (TransportService .class , redNodeName );
796796
797- AtomicBoolean recoveryStarted = new AtomicBoolean (false );
797+ final AtomicBoolean recoveryStarted = new AtomicBoolean (false );
798+ final AtomicBoolean finalizeReceived = new AtomicBoolean (false );
798799
799- final SingleStartEnforcer validator = new SingleStartEnforcer (indexName , recoveryStarted );
800+ final SingleStartEnforcer validator = new SingleStartEnforcer (indexName , recoveryStarted , finalizeReceived );
800801 redTransportService .addSendBehavior (blueTransportService , (connection , requestId , action , request , options ) -> {
801802 validator .accept (action , request );
802803 connection .sendRequest (requestId , action , request , options );
803804 });
804805 Runnable connectionBreaker = () -> {
805806 // Always break connection from source to remote to ensure that actions are retried
807+ logger .info ("--> closing connections from source node to target node" );
806808 blueTransportService .disconnectFromNode (redTransportService .getLocalDiscoNode ());
807809 if (randomBoolean ()) {
808810 // Sometimes break connection from remote to source to ensure that recovery is re-established
811+ logger .info ("--> closing connections from target node to source node" );
809812 redTransportService .disconnectFromNode (blueTransportService .getLocalDiscoNode ());
810813 }
811814 };
812- TransientReceiveRejected handlingBehavior = new TransientReceiveRejected (recoveryActionToBlock , recoveryStarted , connectionBreaker );
815+ TransientReceiveRejected handlingBehavior =
816+ new TransientReceiveRejected (recoveryActionToBlock , finalizeReceived , recoveryStarted , connectionBreaker );
813817 redTransportService .addRequestHandlingBehavior (recoveryActionToBlock , handlingBehavior );
814818
815819 try {
@@ -833,12 +837,15 @@ private class TransientReceiveRejected implements StubbableTransport.RequestHand
833837
834838 private final String actionName ;
835839 private final AtomicBoolean recoveryStarted ;
840+ private final AtomicBoolean finalizeReceived ;
836841 private final Runnable connectionBreaker ;
837842 private final AtomicInteger blocksRemaining ;
838843
839- private TransientReceiveRejected (String actionName , AtomicBoolean recoveryStarted , Runnable connectionBreaker ) {
844+ private TransientReceiveRejected (String actionName , AtomicBoolean recoveryStarted , AtomicBoolean finalizeReceived ,
845+ Runnable connectionBreaker ) {
840846 this .actionName = actionName ;
841847 this .recoveryStarted = recoveryStarted ;
848+ this .finalizeReceived = finalizeReceived ;
842849 this .connectionBreaker = connectionBreaker ;
843850 this .blocksRemaining = new AtomicInteger (randomIntBetween (1 , 3 ));
844851 }
@@ -847,6 +854,9 @@ private TransientReceiveRejected(String actionName, AtomicBoolean recoveryStarte
847854 public void messageReceived (TransportRequestHandler <TransportRequest > handler , TransportRequest request , TransportChannel channel ,
848855 Task task ) throws Exception {
849856 recoveryStarted .set (true );
857+ if (actionName .equals (PeerRecoveryTargetService .Actions .FINALIZE )) {
858+ finalizeReceived .set (true );
859+ }
850860 if (blocksRemaining .getAndUpdate (i -> i == 0 ? 0 : i - 1 ) != 0 ) {
851861 String rejected = "rejected" ;
852862 String circuit = "circuit" ;
@@ -872,11 +882,13 @@ public void messageReceived(TransportRequestHandler<TransportRequest> handler, T
872882 private class SingleStartEnforcer implements BiConsumer <String , TransportRequest > {
873883
874884 private final AtomicBoolean recoveryStarted ;
885+ private final AtomicBoolean finalizeReceived ;
875886 private final String indexName ;
876887
877- private SingleStartEnforcer (String indexName , AtomicBoolean recoveryStarted ) {
888+ private SingleStartEnforcer (String indexName , AtomicBoolean recoveryStarted , AtomicBoolean finalizeReceived ) {
878889 this .indexName = indexName ;
879890 this .recoveryStarted = recoveryStarted ;
891+ this .finalizeReceived = finalizeReceived ;
880892 }
881893
882894 @ Override
@@ -887,7 +899,7 @@ public void accept(String action, TransportRequest request) {
887899 StartRecoveryRequest startRecoveryRequest = (StartRecoveryRequest ) request ;
888900 ShardId shardId = startRecoveryRequest .shardId ();
889901 logger .info ("--> attempting to send start_recovery request for shard: " + shardId );
890- if (indexName .equals (shardId .getIndexName ()) && recoveryStarted .get ()) {
902+ if (indexName .equals (shardId .getIndexName ()) && recoveryStarted .get () && finalizeReceived . get () == false ) {
891903 throw new IllegalStateException ("Recovery cannot be started twice" );
892904 }
893905 }
0 commit comments