Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private void buildResponse(final ClusterStateRequest request,
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
builder.stateUUID(currentState.stateUUID());
builder.minimumMasterNodesOnPublishingMaster(currentState.getMinimumMasterNodesOnPublishingMaster());

if (request.nodes()) {
builder.nodes(currentState.nodes());
Expand Down
43 changes: 38 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -178,17 +179,19 @@ default boolean isPrivate() {

private final boolean wasReadFromDiff;

private final int minimumMasterNodesOnPublishingMaster;

// built on demand
private volatile RoutingNodes routingNodes;

public ClusterState(long version, String stateUUID, ClusterState state) {
this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(),
state.customs(), false);
state.customs(), -1, false);
}

public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable,
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs,
boolean wasReadFromDiff) {
int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) {
this.version = version;
this.stateUUID = stateUUID;
this.clusterName = clusterName;
Expand All @@ -197,6 +200,7 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met
this.nodes = nodes;
this.blocks = blocks;
this.customs = customs;
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
this.wasReadFromDiff = wasReadFromDiff;
}

Expand Down Expand Up @@ -290,6 +294,17 @@ public Set<VotingConfigExclusion> getVotingConfigExclusions() {
return coordinationMetaData().getVotingConfigExclusions();
}

/**
* Returns the node-level {@code discovery.zen.minimum_master_nodes} setting on the master node that published this cluster state, for
* use in rolling upgrades from 6.x to 7.x. Once all the 6.x master-eligible nodes have left the cluster, the 7.x nodes use this value
* to determine how many master-eligible nodes must be discovered before the cluster can be bootstrapped.
* <p>
* Note that this method returns the node-level value of this setting, and ignores any cluster-level override that was set via the
* API. Callers are expected to combine this value with any value set in the cluster-level settings.
* <p>
* This should be removed once we no longer need support for {@link Version#V_6_7_0}.
*
* @return the publishing master's node-level setting, or {@code -1} if no value was supplied when this state was built (the builder
*         default, which is also used when the state was read from a stream older than 7.0.0)
*/
public int getMinimumMasterNodesOnPublishingMaster() {
return minimumMasterNodesOnPublishingMaster;
}

// Used for testing and logging to determine how this cluster state was sent over the wire
public boolean wasReadFromDiff() {
return wasReadFromDiff;
Expand Down Expand Up @@ -644,7 +659,7 @@ public static class Builder {
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
private final ImmutableOpenMap.Builder<String, Custom> customs;
private boolean fromDiff;

private int minimumMasterNodesOnPublishingMaster = -1;

public Builder(ClusterState state) {
this.clusterName = state.clusterName;
Expand All @@ -655,6 +670,7 @@ public Builder(ClusterState state) {
this.metaData = state.metaData();
this.blocks = state.blocks();
this.customs = ImmutableOpenMap.builder(state.customs());
this.minimumMasterNodesOnPublishingMaster = state.minimumMasterNodesOnPublishingMaster;
this.fromDiff = false;
}

Expand Down Expand Up @@ -715,6 +731,11 @@ public Builder stateUUID(String uuid) {
return this;
}

/**
* Sets the value that the built {@link ClusterState} will report from {@link ClusterState#getMinimumMasterNodesOnPublishingMaster()}.
*
* @param minimumMasterNodesOnPublishingMaster the value to carry in the built state; the builder holds {@code -1} until this is called
*                                             unless it was created from an existing state
* @return this builder, to allow call chaining
*/
public Builder minimumMasterNodesOnPublishingMaster(int minimumMasterNodesOnPublishingMaster) {
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
return this;
}

public Builder putCustom(String type, Custom custom) {
customs.put(type, custom);
return this;
Expand All @@ -739,7 +760,8 @@ public ClusterState build() {
if (UNKNOWN_UUID.equals(uuid)) {
uuid = UUIDs.randomBase64UUID();
}
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff);
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(),
minimumMasterNodesOnPublishingMaster, fromDiff);
}

public static byte[] toBytes(ClusterState state) throws IOException {
Expand Down Expand Up @@ -782,6 +804,7 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
Custom customIndexMetaData = in.readNamedWriteable(Custom.class);
builder.putCustom(customIndexMetaData.getWriteableName(), customIndexMetaData);
}
builder.minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
return builder.build();
}

Expand All @@ -807,6 +830,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(cursor.value);
}
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeVInt(minimumMasterNodesOnPublishingMaster);
}
}

private static class ClusterStateDiff implements Diff<ClusterState> {
Expand All @@ -829,6 +855,8 @@ private static class ClusterStateDiff implements Diff<ClusterState> {

private final Diff<ImmutableOpenMap<String, Custom>> customs;

private final int minimumMasterNodesOnPublishingMaster;

ClusterStateDiff(ClusterState before, ClusterState after) {
fromUuid = before.stateUUID;
toUuid = after.stateUUID;
Expand All @@ -839,6 +867,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
metaData = after.metaData.diff(before.metaData);
blocks = after.blocks.diff(before.blocks);
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
minimumMasterNodesOnPublishingMaster = after.minimumMasterNodesOnPublishingMaster;
}

ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
Expand All @@ -851,6 +880,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
metaData = MetaData.readDiffFrom(in);
blocks = ClusterBlocks.readDiffFrom(in);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
}

@Override
Expand All @@ -864,6 +894,9 @@ public void writeTo(StreamOutput out) throws IOException {
metaData.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeVInt(minimumMasterNodesOnPublishingMaster);
}
}

@Override
Expand All @@ -883,9 +916,9 @@ public ClusterState apply(ClusterState state) {
builder.metaData(metaData.apply(state.metaData));
builder.blocks(blocks.apply(state.blocks));
builder.customs(customs.apply(state.customs));
builder.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnPublishingMaster);
builder.fromDiff(true);
return builder.build();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers,
this::isInitialConfigurationSet, this::setInitialConfiguration);
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService,
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService,
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
Expand Down Expand Up @@ -454,7 +454,7 @@ void becomeCandidate(String method) {
clusterFormationFailureHelper.start();

if (getCurrentTerm() == ZEN1_BWC_TERM) {
discoveryUpgradeService.activate(lastKnownLeader);
discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState());
}

leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -60,6 +60,7 @@
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING;

/**
Expand All @@ -80,7 +81,12 @@ public class DiscoveryUpgradeService {
public static final Setting<Boolean> ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING =
Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope);

private final ElectMasterService electMasterService;
/**
* Dummy {@link ElectMasterService} that is only used to choose the best 6.x master from the discovered nodes, ignoring the
* `minimum_master_nodes` setting.
*/
private static final ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY);

private final TransportService transportService;
private final BooleanSupplier isBootstrappedSupplier;
private final JoinHelper joinHelper;
Expand All @@ -93,12 +99,11 @@ public class DiscoveryUpgradeService {
@Nullable // null if no active joining round
private volatile JoiningRound joiningRound;

public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService,
public DiscoveryUpgradeService(Settings settings, TransportService transportService,
BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper,
Supplier<Iterable<DiscoveryNode>> peersSupplier,
Consumer<VotingConfiguration> initialConfigurationConsumer) {
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed";
electMasterService = new ElectMasterService(settings);
this.transportService = transportService;
this.isBootstrappedSupplier = isBootstrappedSupplier;
this.joinHelper = joinHelper;
Expand All @@ -107,12 +112,9 @@ public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSetting
this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings);
this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings);
this.clusterName = CLUSTER_NAME_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large
}

public void activate(Optional<DiscoveryNode> lastKnownLeader) {
public void activate(Optional<DiscoveryNode> lastKnownLeader, ClusterState lastAcceptedClusterState) {
// called under coordinator mutex

if (isBootstrappedSupplier.getAsBoolean()) {
Expand All @@ -122,8 +124,13 @@ public void activate(Optional<DiscoveryNode> lastKnownLeader) {
assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader;
// if there was a leader and it's not an old node then we must have been bootstrapped

final Settings dynamicSettings = lastAcceptedClusterState.metaData().settings();
final int minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(dynamicSettings)
? DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(dynamicSettings)
: lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster();

assert joiningRound == null : joiningRound;
joiningRound = new JoiningRound(lastKnownLeader.isPresent());
joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes);
joiningRound.scheduleNextAttempt();
}

Expand Down Expand Up @@ -160,15 +167,21 @@ void countDown() {

private class JoiningRound {
private final boolean upgrading;
private final int minimumMasterNodes;

JoiningRound(boolean upgrading) {
JoiningRound(boolean upgrading, int minimumMasterNodes) {
this.upgrading = upgrading;
this.minimumMasterNodes = minimumMasterNodes;
}

/**
 * Reports whether this round should keep going: it must still be the enclosing service's current joining round, and the
 * bootstrapped-supplier must still report {@code false}.
 *
 * @return {@code true} if this round is the current one and bootstrapping has not happened yet
 */
private boolean isRunning() {
    if (joiningRound != this) {
        // superseded or cleared; do not consult the supplier at all
        return false;
    }
    return isBootstrappedSupplier.getAsBoolean() == false;
}

/**
 * Decides whether the given set of discovered nodes is sufficient to bootstrap the cluster in this round.
 * <p>
 * Bootstrapping is only considered when this round was created with {@code upgrading == true}. In that case the number of
 * master-eligible nodes in {@code discoveryNodes} must be at least {@code minimumMasterNodes}. A negative
 * {@code minimumMasterNodes} is therefore satisfied by any set of nodes, including an empty one.
 *
 * @param discoveryNodes the nodes discovered so far
 * @return {@code true} if this is an upgrading round and enough master-eligible nodes were discovered
 */
private boolean canBootstrap(Set<DiscoveryNode> discoveryNodes) {
    if (upgrading == false) {
        return false;
    }
    final long masterEligibleNodeCount = discoveryNodes.stream().filter(DiscoveryNode::isMasterNode).count();
    return masterEligibleNodeCount >= minimumMasterNodes;
}

void scheduleNextAttempt() {
if (isRunning() == false) {
return;
Expand All @@ -189,26 +202,22 @@ public void run() {
// this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not
// connected each time it wakes up (every second by default)

logger.debug("nodes: {}", discoveryNodes);

if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) {
if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
electBestOldMaster(discoveryNodes);
} else if (upgrading && enableUnsafeBootstrappingOnUpgrade) {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
scheduleNextAttempt();
}
});
} else {
scheduleNextAttempt();
}
logger.debug("upgrading={}, minimumMasterNodes={}, nodes={}", upgrading, minimumMasterNodes, discoveryNodes);

if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
electBestOldMaster(discoveryNodes);
} else if (canBootstrap(discoveryNodes)) {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
scheduleNextAttempt();
}
});
} else {
scheduleNextAttempt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public JoinHelper(Settings settings, AllocationService allocationService, Master
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {

@Override
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
Expand Down
Loading