From 73e84e6d7cf164e92f12e4d93ba5614dc4b23625 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 25 Jul 2017 11:03:05 +0200 Subject: [PATCH 1/3] MasterNodeChangePredicate should use the node instance to detect master change This predicate is used to deal with the intricacies of detecting when a master is reelected/nodes rejoins an existing master. The current implementation is based on nodeIds, which is fine if the master really change. If the nodeId is equal the code falls back to detecting an increment in the cluster state version which happens when a node is re-elected or when the node rejoins. Sadly this doesn't cover the case where the same node is elected after a full restart of all master nodes. In that case we recover the cluster state from disk but the version is reset back to 0. To fix this, the check should be done based on ephemeral IDs which are reset on restart. --- .../cluster/MasterNodeChangePredicate.java | 8 +++---- .../TransportMasterNodeActionTests.java | 24 +++++++++++++++---- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java b/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java index 2fec24ec4885c..1d4ffac8945bd 100644 --- a/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java +++ b/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java @@ -33,13 +33,13 @@ private MasterNodeChangePredicate() { */ public static Predicate build(ClusterState currentState) { final long currentVersion = currentState.version(); - final String currentMaster = currentState.nodes().getMasterNodeId(); + final String currentMasterId = currentState.nodes().getMasterNode().getEphemeralId(); return newState -> { - final String newMaster = newState.nodes().getMasterNodeId(); + final String newMasterId = newState.nodes().getMasterNode().getEphemeralId(); final boolean accept; - if (newMaster == null) { + if (newMasterId == null) { accept = false; - } else if (newMaster.equals(currentMaster) == false){ + } else if (newMasterId.equals(currentMasterId) == false){ accept = true; } else { accept = newState.version() > currentVersion; diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 806277c799af7..6ca096bc70769 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -301,7 +302,9 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted boolean failsWithConnectTransportException = randomBoolean(); boolean rejoinSameMaster = failsWithConnectTransportException && randomBoolean(); Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(failsWithConnectTransportException ? 60 : 0)); - setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); + DiscoveryNode masterNode = this.remoteNode; + setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, allNodes)) + .version(randomIntBetween(0, 10))); // use a random base version so it can go down when simulating a restart. PlainActionFuture listener = new PlainActionFuture<>(); new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener); @@ -314,10 +317,21 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted assertThat(capturedRequest.action, equalTo("testAction")); if (rejoinSameMaster) { - transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error")); + transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(masterNode, "Fake error")); assertFalse(listener.isDone()); - // reset the same state to increment a version simulating a join of an existing node - setState(clusterService, clusterService.state()); + if (randomBoolean()) { + // reset the same state to increment a version simulating a join of an existing node + // simulating use being disconnected + setState(clusterService, clusterService.state()); + } else { + // simulate master restart followed by a state recovery - this will reset the cluster state version + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + nodesBuilder.remove(masterNode); + masterNode = new DiscoveryNode(masterNode.getId(), masterNode.getAddress(), masterNode.getVersion()); + nodesBuilder.add(masterNode); + final ClusterState.Builder builder = ClusterState.builder(clusterService.state()).nodes(nodesBuilder); + setState(clusterService, builder.version(0)); + } assertFalse(listener.isDone()); capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); @@ -326,7 +340,7 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted assertThat(capturedRequest.request, equalTo(request)); assertThat(capturedRequest.action, equalTo("testAction")); } else if (failsWithConnectTransportException) { - transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error")); + transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(masterNode, "Fake error")); assertFalse(listener.isDone()); setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes)); assertTrue(listener.isDone()); From 81f83dc4028babd05f155c7c67c25e1bc4880956 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 26 Jul 2017 11:51:56 +0200 Subject: [PATCH 2/3] feedback --- .../cluster/MasterNodeChangePredicate.java | 11 +++++++---- .../master/TransportMasterNodeActionTests.java | 13 +++++++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java b/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java index 1d4ffac8945bd..05f2730cb1d5f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java +++ b/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster; +import org.elasticsearch.cluster.node.DiscoveryNode; + import java.util.function.Predicate; public final class MasterNodeChangePredicate { @@ -33,13 +35,14 @@ private MasterNodeChangePredicate() { */ public static Predicate build(ClusterState currentState) { final long currentVersion = currentState.version(); - final String currentMasterId = currentState.nodes().getMasterNode().getEphemeralId(); + final DiscoveryNode masterNode = currentState.nodes().getMasterNode(); + final String currentMasterId = masterNode == null ? null : masterNode.getEphemeralId(); return newState -> { - final String newMasterId = newState.nodes().getMasterNode().getEphemeralId(); + final DiscoveryNode newMaster = newState.nodes().getMasterNode(); final boolean accept; - if (newMasterId == null) { + if (newMaster == null) { accept = false; - } else if (newMasterId.equals(currentMasterId) == false){ + } else if (newMaster.getEphemeralId().equals(currentMasterId) == false){ accept = true; } else { accept = newState.version() > currentVersion; diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 6ca096bc70769..0146c63473cdb 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -299,7 +299,7 @@ public void testDelegateToMaster() throws ExecutionException, InterruptedExcepti } public void testDelegateToFailingMaster() throws ExecutionException, InterruptedException { - boolean failsWithConnectTransportException = randomBoolean(); + boolean failsWithConnectTransportException = true || randomBoolean(); boolean rejoinSameMaster = failsWithConnectTransportException && randomBoolean(); Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(failsWithConnectTransportException ? 60 : 0)); DiscoveryNode masterNode = this.remoteNode; @@ -319,16 +319,25 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted if (rejoinSameMaster) { transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(masterNode, "Fake error")); assertFalse(listener.isDone()); + if (randomBoolean()) { + // simulate master node removal removal + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + nodesBuilder.masterNodeId(null); + setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder)); + } if (randomBoolean()) { // reset the same state to increment a version simulating a join of an existing node // simulating use being disconnected - setState(clusterService, clusterService.state()); + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + nodesBuilder.masterNodeId(masterNode.getId()); + setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder)); } else { // simulate master restart followed by a state recovery - this will reset the cluster state version final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); nodesBuilder.remove(masterNode); masterNode = new DiscoveryNode(masterNode.getId(), masterNode.getAddress(), masterNode.getVersion()); nodesBuilder.add(masterNode); + nodesBuilder.masterNodeId(masterNode.getId()); final ClusterState.Builder builder = ClusterState.builder(clusterService.state()).nodes(nodesBuilder); setState(clusterService, builder.version(0)); } From fed572e8d882d3a5db005fe4bfab827312b13dcf Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 26 Jul 2017 13:53:06 +0200 Subject: [PATCH 3/3] feddback --- .../org/elasticsearch/cluster/MasterNodeChangePredicate.java | 2 +- .../action/support/master/TransportMasterNodeActionTests.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java b/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java index 05f2730cb1d5f..5bcfecaebafba 100644 --- a/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java +++ b/core/src/main/java/org/elasticsearch/cluster/MasterNodeChangePredicate.java @@ -42,7 +42,7 @@ public static Predicate build(ClusterState currentState) { final boolean accept; if (newMaster == null) { accept = false; - } else if (newMaster.getEphemeralId().equals(currentMasterId) == false){ + } else if (newMaster.getEphemeralId().equals(currentMasterId) == false) { accept = true; } else { accept = newState.version() > currentVersion; diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 0146c63473cdb..b14b030a5dc88 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -299,7 +299,7 @@ public void testDelegateToMaster() throws ExecutionException, InterruptedExcepti } public void testDelegateToFailingMaster() throws ExecutionException, InterruptedException { - boolean failsWithConnectTransportException = true || randomBoolean(); + boolean failsWithConnectTransportException = randomBoolean(); boolean rejoinSameMaster = failsWithConnectTransportException && randomBoolean(); Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(failsWithConnectTransportException ? 60 : 0)); DiscoveryNode masterNode = this.remoteNode; @@ -320,7 +320,7 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(masterNode, "Fake error")); assertFalse(listener.isDone()); if (randomBoolean()) { - // simulate master node removal removal + // simulate master node removal final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); nodesBuilder.masterNodeId(null); setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder));