Skip to content

Commit 66cc202

Browse files
committed
Refactor old state version check to ZenDiscovery
1 parent c2ed5a1 commit 66cc202

File tree

4 files changed

+30
-23
lines changed

4 files changed

+30
-23
lines changed

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3737
import org.elasticsearch.cluster.service.ClusterService;
3838
import org.elasticsearch.common.Priority;
39+
import org.elasticsearch.common.SuppressForbidden;
3940
import org.elasticsearch.common.component.AbstractLifecycleComponent;
4041
import org.elasticsearch.common.component.Lifecycle;
4142
import org.elasticsearch.common.inject.Inject;
@@ -773,15 +774,24 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
773774
* If the first condition fails we reject the cluster state and throw an error.
774775
* If the second condition fails we ignore the cluster state.
775776
*/
776-
static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
777+
@SuppressForbidden(reason = "debug")
778+
public static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
777779
validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState);
778-
if (currentState.supersedes(newClusterState)) {
780+
781+
// reject cluster states that are not new from the same master
782+
if (currentState.supersedes(newClusterState) ||
783+
(newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) {
779784
// if the new state has a smaller version, and it has the same master node, then no need to process it
785+
logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
786+
return true;
787+
}
788+
789+
// reject older cluster states if we are following a master
790+
if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) {
780791
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
781792
return true;
782-
} else {
783-
return false;
784793
}
794+
return false;
785795
}
786796

787797
/**

core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -400,42 +400,39 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus
400400
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName);
401401
throw new IllegalStateException("received state from a node that is not part of the cluster");
402402
}
403-
final DiscoveryNodes currentNodes = clusterStateSupplier.get().nodes();
403+
final ClusterState clusterState = clusterStateSupplier.get();
404404

405-
if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
405+
if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
406406
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode());
407407
throw new IllegalStateException("received state with a local node that does not match the current local node");
408408
}
409409

410-
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
411-
if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) {
412-
final String message = String.format(
410+
if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) {
411+
String message = String.format(
413412
Locale.ROOT,
414-
"received cluster state from current master superseded by last seen cluster state; " +
415-
"received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]",
413+
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
416414
incomingState.version(),
417415
incomingState.stateUUID(),
418-
lastSeenClusterState.version(),
419-
lastSeenClusterState.stateUUID()
416+
incomingState.nodes().getMasterNodeId()
420417
);
421418
logger.warn(message);
422419
throw new IllegalStateException(message);
423420
}
424421

425-
final ClusterState state = clusterStateSupplier.get();
426-
if (state.nodes().getMasterNodeId() != null && incomingState.version() <= state.version()) {
427-
assert !incomingState.stateUUID().equals(state.stateUUID());
422+
if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) {
428423
final String message = String.format(
429424
Locale.ROOT,
430-
"received cluster state older than current cluster state; " +
431-
"received version [%d] with uuid [%s], current version [%d]",
425+
"received cluster state from current master superseded by last seen cluster state; " +
426+
"received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]",
432427
incomingState.version(),
433428
incomingState.stateUUID(),
434-
state.version()
429+
lastSeenClusterState.version(),
430+
lastSeenClusterState.stateUUID()
435431
);
436432
logger.warn(message);
437433
throw new IllegalStateException(message);
438434
}
435+
439436
}
440437

441438
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {

core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.node.DiscoveryNode;
2626
import org.elasticsearch.cluster.node.DiscoveryNodes;
27+
import org.elasticsearch.common.Strings;
2728
import org.elasticsearch.common.transport.DummyTransportAddress;
2829
import org.elasticsearch.discovery.zen.ping.ZenPing;
2930
import org.elasticsearch.test.ESTestCase;
@@ -64,7 +65,7 @@ public void testShouldIgnoreNewClusterState() {
6465
assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
6566
currentState.version(1);
6667
newState.version(1);
67-
assertFalse("should not ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
68+
assertTrue("should ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
6869
currentState.version(1);
6970
newState.version(2);
7071
assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));

core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -656,11 +656,10 @@ public void testIncomingClusterStateValidation() throws Exception {
656656
expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState));
657657
final String message = String.format(
658658
Locale.ROOT,
659-
"received cluster state from current master superseded by last seen cluster state; received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]",
659+
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
660660
incomingState.version(),
661661
incomingState.stateUUID(),
662-
node.clusterState.version(),
663-
node.clusterState.stateUUID()
662+
incomingState.nodes().getMasterNodeId()
664663
);
665664
assertThat(e, hasToString("java.lang.IllegalStateException: " + message));
666665

0 commit comments

Comments
 (0)