Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.metrics;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand All @@ -26,6 +28,7 @@
import javax.management.StandardMBean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
Expand Down Expand Up @@ -61,6 +64,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {

/** JMX interface to monitor the RPC metrics. */
private FederationRPCMetrics metrics;
/** JMX interface to monitor each Nameservice RPC metrics. */
private Map<String, NameserviceRPCMetrics> nameserviceRPCMetricsMap =
new ConcurrentHashMap<>();
private ObjectName registeredBean;

/** Thread pool for logging stats. */
Expand All @@ -77,6 +83,11 @@ public void init(Configuration configuration, RouterRpcServer rpcServer,

// Create metrics
this.metrics = FederationRPCMetrics.create(conf, server);
for (String nameservice : FederationUtil.getAllConfiguredNS(conf)) {
LOG.info("Create Nameservice RPC Metrics for " + nameservice);
this.nameserviceRPCMetricsMap.computeIfAbsent(nameservice,
k -> NameserviceRPCMetrics.create(conf, k));
}

// Create thread pool
ThreadFactory threadFactory = new ThreadFactoryBuilder()
Expand Down Expand Up @@ -136,27 +147,41 @@ public long proxyOp() {
}

/**
* Records the proxy time of a completed operation. Nothing is recorded when
* {@code success} is false or when the measured proxy time is negative.
* On success the time is added to the Router-wide metrics (if present) and,
* when the map holds an entry for {@code nsId}, to that nameservice's
* metrics as well.
* NOTE(review): this span is a rendered diff. The one-argument signature and
* the combined {@code metrics != null && proxyTime >= 0} check appear to be
* the removed lines, superseded by the lines that follow them.
* NOTE(review): nameserviceRPCMetricsMap is initialised as a
* ConcurrentHashMap, whose containsKey throws NullPointerException for a
* null key. The map is only consulted when {@code success} is true, so
* confirm that successful calls never pass a null nsId.
* @param success If the operation was successful.
* @param nsId Identifier of the nameservice the operation was proxied to.
*/
@Override
public void proxyOpComplete(boolean success) {
public void proxyOpComplete(boolean success, String nsId) {
if (success) {
long proxyTime = getProxyTime();
if (metrics != null && proxyTime >= 0) {
metrics.addProxyTime(proxyTime);
if (proxyTime >= 0) {
if (metrics != null) {
metrics.addProxyTime(proxyTime);
}
// Per-nameservice metrics are updated only for nameservices that have an
// entry in the map; other identifiers are silently ignored.
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).addProxyTime(proxyTime);
}
}
}
}

/**
* Counts an operation that failed because the Namenode was in standby. The
* Router-wide counter is incremented when the Router metrics exist, and the
* per-nameservice counter is incremented when the map holds an entry for
* {@code nsId}.
* NOTE(review): this span is a rendered diff. The no-argument signature line
* appears to be the removed version of the signature that follows it.
* NOTE(review): nameserviceRPCMetricsMap is initialised as a
* ConcurrentHashMap, whose containsKey throws NullPointerException for a
* null key; confirm callers never pass a null nsId.
* @param nsId Identifier of the nameservice the operation was sent to.
*/
@Override
public void proxyOpFailureStandby() {
public void proxyOpFailureStandby(String nsId) {
if (metrics != null) {
metrics.incrProxyOpFailureStandby();
}
// Identifiers without an entry in the map are silently ignored.
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).incrProxyOpFailureStandby();
}
}

/**
* Counts an operation that failed to reach the Namenode. The Router-wide
* counter is incremented when the Router metrics exist, and the
* per-nameservice counter is incremented when the map holds an entry for
* {@code nsId}.
* NOTE(review): this span is a rendered diff. The no-argument signature line
* appears to be the removed version of the signature that follows it.
* NOTE(review): nameserviceRPCMetricsMap is initialised as a
* ConcurrentHashMap, whose containsKey throws NullPointerException for a
* null key; confirm callers never pass a null nsId.
* @param nsId Identifier of the nameservice the operation was sent to.
*/
@Override
public void proxyOpFailureCommunicate() {
public void proxyOpFailureCommunicate(String nsId) {
if (metrics != null) {
metrics.incrProxyOpFailureCommunicate();
}
// Identifiers without an entry in the map are silently ignored.
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).incrProxyOpFailureCommunicate();
}
}

@Override
Expand All @@ -181,10 +206,14 @@ public void proxyOpRetries() {
}

/**
* Counts an operation that failed because no namenodes were available. The
* Router-wide counter is incremented when the Router metrics exist, and the
* per-nameservice counter is incremented when the map holds an entry for
* {@code nsId}.
* NOTE(review): this span is a rendered diff. The no-argument signature line
* appears to be the removed version of the signature that follows it.
* NOTE(review): nameserviceRPCMetricsMap is initialised as a
* ConcurrentHashMap, whose containsKey throws NullPointerException for a
* null key; confirm callers never pass a null nsId.
* @param nsId Identifier of the nameservice the operation was sent to.
*/
@Override
public void proxyOpNoNamenodes() {
public void proxyOpNoNamenodes(String nsId) {
if (metrics != null) {
metrics.incrProxyOpNoNamenodes();
}
// Identifiers without an entry in the map are silently ignored.
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).incrProxyOpNoNamenodes();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.metrics;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* JMX interface for the RPC server of Nameservice.
* Exposes read-only proxy statistics for a single nameservice: an operation
* count, an average proxy time and three failure counters.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface NameserviceRPCMBean {

/**
* Get the number of operations the Router proxied to the nameservice.
* @return Number of proxied operations.
*/
long getProxyOps();

/**
* Get the average time the Router took to proxy an operation to the
* nameservice.
* @return Average proxy time; presumably in nanoseconds, matching the unit
*         in which proxy times are recorded (TODO confirm).
*/
double getProxyAvg();

/**
* Get the number of operations that failed to reach the Namenode.
* @return Number of communication failures.
*/
long getProxyOpFailureCommunicate();

/**
* Get the number of operations that hit a standby Namenode.
* @return Number of standby failures.
*/
long getProxyOpFailureStandby();

/**
* Get the number of operations that found no namenodes available.
* @return Number of operations with no namenodes available.
*/
long getProxyOpNoNamenodes();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.metrics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;

import java.util.concurrent.ThreadLocalRandom;

/**
 * Metrics source holding the RPC proxy statistics of one nameservice.
 * Instances are normally obtained through
 * {@link #create(Configuration, String)}, which also registers them with the
 * default metrics system. The counters are exposed through
 * {@link NameserviceRPCMBean}.
 */
@Metrics(name = "NameserviceRPCActivity", about = "Nameservice RPC Activity",
    context = "dfs")
public class NameserviceRPCMetrics implements NameserviceRPCMBean {

  /** Prefix of the name under which every instance is registered. */
  public static final String NAMESERVICE_RPC_METRICS_PREFIX =
      "NameserviceActivity-";

  /** Identifier handed to the constructor; see {@link #getNsId()}. */
  private final String nsId;

  // The annotated fields below are left unassigned here; the metrics
  // annotations are expected to drive their creation on registration.
  @Metric("Time for the Router to proxy an operation to the Nameservice")
  private MutableRate proxy;
  @Metric("Number of operations the Router proxied to a NameService")
  private MutableCounterLong proxyOp;

  @Metric("Number of operations to hit a standby NN")
  private MutableCounterLong proxyOpFailureStandby;
  @Metric("Number of operations to fail to reach NN")
  private MutableCounterLong proxyOpFailureCommunicate;
  @Metric("Number of operations to hit no namenodes available")
  private MutableCounterLong proxyOpNoNamenodes;

  /**
   * Build a metrics holder without registering it.
   * @param conf Configuration; currently not read by this constructor.
   * @param nsId Identifier to report from {@link #getNsId()}.
   */
  public NameserviceRPCMetrics(Configuration conf, String nsId) {
    this.nsId = nsId;
  }

  /**
   * Create the metrics of a nameservice and register them with the default
   * metrics system under {@link #NAMESERVICE_RPC_METRICS_PREFIX} followed by
   * the nameservice name. An empty name is replaced by
   * "UndefinedNameService" plus a random integer.
   * NOTE(review): the prefixed registration name, not the bare nameservice
   * name, is what gets stored as the identifier of the new instance.
   * @param conf Configuration passed on to the constructor.
   * @param nameService Name of the nameservice; must not be null.
   * @return The registered metrics instance.
   */
  public static NameserviceRPCMetrics create(Configuration conf,
      String nameService) {
    MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
    final String suffix;
    if (nameService.isEmpty()) {
      suffix = "UndefinedNameService" + ThreadLocalRandom.current().nextInt();
    } else {
      suffix = nameService;
    }
    final String sourceName = NAMESERVICE_RPC_METRICS_PREFIX + suffix;
    NameserviceRPCMetrics source = new NameserviceRPCMetrics(conf, sourceName);
    return metricsSystem.register(sourceName,
        "HDFS Federation NameService RPC Metrics", source);
  }

  /**
   * Get the identifier this instance was constructed with.
   * @return The identifier; for instances built by
   *         {@link #create(Configuration, String)} this is the prefixed
   *         registration name.
   */
  public String getNsId() {
    return this.nsId;
  }

  /**
   * Add the time to proxy an operation from the moment the Router sends it to
   * the Namenode until it replied, and count the operation.
   * @param time Proxy time of an operation in nanoseconds.
   */
  public void addProxyTime(long time) {
    proxy.add(time);
    proxyOp.incr();
  }

  /** Count one operation that hit a standby Namenode. */
  public void incrProxyOpFailureStandby() {
    proxyOpFailureStandby.incr();
  }

  /** Count one operation that failed to reach the Namenode. */
  public void incrProxyOpFailureCommunicate() {
    proxyOpFailureCommunicate.incr();
  }

  /** Count one operation that found no namenodes available. */
  public void incrProxyOpNoNamenodes() {
    proxyOpNoNamenodes.incr();
  }

  @Override
  public long getProxyOps() {
    return proxyOp.value();
  }

  /**
   * {@inheritDoc}
   * Returns {@code proxy.lastStat().mean()}; presumably the mean of the most
   * recent statistics snapshot (confirm against MutableRate).
   */
  @Override
  public double getProxyAvg() {
    return proxy.lastStat().mean();
  }

  @Override
  public long getProxyOpFailureStandby() {
    return proxyOpFailureStandby.value();
  }

  @Override
  public long getProxyOpFailureCommunicate() {
    return proxyOpFailureCommunicate.value();
  }

  @Override
  public long getProxyOpNoNamenodes() {
    return proxyOpNoNamenodes.value();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ private Object invokeMethod(
namenodeResolver.updateActiveNamenode(nsId, address);
}
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpComplete(true);
this.rpcMonitor.proxyOpComplete(true, nsId);
}
if (this.router.getRouterClientMetrics() != null) {
this.router.getRouterClientMetrics().incInvokedMethod(method);
Expand All @@ -500,17 +500,17 @@ private Object invokeMethod(
if (ioe instanceof StandbyException) {
// Fail over indicated by retry policy and/or NN
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureStandby();
this.rpcMonitor.proxyOpFailureStandby(nsId);
}
failover = true;
} else if (isUnavailableException(ioe)) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureCommunicate();
this.rpcMonitor.proxyOpFailureCommunicate(nsId);
}
failover = true;
} else if (ioe instanceof RemoteException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpComplete(true);
this.rpcMonitor.proxyOpComplete(true, nsId);
}
RemoteException re = (RemoteException) ioe;
ioe = re.unwrapRemoteException();
Expand All @@ -519,7 +519,7 @@ private Object invokeMethod(
throw ioe;
} else if (ioe instanceof ConnectionNullException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureCommunicate();
this.rpcMonitor.proxyOpFailureCommunicate(nsId);
}
LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress,
ioe.getMessage());
Expand All @@ -529,7 +529,7 @@ private Object invokeMethod(
throw se;
} else if (ioe instanceof NoNamenodesAvailableException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpNoNamenodes();
this.rpcMonitor.proxyOpNoNamenodes(nsId);
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
Expand All @@ -539,8 +539,8 @@ private Object invokeMethod(
// Other communication error, this is a failure
// Communication retries are handled by the retry policy
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureCommunicate();
this.rpcMonitor.proxyOpComplete(false);
this.rpcMonitor.proxyOpFailureCommunicate(nsId);
this.rpcMonitor.proxyOpComplete(false, nsId);
}
throw ioe;
}
Expand All @@ -551,7 +551,7 @@ private Object invokeMethod(
}
}
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpComplete(false);
this.rpcMonitor.proxyOpComplete(false, null);
}

// All namenodes were unavailable or in standby
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ void init(
* Mark a proxy operation as completed.
* @param success If the operation was successful.
* @param nsId Identifier of the nameservice the operation was proxied to;
*             may be null.
*/
void proxyOpComplete(boolean success);
void proxyOpComplete(boolean success, String nsId);

/**
* Failed to proxy an operation to a Namenode because it was in standby.
* @param nsId Identifier of the nameservice the operation was sent to.
*/
void proxyOpFailureStandby();
void proxyOpFailureStandby(String nsId);

/**
* Failed to proxy an operation to a Namenode because of an unexpected
* exception.
* @param nsId Identifier of the nameservice the operation was sent to.
*/
void proxyOpFailureCommunicate();
void proxyOpFailureCommunicate(String nsId);

/**
* Failed to proxy an operation to a Namenode because the client was
Expand All @@ -95,7 +95,7 @@ void init(
/**
* Failed to proxy an operation because of no namenodes available.
* @param nsId Identifier of the nameservice the operation was sent to.
*/
void proxyOpNoNamenodes();
void proxyOpNoNamenodes(String nsId);

/**
* If the Router cannot contact the State Store in an operation.
Expand Down
Loading