Skip to content

Commit 36833f5

Browse files
committed
Adapt InternalTestCluster#fullRestart to call onNodeStopped when all nodes are stopped (#35494)
Refactors and simplifies the logic around stopping nodes, making sure that for a full cluster restart onNodeStopped is only called after the nodes are actually all stopped (and in particular not while starting up some nodes again). This change also ensures that a closed node client is not being used anymore (which required a small change to a test). Relates to #35049
1 parent da574ef commit 36833f5

File tree

2 files changed

+41
-45
lines changed

2 files changed

+41
-45
lines changed

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -933,28 +933,12 @@ void startNode() {
933933
}
934934
}
935935

936-
void closeNode() throws IOException {
937-
markNodeDataDirsAsPendingForWipe(node);
938-
node.close();
939-
}
940-
941-
/**
942-
* closes the current node if not already closed, builds a new node object using the current node settings and starts it
943-
*/
944-
void restart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception {
945-
if (!node.isClosed()) {
946-
closeNode();
947-
}
948-
recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes, () -> rebuildUnicastHostFiles(emptyList()));
949-
startNode();
950-
}
951-
952936
/**
953-
* rebuilds a new node object using the current node settings and starts it
937+
* closes the node and prepares it to be restarted
954938
*/
955-
void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes,
956-
Runnable onTransportServiceStarted) throws Exception {
939+
Settings closeForRestart(RestartCallback callback, int minMasterNodes) throws Exception {
957940
assert callback != null;
941+
close();
958942
Settings callbackSettings = callback.onNodeStopped(name);
959943
Settings.Builder newSettings = Settings.builder();
960944
if (callbackSettings != null) {
@@ -964,12 +948,9 @@ void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded,
964948
assert DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(newSettings.build()) == false : "min master nodes is auto managed";
965949
newSettings.put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes).build();
966950
}
967-
if (clearDataIfNeeded) {
968-
clearDataIfNeeded(callback);
969-
}
970-
createNewNode(newSettings.build(), onTransportServiceStarted);
971-
// make sure cached client points to new node
972-
resetClient();
951+
// delete data folders now, before we start other nodes that may claim it
952+
clearDataIfNeeded(callback);
953+
return newSettings.build();
973954
}
974955

975956
private void clearDataIfNeeded(RestartCallback callback) throws IOException {
@@ -983,7 +964,10 @@ private void clearDataIfNeeded(RestartCallback callback) throws IOException {
983964
}
984965
}
985966

986-
private void createNewNode(final Settings newSettings, final Runnable onTransportServiceStarted) {
967+
private void recreateNode(final Settings newSettings, final Runnable onTransportServiceStarted) {
968+
if (closed.get() == false) {
969+
throw new IllegalStateException("node " + name + " should be closed before recreating it");
970+
}
987971
// use a new seed to make sure we have new node id
988972
final long newIdSeed = NodeEnvironment.NODE_ID_SEED_SETTING.get(node.settings()) + 1;
989973
Settings finalSettings = Settings.builder()
@@ -1003,6 +987,7 @@ public void afterStart() {
1003987
onTransportServiceStarted.run();
1004988
}
1005989
});
990+
closed.set(false);
1006991
markNodeDataDirsAsNotEligableForWipe(node);
1007992
}
1008993

@@ -1012,7 +997,8 @@ public void close() throws IOException {
1012997
resetClient();
1013998
} finally {
1014999
closed.set(true);
1015-
closeNode();
1000+
markNodeDataDirsAsPendingForWipe(node);
1001+
node.close();
10161002
}
10171003
}
10181004
}
@@ -1743,7 +1729,10 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
17431729
if (updateMinMaster) {
17441730
updateMinMasterNodes(masterNodesCount - 1);
17451731
}
1746-
nodeAndClient.restart(callback, true, autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
1732+
final Settings newSettings = nodeAndClient.closeForRestart(callback,
1733+
autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
1734+
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
1735+
nodeAndClient.startNode();
17471736
if (activeDisruptionScheme != null) {
17481737
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
17491738
}
@@ -1764,19 +1753,20 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
17641753
*/
17651754
public synchronized void fullRestart(RestartCallback callback) throws Exception {
17661755
int numNodesRestarted = 0;
1756+
final Settings[] newNodeSettings = new Settings[nextNodeId.get()];
17671757
Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
17681758
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
1759+
final int minMasterNodes = autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1;
17691760
for (NodeAndClient nodeAndClient : nodes.values()) {
17701761
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
1771-
logger.info("Stopping node [{}] ", nodeAndClient.name);
1762+
logger.info("Stopping and resetting node [{}] ", nodeAndClient.name);
17721763
if (activeDisruptionScheme != null) {
17731764
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
17741765
}
1775-
nodeAndClient.closeNode();
1776-
// delete data folders now, before we start other nodes that may claim it
1777-
nodeAndClient.clearDataIfNeeded(callback);
17781766
DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode();
1779-
rolesOrderedByOriginalStartupOrder[nodeAndClient.nodeAndClientId] = discoveryNode.getRoles();
1767+
final Settings newSettings = nodeAndClient.closeForRestart(callback, minMasterNodes);
1768+
newNodeSettings[nodeAndClient.nodeAndClientId()] = newSettings;
1769+
rolesOrderedByOriginalStartupOrder[nodeAndClient.nodeAndClientId()] = discoveryNode.getRoles();
17801770
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
17811771
}
17821772

@@ -1801,10 +1791,8 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
18011791
assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0;
18021792

18031793
for (NodeAndClient nodeAndClient : startUpOrder) {
1804-
logger.info("resetting node [{}] ", nodeAndClient.name);
1805-
// we already cleared data folders, before starting nodes up
1806-
nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1,
1807-
() -> rebuildUnicastHostFiles(startUpOrder));
1794+
logger.info("creating node [{}] ", nodeAndClient.name);
1795+
nodeAndClient.recreateNode(newNodeSettings[nodeAndClient.nodeAndClientId()], () -> rebuildUnicastHostFiles(startUpOrder));
18081796
}
18091797

18101798
startAndPublishNodesAndClients(startUpOrder);

test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,21 @@ private void assertMMNinNodeSetting(String node, InternalTestCluster cluster, in
147147
}
148148

149149
private void assertMMNinClusterSetting(InternalTestCluster cluster, int masterNodes) {
150-
final int minMasterNodes = masterNodes / 2 + 1;
151150
for (final String node : cluster.getNodeNames()) {
152-
Settings stateSettings = cluster.client(node).admin().cluster().prepareState().setLocal(true)
153-
.get().getState().getMetaData().settings();
154-
155-
assertEquals("dynamic setting for node [" + node + "] has the wrong min_master_node setting : ["
156-
+ stateSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
157-
DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(stateSettings).intValue(), minMasterNodes);
151+
assertMMNinClusterSetting(node, cluster, masterNodes);
158152
}
159153
}
160154

155+
private void assertMMNinClusterSetting(String node, InternalTestCluster cluster, int masterNodes) {
156+
final int minMasterNodes = masterNodes / 2 + 1;
157+
Settings stateSettings = cluster.client(node).admin().cluster().prepareState().setLocal(true)
158+
.get().getState().getMetaData().settings();
159+
160+
assertEquals("dynamic setting for node [" + node + "] has the wrong min_master_node setting : ["
161+
+ stateSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
162+
DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(stateSettings).intValue(), minMasterNodes);
163+
}
164+
161165
public void testBeforeTest() throws Exception {
162166
final boolean autoManageMinMasterNodes = randomBoolean();
163167
long clusterSeed = randomLong();
@@ -505,7 +509,11 @@ public Settings transportClientSettings() {
505509
cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
506510
@Override
507511
public Settings onNodeStopped(String nodeName) throws Exception {
508-
assertMMNinClusterSetting(cluster, 1);
512+
for (String name : cluster.getNodeNames()) {
513+
if (name.equals(nodeName) == false) {
514+
assertMMNinClusterSetting(name, cluster, 1);
515+
}
516+
}
509517
return super.onNodeStopped(nodeName);
510518
}
511519
});

0 commit comments

Comments
 (0)