Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@

package org.elasticsearch.action.support;

import org.elasticsearch.common.CheckedConsumer;

public class PlainActionFuture<T> extends AdapterActionFuture<T, T> {

public static <T> PlainActionFuture<T> newFuture() {
return new PlainActionFuture<>();
}

public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T>, E> e) throws E {
PlainActionFuture<T> fut = newFuture();
e.accept(fut);
return fut.actionGet();
}

@Override
protected T convert(T listenerResponse) {
return listenerResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,23 +442,22 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
return;
}

transportService.connectToNode(joinRequest.getSourceNode());

final ClusterState stateForJoinValidation = getStateForMasterService();

if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) {
onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
stateForJoinValidation.getNodes().getMinNodeVersion());
transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
final ClusterState stateForJoinValidation = getStateForMasterService();

if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) {
onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
stateForJoinValidation.getNodes().getMinNodeVersion());
}
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
} else {
processJoinRequest(joinRequest, joinCallback);
}
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);

} else {
processJoinRequest(joinRequest, joinCallback);
}
}, joinCallback::onFailure));
}

// package private for tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
Expand Down Expand Up @@ -70,7 +71,7 @@ public HandshakingTransportAddressConnector(Settings settings, TransportService
public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
protected void doRun() {

// TODO if transportService is already connected to this address then skip the handshaking

Expand All @@ -80,38 +81,68 @@ protected void doRun() throws Exception {
emptySet(), Version.CURRENT.minimumCompatibilityVersion());

logger.trace("[{}] opening probe connection", this);
final Connection connection = transportService.openConnection(targetNode,
transportService.openConnection(targetNode,
ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout,
TimeValue.MINUS_ONE, null));
logger.trace("[{}] opened probe connection", this);

final DiscoveryNode remoteNode;
try {
remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis());
// success means (amongst other things) that the cluster names match
logger.trace("[{}] handshake successful: {}", this, remoteNode);
} catch (Exception e) {
// we opened a connection and successfully performed a low-level handshake, so we were definitely talking to an
// Elasticsearch node, but the high-level handshake failed indicating some kind of mismatched configurations
// (e.g. cluster name) that the user should address
logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e);
listener.onFailure(e);
return;
} finally {
IOUtils.closeWhileHandlingException(connection);
}

if (remoteNode.equals(transportService.getLocalNode())) {
// TODO cache this result for some time? forever?
listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
} else if (remoteNode.isMasterNode() == false) {
// TODO cache this result for some time?
listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
} else {
transportService.connectToNode(remoteNode);
logger.trace("[{}] full connection successful: {}", this, remoteNode);
listener.onResponse(remoteNode);
}
TimeValue.MINUS_ONE, null), new ActionListener<>() {
@Override
public void onResponse(Connection connection) {
logger.trace("[{}] opened probe connection", this);

// use NotifyOnceListener to make sure the following line does not result in onFailure being called when
// the connection is closed in the onResponse handler
transportService.handshake(connection, probeHandshakeTimeout.millis(), new NotifyOnceListener<DiscoveryNode>() {

@Override
protected void innerOnResponse(DiscoveryNode remoteNode) {
try {
// success means (amongst other things) that the cluster names match
logger.trace("[{}] handshake successful: {}", this, remoteNode);
IOUtils.closeWhileHandlingException(connection);

if (remoteNode.equals(transportService.getLocalNode())) {
// TODO cache this result for some time? forever?
listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
} else if (remoteNode.isMasterNode() == false) {
// TODO cache this result for some time?
listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
} else {
transportService.connectToNode(remoteNode, new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
logger.trace("[{}] full connection successful: {}", this, remoteNode);
listener.onResponse(remoteNode);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
protected void innerOnFailure(Exception e) {
// we opened a connection and successfully performed a low-level handshake, so we were definitely
// talking to an Elasticsearch node, but the high-level handshake failed indicating some kind of
// mismatched configurations (e.g. cluster name) that the user should address
logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e);
IOUtils.closeWhileHandlingException(connection);
listener.onFailure(e);
}

});

}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

@Override
Expand Down
Loading