Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class AsyncConnectionImpl implements AsyncConnection {

private volatile boolean closed = false;

private final String metricsScope;
private final Optional<MetricsConnection> metrics;

private final ClusterStatusListener clusterStatusListener;
Expand All @@ -123,15 +124,16 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
User user) {
this.conf = conf;
this.user = user;
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);

if (user.isLoginFromKeytab()) {
spawnRenewalChore(user.getUGI());
}
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
String scope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
this.metrics =
Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
Expand Down Expand Up @@ -226,7 +228,9 @@ public void close() {
choreService = null;
}
}
metrics.ifPresent(MetricsConnection::shutdown);
if (metrics.isPresent()) {
MetricsConnection.deleteMetricsConnection(metricsScope);
}
closed = true;
}, "AsyncConnection.close");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
private final RpcClient rpcClient;

private final MetaCache metaCache;

private String metricsScope = null;
private final MetricsConnection metrics;

protected User user;
Expand Down Expand Up @@ -312,8 +314,9 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
retrieveClusterId();

if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
String scope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics = new MetricsConnection(scope, this::getBatchPool, this::getMetaLookupPool);
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics = MetricsConnection.getMetricsConnection(this.metricsScope, this::getBatchPool,
this::getMetaLookupPool);
} else {
this.metrics = null;
}
Expand Down Expand Up @@ -2131,7 +2134,7 @@ public void close() {
closeMaster();
shutdownPools();
if (this.metrics != null) {
this.metrics.shutdown();
MetricsConnection.deleteMetricsConnection(metricsScope);
}
this.closed = true;
if (this.registry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -47,12 +49,43 @@
/**
* This class is for maintaining the various connection statistics and publishing them through the
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
* as to not conflict with other uses of Yammer Metrics within the client application. Instantiating
* this class implicitly creates and "starts" instances of these classes; be sure to call
* {@link #shutdown()} to terminate the thread pools they allocate.
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
* {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
* instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} once for
* each such call so that the thread pools they allocate can be terminated. The metrics reporter
* is shut down (see {@link #shutdown()}) once every connection sharing this metrics instance has
* been closed.
*/
@InterfaceAudience.Private
public class MetricsConnection implements StatisticTrackable {
public final class MetricsConnection implements StatisticTrackable {

// Live metrics instances keyed by scope. getMetricsConnection(...) creates or reuses the entry
// for a scope, and deleteMetricsConnection(...) drops the entry once its connection count
// reaches zero.
private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
new ConcurrentHashMap<>();

/**
 * Returns the {@link MetricsConnection} registered for {@code scope}, creating it when the scope
 * has no instance yet, and counts the caller as one more connection using it. When an instance
 * already exists, the supplied pool suppliers are added to it instead of building a new one.
 * Lookup, creation and bookkeeping all happen inside one {@link ConcurrentMap#compute} call on
 * the instance map.
 * @param scope     key under which the instance is stored
 * @param batchPool supplier of the calling connection's batch pool
 * @param metaPool  supplier of the calling connection's meta pool
 * @return the instance now mapped to {@code scope}
 */
static MetricsConnection getMetricsConnection(final String scope,
  Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
  return METRICS_INSTANCES.compute(scope, (key, existing) -> {
    final MetricsConnection shared;
    if (existing != null) {
      // Reuse the instance already registered for this scope; only attach the new pools.
      existing.addThreadPools(batchPool, metaPool);
      shared = existing;
    } else {
      // The constructor records the pool suppliers itself.
      shared = new MetricsConnection(scope, batchPool, metaPool);
    }
    shared.incrConnectionCount();
    return shared;
  });
}

/**
 * Releases one connection's hold on the instance registered for {@code scope}. The connection
 * count is decremented; when it reaches zero the instance is shut down and its mapping is
 * removed. A scope with no registered instance is ignored.
 * <p>
 * NOTE(review): the pool suppliers added for the closing connection are not removed here -
 * confirm whether that is intended while other connections keep the instance alive.
 * @param scope key the instance was registered under
 */
static void deleteMetricsConnection(final String scope) {
  METRICS_INSTANCES.computeIfPresent(scope, (key, shared) -> {
    shared.decrConnectionCount();
    final boolean stillInUse = shared.getConnectionCount() != 0;
    if (stillInUse) {
      return shared;
    }
    shared.shutdown();
    // Returning null from the remapping function removes the entry from the map.
    return null;
  });
}

/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
Expand Down Expand Up @@ -231,7 +264,7 @@ public void updateDelayInterval(long interval) {
}
}

protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
new ConcurrentHashMap<>();

public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
Expand Down Expand Up @@ -272,7 +305,7 @@ private static interface NewMetric<T> {

private final MetricRegistry registry;
private final JmxReporter reporter;
protected final String scope;
private final String scope;

private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
@Override
Expand All @@ -295,66 +328,93 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
}
};

// Pool suppliers contributed by every connection that shares this metrics instance. They are
// appended by addThreadPools(...) on whichever thread opens a connection and iterated by the
// pool-usage gauges registered in the constructor. Those two activities can overlap, and a
// plain ArrayList would then risk a ConcurrentModificationException (or a reader never seeing
// a newly added supplier), so copy-on-write lists are used: appends are rare, reads iterate a
// stable snapshot.
private final List<Supplier<ThreadPoolExecutor>> batchPools = new CopyOnWriteArrayList<>();
private final List<Supplier<ThreadPoolExecutor>> metaPools = new CopyOnWriteArrayList<>();

// static metrics

protected final Counter metaCacheHits;
protected final Counter metaCacheMisses;
protected final CallTracker getTracker;
protected final CallTracker scanTracker;
protected final CallTracker appendTracker;
protected final CallTracker deleteTracker;
protected final CallTracker incrementTracker;
protected final CallTracker putTracker;
protected final CallTracker multiTracker;
protected final RunnerStats runnerStats;
protected final Counter metaCacheNumClearServer;
protected final Counter metaCacheNumClearRegion;
protected final Counter hedgedReadOps;
protected final Counter hedgedReadWin;
protected final Histogram concurrentCallsPerServerHist;
protected final Histogram numActionsPerServerHist;
protected final Counter nsLookups;
protected final Counter nsLookupsFailed;
protected final Timer overloadedBackoffTimer;
private final Counter connectionCount;
private final Counter metaCacheHits;
private final Counter metaCacheMisses;
private final CallTracker getTracker;
private final CallTracker scanTracker;
private final CallTracker appendTracker;
private final CallTracker deleteTracker;
private final CallTracker incrementTracker;
private final CallTracker putTracker;
private final CallTracker multiTracker;
private final RunnerStats runnerStats;
private final Counter metaCacheNumClearServer;
private final Counter metaCacheNumClearRegion;
private final Counter hedgedReadOps;
private final Counter hedgedReadWin;
private final Histogram concurrentCallsPerServerHist;
private final Histogram numActionsPerServerHist;
private final Counter nsLookups;
private final Counter nsLookupsFailed;
private final Timer overloadedBackoffTimer;

// dynamic metrics

// These maps are used to cache references to the metric instances that are managed by the
// registry. I don't think their use perfectly removes redundant allocations, but it's
// a big improvement over calling registry.newMetric each time.
protected final ConcurrentMap<String, Timer> rpcTimers =
private final ConcurrentMap<String, Timer> rpcTimers =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
protected final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
protected final ConcurrentMap<String, Counter> rpcCounters =
private final ConcurrentMap<String, Counter> rpcCounters =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
this.scope = scope;
addThreadPools(batchPool, metaPool);
this.registry = new MetricRegistry();
this.registry.register(getExecutorPoolName(), new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor pool = batchPool.get();
if (pool == null) {
return Ratio.of(0, 0);
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among batch pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
}
}
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
return Ratio.of(numerator, denominator);
}
});
this.registry.register(getMetaPoolName(), new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor pool = metaPool.get();
if (pool == null) {
return Ratio.of(0, 0);
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among meta lookup pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
}
}
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
return Ratio.of(numerator, denominator);
}
});
this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
this.metaCacheNumClearServer =
Expand Down Expand Up @@ -397,8 +457,84 @@ MetricRegistry getMetricRegistry() {
return registry;
}

public void shutdown() {
this.reporter.stop();
/**
 * Returns the scope string this metrics instance was constructed with.
 * @return the value of the {@code scope} field
 */
public String getMetricScope() {
  return this.scope;
}

/**
 * Exposes the per-server, per-region statistics map held in {@code serverStats}.
 * @return the live map itself, not a copy
 */
public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
  return this.serverStats;
}

/**
 * Exposes the {@link RunnerStats} held in the {@code runnerStats} field.
 * @return the same instance on every call
 */
public RunnerStats getRunnerStats() {
  return this.runnerStats;
}

/**
 * Exposes the {@link Counter} held in the {@code metaCacheNumClearServer} field.
 * @return the same counter instance on every call
 */
public Counter getMetaCacheNumClearServer() {
  return this.metaCacheNumClearServer;
}

/**
 * Exposes the {@link Counter} held in the {@code metaCacheNumClearRegion} field.
 * @return the same counter instance on every call
 */
public Counter getMetaCacheNumClearRegion() {
  return this.metaCacheNumClearRegion;
}

/**
 * Exposes the {@link Counter} held in the {@code hedgedReadOps} field.
 * @return the same counter instance on every call
 */
public Counter getHedgedReadOps() {
  return this.hedgedReadOps;
}

/**
 * Exposes the {@link Counter} held in the {@code hedgedReadWin} field.
 * @return the same counter instance on every call
 */
public Counter getHedgedReadWin() {
  return this.hedgedReadWin;
}

/**
 * Exposes the {@link Histogram} held in the {@code numActionsPerServerHist} field.
 * @return the same histogram instance on every call
 */
public Histogram getNumActionsPerServerHist() {
  return this.numActionsPerServerHist;
}

/**
 * Exposes the cache of RPC {@link Counter}s, keyed by name, held in {@code rpcCounters}.
 * @return the live map itself, not a copy
 */
public ConcurrentMap<String, Counter> getRpcCounters() {
  return this.rpcCounters;
}

/**
 * Exposes the {@link CallTracker} held in the {@code getTracker} field.
 * @return the same tracker instance on every call
 */
public CallTracker getGetTracker() {
  return this.getTracker;
}

/**
 * Exposes the {@link CallTracker} held in the {@code scanTracker} field.
 * @return the same tracker instance on every call
 */
public CallTracker getScanTracker() {
  return this.scanTracker;
}

/**
 * Exposes the {@link CallTracker} held in the {@code multiTracker} field.
 * @return the same tracker instance on every call
 */
public CallTracker getMultiTracker() {
  return this.multiTracker;
}

/**
 * Exposes the {@link CallTracker} held in the {@code appendTracker} field.
 * @return the same tracker instance on every call
 */
public CallTracker getAppendTracker() {
  return this.appendTracker;
}

/**
 * Exposes the {@link CallTracker} held in the {@code deleteTracker} field.
 * @return the same tracker instance on every call
 */
public CallTracker getDeleteTracker() {
  return this.deleteTracker;
}

/**
 * Exposes the {@link CallTracker} held in the {@code incrementTracker} field.
 * @return the same tracker instance on every call
 */
public CallTracker getIncrementTracker() {
  return this.incrementTracker;
}

/**
 * Exposes the {@link CallTracker} held in the {@code putTracker} field.
 * @return the same tracker instance on every call
 */
public CallTracker getPutTracker() {
  return this.putTracker;
}

/** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
Expand Down Expand Up @@ -457,6 +593,28 @@ public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
overloadedBackoffTimer.update(time, timeUnit);
}

/**
 * Reports how many connections currently share this metrics instance.
 * @return the current value of the {@code connectionCount} counter
 */
public long getConnectionCount() {
  return this.connectionCount.getCount();
}

/**
 * Records one more connection sharing this metrics instance by bumping the
 * {@code connectionCount} counter by one.
 */
private void incrConnectionCount() {
  this.connectionCount.inc();
}

/**
 * Records that one connection stopped sharing this metrics instance by lowering the
 * {@code connectionCount} counter by one.
 */
private void decrConnectionCount() {
  this.connectionCount.dec();
}

/**
 * Registers the pool suppliers of one more connection with this metrics instance.
 * @param batchPool supplier appended to {@code batchPools}
 * @param metaPool  supplier appended to {@code metaPools}
 */
private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
  Supplier<ThreadPoolExecutor> metaPool) {
  this.batchPools.add(batchPool);
  this.metaPools.add(metaPool);
}

/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
Expand All @@ -474,6 +632,10 @@ private void updateRpcGeneric(String methodName, CallStats stats) {
.update(stats.getResponseSizeBytes());
}

/**
 * Stops the {@link JmxReporter} owned by this instance. Called from
 * {@link #deleteMetricsConnection(String)} once the connection count has dropped to zero.
 */
private void shutdown() {
  reporter.stop();
}

/** Report RPC context to metrics system. */
public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
int callsPerServer = stats.getConcurrentCallsPerServer();
Expand Down
Loading