Skip to content

Commit b7d4dc6

Browse files
authored
HADOOP-18365. Update the remote address when a change is detected (#4692)
Avoid reconnecting to the old address after detecting that the address has been updated. * Fix Checkstyle line length violation * Keep ConnectionId as Immutable for map key The ConnectionId is used as a key in the connections map, and updating the remoteId caused problems with the cleanup of connections when the removeMethod was used. Instead of updating the address within the remoteId, use the removeMethod to cleanup references to the current identifier and then replace it with a new identifier using the updated address. * Use final to protect immutable ConnectionId Mark non-test fields as private and final, and add a missing accessor. * Use a stable hashCode to allow safe IP addr changes * Add test that updated address is used Once the address has been updated, it should be used in future calls. Check to ensure that a second request succeeds and that it uses the existing updated address instead of having to re-resolve. Signed-off-by: Nick Dimiduk <[email protected]> Signed-off-by: sokui Signed-off-by: XanderZu Signed-off-by: stack <[email protected]>
1 parent d09dd4a commit b7d4dc6

File tree

3 files changed

+113
-8
lines changed

3 files changed

+113
-8
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ public synchronized Writable getRpcResponse() {
419419
* socket: responses may be delivered out of order. */
420420
private class Connection extends Thread {
421421
private InetSocketAddress server; // server ip:port
422-
private final ConnectionId remoteId; // connection id
422+
private final ConnectionId remoteId; // connection id
423423
private AuthMethod authMethod; // authentication method
424424
private AuthProtocol authProtocol;
425425
private int serviceClass;
@@ -645,6 +645,9 @@ private synchronized boolean updateAddress() throws IOException {
645645
LOG.warn("Address change detected. Old: " + server.toString() +
646646
" New: " + currentAddr.toString());
647647
server = currentAddr;
648+
// Update the remote address so that reconnections are with the updated address.
649+
// This avoids thrashing.
650+
remoteId.setAddress(currentAddr);
648651
UserGroupInformation ticket = remoteId.getTicket();
649652
this.setName("IPC Client (" + socketFactory.hashCode()
650653
+ ") connection to " + server.toString() + " from "
@@ -1700,9 +1703,9 @@ private Connection getConnection(ConnectionId remoteId,
17001703
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
17011704
@InterfaceStability.Evolving
17021705
public static class ConnectionId {
1703-
InetSocketAddress address;
1704-
UserGroupInformation ticket;
1705-
final Class<?> protocol;
1706+
private InetSocketAddress address;
1707+
private final UserGroupInformation ticket;
1708+
private final Class<?> protocol;
17061709
private static final int PRIME = 16777619;
17071710
private final int rpcTimeout;
17081711
private final int maxIdleTime; //connections will be culled if it was idle for
@@ -1717,7 +1720,7 @@ public static class ConnectionId {
17171720
private final int pingInterval; // how often sends ping to the server in msecs
17181721
private String saslQop; // here for testing
17191722
private final Configuration conf; // used to get the expected kerberos principal name
1720-
1723+
17211724
public ConnectionId(InetSocketAddress address, Class<?> protocol,
17221725
UserGroupInformation ticket, int rpcTimeout,
17231726
RetryPolicy connectionRetryPolicy, Configuration conf) {
@@ -1753,7 +1756,28 @@ public ConnectionId(InetSocketAddress address, Class<?> protocol,
17531756
InetSocketAddress getAddress() {
17541757
return address;
17551758
}
1756-
1759+
1760+
/**
1761+
* This is used to update the remote address when an address change is detected. This method
1762+
* ensures that the {@link #hashCode()} won't change.
1763+
*
1764+
* @param address the updated address
1765+
* @throws IllegalArgumentException if the hostname or port doesn't match
1766+
* @see Connection#updateAddress()
1767+
*/
1768+
void setAddress(InetSocketAddress address) {
1769+
if (!Objects.equals(this.address.getHostName(), address.getHostName())) {
1770+
throw new IllegalArgumentException("Hostname must match: " + this.address + " vs "
1771+
+ address);
1772+
}
1773+
if (this.address.getPort() != address.getPort()) {
1774+
throw new IllegalArgumentException("Port must match: " + this.address + " vs " + address);
1775+
}
1776+
1777+
this.address = address;
1778+
}
1779+
1780+
17571781
Class<?> getProtocol() {
17581782
return protocol;
17591783
}
@@ -1864,7 +1888,11 @@ && isEqual(this.protocol, that.protocol)
18641888
@Override
18651889
public int hashCode() {
18661890
int result = connectionRetryPolicy.hashCode();
1867-
result = PRIME * result + ((address == null) ? 0 : address.hashCode());
1891+
// We calculate based on the host name and port without the IP address, since the hashCode
1892+
// must be stable even if the IP address is updated.
1893+
result = PRIME * result + ((address == null || address.getHostName() == null) ? 0 :
1894+
address.getHostName().hashCode());
1895+
result = PRIME * result + ((address == null) ? 0 : address.getPort());
18681896
result = PRIME * result + (doPing ? 1231 : 1237);
18691897
result = PRIME * result + maxIdleTime;
18701898
result = PRIME * result + pingInterval;

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
323323
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
324324
throws IOException {
325325
return getProxy(protocol, clientVersion, connId.getAddress(),
326-
connId.ticket, conf, factory, connId.getRpcTimeout(),
326+
connId.getTicket(), conf, factory, connId.getRpcTimeout(),
327327
connId.getRetryPolicy(), null, null);
328328
}
329329

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.ipc;
2020

21+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertFalse;
2324
import static org.junit.Assert.assertNotNull;
@@ -93,6 +94,7 @@
9394
import org.apache.hadoop.test.LambdaTestUtils;
9495
import org.apache.hadoop.test.Whitebox;
9596
import org.apache.hadoop.util.StringUtils;
97+
import org.assertj.core.api.Condition;
9698
import org.junit.Assert;
9799
import org.junit.Assume;
98100
import org.junit.Before;
@@ -815,6 +817,81 @@ public Void call() throws IOException {
815817
}
816818
}
817819

820+
/**
821+
* The {@link ConnectionId#hashCode} has to be stable despite updates that occur as the the
822+
* address evolves over time. The {@link ConnectionId} is used as a primary key in maps, so
823+
* its hashCode can't change.
824+
*
825+
* @throws IOException if there is a client or server failure
826+
*/
827+
@Test
828+
public void testStableHashCode() throws IOException {
829+
Server server = new TestServer(5, false);
830+
try {
831+
server.start();
832+
833+
// Leave host unresolved to start. Use "localhost" as opposed
834+
// to local IP from NetUtils.getConnectAddress(server) to force
835+
// resolution later
836+
InetSocketAddress unresolvedAddr = InetSocketAddress.createUnresolved(
837+
"localhost", NetUtils.getConnectAddress(server).getPort());
838+
839+
// Setup: Create a ConnectionID using an unresolved address, and get it's hashCode to serve
840+
// as a point of comparison.
841+
int rpcTimeout = MIN_SLEEP_TIME * 2;
842+
final ConnectionId remoteId = getConnectionId(unresolvedAddr, rpcTimeout, conf);
843+
int expected = remoteId.hashCode();
844+
845+
// Start client
846+
Client.setConnectTimeout(conf, 100);
847+
Client client = new Client(LongWritable.class, conf);
848+
try {
849+
// Test: Call should re-resolve host and succeed
850+
LongWritable param = new LongWritable(RANDOM.nextLong());
851+
client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
852+
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
853+
int actual = remoteId.hashCode();
854+
855+
// Verify: The hashCode should match, although the InetAddress is different since it has
856+
// now been resolved
857+
assertThat(remoteId.getAddress()).isNotEqualTo(unresolvedAddr);
858+
assertThat(remoteId.getAddress().getHostName()).isEqualTo(unresolvedAddr.getHostName());
859+
assertThat(remoteId.hashCode()).isEqualTo(expected);
860+
861+
// Test: Call should succeed without having to re-resolve
862+
InetSocketAddress expectedSocketAddress = remoteId.getAddress();
863+
param = new LongWritable(RANDOM.nextLong());
864+
client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
865+
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
866+
867+
// Verify: The same instance of the InetSocketAddress has been used to make the second
868+
// call
869+
assertThat(remoteId.getAddress()).isSameAs(expectedSocketAddress);
870+
871+
// Verify: The hashCode is protected against updates to the host name
872+
String hostName = InetAddress.getLocalHost().getHostName();
873+
InetSocketAddress mismatchedHostName = NetUtils.createSocketAddr(
874+
InetAddress.getLocalHost().getHostName(),
875+
remoteId.getAddress().getPort());
876+
assertThatExceptionOfType(IllegalArgumentException.class)
877+
.isThrownBy(() -> remoteId.setAddress(mismatchedHostName))
878+
.withMessageStartingWith("Hostname must match");
879+
880+
// Verify: The hashCode is protected against updates to the port
881+
InetSocketAddress mismatchedPort = NetUtils.createSocketAddr(
882+
remoteId.getAddress().getHostName(),
883+
remoteId.getAddress().getPort() + 1);
884+
assertThatExceptionOfType(IllegalArgumentException.class)
885+
.isThrownBy(() -> remoteId.setAddress(mismatchedPort))
886+
.withMessageStartingWith("Port must match");
887+
} finally {
888+
client.stop();
889+
}
890+
} finally {
891+
server.stop();
892+
}
893+
}
894+
818895
@Test(timeout=60000)
819896
public void testIpcFlakyHostResolution() throws IOException {
820897
// start server

0 commit comments

Comments
 (0)