|
23 | 23 | import org.elasticsearch.action.DocWriteResponse; |
24 | 24 | import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; |
25 | 25 | import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; |
| 26 | +import org.elasticsearch.action.admin.indices.stats.ShardStats; |
26 | 27 | import org.elasticsearch.action.bulk.BulkResponse; |
27 | 28 | import org.elasticsearch.action.index.IndexResponse; |
28 | 29 | import org.elasticsearch.action.support.ActiveShardCount; |
| 30 | +import org.elasticsearch.action.support.IndicesOptions; |
29 | 31 | import org.elasticsearch.cluster.ClusterState; |
30 | 32 | import org.elasticsearch.cluster.ClusterStateListener; |
31 | 33 | import org.elasticsearch.cluster.metadata.IndexMetaData; |
|
65 | 67 | import java.util.concurrent.ExecutionException; |
66 | 68 | import java.util.concurrent.TimeUnit; |
67 | 69 | import java.util.stream.Collectors; |
| 70 | +import java.util.stream.Stream; |
68 | 71 |
|
69 | 72 | import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; |
70 | 73 | import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; |
@@ -231,7 +234,9 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { |
231 | 234 | Set<String> historyUUIDs = Arrays.stream(client().admin().indices().prepareStats("test").clear().get().getShards()) |
232 | 235 | .map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY)).collect(Collectors.toSet()); |
233 | 236 | createStaleReplicaScenario(master); |
234 | | - |
| 237 | + if (randomBoolean()) { |
| 238 | + assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(0)); |
| 239 | + } |
235 | 240 | boolean useStaleReplica = randomBoolean(); // if true, use stale replica, otherwise a completely empty copy |
236 | 241 | logger.info("--> explicitly promote old primary shard"); |
237 | 242 | final String idxName = "test"; |
@@ -281,15 +286,18 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { |
281 | 286 | assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get() |
282 | 287 | .getState().routingTable().index(idxName).allPrimaryShardsActive())); |
283 | 288 | } |
284 | | - assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); |
285 | | - |
| 289 | + ShardStats[] shardStats = client().admin().indices().prepareStats("test") |
| 290 | + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED).get().getShards(); |
| 291 | + for (ShardStats shardStat : shardStats) { |
| 292 | + assertThat(shardStat.getCommitStats().getNumDocs(), equalTo(useStaleReplica ? 1 : 0)); |
| 293 | + } |
286 | 294 | // allocation id of old primary was cleaned from the in-sync set |
287 | 295 | final ClusterState state = client().admin().cluster().prepareState().get().getState(); |
288 | 296 |
|
289 | 297 | assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()), |
290 | 298 | state.metaData().index(idxName).inSyncAllocationIds(0)); |
291 | 299 |
|
292 | | - Set<String> newHistoryUUIds = Arrays.stream(client().admin().indices().prepareStats("test").clear().get().getShards()) |
| 300 | + Set<String> newHistoryUUIds = Stream.of(shardStats) |
293 | 301 | .map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY)).collect(Collectors.toSet()); |
294 | 302 | assertThat(newHistoryUUIds, everyItem(not(isIn(historyUUIDs)))); |
295 | 303 | assertThat(newHistoryUUIds, hasSize(1)); |
|
0 commit comments