From afbec68e895f3ab8431cef3bcc1bda040c59e01b Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 9 Nov 2018 14:53:30 +0000 Subject: [PATCH 01/17] [Zen2] Introduce vote withdrawal If shutting down half or more of the master-eligible nodes, their votes must first be explicitly withdrawn to ensure that the cluster doesn't lose its quorum. This works via _voting tombstones_, stored in the cluster state, which tell the reconfigurator to remove nodes from the voting configuration. This change introduces voting tombstones to the cluster state, together with transport APIs for adding and removing them, and makes use of these APIs in `InternalTestCluster` to support tests which remove at least half of the master-eligible nodes at once (e.g. shrinking from two master-eligible nodes to one). --- .../elasticsearch/action/ActionModule.java | 6 + .../AddVotingTombstonesAction.java | 41 ++ .../AddVotingTombstonesRequest.java | 106 +++++ .../AddVotingTombstonesResponse.java | 69 +++ .../ClearVotingTombstonesAction.java | 41 ++ .../ClearVotingTombstonesRequest.java | 102 +++++ .../ClearVotingTombstonesResponse.java | 52 +++ .../TransportAddVotingTombstonesAction.java | 178 ++++++++ .../TransportClearVotingTombstonesAction.java | 150 +++++++ .../action/support/ActionFilters.java | 5 + .../elasticsearch/cluster/ClusterState.java | 42 +- .../cluster/coordination/Coordinator.java | 6 +- .../settings/AbstractScopedSettings.java | 3 +- .../common/settings/ClusterSettings.java | 4 +- .../TransportBootstrapClusterActionTests.java | 3 +- ...ransportGetDiscoveredNodesActionTests.java | 24 +- .../AddVotingTombstonesRequestTests.java | 41 ++ .../AddVotingTombstonesResponseTests.java | 43 ++ .../ClearVotingTombstonesRequestTests.java | 42 ++ .../ClearVotingTombstonesResponseTests.java | 31 ++ ...ansportAddVotingTombstonesActionTests.java | 421 ++++++++++++++++++ ...sportClearVotingTombstonesActionTests.java | 202 +++++++++ .../cluster/routing/AllocationIdIT.java | 8 + .../test/InternalTestCluster.java | 41 +- .../test/test/InternalTestClusterIT.java | 57 +++ 25 files changed, 1694 insertions(+), 24 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponseTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java create mode 100644 test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index b5474a05ec26b..ad5538f653714 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -27,6 +27,10 @@ import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; import org.elasticsearch.action.admin.cluster.bootstrap.TransportBootstrapClusterAction; import org.elasticsearch.action.admin.cluster.bootstrap.TransportGetDiscoveredNodesAction; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingTombstonesAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; @@ -428,6 +432,8 @@ public void reg actions.register(GetDiscoveredNodesAction.INSTANCE, TransportGetDiscoveredNodesAction.class); actions.register(BootstrapClusterAction.INSTANCE, TransportBootstrapClusterAction.class); + actions.register(AddVotingTombstonesAction.INSTANCE, TransportAddVotingTombstonesAction.class); + actions.register(ClearVotingTombstonesAction.INSTANCE, TransportClearVotingTombstonesAction.class); actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesAction.java new file mode 100644 index 0000000000000..78650726ecf9a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesAction.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable.Reader; + +public class AddVotingTombstonesAction extends Action { + public static final AddVotingTombstonesAction INSTANCE = new AddVotingTombstonesAction(); + public static final String NAME = "cluster:admin/voting/add_tombstones"; + + private AddVotingTombstonesAction() { + super(NAME); + } + + @Override + public AddVotingTombstonesResponse newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Reader getResponseReader() { + return AddVotingTombstonesResponse::new; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java new file mode 100644 index 0000000000000..ca9767f7549a2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java @@ -0,0 +1,106 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A request to add voting tombstones for certain master-eligible nodes, and wait for these nodes to be removed from the voting + * configuration. + */ +public class AddVotingTombstonesRequest extends MasterNodeRequest { + private final String[] nodeDescriptions; + private final TimeValue timeout; + + /** + * Construct a request to add voting tombstones for master-eligible nodes matching the given descriptions, and wait for a default 30 + * seconds for these nodes to be removed from the voting configuration. + * @param nodeDescriptions Descriptions of the nodes to add - see {@link DiscoveryNodes#resolveNodes(String...)} + */ + public AddVotingTombstonesRequest(String[] nodeDescriptions) { + this(nodeDescriptions, TimeValue.timeValueSeconds(30)); + } + + /** + * Construct a request to add voting tombstones for master-eligible nodes matching the given descriptions, and wait for these nodes to + * be removed from the voting configuration. + * @param nodeDescriptions Descriptions of the nodes whose tombstones to add - see {@link DiscoveryNodes#resolveNodes(String...)}. + * @param timeout How long to wait for the nodes to be removed from the voting configuration. + */ + public AddVotingTombstonesRequest(String[] nodeDescriptions, TimeValue timeout) { + if (timeout.compareTo(TimeValue.ZERO) < 0) { + throw new IllegalArgumentException("timeout [" + timeout + "] must be non-negative"); + } + this.nodeDescriptions = nodeDescriptions; + this.timeout = timeout; + } + + public AddVotingTombstonesRequest(StreamInput in) throws IOException { + super(in); + nodeDescriptions = in.readStringArray(); + timeout = in.readTimeValue(); + } + + /** + * @return descriptions of the nodes for whom to add tombstones. + */ + public String[] getNodeDescriptions() { + return nodeDescriptions; + } + + /** + * @return how long to wait after adding the tombstones for the nodes to be removed from the voting configuration. + */ + public TimeValue getTimeout() { + return timeout; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(nodeDescriptions); + out.writeTimeValue(timeout); + } + + @Override + public String toString() { + return "AddVotingTombstonesRequest{" + + "nodeDescriptions=" + Arrays.asList(nodeDescriptions) + + ", timeout=" + timeout + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java new file mode 100644 index 0000000000000..764ebd2d8e2e3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Set; + +/** + * A response to {@link AddVotingTombstonesRequest} indicating that voting tombstones have been added for the requested nodes and these + * nodes have been removed from the voting configuration. + */ +public class AddVotingTombstonesResponse extends ActionResponse { + private final Set currentTombstones; + + public AddVotingTombstonesResponse(Set currentTombstones) { + this.currentTombstones = currentTombstones; + } + + public AddVotingTombstonesResponse(StreamInput in) throws IOException { + super(in); + currentTombstones = in.readSet(DiscoveryNode::new); + } + + /** + * @return the current set of tombstones at the point in time where all the requested nodes were removed from the voting configuration. + */ + public Set getCurrentTombstones() { + return currentTombstones; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeCollection(currentTombstones, (o, v) -> v.writeTo(o)); + } + + @Override + public String toString() { + return "AddVotingTombstonesResponse{" + + "currentTombstones=" + currentTombstones + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesAction.java new file mode 100644 index 0000000000000..3c6181ecb12b4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesAction.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable.Reader; + +public class ClearVotingTombstonesAction extends Action { + public static final ClearVotingTombstonesAction INSTANCE = new ClearVotingTombstonesAction(); + public static final String NAME = "cluster:admin/voting/clear_tombstones"; + + private ClearVotingTombstonesAction() { + super(NAME); + } + + @Override + public ClearVotingTombstonesResponse newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Reader getResponseReader() { + return ClearVotingTombstonesResponse::new; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java new file mode 100644 index 0000000000000..fbe7968dba87b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.IOException; + +/** + * A request to clear the voting tombstones from the cluster state, optionally waiting for these nodes to be removed from the cluster first. + */ +public class ClearVotingTombstonesRequest extends MasterNodeRequest { + private boolean waitForRemoval; + private TimeValue timeout = TimeValue.timeValueSeconds(30); + + /** + * Construct a request to remove all the voting tombstones from the cluster state. + */ + public ClearVotingTombstonesRequest() { + } + + public ClearVotingTombstonesRequest(StreamInput in) throws IOException { + super(in); + waitForRemoval = in.readBoolean(); + timeout = in.readTimeValue(); + } + + /** + * @return whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. + */ + public boolean getWaitForRemoval() { + return waitForRemoval; + } + + /** + * @param waitForRemoval whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. + */ + public void setWaitForRemoval(boolean waitForRemoval) { + this.waitForRemoval = waitForRemoval; + } + + /** + * @param timeout how long to wait for the tombstoned nodes to be removed if {@link ClearVotingTombstonesRequest#waitForRemoval} is + * true. Defaults to 30 seconds. + */ + public void setTimeout(TimeValue timeout) { + this.timeout = timeout; + } + + /** + * @return how long to wait for the tombstoned nodes to be removed if {@link ClearVotingTombstonesRequest#waitForRemoval} is + * true. Defaults to 30 seconds. + */ + public TimeValue getTimeout() { + return timeout; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(waitForRemoval); + out.writeTimeValue(timeout); + } + + @Override + public String toString() { + return "ClearVotingTombstonesRequest{" + + ", waitForRemoval=" + waitForRemoval + + ", timeout=" + timeout + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java new file mode 100644 index 0000000000000..eb827e0a70f65 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A response to {@link ClearVotingTombstonesRequest} indicating that voting tombstones have been cleared from the cluster state. + */ +public class ClearVotingTombstonesResponse extends ActionResponse { + public ClearVotingTombstonesResponse() { + } + + public ClearVotingTombstonesResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public String toString() { + return "ClearVotingTombstonesResponse{}"; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java new file mode 100644 index 0000000000000..3403d8fc8a3c7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java @@ -0,0 +1,178 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.Builder; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.ClusterStateObserver.Listener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public class TransportAddVotingTombstonesAction extends TransportMasterNodeAction { + + public static final Setting MAXIMUM_VOTING_TOMBSTONES_SETTING + = Setting.intSetting("cluster.max_voting_tombstones", 10, 1, Property.Dynamic, Property.NodeScope); + + @Inject + public TransportAddVotingTombstonesAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(AddVotingTombstonesAction.NAME, transportService, clusterService, threadPool, actionFilters, AddVotingTombstonesRequest::new, + indexNameExpressionResolver); + } + + @Override + protected String executor() { + return Names.SAME; + } + + @Override + protected AddVotingTombstonesResponse newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + protected AddVotingTombstonesResponse read(StreamInput in) throws IOException { + return new AddVotingTombstonesResponse(in); + } + + @Override + protected void masterOperation(AddVotingTombstonesRequest request, ClusterState state, + ActionListener listener) throws Exception { + + clusterService.getMasterService().submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask() { + + final ClusterStateObserver observer + = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext()); + + private Set resolvedNodes; + + @Override + public ClusterState execute(ClusterState currentState) { + final DiscoveryNodes allNodes = currentState.nodes(); + assert resolvedNodes == null : resolvedNodes; + resolvedNodes = Arrays.stream(allNodes.resolveNodes(request.getNodeDescriptions())) + .map(allNodes::get).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet()); + + if (resolvedNodes.isEmpty()) { + throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(request.getNodeDescriptions()) + + " matched no master-eligible nodes"); + } + + resolvedNodes.removeIf(n -> currentState.getVotingTombstones().contains(n)); + if (resolvedNodes.isEmpty()) { + throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(request.getNodeDescriptions()) + + " matched no master-eligible nodes that do not already have tombstones"); + } + + final int oldTombstoneCount = currentState.getVotingTombstones().size(); + final int newTombstoneCount = resolvedNodes.size(); + final int maxTombstoneCount = MAXIMUM_VOTING_TOMBSTONES_SETTING.get(currentState.metaData().settings()); + if (oldTombstoneCount + newTombstoneCount > maxTombstoneCount) { + throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(request.getNodeDescriptions()) + + " would add [" + newTombstoneCount + "] voting tombstones to the existing [" + oldTombstoneCount + + "] which would exceed the maximum of [" + maxTombstoneCount + "] set by [" + + MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey() + "]"); + } + + final Builder builder = ClusterState.builder(currentState); + resolvedNodes.forEach(builder::addVotingTombstone); + final ClusterState newState = builder.build(); + assert newState.getVotingTombstones().size() <= maxTombstoneCount; + return newState; + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + + final Set resolvedNodeIds = resolvedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + + final Predicate allNodesRemoved = new Predicate() { + @Override + public boolean test(ClusterState clusterState) { + final Set votingNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); + return resolvedNodeIds.stream().anyMatch(votingNodeIds::contains) == false; + } + + @Override + public String toString() { + return "withdrawal of votes from " + resolvedNodes; + } + }; + + final Listener clusterStateListener = new Listener() { + @Override + public void onNewClusterState(ClusterState state) { + listener.onResponse(new AddVotingTombstonesResponse(state.getVotingTombstones())); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new ElasticsearchException("cluster service closed while waiting for " + allNodesRemoved)); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for " + allNodesRemoved)); + } + }; + + if (allNodesRemoved.test(newState)) { + clusterStateListener.onNewClusterState(newState); + } else { + observer.waitForNextChange(clusterStateListener, allNodesRemoved); + } + } + }); + } + + @Override + protected ClusterBlockException checkBlock(AddVotingTombstonesRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java new file mode 100644 index 0000000000000..830fb3d269337 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java @@ -0,0 +1,150 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.Builder; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.ClusterStateObserver.Listener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.function.Predicate; + +public class TransportClearVotingTombstonesAction + extends TransportMasterNodeAction { + + @Inject + public TransportClearVotingTombstonesAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(ClearVotingTombstonesAction.NAME, transportService, clusterService, threadPool, actionFilters, + ClearVotingTombstonesRequest::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return Names.SAME; + } + + @Override + protected ClearVotingTombstonesResponse newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + protected ClearVotingTombstonesResponse read(StreamInput in) throws IOException { + return new ClearVotingTombstonesResponse(in); + } + + @Override + protected void masterOperation(ClearVotingTombstonesRequest request, ClusterState initialState, + ActionListener listener) throws Exception { + + final long startTimeMillis = threadPool.relativeTimeInMillis(); + + final Predicate allTombstonedNodesRemoved = new Predicate() { + @Override + public boolean test(ClusterState newState) { + for (DiscoveryNode tombstone : initialState.getVotingTombstones()) { + if (newState.nodes().nodeExists(tombstone.getId())) { + return false; + } + } + return true; + } + + @Override + public String toString() { + return "removal of nodes " + initialState.getVotingTombstones(); + } + }; + + if (request.getWaitForRemoval() && allTombstonedNodesRemoved.test(initialState) == false) { + final ClusterStateObserver clusterStateObserver = new ClusterStateObserver(initialState, clusterService, request.getTimeout(), + logger, threadPool.getThreadContext()); + + clusterStateObserver.waitForNextChange(new Listener() { + @Override + public void onNewClusterState(ClusterState state) { + submitClearTombstonesTask(request, startTimeMillis, listener); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new ElasticsearchException("cluster service closed while waiting for " + allTombstonedNodesRemoved)); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for " + allTombstonedNodesRemoved)); + } + }, allTombstonedNodesRemoved); + } else { + submitClearTombstonesTask(request, startTimeMillis, listener); + } + } + + private void submitClearTombstonesTask(ClearVotingTombstonesRequest request, long startTimeMillis, + ActionListener listener) { + clusterService.getMasterService().submitStateUpdateTask("clear-voting-tombstones", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + final Builder builder = ClusterState.builder(currentState); + builder.clearVotingTombstones(); + return builder.build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public TimeValue timeout() { + return TimeValue.timeValueMillis(request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis()); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new ClearVotingTombstonesResponse()); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(ClearVotingTombstonesRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java b/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java index c66bac31a3dee..3bb0f80d4abc9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java +++ b/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java @@ -23,11 +23,16 @@ import java.util.Comparator; import java.util.Set; +import static java.util.Collections.emptySet; + /** * Holds the action filters injected through plugins, properly sorted by {@link org.elasticsearch.action.support.ActionFilter#order()} */ public class ActionFilters { + // this could be used in many more places - TODO use this where appropriate + public static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private final ActionFilter[] filters; public ActionFilters(Set actionFilters) { diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 99b80d7a1e6b6..bf289d56594e2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -62,6 +62,7 @@ import org.elasticsearch.discovery.zen.PublishClusterStateAction; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -72,6 +73,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** * Represents the current state of the cluster. @@ -186,18 +188,21 @@ default boolean isPrivate() { private final VotingConfiguration lastAcceptedConfiguration; + private final Set votingTombstones; + // built on demand private volatile RoutingNodes routingNodes; public ClusterState(long term, long version, String stateUUID, ClusterState state) { this(state.clusterName, term, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), - state.customs(), state.getLastCommittedConfiguration(), state.getLastAcceptedConfiguration(), false); + state.customs(), state.getLastCommittedConfiguration(), state.getLastAcceptedConfiguration(), state.getVotingTombstones(), + false); } public ClusterState(ClusterName clusterName, long term, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration, - boolean wasReadFromDiff) { + Set votingTombstones, boolean wasReadFromDiff) { this.term = term; this.version = version; this.stateUUID = stateUUID; @@ -209,6 +214,7 @@ public ClusterState(ClusterName clusterName, long term, long version, String sta this.customs = customs; this.lastCommittedConfiguration = lastCommittedConfiguration; this.lastAcceptedConfiguration = lastAcceptedConfiguration; + this.votingTombstones = votingTombstones; this.wasReadFromDiff = wasReadFromDiff; } @@ -288,6 +294,10 @@ public VotingConfiguration getLastCommittedConfiguration() { return lastCommittedConfiguration; } + public Set getVotingTombstones() { + return votingTombstones; + } + // Used for testing and logging to determine how this cluster state was send over the wire public boolean wasReadFromDiff() { return wasReadFromDiff; @@ -313,6 +323,7 @@ public String toString() { sb.append("state uuid: ").append(stateUUID).append("\n"); sb.append("last committed config: ").append(getLastCommittedConfiguration()).append("\n"); sb.append("last accepted config: ").append(getLastAcceptedConfiguration()).append("\n"); + sb.append("voting tombstones: ").append(votingTombstones.toString()).append("\n"); sb.append("from_diff: ").append(wasReadFromDiff).append("\n"); sb.append("meta data version: ").append(metaData.version()).append("\n"); final String TAB = " "; @@ -428,6 +439,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("state_uuid", stateUUID); builder.field("last_committed_config", lastCommittedConfiguration); builder.field("last_accepted_config", lastAcceptedConfiguration); + // TODO include voting tombstones here } if (metrics.contains(Metric.MASTER_NODE)) { @@ -637,6 +649,7 @@ public static class Builder { private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES; private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK; private final ImmutableOpenMap.Builder customs; + private final Set votingTombstones = new HashSet<>(); private boolean fromDiff; @@ -647,6 +660,7 @@ public Builder(ClusterState state) { this.uuid = state.stateUUID(); this.lastCommittedConfiguration = state.getLastCommittedConfiguration(); this.lastAcceptedConfiguration = state.getLastAcceptedConfiguration(); + this.votingTombstones.addAll(state.getVotingTombstones()); this.nodes = state.nodes(); this.routingTable = state.routingTable(); this.metaData = state.metaData(); @@ -752,7 +766,7 @@ public ClusterState build() { uuid = UUIDs.randomBase64UUID(); } return new ClusterState(clusterName, term, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), - lastCommittedConfiguration, lastAcceptedConfiguration, fromDiff); + lastCommittedConfiguration, lastAcceptedConfiguration, Collections.unmodifiableSet(votingTombstones), fromDiff); } public static byte[] toBytes(ClusterState state) throws IOException { @@ -770,6 +784,14 @@ public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode, Named return readFrom(in, localNode); } + + public void addVotingTombstone(DiscoveryNode tombstone) { + votingTombstones.add(tombstone); + } + + public void clearVotingTombstones() { + votingTombstones.clear(); + } } @Override @@ -792,6 +814,7 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr if (in.getVersion().onOrAfter(Version.V_7_0_0)) { builder.lastCommittedConfiguration(new VotingConfiguration(in)); builder.lastAcceptedConfiguration(new VotingConfiguration(in)); + in.readSet(DiscoveryNode::new).forEach(builder::addVotingTombstone); } builder.metaData = MetaData.readFrom(in); builder.routingTable = RoutingTable.readFrom(in); @@ -816,6 +839,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_0_0)) { lastCommittedConfiguration.writeTo(out); lastAcceptedConfiguration.writeTo(out); + out.writeCollection(votingTombstones, (o, v) -> v.writeTo(o)); } metaData.writeTo(out); routingTable.writeTo(out); @@ -852,6 +876,8 @@ private static class ClusterStateDiff implements Diff { private final VotingConfiguration lastAcceptedConfiguration; + private final Set votingTombstones; + private final Diff routingTable; private final Diff nodes; @@ -870,6 +896,7 @@ private static class ClusterStateDiff implements Diff { clusterName = after.clusterName; lastCommittedConfiguration = after.lastCommittedConfiguration; lastAcceptedConfiguration = after.lastAcceptedConfiguration; + votingTombstones = after.votingTombstones; routingTable = after.routingTable.diff(before.routingTable); nodes = after.nodes.diff(before.nodes); metaData = after.metaData.diff(before.metaData); @@ -890,9 +917,11 @@ private static class ClusterStateDiff implements Diff { if (in.getVersion().onOrAfter(Version.V_7_0_0)) { lastCommittedConfiguration = new VotingConfiguration(in); lastAcceptedConfiguration = new VotingConfiguration(in); + votingTombstones = in.readSet(DiscoveryNode::new); } else { lastCommittedConfiguration = VotingConfiguration.EMPTY_CONFIG; lastAcceptedConfiguration = VotingConfiguration.EMPTY_CONFIG; + votingTombstones = Collections.emptySet(); } routingTable = RoutingTable.readDiffFrom(in); nodes = DiscoveryNodes.readDiffFrom(in, localNode); @@ -913,6 +942,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_0_0)) { lastCommittedConfiguration.writeTo(out); lastAcceptedConfiguration.writeTo(out); + out.writeCollection(votingTombstones, (o, v) -> v.writeTo(o)); } routingTable.writeTo(out); nodes.writeTo(out); @@ -936,6 +966,7 @@ public ClusterState apply(ClusterState state) { builder.version(toVersion); builder.lastCommittedConfiguration(lastCommittedConfiguration); builder.lastAcceptedConfiguration(lastAcceptedConfiguration); + votingTombstones.forEach(builder::addVotingTombstone); builder.routingTable(routingTable.apply(state.routingTable)); builder.nodes(nodes.apply(state.nodes)); builder.metaData(metaData.apply(state.metaData)); @@ -1009,5 +1040,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } return builder.endArray(); } + + public static VotingConfiguration of(DiscoveryNode... nodes) { + // this could be used in many more places - TODO use this where appropriate + return new VotingConfiguration(Arrays.stream(nodes).map(DiscoveryNode::getId).collect(Collectors.toSet())); + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 31b481d93c3e1..58d4f2f2a52e5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -75,7 +75,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static java.util.Collections.emptySet; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -641,8 +640,9 @@ ClusterState improveConfiguration(ClusterState clusterState) { final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) .filter(this::hasJoinVoteFrom).collect(Collectors.toSet()); - final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure( - liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration()); + final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes, + clusterState.getVotingTombstones().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()), + clusterState.getLastAcceptedConfiguration()); if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { assert coordinationState.get().joinVotesHaveQuorumFor(newConfig); return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build(); diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index 9667341cd6ef3..db796d5ee3e24 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -79,7 +79,8 @@ protected AbstractScopedSettings( Map> keySettings = new HashMap<>(); for (Setting setting : settingsSet) { if (setting.getProperties().contains(scope) == false) { - throw new IllegalArgumentException("Setting must be a " + scope + " setting but has: " + setting.getProperties()); + throw new IllegalArgumentException("Setting " + setting + " must be a " + + scope + " setting but has: " + setting.getProperties()); } validateSettingKey(setting); diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 871c510f3ded5..f590b59df0e20 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.settings; import org.apache.logging.log4j.LogManager; +import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; @@ -456,7 +457,8 @@ public void apply(Settings value, Settings current, Settings previous) { ElectionSchedulerFactory.ELECTION_DURATION_SETTING, Coordinator.PUBLISH_TIMEOUT_SETTING, JoinHelper.JOIN_TIMEOUT_SETTING, - Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION + Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION, + TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING ))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java index 8aba67e89bd4a..eb8341aeb4021 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; -import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; @@ -55,6 +54,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; +import static org.elasticsearch.action.support.ActionFilters.EMPTY_FILTERS; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -62,7 +62,6 @@ public class TransportBootstrapClusterActionTests extends ESTestCase { - private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); private DiscoveryNode discoveryNode; private static ThreadPool threadPool; private TransportService transportService; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java index 47f1355dd3570..ac3821f768168 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; -import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; @@ -46,8 +45,9 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService.HandshakeResponse; -import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import java.io.IOException; import java.util.Random; @@ -57,6 +57,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.action.support.ActionFilters.EMPTY_FILTERS; import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; @@ -69,14 +70,23 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { - private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private static ThreadPool threadPool; private DiscoveryNode localNode; - private ThreadPool threadPool; private String clusterName; private TransportService transportService; private Coordinator coordinator; private DiscoveryNode otherNode; + @BeforeClass + public static void createThreadPool() { + threadPool = new TestThreadPool("test", Settings.EMPTY); + } + + @AfterClass + public static void shutdownThreadPool() { + threadPool.shutdown(); + } + @Before public void setupTest() { clusterName = randomAlphaOfLength(10); @@ -91,7 +101,6 @@ protected void onSendRequest(long requestId, String action, TransportRequest req } } }; - threadPool = new TestThreadPool("test", Settings.EMPTY); transportService = transport.createTransportService( Settings.builder().put(CLUSTER_NAME_SETTING.getKey(), clusterName).build(), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); @@ -104,11 +113,6 @@ protected void onSendRequest(long requestId, String action, TransportRequest req new NoOpClusterApplier(), new Random(random().nextLong())); } - @After - public void cleanUp() { - threadPool.shutdown(); - } - public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { final Discovery discovery = mock(Discovery.class); verifyZeroInteractions(discovery); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java new file mode 100644 index 0000000000000..53533e70a4799 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; + +public class AddVotingTombstonesRequestTests extends ESTestCase { + public void testSerialization() throws IOException { + int descriptionCount = between(0, 5); + String[] descriptions = new String[descriptionCount]; + for (int i = 0; i < descriptionCount; i++) { + descriptions[i] = randomAlphaOfLength(10); + } + TimeValue timeout = TimeValue.timeValueMillis(between(0, 30000)); + final AddVotingTombstonesRequest originalRequest = new AddVotingTombstonesRequest(descriptions, timeout); + final AddVotingTombstonesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), AddVotingTombstonesRequest::new); + assertThat(deserialized.getNodeDescriptions(), equalTo(originalRequest.getNodeDescriptions())); + assertThat(deserialized.getTimeout(), equalTo(originalRequest.getTimeout())); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java new file mode 100644 index 0000000000000..216518017827f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; + +public class AddVotingTombstonesResponseTests extends ESTestCase { + public void testSerialization() throws IOException { + int tombstoneCount = between(0, 5); + Set tombstones = new HashSet<>(); + while (tombstones.size() < tombstoneCount) { + tombstones.add(new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT)); + } + final AddVotingTombstonesResponse originalRequest = new AddVotingTombstonesResponse(tombstones); + final AddVotingTombstonesResponse deserialized + = copyWriteable(originalRequest, writableRegistry(), AddVotingTombstonesResponse::new); + assertThat(deserialized.getCurrentTombstones(), equalTo(originalRequest.getCurrentTombstones())); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequestTests.java new file mode 100644 index 0000000000000..ad8fa3a3f52c9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequestTests.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; + +public class ClearVotingTombstonesRequestTests extends ESTestCase { + public void testSerialization() throws IOException { + final ClearVotingTombstonesRequest originalRequest = new ClearVotingTombstonesRequest(); + if (randomBoolean()) { + originalRequest.setWaitForRemoval(randomBoolean()); + } + if (randomBoolean()) { + originalRequest.setTimeout(TimeValue.timeValueMillis(randomLongBetween(0, 30000))); + } + final ClearVotingTombstonesRequest deserialized + = copyWriteable(originalRequest, writableRegistry(), ClearVotingTombstonesRequest::new); + assertThat(deserialized.getWaitForRemoval(), equalTo(originalRequest.getWaitForRemoval())); + assertThat(deserialized.getTimeout(), equalTo(originalRequest.getTimeout())); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponseTests.java new file mode 100644 index 0000000000000..3ae9eba2c140a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponseTests.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class ClearVotingTombstonesResponseTests extends ESTestCase { + public void testSerialization() throws IOException { + final ClearVotingTombstonesResponse originalRequest = new ClearVotingTombstonesResponse(); + copyWriteable(originalRequest, writableRegistry(), ClearVotingTombstonesResponse::new); + // there are no fields so we're just checking that this doesn't throw anything + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java new file mode 100644 index 0000000000000..12bafed5dbb1f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java @@ -0,0 +1,421 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.ClusterStateObserver.Listener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING; +import static org.elasticsearch.action.support.ActionFilters.EMPTY_FILTERS; +import static org.elasticsearch.cluster.ClusterState.builder; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.startsWith; + +public class TransportAddVotingTombstonesActionTests extends ESTestCase { + + private static ThreadPool threadPool; + private static ClusterService clusterService; + private static DiscoveryNode localNode, otherNode1, otherNode2, otherDataNode; + + private TransportService transportService; + private ClusterStateObserver clusterStateObserver; + + @BeforeClass + public static void createThreadPoolAndClusterService() { + threadPool = new TestThreadPool("test", Settings.EMPTY); + localNode = makeDiscoveryNode("local"); + otherNode1 = makeDiscoveryNode("other1"); + otherNode2 = makeDiscoveryNode("other2"); + otherDataNode = new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + clusterService = createClusterService(threadPool, localNode); + } + + private static DiscoveryNode makeDiscoveryNode(String name) { + return new DiscoveryNode(name, name, buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); + } + + @AfterClass + public static void shutdownThreadPoolAndClusterService() { + clusterService.stop(); + threadPool.shutdown(); + } + + @Before + public void setupForTest() { + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + + new TransportAddVotingTombstonesAction(transportService, clusterService, threadPool, EMPTY_FILTERS, + new IndexNameExpressionResolver()); // registers action + + transportService.start(); + transportService.acceptIncomingRequests(); + + final VotingConfiguration allNodesConfig = VotingConfiguration.of(localNode, otherNode1, otherNode2); + + setState(clusterService, builder(new ClusterName("cluster")) + .nodes(new Builder().add(localNode).add(otherNode1).add(otherNode2).add(otherDataNode) + .localNodeId(localNode.getId()).masterNodeId(localNode.getId())) + .lastAcceptedConfiguration(allNodesConfig).lastCommittedConfiguration(allNodesConfig)); + + clusterStateObserver = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); + } + + public void testWithdrawsVoteFromANode() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1"}), + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(responseHolder.get().getCurrentTombstones(), contains(otherNode1)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1)); + } + + public void testWithdrawsVotesFromMultipleNodes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1", "other2"}), + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(responseHolder.get().getCurrentTombstones(), containsInAnyOrder(otherNode1, otherNode2)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); + } + + public void testWithdrawsVotesFromNodesMatchingWildcard() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other*"}), + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(responseHolder.get().getCurrentTombstones(), containsInAnyOrder(otherNode1, otherNode2)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); + } + + public void testWithdrawsVotesFromAllMasterEligibleNodes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"_all"}), + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(responseHolder.get().getCurrentTombstones(), containsInAnyOrder(localNode, otherNode1, otherNode2)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), + containsInAnyOrder(localNode, otherNode1, otherNode2)); + } + + public void testWithdrawsVoteFromLocalNode() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"_local"}), + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(responseHolder.get().getCurrentTombstones(), contains(localNode)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(localNode)); + } + + public void testReturnsImmediatelyIfVoteAlreadyWithdrawn() throws InterruptedException { + setState(clusterService, builder(clusterService.state()) + .lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2)) + .lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2))); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + // no observer to reconfigure + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1"}, TimeValue.ZERO), + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(responseHolder.get().getCurrentTombstones(), contains(otherNode1)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1)); + } + + public void testReturnsErrorIfNoMatchingNodes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"not-a-node"}), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), equalTo("add voting tombstones request for [not-a-node] matched no master-eligible nodes")); + } + + public void testOnlyMatchesMasterEligibleNodes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"_all", "master:false"}), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), + equalTo("add voting tombstones request for [_all, master:false] matched no master-eligible nodes")); + } + + public void testReturnsErrorIfNoTombstonesAdded() throws InterruptedException { + final ClusterState.Builder builder = builder(clusterService.state()); + builder.addVotingTombstone(otherNode1); + setState(clusterService, builder); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1"}), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), + equalTo("add voting tombstones request for [other1] matched no master-eligible nodes that do not already have tombstones")); + } + + public void testReturnsErrorIfMaximumTombstoneCountExceeded() throws InterruptedException { + final ClusterState.Builder builder = builder(clusterService.state()) + .metaData(MetaData.builder(clusterService.state().metaData()).persistentSettings( + Settings.builder().put(clusterService.state().metaData().persistentSettings()) + .put(MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey(), 2).build())); + builder.addVotingTombstone(localNode); + final int existingCount, newCount; + if (randomBoolean()) { + builder.addVotingTombstone(otherNode1); + existingCount = 2; + newCount = 1; + } else { + existingCount = 1; + newCount = 2; + } + setState(clusterService, builder); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other*"}), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), equalTo("add voting tombstones request for [other*] would add [" + newCount + + "] voting tombstones to the existing [" + existingCount + + "] which would exceed the maximum of [2] set by [cluster.max_voting_tombstones]")); + } + + public void testTimesOut() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1"}, TimeValue.timeValueMillis(100)), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause,instanceOf(ElasticsearchTimeoutException.class)); + assertThat(rootCause.getMessage(), startsWith("timed out waiting for withdrawal of votes from [{other1}")); + } + + private TransportResponseHandler expectSuccess(Consumer onResponse) { + return responseHandler(onResponse, e -> { + throw new AssertionError("unexpected", e); + }); + } + + private TransportResponseHandler expectError(Consumer onException) { + return responseHandler(r -> { + assert false : r; + }, onException); + } + + private TransportResponseHandler responseHandler(Consumer onResponse, + Consumer onException) { + return new TransportResponseHandler() { + @Override + public void handleResponse(AddVotingTombstonesResponse response) { + onResponse.accept(response); + } + + @Override + public void handleException(TransportException exp) { + onException.accept(exp); + } + + @Override + public String executor() { + return Names.SAME; + } + + @Override + public AddVotingTombstonesResponse read(StreamInput in) throws IOException { + return new AddVotingTombstonesResponse(in); + } + }; + } + + private class AdjustConfigurationForTombstones implements Listener { + @Override + public void onNewClusterState(ClusterState state) { + clusterService.getMasterService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + assertThat(currentState, sameInstance(state)); + final Set votingNodeIds = new HashSet<>(); + currentState.nodes().forEach(n -> votingNodeIds.add(n.getId())); + currentState.getVotingTombstones().forEach(t -> votingNodeIds.remove(t.getId())); + final VotingConfiguration votingConfiguration = new VotingConfiguration(votingNodeIds); + return builder(currentState) + .lastAcceptedConfiguration(votingConfiguration) + .lastCommittedConfiguration(votingConfiguration).build(); + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError("unexpected failure", e); + } + }); + } + + @Override + public void onClusterServiceClose() { + throw new AssertionError("unexpected close"); + } + + @Override + public void onTimeout(TimeValue timeout) { + throw new AssertionError("unexpected timeout"); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java new file mode 100644 index 0000000000000..74a349da1a33e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java @@ -0,0 +1,202 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.action.support.ActionFilters.EMPTY_FILTERS; +import static org.elasticsearch.cluster.ClusterState.builder; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; + +public class TransportClearVotingTombstonesActionTests extends ESTestCase { + + private static ThreadPool threadPool; + private static ClusterService clusterService; + private static DiscoveryNode localNode, otherNode1, otherNode2; + + private TransportService transportService; + + @BeforeClass + public static void createThreadPoolAndClusterService() { + threadPool = new TestThreadPool("test", Settings.EMPTY); + localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + otherNode1 = new DiscoveryNode("other1", "other1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + otherNode2 = new DiscoveryNode("other2", "other2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + clusterService = createClusterService(threadPool, localNode); + } + + @AfterClass + public static void shutdownThreadPoolAndClusterService() { + clusterService.stop(); + threadPool.shutdown(); + } + + @Before + public void setupForTest() { + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + + new TransportClearVotingTombstonesAction(transportService, clusterService, threadPool, EMPTY_FILTERS, + new IndexNameExpressionResolver()); // registers action + + transportService.start(); + transportService.acceptIncomingRequests(); + + final ClusterState.Builder builder = builder(new ClusterName("cluster")) + .nodes(new Builder().add(localNode).add(otherNode1).add(otherNode2) + .localNodeId(localNode.getId()).masterNodeId(localNode.getId())); + builder.addVotingTombstone(otherNode1); + builder.addVotingTombstone(otherNode2); + setState(clusterService, builder); + } + + public void testClearsVotingTombstones() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, + new ClearVotingTombstonesRequest(), + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertNotNull(responseHolder.get()); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), empty()); + } + + public void testTimesOutIfWaitingForNodesThatAreNotRemoved() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest(); + clearVotingTombstonesRequest.setWaitForRemoval(true); + clearVotingTombstonesRequest.setTimeout(TimeValue.timeValueMillis(100)); + transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, + clearVotingTombstonesRequest, + expectError(e -> { + responseHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); + final Throwable rootCause = responseHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); + assertThat(rootCause.getMessage(), startsWith("timed out waiting for removal of nodes [")); + } + + public void testSucceedsIfNodesAreRemovedWhileWaiting() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest(); + clearVotingTombstonesRequest.setWaitForRemoval(true); + transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, + clearVotingTombstonesRequest, + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + final ClusterState.Builder builder = builder(clusterService.state()); + builder.nodes(DiscoveryNodes.builder(clusterService.state().nodes()).remove(otherNode1).remove(otherNode2)); + setState(clusterService, builder); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), empty()); + } + + private TransportResponseHandler expectSuccess(Consumer onResponse) { + return responseHandler(onResponse, e -> { + throw new AssertionError("unexpected", e); + }); + } + + private TransportResponseHandler expectError(Consumer onException) { + return responseHandler(r -> { + assert false : r; + }, onException); + } + + private TransportResponseHandler responseHandler(Consumer onResponse, + Consumer onException) { + return new TransportResponseHandler() { + @Override + public void handleResponse(ClearVotingTombstonesResponse response) { + onResponse.accept(response); + } + + @Override + public void handleException(TransportException exp) { + onException.accept(exp); + } + + @Override + public String executor() { + return Names.SAME; + } + + @Override + public ClearVotingTombstonesResponse read(StreamInput in) throws IOException { + return new ClearVotingTombstonesResponse(in); + } + }; + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index f9da7a1aa8e4b..c075141e8c4b9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -45,6 +45,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.transport.MockTransportService; import java.io.IOException; @@ -66,6 +67,13 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) public class AllocationIdIT extends ESIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(TestZenDiscovery.USE_ZEN2.getKey(), true) + .build(); + } + @Override protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 1f71a2c2d9b6f..0ffd5bea546b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -27,11 +27,14 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesRequest; +import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; @@ -1622,18 +1625,48 @@ private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws } private synchronized void stopNodesAndClients(Collection nodeAndClients) throws IOException { + final Set withdrawnNodeIds = new HashSet<>(); + if (autoManageMinMasterNodes && nodeAndClients.size() > 0) { - int masters = (int)nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count(); - if (masters > 0) { - updateMinMasterNodes(getMasterNodesCount() - masters); + + final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count(); + final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count(); + + assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters; + if ((stoppingMasters != currentMasters && currentMasters <= stoppingMasters * 2) || rarely()) { + // stopping fewer than _all_ of the master nodes, but at least half of them, requires their votes to be withdrawn first + nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(withdrawnNodeIds::add); + logger.info("withdrawing votes from {} prior to shutdown", withdrawnNodeIds); + try { + client().execute(AddVotingTombstonesAction.INSTANCE, + new AddVotingTombstonesRequest(withdrawnNodeIds.toArray(new String[0]))).get(); + } catch (InterruptedException | ExecutionException e) { + throw new AssertionError("unexpected", e); + } + } + + if (stoppingMasters > 0) { + updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters)); } } + for (NodeAndClient nodeAndClient: nodeAndClients) { removeDisruptionSchemeFromNode(nodeAndClient); NodeAndClient previous = nodes.remove(nodeAndClient.name); assert previous == nodeAndClient; nodeAndClient.close(); } + + if (withdrawnNodeIds.isEmpty() == false) { + logger.info("removing voting tombstones for {} after shutdown", withdrawnNodeIds); + try { + final ClearVotingTombstonesRequest request = new ClearVotingTombstonesRequest(); + request.setWaitForRemoval(true); + client().execute(ClearVotingTombstonesAction.INSTANCE, request).get(); + } catch (InterruptedException | ExecutionException e) { + throw new AssertionError("unexpected", e); + } + } } /** diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java new file mode 100644 index 0000000000000..1109eadf88c8f --- /dev/null +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.test; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.discovery.TestZenDiscovery; + +import java.io.IOException; + +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class InternalTestClusterIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(TestZenDiscovery.USE_ZEN2.getKey(), true) + .build(); + } + + public void testStartingAndStoppingNodes() throws IOException { + logger.info("--> cluster has [{}] nodes", internalCluster().size()); + if (internalCluster().size() < 5) { + final int nodesToStart = randomIntBetween(Math.max(2, internalCluster().size() + 1), 5); + logger.info("--> growing to [{}] nodes", nodesToStart); + internalCluster().startNodes(nodesToStart); + } + ensureGreen(); + + while (internalCluster().size() > 1) { + final int nodesToRemain = randomIntBetween(1, internalCluster().size() - 1); + logger.info("--> reducing to [{}] nodes", nodesToRemain); + internalCluster().ensureAtMostNumDataNodes(nodesToRemain); + ensureGreen(); + assertThat(internalCluster().size(), lessThanOrEqualTo(nodesToRemain)); + } + } +} From cc76db2dce6e8eba6ba279c0530940aac8762dcc Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 13:13:35 +0000 Subject: [PATCH 02/17] assertBusy() because this sometimes gives AWAITING_INFO --- .../cluster/routing/AllocationIdIT.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index c075141e8c4b9..acbc9b45b4f65 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -232,15 +232,17 @@ private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardI } private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception { - final ClusterAllocationExplanation explanation = - client().admin().cluster().prepareAllocationExplain() - .setIndex(indexName).setShard(shardId.id()).setPrimary(true) - .get().getExplanation(); - - final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); - assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); - assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), - equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(shardId.id()).setPrimary(true) + .get().getExplanation(); + + final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); + assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); + assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), + equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); + }); } } From 1d4eaf67bcefbc4683323c70a6694321c1f0d2df Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 13:34:46 +0000 Subject: [PATCH 03/17] Don't withdraw no votes, because this withdraws _all_ the votes --- .../elasticsearch/test/InternalTestCluster.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 0ffd5bea546b5..19a4b608508e2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1636,12 +1636,14 @@ private synchronized void stopNodesAndClients(Collection nodeAndC if ((stoppingMasters != currentMasters && currentMasters <= stoppingMasters * 2) || rarely()) { // stopping fewer than _all_ of the master nodes, but at least half of them, requires their votes to be withdrawn first nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(withdrawnNodeIds::add); - logger.info("withdrawing votes from {} prior to shutdown", withdrawnNodeIds); - try { - client().execute(AddVotingTombstonesAction.INSTANCE, - new AddVotingTombstonesRequest(withdrawnNodeIds.toArray(new String[0]))).get(); - } catch (InterruptedException | ExecutionException e) { - throw new AssertionError("unexpected", e); + if (withdrawnNodeIds.isEmpty() == false) { + logger.info("withdrawing votes from {} prior to shutdown", withdrawnNodeIds); + try { + client().execute(AddVotingTombstonesAction.INSTANCE, + new AddVotingTombstonesRequest(withdrawnNodeIds.toArray(new String[0]))).get(); + } catch (InterruptedException | ExecutionException e) { + throw new AssertionError("unexpected", e); + } } } From 661ad5ecc2f895ed0d439009b8c2d56eaa835bb2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 15:43:34 +0000 Subject: [PATCH 04/17] Review feedback --- .../AddVotingTombstonesResponse.java | 20 +---------- .../ClearVotingTombstonesResponse.java | 5 --- .../TransportAddVotingTombstonesAction.java | 8 ++--- .../AddVotingTombstonesResponseTests.java | 18 ++-------- ...ansportAddVotingTombstonesActionTests.java | 36 ++++++------------- 5 files changed, 16 insertions(+), 71 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java index 764ebd2d8e2e3..c0cef63b48742 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java @@ -31,22 +31,12 @@ * nodes have been removed from the voting configuration. */ public class AddVotingTombstonesResponse extends ActionResponse { - private final Set currentTombstones; - public AddVotingTombstonesResponse(Set currentTombstones) { - this.currentTombstones = currentTombstones; + public AddVotingTombstonesResponse() { } public AddVotingTombstonesResponse(StreamInput in) throws IOException { super(in); - currentTombstones = in.readSet(DiscoveryNode::new); - } - - /** - * @return the current set of tombstones at the point in time where all the requested nodes were removed from the voting configuration. - */ - public Set getCurrentTombstones() { - return currentTombstones; } @Override @@ -57,13 +47,5 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeCollection(currentTombstones, (o, v) -> v.writeTo(o)); - } - - @Override - public String toString() { - return "AddVotingTombstonesResponse{" + - "currentTombstones=" + currentTombstones + - '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java index eb827e0a70f65..1237e2e265fed 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java @@ -44,9 +44,4 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); } - - @Override - public String toString() { - return "ClearVotingTombstonesResponse{}"; - } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java index 3403d8fc8a3c7..6cd2ebbed0154 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java @@ -80,7 +80,7 @@ protected AddVotingTombstonesResponse read(StreamInput in) throws IOException { protected void masterOperation(AddVotingTombstonesRequest request, ClusterState state, ActionListener listener) throws Exception { - clusterService.getMasterService().submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask() { final ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext()); @@ -100,10 +100,6 @@ public ClusterState execute(ClusterState currentState) { } resolvedNodes.removeIf(n -> currentState.getVotingTombstones().contains(n)); - if (resolvedNodes.isEmpty()) { - throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(request.getNodeDescriptions()) - + " matched no master-eligible nodes that do not already have tombstones"); - } final int oldTombstoneCount = currentState.getVotingTombstones().size(); final int newTombstoneCount = resolvedNodes.size(); @@ -148,7 +144,7 @@ public String toString() { final Listener clusterStateListener = new Listener() { @Override public void onNewClusterState(ClusterState state) { - listener.onResponse(new AddVotingTombstonesResponse(state.getVotingTombstones())); + listener.onResponse(new AddVotingTombstonesResponse()); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java index 216518017827f..4b7e5a95ecbf2 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java @@ -18,26 +18,14 @@ */ package org.elasticsearch.action.admin.cluster.configuration; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; - -import static org.hamcrest.Matchers.equalTo; public class AddVotingTombstonesResponseTests extends ESTestCase { public void testSerialization() throws IOException { - int tombstoneCount = between(0, 5); - Set tombstones = new HashSet<>(); - while (tombstones.size() < tombstoneCount) { - tombstones.add(new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT)); - } - final AddVotingTombstonesResponse originalRequest = new AddVotingTombstonesResponse(tombstones); - final AddVotingTombstonesResponse deserialized - = copyWriteable(originalRequest, writableRegistry(), AddVotingTombstonesResponse::new); - assertThat(deserialized.getCurrentTombstones(), equalTo(originalRequest.getCurrentTombstones())); + final AddVotingTombstonesResponse originalRequest = new AddVotingTombstonesResponse(); + copyWriteable(originalRequest, writableRegistry(), AddVotingTombstonesResponse::new); + // there are no fields so we're just checking that this doesn't throw anything } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java index 12bafed5dbb1f..0bcf23363bfed 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java @@ -123,92 +123,82 @@ public void setupForTest() { public void testWithdrawsVoteFromANode() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce responseHolder = new SetOnce<>(); clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, new AddVotingTombstonesRequest(new String[]{"other1"}), expectSuccess(r -> { - responseHolder.set(r); + assertNotNull(r); countDownLatch.countDown(); }) ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(responseHolder.get().getCurrentTombstones(), contains(otherNode1)); assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1)); } public void testWithdrawsVotesFromMultipleNodes() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce responseHolder = new SetOnce<>(); clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, new AddVotingTombstonesRequest(new String[]{"other1", "other2"}), expectSuccess(r -> { - responseHolder.set(r); + assertNotNull(r); countDownLatch.countDown(); }) ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(responseHolder.get().getCurrentTombstones(), containsInAnyOrder(otherNode1, otherNode2)); assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); } public void testWithdrawsVotesFromNodesMatchingWildcard() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce responseHolder = new SetOnce<>(); clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, new AddVotingTombstonesRequest(new String[]{"other*"}), expectSuccess(r -> { - responseHolder.set(r); + assertNotNull(r); countDownLatch.countDown(); }) ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(responseHolder.get().getCurrentTombstones(), containsInAnyOrder(otherNode1, otherNode2)); assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); } public void testWithdrawsVotesFromAllMasterEligibleNodes() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce responseHolder = new SetOnce<>(); clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, new AddVotingTombstonesRequest(new String[]{"_all"}), expectSuccess(r -> { - responseHolder.set(r); + assertNotNull(r); countDownLatch.countDown(); }) ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(responseHolder.get().getCurrentTombstones(), containsInAnyOrder(localNode, otherNode1, otherNode2)); assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(localNode, otherNode1, otherNode2)); } public void testWithdrawsVoteFromLocalNode() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce responseHolder = new SetOnce<>(); clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, new AddVotingTombstonesRequest(new String[]{"_local"}), expectSuccess(r -> { - responseHolder.set(r); + assertNotNull(r); countDownLatch.countDown(); }) ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(responseHolder.get().getCurrentTombstones(), contains(localNode)); assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(localNode)); } @@ -218,19 +208,17 @@ public void testReturnsImmediatelyIfVoteAlreadyWithdrawn() throws InterruptedExc .lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2))); final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce responseHolder = new SetOnce<>(); // no observer to reconfigure transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, new AddVotingTombstonesRequest(new String[]{"other1"}, TimeValue.ZERO), expectSuccess(r -> { - responseHolder.set(r); + assertNotNull(r); countDownLatch.countDown(); }) ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(responseHolder.get().getCurrentTombstones(), contains(otherNode1)); assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1)); } @@ -271,27 +259,23 @@ public void testOnlyMatchesMasterEligibleNodes() throws InterruptedException { equalTo("add voting tombstones request for [_all, master:false] matched no master-eligible nodes")); } - public void testReturnsErrorIfNoTombstonesAdded() throws InterruptedException { + public void testSucceedsEvenIfAllTombstonesAlreadyAdded() throws InterruptedException { final ClusterState.Builder builder = builder(clusterService.state()); builder.addVotingTombstone(otherNode1); setState(clusterService, builder); final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce exceptionHolder = new SetOnce<>(); transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, new AddVotingTombstonesRequest(new String[]{"other1"}), - expectError(e -> { - exceptionHolder.set(e); + expectSuccess(r -> { + assertNotNull(r); countDownLatch.countDown(); }) ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - final Throwable rootCause = exceptionHolder.get().getRootCause(); - assertThat(rootCause, instanceOf(IllegalArgumentException.class)); - assertThat(rootCause.getMessage(), - equalTo("add voting tombstones request for [other1] matched no master-eligible nodes that do not already have tombstones")); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1)); } public void testReturnsErrorIfMaximumTombstoneCountExceeded() throws InterruptedException { From af714a06a1f3900a0edcbbe669175deddf3bfcac Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 16:04:53 +0000 Subject: [PATCH 05/17] Pre-flight resolution --- .../AddVotingTombstonesRequest.java | 18 +++++++++ .../TransportAddVotingTombstonesAction.java | 14 ++----- .../AddVotingTombstonesRequestTests.java | 37 +++++++++++++++++++ 3 files changed, 58 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java index ca9767f7549a2..18266ca0c7db3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java @@ -20,6 +20,8 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -27,6 +29,8 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; /** * A request to add voting tombstones for certain master-eligible nodes, and wait for these nodes to be removed from the voting @@ -65,6 +69,20 @@ public AddVotingTombstonesRequest(StreamInput in) throws IOException { timeout = in.readTimeValue(); } + Set resolveNodes(ClusterState currentState) { + DiscoveryNodes allNodes = currentState.nodes(); + Set resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions)) + .map(allNodes::get).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet()); + + if (resolvedNodes.isEmpty()) { + throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(nodeDescriptions) + + " matched no master-eligible nodes"); + } + + resolvedNodes.removeIf(n -> currentState.getVotingTombstones().contains(n)); + return resolvedNodes; + } + /** * @return descriptions of the nodes for whom to add tombstones. */ diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java index 6cd2ebbed0154..bafaf87748a7c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -80,6 +79,8 @@ protected AddVotingTombstonesResponse read(StreamInput in) throws IOException { protected void masterOperation(AddVotingTombstonesRequest request, ClusterState state, ActionListener listener) throws Exception { + request.resolveNodes(state); // throws IllegalArgumentException if no nodes matched + clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask() { final ClusterStateObserver observer @@ -89,17 +90,8 @@ protected void masterOperation(AddVotingTombstonesRequest request, ClusterState @Override public ClusterState execute(ClusterState currentState) { - final DiscoveryNodes allNodes = currentState.nodes(); assert resolvedNodes == null : resolvedNodes; - resolvedNodes = Arrays.stream(allNodes.resolveNodes(request.getNodeDescriptions())) - .map(allNodes::get).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet()); - - if (resolvedNodes.isEmpty()) { - throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(request.getNodeDescriptions()) - + " matched no master-eligible nodes"); - } - - resolvedNodes.removeIf(n -> currentState.getVotingTombstones().contains(n)); + resolvedNodes = request.resolveNodes(currentState); final int oldTombstoneCount = currentState.getVotingTombstones().size(); final int newTombstoneCount = resolvedNodes.size(); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java index 53533e70a4799..940d3450704b9 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java @@ -18,11 +18,22 @@ */ package org.elasticsearch.action.admin.cluster.configuration; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; public class AddVotingTombstonesRequestTests extends ESTestCase { @@ -38,4 +49,30 @@ public void testSerialization() throws IOException { assertThat(deserialized.getNodeDescriptions(), equalTo(originalRequest.getNodeDescriptions())); assertThat(deserialized.getTimeout(), equalTo(originalRequest.getTimeout())); } + + public void testResolve() { + final DiscoveryNode localNode + = new DiscoveryNode("local", "local", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); + final DiscoveryNode otherNode1 + = new DiscoveryNode("other1", "other1", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); + final DiscoveryNode otherNode2 + = new DiscoveryNode("other2", "other2", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); + final DiscoveryNode otherDataNode + = new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + + final ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")).nodes(new Builder() + .add(localNode).add(otherNode1).add(otherNode2).add(otherDataNode).localNodeId(localNode.getId())).build(); + + assertThat(makeRequest().resolveNodes(clusterState), containsInAnyOrder(localNode, otherNode1, otherNode2)); + assertThat(makeRequest("_all").resolveNodes(clusterState), containsInAnyOrder(localNode, otherNode1, otherNode2)); + assertThat(makeRequest("_local").resolveNodes(clusterState), contains(localNode)); + assertThat(makeRequest("other*").resolveNodes(clusterState), containsInAnyOrder(otherNode1, otherNode2)); + + assertThat(expectThrows(IllegalArgumentException.class, () -> makeRequest("not-a-node").resolveNodes(clusterState)).getMessage(), + equalTo("add voting tombstones request for [not-a-node] matched no master-eligible nodes")); + } + + private static AddVotingTombstonesRequest makeRequest(String... descriptions) { + return new AddVotingTombstonesRequest(descriptions); + } } From 6a340dcf9c86dca99cce4becabc7f3c6ef017fcc Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 16:06:58 +0000 Subject: [PATCH 06/17] Lambdas --- .../TransportAddVotingTombstonesAction.java | 20 ++++++---------- .../TransportClearVotingTombstonesAction.java | 24 +++++++------------ 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java index bafaf87748a7c..0a7d44f31e52e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java @@ -120,17 +120,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS final Set resolvedNodeIds = resolvedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - final Predicate allNodesRemoved = new Predicate() { - @Override - public boolean test(ClusterState clusterState) { - final Set votingNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); - return resolvedNodeIds.stream().anyMatch(votingNodeIds::contains) == false; - } - - @Override - public String toString() { - return "withdrawal of votes from " + resolvedNodes; - } + final Predicate allNodesRemoved = clusterState -> { + final Set votingNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); + return resolvedNodeIds.stream().noneMatch(votingNodeIds::contains); }; final Listener clusterStateListener = new Listener() { @@ -141,12 +133,14 @@ public void onNewClusterState(ClusterState state) { @Override public void onClusterServiceClose() { - listener.onFailure(new ElasticsearchException("cluster service closed while waiting for " + allNodesRemoved)); + listener.onFailure(new ElasticsearchException("cluster service closed while waiting for withdrawal of votes from " + + resolvedNodes)); } @Override public void onTimeout(TimeValue timeout) { - listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for " + allNodesRemoved)); + listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for withdrawal of votes from " + + resolvedNodes)); } }; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java index 830fb3d269337..ab5be65869939 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java @@ -74,21 +74,13 @@ protected void masterOperation(ClearVotingTombstonesRequest request, ClusterStat final long startTimeMillis = threadPool.relativeTimeInMillis(); - final Predicate allTombstonedNodesRemoved = new Predicate() { - @Override - public boolean test(ClusterState newState) { - for (DiscoveryNode tombstone : initialState.getVotingTombstones()) { - if (newState.nodes().nodeExists(tombstone.getId())) { - return false; - } + final Predicate allTombstonedNodesRemoved = newState -> { + for (DiscoveryNode tombstone : initialState.getVotingTombstones()) { + if (newState.nodes().nodeExists(tombstone.getId())) { + return false; } - return true; - } - - @Override - public String toString() { - return "removal of nodes " + initialState.getVotingTombstones(); } + return true; }; if (request.getWaitForRemoval() && allTombstonedNodesRemoved.test(initialState) == false) { @@ -103,12 +95,14 @@ public void onNewClusterState(ClusterState state) { @Override public void onClusterServiceClose() { - listener.onFailure(new ElasticsearchException("cluster service closed while waiting for " + allTombstonedNodesRemoved)); + listener.onFailure(new ElasticsearchException("cluster service closed while waiting for removal of nodes " + + initialState.getVotingTombstones())); } @Override public void onTimeout(TimeValue timeout) { - listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for " + allTombstonedNodesRemoved)); + listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for removal of nodes " + + initialState.getVotingTombstones())); } }, allTombstonedNodesRemoved); } else { From 0a1f9fbaa060d305e2b6389f20e72d4fc0046a8c Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 16:19:56 +0000 Subject: [PATCH 07/17] Review feedback --- .../configuration/TransportClearVotingTombstonesAction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java index ab5be65869939..9d271dd455695 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java @@ -76,6 +76,8 @@ protected void masterOperation(ClearVotingTombstonesRequest request, ClusterStat final Predicate allTombstonedNodesRemoved = newState -> { for (DiscoveryNode tombstone : initialState.getVotingTombstones()) { + // NB checking for the existence of any node with this persistent ID, because persistent IDs are how votes are counted. + // Calling nodeExists(tombstone) is insufficient because this compares on the ephemeral ID. if (newState.nodes().nodeExists(tombstone.getId())) { return false; } @@ -112,7 +114,7 @@ public void onTimeout(TimeValue timeout) { private void submitClearTombstonesTask(ClearVotingTombstonesRequest request, long startTimeMillis, ActionListener listener) { - clusterService.getMasterService().submitStateUpdateTask("clear-voting-tombstones", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("clear-voting-tombstones", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { final Builder builder = ClusterState.builder(currentState); From 2a8b054cfc1f5ee8878e96fdb9a6fce872494826 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 16:20:14 +0000 Subject: [PATCH 08/17] Remove ActionFilters.EMPTY_FILTERS --- .../java/org/elasticsearch/action/support/ActionFilters.java | 3 --- .../bootstrap/TransportBootstrapClusterActionTests.java | 4 +++- .../bootstrap/TransportGetDiscoveredNodesActionTests.java | 5 +++-- .../TransportAddVotingTombstonesActionTests.java | 4 ++-- .../TransportClearVotingTombstonesActionTests.java | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java b/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java index 3bb0f80d4abc9..e7b850ca89326 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java +++ b/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java @@ -30,9 +30,6 @@ */ public class ActionFilters { - // this could be used in many more places - TODO use this where appropriate - public static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); - private final ActionFilter[] filters; public ActionFilters(Set actionFilters) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java index eb8341aeb4021..c94bb145a1f81 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; +import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; @@ -54,7 +55,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; -import static org.elasticsearch.action.support.ActionFilters.EMPTY_FILTERS; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -62,6 +62,8 @@ public class TransportBootstrapClusterActionTests extends ESTestCase { + private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private DiscoveryNode discoveryNode; private static ThreadPool threadPool; private TransportService transportService; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java index ac3821f768168..1a8ef39fdf8e7 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; @@ -57,7 +58,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.elasticsearch.action.support.ActionFilters.EMPTY_FILTERS; import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; @@ -70,6 +70,8 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { + private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private static ThreadPool threadPool; private DiscoveryNode localNode; private String clusterName; @@ -118,7 +120,6 @@ public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedEx verifyZeroInteractions(discovery); new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, discovery); // registers action - transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java index 0bcf23363bfed..87db69ef7877b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState.VotingConfiguration; @@ -59,7 +60,6 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING; -import static org.elasticsearch.action.support.ActionFilters.EMPTY_FILTERS; import static org.elasticsearch.cluster.ClusterState.builder; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; @@ -105,7 +105,7 @@ public void setupForTest() { transportService = transport.createTransportService(Settings.EMPTY, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); - new TransportAddVotingTombstonesAction(transportService, clusterService, threadPool, EMPTY_FILTERS, + new TransportAddVotingTombstonesAction(transportService, clusterService, threadPool, new ActionFilters(emptySet()), new IndexNameExpressionResolver()); // registers action transportService.start(); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java index 74a349da1a33e..41a0cc6d8a47d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -50,7 +51,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.elasticsearch.action.support.ActionFilters.EMPTY_FILTERS; import static org.elasticsearch.cluster.ClusterState.builder; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; @@ -88,7 +88,7 @@ public void setupForTest() { transportService = transport.createTransportService(Settings.EMPTY, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); - new TransportClearVotingTombstonesAction(transportService, clusterService, threadPool, EMPTY_FILTERS, + new TransportClearVotingTombstonesAction(transportService, clusterService, threadPool, new ActionFilters(emptySet()), new IndexNameExpressionResolver()); // registers action transportService.start(); From cac7101f6609de1f0d93d1c6f2957f8de3110eb0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 16:23:01 +0000 Subject: [PATCH 09/17] Review feedback --- .../main/java/org/elasticsearch/cluster/ClusterState.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index bf289d56594e2..90f8c9317fa3f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -323,7 +323,7 @@ public String toString() { sb.append("state uuid: ").append(stateUUID).append("\n"); sb.append("last committed config: ").append(getLastCommittedConfiguration()).append("\n"); sb.append("last accepted config: ").append(getLastAcceptedConfiguration()).append("\n"); - sb.append("voting tombstones: ").append(votingTombstones.toString()).append("\n"); + sb.append("voting tombstones: ").append(votingTombstones).append("\n"); sb.append("from_diff: ").append(wasReadFromDiff).append("\n"); sb.append("meta data version: ").append(metaData.version()).append("\n"); final String TAB = " "; @@ -766,7 +766,8 @@ public ClusterState build() { uuid = UUIDs.randomBase64UUID(); } return new ClusterState(clusterName, term, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), - lastCommittedConfiguration, lastAcceptedConfiguration, Collections.unmodifiableSet(votingTombstones), fromDiff); + lastCommittedConfiguration, lastAcceptedConfiguration, Collections.unmodifiableSet(new HashSet<>(votingTombstones)), + fromDiff); } public static byte[] toBytes(ClusterState state) throws IOException { From 39c757929a111b1c7142e5e3c2be68fac7f158e8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 16:25:35 +0000 Subject: [PATCH 10/17] Create observer later --- .../configuration/TransportAddVotingTombstonesAction.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java index 0a7d44f31e52e..0aa5d3ec7046e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java @@ -83,9 +83,6 @@ protected void masterOperation(AddVotingTombstonesRequest request, ClusterState clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask() { - final ClusterStateObserver observer - = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext()); - private Set resolvedNodes; @Override @@ -118,6 +115,9 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + final ClusterStateObserver observer + = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext()); + final Set resolvedNodeIds = resolvedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); final Predicate allNodesRemoved = clusterState -> { From 403b5b37e4e889f85256aafb4fae181f02ebd02c Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Nov 2018 16:29:44 +0000 Subject: [PATCH 11/17] Imports --- .../cluster/configuration/AddVotingTombstonesResponse.java | 2 -- .../java/org/elasticsearch/action/support/ActionFilters.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java index c0cef63b48742..2fee3c848c5f0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java @@ -19,12 +19,10 @@ package org.elasticsearch.action.admin.cluster.configuration; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; -import java.util.Set; /** * A response to {@link AddVotingTombstonesRequest} indicating that voting tombstones have been added for the requested nodes and these diff --git a/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java b/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java index e7b850ca89326..c66bac31a3dee 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java +++ b/server/src/main/java/org/elasticsearch/action/support/ActionFilters.java @@ -23,8 +23,6 @@ import java.util.Comparator; import java.util.Set; -import static java.util.Collections.emptySet; - /** * Holds the action filters injected through plugins, properly sorted by {@link org.elasticsearch.action.support.ActionFilter#order()} */ From 24134d50bf7a819114591327115e2754f4295880 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 13 Nov 2018 12:35:28 +0000 Subject: [PATCH 12/17] Change default of waitForRemoval to true --- .../configuration/ClearVotingTombstonesRequest.java | 7 ++++--- .../TransportClearVotingTombstonesAction.java | 3 ++- .../TransportClearVotingTombstonesActionTests.java | 9 ++++----- .../java/org/elasticsearch/test/InternalTestCluster.java | 4 +--- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java index fbe7968dba87b..84d3917e86e36 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java @@ -30,7 +30,7 @@ * A request to clear the voting tombstones from the cluster state, optionally waiting for these nodes to be removed from the cluster first. */ public class ClearVotingTombstonesRequest extends MasterNodeRequest { - private boolean waitForRemoval; + private boolean waitForRemoval = true; private TimeValue timeout = TimeValue.timeValueSeconds(30); /** @@ -46,14 +46,15 @@ public ClearVotingTombstonesRequest(StreamInput in) throws IOException { } /** - * @return whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. + * @return whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. True by default. */ public boolean getWaitForRemoval() { return waitForRemoval; } /** - * @param waitForRemoval whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. + * @param waitForRemoval whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. True + * by default. */ public void setWaitForRemoval(boolean waitForRemoval) { this.waitForRemoval = waitForRemoval; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java index 9d271dd455695..b1269f32da9b3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java @@ -103,7 +103,8 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for removal of nodes " + listener.onFailure(new ElasticsearchTimeoutException( + "timed out waiting for removal of nodes; if nodes should not be removed, set waitForRemoval to false. " + initialState.getVotingTombstones())); } }, allTombstonedNodesRemoved); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java index 41a0cc6d8a47d..c7ccdd20b84eb 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java @@ -106,8 +106,10 @@ public void testClearsVotingTombstones() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); final SetOnce responseHolder = new SetOnce<>(); + final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest(); + clearVotingTombstonesRequest.setWaitForRemoval(false); transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, - new ClearVotingTombstonesRequest(), + clearVotingTombstonesRequest, expectSuccess(r -> { responseHolder.set(r); countDownLatch.countDown(); @@ -124,7 +126,6 @@ public void testTimesOutIfWaitingForNodesThatAreNotRemoved() throws InterruptedE final SetOnce responseHolder = new SetOnce<>(); final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest(); - clearVotingTombstonesRequest.setWaitForRemoval(true); clearVotingTombstonesRequest.setTimeout(TimeValue.timeValueMillis(100)); transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, clearVotingTombstonesRequest, @@ -145,10 +146,8 @@ public void testSucceedsIfNodesAreRemovedWhileWaiting() throws InterruptedExcept final CountDownLatch countDownLatch = new CountDownLatch(1); final SetOnce responseHolder = new SetOnce<>(); - final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest(); - clearVotingTombstonesRequest.setWaitForRemoval(true); transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, - clearVotingTombstonesRequest, + new ClearVotingTombstonesRequest(), expectSuccess(r -> { responseHolder.set(r); countDownLatch.countDown(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 19a4b608508e2..f11a0270a9fb8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1662,9 +1662,7 @@ private synchronized void stopNodesAndClients(Collection nodeAndC if (withdrawnNodeIds.isEmpty() == false) { logger.info("removing voting tombstones for {} after shutdown", withdrawnNodeIds); try { - final ClearVotingTombstonesRequest request = new ClearVotingTombstonesRequest(); - request.setWaitForRemoval(true); - client().execute(ClearVotingTombstonesAction.INSTANCE, request).get(); + client().execute(ClearVotingTombstonesAction.INSTANCE, new ClearVotingTombstonesRequest()).get(); } catch (InterruptedException | ExecutionException e) { throw new AssertionError("unexpected", e); } From ae9eb028037ad9c176455163bc1ba627505f9107 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 13 Nov 2018 12:36:18 +0000 Subject: [PATCH 13/17] Tombstone tasks are URGENT --- .../configuration/TransportAddVotingTombstonesAction.java | 3 ++- .../configuration/TransportClearVotingTombstonesAction.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java index 0aa5d3ec7046e..9c18fc5baab82 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Setting; @@ -81,7 +82,7 @@ protected void masterOperation(AddVotingTombstonesRequest request, ClusterState request.resolveNodes(state); // throws IllegalArgumentException if no nodes matched - clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) { private Set resolvedNodes; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java index b1269f32da9b3..14592b37ed13f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; @@ -115,7 +116,7 @@ public void onTimeout(TimeValue timeout) { private void submitClearTombstonesTask(ClearVotingTombstonesRequest request, long startTimeMillis, ActionListener listener) { - clusterService.submitStateUpdateTask("clear-voting-tombstones", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("clear-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { final Builder builder = ClusterState.builder(currentState); From ab72d36170bc746b94cd560f6a701130749beaa1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 13 Nov 2018 12:54:56 +0000 Subject: [PATCH 14/17] Check maximum in preflight too --- .../AddVotingTombstonesRequest.java | 18 ++++++++++-- .../TransportAddVotingTombstonesAction.java | 22 ++++++--------- .../AddVotingTombstonesRequestTests.java | 28 +++++++++++++++++++ 3 files changed, 52 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java index 18266ca0c7db3..0ffc3d5567aa3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java @@ -70,8 +70,8 @@ public AddVotingTombstonesRequest(StreamInput in) throws IOException { } Set resolveNodes(ClusterState currentState) { - DiscoveryNodes allNodes = currentState.nodes(); - Set resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions)) + final DiscoveryNodes allNodes = currentState.nodes(); + final Set resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions)) .map(allNodes::get).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet()); if (resolvedNodes.isEmpty()) { @@ -83,6 +83,20 @@ Set resolveNodes(ClusterState currentState) { return resolvedNodes; } + Set resolveNodesAndCheckMaximum(ClusterState currentState, int maxTombstoneCount, String maximumSettingKey) { + final Set resolvedNodes = resolveNodes(currentState); + + final int oldTombstoneCount = currentState.getVotingTombstones().size(); + final int newTombstoneCount = resolvedNodes.size(); + if (oldTombstoneCount + newTombstoneCount > maxTombstoneCount) { + throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(nodeDescriptions) + + " would add [" + newTombstoneCount + "] voting tombstones to the existing [" + oldTombstoneCount + + "] which would exceed the maximum of [" + maxTombstoneCount + "] set by [" + + maximumSettingKey + "]"); + } + return resolvedNodes; + } + /** * @return descriptions of the nodes for whom to add tombstones. */ diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java index 9c18fc5baab82..34f452988d209 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java @@ -44,7 +44,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Arrays; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -80,7 +79,7 @@ protected AddVotingTombstonesResponse read(StreamInput in) throws IOException { protected void masterOperation(AddVotingTombstonesRequest request, ClusterState state, ActionListener listener) throws Exception { - request.resolveNodes(state); // throws IllegalArgumentException if no nodes matched + resolveNodesAndCheckMaximum(request, state); // throws IllegalArgumentException if no nodes matched or maximum exceeded clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) { @@ -89,22 +88,12 @@ protected void masterOperation(AddVotingTombstonesRequest request, ClusterState @Override public ClusterState execute(ClusterState currentState) { assert resolvedNodes == null : resolvedNodes; - resolvedNodes = request.resolveNodes(currentState); - - final int oldTombstoneCount = currentState.getVotingTombstones().size(); - final int newTombstoneCount = resolvedNodes.size(); - final int maxTombstoneCount = MAXIMUM_VOTING_TOMBSTONES_SETTING.get(currentState.metaData().settings()); - if (oldTombstoneCount + newTombstoneCount > maxTombstoneCount) { - throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(request.getNodeDescriptions()) - + " would add [" + newTombstoneCount + "] voting tombstones to the existing [" + oldTombstoneCount - + "] which would exceed the maximum of [" + maxTombstoneCount + "] set by [" - + MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey() + "]"); - } + resolvedNodes = resolveNodesAndCheckMaximum(request, currentState); final Builder builder = ClusterState.builder(currentState); resolvedNodes.forEach(builder::addVotingTombstone); final ClusterState newState = builder.build(); - assert newState.getVotingTombstones().size() <= maxTombstoneCount; + assert newState.getVotingTombstones().size() <= MAXIMUM_VOTING_TOMBSTONES_SETTING.get(currentState.metaData().settings()); return newState; } @@ -154,6 +143,11 @@ public void onTimeout(TimeValue timeout) { }); } + private static Set resolveNodesAndCheckMaximum(AddVotingTombstonesRequest request, ClusterState state) { + return request.resolveNodesAndCheckMaximum(state, + MAXIMUM_VOTING_TOMBSTONES_SETTING.get(state.metaData().settings()), MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey()); + } + @Override protected ClusterBlockException checkBlock(AddVotingTombstonesRequest request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java index 940d3450704b9..cac669aa7c820 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java @@ -72,6 +72,34 @@ public void testResolve() { equalTo("add voting tombstones request for [not-a-node] matched no master-eligible nodes")); } + public void testResolveAndCheckMaximum() { + final DiscoveryNode localNode + = new DiscoveryNode("local", "local", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); + final DiscoveryNode otherNode1 + = new DiscoveryNode("other1", "other1", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); + final DiscoveryNode otherNode2 + = new DiscoveryNode("other2", "other2", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); + + final ClusterState.Builder builder = ClusterState.builder(new ClusterName("cluster")).nodes(new Builder() + .add(localNode).add(otherNode1).add(otherNode2).localNodeId(localNode.getId())); + builder.addVotingTombstone(otherNode1); + final ClusterState clusterState = builder.build(); + + assertThat(makeRequest().resolveNodesAndCheckMaximum(clusterState, 3, "setting.name"), + containsInAnyOrder(localNode, otherNode2)); + assertThat(makeRequest("_local").resolveNodesAndCheckMaximum(clusterState, 2, "setting.name"), + contains(localNode)); + + assertThat(expectThrows(IllegalArgumentException.class, + () -> makeRequest().resolveNodesAndCheckMaximum(clusterState, 2, "setting.name")).getMessage(), + equalTo("add voting tombstones request for [] would add [2] voting tombstones to the existing [1] which would exceed the " + + "maximum of [2] set by [setting.name]")); + assertThat(expectThrows(IllegalArgumentException.class, + () -> makeRequest("_local").resolveNodesAndCheckMaximum(clusterState, 1, "setting.name")).getMessage(), + equalTo("add voting tombstones request for [_local] would add [1] voting tombstones to the existing [1] which would exceed " + + "the maximum of [1] set by [setting.name]")); + } + private static AddVotingTombstonesRequest makeRequest(String... descriptions) { return new AddVotingTombstonesRequest(descriptions); } From efbaf1b01f8b975b301689708885cd4985e5ca00 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 13 Nov 2018 13:43:16 +0000 Subject: [PATCH 15/17] Always withdraw votes --- .../test/InternalTestCluster.java | 23 +++++++++++-------- .../test/test/InternalTestClusterIT.java | 21 ++++++++++++++++- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index f11a0270a9fb8..42e2d6008c608 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1633,17 +1633,20 @@ private synchronized void stopNodesAndClients(Collection nodeAndC final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count(); assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters; - if ((stoppingMasters != currentMasters && currentMasters <= stoppingMasters * 2) || rarely()) { - // stopping fewer than _all_ of the master nodes, but at least half of them, requires their votes to be withdrawn first + if (stoppingMasters != currentMasters && stoppingMasters > 0) { + // If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first. + // However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have + // been updated when the previous nodes shut down, so we must always explicitly withdraw votes. + // TODO add cluster health API to check that voting configuration is optimal so this isn't always needed nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(withdrawnNodeIds::add); - if (withdrawnNodeIds.isEmpty() == false) { - logger.info("withdrawing votes from {} prior to shutdown", withdrawnNodeIds); - try { - client().execute(AddVotingTombstonesAction.INSTANCE, - new AddVotingTombstonesRequest(withdrawnNodeIds.toArray(new String[0]))).get(); - } catch (InterruptedException | ExecutionException e) { - throw new AssertionError("unexpected", e); - } + assert withdrawnNodeIds.size() == stoppingMasters; + + logger.info("withdrawing votes from {} prior to shutdown", withdrawnNodeIds); + try { + client().execute(AddVotingTombstonesAction.INSTANCE, + new AddVotingTombstonesRequest(withdrawnNodeIds.toArray(new String[0]))).get(); + } catch (InterruptedException | ExecutionException e) { + throw new AssertionError("unexpected", e); } } diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java index 1109eadf88c8f..cafc812478f38 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java @@ -50,8 +50,27 @@ public void testStartingAndStoppingNodes() throws IOException { final int nodesToRemain = randomIntBetween(1, internalCluster().size() - 1); logger.info("--> reducing to [{}] nodes", nodesToRemain); internalCluster().ensureAtMostNumDataNodes(nodesToRemain); - ensureGreen(); assertThat(internalCluster().size(), lessThanOrEqualTo(nodesToRemain)); } + + ensureGreen(); + } + + public void testStoppingNodesOneByOne() throws IOException { + // In a 5+ node cluster there must be at least one reconfiguration as the nodes are shut down one-by-one before we drop to 2 nodes. + // If the nodes shut down too quickly then this reconfiguration does not have time to occur and the quorum is lost in the 3->2 + // transition, even though in a stable cluster the 3->2 transition requires no special treatment. + + internalCluster().startNodes(5); + ensureGreen(); + + while (internalCluster().size() > 1) { + final int nodesToRemain = internalCluster().size() - 1; + logger.info("--> reducing to [{}] nodes", nodesToRemain); + internalCluster().ensureAtMostNumDataNodes(nodesToRemain); + assertThat(internalCluster().size(), lessThanOrEqualTo(nodesToRemain)); + } + + ensureGreen(); } } From 933662fdadfde12a5003fa08967ac08c93bff718 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 13 Nov 2018 13:58:20 +0000 Subject: [PATCH 16/17] Fix message --- .../TransportClearVotingTombstonesActionTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java index c7ccdd20b84eb..ff978adcee85e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java @@ -139,7 +139,8 @@ public void testTimesOutIfWaitingForNodesThatAreNotRemoved() throws InterruptedE assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); final Throwable rootCause = responseHolder.get().getRootCause(); assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); - assertThat(rootCause.getMessage(), startsWith("timed out waiting for removal of nodes [")); + assertThat(rootCause.getMessage(), + startsWith("timed out waiting for removal of nodes; if nodes should not be removed, set waitForRemoval to false. [")); } public void testSucceedsIfNodesAreRemovedWhileWaiting() throws InterruptedException { From 5ca67f9e195d3209b1d0627ffe6c05bee5919f14 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 13 Nov 2018 15:27:55 +0000 Subject: [PATCH 17/17] Use stopRandomNode not ensureAtMostNumDataNodes --- .../org/elasticsearch/test/test/InternalTestClusterIT.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java index cafc812478f38..f9c90a052024e 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java @@ -65,10 +65,7 @@ public void testStoppingNodesOneByOne() throws IOException { ensureGreen(); while (internalCluster().size() > 1) { - final int nodesToRemain = internalCluster().size() - 1; - logger.info("--> reducing to [{}] nodes", nodesToRemain); - internalCluster().ensureAtMostNumDataNodes(nodesToRemain); - assertThat(internalCluster().size(), lessThanOrEqualTo(nodesToRemain)); + internalCluster().stopRandomNode(s -> true); } ensureGreen();