diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java index dc53f8a24e67c..5c6dac465fb7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.federation.metrics; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -26,6 +28,7 @@ import javax.management.StandardMBean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; @@ -61,6 +64,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor { /** JMX interface to monitor the RPC metrics. */ private FederationRPCMetrics metrics; + /** JMX interface to monitor the RPC metrics of each Nameservice. */ + private Map<String, NameserviceRPCMetrics> nameserviceRPCMetricsMap = + new ConcurrentHashMap<>(); private ObjectName registeredBean; /** Thread pool for logging stats. 
*/ @@ -77,6 +83,11 @@ public void init(Configuration configuration, RouterRpcServer rpcServer, // Create metrics this.metrics = FederationRPCMetrics.create(conf, server); + for (String nameservice : FederationUtil.getAllConfiguredNS(conf)) { + LOG.info("Create Nameservice RPC Metrics for " + nameservice); + this.nameserviceRPCMetricsMap.computeIfAbsent(nameservice, + k -> NameserviceRPCMetrics.create(conf, k)); + } // Create thread pool ThreadFactory threadFactory = new ThreadFactoryBuilder() @@ -136,27 +147,41 @@ public long proxyOp() { } @Override - public void proxyOpComplete(boolean success) { + public void proxyOpComplete(boolean success, String nsId) { if (success) { long proxyTime = getProxyTime(); - if (metrics != null && proxyTime >= 0) { - metrics.addProxyTime(proxyTime); + if (proxyTime >= 0) { + if (metrics != null) { + metrics.addProxyTime(proxyTime); + } + if (nameserviceRPCMetricsMap != null && + nameserviceRPCMetricsMap.containsKey(nsId)) { + nameserviceRPCMetricsMap.get(nsId).addProxyTime(proxyTime); + } } } } @Override - public void proxyOpFailureStandby() { + public void proxyOpFailureStandby(String nsId) { if (metrics != null) { metrics.incrProxyOpFailureStandby(); } + if (nameserviceRPCMetricsMap != null && + nameserviceRPCMetricsMap.containsKey(nsId)) { + nameserviceRPCMetricsMap.get(nsId).incrProxyOpFailureStandby(); + } } @Override - public void proxyOpFailureCommunicate() { + public void proxyOpFailureCommunicate(String nsId) { if (metrics != null) { metrics.incrProxyOpFailureCommunicate(); } + if (nameserviceRPCMetricsMap != null && + nameserviceRPCMetricsMap.containsKey(nsId)) { + nameserviceRPCMetricsMap.get(nsId).incrProxyOpFailureCommunicate(); + } } @Override @@ -181,10 +206,14 @@ public void proxyOpRetries() { } @Override - public void proxyOpNoNamenodes() { + public void proxyOpNoNamenodes(String nsId) { if (metrics != null) { metrics.incrProxyOpNoNamenodes(); } + if (nameserviceRPCMetricsMap != null && + 
nameserviceRPCMetricsMap.containsKey(nsId)) { + nameserviceRPCMetricsMap.get(nsId).incrProxyOpNoNamenodes(); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMBean.java new file mode 100644 index 0000000000000..54a40bdd631d5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMBean.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * JMX interface for the RPC server of Nameservice. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface NameserviceRPCMBean { + + long getProxyOps(); + + double getProxyAvg(); + + long getProxyOpFailureCommunicate(); + + long getProxyOpFailureStandby(); + + long getProxyOpNoNamenodes(); + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java new file mode 100644 index 0000000000000..1cc0234780f8d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * Implementation of the Nameservice RPC metrics collector. + */ +@Metrics(name = "NameserviceRPCActivity", about = "Nameservice RPC Activity", + context = "dfs") +public class NameserviceRPCMetrics implements NameserviceRPCMBean { + + public final static String NAMESERVICE_RPC_METRICS_PREFIX = "NameserviceActivity-"; + + private final String nsId; + + @Metric("Time for the Router to proxy an operation to the Nameservice") + private MutableRate proxy; + @Metric("Number of operations the Router proxied to a NameService") + private MutableCounterLong proxyOp; + + @Metric("Number of operations to hit a standby NN") + private MutableCounterLong proxyOpFailureStandby; + @Metric("Number of operations to fail to reach NN") + private MutableCounterLong proxyOpFailureCommunicate; + @Metric("Number of operations to hit no namenodes available") + private MutableCounterLong proxyOpNoNamenodes; + + public NameserviceRPCMetrics(Configuration conf, String nsId) { + this.nsId = nsId; + } + + public static NameserviceRPCMetrics create(Configuration conf, + String nameService) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + String name = NAMESERVICE_RPC_METRICS_PREFIX + (nameService.isEmpty() + ? 
"UndefinedNameService"+ ThreadLocalRandom.current().nextInt() + : nameService); + return ms.register(name, "HDFS Federation NameService RPC Metrics", + new NameserviceRPCMetrics(conf, name)); + } + + public void incrProxyOpFailureStandby() { + proxyOpFailureStandby.incr(); + } + + @Override + public long getProxyOpFailureStandby() { + return proxyOpFailureStandby.value(); + } + + public void incrProxyOpFailureCommunicate() { + proxyOpFailureCommunicate.incr(); + } + + @Override + public long getProxyOpFailureCommunicate() { + return proxyOpFailureCommunicate.value(); + } + + public void incrProxyOpNoNamenodes() { + proxyOpNoNamenodes.incr(); + } + + @Override + public long getProxyOpNoNamenodes() { + return proxyOpNoNamenodes.value(); + } + + + /** + * Add the time to proxy an operation from the moment the Router sends it to + * the Namenode until it replied. + * @param time Proxy time of an operation in nanoseconds. + */ + public void addProxyTime(long time) { + proxy.add(time); + proxyOp.incr(); + } + + @Override + public double getProxyAvg() { + return proxy.lastStat().mean(); + } + + @Override + public long getProxyOps() { + return proxyOp.value(); + } + + public String getNsId() { + return this.nsId; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 70c8880cec716..7554c0fc1976c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -489,7 +489,7 @@ private Object invokeMethod( namenodeResolver.updateActiveNamenode(nsId, address); } if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true); + 
this.rpcMonitor.proxyOpComplete(true, nsId); } if (this.router.getRouterClientMetrics() != null) { this.router.getRouterClientMetrics().incInvokedMethod(method); @@ -500,17 +500,17 @@ private Object invokeMethod( if (ioe instanceof StandbyException) { // Fail over indicated by retry policy and/or NN if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureStandby(); + this.rpcMonitor.proxyOpFailureStandby(nsId); } failover = true; } else if (isUnavailableException(ioe)) { if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureCommunicate(); + this.rpcMonitor.proxyOpFailureCommunicate(nsId); } failover = true; } else if (ioe instanceof RemoteException) { if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true); + this.rpcMonitor.proxyOpComplete(true, nsId); } RemoteException re = (RemoteException) ioe; ioe = re.unwrapRemoteException(); @@ -519,7 +519,7 @@ private Object invokeMethod( throw ioe; } else if (ioe instanceof ConnectionNullException) { if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureCommunicate(); + this.rpcMonitor.proxyOpFailureCommunicate(nsId); } LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress, ioe.getMessage()); @@ -529,7 +529,7 @@ private Object invokeMethod( throw se; } else if (ioe instanceof NoNamenodesAvailableException) { if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpNoNamenodes(); + this.rpcMonitor.proxyOpNoNamenodes(nsId); } LOG.error("Cannot get available namenode for {} {} error: {}", nsId, rpcAddress, ioe.getMessage()); @@ -539,8 +539,8 @@ private Object invokeMethod( // Other communication error, this is a failure // Communication retries are handled by the retry policy if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureCommunicate(); - this.rpcMonitor.proxyOpComplete(false); + this.rpcMonitor.proxyOpFailureCommunicate(nsId); + this.rpcMonitor.proxyOpComplete(false, nsId); } throw ioe; } @@ -551,7 +551,7 @@ private Object invokeMethod( } } if (this.rpcMonitor 
!= null) { - this.rpcMonitor.proxyOpComplete(false); + this.rpcMonitor.proxyOpComplete(false, null); } // All namenodes were unavailable or in standby diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java index 5a2adb9e54e41..388fc5a0da496 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java @@ -62,18 +62,18 @@ void init( * Mark a proxy operation as completed. * @param success If the operation was successful. */ - void proxyOpComplete(boolean success); + void proxyOpComplete(boolean success, String nsId); /** * Failed to proxy an operation to a Namenode because it was in standby. */ - void proxyOpFailureStandby(); + void proxyOpFailureStandby(String nsId); /** * Failed to proxy an operation to a Namenode because of an unexpected * exception. */ - void proxyOpFailureCommunicate(); + void proxyOpFailureCommunicate(String nsId); /** * Failed to proxy an operation to a Namenode because the client was @@ -95,7 +95,7 @@ void init( /** * Failed to proxy an operation because of no namenodes available. */ - void proxyOpNoNamenodes(); + void proxyOpNoNamenodes(String nsId); /** * If the Router cannot contact the State Store in an operation. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestNameserviceRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestNameserviceRPCMetrics.java new file mode 100644 index 0000000000000..7b6bcb2143737 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestNameserviceRPCMetrics.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.hdfs.server.federation.metrics.NameserviceRPCMetrics.NAMESERVICE_RPC_METRICS_PREFIX; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + +/** + * Test case for NameserviceRPCMetrics. + */ +public class TestNameserviceRPCMetrics { + private static final Configuration CONF = new HdfsConfiguration(); + static { + CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100); + CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); + CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); + } + + private static final int NUM_SUBCLUSTERS = 2; + private static final int NUM_DNS = 3; + + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + + /** The first Router Context for this federated cluster. */ + private MiniRouterDFSCluster.RouterContext routerContext; + + /** The first Router for this federated cluster. */ + private Router router; + + /** Filesystem interface to the Router. */ + private FileSystem routerFS; + /** Filesystem interface to the Namenode. 
*/ + private FileSystem nnFS; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS); + cluster.setNumDatanodesPerNameservice(NUM_DNS); + cluster.startCluster(); + + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .quota() + .build(); + cluster.addRouterOverrides(routerConf); + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + + } + + @Before + public void testSetup() throws Exception { + // Create mock locations + cluster.installMockLocations(); + + // Delete all files via the NNs and verify + cluster.deleteAllFiles(); + + // Create test fixtures on NN + cluster.createTestDirectoriesNamenode(); + + // Wait to ensure NN has fully created its test directories + Thread.sleep(100); + + routerContext = cluster.getRouters().get(0); + this.routerFS = routerContext.getFileSystem(); + + // Add extra location to the root mount / such that the root mount points: + // / + // ns0 -> /target-ns0 + // ns1 -> /target-ns1 + router = routerContext.getRouter(); + MockResolver resolver = (MockResolver) router.getSubclusterResolver(); + resolver.addLocation("/target-ns0", cluster.getNameservices().get(0), "/target-ns0"); + resolver.addLocation("/target-ns1", cluster.getNameservices().get(1), "/target-ns1"); + + } + + @AfterClass + public static void tearDown() throws Exception { + cluster.shutdown(); + } + + @Test + public void testProxyOp() throws IOException { + routerFS.listStatus(new Path("/target-ns0")); + assertCounter("ProxyOp", 1L, + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns0")); + assertCounter("ProxyOp", 0L, + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1")); + + routerFS.listStatus(new Path("/target-ns1")); + assertCounter("ProxyOp", 1L, + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns0")); + assertCounter("ProxyOp", 1L, + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX 
+ "ns1")); + } + +} +