Skip to content

Commit 609a199

Browse files
authored
Upon being elected as master, prefer joins' node info to existing cluster state (#19743)
When we introduces [persistent node ids](#19140) we were concerned that people may copy data folders from one to another resulting in two nodes competing for the same id in the cluster. To solve this we elected to not allow an incoming join if a different with same id already exists in the cluster, or if some other node already has the same transport address as the incoming join. The rationeel there was that it is better to prefer existing nodes and that we can rely on node fault detection to remove any node from the cluster that isn't correct any more, making room for the node that wants to join (and will keep trying). Sadly there were two problems with this: 1) One minor and easy to fix - we didn't allow for the case where the existing node can have the same network address as the incoming one, but have a different ephemeral id (after node restart). This confused the logic in `AllocationService`, in this rare cases. The cluster is good enough to detect this and recover later on, but it's not clean. 2) The assumption that Node Fault Detection will clean up is *wrong* when the node just won an election (it wasn't master before) and needs to process the incoming joins in order to commit the cluster state and assume it's mastership. In those cases, the Node Fault Detection isn't active. This PR fixes these two and prefers incoming nodes to existing node when finishing an election. On top of the, on request by @ywelsch , `AllocationService` synchronization between the nodes of the cluster and it's routing table is now explicit rather than something we do all the time. The same goes for promotion of replicas to primaries.
1 parent 3f6a3c0 commit 609a199

File tree

88 files changed

+641
-482
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+641
-482
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void setUp() throws Exception {
145145
RoutingTable routingTable = rb.build();
146146
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
147147
for (int i = 1; i <= numNodes; i++) {
148-
nb.put(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
148+
nb.add(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
149149
}
150150
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
151151
.metaData(metaData).routingTable(routingTable).nodes

core/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,10 @@ public Builder nodes(DiscoveryNodes nodes) {
625625
return this;
626626
}
627627

628+
public DiscoveryNodes nodes() {
629+
return nodes;
630+
}
631+
628632
public Builder routingResult(RoutingAllocation.Result routingResult) {
629633
this.routingTable = routingResult.routingTable();
630634
this.metaData = routingResult.metaData();
@@ -723,7 +727,6 @@ public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throw
723727
public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
724728
return PROTO.readFrom(in, localNode);
725729
}
726-
727730
}
728731

729732
@Override

core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,14 @@ public DiscoveryNodes removeDeadMembers(Set<String> newNodes, String masterNodeI
357357
Builder builder = new Builder().masterNodeId(masterNodeId).localNodeId(localNodeId);
358358
for (DiscoveryNode node : this) {
359359
if (newNodes.contains(node.getId())) {
360-
builder.put(node);
360+
builder.add(node);
361361
}
362362
}
363363
return builder.build();
364364
}
365365

366366
public DiscoveryNodes newNode(DiscoveryNode node) {
367-
return new Builder(this).put(node).build();
367+
return new Builder(this).add(node).build();
368368
}
369369

370370
/**
@@ -554,8 +554,8 @@ private DiscoveryNodes readFrom(StreamInput in, DiscoveryNode localNode) throws
554554
node = localNode;
555555
}
556556
// some one already built this and validated it's OK, skip the n2 scans
557-
assert builder.validatePut(node) == null : "building disco nodes from network doesn't pass preflight: "
558-
+ builder.validatePut(node);
557+
assert builder.validateAdd(node) == null : "building disco nodes from network doesn't pass preflight: "
558+
+ builder.validateAdd(node);
559559
builder.putUnsafe(node);
560560
}
561561
return builder.build();
@@ -592,17 +592,27 @@ public Builder(DiscoveryNodes nodes) {
592592

593593
/**
594594
* adds a disco node to the builder. Will throw an {@link IllegalArgumentException} if
595-
* the supplied node doesn't pass the pre-flight checks performed by {@link #validatePut(DiscoveryNode)}
595+
* the supplied node doesn't pass the pre-flight checks performed by {@link #validateAdd(DiscoveryNode)}
596596
*/
597-
public Builder put(DiscoveryNode node) {
598-
final String preflight = validatePut(node);
597+
public Builder add(DiscoveryNode node) {
598+
final String preflight = validateAdd(node);
599599
if (preflight != null) {
600600
throw new IllegalArgumentException(preflight);
601601
}
602602
putUnsafe(node);
603603
return this;
604604
}
605605

606+
/**
607+
* Get a node by its id
608+
*
609+
* @param nodeId id of the wanted node
610+
* @return wanted node if it exists. Otherwise <code>null</code>
611+
*/
612+
@Nullable public DiscoveryNode get(String nodeId) {
613+
return nodes.get(nodeId);
614+
}
615+
606616
private void putUnsafe(DiscoveryNode node) {
607617
nodes.put(node.getId(), node);
608618
}
@@ -635,20 +645,20 @@ public Builder localNodeId(String localNodeId) {
635645
*
636646
* @return null if all is OK or an error message explaining why a node can not be added.
637647
*
638-
* Note: if this method returns a non-null value, calling {@link #put(DiscoveryNode)} will fail with an
648+
* Note: if this method returns a non-null value, calling {@link #add(DiscoveryNode)} will fail with an
639649
* exception
640650
*/
641-
private String validatePut(DiscoveryNode node) {
651+
private String validateAdd(DiscoveryNode node) {
642652
for (ObjectCursor<DiscoveryNode> cursor : nodes.values()) {
643653
final DiscoveryNode existingNode = cursor.value;
644654
if (node.getAddress().equals(existingNode.getAddress()) &&
645655
node.getId().equals(existingNode.getId()) == false) {
646656
return "can't add node " + node + ", found existing node " + existingNode + " with same address";
647657
}
648658
if (node.getId().equals(existingNode.getId()) &&
649-
node.getAddress().equals(existingNode.getAddress()) == false) {
659+
node.equals(existingNode) == false) {
650660
return "can't add node " + node + ", found existing node " + existingNode
651-
+ " with the same id, but a different address";
661+
+ " with the same id but is a different node instance";
652662
}
653663
}
654664
return null;

core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,36 @@ public Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAll
249249
applyFailedShard(allocation, failedShard, unassignedInfo);
250250
}
251251
gatewayAllocator.applyFailedShards(allocation);
252+
252253
reroute(allocation);
253254
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.routingEntry.shardId().toString());
254255
return buildResultAndLogHealthChange(allocation, "shards failed [" + failedShardsAsString + "] ...");
255256
}
256257

258+
/**
259+
* unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
260+
* if needed.
261+
*/
262+
public RoutingAllocation.Result deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
263+
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
264+
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
265+
routingNodes.unassigned().shuffle();
266+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
267+
clusterInfoService.getClusterInfo(), currentNanoTime(), false);
268+
269+
// first, clear from the shards any node id they used to belong to that is now dead
270+
boolean changed = deassociateDeadNodes(allocation);
271+
272+
if (reroute) {
273+
changed |= reroute(allocation);
274+
}
275+
276+
if (!changed) {
277+
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
278+
}
279+
return buildResultAndLogHealthChange(allocation, reason);
280+
}
281+
257282
/**
258283
* Removes delay markers from unassigned shards based on current time stamp. Returns true if markers were removed.
259284
*/
@@ -352,13 +377,9 @@ private void logClusterHealthStateChange(ClusterStateHealth previousStateHealth,
352377
}
353378

354379
private boolean reroute(RoutingAllocation allocation) {
355-
boolean changed = false;
356-
// first, clear from the shards any node id they used to belong to that is now dead
357-
changed |= deassociateDeadNodes(allocation);
380+
assert deassociateDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
358381

359-
// elect primaries *before* allocating unassigned, so backups of primaries that failed
360-
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
361-
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
382+
boolean changed = electPrimariesAndUnassignedDanglingReplicas(allocation);
362383

363384
// now allocate all the unassigned to available nodes
364385
if (allocation.routingNodes().unassigned().size() > 0) {
@@ -390,8 +411,8 @@ private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation al
390411
if (candidate != null) {
391412
shardEntry = unassignedIterator.demotePrimaryToReplicaShard();
392413
ShardRouting primarySwappedCandidate = routingNodes.promoteAssignedReplicaShardToPrimary(candidate);
414+
changed = true;
393415
if (primarySwappedCandidate.relocatingNodeId() != null) {
394-
changed = true;
395416
// its also relocating, make sure to move the other routing to primary
396417
RoutingNode node = routingNodes.node(primarySwappedCandidate.relocatingNodeId());
397418
if (node != null) {
@@ -406,7 +427,6 @@ private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation al
406427
IndexMetaData index = allocation.metaData().getIndexSafe(primarySwappedCandidate.index());
407428
if (IndexMetaData.isIndexUsingShadowReplicas(index.getSettings())) {
408429
routingNodes.reinitShadowPrimary(primarySwappedCandidate);
409-
changed = true;
410430
}
411431
}
412432
}

core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public synchronized void setClusterStatePublisher(BiConsumer<ClusterChangedEvent
156156

157157
public synchronized void setLocalNode(DiscoveryNode localNode) {
158158
assert clusterState.nodes().getLocalNodeId() == null : "local node is already set";
159-
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).put(localNode).localNodeId(localNode.getId());
159+
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).add(localNode).localNodeId(localNode.getId());
160160
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build();
161161
}
162162

core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public boolean runOnlyOnMaster() {
134134
public ClusterState execute(ClusterState currentState) {
135135
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
136136
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
137-
nodesBuilder.put(discovery.localNode());
137+
nodesBuilder.add(discovery.localNode());
138138
}
139139
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
140140
// remove the NO_MASTER block in this case
@@ -160,7 +160,7 @@ public boolean runOnlyOnMaster() {
160160
public ClusterState execute(ClusterState currentState) {
161161
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
162162
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
163-
nodesBuilder.put(discovery.localNode());
163+
nodesBuilder.add(discovery.localNode());
164164
}
165165
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
166166
currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
@@ -231,8 +231,8 @@ public ClusterState execute(ClusterState currentState) {
231231
}
232232
// reroute here, so we eagerly remove dead nodes from the routing
233233
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
234-
RoutingAllocation.Result routingResult = master.allocationService.reroute(
235-
ClusterState.builder(updatedState).build(), "elected as master");
234+
RoutingAllocation.Result routingResult = master.allocationService.deassociateDeadNodes(
235+
ClusterState.builder(updatedState).build(), true, "node stopped");
236236
return ClusterState.builder(updatedState).routingResult(routingResult).build();
237237
}
238238

core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,7 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
413413

414414
final DiscoveryNodes currentNodes = currentState.nodes();
415415
boolean nodesChanged = false;
416-
ClusterState.Builder newState = ClusterState.builder(currentState);
417-
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
416+
ClusterState.Builder newState;
418417

419418
if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
420419
return results.successes(joiningNodes).build(currentState);
@@ -423,16 +422,17 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
423422
// use these joins to try and become the master.
424423
// Note that we don't have to do any validation of the amount of joining nodes - the commit
425424
// during the cluster state publishing guarantees that we have enough
426-
nodesBuilder.masterNodeId(currentNodes.getLocalNodeId());
427-
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
428-
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
429-
newState.blocks(clusterBlocks);
425+
newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
430426
nodesChanged = true;
431-
} else if (nodesBuilder.isLocalNodeElectedMaster() == false) {
427+
} else if (currentNodes.isLocalNodeElectedMaster() == false) {
432428
logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
433429
throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
430+
} else {
431+
newState = ClusterState.builder(currentState);
434432
}
435433

434+
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
435+
436436
assert nodesBuilder.isLocalNodeElectedMaster();
437437

438438
// processing any joins
@@ -443,7 +443,7 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
443443
logger.debug("received a join request for an existing node [{}]", node);
444444
} else {
445445
try {
446-
nodesBuilder.put(node);
446+
nodesBuilder.add(node);
447447
nodesChanged = true;
448448
} catch (IllegalArgumentException e) {
449449
results.failure(node, e);
@@ -468,6 +468,28 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
468468
return results.build(newState.build());
469469
}
470470

471+
private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List<DiscoveryNode> joiningNodes) {
472+
assert currentState.nodes().getMasterNodeId() == null : currentState.prettyPrint();
473+
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
474+
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());
475+
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
476+
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
477+
for (final DiscoveryNode joiningNode : joiningNodes) {
478+
final DiscoveryNode existingNode = nodesBuilder.get(joiningNode.getId());
479+
if (existingNode != null && existingNode.equals(joiningNode) == false) {
480+
logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", existingNode, joiningNode);
481+
nodesBuilder.remove(existingNode.getId());
482+
}
483+
}
484+
485+
// now trim any left over dead nodes - either left there when the previous master stepped down
486+
// or removed by us above
487+
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(clusterBlocks).build();
488+
RoutingAllocation.Result result = allocationService.deassociateDeadNodes(tmpState, false,
489+
"removed dead nodes on election");
490+
return ClusterState.builder(tmpState).routingResult(result);
491+
}
492+
471493
@Override
472494
public boolean runOnlyOnMaster() {
473495
// we validate that we are allowed to change the cluster state during cluster state processing

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,8 @@ public BatchResult<Task> execute(final ClusterState currentState, final List<Tas
570570
if (!electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes())) {
571571
return resultBuilder.build(rejoin.apply(remainingNodesClusterState, "not enough master nodes"));
572572
} else {
573-
final RoutingAllocation.Result routingResult = allocationService.reroute(remainingNodesClusterState, describeTasks(tasks));
573+
final RoutingAllocation.Result routingResult =
574+
allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks));
574575
return resultBuilder.build(ClusterState.builder(remainingNodesClusterState).routingResult(routingResult).build());
575576
}
576577
}

core/src/main/java/org/elasticsearch/tribe/TribeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent
382382
clusterStateChanged = true;
383383
logger.info("[{}] adding node [{}]", tribeName, discoNode);
384384
nodes.remove(tribe.getId()); // remove any existing node with the same id but different ephemeral id
385-
nodes.put(discoNode);
385+
nodes.add(discoNode);
386386
}
387387
}
388388

core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,7 @@ public void testTasksToXContentGrouping() throws Exception {
739739
// First group by node
740740
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
741741
for (TestNode testNode : this.testNodes) {
742-
discoNodes.put(testNode.discoveryNode);
742+
discoNodes.add(testNode.discoveryNode);
743743
}
744744
response.setDiscoveryNodes(discoNodes.build());
745745
Map<String, Object> byNodes = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "nodes")));

0 commit comments

Comments
 (0)