Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
}
}


private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
assert Thread.holdsLock(mutex) == false;
assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible";
Expand All @@ -413,30 +414,37 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
stateForJoinValidation.getNodes().getMinNodeVersion());
}
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);

// validate the join on the joining node, will throw a failure if it fails the validation
joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
try {
processJoinRequest(joinRequest, joinCallback);
} catch (Exception e) {
joinCallback.onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]",
joinRequest.getSourceNode()), e);
joinCallback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
}
});
} else {
processJoinRequest(joinRequest, joinCallback);
}
}

/**
 * Sends the given cluster state to the node that wants to join so that the joining
 * node can validate it. When the validation request succeeds the join is handed to
 * {@code processJoinRequest}; when it fails the failure is logged and reported to the
 * callback wrapped in an {@link IllegalStateException}.
 *
 * Package private for tests.
 *
 * @param stateForJoinValidation the cluster state the joining node is asked to validate
 * @param joinRequest            the incoming join request
 * @param joinCallback           notified of a failure of the validation or of the join processing
 */
void sendValidateJoinRequest(ClusterState stateForJoinValidation, JoinRequest joinRequest,
                             JoinHelper.JoinCallback joinCallback) {
    final ActionListener<Empty> validationListener = new ActionListener<Empty>() {
        @Override
        public void onResponse(Empty ignored) {
            // validation went through on the joining node, carry on with the join itself
            try {
                processJoinRequest(joinRequest, joinCallback);
            } catch (Exception processingFailure) {
                joinCallback.onFailure(processingFailure);
            }
        }

        @Override
        public void onFailure(Exception validationFailure) {
            logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]",
                joinRequest.getSourceNode()), validationFailure);
            joinCallback.onFailure(
                new IllegalStateException("failure when sending a validation request to node", validationFailure));
        }
    };

    // validate the join on the joining node, will throw a failure if it fails the validation
    joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, validationListener);
}


private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
synchronized (mutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,41 @@
* under the License.
*/

package org.elasticsearch.discovery.zen;
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.TestCustomMetaData;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.RemoteTransportException;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -73,13 +63,6 @@
@TestLogging("_root:DEBUG")
public class ZenDiscoveryIT extends ESIntegTestCase {

/**
 * Builds the settings for a test node: the defaults supplied by the superclass
 * with {@code TestZenDiscovery.USE_ZEN2} forced to {@code false}.
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    final Settings.Builder settings = Settings.builder();
    settings.put(super.nodeSettings(nodeOrdinal));
    // Zen1-specific stuff in some tests
    settings.put(TestZenDiscovery.USE_ZEN2.getKey(), false);
    return settings.build();
}

public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Exception {
Settings defaultSettings = Settings.builder()
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s")
Expand Down Expand Up @@ -122,7 +105,7 @@ public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Excep
assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster));
}

public void testNodeFailuresAreProcessedOnce() throws ExecutionException, InterruptedException, IOException {
public void testNodeFailuresAreProcessedOnce() throws IOException {
Settings defaultSettings = Settings.builder()
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s")
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1")
Expand Down Expand Up @@ -161,78 +144,39 @@ public void testNodeFailuresAreProcessedOnce() throws ExecutionException, Interr
assertThat(numUpdates.get(), either(equalTo(1)).or(equalTo(2))); // due to batching, both nodes can be handled in same CS update
}

/**
 * Starts a two node cluster, forges a full cluster state whose master is a made-up
 * node ("abc") and sends it through the publish action to the non-master node.
 * The request is expected to fail with a message saying that the state comes from a
 * different master than the current one.
 */
public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception {
    List<String> nodeNames = internalCluster().startNodes(2);

    List<String> nonMasterNodes = new ArrayList<>(nodeNames);
    nonMasterNodes.remove(internalCluster().getMasterName());
    String nonMasterNode = nonMasterNodes.get(0);

    // look up the DiscoveryNode that corresponds to the non-master node's name
    ClusterState state = internalCluster().getInstance(ClusterService.class).state();
    DiscoveryNode node = null;
    for (DiscoveryNode discoveryNode : state.nodes()) {
        if (discoveryNode.getName().equals(nonMasterNode)) {
            node = discoveryNode;
        }
    }
    // a real test assertion rather than the `assert` keyword, so the check cannot be disabled
    assertThat(node, notNullValue());

    // copy the current state but claim that the unknown node "abc" is the master
    DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(state.nodes())
        .add(new DiscoveryNode("abc", buildNewFakeTransportAddress(), emptyMap(),
            emptySet(), Version.CURRENT)).masterNodeId("abc");
    ClusterState.Builder builder = ClusterState.builder(state);
    builder.nodes(nodes);
    BytesReference bytes = PublishClusterStateAction.serializeFullClusterState(builder.build(), node.getVersion());

    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Exception> reference = new AtomicReference<>();
    internalCluster().getInstance(TransportService.class, nonMasterNode).sendRequest(node, PublishClusterStateAction.SEND_ACTION_NAME,
        new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

            @Override
            public void handleResponse(TransportResponse.Empty response) {
                super.handleResponse(response);
                latch.countDown();
            }

            @Override
            public void handleException(TransportException exp) {
                super.handleException(exp);
                reference.set(exp);
                latch.countDown();
            }
        });
    // bounded wait: a lost response must fail the test instead of blocking it forever
    assertTrue("timed out waiting for the response to the forged cluster state", latch.await(10, TimeUnit.SECONDS));
    assertThat(reference.get(), notNullValue());
    assertThat(ExceptionsHelper.detailedMessage(reference.get()),
        containsString("cluster state from a different master than the current one, rejecting"));
}

// Exercises join validation with a cluster state that carries a test-only CustomMetaData entry
// and expects the join callback to be notified of a failure rather than a success.
// NOTE(review): this span appears to interleave two versions of the same test (two method
// signatures, a ZenDiscovery-based and a Coordinator-based call, two sets of assertions);
// confirm which lines are live before relying on it.
public void testHandleNodeJoin_incompatibleClusterState() throws UnknownHostException {
String masterOnlyNode = internalCluster().startMasterOnlyNode();
public void testHandleNodeJoin_incompatibleClusterState()
throws InterruptedException, ExecutionException, TimeoutException {
String masterNode = internalCluster().startMasterOnlyNode();
String node1 = internalCluster().startNode();
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, masterOnlyNode);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);
Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, masterNode);
final ClusterState state = clusterService.state();
// derive a state from node1's current state that additionally contains the custom metadata
MetaData.Builder mdBuilder = MetaData.builder(state.metaData());
mdBuilder.putCustom(CustomMetaData.TYPE, new CustomMetaData("data"));
ClusterState stateWithCustomMetaData = ClusterState.builder(state).metaData(mdBuilder).build();

final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
// completed by the join callback below; carries the failure the test inspects
final CompletableFuture<Throwable> future = new CompletableFuture<>();
DiscoveryNode node = state.nodes().getLocalNode();
zenDiscovery.handleJoinRequest(node, stateWithCustomMetaData, new MembershipAction.JoinCallback() {

coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, Optional.empty()),
new JoinHelper.JoinCallback() {
@Override
public void onSuccess() {
// a successful join is not expected here, so surface it as an error on the future
future.completeExceptionally(new AssertionError("onSuccess should not be called"));
}

@Override
public void onFailure(Exception e) {
holder.set((IllegalStateException) e);
future.complete(e);
}
});

assertThat(holder.get(), notNullValue());
assertThat(holder.get().getMessage(), equalTo("failure when sending a validation request to node"));
// wait at most ten seconds for the callback to report the failure
Throwable t = future.get(10, TimeUnit.SECONDS);

// expected chain: IllegalStateException -> RemoteTransportException -> IllegalArgumentException
assertTrue(t instanceof IllegalStateException);
assertTrue(t.getCause() instanceof RemoteTransportException);
assertTrue(t.getCause().getCause() instanceof IllegalArgumentException);
assertThat(t.getCause().getCause().getMessage(), containsString("Unknown NamedWriteable"));
}

public static class CustomMetaData extends TestCustomMetaData {
Expand Down