diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 1e61816b8afe4..38a18c7177dfa 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -37,6 +38,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; @@ -73,10 +75,27 @@ import java.util.function.Predicate; import java.util.function.Supplier; -public class TransportService extends AbstractLifecycleComponent implements ReportingService, TransportMessageListener, - TransportConnectionListener { +public class TransportService extends AbstractLifecycleComponent + implements ReportingService, TransportMessageListener, TransportConnectionListener { + private static final Logger logger = LogManager.getLogger(TransportService.class); + private static final String PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY = "es.unsafely_permit_handshake_from_incompatible_builds"; + private static final boolean PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS; + + static { + final String value = System.getProperty(PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY); + if (value == null) { + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = false; + } else if (Boolean.parseBoolean(value)) { + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = true; + } else { + throw new IllegalArgumentException("invalid value [" + value + "] for system property [" + + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "]"); + } + } + + public static final String DIRECT_RESPONSE_PROFILE = ".direct"; public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; @@ -115,6 +134,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private final RemoteClusterService remoteClusterService; private final boolean validateConnections; + private final boolean requireCompatibleBuild; /** if set will call requests sent to this id to shortcut and executed locally */ volatile DiscoveryNode localNode = null; @@ -160,9 +180,15 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders, ConnectionManager connectionManager) { + + final boolean isTransportClient = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())); + + // If we are a transport client then we skip the check that the remote node has a compatible build hash + this.requireCompatibleBuild = isTransportClient == false; + // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler - this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false || - TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings); + this.validateConnections = isTransportClient == false || TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings); + this.transport = transport; transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); this.threadPool = threadPool; @@ -192,7 +218,14 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa false, false, HandshakeRequest::new, (request, channel, task) -> channel.sendResponse( - new HandshakeResponse(localNode, clusterName, localNode.getVersion()))); + new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName))); + + if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) { + logger.warn("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" + + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning"); + DeprecationLogger.getLogger(TransportService.class).deprecate("permit_handshake_from_incompatible_builds", + "system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed"); + } } public RemoteClusterService getRemoteClusterService() { @@ -482,7 +515,7 @@ public void onFailure(Exception e) { listener.onFailure(e); } } - , HandshakeResponse::new, ThreadPool.Names.GENERIC + , in -> new HandshakeResponse(in, requireCompatibleBuild), ThreadPool.Names.GENERIC )); } @@ -504,28 +537,89 @@ private HandshakeRequest() { } public static class HandshakeResponse extends TransportResponse { + + private static final Version BUILD_HASH_HANDSHAKE_VERSION = Version.V_7_11_0; + + private final Version version; + + @Nullable // if version < BUILD_HASH_HANDSHAKE_VERSION + private final String buildHash; + private final DiscoveryNode discoveryNode; + private final ClusterName clusterName; - private final Version version; - public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) { - this.discoveryNode = discoveryNode; - this.version = version; - this.clusterName = clusterName; + public HandshakeResponse(Version version, String buildHash, DiscoveryNode discoveryNode, ClusterName clusterName) { + this.buildHash = Objects.requireNonNull(buildHash); + this.discoveryNode = Objects.requireNonNull(discoveryNode); + this.version = Objects.requireNonNull(version); + this.clusterName = Objects.requireNonNull(clusterName); } - public HandshakeResponse(StreamInput in) throws IOException { + public HandshakeResponse(StreamInput in, boolean requireCompatibleBuild) throws IOException { super(in); - discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); - clusterName = new ClusterName(in); - version = Version.readVersion(in); + if (in.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) { + // the first two fields need only VInts and raw (ASCII) characters, so we cross our fingers and hope that they appear + // on the wire as we expect them to even if this turns out to be an incompatible build + version = Version.readVersion(in); + buildHash = in.readString(); + + try { + // If the remote node is incompatible then make an effort to identify it anyway, so we can mention it in the exception + // message, but recognise that this may fail + discoveryNode = new DiscoveryNode(in); + } catch (Exception e) { + if (isIncompatibleBuild(version, buildHash, requireCompatibleBuild)) { + throw new IllegalArgumentException("unidentifiable remote node is build [" + buildHash + + "] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() + + "] of version [" + Version.CURRENT + "] which has an incompatible wire format", e); + } else { + throw e; + } + } + + if (isIncompatibleBuild(version, buildHash, requireCompatibleBuild)) { + if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) { + logger.warn("remote node [{}] is build [{}] of version [{}] but this node is build [{}] of version [{}] " + + "which may not be compatible; remove system property [{}] to resolve this warning", + discoveryNode, buildHash, version, Build.CURRENT.hash(), Version.CURRENT, + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY); + } else { + throw new IllegalArgumentException("remote node [" + discoveryNode + "] is build [" + buildHash + + "] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() + + "] of version [" + Version.CURRENT + "] which has an incompatible wire format"); + } + } + + clusterName = new ClusterName(in); + } else { + discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); + clusterName = new ClusterName(in); + version = Version.readVersion(in); + buildHash = null; + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(discoveryNode); - clusterName.writeTo(out); - Version.writeVersion(version, out); + if (out.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) { + Version.writeVersion(version, out); + out.writeString(buildHash); + discoveryNode.writeTo(out); + clusterName.writeTo(out); + } else { + out.writeOptionalWriteable(discoveryNode); + clusterName.writeTo(out); + Version.writeVersion(version, out); + } + } + + public Version getVersion() { + return version; + } + + public String getBuildHash() { + return buildHash; } public DiscoveryNode getDiscoveryNode() { @@ -535,6 +629,10 @@ public DiscoveryNode getDiscoveryNode() { public ClusterName getClusterName() { return clusterName; } + + private static boolean isIncompatibleBuild(Version version, String buildHash, boolean requireCompatibleBuild) { + return requireCompatibleBuild && version == Version.CURRENT && Build.CURRENT.hash().equals(buildHash) == false; + } } public void disconnectFromNode(DiscoveryNode node) { @@ -1354,4 +1452,5 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } } } + } diff --git a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 4b77cb8bd29ab..0ed37f71c1220 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.transport; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; @@ -102,7 +103,11 @@ public void sendRequest(long requestId, String action, TransportRequest request, } else if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) { TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener); Version version = node.getVersion(); - transportResponseHandler.handleResponse(new TransportService.HandshakeResponse(node, clusterName, version)); + transportResponseHandler.handleResponse(new TransportService.HandshakeResponse( + version, + Build.CURRENT.hash(), + node, + clusterName)); } else { throw new UnsupportedOperationException("Mock transport does not understand action " + action); diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index 36fbf11c4e0a7..e79815655d812 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.transport; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; @@ -180,7 +181,11 @@ public void sendRequest(Transport.Connection conne clusterStateLatch.countDown(); } else if (TransportService.HANDSHAKE_ACTION_NAME .equals(action)) { ((TransportResponseHandler) handler).handleResponse( - new TransportService.HandshakeResponse(connection.getNode(), clusterName, connection.getNode().getVersion())); + new TransportService.HandshakeResponse( + connection.getNode().getVersion(), + Build.CURRENT.hash(), + connection.getNode(), + clusterName)); } else { handler.handleException(new TransportException("", new InternalException(action))); } diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index cc9a3e793f04d..6d1001014b8f8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -479,7 +480,7 @@ private TestTransportService(Transport transport, ThreadPool threadPool) { @Override public void handshake(Transport.Connection connection, TimeValue timeout, Predicate clusterNamePredicate, ActionListener listener) { - listener.onResponse(new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT)); + listener.onResponse(new HandshakeResponse(Version.CURRENT, Build.CURRENT.hash(), connection.getNode(), new ClusterName(""))); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index 07e48cb710167..93213d9c973f7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -226,7 +227,11 @@ public void testFailsNodeThatDisconnects() { protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { assertFalse(node.equals(localNode)); if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + handleResponse(requestId, new TransportService.HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + node, + ClusterName.DEFAULT)); return; } deterministicTaskQueue.scheduleNow(new Runnable() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index c30f284b47346..fb757a0f6fd99 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.coordination; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -221,7 +222,11 @@ public void testFollowerFailsImmediatelyOnDisconnection() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + handleResponse(requestId, new TransportService.HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + node, + ClusterName.DEFAULT)); return; } assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); @@ -330,7 +335,11 @@ public void testFollowerFailsImmediatelyOnHealthCheckFailure() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + handleResponse(requestId, new TransportService.HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + node, + ClusterName.DEFAULT)); return; } assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 2b448c4369e05..ff963ba6872e4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; @@ -162,8 +163,12 @@ private void setupMasterServiceAndCoordinator(long term, ClusterState initialSta @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) { if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(), - destination.getVersion())); + handleResponse(requestId, new TransportService.HandshakeResponse( + destination.getVersion(), + Build.CURRENT.hash(), + destination, + initialState.getClusterName() + )); } else if (action.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) { handleResponse(requestId, new TransportResponse.Empty()); } else { diff --git a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java index 5a83c5555949d..e5b6fb5963037 100644 --- a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -98,7 +99,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (fullConnectionFailure != null && node.getAddress().equals(remoteNode.getAddress())) { handleError(requestId, fullConnectionFailure); } else { - handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT)); + handleResponse(requestId, new HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + remoteNode, + new ClusterName(remoteClusterName))); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java index d7e18d46cb150..28bd1ba7d216b 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; @@ -58,7 +59,11 @@ public void testDeserializationFailureLogIdentifiesListener() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { if (action.equals(TransportService.HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(otherNode, new ClusterName(""), Version.CURRENT)); + handleResponse(requestId, new TransportService.HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + otherNode, + new ClusterName(""))); } } }; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index 6430583050ee1..18ce9fe824acd 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; @@ -50,6 +51,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; public class TransportServiceHandshakeTests extends ESTestCase { @@ -68,8 +70,9 @@ private NetworkHandle startServices(String nodeNameAndId, Settings settings, Ver new MockNioTransport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + final DisruptingTransportInterceptor transportInterceptor = new DisruptingTransportInterceptor(); TransportService transportService = new MockTransportService(settings, transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, (boundAddress) -> new DiscoveryNode( + transportInterceptor, (boundAddress) -> new DiscoveryNode( nodeNameAndId, nodeNameAndId, boundAddress.publishAddress(), @@ -79,7 +82,7 @@ PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections transportService.start(); transportService.acceptIncomingRequests(); transportServices.add(transportService); - return new NetworkHandle(transportService, transportService.getLocalNode()); + return new NetworkHandle(transportService, transportService.getLocalNode(), transportInterceptor); } @After @@ -219,14 +222,119 @@ public void testNodeConnectWithDifferentNodeIdFailsWhenSnifferTransportClient() assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } + public void testRejectsMismatchedBuildHash() { + final Settings settings = Settings.builder().put("cluster.name", "a").build(); + final NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT); + final NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT); + final DiscoveryNode discoveryNode = new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()); + handleA.transportInterceptor.setModifyBuildHash(true); + handleB.transportInterceptor.setModifyBuildHash(true); + TransportSerializationException ex = expectThrows(TransportSerializationException.class, () -> { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, TestProfiles.LIGHT_PROFILE)) { + PlainActionFuture.get(fut -> handleA.transportService.handshake(connection, timeout, ActionListener.map(fut, x -> null))); + } + }); + assertThat( + ExceptionsHelper.unwrap(ex, IllegalArgumentException.class).getMessage(), + containsString("which has an incompatible wire format")); + assertFalse(handleA.transportService.nodeConnected(discoveryNode)); + } + + public void testAcceptsMismatchedBuildHashFromDifferentVersion() { + final NetworkHandle handleA = startServices( + "TS_A", + Settings.builder().put("cluster.name", "a").build(), + Version.CURRENT); + final NetworkHandle handleB = startServices( + "TS_B", + Settings.builder().put("cluster.name", "a").build(), + Version.CURRENT.minimumCompatibilityVersion()); + handleA.transportInterceptor.setModifyBuildHash(true); + handleB.transportInterceptor.setModifyBuildHash(true); + handleA.transportService.connectToNode(handleB.discoveryNode, TestProfiles.LIGHT_PROFILE); + assertTrue(handleA.transportService.nodeConnected(handleB.discoveryNode)); + } + + public void testAcceptsMismatchedBuildHashForTransportClient() { + final NetworkHandle handleA = startServices( + "TS_A", + Settings.builder().put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE).build(), + Version.CURRENT); + final NetworkHandle handleB = startServices( + "TS_B", + Settings.builder().put("cluster.name", "a").build(), + Version.CURRENT); + handleA.transportInterceptor.setModifyBuildHash(true); + handleB.transportInterceptor.setModifyBuildHash(true); + handleA.transportService.connectToNode(handleB.discoveryNode, TestProfiles.LIGHT_PROFILE); + assertTrue(handleA.transportService.nodeConnected(handleB.discoveryNode)); + } private static class NetworkHandle { - private TransportService transportService; - private DiscoveryNode discoveryNode; + final TransportService transportService; + final DiscoveryNode discoveryNode; + final DisruptingTransportInterceptor transportInterceptor; - NetworkHandle(TransportService transportService, DiscoveryNode discoveryNode) { + NetworkHandle(TransportService transportService, DiscoveryNode discoveryNode, DisruptingTransportInterceptor transportInterceptor) { this.transportService = transportService; this.discoveryNode = discoveryNode; + this.transportInterceptor = transportInterceptor; + } + } + + private static class DisruptingTransportInterceptor implements TransportInterceptor { + + private boolean modifyBuildHash; + + public void setModifyBuildHash(boolean modifyBuildHash) { + this.modifyBuildHash = modifyBuildHash; + } + + @Override + public TransportRequestHandler interceptHandler( + String action, String executor, boolean forceExecution, TransportRequestHandler actualHandler) { + + if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) { + return (request, channel, task) -> actualHandler.messageReceived(request, new TransportChannel() { + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public String getChannelType() { + return channel.getChannelType(); + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + assertThat(response, instanceOf(TransportService.HandshakeResponse.class)); + if (modifyBuildHash) { + final TransportService.HandshakeResponse handshakeResponse = (TransportService.HandshakeResponse) response; + channel.sendResponse(new TransportService.HandshakeResponse( + handshakeResponse.getVersion(), + handshakeResponse.getBuildHash() + "-modified", + handshakeResponse.getDiscoveryNode(), + handshakeResponse.getClusterName())); + } else { + channel.sendResponse(response); + } + } + + @Override + public void sendResponse(Exception exception) throws IOException { + channel.sendResponse(exception); + + } + }, task); + } else { + return actualHandler; + } } }