Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ class AsyncConnectionImpl implements AsyncConnection {
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
10, TimeUnit.MILLISECONDS);

private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

private final Configuration conf;

final AsyncConnectionConfiguration connConf;
Expand All @@ -93,8 +91,6 @@ class AsyncConnectionImpl implements AsyncConnection {

final RpcControllerFactory rpcControllerFactory;

private final boolean hostnameCanChange;

private final AsyncRegionLocator locator;

final AsyncRpcRetryingCallerFactory callerFactory;
Expand Down Expand Up @@ -137,7 +133,6 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
}
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout =
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
Expand Down Expand Up @@ -258,7 +253,7 @@ private ClientService.Interface createRegionServerStub(ServerName serverName) th

ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
getStubKey(ClientService.getDescriptor().getName(), serverName, hostnameCanChange),
getStubKey(ClientService.getDescriptor().getName(), serverName),
() -> createRegionServerStub(serverName));
}

Expand All @@ -272,7 +267,7 @@ private AdminService.Interface createAdminServerStub(ServerName serverName) thro

AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange),
getStubKey(AdminService.getDescriptor().getName(), serverName),
() -> createAdminServerStub(serverName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
"hbase.client.master.state.cache.timeout.sec";
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

private final boolean hostnamesCanChange;
private final long pause;
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
// The mode tells if HedgedRead, LoadBalance mode is supported.
Expand Down Expand Up @@ -308,7 +305,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {

boolean shouldListen =
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
Class<? extends ClusterStatusListener.Listener> listenerClass =
conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
Expand Down Expand Up @@ -511,8 +507,8 @@ public Hbck getHbck(ServerName masterServer) throws IOException {
if (isDeadServer(masterServer)) {
throw new RegionServerStoppedException(masterServer + " is dead.");
}
String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
masterServer, this.hostnamesCanChange);
String key =
getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), masterServer);

return new HBaseHbck(
(MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
Expand Down Expand Up @@ -1306,8 +1302,7 @@ private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
throw new MasterNotRunningException(sn + " is dead.");
}
// Use the security info interface name as our stub key
String key =
getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn);
MasterProtos.MasterService.BlockingInterface stub =
(MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
Expand Down Expand Up @@ -1355,8 +1350,7 @@ public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName
if (isDeadServer(serverName)) {
throw new RegionServerStoppedException(serverName + " is dead.");
}
String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
this.hostnamesCanChange);
String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName);
return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
Expand All @@ -1370,8 +1364,8 @@ public BlockingInterface getClient(ServerName serverName) throws IOException {
if (isDeadServer(serverName)) {
throw new RegionServerStoppedException(serverName + " is dead.");
}
String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
serverName, this.hostnamesCanChange);
String key =
getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), serverName);
return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -152,32 +151,17 @@ public boolean isTableDisabled(TableName tableName) throws IOException {
}

/**
* Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE].
* Get a unique key for the rpc stub to the given server.
*/
static int retries2Attempts(int retries) {
return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
static String getStubKey(String serviceName, ServerName serverName) {
return String.format("%s@%s", serviceName, serverName);
}

/**
* Get a unique key for the rpc stub to the given server.
* Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE].
*/
static String getStubKey(String serviceName, ServerName serverName, boolean hostnameCanChange) {
// Sometimes, servers go down and they come back up with the same hostname but a different
// IP address. Force a resolution of the rsHostname by trying to instantiate an
// InetSocketAddress, and this way we will rightfully get a new stubKey.
// Also, include the hostname in the key so as to take care of those cases where the
// DNS name is different but IP address remains the same.
String hostname = serverName.getHostname();
int port = serverName.getPort();
if (hostnameCanChange) {
try {
InetAddress ip = InetAddress.getByName(hostname);
return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port;
} catch (UnknownHostException e) {
LOG.warn("Can not resolve " + hostname + ", please check your network", e);
}
}
return serviceName + "@" + hostname + ":" + port;
static int retries2Attempts(int retries) {
return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
}

static void checkHasFamilies(Mutation mutation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class MetricsConnection implements StatisticTrackable {
private static final String HEAP_BASE = "heapOccupancy_";
private static final String CACHE_BASE = "cacheDroppingExceptions_";
private static final String UNKNOWN_EXCEPTION = "UnknownException";
private static final String NS_LOOKUPS = "nsLookups";
private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

/** A container class for collecting details about the RPC call as it percolates. */
Expand Down Expand Up @@ -293,6 +295,8 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
protected final Timer userRegionLockWaitingTimer;
protected final Timer userRegionLockHeldTimer;
protected final Histogram userRegionLockQueueHist;
protected final Counter nsLookups;
protected final Counter nsLookupsFailed;

// dynamic metrics

Expand Down Expand Up @@ -360,6 +364,8 @@ protected Ratio getRatio() {
registry.timer(name(this.getClass(), "userRegionLockHeldDuration", scope));
this.userRegionLockQueueHist =
registry.histogram(name(MetricsConnection.class, "userRegionLockQueueLength", scope));
this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));

this.reporter = JmxReporter.forRegistry(this.registry).build();
this.reporter.start();
Expand Down Expand Up @@ -564,4 +570,12 @@ public void incrCacheDroppingExceptions(Object exception) {
CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
cacheDroppingExceptions, counterFactory).inc();
}

public void incrNsLookups() {
this.nsLookups.inc();
}

public void incrNsLookupsFailed() {
this.nsLookupsFailed.inc();
}
}
Loading