Skip to content

Commit d6533e2

Browse files
committed
use addTemporaryStateListener
1 parent 463f6f5 commit d6533e2

File tree

1 file changed

+27
-29
lines changed

1 file changed

+27
-29
lines changed

x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.elasticsearch.action.bulk.BulkRequestBuilder;
2222
import org.elasticsearch.action.get.GetResponse;
2323
import org.elasticsearch.action.index.IndexRequest;
24+
import org.elasticsearch.action.support.ActionTestUtils;
2425
import org.elasticsearch.action.support.ActiveShardCount;
2526
import org.elasticsearch.action.support.IndicesOptions;
2627
import org.elasticsearch.action.support.PlainActionFuture;
27-
import org.elasticsearch.cluster.ClusterChangedEvent;
2828
import org.elasticsearch.cluster.ClusterStateListener;
2929
import org.elasticsearch.cluster.RestoreInProgress;
3030
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.snapshots.SnapshotId;
5353
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
5454
import org.elasticsearch.snapshots.SnapshotsInfoService;
55+
import org.elasticsearch.test.ClusterServiceUtils;
5556
import org.elasticsearch.test.transport.MockTransportService;
5657
import org.elasticsearch.transport.TransportActionProxy;
5758
import org.elasticsearch.transport.TransportService;
@@ -658,35 +659,32 @@ public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception {
658659

659660
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
660661
final PlainActionFuture<Void> waitForAllShardSnapshotSizesFailures = new PlainActionFuture<>();
661-
final ClusterStateListener listener = new ClusterStateListener() {
662-
@Override
663-
public void clusterChanged(ClusterChangedEvent event) {
664-
if (RestoreInProgress.get(event.state()).isEmpty() == false && event.state().routingTable().hasIndex(followerIndex)) {
665-
try {
666-
final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex);
667-
// this assertBusy completes because the listener is added after the InternalSnapshotsInfoService
668-
// and ClusterService preserves the order of listeners.
669-
assertBusy(() -> {
670-
List<Long> sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED)
671-
.stream()
672-
.filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA)
673-
.sorted(Comparator.comparingInt(ShardRouting::getId))
674-
.map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard))
675-
.filter(Objects::nonNull)
676-
.filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size)
677-
.collect(Collectors.toList());
678-
assertThat(sizes, hasSize(numberOfShards));
679-
});
680-
clusterService.removeListener(this);
681-
waitForAllShardSnapshotSizesFailures.onResponse(null);
682-
} catch (Exception e) {
683-
throw new AssertionError("Failed to retrieve all snapshot shard sizes", e);
684-
}
685-
}
662+
ClusterServiceUtils.addTemporaryStateListener(
663+
clusterService,
664+
state -> RestoreInProgress.get(state).isEmpty() == false && state.routingTable().hasIndex(followerIndex)
665+
).addListener(ActionTestUtils.assertNoFailureListener(ignore -> {
666+
try {
667+
// This listener runs synchronously in the same thread so that clusterService.state() returns the same state
668+
// that satisfied the predicate.
669+
final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(followerIndex);
670+
// this assertBusy completes because the listener is added after the InternalSnapshotsInfoService
671+
// and ClusterService preserves the order of listeners.
672+
assertBusy(() -> {
673+
List<Long> sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED)
674+
.stream()
675+
.filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA)
676+
.sorted(Comparator.comparingInt(ShardRouting::getId))
677+
.map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard))
678+
.filter(Objects::nonNull)
679+
.filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size)
680+
.collect(Collectors.toList());
681+
assertThat(sizes, hasSize(numberOfShards));
682+
});
683+
waitForAllShardSnapshotSizesFailures.onResponse(null);
684+
} catch (Exception e) {
685+
throw new AssertionError("Failed to retrieve all snapshot shard sizes", e);
686686
}
687-
};
688-
689-
clusterService.addListener(listener);
687+
}));
690688

691689
logger.debug("--> creating follower index [{}]", followerIndex);
692690
followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex, ActiveShardCount.NONE));

0 commit comments

Comments
 (0)