66
77package org .elasticsearch .xpack .ccr ;
88
9- import com .carrotsearch .hppc .cursors .ObjectCursor ;
109import org .elasticsearch .ElasticsearchException ;
1110import org .elasticsearch .action .ActionListener ;
1211import org .elasticsearch .action .admin .cluster .settings .ClusterUpdateSettingsRequest ;
@@ -222,9 +221,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
222221
223222 // block the recovery from completing; this ensures the background sync is still running
224223 final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
225- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
224+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
226225 final MockTransportService senderTransportService =
227- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
226+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
228227 senderTransportService .addSendBehavior (
229228 (connection , requestId , action , request , options ) -> {
230229 if (ClearCcrRestoreSessionAction .NAME .equals (action )
@@ -246,9 +245,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
246245 assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
247246 latch .countDown ();
248247 } finally {
249- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
248+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
250249 final MockTransportService senderTransportService =
251- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
250+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
252251 senderTransportService .clearAllRules ();
253252 }
254253 }
@@ -403,9 +402,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
403402
404403 final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
405404 try {
406- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
405+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
407406 final MockTransportService senderTransportService =
408- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
407+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
409408 senderTransportService .addSendBehavior (
410409 (connection , requestId , action , request , options ) -> {
411410 if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )
@@ -454,9 +453,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
454453 assertThat (shardStats .getRetentionLeaseStats ().retentionLeases ().leases (), empty ());
455454 }
456455 } finally {
457- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
456+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
458457 final MockTransportService senderTransportService =
459- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
458+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
460459 senderTransportService .clearAllRules ();
461460 }
462461 }
@@ -486,9 +485,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
486485
487486 final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
488487 try {
489- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
488+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
490489 final MockTransportService senderTransportService =
491- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
490+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
492491 senderTransportService .addSendBehavior (
493492 (connection , requestId , action , request , options ) -> {
494493 if (RetentionLeaseActions .Remove .ACTION_NAME .equals (action )
@@ -524,9 +523,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
524523 getLeaderCluster ().getClusterName (),
525524 new Index (leaderIndex , leaderUUID ))));
526525 } finally {
527- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
526+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
528527 final MockTransportService senderTransportService =
529- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
528+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
530529 senderTransportService .clearAllRules ();
531530 }
532531 }
@@ -614,7 +613,6 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception {
614613 });
615614 }
616615
617- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/39509" )
618616 public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused () throws Exception {
619617 final String leaderIndex = "leader" ;
620618 final String followerIndex = "follower" ;
@@ -765,35 +763,36 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
765763 final CountDownLatch latch = new CountDownLatch (1 );
766764
767765 final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
768- for (final ObjectCursor <DiscoveryNode > senderNode : followerClusterState .getState ().nodes ().getNodes ().values ()) {
769- final MockTransportService senderTransportService =
770- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value .getName ());
771- senderTransportService .addSendBehavior (
766+ try {
767+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
768+ final MockTransportService senderTransportService =
769+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
770+ senderTransportService .addSendBehavior (
772771 (connection , requestId , action , request , options ) -> {
773772 if (RetentionLeaseActions .Renew .ACTION_NAME .equals (action )
774- || TransportActionProxy .getProxyAction (RetentionLeaseActions .Renew .ACTION_NAME ).equals (action )) {
773+ || TransportActionProxy .getProxyAction (RetentionLeaseActions .Renew .ACTION_NAME ).equals (action )) {
775774 senderTransportService .clearAllRules ();
776775 final RetentionLeaseActions .RenewRequest renewRequest = (RetentionLeaseActions .RenewRequest ) request ;
777776 final String primaryShardNodeId =
778- getLeaderCluster ()
779- .clusterService ()
780- .state ()
781- .routingTable ()
782- .index (leaderIndex )
783- .shard (renewRequest .getShardId ().id ())
784- .primaryShard ()
785- .currentNodeId ();
777+ getLeaderCluster ()
778+ .clusterService ()
779+ .state ()
780+ .routingTable ()
781+ .index (leaderIndex )
782+ .shard (renewRequest .getShardId ().id ())
783+ .primaryShard ()
784+ .currentNodeId ();
786785 final String primaryShardNodeName =
787- getLeaderCluster ().clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
786+ getLeaderCluster ().clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
788787 final IndexShard primary =
789- getLeaderCluster ()
790- .getInstance (IndicesService .class , primaryShardNodeName )
791- .getShardOrNull (renewRequest .getShardId ());
788+ getLeaderCluster ()
789+ .getInstance (IndicesService .class , primaryShardNodeName )
790+ .getShardOrNull (renewRequest .getShardId ());
792791 final CountDownLatch innerLatch = new CountDownLatch (1 );
793792 // this forces the background renewal from following to face a retention lease not found exception
794793 primary .removeRetentionLease (
795- getRetentionLeaseId (followerIndex , leaderIndex ),
796- ActionListener .wrap (r -> innerLatch .countDown (), e -> fail (e .toString ())));
794+ getRetentionLeaseId (followerIndex , leaderIndex ),
795+ ActionListener .wrap (r -> innerLatch .countDown (), e -> fail (e .toString ())));
797796
798797 try {
799798 innerLatch .await ();
@@ -806,11 +805,18 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
806805 }
807806 connection .sendRequest (requestId , action , request , options );
808807 });
809- }
808+ }
810809
811- latch .await ();
810+ latch .await ();
812811
813- assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
812+ assertRetentionLeaseRenewal (numberOfShards , numberOfReplicas , followerIndex , leaderIndex );
813+ } finally {
814+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
815+ final MockTransportService senderTransportService =
816+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
817+ senderTransportService .clearAllRules ();
818+ }
819+ }
814820 }
815821
816822 /**
@@ -857,9 +863,9 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex
857863 final ClusterStateResponse followerClusterState = followerClient ().admin ().cluster ().prepareState ().clear ().setNodes (true ).get ();
858864
859865 try {
860- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getNodes (). values ()) {
866+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
861867 final MockTransportService senderTransportService =
862- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .value . getName ());
868+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
863869 senderTransportService .addSendBehavior (
864870 (connection , requestId , action , request , options ) -> {
865871 if (RetentionLeaseActions .Renew .ACTION_NAME .equals (action )
@@ -913,9 +919,9 @@ public void onResponseReceived(
913919 assertThat (shardStats .getRetentionLeaseStats ().retentionLeases ().leases (), empty ());
914920 }
915921 } finally {
916- for (final ObjectCursor < DiscoveryNode > senderNode : followerClusterState .getState ().nodes (). getDataNodes (). values ()) {
922+ for (final DiscoveryNode senderNode : followerClusterState .getState ().nodes ()) {
917923 final MockTransportService senderTransportService =
918- (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode . value .getName ());
924+ (MockTransportService ) getFollowerCluster ().getInstance (TransportService .class , senderNode .getName ());
919925 senderTransportService .clearAllRules ();
920926 }
921927 }
0 commit comments