|
26 | 26 | import org.elasticsearch.client.ResponseException; |
27 | 27 | import org.elasticsearch.cluster.metadata.IndexMetaData; |
28 | 28 | import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; |
29 | | -import org.elasticsearch.cluster.routing.RecoverySource; |
30 | | -import org.elasticsearch.cluster.routing.ShardRouting; |
31 | | -import org.elasticsearch.cluster.routing.UnassignedInfo; |
32 | 29 | import org.elasticsearch.common.Booleans; |
33 | 30 | import org.elasticsearch.common.settings.Settings; |
34 | 31 | import org.elasticsearch.common.util.concurrent.AbstractRunnable; |
35 | 32 | import org.elasticsearch.common.xcontent.support.XContentMapValues; |
36 | 33 | import org.elasticsearch.index.IndexSettings; |
37 | | -import org.elasticsearch.index.seqno.ReplicationTracker; |
38 | | -import org.elasticsearch.index.shard.ShardId; |
| 34 | +import org.elasticsearch.index.seqno.RetentionLeaseUtils; |
39 | 35 | import org.elasticsearch.rest.RestStatus; |
40 | 36 | import org.elasticsearch.rest.action.document.RestIndexAction; |
41 | 37 | import org.elasticsearch.rest.action.document.RestUpdateAction; |
|
45 | 41 | import java.io.IOException; |
46 | 42 | import java.util.ArrayList; |
47 | 43 | import java.util.Collection; |
48 | | -import java.util.HashSet; |
49 | 44 | import java.util.List; |
50 | 45 | import java.util.Locale; |
51 | 46 | import java.util.Map; |
52 | 47 | import java.util.Objects; |
53 | | -import java.util.Set; |
54 | 48 | import java.util.concurrent.Future; |
55 | 49 | import java.util.concurrent.TimeUnit; |
56 | 50 | import java.util.function.Predicate; |
|
60 | 54 | import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING; |
61 | 55 | import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; |
62 | 56 | import static org.hamcrest.Matchers.equalTo; |
63 | | -import static org.hamcrest.Matchers.hasItems; |
64 | 57 | import static org.hamcrest.Matchers.hasSize; |
65 | 58 | import static org.hamcrest.Matchers.is; |
66 | 59 | import static org.hamcrest.Matchers.isIn; |
@@ -408,7 +401,7 @@ public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exceptio |
408 | 401 | } |
409 | 402 | ensureGreen(index); |
410 | 403 | if (CLUSTER_TYPE == ClusterType.UPGRADED) { |
411 | | - assertAllCopiesHaveRetentionLeases(index); |
| 404 | + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); |
412 | 405 | } |
413 | 406 | } |
414 | 407 |
|
@@ -451,52 +444,19 @@ public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Excepti |
451 | 444 | putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}"); |
452 | 445 | assertOK(client().performRequest(putSettingsRequest)); |
453 | 446 | ensureGreen(index); |
454 | | - assertAllCopiesHaveRetentionLeases(index); |
| 447 | + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); |
455 | 448 | } else { |
456 | 449 | ensureGreen(index); |
457 | 450 | } |
458 | 451 | break; |
459 | 452 |
|
460 | 453 | case UPGRADED: |
461 | 454 | ensureGreen(index); |
462 | | - assertAllCopiesHaveRetentionLeases(index); |
| 455 | + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); |
463 | 456 | break; |
464 | 457 | } |
465 | 458 | } |
466 | 459 |
|
467 | | - private void assertAllCopiesHaveRetentionLeases(String index) throws Exception { |
468 | | - assertBusy(() -> { |
469 | | - final Request statsRequest = new Request("GET", "/" + index + "/_stats"); |
470 | | - statsRequest.addParameter("level", "shards"); |
471 | | - final Map<?, ?> shardsStats = ObjectPath.createFromResponse(client().performRequest(statsRequest)) |
472 | | - .evaluate("indices." + index + ".shards"); |
473 | | - for (Map.Entry<?, ?> shardCopiesEntry : shardsStats.entrySet()) { |
474 | | - final List<?> shardCopiesList = (List<?>) shardCopiesEntry.getValue(); |
475 | | - |
476 | | - final Set<String> expectedLeaseIds = new HashSet<>(); |
477 | | - for (Object shardCopyStats : shardCopiesList) { |
478 | | - final String nodeId |
479 | | - = Objects.requireNonNull((String) ((Map<?, ?>) (((Map<?, ?>) shardCopyStats).get("routing"))).get("node")); |
480 | | - expectedLeaseIds.add(ReplicationTracker.getPeerRecoveryRetentionLeaseId( |
481 | | - ShardRouting.newUnassigned(new ShardId("_na_", "test", 0), false, RecoverySource.PeerRecoverySource.INSTANCE, |
482 | | - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test")).initialize(nodeId, null, 0L))); |
483 | | - } |
484 | | - |
485 | | - final Set<String> actualLeaseIds = new HashSet<>(); |
486 | | - for (Object shardCopyStats : shardCopiesList) { |
487 | | - final List<?> leases |
488 | | - = (List<?>) ((Map<?, ?>) (((Map<?, ?>) shardCopyStats).get("retention_leases"))).get("leases"); |
489 | | - for (Object lease : leases) { |
490 | | - actualLeaseIds.add(Objects.requireNonNull((String) (((Map<?, ?>) lease).get("id")))); |
491 | | - } |
492 | | - } |
493 | | - assertThat("[" + index + "][" + shardCopiesEntry.getKey() + "] has leases " + actualLeaseIds |
494 | | - + " but expected " + expectedLeaseIds, |
495 | | - actualLeaseIds, hasItems(expectedLeaseIds.toArray(new String[0]))); |
496 | | - } |
497 | | - }); |
498 | | - } |
499 | | - |
500 | 460 | /** |
501 | 461 | * This test creates an index in the non upgraded cluster and closes it. It then checks that the index |
502 | 462 | * is effectively closed and potentially replicated (if the version the index was created on supports |
|
0 commit comments