Skip to content

Commit ba15d08

Browse files
Allow cluster access during node restart (#42946) (#43272)
This commit modifies InternalTestCluster to allow using client() and other cluster operations inside a RestartCallback (typically in onNodeStopped). Restarting nodes are now removed from the cluster's map of nodes, and thus all methods now return the state as if the restarting node did not exist. This avoids various exceptions stemming from accessing the stopped node(s).
1 parent 4b58827 commit ba15d08

File tree

3 files changed

+52
-40
lines changed

3 files changed

+52
-40
lines changed

server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.cluster.routing.RoutingTable;
4040
import org.elasticsearch.cluster.routing.ShardRoutingState;
4141
import org.elasticsearch.cluster.routing.UnassignedInfo;
42+
import org.elasticsearch.common.CheckedConsumer;
4243
import org.elasticsearch.common.Priority;
4344
import org.elasticsearch.common.settings.Settings;
4445
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -55,7 +56,11 @@
5556

5657
import java.io.IOException;
5758
import java.util.List;
59+
import java.util.Map;
5860
import java.util.concurrent.TimeUnit;
61+
import java.util.function.Function;
62+
import java.util.stream.Collectors;
63+
import java.util.stream.Stream;
5964

6065
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
6166
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -369,14 +374,7 @@ public void testRecoverBrokenIndexMetadata() throws Exception {
369374
// this one is not validated ahead of time and breaks allocation
370375
.put("index.analysis.filter.myCollator.type", "icu_collation")
371376
).build();
372-
internalCluster().fullRestart(new RestartCallback(){
373-
@Override
374-
public Settings onNodeStopped(String nodeName) throws Exception {
375-
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
376-
metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
377-
return super.onNodeStopped(nodeName);
378-
}
379-
});
377+
writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta));
380378

381379
// check that the cluster does not keep reallocating shards
382380
assertBusy(() -> {
@@ -443,14 +441,7 @@ public void testRecoverMissingAnalyzer() throws Exception {
443441
final IndexMetaData metaData = state.getMetaData().index("test");
444442
final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings()
445443
.filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build();
446-
internalCluster().fullRestart(new RestartCallback(){
447-
@Override
448-
public Settings onNodeStopped(String nodeName) throws Exception {
449-
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
450-
metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
451-
return super.onNodeStopped(nodeName);
452-
}
453-
});
444+
writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta));
454445

455446
// check that the cluster does not keep reallocating shards
456447
assertBusy(() -> {
@@ -495,14 +486,7 @@ public void testArchiveBrokenClusterSettings() throws Exception {
495486
final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder()
496487
.put(metaData.persistentSettings()).put("this.is.unknown", true)
497488
.put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build();
498-
internalCluster().fullRestart(new RestartCallback(){
499-
@Override
500-
public Settings onNodeStopped(String nodeName) throws Exception {
501-
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
502-
metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta);
503-
return super.onNodeStopped(nodeName);
504-
}
505-
});
489+
writeBrokenMeta(metaStateService -> metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta));
506490

507491
ensureYellow("test"); // wait for state recovery
508492
state = client().admin().cluster().prepareState().get().getState();
@@ -519,4 +503,17 @@ public Settings onNodeStopped(String nodeName) throws Exception {
519503
+ MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()));
520504
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
521505
}
506+
507+
private void writeBrokenMeta(CheckedConsumer<MetaStateService, IOException> writer) throws Exception {
508+
Map<String, MetaStateService> metaStateServices = Stream.of(internalCluster().getNodeNames())
509+
.collect(Collectors.toMap(Function.identity(), nodeName -> internalCluster().getInstance(MetaStateService.class, nodeName)));
510+
internalCluster().fullRestart(new RestartCallback(){
511+
@Override
512+
public Settings onNodeStopped(String nodeName) throws Exception {
513+
final MetaStateService metaStateService = metaStateServices.get(nodeName);
514+
writer.accept(metaStateService);
515+
return super.onNodeStopped(nodeName);
516+
}
517+
});
518+
}
522519
}

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@
144144
import java.util.stream.IntStream;
145145
import java.util.stream.Stream;
146146

147-
import static java.util.Collections.emptyList;
148147
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
149148
import static org.apache.lucene.util.LuceneTestCase.rarely;
150149
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
@@ -972,6 +971,7 @@ void startNode() {
972971
Settings closeForRestart(RestartCallback callback, int minMasterNodes) throws Exception {
973972
assert callback != null;
974973
close();
974+
removeNode(this);
975975
Settings callbackSettings = callback.onNodeStopped(name);
976976
assert callbackSettings != null;
977977
Settings.Builder newSettings = Settings.builder();
@@ -1805,20 +1805,9 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
18051805

18061806
removeExclusions(excludedNodeIds);
18071807

1808-
boolean success = false;
1809-
try {
1810-
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
1811-
nodeAndClient.startNode();
1812-
success = true;
1813-
} finally {
1814-
if (success == false) {
1815-
removeNode(nodeAndClient);
1816-
}
1817-
}
1818-
1819-
if (activeDisruptionScheme != null) {
1820-
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
1821-
}
1808+
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(Collections.singletonList(nodeAndClient)));
1809+
nodeAndClient.startNode();
1810+
publishNode(nodeAndClient);
18221811

18231812
if (callback.validateClusterForming() || excludedNodeIds.isEmpty() == false) {
18241813
// we have to validate cluster size to ensure that the restarted node has rejoined the cluster if it was master-eligible;
@@ -1894,6 +1883,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
18941883
Map<Set<DiscoveryNodeRole>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
18951884
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
18961885
final int minMasterNodes = autoManageMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1;
1886+
final int nodeCount = nodes.size();
18971887
for (NodeAndClient nodeAndClient : nodes.values()) {
18981888
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
18991889
logger.info("Stopping and resetting node [{}] ", nodeAndClient.name);
@@ -1907,7 +1897,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
19071897
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
19081898
}
19091899

1910-
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodes.size();
1900+
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodeCount;
19111901

19121902
// randomize start up order, but making sure that:
19131903
// 1) A data folder that was assigned to a data node will stay so

test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
*/
1919
package org.elasticsearch.test.test;
2020

21+
import org.elasticsearch.client.node.NodeClient;
22+
import org.elasticsearch.common.settings.Settings;
2123
import org.elasticsearch.test.ESIntegTestCase;
2224
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
25+
import org.elasticsearch.test.InternalTestCluster;
2326

2427
import java.io.IOException;
2528

@@ -61,4 +64,26 @@ public void testStoppingNodesOneByOne() throws IOException {
6164

6265
ensureGreen();
6366
}
67+
68+
public void testOperationsDuringRestart() throws Exception {
69+
internalCluster().startMasterOnlyNode();
70+
internalCluster().startDataOnlyNodes(2);
71+
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
72+
@Override
73+
public Settings onNodeStopped(String nodeName) throws Exception {
74+
ensureGreen();
75+
internalCluster().validateClusterFormed();
76+
assertNotNull(internalCluster().getInstance(NodeClient.class));
77+
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
78+
@Override
79+
public Settings onNodeStopped(String nodeName) throws Exception {
80+
ensureGreen();
81+
internalCluster().validateClusterFormed();
82+
return super.onNodeStopped(nodeName);
83+
}
84+
});
85+
return super.onNodeStopped(nodeName);
86+
}
87+
});
88+
}
6489
}

0 commit comments

Comments
 (0)