diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 2e5d1813aa6d..bcd3fdaaca47 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -77,8 +77,6 @@ class AsyncConnectionImpl implements AsyncConnection { .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10, TimeUnit.MILLISECONDS); - private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; - private final Configuration conf; final AsyncConnectionConfiguration connConf; @@ -93,8 +91,6 @@ class AsyncConnectionImpl implements AsyncConnection { final RpcControllerFactory rpcControllerFactory; - private final boolean hostnameCanChange; - private final AsyncRegionLocator locator; final AsyncRpcRetryingCallerFactory callerFactory; @@ -137,7 +133,6 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri } this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null)); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); this.locator = new AsyncRegionLocator(this, RETRY_TIMER); @@ -258,7 +253,7 @@ private ClientService.Interface createRegionServerStub(ServerName serverName) th ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { return ConcurrentMapUtils.computeIfAbsentEx(rsStubs, - getStubKey(ClientService.getDescriptor().getName(), serverName, hostnameCanChange), + getStubKey(ClientService.getDescriptor().getName(), serverName), () -> createRegionServerStub(serverName)); } @@ -272,7 +267,7 @@ private AdminService.Interface createAdminServerStub(ServerName serverName) thro AdminService.Interface getAdminStub(ServerName serverName) throws IOException { return ConcurrentMapUtils.computeIfAbsentEx(adminSubs, - getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange), + getStubKey(AdminService.getDescriptor().getName(), serverName), () -> createAdminServerStub(serverName)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 0af0be523249..c7347261796b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -165,9 +165,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { "hbase.client.master.state.cache.timeout.sec"; private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class); - private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; - - private final boolean hostnamesCanChange; private final long pause; private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified // The mode tells if HedgedRead, LoadBalance mode is supported. @@ -308,7 +305,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); - this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); Class listenerClass = conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); @@ -511,8 +507,8 @@ public Hbck getHbck(ServerName masterServer) throws IOException { if (isDeadServer(masterServer)) { throw new RegionServerStoppedException(masterServer + " is dead."); } - String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), - masterServer, this.hostnamesCanChange); + String key = + getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), masterServer); return new HBaseHbck( (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { @@ -1306,8 +1302,7 @@ private MasterProtos.MasterService.BlockingInterface makeStubNoRetries() throw new MasterNotRunningException(sn + " is dead."); } // Use the security info interface name as our stub key - String key = - getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange); + String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn); MasterProtos.MasterService.BlockingInterface stub = (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); @@ -1355,8 +1350,7 @@ public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName if (isDeadServer(serverName)) { throw new RegionServerStoppedException(serverName + " is dead."); } - String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName, - this.hostnamesCanChange); + String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName); return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); @@ -1370,8 +1364,8 @@ public BlockingInterface getClient(ServerName serverName) throws IOException { if (isDeadServer(serverName)) { throw new RegionServerStoppedException(serverName + " is dead."); } - String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), - serverName, this.hostnamesCanChange); + String key = + getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), serverName); return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index d85b4399aff8..a7814924176e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; @@ -152,32 +151,17 @@ public boolean isTableDisabled(TableName tableName) throws IOException { } /** - * Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE]. + * Get a unique key for the rpc stub to the given server. */ - static int retries2Attempts(int retries) { - return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1); + static String getStubKey(String serviceName, ServerName serverName) { + return String.format("%s@%s", serviceName, serverName); } /** - * Get a unique key for the rpc stub to the given server. + * Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE]. */ - static String getStubKey(String serviceName, ServerName serverName, boolean hostnameCanChange) { - // Sometimes, servers go down and they come back up with the same hostname but a different - // IP address. Force a resolution of the rsHostname by trying to instantiate an - // InetSocketAddress, and this way we will rightfully get a new stubKey. - // Also, include the hostname in the key so as to take care of those cases where the - // DNS name is different but IP address remains the same. - String hostname = serverName.getHostname(); - int port = serverName.getPort(); - if (hostnameCanChange) { - try { - InetAddress ip = InetAddress.getByName(hostname); - return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port; - } catch (UnknownHostException e) { - LOG.warn("Can not resolve " + hostname + ", please check your network", e); - } - } - return serviceName + "@" + hostname + ":" + port; + static int retries2Attempts(int retries) { + return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1); } static void checkHasFamilies(Mutation mutation) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 16a13df1e68e..ee9861cc3fa7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -69,6 +69,8 @@ public class MetricsConnection implements StatisticTrackable { private static final String HEAP_BASE = "heapOccupancy_"; private static final String CACHE_BASE = "cacheDroppingExceptions_"; private static final String UNKNOWN_EXCEPTION = "UnknownException"; + private static final String NS_LOOKUPS = "nsLookups"; + private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed"; private static final String CLIENT_SVC = ClientService.getDescriptor().getName(); /** A container class for collecting details about the RPC call as it percolates. */ @@ -293,6 +295,8 @@ public Counter newMetric(Class clazz, String name, String scope) { protected final Timer userRegionLockWaitingTimer; protected final Timer userRegionLockHeldTimer; protected final Histogram userRegionLockQueueHist; + protected final Counter nsLookups; + protected final Counter nsLookupsFailed; // dynamic metrics @@ -360,6 +364,8 @@ protected Ratio getRatio() { registry.timer(name(this.getClass(), "userRegionLockHeldDuration", scope)); this.userRegionLockQueueHist = registry.histogram(name(MetricsConnection.class, "userRegionLockQueueLength", scope)); + this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope)); + this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope)); this.reporter = JmxReporter.forRegistry(this.registry).build(); this.reporter.start(); @@ -564,4 +570,12 @@ public void incrCacheDroppingExceptions(Object exception) { CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()), cacheDroppingExceptions, counterFactory).inc(); } + + public void incrNsLookups() { + this.nsLookups.inc(); + } + + public void incrNsLookupsFailed() { + this.nsLookupsFailed.inc(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 4e437909ab33..29f51415dce7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -133,11 +134,11 @@ public abstract class AbstractRpcClient implements RpcC private int maxConcurrentCallsPerServer; - private static final LoadingCache concurrentCounterCache = + private static final LoadingCache concurrentCounterCache = CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS) - .build(new CacheLoader() { + .build(new CacheLoader() { @Override - public AtomicInteger load(InetSocketAddress key) throws Exception { + public AtomicInteger load(Address key) throws Exception { return new AtomicInteger(0); } }); @@ -206,7 +207,7 @@ private void cleanupIdleConnections() { // The connection itself will disconnect if there is no pending call for maxIdleTime. if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { if (LOG.isTraceEnabled()) { - LOG.trace("Cleanup idle connection to {}", conn.remoteId().address); + LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress()); } connections.remove(conn.remoteId(), conn); conn.cleanupConnection(); @@ -344,11 +345,11 @@ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcCont private T getConnection(ConnectionId remoteId) throws IOException { if (failedServers.isFailedServer(remoteId.getAddress())) { if (LOG.isDebugEnabled()) { - LOG.debug("Not trying to connect to " + remoteId.address + LOG.debug("Not trying to connect to " + remoteId.getAddress() + " this server is in the failed servers list"); } throw new FailedServerException( - "This server is in the failed servers list: " + remoteId.address); + "This server is in the failed servers list: " + remoteId.getAddress()); } T conn; synchronized (connections) { @@ -366,7 +367,7 @@ private T getConnection(ConnectionId remoteId) throws IOException { */ protected abstract T createConnection(ConnectionId remoteId) throws IOException; - private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr, + private void onCallFinished(Call call, HBaseRpcController hrc, Address addr, RpcCallback callback) { call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); if (metrics != null) { @@ -392,7 +393,7 @@ private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress } Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, - final Message param, Message returnType, final User ticket, final InetSocketAddress addr, + final Message param, Message returnType, final User ticket, final InetSocketAddress inetAddr, final RpcCallback callback) { final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); cs.setStartTime(EnvironmentEdgeManager.currentTime()); @@ -407,6 +408,7 @@ Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController cs.setNumActionsPerServer(numActions); } + final Address addr = Address.fromSocketAddress(inetAddr); final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback() { @@ -431,12 +433,8 @@ public void run(Call call) { return call; } - InetSocketAddress createAddr(ServerName sn) throws UnknownHostException { - InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort()); - if (addr.isUnresolved()) { - throw new UnknownHostException("can not resolve " + sn.getServerName()); - } - return addr; + private static Address createAddr(ServerName sn) { + return Address.fromParts(sn.getHostname(), sn.getPort()); } /** @@ -452,8 +450,8 @@ public void cancelConnections(ServerName sn) { for (T connection : connections.values()) { ConnectionId remoteId = connection.remoteId(); if ( - remoteId.address.getPort() == sn.getPort() - && remoteId.address.getHostName().equals(sn.getHostname()) + remoteId.getAddress().getPort() == sn.getPort() + && remoteId.getAddress().getHostname().equals(sn.getHostname()) ) { LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " + connection.remoteId); @@ -514,19 +512,25 @@ public void close() { @Override public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, - int rpcTimeout) throws UnknownHostException { + int rpcTimeout) { return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout); } @Override - public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) - throws UnknownHostException { + public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) { return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); } private static class AbstractRpcChannel { - protected final InetSocketAddress addr; + protected final Address addr; + + // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup + // per method call on the channel. If the remote target is removed or reprovisioned and + // its identity changes a new channel with a newly resolved InetSocketAddress will be + // created as part of retry, so caching here is fine. + // Normally, caching an InetSocketAddress is an anti-pattern. + protected InetSocketAddress isa; protected final AbstractRpcClient rpcClient; @@ -534,8 +538,8 @@ private static class AbstractRpcChannel { protected final int rpcTimeout; - protected AbstractRpcChannel(AbstractRpcClient rpcClient, InetSocketAddress addr, - User ticket, int rpcTimeout) { + protected AbstractRpcChannel(AbstractRpcClient rpcClient, Address addr, User ticket, + int rpcTimeout) { this.addr = addr; this.rpcClient = rpcClient; this.ticket = ticket; @@ -570,16 +574,30 @@ protected HBaseRpcController configureRpcController(RpcController controller) { public static class BlockingRpcChannelImplementation extends AbstractRpcChannel implements BlockingRpcChannel { - protected BlockingRpcChannelImplementation(AbstractRpcClient rpcClient, - InetSocketAddress addr, User ticket, int rpcTimeout) { + protected BlockingRpcChannelImplementation(AbstractRpcClient rpcClient, Address addr, + User ticket, int rpcTimeout) { super(rpcClient, addr, ticket, rpcTimeout); } @Override public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { + // Look up remote address upon first call + if (isa == null) { + if (this.rpcClient.metrics != null) { + this.rpcClient.metrics.incrNsLookups(); + } + isa = Address.toSocketAddress(addr); + if (isa.isUnresolved()) { + if (this.rpcClient.metrics != null) { + this.rpcClient.metrics.incrNsLookupsFailed(); + } + isa = null; + throw new ServiceException(new UnknownHostException(addr + " could not be resolved")); + } + } return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType, - ticket, addr); + ticket, isa); } } @@ -588,20 +606,34 @@ public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController */ public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel { - protected RpcChannelImplementation(AbstractRpcClient rpcClient, InetSocketAddress addr, - User ticket, int rpcTimeout) throws UnknownHostException { + protected RpcChannelImplementation(AbstractRpcClient rpcClient, Address addr, User ticket, + int rpcTimeout) { super(rpcClient, addr, ticket, rpcTimeout); } @Override public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType, RpcCallback done) { + HBaseRpcController configuredController = configureRpcController( + Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call")); + // Look up remote address upon first call + if (isa == null || isa.isUnresolved()) { + if (this.rpcClient.metrics != null) { + this.rpcClient.metrics.incrNsLookups(); + } + isa = Address.toSocketAddress(addr); + if (isa.isUnresolved()) { + if (this.rpcClient.metrics != null) { + this.rpcClient.metrics.incrNsLookupsFailed(); + } + isa = null; + controller.setFailed(addr + " could not be resolved"); + return; + } + } // This method does not throw any exceptions, so the caller must provide a // HBaseRpcController which is used to pass the exceptions. - this.rpcClient.callMethod(md, - configureRpcController(Preconditions.checkNotNull(controller, - "RpcController can not be null for async rpc call")), - param, returnType, ticket, addr, done); + this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index d789417aef7a..10847c0155d0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -32,6 +32,7 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; import java.net.UnknownHostException; @@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; @@ -216,7 +218,7 @@ public void run() { */ public void cleanup(IOException e) { IOException ie = - new ConnectionClosingException("Connection to " + remoteId.address + " is closing."); + new ConnectionClosingException("Connection to " + remoteId.getAddress() + " is closing."); for (Call call : callsToWrite) { call.setException(ie); } @@ -226,12 +228,9 @@ public void cleanup(IOException e) { BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, - rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); + rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, + rpcClient.metrics); this.rpcClient = rpcClient; - if (remoteId.getAddress().isUnresolved()) { - throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); - } - this.connectionHeaderPreamble = getConnectionHeaderPreamble(); ConnectionHeader header = getConnectionHeader(); ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize()); @@ -266,7 +265,17 @@ protected void setupConnection() throws IOException { if (this.rpcClient.localAddr != null) { this.socket.bind(this.rpcClient.localAddr); } - NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO); + if (this.rpcClient.metrics != null) { + this.rpcClient.metrics.incrNsLookups(); + } + InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); + if (remoteAddr.isUnresolved()) { + if (this.rpcClient.metrics != null) { + this.rpcClient.metrics.incrNsLookupsFailed(); + } + throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); + } + NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO); this.socket.setSoTimeout(this.rpcClient.readTO); return; } catch (SocketTimeoutException toe) { @@ -411,8 +420,18 @@ private void disposeSasl() { private boolean setupSaslConnection(final InputStream in2, final OutputStream out2) throws IOException { - saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token, serverAddress, - securityInfo, this.rpcClient.fallbackAllowed, + if (this.metrics != null) { + this.metrics.incrNsLookups(); + } + InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress()); + if (serverAddr.isUnresolved()) { + if (this.metrics != null) { + this.metrics.incrNsLookupsFailed(); + } + throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); + } + saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token, + serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT)); @@ -490,16 +509,16 @@ private void setupIOstreams() throws IOException { if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) { if (LOG.isDebugEnabled()) { - LOG.debug("Not trying to connect to " + remoteId.address + LOG.debug("Not trying to connect to " + remoteId.getAddress() + " this server is in the failed servers list"); } throw new FailedServerException( - "This server is in the failed servers list: " + remoteId.address); + "This server is in the failed servers list: " + remoteId.getAddress()); } try { if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to " + remoteId.address); + LOG.debug("Connecting to " + remoteId.getAddress()); } short numRetries = 0; @@ -554,14 +573,14 @@ public Boolean run() throws IOException { closeSocket(); IOException e = ExceptionUtil.asInterrupt(t); if (e == null) { - this.rpcClient.failedServers.addToFailedServers(remoteId.address, t); + this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t); if (t instanceof LinkageError) { // probably the hbase hadoop version does not match the running hadoop version e = new DoNotRetryIOException(t); } else if (t instanceof IOException) { e = (IOException) t; } else { - e = new IOException("Could not set up IO Streams to " + remoteId.address, t); + e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t); } } throw e; @@ -836,7 +855,7 @@ public synchronized void shutdown() { if (callSender != null) { callSender.interrupt(); } - closeConn(new IOException("connection to " + remoteId.address + " closed")); + closeConn(new IOException("connection to " + remoteId.getAddress() + " closed")); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java index 48aec5cc9aac..6cb9cddd9feb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hbase.ipc; -import java.net.InetSocketAddress; import java.util.Objects; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; @@ -31,9 +31,9 @@ class ConnectionId { private static final int PRIME = 16777619; final User ticket; final String serviceName; - final InetSocketAddress address; + final Address address; - public ConnectionId(User ticket, String serviceName, InetSocketAddress address) { + public ConnectionId(User ticket, String serviceName, Address address) { this.address = address; this.ticket = ticket; this.serviceName = serviceName; @@ -43,7 +43,7 @@ public String getServiceName() { return this.serviceName; } - public InetSocketAddress getAddress() { + public Address getAddress() { return address; } @@ -72,7 +72,7 @@ public int hashCode() { return hashCode(ticket, serviceName, address); } - public static int hashCode(User ticket, String serviceName, InetSocketAddress address) { + public static int hashCode(User ticket, String serviceName, Address address) { return (address.hashCode() + PRIME * (PRIME * serviceName.hashCode() ^ (ticket == null ? 0 : ticket.hashCode()))); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java index b59d84d17f58..0a8da3c20151 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hbase.ipc; -import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -31,7 +31,7 @@ */ @InterfaceAudience.Private public class FailedServers { - private final Map failedServers = new HashMap(); + private final Map failedServers = new HashMap(); private long latestExpiry = 0; private final int recheckServersTimeout; private static final Logger LOG = LoggerFactory.getLogger(FailedServers.class); @@ -44,12 +44,12 @@ public FailedServers(Configuration conf) { /** * Add an address to the list of the failed servers list. */ - public synchronized void addToFailedServers(InetSocketAddress address, Throwable throwable) { + public synchronized void addToFailedServers(Address address, Throwable throwable) { final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout; - this.failedServers.put(address.toString(), expiry); + this.failedServers.put(address, expiry); this.latestExpiry = expiry; if (LOG.isDebugEnabled()) { - LOG.debug("Added failed server with address " + address.toString() + " to list caused by " + LOG.debug("Added failed server with address " + address + " to list caused by " + throwable.toString()); } } @@ -58,7 +58,7 @@ public synchronized void addToFailedServers(InetSocketAddress address, Throwable * Check if the server should be considered as bad. Clean the old entries of the list. * @return true if the server is in the failed servers list */ - public synchronized boolean isFailedServer(final InetSocketAddress address) { + public synchronized boolean isFailedServer(final Address address) { if (failedServers.isEmpty()) { return false; } @@ -67,15 +67,14 @@ public synchronized boolean isFailedServer(final InetSocketAddress address) { failedServers.clear(); return false; } - String key = address.toString(); - Long expiry = this.failedServers.get(key); + Long expiry = this.failedServers.get(address); if (expiry == null) { return false; } if (expiry >= now) { return true; } else { - this.failedServers.remove(key); + this.failedServers.remove(address); } return false; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 708d8d3e5282..86be0584329c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -21,7 +21,6 @@ import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; -import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.nio.channels.ClosedChannelException; import java.util.concurrent.TimeoutException; @@ -34,6 +33,7 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; @@ -157,7 +157,7 @@ static IOException toIOE(Throwable t) { } } - private static String getCallTarget(InetSocketAddress addr, RegionInfo regionInfo) { + private static String getCallTarget(Address addr, RegionInfo regionInfo) { return "address=" + addr + (regionInfo != null ? ", region=" + regionInfo.getRegionNameAsString() : ""); } @@ -179,7 +179,7 @@ private static String getCallTarget(InetSocketAddress addr, RegionInfo regionInf * @return an exception to throw * @see ClientExceptionsUtil#isConnectionException(Throwable) */ - static IOException wrapException(InetSocketAddress addr, RegionInfo regionInfo, Throwable error) { + static IOException wrapException(Address addr, RegionInfo regionInfo, Throwable error) { if (error instanceof ConnectException) { // connection refused; include the host:port in the error return (IOException) new ConnectException( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index dfba6d6cf935..a0aba5635add 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -24,12 +24,15 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; import org.apache.hadoop.hbase.security.SaslChallengeDecoder; @@ -97,7 +100,8 @@ class NettyRpcConnection extends RpcConnection { NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, - rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); + rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, + rpcClient.metrics); this.rpcClient = rpcClient; this.eventLoop = rpcClient.group.next(); byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); @@ -206,8 +210,18 @@ private void saslNegotiate(final Channel ch) { Promise saslPromise = ch.eventLoop().newPromise(); final NettyHBaseSaslRpcClientHandler saslHandler; try { + if (this.metrics != null) { + this.metrics.incrNsLookups(); + } + InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress()); + if (serverAddr.isUnresolved()) { + if (this.metrics != null) { + this.metrics.incrNsLookupsFailed(); + } + throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); + } saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token, - serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf); + serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf); } catch (IOException e) { failInit(ch, e); return; @@ -268,10 +282,19 @@ public void operationComplete(Future future) throws Exception { }); } - private void connect() { + private void connect() throws UnknownHostException { assert eventLoop.inEventLoop(); - LOG.trace("Connecting to {}", remoteId.address); - + LOG.trace("Connecting to {}", remoteId.getAddress()); + if (this.rpcClient.metrics != null) { + this.rpcClient.metrics.incrNsLookups(); + } + InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); + if (remoteAddr.isUnresolved()) { + if (this.rpcClient.metrics != null) { + this.rpcClient.metrics.incrNsLookupsFailed(); + } + throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); + } this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) @@ -283,7 +306,7 @@ protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME, new BufferCallBeforeInitHandler()); } - }).localAddress(rpcClient.localAddr).remoteAddress(remoteId.address).connect() + }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect() .addListener(new ChannelFutureListener() { @Override @@ -294,7 +317,7 @@ public void operationComplete(ChannelFuture future) throws Exception { LOG.warn( "Exception encountered while connecting to the server " + remoteId.getAddress(), ex); failInit(ch, ex); - rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause()); + rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause()); return; } ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 1b5f3e0a0eb1..6ecff49e52b1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.ipc; import java.io.Closeable; -import java.io.IOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; @@ -62,10 +61,8 @@ public interface RpcClient extends Closeable { * @param user which is to use the connection * @param rpcTimeout default rpc operation timeout * @return A blocking rpc channel that goes via this rpc client instance. - * @throws IOException when channel could not be created */ - BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) - throws IOException; + BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout); /** * Creates a "channel" that can be used by a protobuf service. Useful setting up protobuf stubs. @@ -74,8 +71,7 @@ BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTim * @param rpcTimeout default rpc operation timeout * @return A rpc channel that goes via this rpc client instance. */ - RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) - throws IOException; + RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout); /** * Interrupt the connections to the given server. This should be called if the server is known as diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 28a628e0fcf1..a3e2bef30c86 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -18,11 +18,10 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; @@ -59,8 +58,6 @@ abstract class RpcConnection { protected final Token token; - protected final InetAddress serverAddress; - protected final SecurityInfo securityInfo; protected final int reloginMaxBackoff; // max pause before relogin on sasl failure @@ -69,6 +66,8 @@ abstract class RpcConnection { protected final CompressionCodec compressor; + protected final MetricsConnection metrics; + protected final HashedWheelTimer timeoutTimer; protected final Configuration conf; @@ -83,17 +82,13 @@ abstract class RpcConnection { protected SaslClientAuthenticationProvider provider; protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, - String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor) - throws IOException { - if (remoteId.getAddress().isUnresolved()) { - throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); - } - this.serverAddress = remoteId.getAddress().getAddress(); + String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor, + MetricsConnection metrics) throws IOException { this.timeoutTimer = timeoutTimer; this.codec = codec; this.compressor = compressor; this.conf = conf; - + this.metrics = metrics; User ticket = remoteId.getTicket(); this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName()); this.useSasl = isSecurityEnabled; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java index 6f6bbf67c548..6c22ca94e428 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; /** @@ -27,7 +28,14 @@ @SuppressWarnings("serial") @InterfaceAudience.Public public class ServerTooBusyException extends DoNotRetryIOException { + + public ServerTooBusyException(Address address, long count) { + super("Busy Server! " + count + " concurrent RPCs against " + address); + } + + @Deprecated public ServerTooBusyException(InetSocketAddress address, long count) { super("Busy Server! " + count + " concurrent RPCs against " + address); } + } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java index 0b307bf2e569..2ff6bd640eb2 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java @@ -90,14 +90,12 @@ public RpcClientImpl(Configuration configuration, String clusterId, SocketAddres } @Override - public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) - throws IOException { + public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) { throw new UnsupportedOperationException(); } @Override - public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) - throws IOException { + public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) { return new RpcChannelImpl(); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionId.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionId.java index 9b55ecf17438..da962cac0d3c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionId.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionId.java @@ -22,10 +22,10 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -45,7 +45,7 @@ public class TestConnectionId { private User testUser2 = User.createUserForTesting(testConfig, "test", new String[] { "testgroup" }); private String serviceName = "test"; - private InetSocketAddress address = new InetSocketAddress(999); + private Address address = Address.fromParts("localhost", 999); private ConnectionId connectionId1 = new ConnectionId(testUser1, serviceName, address); private ConnectionId connectionId2 = new ConnectionId(testUser2, serviceName, address); @@ -68,7 +68,7 @@ public void testGetTicket() { @Test public void testToString() { - String expectedString = "0.0.0.0/0.0.0.0:999/test/test (auth:SIMPLE)"; + String expectedString = "localhost:999/test/test (auth:SIMPLE)"; assertEquals(expectedString, connectionId1.toString()); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java index 3998fa0880f4..e119861a30d7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java @@ -21,9 +21,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.log4j.Appender; @@ -51,7 +51,7 @@ public class TestFailedServersLog { HBaseClassTestRule.forClass(TestFailedServersLog.class); static final int TEST_PORT = 9999; - private InetSocketAddress addr; + private Address addr; @Mock private Appender mockAppender; @@ -74,7 +74,7 @@ public void testAddToFailedServersLogging() { Throwable nullException = new NullPointerException(); FailedServers fs = new FailedServers(new Configuration()); - addr = new InetSocketAddress(TEST_PORT); + addr = Address.fromParts("localhost", TEST_PORT); fs.addToFailedServers(addr, nullException); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index 8bdef8eb4a3d..c327896f72ab 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -34,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.FutureUtils; @@ -101,7 +101,7 @@ public void testWrapConnectionException() throws Exception { for (Class clazz : ClientExceptionsUtil.getConnectionExceptionTypes()) { exceptions.add(create(clazz)); } - InetSocketAddress addr = InetSocketAddress.createUnresolved("127.0.0.1", 12345); + Address addr = Address.fromParts("127.0.0.1", 12345); for (Throwable exception : exceptions) { if (exception instanceof TimeoutException) { assertThat(IPCUtil.wrapException(addr, null, exception), diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java index 6d543573de7e..a9c40fd3bb79 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java @@ -26,9 +26,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.net.InetSocketAddress; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -59,7 +59,7 @@ public class TestNettyRpcConnection { public static void setUp() throws IOException { CLIENT = new NettyRpcClient(HBaseConfiguration.create()); CONN = new NettyRpcConnection(CLIENT, - new ConnectionId(User.getCurrent(), "test", new InetSocketAddress("localhost", 1234))); + new ConnectionId(User.getCurrent(), "test", Address.fromParts("localhost", 1234))); } @AfterClass diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java index 0d5670965f7d..61b445fcefe0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.net; +import java.net.InetSocketAddress; import org.apache.commons.lang3.StringUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -33,7 +34,7 @@ */ @InterfaceAudience.Public public class Address implements Comparable
{ - private HostAndPort hostAndPort; + private final HostAndPort hostAndPort; private Address(HostAndPort hostAndPort) { this.hostAndPort = hostAndPort; @@ -47,6 +48,33 @@ public static Address fromString(String hostnameAndPort) { return new Address(HostAndPort.fromString(hostnameAndPort)); } + public static Address fromSocketAddress(InetSocketAddress addr) { + return Address.fromParts(addr.getHostString(), addr.getPort()); + } + + public static InetSocketAddress toSocketAddress(Address addr) { + return new InetSocketAddress(addr.getHostName(), addr.getPort()); + } + + public static InetSocketAddress[] toSocketAddress(Address[] addrs) { + if (addrs == null) { + return null; + } + InetSocketAddress[] result = new InetSocketAddress[addrs.length]; + for (int i = 0; i < addrs.length; i++) { + result[i] = toSocketAddress(addrs[i]); + } + return result; + } + + public String getHostName() { + return this.hostAndPort.getHost(); + } + + /** + * @deprecated Use {@link #getHostName()} instead + */ + @Deprecated public String getHostname() { return this.hostAndPort.getHost(); } @@ -66,7 +94,7 @@ public String toString() { * otherwise returns same as {@link #toString()}} */ public String toStringWithoutDomain() { - String hostname = getHostname(); + String hostname = getHostName(); String[] parts = hostname.split("\\."); if (parts.length > 1) { for (String part : parts) { @@ -87,19 +115,19 @@ public boolean equals(Object other) { } if (other instanceof Address) { Address that = (Address) other; - return this.getHostname().equals(that.getHostname()) && this.getPort() == that.getPort(); + return this.getHostName().equals(that.getHostName()) && this.getPort() == that.getPort(); } return false; } @Override public int hashCode() { - return this.getHostname().hashCode() ^ getPort(); + return this.getHostName().hashCode() ^ getPort(); } @Override public int compareTo(Address that) { - int compare = this.getHostname().compareTo(that.getHostname()); + int compare = this.getHostName().compareTo(that.getHostName()); if (compare != 0) { return compare; } diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index 3077ddb15f1b..0003a451d72d 100644 --- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -107,6 +107,9 @@ public MemcachedBlockCache(Configuration c) throws IOException { // case. String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211"); String[] servers = serverListString.split(","); + // MemcachedClient requires InetSocketAddresses, we have to create them now. Implies any + // resolved identities cannot have their address mappings changed while the MemcachedClient + // instance is alive. We won't get a chance to trigger re-resolution. List serverAddresses = new ArrayList<>(servers.length); for (String s : servers) { serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 62e781de0d1a..2bed2a95cb54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.mob.MobFileCache; import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; @@ -316,14 +317,16 @@ public class HRegionServer extends Thread private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock(); /** - * Map of encoded region names to the DataNode locations they should be hosted on We store the - * value as InetSocketAddress since this is used only in HDFS API (create() that takes favored - * nodes as hints for placing file blocks). We could have used ServerName here as the value class, - * but we'd need to convert it to InetSocketAddress at some point before the HDFS API call, and it - * seems a bit weird to store ServerName since ServerName refers to RegionServers and here we - * really mean DataNode locations. + * Map of encoded region names to the DataNode locations they should be hosted on We store the We + * store the value as Address since InetSocketAddress is required by the HDFS nodes as hints for + * placing file blocks). We could have used ServerName here as the value class, but we'd need to + * convert it to InetSocketAddress at some point before the HDFS API call, and it seems a bit + * weird to store ServerName since ServerName refers to RegionServers and here we really mean + * DataNode locations. We don't store it as InetSocketAddress here because the conversion on + * demand from Address to InetSocketAddress will guarantee the resolution results will be fresh + * when we need it. */ - private final Map regionFavoredNodesMap = new ConcurrentHashMap<>(); + private final Map regionFavoredNodesMap = new ConcurrentHashMap<>(); private LeaseManager leaseManager; @@ -3455,24 +3458,23 @@ boolean checkFileSystem() { @Override public void updateRegionFavoredNodesMapping(String encodedRegionName, List favoredNodes) { - InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()]; + Address[] addr = new Address[favoredNodes.size()]; // Refer to the comment on the declaration of regionFavoredNodesMap on why - // it is a map of region name to InetSocketAddress[] + // it is a map of region name to Address[] for (int i = 0; i < favoredNodes.size(); i++) { - addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(), - favoredNodes.get(i).getPort()); + addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort()); } regionFavoredNodesMap.put(encodedRegionName, addr); } /** * Return the favored nodes for a region given its encoded name. Look at the comment around - * {@link #regionFavoredNodesMap} on why it is InetSocketAddress[] + * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here. * @return array of favored locations */ @Override public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { - return regionFavoredNodesMap.get(encodedRegionName); + return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName)); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCIBadHostname.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCIBadHostname.java deleted file mode 100644 index c148efe2fef1..000000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCIBadHostname.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.fail; - -import java.net.UnknownHostException; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Tests that we fail fast when hostname resolution is not working and do not cache unresolved - * InetSocketAddresses. - */ -@Category({ MediumTests.class, ClientTests.class }) -public class TestCIBadHostname { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestCIBadHostname.class); - - private static HBaseTestingUtility testUtil; - private static ConnectionImplementation conn; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - testUtil = HBaseTestingUtility.createLocalHTU(); - testUtil.startMiniCluster(); - conn = (ConnectionImplementation) testUtil.getConnection(); - } - - @AfterClass - public static void teardownAfterClass() throws Exception { - conn.close(); - testUtil.shutdownMiniCluster(); - } - - @Test(expected = UnknownHostException.class) - public void testGetAdminBadHostname() throws Exception { - // verify that we can get an instance with the cluster hostname - ServerName master = testUtil.getHBaseCluster().getMaster().getServerName(); - try { - conn.getAdmin(master); - } catch (UnknownHostException uhe) { - fail("Obtaining admin to the cluster master should have succeeded"); - } - - // test that we fail to get a client to an unresolvable hostname, which - // means it won't be cached - ServerName badHost = ServerName.valueOf("unknownhost.invalid:" + HConstants.DEFAULT_MASTER_PORT, - System.currentTimeMillis()); - conn.getAdmin(badHost); - fail("Obtaining admin to unresolvable hostname should have failed"); - } - - @Test(expected = UnknownHostException.class) - public void testGetClientBadHostname() throws Exception { - // verify that we can get an instance with the cluster hostname - ServerName rs = testUtil.getHBaseCluster().getRegionServer(0).getServerName(); - try { - conn.getClient(rs); - } catch (UnknownHostException uhe) { - fail("Obtaining client to the cluster regionserver should have succeeded"); - } - - // test that we fail to get a client to an unresolvable hostname, which - // means it won't be cached - ServerName badHost = ServerName.valueOf( - "unknownhost.invalid:" + HConstants.DEFAULT_REGIONSERVER_PORT, System.currentTimeMillis()); - conn.getAdmin(badHost); - fail("Obtaining client to unresolvable hostname should have failed"); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 86b981276f68..3134356cb65c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -20,10 +20,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketTimeoutException; -import java.net.UnknownHostException; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; @@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcClient; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -145,14 +144,12 @@ public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddres // Return my own instance, one that does random timeouts @Override - public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) - throws UnknownHostException { + public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) { return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); } @Override - public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) - throws UnknownHostException { + public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) { return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout); } } @@ -168,7 +165,7 @@ static class RandomTimeoutBlockingRpcChannel RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn, final User ticket, final int rpcTimeout) { - super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); + super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); } @Override @@ -188,8 +185,8 @@ public Message callBlockingMethod(MethodDescriptor md, RpcController controller, private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation { RandomTimeoutRpcChannel(AbstractRpcClient rpcClient, ServerName sn, User ticket, - int rpcTimeout) throws UnknownHostException { - super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); + int rpcTimeout) { + super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java index 843b7dfa354b..90e696c45918 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.hbase.ipc; -import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -42,11 +42,11 @@ public void testFailedServer() { FailedServers fs = new FailedServers(new Configuration()); Throwable testThrowable = new Throwable();// throwable already tested in TestFailedServers.java - InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12); + Address ia = Address.fromParts("bad", 12); // same server as ia - InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12); - InetSocketAddress ia3 = InetSocketAddress.createUnresolved("badtoo", 12); - InetSocketAddress ia4 = InetSocketAddress.createUnresolved("badtoo", 13); + Address ia2 = Address.fromParts("bad", 12); + Address ia3 = Address.fromParts("badtoo", 12); + Address ia4 = Address.fromParts("badtoo", 13); Assert.assertFalse(fs.isFailedServer(ia)); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index cd9d829b60c4..c2b6dac10861 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -25,6 +25,7 @@ import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -1633,10 +1634,12 @@ private static String[] getServerStats(String server, int timeout) throws IOExce String host = sp[0]; int port = sp.length > 1 ? Integer.parseInt(sp[1]) : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT; - InetSocketAddress sockAddr = new InetSocketAddress(host, port); try (Socket socket = new Socket()) { + InetSocketAddress sockAddr = new InetSocketAddress(host, port); + if (sockAddr.isUnresolved()) { + throw new UnknownHostException(host + " cannot be resolved"); + } socket.connect(sockAddr, timeout); - socket.setSoTimeout(timeout); try ( PrintWriter out = new PrintWriter(new BufferedWriter(