diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 4f1310bb25911..5905a1dbbd370 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -26,6 +26,8 @@
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -41,7 +43,19 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
@@ -219,6 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   private static final ThreadLocal<UserGroupInformation> CUR_USER =
       new ThreadLocal<>();
 
+  /** DN type -> full DN report. */
+  private final LoadingCache<DatanodeReportType, DatanodeInfo[]> dnCache;
+
   /**
    * Construct a router RPC server.
    *
@@ -361,6 +378,23 @@ public RouterRpcServer(Configuration configuration, Router router,
     this.nnProto = new RouterNamenodeProtocol(this);
     this.clientProto = new RouterClientProtocol(conf, this);
     this.routerProto = new RouterUserProtocol(this);
+
+    long dnCacheExpire = conf.getTimeDuration(
+        DN_REPORT_CACHE_EXPIRE,
+        DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
+    this.dnCache = CacheBuilder.newBuilder()
+        .build(new DatanodeReportCacheLoader());
+
+    // Actively refresh the dn cache in a configured interval
+    Executors
+        .newSingleThreadScheduledExecutor()
+        .scheduleWithFixedDelay(() -> this.dnCache
+            .asMap()
+            .keySet()
+            .parallelStream()
+            .forEach((key) -> this.dnCache.refresh(key)),
+            0,
+            dnCacheExpire, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -868,6 +902,50 @@ public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
     return clientProto.getDatanodeReport(type);
   }
 
+  /**
+   * Get the datanode report from cache.
+   *
+   * @param type Type of the datanode.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type)
+      throws IOException {
+    try {
+      DatanodeInfo[] dns = this.dnCache.get(type);
+      if (dns == null) {
+        LOG.debug("Get null DN report from cache");
+        dns = getCachedDatanodeReportImpl(type);
+        this.dnCache.put(type, dns);
+      }
+      return dns;
+    } catch (ExecutionException e) {
+      LOG.error("Cannot get the DN report for {}", type, e);
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
+    }
+  }
+
+  private DatanodeInfo[] getCachedDatanodeReportImpl(
+      final DatanodeReportType type) throws IOException {
+    // We need to get the DNs as a privileged user
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    RouterRpcServer.setCurrentUser(loginUser);
+
+    try {
+      DatanodeInfo[] dns = clientProto.getDatanodeReport(type);
+      LOG.debug("Refresh cached DN report with {} datanodes", dns.length);
+      return dns;
+    } finally {
+      // Reset ugi to remote user for remaining operations.
+      RouterRpcServer.resetCurrentUser();
+    }
+  }
+
   /**
    * Get the datanode report with a timeout.
    * @param type Type of the datanode.
@@ -1748,4 +1826,45 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
   public String[] getGroupsForUser(String user) throws IOException {
     return routerProto.getGroupsForUser(user);
   }
-}
\ No newline at end of file
+
+  /**
+   * Deals with loading datanode report into the cache and refresh.
+   */
+  private class DatanodeReportCacheLoader
+      extends CacheLoader<DatanodeReportType, DatanodeInfo[]> {
+
+    private ListeningExecutorService executorService;
+
+    DatanodeReportCacheLoader() {
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()
+          .setNameFormat("DatanodeReport-Cache-Reload")
+          .setDaemon(true)
+          .build();
+
+      executorService = MoreExecutors.listeningDecorator(
+          Executors.newSingleThreadExecutor(threadFactory));
+    }
+
+    @Override
+    public DatanodeInfo[] load(DatanodeReportType type) throws Exception {
+      return getCachedDatanodeReportImpl(type);
+    }
+
+    /**
+     * Override the reload method to provide an asynchronous implementation,
+     * so that the query will not be slowed down by the cache refresh. It
+     * will return the old cache value and schedule a background refresh.
+     */
+    @Override
+    public ListenableFuture<DatanodeInfo[]> reload(
+        final DatanodeReportType type, DatanodeInfo[] oldValue)
+        throws Exception {
+      return executorService.submit(new Callable<DatanodeInfo[]>() {
+        @Override
+        public DatanodeInfo[] call() throws Exception {
+          return load(type);
+        }
+      });
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java
index 9f0d06d7695cd..39f06a3b66f4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java
@@ -454,19 +454,12 @@ private URI redirectURI(final Router router, final UserGroupInformation ugi,
   private DatanodeInfo chooseDatanode(final Router router,
       final String path, final HttpOpParam.Op op, final long openOffset,
       final String excludeDatanodes) throws IOException {
-    // We need to get the DNs as a privileged user
     final RouterRpcServer rpcServer = getRPCServer(router);
-    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-    RouterRpcServer.setCurrentUser(loginUser);
-
     DatanodeInfo[] dns = null;
     try {
-      dns = rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
+      dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
     } catch (IOException e) {
       LOG.error("Cannot get the datanodes from the RPC server", e);
-    } finally {
-      // Reset ugi to remote user for remaining operations.
-      RouterRpcServer.resetCurrentUser();
     }
 
     HashSet<String> excludes = new HashSet<String>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 2b7669d26af42..b9a17ac9bdd5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -67,6 +67,8 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -216,6 +218,12 @@ public static void globalSetUp() throws Exception {
     // Register and verify all NNs with all routers
     cluster.registerNamenodes();
     cluster.waitNamenodeRegistration();
+
+    // We decrease the DN heartbeat expire interval to make them dead faster
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(5000);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(5000);
   }
 
   @AfterClass
@@ -1777,6 +1785,66 @@ public void testgetGroupsForUser() throws IOException {
     assertArrayEquals(group, result);
   }
 
+  @Test
+  public void testGetCachedDatanodeReport() throws Exception {
+    RouterRpcServer rpcServer = router.getRouter().getRpcServer();
+    final DatanodeInfo[] datanodeReport =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+
+    // We should have 12 nodes in total
+    assertEquals(12, datanodeReport.length);
+
+    // We should be caching this information
+    DatanodeInfo[] datanodeReport1 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertArrayEquals(datanodeReport1, datanodeReport);
+
+    // Stop one datanode
+    MiniDFSCluster miniDFSCluster = getCluster().getCluster();
+    DataNodeProperties dnprop = miniDFSCluster.stopDataNode(0);
+
+    // We wait until the cached value is updated
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        DatanodeInfo[] dn = null;
+        try {
+          dn = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+        } catch (IOException ex) {
+          LOG.error("Error on getCachedDatanodeReport");
+        }
+        return !Arrays.equals(datanodeReport, dn);
+      }
+    }, 500, 5 * 1000);
+
+    // The cache should be updated now
+    final DatanodeInfo[] datanodeReport2 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertEquals(datanodeReport.length - 1, datanodeReport2.length);
+
+    // Restart the DN we just stopped
+    miniDFSCluster.restartDataNode(dnprop);
+    miniDFSCluster.waitActive();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        DatanodeInfo[] dn = null;
+        try {
+          dn = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+        } catch (IOException ex) {
+          LOG.error("Error on getCachedDatanodeReport");
+        }
+        return datanodeReport.length == dn.length;
+      }
+    }, 500, 5 * 1000);
+
+    // The cache should be updated now
+    final DatanodeInfo[] datanodeReport3 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertEquals(datanodeReport.length, datanodeReport3.length);
+  }
+
   /**
    * Check the erasure coding policies in the Router and the Namenode.
    * @return The erasure coding policies.
@@ -1814,4 +1882,4 @@ private DFSClient getFileDFSClient(final String path) {
     }
     return null;
   }
-}
\ No newline at end of file
+}
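
For context, the caching primitive used by the patch is Guava's LoadingCache with an asynchronous reload. The standalone sketch below is illustrative only (the class name, key, and value types are invented and are not part of the Hadoop code); it shows the same pattern the new DatanodeReportCacheLoader follows: refresh() returns immediately, and readers keep seeing the previous value until the background load completes.

import java.util.concurrent.Executors;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public final class AsyncReloadCacheExample {

  // Single background thread used only for reloads; readers never block on it.
  private static final ListeningExecutorService RELOAD_POOL =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

  private static final LoadingCache<String, Long> CACHE =
      CacheBuilder.newBuilder().build(new CacheLoader<String, Long>() {
        @Override
        public Long load(String key) {
          // Stands in for an expensive call such as getDatanodeReport().
          return System.currentTimeMillis();
        }

        @Override
        public ListenableFuture<Long> reload(String key, Long oldValue) {
          // Asynchronous reload: schedule the load on the background pool
          // and return a future, so refresh() does not block the caller.
          return RELOAD_POOL.submit(() -> load(key));
        }
      });

  public static void main(String[] args) throws Exception {
    System.out.println(CACHE.get("live"));  // first call loads synchronously
    CACHE.refresh("live");                  // returns immediately
    System.out.println(CACHE.get("live"));  // old value until reload completes
  }
}

In the Router, the scheduled executor calls refresh() on every cached DatanodeReportType at the DN_REPORT_CACHE_EXPIRE interval, so WebHDFS redirects read the cached report instead of issuing a full getDatanodeReport RPC on every request.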