diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java index 8b97f1357fa00..455459b758202 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java @@ -228,6 +228,10 @@ public String getAllocationId() { * matching sync ids are irrelevant. */ public boolean hasMatchingSyncId() { + // TODO: we should eventually either distinguish between sync-id and non sync-id equivalent closed shard allocation or + // rename this to isSynced(). + // left this for now, since it changes the API and should preferably be handled together with seqno based + // replica shard allocation. return matchingBytes == Long.MAX_VALUE; } diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 10bd6115b4c74..114c434a0a09e 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData; @@ -49,7 +50,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -101,17 +101,16 @@ public void 
processExistingRecoveries(RoutingAllocation allocation) { DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider - final String currentSyncId; + final TransportNodesListShardStoreMetaData.StoreFilesMetaData currentStore; if (shardStores.getData().containsKey(currentNode)) { - currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId(); + currentStore = shardStores.getData().get(currentNode).storeFilesMetaData(); } else { - currentSyncId = null; + currentStore = null; } if (currentNode.equals(nodeWithHighestMatch) == false - && Objects.equals(currentSyncId, primaryStore.syncId()) == false - && matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) { - // we found a better match that has a full sync id match, the existing allocation is not fully synced - // so we found a better one, cancel this one + && isSyncedRecovery(primaryStore, currentStore, isClosedIndex(shard, allocation)) == false + && matchingNodes.isSyncedRecovery(nodeWithHighestMatch)) { + // we found a better match that can do a fast recovery, cancel current recovery logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]", currentNode, nodeWithHighestMatch); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, @@ -315,6 +314,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al boolean explain) { ObjectLongMap nodesToSize = new ObjectLongHashMap<>(); Map nodeDecisions = explain ? 
new HashMap<>() : null;
+        final boolean closedIndex = isClosedIndex(shard, allocation);
         for (Map.Entry nodeStoreEntry : data.getData().entrySet()) {
             DiscoveryNode discoNode = nodeStoreEntry.getKey();
             TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
@@ -335,7 +335,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
 
             long matchingBytes = -1;
             if (explain) {
-                matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData);
+                matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData, closedIndex);
                 ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingBytes);
                 nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
             }
@@ -345,7 +345,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
             }
 
             if (matchingBytes < 0) {
-                matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData);
+                matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData, closedIndex);
             }
             nodesToSize.put(discoNode, matchingBytes);
             if (logger.isTraceEnabled()) {
@@ -362,11 +362,9 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
     }
 
     private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
-                                             TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) {
-        String primarySyncId = primaryStore.syncId();
-        String replicaSyncId = storeFilesMetaData.syncId();
-        // see if we have a sync id we can make use of
-        if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
+                                             TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData,
+                                             boolean closedIndex) {
+        if (isSyncedRecovery(primaryStore, storeFilesMetaData, closedIndex)) {
             return Long.MAX_VALUE;
         } else {
             long sizeMatched = 0;
@@ -380,6 +378,58 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St
         }
     }
 
+    /**
+     * Is a "synced recovery", which is either sync-id match or a closed index with equivalent last commits.
+     *
+     * @param primaryStore   store metadata of the primary copy
+     * @param candidateStore store metadata of the copy being evaluated, or {@code null} if none is known for the node
+     * @param closedIndex    whether the shard belongs to a closed index
+     * @return {@code true} if the sync ids match, or the index is closed and both stores are equivalent;
+     *         always {@code false} when {@code candidateStore} is {@code null}
+     */
+    private static boolean isSyncedRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
+                                            TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore,
+                                            boolean closedIndex) {
+        if (candidateStore == null) {
+            // processExistingRecoveries passes null when shardStores has no entry for the current node
+            return false;
+        }
+        return syncIdMatch(primaryStore, candidateStore)
+            || (closedIndex && equivalentStores(primaryStore, candidateStore));
+    }
+
+    /**
+     * Returns {@code true} when the candidate store has a non-null sync id that equals the primary's sync id.
+     */
+    private static boolean syncIdMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
+                                       TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
+        String primarySyncId = primaryStore.syncId();
+        String replicaSyncId = candidateStore.syncId();
+        return (replicaSyncId != null && replicaSyncId.equals(primarySyncId));
+    }
+
+    /**
+     * Returns {@code true} when the primary's commit has operations, both commits have
+     * {@code maxSeqNo == localCheckpoint}, and both commits share the same {@code maxSeqNo}.
+     */
+    private static boolean equivalentStores(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
+                                            TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
+        SequenceNumbers.CommitInfo primarySeqNoInfo = primaryStore.seqNoInfo();
+        SequenceNumbers.CommitInfo candidateSeqNoInfo = candidateStore.seqNoInfo();
+
+        return primarySeqNoInfo.maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED // disregard empty or upgraded without ops
+            && primarySeqNoInfo.maxSeqNo == primarySeqNoInfo.localCheckpoint
+            && candidateSeqNoInfo.maxSeqNo == candidateSeqNoInfo.localCheckpoint
+            && primarySeqNoInfo.maxSeqNo == candidateSeqNoInfo.maxSeqNo;
+    }
+
+    /**
+     * Returns {@code true} when the metadata of the shard's index is in state {@code CLOSE}.
+     */
+    private boolean isClosedIndex(ShardRouting shard, RoutingAllocation allocation) {
+        return allocation.metaData().getIndexSafe(shard.index()).getState() == IndexMetaData.State.CLOSE;
+    }
+
     protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation);
 
     /**
@@ -418,7 +468,10 @@ public DiscoveryNode getNodeWithHighestMatch() {
             return this.nodeWithHighestMatch;
         }
 
-        public boolean isNodeMatchBySyncID(DiscoveryNode
node) { + /** + * Is supplied node a sync'ed recovery, either sync-id match or closed index with identical last commit. + */ + public boolean isSyncedRecovery(DiscoveryNode node) { return nodesToSize.get(node) == Long.MAX_VALUE; } diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 65d2f8d7812f8..8813963fdd0e0 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -845,6 +845,13 @@ public long getNumDocs() { return numDocs; } + /** + * @return sequence number info from lucene commit + */ + public SequenceNumbers.CommitInfo seqNoInfo() { + return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(commitUserData.entrySet()); + } + static class LoadedMetadata { final Map fileMetadata; final Map userData; diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 5ebbdab39835b..70293f527421b 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -43,6 +43,7 @@ import org.elasticsearch.gateway.AsyncShardFetch; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -220,6 +221,13 @@ public String syncId() { return metadataSnapshot.getSyncId(); } + /** + * @return sequence number info from lucene commit + */ + public SequenceNumbers.CommitInfo seqNoInfo() { + return metadataSnapshot.seqNoInfo(); + } + @Override public String toString() { return 
"StoreFilesMetaData{" + diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index 941ad3c658aba..afc794b8dd37d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -154,7 +154,7 @@ public void testUnassignedReplicaDelayedAllocation() throws Exception { logger.info("--> starting 3 nodes"); internalCluster().startNodes(3); - prepareIndex(1, 1); + IndexMetaData.State state = prepareIndex(1, 1); logger.info("--> stopping the node with the replica"); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode().getName())); ensureStableCluster(2); @@ -263,7 +263,7 @@ public void testUnassignedReplicaDelayedAllocation() throws Exception { nodes.put(primaryNodeName, AllocationDecision.NO); String[] currentNodes = internalCluster().getNodeNames(); nodes.put(currentNodes[0].equals(primaryNodeName) ? 
currentNodes[1] : currentNodes[0], AllocationDecision.YES); - verifyNodeDecisions(parser, nodes, includeYesDecisions, true); + verifyNodeDecisions(parser, nodes, includeYesDecisions, true, state == IndexMetaData.State.CLOSE); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -272,7 +272,7 @@ public void testUnassignedReplicaWithPriorCopy() throws Exception { logger.info("--> starting 3 nodes"); List nodes = internalCluster().startNodes(3); - prepareIndex(1, 1); + IndexMetaData.State indexState = prepareIndex(1, 1); String primaryNodeName = primaryNodeName(); nodes.remove(primaryNodeName); @@ -383,7 +383,7 @@ public void testUnassignedReplicaWithPriorCopy() throws Exception { for (String nodeName : internalCluster().getNodeNames()) { nodeDecisions.put(nodeName, AllocationDecision.NO); } - verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, true); + verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, true, indexState == IndexMetaData.State.CLOSE); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -476,7 +476,7 @@ public void testAllocationFilteringOnIndexCreation() throws Exception { for (String nodeName : internalCluster().getNodeNames()) { nodeDecisions.put(nodeName, AllocationDecision.NO); } - verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, false); + verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -584,7 +584,7 @@ public void testAllocationFilteringPreventsShardMove() throws Exception { assertEquals("move_explanation", parser.currentName()); parser.nextToken(); assertEquals("cannot move shard to another node, even though it is not allowed to remain on its current node", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false, false); 
assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -696,7 +696,7 @@ public void testRebalancingNotAllowed() throws Exception { parser.nextToken(); assertEquals("rebalancing is not allowed, even though there is at least one node on which the shard can be allocated", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.YES, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.YES, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -799,7 +799,7 @@ public void testWorseBalance() throws Exception { parser.nextToken(); assertEquals("cannot rebalance as no target node exists that can both allocate this shard and improve the cluster balance", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.WORSE_BALANCE, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.WORSE_BALANCE, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -909,7 +909,7 @@ public void testBetterBalanceButCannotAllocate() throws Exception { parser.nextToken(); assertEquals("cannot rebalance as no target node exists that can both allocate this shard and improve the cluster balance", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -1006,7 +1006,7 @@ public void testAssignedReplicaOnSpecificNode() throws Exception { assertEquals("rebalance_explanation", parser.currentName()); parser.nextToken(); assertEquals("rebalancing is not allowed", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, false), includeYesDecisions, false); + verifyNodeDecisions(parser, 
allNodeDecisions(AllocationDecision.NO, false), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -1160,8 +1160,10 @@ private ClusterAllocationExplanation runExplain(boolean primary, String nodeId, return explanation; } - private void prepareIndex(final int numPrimaries, final int numReplicas) { - prepareIndex(randomIndexState(), numPrimaries, numReplicas, Settings.EMPTY, ActiveShardCount.ALL); + private IndexMetaData.State prepareIndex(final int numPrimaries, final int numReplicas) { + IndexMetaData.State indexState = randomIndexState(); + prepareIndex(indexState, numPrimaries, numReplicas, Settings.EMPTY, ActiveShardCount.ALL); + return indexState; } private void prepareIndex(final IndexMetaData.State state, final int numPrimaries, final int numReplicas, @@ -1326,7 +1328,7 @@ private void verifyStaleShardCopyNodeDecisions(XContentParser parser, int numNod } private void verifyNodeDecisions(XContentParser parser, Map expectedNodeDecisions, - boolean includeYesDecisions, boolean reuseStore) throws IOException { + boolean includeYesDecisions, boolean reuseStore, boolean synced) throws IOException { parser.nextToken(); assertEquals("node_allocation_decisions", parser.currentName()); assertEquals(Token.START_ARRAY, parser.nextToken()); @@ -1346,9 +1348,15 @@ private void verifyNodeDecisions(XContentParser parser, Map(); } @@ -379,6 +421,10 @@ public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaDat Map commitData = new HashMap<>(); if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); + if (seqNoInfo != null) { + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(seqNoInfo.localCheckpoint)); + commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoInfo.maxSeqNo)); + } } data.put(node, new TransportNodesListShardStoreMetaData.StoreFilesMetaData(shardId, new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()))); diff --git 
a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 740034f12ecc5..d01eaa97ce453 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -421,6 +422,56 @@ public Settings onNodeStopped(String nodeName) throws Exception { } } + /** + * Verify that if we have two shard copies around, we prefer one with identical sequence numbers and do + * a noop recovery. + */ + public void testClosedIndexRecoversFast() throws Exception { + final String indexName = "closed-index-fast-recovery"; + internalCluster().ensureAtLeastNumDataNodes(3); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureGreen(indexName); + if (randomBoolean()) { + flush(indexName); + } + + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", "Extra" + 
i).setSource("num", i)).collect(toList())); + ensureGreen(); + + assertAcked(client().admin().indices().prepareClose(indexName).get()); + ensureGreen(); + + // Must disable replica allocation before stopping node because of caching in ReplicaShardAllocator (async fetch) + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")).get()); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ensureYellow(); + return super.onNodeStopped(nodeName); + } + }); + return super.onNodeStopped(nodeName); + } + }); + + assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.YELLOW)); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null)).get()); + ensureGreen(); + assertNoFileBasedRecovery(indexName); + } + static void assertIndexIsClosed(final String... 
indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { @@ -467,7 +518,7 @@ static void assertException(final Throwable throwable, final String indexName) { } } - void assertNoFileBasedRecovery(String indexName) { + private void assertNoFileBasedRecovery(String indexName) { for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { if (recovery.getPrimary() == false) { assertThat(recovery.getIndex().fileDetails(), empty()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 7ff928c4413d2..a460db7868cfa 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -148,8 +148,8 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE; -import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE; +import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.getTestTransportType; @@ -562,9 +562,14 @@ private NodeAndClient getRandomNodeAndClient() { return getRandomNodeAndClient(nc -> true); } - private synchronized NodeAndClient getRandomNodeAndClient(Predicate predicate) { + private NodeAndClient getRandomNodeAndClient(Predicate predicate) { + return getRandomNodeAndClientIncludingClosed(((Predicate) nc -> nc.isClosed() == 
false).and(predicate)); + } + + private synchronized NodeAndClient getRandomNodeAndClientIncludingClosed(Predicate predicate) { ensureOpen(); - List values = nodes.values().stream().filter(predicate).collect(Collectors.toList()); + List values = nodes.values().stream().filter(predicate) + .collect(Collectors.toList()); if (values.isEmpty() == false) { return randomFrom(random, values); } @@ -1003,6 +1008,10 @@ public void close() throws IOException { } } + public boolean isClosed() { + return closed.get(); + } + private void markNodeDataDirsAsPendingForWipe(Node node) { assert Thread.holdsLock(InternalTestCluster.this); NodeEnvironment nodeEnv = node.getNodeEnvironment(); @@ -1178,10 +1187,11 @@ public synchronized void validateClusterFormed() { /** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */ private synchronized void validateClusterFormed(String viaNode) { - Set expectedNodes = new HashSet<>(); - for (NodeAndClient nodeAndClient : nodes.values()) { - expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode()); - } + Set expectedNodes = + nodes.values().stream() + .filter(nc -> nc.isClosed() == false) + .map(nc -> getInstanceFromNode(ClusterService.class, nc.node()).localNode()) + .collect(Collectors.toSet()); logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes); final Client client = client(viaNode); try { @@ -1515,7 +1525,7 @@ public T getMasterNodeInstance(Class clazz) { } private synchronized T getInstance(Class clazz, Predicate predicate) { - NodeAndClient randomNodeAndClient = getRandomNodeAndClient(predicate); + NodeAndClient randomNodeAndClient = getRandomNodeAndClientIncludingClosed(predicate); assert randomNodeAndClient != null; return getInstanceFromNode(clazz, randomNodeAndClient.node); } @@ -1533,7 +1543,7 @@ private static T getInstanceFromNode(Class clazz, Node node) { @Override public int size() { - return 
nodes.size(); + return Math.toIntExact(nodes.values().stream().filter(nc -> nc.isClosed() == false).count()); } @Override @@ -2085,7 +2095,10 @@ private static int getMinMasterNodes(int eligibleMasterNodes) { } private int getMasterNodesCount() { - return (int) nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); + return (int) nodes.values().stream() + .filter(n -> n.isClosed() == false) + .filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())) + .count(); } public String startMasterOnlyNode() {