|
43 | 43 | import org.elasticsearch.plugins.Plugin; |
44 | 44 | import org.elasticsearch.snapshots.RestoreInfo; |
45 | 45 | import org.elasticsearch.snapshots.RestoreService; |
46 | | -import org.elasticsearch.test.junit.annotations.TestIssueLogging; |
47 | 46 | import org.elasticsearch.test.transport.MockTransportService; |
48 | 47 | import org.elasticsearch.transport.ConnectTransportException; |
49 | 48 | import org.elasticsearch.transport.RemoteTransportException; |
|
87 | 86 | import static org.hamcrest.Matchers.greaterThan; |
88 | 87 | import static org.hamcrest.Matchers.hasSize; |
89 | 88 |
|
90 | | -@TestIssueLogging( |
91 | | - value = "org.elasticsearch.xpack.ccr:trace,org.elasticsearch.indices.recovery:trace,org.elasticsearch.index.seqno:debug", |
92 | | - issueUrl = "https://github.com/elastic/elasticsearch/issues/45192") |
93 | 89 | public class CcrRetentionLeaseIT extends CcrIntegTestCase { |
94 | 90 |
|
95 | 91 | public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin { |
@@ -781,40 +777,42 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep |
781 | 777 | (connection, requestId, action, request, options) -> { |
782 | 778 | if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) |
783 | 779 | || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { |
784 | | - senderTransportService.clearAllRules(); |
785 | 780 | final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request; |
786 | 781 | final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); |
787 | | - assertThat(retentionLeaseId, equalTo(renewRequest.getId())); |
788 | | - logger.info("--> intercepting renewal request for retention lease [{}]", retentionLeaseId); |
789 | | - final String primaryShardNodeId = |
790 | | - getLeaderCluster() |
791 | | - .clusterService() |
792 | | - .state() |
793 | | - .routingTable() |
794 | | - .index(leaderIndex) |
795 | | - .shard(renewRequest.getShardId().id()) |
796 | | - .primaryShard() |
797 | | - .currentNodeId(); |
798 | | - final String primaryShardNodeName = |
799 | | - getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); |
800 | | - final IndexShard primary = |
801 | | - getLeaderCluster() |
802 | | - .getInstance(IndicesService.class, primaryShardNodeName) |
803 | | - .getShardOrNull(renewRequest.getShardId()); |
804 | | - final CountDownLatch innerLatch = new CountDownLatch(1); |
805 | | - // this forces the background renewal from following to face a retention lease not found exception |
806 | | - logger.info("--> removing retention lease [{}] on the leader", retentionLeaseId); |
807 | | - primary.removeRetentionLease(retentionLeaseId, |
808 | | - ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); |
809 | | - logger.info("--> waiting for the removed retention lease [{}] to be synced on the leader", retentionLeaseId); |
810 | | - try { |
811 | | - innerLatch.await(); |
812 | | - } catch (final InterruptedException e) { |
813 | | - Thread.currentThread().interrupt(); |
814 | | - fail(e.toString()); |
| 782 | + if (retentionLeaseId.equals(renewRequest.getId())) { |
| 783 | + logger.info("--> intercepting renewal request for retention lease [{}]", retentionLeaseId); |
| 784 | + senderTransportService.clearAllRules(); |
| 785 | + final String primaryShardNodeId = |
| 786 | + getLeaderCluster() |
| 787 | + .clusterService() |
| 788 | + .state() |
| 789 | + .routingTable() |
| 790 | + .index(leaderIndex) |
| 791 | + .shard(renewRequest.getShardId().id()) |
| 792 | + .primaryShard() |
| 793 | + .currentNodeId(); |
| 794 | + final String primaryShardNodeName = |
| 795 | + getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); |
| 796 | + final IndexShard primary = |
| 797 | + getLeaderCluster() |
| 798 | + .getInstance(IndicesService.class, primaryShardNodeName) |
| 799 | + .getShardOrNull(renewRequest.getShardId()); |
| 800 | + final CountDownLatch innerLatch = new CountDownLatch(1); |
| 801 | + try { |
| 802 | + // this forces the background renewal from following to face a retention lease not found exception |
| 803 | + logger.info("--> removing retention lease [{}] on the leader", retentionLeaseId); |
| 804 | + primary.removeRetentionLease(retentionLeaseId, |
| 805 | + ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); |
| 806 | + logger.info("--> waiting for the removed retention lease [{}] to be synced on the leader", |
| 807 | + retentionLeaseId); |
| 808 | + innerLatch.await(); |
| 809 | + logger.info("--> removed retention lease [{}] on the leader", retentionLeaseId); |
| 810 | + } catch (final Exception e) { |
| 811 | + throw new AssertionError("failed to remove retention lease [" + retentionLeaseId + "] on the leader"); |
| 812 | + } finally { |
| 813 | + latch.countDown(); |
| 814 | + } |
815 | 815 | } |
816 | | - logger.info("--> removed retention lease [{}] on the leader", retentionLeaseId); |
817 | | - latch.countDown(); |
818 | 816 | } |
819 | 817 | connection.sendRequest(requestId, action, request, options); |
820 | 818 | }); |
|
0 commit comments