diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a4e1d3ed8c990..aabe5466d69a9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -397,6 +397,7 @@ private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) { } } + private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { assert Thread.holdsLock(mutex) == false; assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible"; @@ -413,30 +414,37 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(), stateForJoinValidation.getNodes().getMinNodeVersion()); } + sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback); - // validate the join on the joining node, will throw a failure if it fails the validation - joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener() { - @Override - public void onResponse(Empty empty) { - try { - processJoinRequest(joinRequest, joinCallback); - } catch (Exception e) { - joinCallback.onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", - joinRequest.getSourceNode()), e); - joinCallback.onFailure(new IllegalStateException("failure when sending a validation request to node", e)); - } - }); } else { processJoinRequest(joinRequest, joinCallback); } } + // package private for tests + void sendValidateJoinRequest(ClusterState stateForJoinValidation, JoinRequest joinRequest, + JoinHelper.JoinCallback joinCallback) { + // validate the join on the joining node, will throw a failure if it fails the validation + joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener() { + @Override + public void onResponse(Empty empty) { + try { + processJoinRequest(joinRequest, joinCallback); + } catch (Exception e) { + joinCallback.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", + joinRequest.getSourceNode()), e); + joinCallback.onFailure(new IllegalStateException("failure when sending a validation request to node", e)); + } + }); + } + + private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { final Optional optionalJoin = joinRequest.getOptionalJoin(); synchronized (mutex) { diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java similarity index 70% rename from server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java rename to server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java index cda7065612714..d737ef790b5bd 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java @@ -17,9 +17,8 @@ * under the License. */ -package org.elasticsearch.discovery.zen; +package org.elasticsearch.cluster.coordination; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -27,41 +26,32 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryStats; +import org.elasticsearch.discovery.zen.FaultDetection; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.TestCustomMetaData; -import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.BytesTransportRequest; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.RemoteTransportException; import java.io.IOException; -import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.EnumSet; -import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; @@ -73,13 +63,6 @@ @TestLogging("_root:DEBUG") public class ZenDiscoveryIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(TestZenDiscovery.USE_ZEN2.getKey(), false) // Zen1-specific stuff in some tests - .build(); - } - public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Exception { Settings defaultSettings = Settings.builder() .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") @@ -122,7 +105,7 @@ public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Excep assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster)); } - public void testNodeFailuresAreProcessedOnce() throws ExecutionException, InterruptedException, IOException { + public void testNodeFailuresAreProcessedOnce() throws IOException { Settings defaultSettings = Settings.builder() .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") @@ -161,78 +144,39 @@ public void testNodeFailuresAreProcessedOnce() throws ExecutionException, Interr assertThat(numUpdates.get(), either(equalTo(1)).or(equalTo(2))); // due to batching, both nodes can be handled in same CS update } - public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception { - List nodeNames = internalCluster().startNodes(2); - - List nonMasterNodes = new ArrayList<>(nodeNames); - nonMasterNodes.remove(internalCluster().getMasterName()); - String noneMasterNode = nonMasterNodes.get(0); - - ClusterState state = internalCluster().getInstance(ClusterService.class).state(); - DiscoveryNode node = null; - for (DiscoveryNode discoveryNode : state.nodes()) { - if (discoveryNode.getName().equals(noneMasterNode)) { - node = discoveryNode; - } - } - assert node != null; - - DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(state.nodes()) - .add(new DiscoveryNode("abc", buildNewFakeTransportAddress(), emptyMap(), - emptySet(), Version.CURRENT)).masterNodeId("abc"); - ClusterState.Builder builder = ClusterState.builder(state); - builder.nodes(nodes); - BytesReference bytes = PublishClusterStateAction.serializeFullClusterState(builder.build(), node.getVersion()); - - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference reference = new AtomicReference<>(); - internalCluster().getInstance(TransportService.class, noneMasterNode).sendRequest(node, PublishClusterStateAction.SEND_ACTION_NAME, - new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - - @Override - public void handleResponse(TransportResponse.Empty response) { - super.handleResponse(response); - latch.countDown(); - } - - @Override - public void handleException(TransportException exp) { - super.handleException(exp); - reference.set(exp); - latch.countDown(); - } - }); - latch.await(); - assertThat(reference.get(), notNullValue()); - assertThat(ExceptionsHelper.detailedMessage(reference.get()), - containsString("cluster state from a different master than the current one, rejecting")); - } - - public void testHandleNodeJoin_incompatibleClusterState() throws UnknownHostException { - String masterOnlyNode = internalCluster().startMasterOnlyNode(); + public void testHandleNodeJoin_incompatibleClusterState() + throws InterruptedException, ExecutionException, TimeoutException { + String masterNode = internalCluster().startMasterOnlyNode(); String node1 = internalCluster().startNode(); - ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, masterOnlyNode); ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1); + Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, masterNode); final ClusterState state = clusterService.state(); MetaData.Builder mdBuilder = MetaData.builder(state.metaData()); mdBuilder.putCustom(CustomMetaData.TYPE, new CustomMetaData("data")); ClusterState stateWithCustomMetaData = ClusterState.builder(state).metaData(mdBuilder).build(); - final AtomicReference holder = new AtomicReference<>(); + final CompletableFuture future = new CompletableFuture<>(); DiscoveryNode node = state.nodes().getLocalNode(); - zenDiscovery.handleJoinRequest(node, stateWithCustomMetaData, new MembershipAction.JoinCallback() { + + coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, Optional.empty()), + new JoinHelper.JoinCallback() { @Override public void onSuccess() { + future.completeExceptionally(new AssertionError("onSuccess should not be called")); } @Override public void onFailure(Exception e) { - holder.set((IllegalStateException) e); + future.complete(e); } }); - assertThat(holder.get(), notNullValue()); - assertThat(holder.get().getMessage(), equalTo("failure when sending a validation request to node")); + Throwable t = future.get(10, TimeUnit.SECONDS); + + assertTrue(t instanceof IllegalStateException); + assertTrue(t.getCause() instanceof RemoteTransportException); + assertTrue(t.getCause().getCause() instanceof IllegalArgumentException); + assertThat(t.getCause().getCause().getMessage(), containsString("Unknown NamedWriteable")); } public static class CustomMetaData extends TestCustomMetaData {