Skip to content

Commit 94d4f73

Browse files
committed
Use the underlying connection version for CCS connections (#28093)
Previously this would default to the version of the remote Node. However, if the remote cluster was mixed-version (e.g. it was part way through a rolling upgrade), then the TransportService may have negotiated a connection version that is not identical to connected Node's version. This mismatch would cause the Stream and the (Remote)Connection to report different version numbers, which could cause data to be sent over the wire using an incorrect serialization version.
1 parent 88d4b73 commit 94d4f73

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.lucene.store.AlreadyClosedException;
2424
import org.apache.lucene.util.IOUtils;
2525
import org.apache.lucene.util.SetOnce;
26+
import org.elasticsearch.Version;
2627
import org.elasticsearch.action.ActionListener;
2728
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
2829
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
@@ -280,6 +281,11 @@ public void sendRequest(long requestId, String action, TransportRequest request,
280281
public void close() throws IOException {
281282
assert false: "proxy connections must not be closed";
282283
}
284+
285+
@Override
286+
public Version getVersion() {
287+
return connection.getVersion();
288+
}
283289
};
284290
}
285291

core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@
8181

8282
import static java.util.Collections.emptyMap;
8383
import static java.util.Collections.emptySet;
84+
import static org.hamcrest.Matchers.equalTo;
8485
import static org.hamcrest.Matchers.instanceOf;
86+
import static org.hamcrest.Matchers.iterableWithSize;
87+
import static org.hamcrest.Matchers.not;
88+
import static org.hamcrest.Matchers.notNullValue;
8589
import static org.hamcrest.Matchers.startsWith;
8690

8791
public class RemoteClusterConnectionTests extends ESTestCase {
@@ -305,6 +309,63 @@ public void testConnectWithIncompatibleTransports() throws Exception {
305309
}
306310
}
307311

312+
public void testRemoteConnectionVersionMatchesTransportConnectionVersion() throws Exception {
313+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
314+
final Version previousVersion = VersionUtils.getPreviousVersion();
315+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, previousVersion);
316+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
317+
318+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
319+
assertThat(seedNode, notNullValue());
320+
knownNodes.add(seedNode);
321+
322+
DiscoveryNode oldVersionNode = discoverableTransport.getLocalDiscoNode();
323+
assertThat(oldVersionNode, notNullValue());
324+
knownNodes.add(oldVersionNode);
325+
326+
assertThat(seedNode.getVersion(), not(equalTo(oldVersionNode.getVersion())));
327+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
328+
final Transport.Connection seedConnection = new Transport.Connection() {
329+
@Override
330+
public DiscoveryNode getNode() {
331+
return seedNode;
332+
}
333+
334+
@Override
335+
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
336+
throws IOException, TransportException {
337+
// no-op
338+
}
339+
340+
@Override
341+
public void close() throws IOException {
342+
// no-op
343+
}
344+
};
345+
service.addDelegate(seedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) {
346+
@Override
347+
public Connection getConnection(DiscoveryNode node) {
348+
if (node == seedNode) {
349+
return seedConnection;
350+
}
351+
return super.getConnection(node);
352+
}
353+
});
354+
service.start();
355+
service.acceptIncomingRequests();
356+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
357+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
358+
connection.addConnectedNode(seedNode);
359+
for (DiscoveryNode node : knownNodes) {
360+
final Transport.Connection transportConnection = connection.getConnection(node);
361+
assertThat(transportConnection.getVersion(), equalTo(previousVersion));
362+
}
363+
assertThat(knownNodes, iterableWithSize(2));
364+
}
365+
}
366+
}
367+
}
368+
308369
@SuppressForbidden(reason = "calls getLocalHost here but it's fine in this case")
309370
public void testSlowNodeCanBeCanceled() throws IOException, InterruptedException {
310371
try (ServerSocket socket = new MockServerSocket()) {

0 commit comments

Comments
 (0)