diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 530c5ce4f6396..1fc5ef474ee04 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1009,10 +1009,8 @@ private enum ElasticsearchExceptionHandle { MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0), COORDINATION_STATE_REJECTED_EXCEPTION(org.elasticsearch.cluster.coordination.CoordinationStateRejectedException.class, org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0), - CLUSTER_ALREADY_BOOTSTRAPPED_EXCEPTION(org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException.class, - org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException::new, 151, Version.V_7_0_0), SNAPSHOT_IN_PROGRESS_EXCEPTION(org.elasticsearch.snapshots.SnapshotInProgressException.class, - org.elasticsearch.snapshots.SnapshotInProgressException::new, 152, Version.V_7_0_0); + org.elasticsearch.snapshots.SnapshotInProgressException::new, 151, Version.V_7_0_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index a3d3c615162dc..142aa6bde74b6 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -23,10 +23,6 @@ import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; -import org.elasticsearch.action.admin.cluster.bootstrap.TransportBootstrapClusterAction; -import org.elasticsearch.action.admin.cluster.bootstrap.TransportGetDiscoveredNodesAction; import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; @@ -433,8 +429,6 @@ public void reg actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class); actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class); - actions.register(GetDiscoveredNodesAction.INSTANCE, TransportGetDiscoveredNodesAction.class); - actions.register(BootstrapClusterAction.INSTANCE, TransportBootstrapClusterAction.class); actions.register(AddVotingConfigExclusionsAction.INSTANCE, TransportAddVotingConfigExclusionsAction.class); actions.register(ClearVotingConfigExclusionsAction.INSTANCE, TransportClearVotingConfigExclusionsAction.class); actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java deleted file mode 100644 index 28a8e580cedc4..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.action.Action; -import org.elasticsearch.common.io.stream.Writeable.Reader; - -public class BootstrapClusterAction extends Action { - public static final BootstrapClusterAction INSTANCE = new BootstrapClusterAction(); - public static final String NAME = "cluster:admin/bootstrap/set_voting_config"; - - private BootstrapClusterAction() { - super(NAME); - } - - @Override - public BootstrapClusterResponse newResponse() { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - - @Override - public Reader getResponseReader() { - return BootstrapClusterResponse::new; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequest.java deleted file mode 100644 index f8d0bcb13a58f..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; - -/** - * Request to set the initial configuration of master-eligible nodes in a cluster so that the very first master election can take place. - */ -public class BootstrapClusterRequest extends ActionRequest { - private final BootstrapConfiguration bootstrapConfiguration; - - public BootstrapClusterRequest(BootstrapConfiguration bootstrapConfiguration) { - this.bootstrapConfiguration = bootstrapConfiguration; - } - - public BootstrapClusterRequest(StreamInput in) throws IOException { - super(in); - bootstrapConfiguration = new BootstrapConfiguration(in); - } - - /** - * @return the bootstrap configuration: the initial set of master-eligible nodes whose votes are counted in elections. - */ - public BootstrapConfiguration getBootstrapConfiguration() { - return bootstrapConfiguration; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - bootstrapConfiguration.writeTo(out); - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponse.java deleted file mode 100644 index 2576409a3cef1..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponse.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; - -/** - * Response to a {@link BootstrapClusterRequest} indicating that the cluster has been successfully bootstrapped. - */ -public class BootstrapClusterResponse extends ActionResponse { - private final boolean alreadyBootstrapped; - - public BootstrapClusterResponse(boolean alreadyBootstrapped) { - this.alreadyBootstrapped = alreadyBootstrapped; - } - - public BootstrapClusterResponse(StreamInput in) throws IOException { - super(in); - alreadyBootstrapped = in.readBoolean(); - } - - /** - * @return whether this node already knew that the cluster had been bootstrapped when handling this request. - */ - public boolean getAlreadyBootstrapped() { - return alreadyBootstrapped; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(alreadyBootstrapped); - } - - @Override - public String toString() { - return "BootstrapClusterResponse{" + - "alreadyBootstrapped=" + alreadyBootstrapped + - '}'; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfiguration.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfiguration.java deleted file mode 100644 index 822287af77465..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfiguration.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; - -public class BootstrapConfiguration implements Writeable { - - private final List nodeDescriptions; - - public BootstrapConfiguration(List nodeDescriptions) { - if (nodeDescriptions.isEmpty()) { - throw new IllegalArgumentException("cannot create empty bootstrap configuration"); - } - this.nodeDescriptions = Collections.unmodifiableList(new ArrayList<>(nodeDescriptions)); - } - - public BootstrapConfiguration(StreamInput in) throws IOException { - nodeDescriptions = Collections.unmodifiableList(in.readList(NodeDescription::new)); - assert nodeDescriptions.isEmpty() == false; - } - - public List getNodeDescriptions() { - return nodeDescriptions; - } - - public VotingConfiguration resolve(Iterable discoveredNodes) { - final Set selectedNodes = new HashSet<>(); - for (final NodeDescription nodeDescription : nodeDescriptions) { - final DiscoveryNode discoveredNode = nodeDescription.resolve(discoveredNodes); - if (selectedNodes.add(discoveredNode) == false) { - throw new ElasticsearchException("multiple nodes matching {} in {}", discoveredNode, this); - } - } - - final Set nodeIds = selectedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - assert nodeIds.size() == selectedNodes.size() : selectedNodes + " does not contain distinct IDs"; - return new VotingConfiguration(nodeIds); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeList(nodeDescriptions); - } - - @Override - public String toString() { - return "BootstrapConfiguration{" + - "nodeDescriptions=" + nodeDescriptions + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - BootstrapConfiguration that = (BootstrapConfiguration) o; - return Objects.equals(nodeDescriptions, that.nodeDescriptions); - } - - @Override - public int hashCode() { - return Objects.hash(nodeDescriptions); - } - - public static class NodeDescription implements Writeable { - - @Nullable - private final String id; - - private final String name; - - @Nullable - public String getId() { - return id; - } - - public String getName() { - return name; - } - - public NodeDescription(@Nullable String id, String name) { - this.id = id; - this.name = Objects.requireNonNull(name); - } - - public NodeDescription(DiscoveryNode discoveryNode) { - this(discoveryNode.getId(), discoveryNode.getName()); - } - - public NodeDescription(StreamInput in) throws IOException { - this(in.readOptionalString(), in.readString()); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalString(id); - out.writeString(name); - } - - @Override - public String toString() { - return "NodeDescription{" + - "id='" + id + '\'' + - ", name='" + name + '\'' + - '}'; - } - - public DiscoveryNode resolve(Iterable discoveredNodes) { - DiscoveryNode selectedNode = null; - for (final DiscoveryNode discoveredNode : discoveredNodes) { - assert discoveredNode.isMasterNode() : discoveredNode; - if (discoveredNode.getName().equals(name)) { - if (id == null || id.equals(discoveredNode.getId())) { - if (selectedNode != null) { - throw new ElasticsearchException( - "discovered multiple nodes matching {} in {}", this, discoveredNodes); - } - selectedNode = discoveredNode; - } else { - throw new ElasticsearchException("node id mismatch comparing {} to {}", this, discoveredNode); - } - } else if (id != null && id.equals(discoveredNode.getId())) { - throw new ElasticsearchException("node name mismatch comparing {} to {}", this, discoveredNode); - } - } - if (selectedNode == null) { - throw new ElasticsearchException("no node matching {} found in {}", this, discoveredNodes); - } - - return selectedNode; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NodeDescription that = (NodeDescription) o; - return Objects.equals(id, that.id) && - Objects.equals(name, that.name); - } - - @Override - public int hashCode() { - return Objects.hash(id, name); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java deleted file mode 100644 index acaef284a5420..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.action.Action; -import org.elasticsearch.common.io.stream.Writeable.Reader; - -public class GetDiscoveredNodesAction extends Action { - public static final GetDiscoveredNodesAction INSTANCE = new GetDiscoveredNodesAction(); - public static final String NAME = "cluster:admin/bootstrap/discover_nodes"; - - private GetDiscoveredNodesAction() { - super(NAME); - } - - @Override - public GetDiscoveredNodesResponse newResponse() { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - - @Override - public Reader getResponseReader() { - return GetDiscoveredNodesResponse::new; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java deleted file mode 100644 index f91a4de5263be..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -/** - * Request the set of master-eligible nodes discovered by this node. Most useful in a brand-new cluster as a precursor to setting the - * initial configuration using {@link BootstrapClusterRequest}. - */ -public class GetDiscoveredNodesRequest extends ActionRequest { - - @Nullable // if the request should wait indefinitely - private TimeValue timeout = TimeValue.timeValueSeconds(30); - - private List requiredNodes = Collections.emptyList(); - - public GetDiscoveredNodesRequest() { - } - - public GetDiscoveredNodesRequest(StreamInput in) throws IOException { - super(in); - timeout = in.readOptionalTimeValue(); - requiredNodes = in.readList(StreamInput::readString); - } - - /** - * Sometimes it is useful to wait until enough nodes have been discovered, rather than failing immediately. This parameter controls how - * long to wait, and defaults to 30s. - * - * @param timeout how long to wait to discover sufficiently many nodes to respond successfully. - */ - public void setTimeout(@Nullable TimeValue timeout) { - if (timeout != null && timeout.compareTo(TimeValue.ZERO) < 0) { - throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed"); - } - this.timeout = timeout; - } - - /** - * Sometimes it is useful to wait until enough nodes have been discovered, rather than failing immediately. This parameter controls how - * long to wait, and defaults to 30s. - * - * @return how long to wait to discover sufficiently many nodes to respond successfully. - */ - @Nullable - public TimeValue getTimeout() { - return timeout; - } - - /** - * Sometimes it is useful only to receive a successful response after discovering a certain set of master-eligible nodes. - * This parameter gives the names or transport addresses of the expected nodes. - * - * @return list of expected nodes - */ - public List getRequiredNodes() { - return requiredNodes; - } - - /** - * Sometimes it is useful only to receive a successful response after discovering a certain set of master-eligible nodes. - * This parameter gives the names or transport addresses of the expected nodes. - * - * @param requiredNodes list of expected nodes - */ - public void setRequiredNodes(final List requiredNodes) { - this.requiredNodes = requiredNodes; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeOptionalTimeValue(timeout); - out.writeStringList(requiredNodes); - } - - @Override - public String toString() { - return "GetDiscoveredNodesRequest{" + - "timeout=" + timeout + - ", requiredNodes=" + requiredNodes + "}"; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponse.java deleted file mode 100644 index f697e16c03c2c..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponse.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * Response to {@link GetDiscoveredNodesRequest}, containing the set of master-eligible nodes that were discovered. - */ -public class GetDiscoveredNodesResponse extends ActionResponse { - private final Set nodes; - - public GetDiscoveredNodesResponse(Set nodes) { - this.nodes = Collections.unmodifiableSet(new HashSet<>(nodes)); - } - - public GetDiscoveredNodesResponse(StreamInput in) throws IOException { - super(in); - nodes = Collections.unmodifiableSet(in.readSet(DiscoveryNode::new)); - } - - /** - * @return the set of nodes that were discovered. - */ - public Set getNodes() { - return nodes; - } - - /** - * @return a bootstrap configuration constructed from the set of nodes that were discovered, in order to make a - * {@link BootstrapClusterRequest}. - */ - public BootstrapConfiguration getBootstrapConfiguration() { - return new BootstrapConfiguration(nodes.stream().map(NodeDescription::new).collect(Collectors.toList())); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeCollection(nodes); - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java deleted file mode 100644 index 32a9f39cc0db8..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.coordination.Coordinator; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.transport.TransportService; - -import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; - -public class TransportBootstrapClusterAction extends HandledTransportAction { - - @Nullable // TODO make this not nullable - private final Coordinator coordinator; - private final TransportService transportService; - private final String discoveryType; - - @Inject - public TransportBootstrapClusterAction(Settings settings, ActionFilters actionFilters, TransportService transportService, - Discovery discovery) { - super(BootstrapClusterAction.NAME, transportService, actionFilters, BootstrapClusterRequest::new); - this.transportService = transportService; - this.discoveryType = DISCOVERY_TYPE_SETTING.get(settings); - if (discovery instanceof Coordinator) { - coordinator = (Coordinator) discovery; - } else { - coordinator = null; - } - } - - @Override - protected void doExecute(Task task, BootstrapClusterRequest request, ActionListener listener) { - if (coordinator == null) { // TODO remove when not nullable - throw new IllegalArgumentException("cluster bootstrapping is not supported by discovery type [" + discoveryType + "]"); - } - - final DiscoveryNode localNode = transportService.getLocalNode(); - assert localNode != null; - if (localNode.isMasterNode() == false) { - throw new IllegalArgumentException( - "this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node"); - } - - transportService.getThreadPool().generic().execute(new AbstractRunnable() { - @Override - public void doRun() { - listener.onResponse(new BootstrapClusterResponse( - coordinator.setInitialConfiguration(request.getBootstrapConfiguration()) == false)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - public String toString() { - return "setting initial configuration with " + request; - } - }); - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java deleted file mode 100644 index 6f6336c3bd5f3..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException; -import org.elasticsearch.cluster.coordination.Coordinator; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.Writeable.Reader; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ListenableFuture; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Names; -import org.elasticsearch.transport.TransportService; - -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; - -public class TransportGetDiscoveredNodesAction extends HandledTransportAction { - - @Nullable // TODO make this not nullable - private final Coordinator coordinator; - private final TransportService transportService; - private final String discoveryType; - - @Inject - public TransportGetDiscoveredNodesAction(Settings settings, ActionFilters actionFilters, TransportService transportService, - Discovery discovery) { - super(GetDiscoveredNodesAction.NAME, transportService, actionFilters, - (Reader) GetDiscoveredNodesRequest::new); - - this.discoveryType = DISCOVERY_TYPE_SETTING.get(settings); - this.transportService = transportService; - if (discovery instanceof Coordinator) { - coordinator = (Coordinator) discovery; - } else { - coordinator = null; - } - } - - @Override - protected void doExecute(Task task, GetDiscoveredNodesRequest request, ActionListener listener) { - if (coordinator == null) { // TODO remove when not nullable - throw new IllegalArgumentException("discovered nodes are not exposed by discovery type [" + discoveryType + "]"); - } - - final DiscoveryNode localNode = transportService.getLocalNode(); - assert localNode != null; - if (localNode.isMasterNode() == false) { - throw new IllegalArgumentException( - "this node is not master-eligible, but discovered nodes are only exposed by master-eligible nodes"); - } - final ExecutorService directExecutor = EsExecutors.newDirectExecutorService(); - final AtomicBoolean listenerNotified = new AtomicBoolean(); - final ListenableFuture listenableFuture = new ListenableFuture<>(); - final ThreadPool threadPool = transportService.getThreadPool(); - listenableFuture.addListener(listener, directExecutor, threadPool.getThreadContext()); - // TODO make it so that listenableFuture copes with multiple completions, and then remove listenerNotified - - final ActionListener> respondIfRequestSatisfied = new ActionListener>() { - @Override - public void onResponse(Iterable nodes) { - final Set nodesSet = new LinkedHashSet<>(); - nodesSet.add(localNode); - nodes.forEach(nodesSet::add); - logger.trace("discovered {}", nodesSet); - try { - if (checkWaitRequirements(request, nodesSet)) { - final GetDiscoveredNodesResponse response = new GetDiscoveredNodesResponse(nodesSet); - if (listenerNotified.compareAndSet(false, true)) { - listenableFuture.onResponse(response); - } - } - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - if (listenerNotified.compareAndSet(false, true)) { - listenableFuture.onFailure(e); - } - } - - @Override - public String toString() { - return "waiting for " + request; - } - }; - - final Releasable releasable = coordinator.withDiscoveryListener(respondIfRequestSatisfied); - listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext()); - - if (coordinator.isInitialConfigurationSet()) { - respondIfRequestSatisfied.onFailure(new ClusterAlreadyBootstrappedException()); - } else { - respondIfRequestSatisfied.onResponse(coordinator.getFoundPeers()); - } - - if (request.getTimeout() != null) { - threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() { - @Override - public void run() { - respondIfRequestSatisfied.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request)); - } - - @Override - public String toString() { - return "timeout handler for " + request; - } - }); - } - } - - private static boolean matchesRequirement(DiscoveryNode discoveryNode, String requirement) { - return discoveryNode.getName().equals(requirement) - || discoveryNode.getAddress().toString().equals(requirement) - || discoveryNode.getAddress().getAddress().equals(requirement); - } - - private static boolean checkWaitRequirements(GetDiscoveredNodesRequest request, Set nodes) { - List requirements = request.getRequiredNodes(); - final Set selectedNodes = new HashSet<>(); - for (final String requirement : requirements) { - final Set matchingNodes - = nodes.stream().filter(n -> matchesRequirement(n, requirement)).collect(Collectors.toSet()); - - if (matchingNodes.isEmpty()) { - return false; - } - if (matchingNodes.size() > 1) { - throw new IllegalArgumentException("[" + requirement + "] matches " + matchingNodes); - } - - for (final DiscoveryNode matchingNode : matchingNodes) { - if (selectedNodes.add(matchingNode) == false) { - throw new IllegalArgumentException("[" + matchingNode + "] matches " + - requirements.stream().filter(r -> matchesRequirement(matchingNode, requirement)).collect(Collectors.toList())); - } - } - } - - return true; - } -} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterAlreadyBootstrappedException.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterAlreadyBootstrappedException.java deleted file mode 100644 index cc1c77c88477c..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterAlreadyBootstrappedException.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.cluster.coordination; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.StreamInput; - -import java.io.IOException; - -/** - * Exception thrown if trying to discovery nodes in order to perform cluster bootstrapping, but a cluster is formed before all the required - * nodes are discovered. - */ -public class ClusterAlreadyBootstrappedException extends ElasticsearchException { - public ClusterAlreadyBootstrappedException() { - super("node has already joined a bootstrapped cluster, bootstrapping is not required"); - } - - public ClusterAlreadyBootstrappedException(StreamInput in) throws IOException { - super(in); - } -} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java index fc3f4493104fc..d21c54c03e4e5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java @@ -21,56 +21,73 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.threadpool.ThreadPool.Names; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; -import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableSet; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; public class ClusterBootstrapService { - private static final Logger logger = LogManager.getLogger(ClusterBootstrapService.class); - public static final Setting> INITIAL_MASTER_NODES_SETTING = - Setting.listSetting("cluster.initial_master_nodes", Collections.emptyList(), Function.identity(), Property.NodeScope); + Setting.listSetting("cluster.initial_master_nodes", emptyList(), Function.identity(), Property.NodeScope); public static final Setting UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING = Setting.timeSetting("discovery.unconfigured_bootstrap_timeout", TimeValue.timeValueSeconds(3), TimeValue.timeValueMillis(1), Property.NodeScope); - private final List initialMasterNodes; - @Nullable + static final String BOOTSTRAP_PLACEHOLDER_PREFIX = "{bootstrap-placeholder}-"; + + private static final Logger logger = LogManager.getLogger(ClusterBootstrapService.class); + private final Set bootstrapRequirements; + @Nullable // null if discoveryIsConfigured() private final TimeValue unconfiguredBootstrapTimeout; private final TransportService transportService; - private volatile boolean running; + private final Supplier> discoveredNodesSupplier; + private final BooleanSupplier isBootstrappedSupplier; + private final Consumer votingConfigurationConsumer; + private final AtomicBoolean bootstrappingPermitted = new AtomicBoolean(true); + + public ClusterBootstrapService(Settings settings, TransportService transportService, + Supplier> discoveredNodesSupplier, BooleanSupplier isBootstrappedSupplier, + Consumer votingConfigurationConsumer) { + + final List initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings); + bootstrapRequirements = unmodifiableSet(new LinkedHashSet<>(initialMasterNodes)); + if (bootstrapRequirements.size() != initialMasterNodes.size()) { + throw new IllegalArgumentException( + "setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] contains duplicates: " + initialMasterNodes); + } - public ClusterBootstrapService(Settings settings, TransportService transportService) { - initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings); unconfiguredBootstrapTimeout = discoveryIsConfigured(settings) ? null : UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(settings); this.transportService = transportService; + this.discoveredNodesSupplier = discoveredNodesSupplier; + this.isBootstrappedSupplier = isBootstrappedSupplier; + this.votingConfigurationConsumer = votingConfigurationConsumer; } public static boolean discoveryIsConfigured(Settings settings) { @@ -78,157 +95,135 @@ public static boolean discoveryIsConfigured(Settings settings) { .anyMatch(s -> s.exists(settings)); } - public void start() { - assert running == false; - running = true; + void onFoundPeersUpdated() { + final Set nodes = getDiscoveredNodes(); + if (transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false + && isBootstrappedSupplier.getAsBoolean() == false && nodes.stream().noneMatch(Coordinator::isZen1Node)) { + + final Tuple,List> requirementMatchingResult; + try { + requirementMatchingResult = checkRequirements(nodes); + } catch (IllegalStateException e) { + logger.warn("bootstrapping cancelled", e); + bootstrappingPermitted.set(false); + return; + } + + final Set nodesMatchingRequirements = requirementMatchingResult.v1(); + final List unsatisfiedRequirements = requirementMatchingResult.v2(); + logger.trace("nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}", + nodesMatchingRequirements, unsatisfiedRequirements, bootstrapRequirements); + + if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) { + startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements); + } + } + } + + void scheduleUnconfiguredBootstrap() { + if (unconfiguredBootstrapTimeout == null) { + return; + } if (transportService.getLocalNode().isMasterNode() == false) { return; } - if (unconfiguredBootstrapTimeout != null) { - logger.info("no discovery configuration found, will perform best-effort cluster bootstrapping after [{}] " + - "unless existing master is discovered", unconfiguredBootstrapTimeout); - final ThreadContext threadContext = transportService.getThreadPool().getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - threadContext.markAsSystemContext(); + logger.info("no discovery configuration found, will perform best-effort cluster bootstrapping after [{}] " + + "unless existing master is discovered", unconfiguredBootstrapTimeout); + + transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.GENERIC, new Runnable() { + @Override + public void run() { + final Set discoveredNodes = getDiscoveredNodes(); + final List zen1Nodes = discoveredNodes.stream().filter(Coordinator::isZen1Node).collect(Collectors.toList()); + if (zen1Nodes.isEmpty()) { + logger.debug("performing best-effort cluster bootstrapping with {}", discoveredNodes); + startBootstrap(discoveredNodes, emptyList()); + } else { + logger.info("avoiding best-effort cluster bootstrapping due to discovery of pre-7.0 nodes {}", zen1Nodes); + } + } + + @Override + public String toString() { + return "unconfigured-discovery delayed bootstrap"; + } + }); + } + + private Set getDiscoveredNodes() { + return Stream.concat(Stream.of(transportService.getLocalNode()), + StreamSupport.stream(discoveredNodesSupplier.get().spliterator(), false)).collect(Collectors.toSet()); + } + + private void startBootstrap(Set discoveryNodes, List unsatisfiedRequirements) { + assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode) : discoveryNodes; + assert discoveryNodes.stream().noneMatch(Coordinator::isZen1Node) : discoveryNodes; + assert unsatisfiedRequirements.size() < discoveryNodes.size() : discoveryNodes + " smaller than " + unsatisfiedRequirements; + if (bootstrappingPermitted.compareAndSet(true, false)) { + doBootstrap(new VotingConfiguration(Stream.concat(discoveryNodes.stream().map(DiscoveryNode::getId), + unsatisfiedRequirements.stream().map(s -> BOOTSTRAP_PLACEHOLDER_PREFIX + s)) + .collect(Collectors.toSet()))); + } + } + + public static boolean isBootstrapPlaceholder(String nodeId) { + return nodeId.startsWith(BOOTSTRAP_PLACEHOLDER_PREFIX); + } + + private void doBootstrap(VotingConfiguration votingConfiguration) { + assert transportService.getLocalNode().isMasterNode(); - transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.SAME, new Runnable() { + try { + votingConfigurationConsumer.accept(votingConfiguration); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("exception when bootstrapping with {}, rescheduling", votingConfiguration), e); + transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.GENERIC, + new Runnable() { @Override public void run() { - // TODO: remove the following line once schedule method properly preserves thread context - threadContext.markAsSystemContext(); - final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest(); - logger.trace("sending {}", request); - transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request, - new TransportResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - logger.debug("discovered {}, starting to bootstrap", response.getNodes()); - awaitBootstrap(response.getBootstrapConfiguration()); - } - - @Override - public void handleException(TransportException exp) { - final Throwable rootCause = exp.getRootCause(); - if (rootCause instanceof ClusterAlreadyBootstrappedException) { - logger.debug(rootCause.getMessage(), rootCause); - } else { - logger.warn("discovery attempt failed", exp); - } - } - - @Override - public String executor() { - return Names.SAME; - } - - @Override - public GetDiscoveredNodesResponse read(StreamInput in) throws IOException { - return new GetDiscoveredNodesResponse(in); - } - }); + doBootstrap(votingConfiguration); } @Override public String toString() { - return "unconfigured-discovery delayed bootstrap"; + return "retry of failed bootstrapping with " + votingConfiguration; } - }); - - } - } else if (initialMasterNodes.isEmpty() == false) { - logger.debug("waiting for discovery of master-eligible nodes matching {}", initialMasterNodes); - - final ThreadContext threadContext = transportService.getThreadPool().getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - threadContext.markAsSystemContext(); - - final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest(); - request.setRequiredNodes(initialMasterNodes); - request.setTimeout(null); - logger.trace("sending {}", request); - transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request, - new TransportResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - assert response.getNodes().stream().allMatch(DiscoveryNode::isMasterNode); - logger.debug("discovered {}, starting to bootstrap", response.getNodes()); - awaitBootstrap(response.getBootstrapConfiguration()); - } - - @Override - public void handleException(TransportException exp) { - logger.warn("discovery attempt failed", exp); - } - - @Override - public String executor() { - return Names.SAME; - } - - @Override - public GetDiscoveredNodesResponse read(StreamInput in) throws IOException { - return new GetDiscoveredNodesResponse(in); - } - }); - } + } + ); } } - public void stop() { - running = false; + private static boolean matchesRequirement(DiscoveryNode discoveryNode, String requirement) { + return discoveryNode.getName().equals(requirement) + || discoveryNode.getAddress().toString().equals(requirement) + || discoveryNode.getAddress().getAddress().equals(requirement); } - private void awaitBootstrap(final BootstrapConfiguration bootstrapConfiguration) { - if (running == false) { - logger.debug("awaitBootstrap: not running"); - return; - } + private Tuple,List> checkRequirements(Set nodes) { + final Set selectedNodes = new HashSet<>(); + final List unmatchedRequirements = new ArrayList<>(); + for (final String bootstrapRequirement : bootstrapRequirements) { + final Set matchingNodes + = nodes.stream().filter(n -> matchesRequirement(n, bootstrapRequirement)).collect(Collectors.toSet()); - BootstrapClusterRequest request = new BootstrapClusterRequest(bootstrapConfiguration); - logger.trace("sending {}", request); - transportService.sendRequest(transportService.getLocalNode(), BootstrapClusterAction.NAME, request, - new TransportResponseHandler() { - @Override - public void handleResponse(BootstrapClusterResponse response) { - logger.debug("automatic cluster bootstrapping successful: received {}", response); - } + if (matchingNodes.size() == 0) { + unmatchedRequirements.add(bootstrapRequirement); + } - @Override - public void handleException(TransportException exp) { - // log a warning since a failure here indicates a bad problem, such as: - // - bootstrap configuration resolution failed (e.g. discovered nodes no longer match those in the bootstrap config) - // - discovered nodes no longer form a quorum in the bootstrap config - logger.warn(new ParameterizedMessage("automatic cluster bootstrapping failed, retrying [{}]", - bootstrapConfiguration.getNodeDescriptions()), exp); - - // There's not really much else we can do apart from retry and hope that the problem goes away. The retry is delayed - // since a tight loop here is unlikely to help. - transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.SAME, new Runnable() { - @Override - public void run() { - // TODO: remove the following line once schedule method properly preserves thread context - transportService.getThreadPool().getThreadContext().markAsSystemContext(); - awaitBootstrap(bootstrapConfiguration); - } - - @Override - public String toString() { - return "retry bootstrapping with " + bootstrapConfiguration.getNodeDescriptions(); - } - }); - } + if (matchingNodes.size() > 1) { + throw new IllegalStateException("requirement [" + bootstrapRequirement + "] matches multiple nodes: " + matchingNodes); + } - @Override - public String executor() { - return Names.SAME; + for (final DiscoveryNode matchingNode : matchingNodes) { + if (selectedNodes.add(matchingNode) == false) { + throw new IllegalStateException("node [" + matchingNode + "] matches multiple requirements: " + + bootstrapRequirements.stream().filter(r -> matchesRequirement(matchingNode, r)).collect(Collectors.toList())); } + } + } - @Override - public BootstrapClusterResponse read(StreamInput in) throws IOException { - return new BootstrapClusterResponse(in); - } - }); + return Tuple.tuple(selectedNodes, unmatchedRequirements); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java index 9fc408fc9479c..cc58628b53893 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; @@ -188,13 +189,22 @@ String getDescription() { private String describeQuorum(VotingConfiguration votingConfiguration) { final Set nodeIds = votingConfiguration.getNodeIds(); assert nodeIds.isEmpty() == false; + final int requiredNodes = nodeIds.size() / 2 + 1; + + final Set realNodeIds = new HashSet<>(nodeIds); + realNodeIds.removeIf(ClusterBootstrapService::isBootstrapPlaceholder); + assert requiredNodes <= realNodeIds.size() : nodeIds; if (nodeIds.size() == 1) { - return "a node with id " + nodeIds; + return "a node with id " + realNodeIds; } else if (nodeIds.size() == 2) { - return "two nodes with ids " + nodeIds; + return "two nodes with ids " + realNodeIds; } else { - return "at least " + (nodeIds.size() / 2 + 1) + " nodes with ids from " + nodeIds; + if (requiredNodes < realNodeIds.size()) { + return "at least " + requiredNodes + " nodes with ids from " + realNodeIds; + } else { + return requiredNodes + " nodes with ids " + realNodeIds; + } } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 72fe2e081de74..968a9a5f01f7a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -24,7 +24,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -83,7 +82,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_ID; import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -139,8 +137,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); - private final Set>> discoveredNodesListeners = newConcurrentSet(); - public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, UnicastHostsProvider unicastHostsProvider, @@ -170,7 +166,8 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); this.reconfigurator = new Reconfigurator(settings, clusterSettings); - this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService); + this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers, + this::isInitialConfigurationSet, this::setInitialConfiguration); this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration); this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), @@ -296,12 +293,6 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector } - if (isInitialConfigurationSet()) { - for (final ActionListener> discoveredNodesListener : discoveredNodesListeners) { - discoveredNodesListener.onFailure(new ClusterAlreadyBootstrappedException()); - } - } - return new PublishWithJoinResponse(publishResponse, joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); } @@ -598,16 +589,12 @@ public void startInitialJoin() { synchronized (mutex) { becomeCandidate("startInitialJoin"); } - - if (isInitialConfigurationSet() == false) { - clusterBootstrapService.start(); - } + clusterBootstrapService.scheduleUnconfiguredBootstrap(); } @Override protected void doStop() { configuredHostsResolver.stop(); - clusterBootstrapService.stop(); } @Override @@ -706,21 +693,6 @@ public boolean isInitialConfigurationSet() { return getStateForMasterService().getLastAcceptedConfiguration().isEmpty() == false; } - /** - * Sets the initial configuration by resolving the given {@link BootstrapConfiguration} to concrete nodes. This method is safe to call - * more than once, as long as each call's bootstrap configuration resolves to the same set of nodes. - * - * @param bootstrapConfiguration A description of the nodes that should form the initial configuration. - * @return whether this call successfully set the initial configuration - if false, the cluster has already been bootstrapped. - */ - public boolean setInitialConfiguration(final BootstrapConfiguration bootstrapConfiguration) { - final List selfAndDiscoveredPeers = new ArrayList<>(); - selfAndDiscoveredPeers.add(getLocalNode()); - getFoundPeers().forEach(selfAndDiscoveredPeers::add); - final VotingConfiguration votingConfiguration = bootstrapConfiguration.resolve(selfAndDiscoveredPeers); - return setInitialConfiguration(votingConfiguration); - } - /** * Sets the initial configuration to the given {@link VotingConfiguration}. This method is safe to call * more than once, as long as the argument to each call is the same. @@ -733,13 +705,10 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura final ClusterState currentState = getStateForMasterService(); if (isInitialConfigurationSet()) { + logger.debug("initial configuration already set, ignoring {}", votingConfiguration); return false; } - if (mode != Mode.CANDIDATE) { - throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode); - } - final List knownNodes = new ArrayList<>(); knownNodes.add(getLocalNode()); peerFinder.getFoundPeers().forEach(knownNodes::add); @@ -899,8 +868,6 @@ private ClusterState clusterStateWithNoMasterBlock(ClusterState clusterState) { public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener publishListener, AckListener ackListener) { try { synchronized (mutex) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - if (mode != Mode.LEADER) { logger.debug(() -> new ParameterizedMessage("[{}] failed publication as not currently leading", clusterChangedEvent.source())); @@ -1019,9 +986,8 @@ protected void onActiveMasterFound(DiscoveryNode masterNode, long term) { @Override protected void onFoundPeersUpdated() { - final Iterable foundPeers; synchronized (mutex) { - foundPeers = getFoundPeers(); + final Iterable foundPeers = getFoundPeers(); if (mode == Mode.CANDIDATE) { final CoordinationState.VoteCollection expectedVotes = new CoordinationState.VoteCollection(); foundPeers.forEach(expectedVotes::addVote); @@ -1039,9 +1005,7 @@ protected void onFoundPeersUpdated() { } } - for (final ActionListener> discoveredNodesListener : discoveredNodesListeners) { - discoveredNodesListener.onResponse(foundPeers); - } + clusterBootstrapService.onFoundPeersUpdated(); } } @@ -1076,14 +1040,6 @@ public String toString() { }); } - public Releasable withDiscoveryListener(ActionListener> listener) { - discoveredNodesListeners.add(listener); - return () -> { - boolean removed = discoveredNodesListeners.remove(listener); - assert removed : listener; - }; - } - public Iterable getFoundPeers() { // TODO everyone takes this and adds the local node. Maybe just add the local node here? return peerFinder.getFoundPeers(); diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 489a98dcbaac6..b4ce907d6f7b3 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.client.AbstractClientHeadersTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException; import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException; @@ -809,8 +808,7 @@ public void testIds() { ids.put(148, UnknownNamedObjectException.class); ids.put(149, MultiBucketConsumerService.TooManyBucketsException.class); ids.put(150, CoordinationStateRejectedException.class); - ids.put(151, ClusterAlreadyBootstrappedException.class); - ids.put(152, SnapshotInProgressException.class); + ids.put(151, SnapshotInProgressException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequestTests.java deleted file mode 100644 index ee9c58413b350..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterRequestTests.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; -import java.util.Collections; - -import static org.hamcrest.Matchers.equalTo; - -public class BootstrapClusterRequestTests extends ESTestCase { - - public void testSerialization() throws IOException { - final BootstrapConfiguration bootstrapConfiguration - = new BootstrapConfiguration(Collections.singletonList(new NodeDescription(null, randomAlphaOfLength(10)))); - final BootstrapClusterRequest original = new BootstrapClusterRequest(bootstrapConfiguration); - assertNull(original.validate()); - final BootstrapClusterRequest deserialized = copyWriteable(original, writableRegistry(), BootstrapClusterRequest::new); - assertThat(deserialized.getBootstrapConfiguration(), equalTo(bootstrapConfiguration)); - } -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponseTests.java deleted file mode 100644 index fb33dbc5fcbd6..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterResponseTests.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -import static org.hamcrest.Matchers.equalTo; - -public class BootstrapClusterResponseTests extends ESTestCase { - public void testSerialization() throws IOException { - final BootstrapClusterResponse original = new BootstrapClusterResponse(randomBoolean()); - final BootstrapClusterResponse deserialized = copyWriteable(original, writableRegistry(), BootstrapClusterResponse::new); - assertThat(deserialized.getAlreadyBootstrapped(), equalTo(original.getAlreadyBootstrapped())); - } -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfigurationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfigurationTests.java deleted file mode 100644 index fc2e24017e76a..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapConfigurationTests.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; -import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNode.Role; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.EqualsHashCodeTestUtils; -import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.singleton; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.startsWith; - -public class BootstrapConfigurationTests extends ESTestCase { - - public void testEqualsHashcodeSerialization() { - // Note: the explicit cast of the CopyFunction is needed for some IDE (specifically Eclipse 4.8.0) to infer the right type - EqualsHashCodeTestUtils.checkEqualsAndHashCode(randomBootstrapConfiguration(), - (CopyFunction) bootstrapConfiguration -> copyWriteable(bootstrapConfiguration, writableRegistry(), - BootstrapConfiguration::new), - this::mutate); - } - - public void testNodeDescriptionResolvedByName() { - final List discoveryNodes = randomDiscoveryNodes(); - final DiscoveryNode expectedNode = randomFrom(discoveryNodes); - assertThat(new NodeDescription(null, expectedNode.getName()).resolve(discoveryNodes), equalTo(expectedNode)); - } - - public void testNodeDescriptionResolvedByIdAndName() { - final List discoveryNodes = randomDiscoveryNodes(); - final DiscoveryNode expectedNode = randomFrom(discoveryNodes); - assertThat(new NodeDescription(expectedNode).resolve(discoveryNodes), equalTo(expectedNode)); - } - - public void testRejectsMismatchedId() { - final List discoveryNodes = randomDiscoveryNodes(); - final DiscoveryNode expectedNode = randomFrom(discoveryNodes); - final ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new NodeDescription(randomAlphaOfLength(11), expectedNode.getName()).resolve(discoveryNodes)); - assertThat(e.getMessage(), startsWith("node id mismatch comparing ")); - } - - public void testRejectsMismatchedName() { - final List discoveryNodes = randomDiscoveryNodes(); - final DiscoveryNode expectedNode = randomFrom(discoveryNodes); - final ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new NodeDescription(expectedNode.getId(), randomAlphaOfLength(11)).resolve(discoveryNodes)); - assertThat(e.getMessage(), startsWith("node name mismatch comparing ")); - } - - public void testFailsIfNoMatch() { - final List discoveryNodes = randomDiscoveryNodes(); - final ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> randomNodeDescription().resolve(discoveryNodes)); - assertThat(e.getMessage(), startsWith("no node matching ")); - } - - public void testFailsIfDuplicateMatchOnName() { - final List discoveryNodes = randomDiscoveryNodes(); - final DiscoveryNode discoveryNode = randomFrom(discoveryNodes); - discoveryNodes.add(new DiscoveryNode(discoveryNode.getName(), randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), - singleton(Role.MASTER), Version.CURRENT)); - final ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new NodeDescription(null, discoveryNode.getName()).resolve(discoveryNodes)); - assertThat(e.getMessage(), startsWith("discovered multiple nodes matching ")); - } - - public void testFailsIfDuplicatedNode() { - final List discoveryNodes = randomDiscoveryNodes(); - final DiscoveryNode discoveryNode = randomFrom(discoveryNodes); - discoveryNodes.add(discoveryNode); - final ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new NodeDescription(discoveryNode).resolve(discoveryNodes)); - assertThat(e.getMessage(), startsWith("discovered multiple nodes matching ")); - } - - public void testResolvesEntireConfiguration() { - final List discoveryNodes = randomDiscoveryNodes(); - final List selectedNodes = randomSubsetOf(randomIntBetween(1, discoveryNodes.size()), discoveryNodes); - final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(selectedNodes.stream() - .map(discoveryNode -> randomBoolean() ? new NodeDescription(discoveryNode) : new NodeDescription(null, discoveryNode.getName())) - .collect(Collectors.toList())); - - final VotingConfiguration expectedConfiguration - = new VotingConfiguration(selectedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())); - final VotingConfiguration votingConfiguration = bootstrapConfiguration.resolve(discoveryNodes); - assertThat(votingConfiguration, equalTo(expectedConfiguration)); - } - - public void testRejectsDuplicatedDescriptions() { - final List discoveryNodes = randomDiscoveryNodes(); - final List selectedNodes = randomSubsetOf(randomIntBetween(1, discoveryNodes.size()), discoveryNodes); - final List selectedNodeDescriptions = selectedNodes.stream() - .map(discoveryNode -> randomBoolean() ? new NodeDescription(discoveryNode) : new NodeDescription(null, discoveryNode.getName())) - .collect(Collectors.toList()); - final NodeDescription toDuplicate = randomFrom(selectedNodeDescriptions); - selectedNodeDescriptions.add(randomBoolean() ? toDuplicate : new NodeDescription(null, toDuplicate.getName())); - final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(selectedNodeDescriptions); - - final ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> bootstrapConfiguration.resolve(discoveryNodes)); - assertThat(e.getMessage(), startsWith("multiple nodes matching ")); - } - - private NodeDescription mutate(NodeDescription original) { - if (randomBoolean()) { - return new NodeDescription(original.getId(), randomAlphaOfLength(21 - original.getName().length())); - } else { - if (original.getId() == null) { - return new NodeDescription(randomAlphaOfLength(10), original.getName()); - } else if (randomBoolean()) { - return new NodeDescription(randomAlphaOfLength(21 - original.getId().length()), original.getName()); - } else { - return new NodeDescription(null, original.getName()); - } - } - } - - protected BootstrapConfiguration mutate(BootstrapConfiguration original) { - final List newDescriptions = new ArrayList<>(original.getNodeDescriptions()); - final int mutateElement = randomIntBetween(0, newDescriptions.size()); - if (mutateElement == newDescriptions.size()) { - newDescriptions.add(randomIntBetween(0, newDescriptions.size()), randomNodeDescription()); - } else { - if (newDescriptions.size() > 1 && randomBoolean()) { - newDescriptions.remove(mutateElement); - } else { - newDescriptions.set(mutateElement, mutate(newDescriptions.get(mutateElement))); - } - } - return new BootstrapConfiguration(newDescriptions); - } - - protected NodeDescription randomNodeDescription() { - return new NodeDescription(randomBoolean() ? null : randomAlphaOfLength(10), randomAlphaOfLength(10)); - } - - protected BootstrapConfiguration randomBootstrapConfiguration() { - final int size = randomIntBetween(1, 5); - final List nodeDescriptions = new ArrayList<>(size); - while (nodeDescriptions.size() <= size) { - nodeDescriptions.add(randomNodeDescription()); - } - return new BootstrapConfiguration(nodeDescriptions); - } - - protected List randomDiscoveryNodes() { - final int size = randomIntBetween(1, 5); - final List nodes = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - nodes.add(new DiscoveryNode(randomAlphaOfLength(10), randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), - singleton(Role.MASTER), Version.CURRENT)); - } - return nodes; - } -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java deleted file mode 100644 index 676e2e958cac7..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -import static org.hamcrest.Matchers.endsWith; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.startsWith; -import static org.hamcrest.core.Is.is; - -public class GetDiscoveredNodesRequestTests extends ESTestCase { - - public void testTimeoutValidation() { - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - assertThat("default value is 30s", getDiscoveredNodesRequest.getTimeout(), is(TimeValue.timeValueSeconds(30))); - - final TimeValue newTimeout = TimeValue.parseTimeValue(randomTimeValue(), "timeout"); - getDiscoveredNodesRequest.setTimeout(newTimeout); - assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), equalTo(newTimeout)); - - final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, - () -> getDiscoveredNodesRequest.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1)))); - assertThat(exception.getMessage(), startsWith("negative timeout of ")); - assertThat(exception.getMessage(), endsWith(" is not allowed")); - - getDiscoveredNodesRequest.setTimeout(null); - assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), nullValue()); - } - - public void testSerialization() throws IOException { - final GetDiscoveredNodesRequest originalRequest = new GetDiscoveredNodesRequest(); - - if (randomBoolean()) { - originalRequest.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout")); - } else if (randomBoolean()) { - originalRequest.setTimeout(null); - } - - final GetDiscoveredNodesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), GetDiscoveredNodesRequest::new); - - assertThat(deserialized.getTimeout(), equalTo(originalRequest.getTimeout())); - } -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponseTests.java deleted file mode 100644 index 7d2fc602e66c6..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesResponseTests.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.Version; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNode.Role; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.stream.Collectors; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.singleton; -import static org.hamcrest.Matchers.equalTo; - -public class GetDiscoveredNodesResponseTests extends ESTestCase { - public void testSerialization() throws IOException { - final GetDiscoveredNodesResponse original = new GetDiscoveredNodesResponse(randomDiscoveryNodeSet()); - final GetDiscoveredNodesResponse deserialized = copyWriteable(original, writableRegistry(), GetDiscoveredNodesResponse::new); - assertThat(deserialized.getNodes(), equalTo(original.getNodes())); - } - - private Set randomDiscoveryNodeSet() { - final int size = randomIntBetween(1, 10); - final Set nodes = new HashSet<>(size); - while (nodes.size() < size) { - assertTrue(nodes.add(new DiscoveryNode(randomAlphaOfLength(10), randomAlphaOfLength(10), - UUIDs.randomBase64UUID(random()), randomAlphaOfLength(10), randomAlphaOfLength(10), buildNewFakeTransportAddress(), - emptyMap(), singleton(Role.MASTER), Version.CURRENT))); - } - return nodes; - } - - public void testConversionToBootstrapConfiguration() { - final Set nodes = randomDiscoveryNodeSet(); - assertThat(new GetDiscoveredNodesResponse(nodes).getBootstrapConfiguration().resolve(nodes).getNodeIds(), - equalTo(nodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()))); - } -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java deleted file mode 100644 index 31486a52bd08f..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.Version; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.coordination.Coordinator; -import org.elasticsearch.cluster.coordination.InMemoryPersistedState; -import org.elasticsearch.cluster.coordination.NoOpClusterApplier; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.transport.MockTransport; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Names; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.io.IOException; -import java.util.Collections; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static java.util.Collections.singletonList; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyZeroInteractions; - -public class TransportBootstrapClusterActionTests extends ESTestCase { - - private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); - - private DiscoveryNode discoveryNode; - private static ThreadPool threadPool; - private TransportService transportService; - private Coordinator coordinator; - - private static BootstrapClusterRequest exampleRequest() { - return new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription("id", "name")))); - } - - @BeforeClass - public static void createThreadPool() { - threadPool = new TestThreadPool("test", Settings.EMPTY); - } - - @AfterClass - public static void shutdownThreadPool() { - threadPool.shutdown(); - } - - @Before - public void setupTest() { - discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); - final MockTransport transport = new MockTransport(); - transportService = transport.createTransportService(Settings.EMPTY, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet()); - - final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService, writableRegistry(), - ESAllocationTestCase.createAllocationService(Settings.EMPTY), - new MasterService("local", Settings.EMPTY, threadPool), - () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName("cluster")).build()), r -> emptyList(), - new NoOpClusterApplier(), Collections.emptyList(), new Random(random().nextLong())); - } - - public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { - final Discovery discovery = mock(Discovery.class); - verifyZeroInteractions(discovery); - - final String nonstandardDiscoveryType = randomFrom(DiscoveryModule.ZEN_DISCOVERY_TYPE, "single-node", "unknown"); - new TransportBootstrapClusterAction( - Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), nonstandardDiscoveryType).build(), - EMPTY_FILTERS, transportService, discovery); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() { - @Override - public void handleResponse(BootstrapClusterResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - final Throwable rootCause = exp.getRootCause(); - assertThat(rootCause, instanceOf(IllegalArgumentException.class)); - assertThat(rootCause.getMessage(), equalTo("cluster bootstrapping is not supported by discovery type [" + - nonstandardDiscoveryType + "]")); - countDownLatch.countDown(); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - public void testFailsOnNonMasterEligibleNodes() throws InterruptedException { - discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - // transport service only picks up local node when started, so we can change it here ^ - - new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - coordinator.start(); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() { - @Override - public void handleResponse(BootstrapClusterResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - final Throwable rootCause = exp.getRootCause(); - assertThat(rootCause, instanceOf(IllegalArgumentException.class)); - assertThat(rootCause.getMessage(), - equalTo("this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node")); - countDownLatch.countDown(); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - public void testSetsInitialConfiguration() throws InterruptedException { - new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - coordinator.start(); - coordinator.startInitialJoin(); - - assertFalse(coordinator.isInitialConfigurationSet()); - - final BootstrapClusterRequest request - = new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription(discoveryNode)))); - - { - final int parallelRequests = 10; - final CountDownLatch countDownLatch = new CountDownLatch(parallelRequests); - final AtomicInteger successes = new AtomicInteger(); - - for (int i = 0; i < parallelRequests; i++) { - transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() { - @Override - public void handleResponse(BootstrapClusterResponse response) { - if (response.getAlreadyBootstrapped() == false) { - successes.incrementAndGet(); - } - countDownLatch.countDown(); - } - - @Override - public void handleException(TransportException exp) { - throw new AssertionError("should not be called", exp); - } - }); - } - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - assertThat(successes.get(), equalTo(1)); - } - - assertTrue(coordinator.isInitialConfigurationSet()); - - { - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() { - @Override - public void handleResponse(BootstrapClusterResponse response) { - assertTrue(response.getAlreadyBootstrapped()); - countDownLatch.countDown(); - } - - @Override - public void handleException(TransportException exp) { - throw new AssertionError("should not be called", exp); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - } - - private abstract class ResponseHandler implements TransportResponseHandler { - @Override - public String executor() { - return Names.SAME; - } - - @Override - public BootstrapClusterResponse read(StreamInput in) throws IOException { - return new BootstrapClusterResponse(in); - } - } -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java deleted file mode 100644 index 6d94dcf6eca14..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ /dev/null @@ -1,533 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.cluster.bootstrap; - -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.Version; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException; -import org.elasticsearch.cluster.coordination.ClusterBootstrapService; -import org.elasticsearch.cluster.coordination.CoordinationMetaData; -import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; -import org.elasticsearch.cluster.coordination.Coordinator; -import org.elasticsearch.cluster.coordination.InMemoryPersistedState; -import org.elasticsearch.cluster.coordination.NoOpClusterApplier; -import org.elasticsearch.cluster.coordination.PeersResponse; -import org.elasticsearch.cluster.coordination.PublicationTransportHandler; -import org.elasticsearch.cluster.coordination.PublishWithJoinResponse; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.discovery.PeersRequest; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.transport.MockTransport; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Names; -import org.elasticsearch.transport.BytesTransportRequest; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.TransportService.HandshakeResponse; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static java.util.Collections.singleton; -import static java.util.Collections.singletonList; -import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; -import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME; -import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.startsWith; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyZeroInteractions; - -public class TransportGetDiscoveredNodesActionTests extends ESTestCase { - - private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); - - private static ThreadPool threadPool; - private DiscoveryNode localNode; - private String clusterName; - private TransportService transportService; - private Coordinator coordinator; - private DiscoveryNode otherNode; - - @BeforeClass - public static void createThreadPool() { - threadPool = new TestThreadPool("test", Settings.EMPTY); - } - - @AfterClass - public static void shutdownThreadPool() { - threadPool.shutdown(); - } - - @Before - public void setupTest() { - clusterName = randomAlphaOfLength(10); - localNode = new DiscoveryNode( - "node1", "local", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); - otherNode = new DiscoveryNode( - "node2", "other", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); - - final MockTransport transport = new MockTransport() { - @Override - protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { - if (action.equals(HANDSHAKE_ACTION_NAME) && node.getAddress().equals(otherNode.getAddress())) { - handleResponse(requestId, new HandshakeResponse(otherNode, new ClusterName(clusterName), Version.CURRENT)); - } - } - }; - transportService = transport.createTransportService( - Settings.builder().put(CLUSTER_NAME_SETTING.getKey(), clusterName).build(), threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); - - final Settings settings = Settings.builder() - .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), - ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap - - final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - coordinator = new Coordinator("local", settings, clusterSettings, transportService, writableRegistry(), - ESAllocationTestCase.createAllocationService(settings), - new MasterService("local", settings, threadPool), - () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName(clusterName)).build()), r -> emptyList(), - new NoOpClusterApplier(), Collections.emptyList(), new Random(random().nextLong())); - } - - public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { - final Discovery discovery = mock(Discovery.class); - verifyZeroInteractions(discovery); - - final String nonstandardDiscoveryType = randomFrom(DiscoveryModule.ZEN_DISCOVERY_TYPE, "single-node", "unknown"); - new TransportGetDiscoveredNodesAction( - Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), nonstandardDiscoveryType).build(), - EMPTY_FILTERS, transportService, discovery); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, new GetDiscoveredNodesRequest(), new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - final Throwable rootCause = exp.getRootCause(); - assertThat(rootCause, instanceOf(IllegalArgumentException.class)); - assertThat(rootCause.getMessage(), equalTo("discovered nodes are not exposed by discovery type [" + - nonstandardDiscoveryType + "]")); - countDownLatch.countDown(); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - public void testFailsOnMasterIneligibleNodes() throws InterruptedException { - localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - // transport service only picks up local node when started, so we can change it here ^ - - new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - coordinator.start(); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, new GetDiscoveredNodesRequest(), new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - final Throwable rootCause = exp.getRootCause(); - assertThat(rootCause, instanceOf(IllegalArgumentException.class)); - assertThat(rootCause.getMessage(), - equalTo("this node is not master-eligible, but discovered nodes are only exposed by master-eligible nodes")); - countDownLatch.countDown(); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - public void testFailsQuicklyWithZeroTimeoutAndAcceptsNullTimeout() throws InterruptedException { - new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - coordinator.start(); - coordinator.startInitialJoin(); - - { - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setTimeout(null); - getDiscoveredNodesRequest.setRequiredNodes(singletonList("not-a-node")); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - throw new AssertionError("should not be called", exp); - } - }); - } - - { - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - getDiscoveredNodesRequest.setRequiredNodes(singletonList("not-a-node")); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - final Throwable rootCause = exp.getRootCause(); - assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); - assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{")); - countDownLatch.countDown(); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - } - - public void testFailsIfAlreadyBootstrapped() throws InterruptedException { - new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - coordinator.start(); - coordinator.startInitialJoin(); - coordinator.setInitialConfiguration(new VotingConfiguration(singleton(localNode.getId()))); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setTimeout(null); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - if (exp.getRootCause() instanceof ClusterAlreadyBootstrappedException) { - countDownLatch.countDown(); - } else { - throw new AssertionError("should not be called", exp); - } - } - }); - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - public void testFailsIfAcceptsClusterStateWithNonemptyConfiguration() throws InterruptedException, IOException { - new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - coordinator.start(); - coordinator.startInitialJoin(); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setTimeout(null); - getDiscoveredNodesRequest.setRequiredNodes(singletonList("not-a-node")); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - if (exp.getRootCause() instanceof ClusterAlreadyBootstrappedException) { - countDownLatch.countDown(); - } else { - throw new AssertionError("should not be called", exp); - } - } - }); - - ClusterState.Builder publishedClusterState = ClusterState.builder(ClusterName.DEFAULT); - publishedClusterState.incrementVersion(); - publishedClusterState.nodes(DiscoveryNodes.builder() - .add(localNode).add(otherNode).localNodeId(localNode.getId()).masterNodeId(otherNode.getId())); - publishedClusterState.metaData(MetaData.builder().coordinationMetaData(CoordinationMetaData.builder() - .term(1) - .lastAcceptedConfiguration(new VotingConfiguration(singleton(otherNode.getId()))) - .lastCommittedConfiguration(new VotingConfiguration(singleton(otherNode.getId()))) - .build())); - - transportService.sendRequest(localNode, PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME, - new BytesTransportRequest(PublicationTransportHandler.serializeFullClusterState(publishedClusterState.build(), Version.CURRENT), - Version.CURRENT), - new TransportResponseHandler() { - @Override - public void handleResponse(PublishWithJoinResponse response) { - // do nothing - } - - @Override - public void handleException(TransportException exp) { - throw new AssertionError("should not be called", exp); - } - - @Override - public String executor() { - return Names.SAME; - } - - @Override - public PublishWithJoinResponse read(StreamInput in) throws IOException { - return new PublishWithJoinResponse(in); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - public void testGetsDiscoveredNodesWithZeroTimeout() throws InterruptedException { - setupGetDiscoveredNodesAction(); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - assertWaitConditionMet(getDiscoveredNodesRequest); - } - - public void testGetsDiscoveredNodesByAddress() throws InterruptedException { - setupGetDiscoveredNodesAction(); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), otherNode.getAddress().toString())); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - assertWaitConditionMet(getDiscoveredNodesRequest); - } - - public void testGetsDiscoveredNodesByName() throws InterruptedException { - setupGetDiscoveredNodesAction(); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getName(), otherNode.getName())); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - assertWaitConditionMet(getDiscoveredNodesRequest); - } - - public void testGetsDiscoveredNodesByIP() throws InterruptedException { - setupGetDiscoveredNodesAction(); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - String ip = localNode.getAddress().getAddress(); - getDiscoveredNodesRequest.setRequiredNodes(Collections.singletonList(ip)); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, '[' + ip + "] matches ["); - } - - public void testGetsDiscoveredNodesDuplicateName() throws InterruptedException { - setupGetDiscoveredNodesAction(); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - String name = localNode.getName(); - getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(name, name)); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, "[" + localNode + "] matches [" + name + ", " + name + ']'); - } - - public void testGetsDiscoveredNodesWithDuplicateMatchNameAndAddress() throws InterruptedException { - setupGetDiscoveredNodesAction(); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), localNode.getName())); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - assertWaitConditionFailedOnDuplicate(getDiscoveredNodesRequest, "[" + localNode + "] matches ["); - } - - public void testGetsDiscoveredNodesTimeoutOnMissing() throws InterruptedException { - setupGetDiscoveredNodesAction(); - - final CountDownLatch latch = new CountDownLatch(1); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), "_missing")); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - assertThat(exp.getRootCause(), instanceOf(ElasticsearchTimeoutException.class)); - latch.countDown(); - } - }); - - latch.await(10L, TimeUnit.SECONDS); - } - - public void testThrowsExceptionIfDuplicateDiscoveredLater() throws InterruptedException { - new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - coordinator.start(); - coordinator.startInitialJoin(); - - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - final String ip = localNode.getAddress().getAddress(); - getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(ip, "not-a-node")); - - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - Throwable t = exp.getRootCause(); - assertThat(t, instanceOf(IllegalArgumentException.class)); - assertThat(t.getMessage(), startsWith('[' + ip + "] matches [")); - countDownLatch.countDown(); - } - }); - - executeRequestPeersAction(); - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - private void executeRequestPeersAction() { - threadPool.generic().execute(() -> - transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(otherNode, emptyList()), - new TransportResponseHandler() { - @Override - public PeersResponse read(StreamInput in) throws IOException { - return new PeersResponse(in); - } - - @Override - public void handleResponse(PeersResponse response) { - } - - @Override - public void handleException(TransportException exp) { - } - - @Override - public String executor() { - return Names.SAME; - } - })); - } - - private void setupGetDiscoveredNodesAction() throws InterruptedException { - new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action - transportService.start(); - transportService.acceptIncomingRequests(); - coordinator.start(); - coordinator.startInitialJoin(); - - executeRequestPeersAction(); - - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getName(), otherNode.getName())); - assertWaitConditionMet(getDiscoveredNodesRequest); - } - - private void assertWaitConditionMet(GetDiscoveredNodesRequest getDiscoveredNodesRequest) throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - assertThat(response.getNodes(), containsInAnyOrder(localNode, otherNode)); - countDownLatch.countDown(); - } - - @Override - public void handleException(TransportException exp) { - throw new AssertionError("should not be called", exp); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - private void assertWaitConditionFailedOnDuplicate(GetDiscoveredNodesRequest getDiscoveredNodesRequest, String message) - throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } - - @Override - public void handleException(TransportException exp) { - Throwable t = exp.getRootCause(); - assertThat(t, instanceOf(IllegalArgumentException.class)); - assertThat(t.getMessage(), startsWith(message)); - countDownLatch.countDown(); - } - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - - private abstract class ResponseHandler implements TransportResponseHandler { - @Override - public String executor() { - return Names.SAME; - } - - @Override - public GetDiscoveredNodesResponse read(StreamInput in) throws IOException { - return new GetDiscoveredNodesResponse(in); - } - } -} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java index 542247c058861..46a43afa53897 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java @@ -20,49 +20,52 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.Settings.Builder; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransport; -import org.elasticsearch.threadpool.ThreadPool.Names; -import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportService; import org.junit.Before; -import java.util.Set; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING; import static org.elasticsearch.common.settings.Settings.builder; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; public class ClusterBootstrapServiceTests extends ESTestCase { private DiscoveryNode localNode, otherNode1, otherNode2; private DeterministicTaskQueue deterministicTaskQueue; private TransportService transportService; - private ClusterBootstrapService clusterBootstrapService; @Before public void createServices() { @@ -81,10 +84,6 @@ protected void onSendRequest(long requestId, String action, TransportRequest req transportService = transport.createTransportService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); - - clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), - localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), - transportService); } private DiscoveryNode newDiscoveryNode(String nodeName) { @@ -92,152 +91,392 @@ private DiscoveryNode newDiscoveryNode(String nodeName) { Version.CURRENT); } - private void startServices() { - transportService.start(); - transportService.acceptIncomingRequests(); - clusterBootstrapService.start(); - } + public void testBootstrapsAutomaticallyWithDefaultConfiguration() { + final Settings.Builder settings = Settings.builder(); + final long timeout; + if (randomBoolean()) { + timeout = UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(Settings.EMPTY).millis(); + } else { + timeout = randomLongBetween(1, 10000); + settings.put(UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.getKey(), timeout + "ms"); + } - public void testDoesNothingOnNonMasterNodes() { - localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - (request, channel, task) -> { - throw new AssertionError("should not make a discovery request"); - }); + final AtomicReference>> discoveredNodesSupplier = new AtomicReference<>(() -> { + throw new AssertionError("should not be called yet"); + }); - startServices(); - deterministicTaskQueue.runAllTasks(); + final AtomicBoolean bootstrapped = new AtomicBoolean(); + ClusterBootstrapService clusterBootstrapService + = new ClusterBootstrapService(settings.build(), transportService, () -> discoveredNodesSupplier.get().get(), + () -> false, vc -> { + assertTrue(bootstrapped.compareAndSet(false, true)); + assertThat(vc.getNodeIds(), + equalTo(Stream.of(localNode, otherNode1, otherNode2).map(DiscoveryNode::getId).collect(Collectors.toSet()))); + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), greaterThanOrEqualTo(timeout)); + }); + + deterministicTaskQueue.scheduleAt(timeout - 1, + () -> discoveredNodesSupplier.set(() -> Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet()))); + + transportService.start(); + clusterBootstrapService.scheduleUnconfiguredBootstrap(); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertTrue(bootstrapped.get()); } public void testDoesNothingByDefaultIfHostsProviderConfigured() { - testConfiguredIfSettingSet(builder().putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey())); + testDoesNothingWithSettings(builder().putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey())); } public void testDoesNothingByDefaultIfUnicastHostsConfigured() { - testConfiguredIfSettingSet(builder().putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey())); + testDoesNothingWithSettings(builder().putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey())); } public void testDoesNothingByDefaultIfMasterNodesConfigured() { - testConfiguredIfSettingSet(builder().putList(INITIAL_MASTER_NODES_SETTING.getKey())); + testDoesNothingWithSettings(builder().putList(INITIAL_MASTER_NODES_SETTING.getKey())); } - private void testConfiguredIfSettingSet(Builder builder) { - clusterBootstrapService = new ClusterBootstrapService(builder.build(), transportService); - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - (request, channel, task) -> { - throw new AssertionError("should not make a discovery request"); - }); - startServices(); + public void testDoesNothingByDefaultOnMasterIneligibleNodes() { + localNode = new DiscoveryNode("local", randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), emptySet(), + Version.CURRENT); + testDoesNothingWithSettings(Settings.builder()); + } + + private void testDoesNothingWithSettings(Settings.Builder builder) { + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(builder.build(), transportService, () -> { + throw new AssertionError("should not be called"); + }, () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + transportService.start(); + clusterBootstrapService.scheduleUnconfiguredBootstrap(); deterministicTaskQueue.runAllTasks(); } - public void testBootstrapsAutomaticallyWithDefaultConfiguration() { - clusterBootstrapService = new ClusterBootstrapService(Settings.EMPTY, transportService); + public void testDoesNothingByDefaultIfZen1NodesDiscovered() { + final DiscoveryNode zen1Node = new DiscoveryNode("zen1", buildNewFakeTransportAddress(), singletonMap("zen1", "true"), + singleton(Role.MASTER), Version.CURRENT); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.EMPTY, transportService, () -> + Stream.of(localNode, zen1Node).collect(Collectors.toSet()), () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + transportService.start(); + clusterBootstrapService.scheduleUnconfiguredBootstrap(); + deterministicTaskQueue.runAllTasks(); + } + + + public void testThrowsExceptionOnDuplicates() { + final IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> { + new ClusterBootstrapService(builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), "duplicate-requirement", "duplicate-requirement").build(), + transportService, Collections::emptyList, () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + }); - final Set discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet()); - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - (request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes))); + assertThat(illegalArgumentException.getMessage(), containsString(INITIAL_MASTER_NODES_SETTING.getKey())); + assertThat(illegalArgumentException.getMessage(), containsString("duplicate-requirement")); + } + public void testBootstrapsOnDiscoveryOfAllRequiredNodes() { final AtomicBoolean bootstrapped = new AtomicBoolean(); - transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new, - (request, channel, task) -> { - assertThat(request.getBootstrapConfiguration().getNodeDescriptions().stream() - .map(NodeDescription::getId).collect(Collectors.toSet()), - equalTo(discoveredNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()))); - channel.sendResponse(new BootstrapClusterResponse(randomBoolean())); - assertTrue(bootstrapped.compareAndSet(false, true)); - }); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> { + assertTrue(bootstrapped.compareAndSet(false, true)); + assertThat(vc.getNodeIds(), containsInAnyOrder(localNode.getId(), otherNode1.getId(), otherNode2.getId())); + assertThat(vc.getNodeIds(), not(hasItem(containsString("placeholder")))); + }); - startServices(); + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); deterministicTaskQueue.runAllTasks(); + assertTrue(bootstrapped.get()); + bootstrapped.set(false); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertFalse(bootstrapped.get()); // should only bootstrap once + } + + public void testBootstrapsOnDiscoveryOfTwoOfThreeRequiredNodes() { + final AtomicBoolean bootstrapped = new AtomicBoolean(); + + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, () -> singletonList(otherNode1), () -> false, vc -> { + assertTrue(bootstrapped.compareAndSet(false, true)); + assertThat(vc.getNodeIds(), hasSize(3)); + assertThat(vc.getNodeIds(), hasItem(localNode.getId())); + assertThat(vc.getNodeIds(), hasItem(otherNode1.getId())); + assertThat(vc.getNodeIds(), hasItem(allOf(startsWith(BOOTSTRAP_PLACEHOLDER_PREFIX), containsString(otherNode2.getName())))); + assertTrue(vc.hasQuorum(Stream.of(localNode, otherNode1).map(DiscoveryNode::getId).collect(Collectors.toList()))); + assertFalse(vc.hasQuorum(singletonList(localNode.getId()))); + assertFalse(vc.hasQuorum(singletonList(otherNode1.getId()))); + }); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); assertTrue(bootstrapped.get()); + + bootstrapped.set(false); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertFalse(bootstrapped.get()); // should only bootstrap once } - public void testDoesNotRetryOnDiscoveryFailure() { - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - new TransportRequestHandler() { - private boolean called = false; + public void testBootstrapsOnDiscoveryOfThreeOfFiveRequiredNodes() { + final AtomicBoolean bootstrapped = new AtomicBoolean(); - @Override - public void messageReceived(GetDiscoveredNodesRequest request, TransportChannel channel, Task task) { - assert called == false; - called = true; - throw new IllegalArgumentException("simulate failure of discovery request"); - } - }); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName(), + "missing-node-1", "missing-node-2").build(), + transportService, () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> { + assertTrue(bootstrapped.compareAndSet(false, true)); + assertThat(vc.getNodeIds(), hasSize(5)); + assertThat(vc.getNodeIds(), hasItem(localNode.getId())); + assertThat(vc.getNodeIds(), hasItem(otherNode1.getId())); + assertThat(vc.getNodeIds(), hasItem(otherNode2.getId())); + + final List placeholders + = vc.getNodeIds().stream().filter(ClusterBootstrapService::isBootstrapPlaceholder).collect(Collectors.toList()); + assertThat(placeholders.size(), equalTo(2)); + assertNotEquals(placeholders.get(0), placeholders.get(1)); + assertThat(placeholders, hasItem(containsString("missing-node-1"))); + assertThat(placeholders, hasItem(containsString("missing-node-2"))); + + assertTrue(vc.hasQuorum(Stream.of(localNode, otherNode1, otherNode2).map(DiscoveryNode::getId).collect(Collectors.toList()))); + assertFalse(vc.hasQuorum(Stream.of(localNode, otherNode1).map(DiscoveryNode::getId).collect(Collectors.toList()))); + assertFalse(vc.hasQuorum(Stream.of(localNode, otherNode1).map(DiscoveryNode::getId).collect(Collectors.toList()))); + }); - startServices(); + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); deterministicTaskQueue.runAllTasks(); + assertTrue(bootstrapped.get()); + + bootstrapped.set(false); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertFalse(bootstrapped.get()); // should only bootstrap once } - public void testBootstrapsOnDiscoverySuccess() { - final AtomicBoolean discoveryAttempted = new AtomicBoolean(); - final Set discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet()); - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - (request, channel, task) -> { - assertTrue(discoveryAttempted.compareAndSet(false, true)); - channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes)); - }); + public void testDoesNotBootstrapIfNoNodesDiscovered() { + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, Collections::emptyList, () -> true, vc -> { + throw new AssertionError("should not be called"); + }); - final AtomicBoolean bootstrapAttempted = new AtomicBoolean(); - transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new, - (request, channel, task) -> { - assertTrue(bootstrapAttempted.compareAndSet(false, true)); - channel.sendResponse(new BootstrapClusterResponse(false)); - }); + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + } - startServices(); + public void testDoesNotBootstrapIfTwoOfFiveNodesDiscovered() { + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), + localNode.getName(), otherNode1.getName(), otherNode2.getName(), "not-a-node-1", "not-a-node-2").build(), + transportService, () -> Stream.of(otherNode1).collect(Collectors.toList()), () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); deterministicTaskQueue.runAllTasks(); + } + + public void testDoesNotBootstrapIfThreeOfSixNodesDiscovered() { + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), + localNode.getName(), otherNode1.getName(), otherNode2.getName(), "not-a-node-1", "not-a-node-2", "not-a-node-3").build(), + transportService, () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> { + throw new AssertionError("should not be called"); + }); - assertTrue(discoveryAttempted.get()); - assertTrue(bootstrapAttempted.get()); + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); } - public void testRetriesOnBootstrapFailure() { - final Set discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet()); - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - (request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes))); + public void testDoesNotBootstrapIfAlreadyBootstrapped() { + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()), () -> true, vc -> { + throw new AssertionError("should not be called"); + }); - AtomicLong callCount = new AtomicLong(); - transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new, - (request, channel, task) -> { - callCount.incrementAndGet(); - channel.sendResponse(new ElasticsearchException("simulated exception")); - }); + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + } - startServices(); - while (callCount.get() < 5) { - if (deterministicTaskQueue.hasDeferredTasks()) { - deterministicTaskQueue.advanceTime(); - } - deterministicTaskQueue.runAllRunnableTasks(); - } + public void testDoesNotBootstrapsOnNonMasterNode() { + localNode = new DiscoveryNode("local", randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), emptySet(), + Version.CURRENT); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, () -> + Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + } + + public void testDoesNotBootstrapsIfNotConfigured() { + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( + Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey()).build(), transportService, + () -> Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + transportService.start(); + clusterBootstrapService.scheduleUnconfiguredBootstrap(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); } - public void testStopsRetryingBootstrapWhenStopped() { - final Set discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet()); - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - (request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes))); + public void testDoesNotBootstrapsIfZen1NodesDiscovered() { + final DiscoveryNode zen1Node = new DiscoveryNode("zen1", buildNewFakeTransportAddress(), singletonMap("zen1", "true"), + singleton(Role.MASTER), Version.CURRENT); - transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new, - (request, channel, task) -> channel.sendResponse(new ElasticsearchException("simulated exception"))); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, () -> Stream.of(otherNode1, otherNode2, zen1Node).collect(Collectors.toList()), () -> false, vc -> { + throw new AssertionError("should not be called"); + }); - deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + 200000, new Runnable() { - @Override - public void run() { - clusterBootstrapService.stop(); - } + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + } - @Override - public String toString() { - return "stop cluster bootstrap service"; + public void testRetriesBootstrappingOnException() { + + final AtomicLong bootstrappingAttempts = new AtomicLong(); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> { + bootstrappingAttempts.incrementAndGet(); + if (bootstrappingAttempts.get() < 5L) { + throw new ElasticsearchException("test"); } }); - startServices(); + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertThat(bootstrappingAttempts.get(), greaterThanOrEqualTo(5L)); + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), greaterThanOrEqualTo(40000L)); + } + + public void testCancelsBootstrapIfRequirementMatchesMultipleNodes() { + AtomicReference> discoveredNodes + = new AtomicReference<>(Stream.of(otherNode1, otherNode2).collect(Collectors.toList())); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( + Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getAddress().getAddress()).build(), + transportService, discoveredNodes::get, () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + + discoveredNodes.set(emptyList()); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + } + + public void testCancelsBootstrapIfNodeMatchesMultipleRequirements() { + AtomicReference> discoveredNodes + = new AtomicReference<>(Stream.of(otherNode1, otherNode2).collect(Collectors.toList())); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( + Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), otherNode1.getAddress().toString(), otherNode1.getName()) + .build(), + transportService, discoveredNodes::get, () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + + discoveredNodes.set(Stream.of(new DiscoveryNode(otherNode1.getName(), randomAlphaOfLength(10), buildNewFakeTransportAddress(), + emptyMap(), singleton(Role.MASTER), Version.CURRENT), + new DiscoveryNode("yet-another-node", randomAlphaOfLength(10), otherNode1.getAddress(), emptyMap(), singleton(Role.MASTER), + Version.CURRENT)).collect(Collectors.toList())); + + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + } + + public void testMatchesOnNodeName() { + final AtomicBoolean bootstrapped = new AtomicBoolean(); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( + Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName()).build(), transportService, + Collections::emptyList, () -> false, vc -> assertTrue(bootstrapped.compareAndSet(false, true))); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertTrue(bootstrapped.get()); + } + + public void testMatchesOnNodeAddress() { + final AtomicBoolean bootstrapped = new AtomicBoolean(); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( + Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getAddress().toString()).build(), transportService, + Collections::emptyList, () -> false, vc -> assertTrue(bootstrapped.compareAndSet(false, true))); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertTrue(bootstrapped.get()); + } + + public void testMatchesOnNodeHostAddress() { + final AtomicBoolean bootstrapped = new AtomicBoolean(); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( + Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getAddress().getAddress()).build(), + transportService, Collections::emptyList, () -> false, vc -> assertTrue(bootstrapped.compareAndSet(false, true))); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertTrue(bootstrapped.get()); + } + + public void testDoesNotJustMatchEverything() { + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( + Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), randomAlphaOfLength(10)).build(), transportService, + Collections::emptyList, () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + } + + public void testDoesNotIncludeExtraNodes() { + final DiscoveryNode extraNode = newDiscoveryNode("extra-node"); + final AtomicBoolean bootstrapped = new AtomicBoolean(); + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, () -> Stream.of(otherNode1, otherNode2, extraNode).collect(Collectors.toList()), () -> false, + vc -> { + assertTrue(bootstrapped.compareAndSet(false, true)); + assertThat(vc.getNodeIds(), not(hasItem(extraNode.getId()))); + }); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); deterministicTaskQueue.runAllTasks(); - // termination means success + assertTrue(bootstrapped.get()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java index 129b29e1f21e5..cf8e1737a7708 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -38,6 +38,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; @@ -245,6 +246,13 @@ public void testDescriptionAfterBootstrapping() { "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", BOOTSTRAP_PLACEHOLDER_PREFIX + "n3"), + emptyList(), emptyList(), 0L).getDescription(), + is("master not discovered or elected yet, an election requires 2 nodes with ids [n1, n2], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4"), emptyList(), emptyList(), 0L) .getDescription(), is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " + @@ -259,6 +267,20 @@ public void testDescriptionAfterBootstrapping() { "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), + emptyList(), emptyList(), 0L).getDescription(), + is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", + BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L).getDescription(), + is("master not discovered or elected yet, an election requires 3 nodes with ids [n1, n2, n3], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n1"}), emptyList(), emptyList(), 0L).getDescription(), is("master not discovered or elected yet, an election requires a node with id [n1], " + diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 96746f3343e0a..a9ca7d917b9d8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -26,7 +26,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -48,7 +47,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -82,15 +80,16 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; @@ -125,7 +124,6 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -726,70 +724,6 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { // assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); } - public void testDiscoveryOfPeersTriggersNotification() { - final Cluster cluster = new Cluster(randomIntBetween(2, 5)); - - // register a listener and then deregister it again to show that it is not called after deregistration - try (Releasable ignored = cluster.getAnyNode().coordinator.withDiscoveryListener(ActionListener.wrap(() -> { - throw new AssertionError("should not be called"); - }))) { - // do nothing - } - - final long startTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis(); - final ClusterNode bootstrapNode = cluster.getAnyNode(); - final AtomicBoolean hasDiscoveredAllPeers = new AtomicBoolean(); - assertFalse(bootstrapNode.coordinator.getFoundPeers().iterator().hasNext()); - try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener( - new ActionListener>() { - @Override - public void onResponse(Iterable discoveryNodes) { - int peerCount = 0; - for (final DiscoveryNode discoveryNode : discoveryNodes) { - peerCount++; - } - assertThat(peerCount, lessThan(cluster.size())); - if (peerCount == cluster.size() - 1 && hasDiscoveredAllPeers.get() == false) { - hasDiscoveredAllPeers.set(true); - final long elapsedTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis; - logger.info("--> {} discovered {} peers in {}ms", bootstrapNode.getId(), cluster.size() - 1, elapsedTimeMillis); - assertThat(elapsedTimeMillis, lessThanOrEqualTo(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2)); - } - } - - @Override - public void onFailure(Exception e) { - throw new AssertionError("unexpected", e); - } - })) { - cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "discovery phase"); - } - - assertTrue(hasDiscoveredAllPeers.get()); - - final AtomicBoolean receivedAlreadyBootstrappedException = new AtomicBoolean(); - try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener( - new ActionListener>() { - @Override - public void onResponse(Iterable discoveryNodes) { - // ignore - } - - @Override - public void onFailure(Exception e) { - if (e instanceof ClusterAlreadyBootstrappedException) { - receivedAlreadyBootstrappedException.set(true); - } else { - throw new AssertionError("unexpected", e); - } - } - })) { - - cluster.stabilise(); - } - assertTrue(receivedAlreadyBootstrappedException.get()); - } - public void testSettingInitialConfigurationTriggersElection() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); @@ -1271,12 +1205,8 @@ public String toString() { } } else if (rarely()) { final ClusterNode clusterNode = getAnyNode(); - clusterNode.onNode( - () -> { - logger.debug("----> [runRandomly {}] applying initial configuration {} to {}", - thisStep, initialConfiguration, clusterNode.getId()); - clusterNode.coordinator.setInitialConfiguration(initialConfiguration); - }).run(); + logger.debug("----> [runRandomly {}] applying initial configuration on {}", step, clusterNode.getId()); + clusterNode.applyInitialConfiguration(); } else { if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { deterministicTaskQueue.advanceTime(); @@ -1803,11 +1733,18 @@ ClusterState getLastAppliedClusterState() { void applyInitialConfiguration() { onNode(() -> { + final Set nodeIdsWithPlaceholders = new HashSet<>(initialConfiguration.getNodeIds()); + Stream.generate(() -> BOOTSTRAP_PLACEHOLDER_PREFIX + UUIDs.randomBase64UUID(random())) + .limit((Math.max(initialConfiguration.getNodeIds().size(), 2) - 1) / 2) + .forEach(nodeIdsWithPlaceholders::add); + final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(new HashSet<>( + randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders))); try { - coordinator.setInitialConfiguration(initialConfiguration); - logger.info("successfully set initial configuration to {}", initialConfiguration); + coordinator.setInitialConfiguration(configurationWithPlaceholders); + logger.info("successfully set initial configuration to {}", configurationWithPlaceholders); } catch (CoordinationStateRejectedException e) { - logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e); + logger.info(new ParameterizedMessage("failed to set initial configuration to {}", + configurationWithPlaceholders), e); } }).run(); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 46523234d1cfa..24f97b67c1458 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; @@ -48,13 +47,14 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.coordination.ClusterBootstrapService; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.coordination.CoordinationState; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.CoordinatorTests; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.cluster.coordination.InMemoryPersistedState; import org.elasticsearch.cluster.coordination.MockSinglePrioritizingExecutor; +import org.elasticsearch.cluster.coordination.ClusterBootstrapService; import org.elasticsearch.cluster.metadata.AliasValidator; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -254,14 +254,10 @@ private void startCluster() { deterministicTaskQueue.advanceTime(); deterministicTaskQueue.runAllRunnableTasks(); - final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration( - testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()) - .map(node -> new BootstrapConfiguration.NodeDescription(node.node)) - .distinct() - .collect(Collectors.toList())); + final VotingConfiguration votingConfiguration = new VotingConfiguration(testClusterNodes.nodes.values().stream().map(n -> n.node) + .filter(DiscoveryNode::isMasterNode).map(DiscoveryNode::getId).collect(Collectors.toSet())); testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()).forEach( - testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(bootstrapConfiguration) - ); + testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(votingConfiguration)); runUntil( () -> {