diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 17d20e064e403..8e0824e563617 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -34,6 +35,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; @@ -68,10 +70,27 @@ import java.util.function.Predicate; import java.util.function.Supplier; -public class TransportService extends AbstractLifecycleComponent implements ReportingService, TransportMessageListener, - TransportConnectionListener { +public class TransportService extends AbstractLifecycleComponent + implements ReportingService, TransportMessageListener, TransportConnectionListener { + private static final Logger logger = LogManager.getLogger(TransportService.class); + private static final String PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY = "es.unsafely_permit_handshake_from_incompatible_builds"; + private static final boolean PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS; + + static { + final String value = System.getProperty(PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY); + if (value == null) { + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = false; + } else if 
(Boolean.parseBoolean(value)) { + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = true; + } else { + throw new IllegalArgumentException("invalid value [" + value + "] for system property [" + + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "]"); + } + } + + public static final String DIRECT_RESPONSE_PROFILE = ".direct"; public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; @@ -182,7 +201,14 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa false, false, HandshakeRequest::new, (request, channel, task) -> channel.sendResponse( - new HandshakeResponse(localNode, clusterName, localNode.getVersion()))); + new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName))); + + if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) { + logger.warn("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" + + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning"); + DeprecationLogger.getLogger(TransportService.class).deprecate("permit_handshake_from_incompatible_builds", + "system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed"); + } } public RemoteClusterService getRemoteClusterService() { @@ -440,8 +466,8 @@ public void onResponse(HandshakeResponse response) { public void onFailure(Exception e) { listener.onFailure(e); } - } - , HandshakeResponse::new, ThreadPool.Names.GENERIC + }, + HandshakeResponse::new, ThreadPool.Names.GENERIC )); } @@ -463,28 +489,89 @@ private HandshakeRequest() { } public static class HandshakeResponse extends TransportResponse { + + private static final Version BUILD_HASH_HANDSHAKE_VERSION = Version.V_8_0_0; + + private final Version version; + + @Nullable // if version < BUILD_HASH_HANDSHAKE_VERSION + private final String buildHash; + private final DiscoveryNode discoveryNode; + private final ClusterName clusterName; - private final 
Version version; - public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) { - this.discoveryNode = discoveryNode; - this.version = version; - this.clusterName = clusterName; + public HandshakeResponse(Version version, String buildHash, DiscoveryNode discoveryNode, ClusterName clusterName) { + this.buildHash = Objects.requireNonNull(buildHash); + this.discoveryNode = Objects.requireNonNull(discoveryNode); + this.version = Objects.requireNonNull(version); + this.clusterName = Objects.requireNonNull(clusterName); } public HandshakeResponse(StreamInput in) throws IOException { super(in); - discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); - clusterName = new ClusterName(in); - version = Version.readVersion(in); + if (in.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) { + // the first two fields need only VInts and raw (ASCII) characters, so we cross our fingers and hope that they appear + // on the wire as we expect them to even if this turns out to be an incompatible build + version = Version.readVersion(in); + buildHash = in.readString(); + + try { + // If the remote node is incompatible then make an effort to identify it anyway, so we can mention it in the exception + // message, but recognise that this may fail + discoveryNode = new DiscoveryNode(in); + } catch (Exception e) { + if (isIncompatibleBuild(version, buildHash)) { + throw new IllegalArgumentException("unidentifiable remote node is build [" + buildHash + + "] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() + + "] of version [" + Version.CURRENT + "] which has an incompatible wire format", e); + } else { + throw e; + } + } + + if (isIncompatibleBuild(version, buildHash)) { + if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) { + logger.warn("remote node [{}] is build [{}] of version [{}] but this node is build [{}] of version [{}] " + + "which may not be compatible; remove system property [{}] to resolve this warning", + 
discoveryNode, buildHash, version, Build.CURRENT.hash(), Version.CURRENT, + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY); + } else { + throw new IllegalArgumentException("remote node [" + discoveryNode + "] is build [" + buildHash + + "] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() + + "] of version [" + Version.CURRENT + "] which has an incompatible wire format"); + } + } + + clusterName = new ClusterName(in); + } else { + discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); + clusterName = new ClusterName(in); + version = Version.readVersion(in); + buildHash = null; + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(discoveryNode); - clusterName.writeTo(out); - Version.writeVersion(version, out); + if (out.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) { + Version.writeVersion(version, out); + out.writeString(buildHash); + discoveryNode.writeTo(out); + clusterName.writeTo(out); + } else { + out.writeOptionalWriteable(discoveryNode); + clusterName.writeTo(out); + Version.writeVersion(version, out); + } + } + + public Version getVersion() { + return version; + } + + public String getBuildHash() { + return buildHash; } public DiscoveryNode getDiscoveryNode() { @@ -494,6 +581,10 @@ public DiscoveryNode getDiscoveryNode() { public ClusterName getClusterName() { return clusterName; } + + private static boolean isIncompatibleBuild(Version version, String buildHash) { + return Version.CURRENT.equals(version) && Build.CURRENT.hash().equals(buildHash) == false; + } } public void disconnectFromNode(DiscoveryNode node) { @@ -1293,4 +1384,5 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } } } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index cd42613abac3e..9270bcc82e89b 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -479,7 +480,7 @@ private TestTransportService(Transport transport, ThreadPool threadPool) { @Override public void handshake(Transport.Connection connection, TimeValue timeout, Predicate clusterNamePredicate, ActionListener listener) { - listener.onResponse(new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT)); + listener.onResponse(new HandshakeResponse(Version.CURRENT, Build.CURRENT.hash(), connection.getNode(), new ClusterName(""))); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index 329332b6afd42..497bd723b2122 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -227,7 +228,11 @@ public void testFailsNodeThatDisconnects() { protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { assertFalse(node.equals(localNode)); if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + handleResponse(requestId, new 
TransportService.HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + node, + ClusterName.DEFAULT)); return; } deterministicTaskQueue.scheduleNow(new Runnable() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index b4911fc62fc0a..e577f9ef5494d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.coordination; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -222,7 +223,11 @@ public void testFollowerFailsImmediatelyOnDisconnection() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + handleResponse(requestId, new TransportService.HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + node, + ClusterName.DEFAULT)); return; } assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); @@ -332,7 +337,11 @@ public void testFollowerFailsImmediatelyOnHealthCheckFailure() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + handleResponse(requestId, new TransportService.HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + node, + ClusterName.DEFAULT)); return; } assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); diff --git 
a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 82a2b2ee1923f..7d7849de52283 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; @@ -161,8 +162,12 @@ private void setupMasterServiceAndCoordinator(long term, ClusterState initialSta @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) { if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(), - destination.getVersion())); + handleResponse(requestId, new TransportService.HandshakeResponse( + destination.getVersion(), + Build.CURRENT.hash(), + destination, + initialState.getClusterName() + )); } else if (action.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) { handleResponse(requestId, new TransportResponse.Empty()); } else { diff --git a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java index 5a83c5555949d..e5b6fb5963037 100644 --- a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; +import 
org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -98,7 +99,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (fullConnectionFailure != null && node.getAddress().equals(remoteNode.getAddress())) { handleError(requestId, fullConnectionFailure); } else { - handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT)); + handleResponse(requestId, new HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + remoteNode, + new ClusterName(remoteClusterName))); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java index d7e18d46cb150..28bd1ba7d216b 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; @@ -58,7 +59,11 @@ public void testDeserializationFailureLogIdentifiesListener() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { if (action.equals(TransportService.HANDSHAKE_ACTION_NAME)) { - handleResponse(requestId, new TransportService.HandshakeResponse(otherNode, new ClusterName(""), Version.CURRENT)); + handleResponse(requestId, new TransportService.HandshakeResponse( + Version.CURRENT, + Build.CURRENT.hash(), + otherNode, + new ClusterName(""))); } } }; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java 
b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index 8957d9d76635e..bf3c6e453b19e 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,6 +48,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; public class TransportServiceHandshakeTests extends ESTestCase { @@ -65,8 +67,9 @@ private NetworkHandle startServices(String nodeNameAndId, Settings settings, Ver new MockNioTransport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + final DisruptingTransportInterceptor transportInterceptor = new DisruptingTransportInterceptor(); TransportService transportService = new MockTransportService(settings, transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, (boundAddress) -> new DiscoveryNode( + transportInterceptor, (boundAddress) -> new DiscoveryNode( nodeNameAndId, nodeNameAndId, boundAddress.publishAddress(), @@ -76,7 +79,7 @@ PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections transportService.start(); transportService.acceptIncomingRequests(); transportServices.add(transportService); - return new NetworkHandle(transportService, transportService.getLocalNode()); + return new NetworkHandle(transportService, transportService.getLocalNode(), transportInterceptor); } @After @@ -180,13 +183,105 @@ public void 
testNodeConnectWithDifferentNodeId() { assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } + public void testRejectsMismatchedBuildHash() { + final Settings settings = Settings.builder().put("cluster.name", "a").build(); + final NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT); + final NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT); + final DiscoveryNode discoveryNode = new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()); + handleA.transportInterceptor.setModifyBuildHash(true); + handleB.transportInterceptor.setModifyBuildHash(true); + TransportSerializationException ex = expectThrows(TransportSerializationException.class, () -> { + try (Transport.Connection connection = + AbstractSimpleTransportTestCase.openConnection(handleA.transportService, discoveryNode, TestProfiles.LIGHT_PROFILE)) { + PlainActionFuture.get(fut -> handleA.transportService.handshake(connection, timeout, fut.map(x -> null))); + } + }); + assertThat( + ExceptionsHelper.unwrap(ex, IllegalArgumentException.class).getMessage(), + containsString("which has an incompatible wire format")); + assertFalse(handleA.transportService.nodeConnected(discoveryNode)); + } + + public void testAcceptsMismatchedBuildHashFromDifferentVersion() { + final NetworkHandle handleA = startServices( + "TS_A", + Settings.builder().put("cluster.name", "a").build(), + Version.CURRENT); + final NetworkHandle handleB = startServices( + "TS_B", + Settings.builder().put("cluster.name", "a").build(), + Version.CURRENT.minimumCompatibilityVersion()); + handleA.transportInterceptor.setModifyBuildHash(true); + handleB.transportInterceptor.setModifyBuildHash(true); + AbstractSimpleTransportTestCase.connectToNode(handleA.transportService, handleB.discoveryNode, TestProfiles.LIGHT_PROFILE); + assertTrue(handleA.transportService.nodeConnected(handleB.discoveryNode)); + } + private 
static class NetworkHandle { - private TransportService transportService; - private DiscoveryNode discoveryNode; + final TransportService transportService; + final DiscoveryNode discoveryNode; + final DisruptingTransportInterceptor transportInterceptor; - NetworkHandle(TransportService transportService, DiscoveryNode discoveryNode) { + NetworkHandle(TransportService transportService, DiscoveryNode discoveryNode, DisruptingTransportInterceptor transportInterceptor) { this.transportService = transportService; this.discoveryNode = discoveryNode; + this.transportInterceptor = transportInterceptor; + } + } + + private static class DisruptingTransportInterceptor implements TransportInterceptor { + + private boolean modifyBuildHash; + + public void setModifyBuildHash(boolean modifyBuildHash) { + this.modifyBuildHash = modifyBuildHash; + } + + @Override + public TransportRequestHandler interceptHandler( + String action, String executor, boolean forceExecution, TransportRequestHandler actualHandler) { + + if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) { + return (request, channel, task) -> actualHandler.messageReceived(request, new TransportChannel() { + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public String getChannelType() { + return channel.getChannelType(); + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + assertThat(response, instanceOf(TransportService.HandshakeResponse.class)); + if (modifyBuildHash) { + final TransportService.HandshakeResponse handshakeResponse = (TransportService.HandshakeResponse) response; + channel.sendResponse(new TransportService.HandshakeResponse( + handshakeResponse.getVersion(), + handshakeResponse.getBuildHash() + "-modified", + handshakeResponse.getDiscoveryNode(), + handshakeResponse.getClusterName())); + } else { + channel.sendResponse(response); + } + } + + @Override + public void sendResponse(Exception exception) throws 
IOException { + channel.sendResponse(exception); + + } + }, task); + } else { + return actualHandler; + } } }