From a1ad88e6d458a14be26736183894ed0ed2a31017 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Wed, 17 Jun 2020 14:45:04 -0700 Subject: [PATCH 01/12] Update RouterWebHdfsMethods.java --- .../router/RouterWebHdfsMethods.java | 67 ++++++++++++------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java index 9f0d06d7695cd..a8191308cdfdf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java @@ -454,32 +454,8 @@ private URI redirectURI(final Router router, final UserGroupInformation ugi, private DatanodeInfo chooseDatanode(final Router router, final String path, final HttpOpParam.Op op, final long openOffset, final String excludeDatanodes) throws IOException { - // We need to get the DNs as a privileged user - final RouterRpcServer rpcServer = getRPCServer(router); - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - RouterRpcServer.setCurrentUser(loginUser); - - DatanodeInfo[] dns = null; - try { - dns = rpcServer.getDatanodeReport(DatanodeReportType.LIVE); - } catch (IOException e) { - LOG.error("Cannot get the datanodes from the RPC server", e); - } finally { - // Reset ugi to remote user for remaining operations. - RouterRpcServer.resetCurrentUser(); - } - - HashSet excludes = new HashSet(); - if (excludeDatanodes != null) { - Collection collection = - getTrimmedStringCollection(excludeDatanodes); - for (DatanodeInfo dn : dns) { - if (collection.contains(dn.getName())) { - excludes.add(dn); - } - } - } - + DatanodeInfo[] dns = null; + if (op == GetOpParam.Op.OPEN || op == PostOpParam.Op.APPEND || op == GetOpParam.Op.GETFILECHECKSUM) { @@ -502,12 +478,27 @@ private DatanodeInfo chooseDatanode(final Router router, final LocatedBlocks locations = cp.getBlockLocations(path, offset, 1); final int count = locations.locatedBlockCount(); if (count > 0) { + if (excludeDatanodes != null) { + Collection collection = + getTrimmedStringCollection(excludeDatanodes); + dns = getDatanodeReport(router); + for (DatanodeInfo dn : dns) { + if (collection.contains(dn.getName())) { + excludes.add(dn); + } + } + } + LocatedBlock location0 = locations.get(0); return bestNode(location0.getLocations(), excludes); } } } + if (dns == null) { + dns = getDatanodeReport(router); + } + return getRandomDatanode(dns, excludes); } @@ -564,4 +555,28 @@ public Credentials createCredentials( renewer != null? renewer: ugi.getShortUserName()); return c; } + + /** + * Get the datanode report from all namespaces that are registered + * and active in the federation. + * @param router Router from which to get the report. + * @return List of datanodes. + * @throws IOException If it cannot get the RPC Server. 
+ */ + private static DatanodeInfo[] getDatanodeReport( + final Router router) throws IOException { + // We need to get the DNs as a privileged user + final RouterRpcServer rpcServer = getRPCServer(router); + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + RouterRpcServer.setCurrentUser(loginUser); + + try { + return rpcServer.getDatanodeReport(DatanodeReportType.LIVE); + } catch (IOException e) { + LOG.error("Cannot get the datanodes from the RPC server", e); + } finally { + // Reset ugi to remote user for remaining operations. + RouterRpcServer.resetCurrentUser(); + } + } } From 3369ba7b5eea4355487d8961a1f89fee15650407 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Wed, 17 Jun 2020 14:48:54 -0700 Subject: [PATCH 02/12] Update RouterWebHdfsMethods.java --- .../hdfs/server/federation/router/RouterWebHdfsMethods.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java index a8191308cdfdf..a1dcf237d866e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java @@ -455,7 +455,8 @@ private DatanodeInfo chooseDatanode(final Router router, final String path, final HttpOpParam.Op op, final long openOffset, final String excludeDatanodes) throws IOException { DatanodeInfo[] dns = null; - + HashSet excludes = new HashSet(); + if (op == GetOpParam.Op.OPEN || op == PostOpParam.Op.APPEND || op == GetOpParam.Op.GETFILECHECKSUM) { From 712835aa8d579f6b9d2c3b4d4798b393d2941657 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Fri, 19 Jun 2020 13:15:50 -0700 Subject: [PATCH 03/12] Update RouterWebHdfsMethods.java --- .../router/RouterWebHdfsMethods.java | 57 ++++++------------- 1 file changed, 17 insertions(+), 40 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java index a1dcf237d866e..39f06a3b66f4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java @@ -454,8 +454,24 @@ private URI redirectURI(final Router router, final UserGroupInformation ugi, private DatanodeInfo chooseDatanode(final Router router, final String path, final HttpOpParam.Op op, final long openOffset, final String excludeDatanodes) throws IOException { - DatanodeInfo[] dns = null; + final RouterRpcServer rpcServer = getRPCServer(router); + DatanodeInfo[] dns = null; + try { + dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); + } catch (IOException e) { + LOG.error("Cannot get the datanodes from the RPC server", e); + } + HashSet excludes = new HashSet(); + if (excludeDatanodes != null) { + Collection collection = + getTrimmedStringCollection(excludeDatanodes); + for (DatanodeInfo dn : dns) { + if (collection.contains(dn.getName())) 
{ + excludes.add(dn); + } + } + } if (op == GetOpParam.Op.OPEN || op == PostOpParam.Op.APPEND || @@ -479,27 +495,12 @@ private DatanodeInfo chooseDatanode(final Router router, final LocatedBlocks locations = cp.getBlockLocations(path, offset, 1); final int count = locations.locatedBlockCount(); if (count > 0) { - if (excludeDatanodes != null) { - Collection collection = - getTrimmedStringCollection(excludeDatanodes); - dns = getDatanodeReport(router); - for (DatanodeInfo dn : dns) { - if (collection.contains(dn.getName())) { - excludes.add(dn); - } - } - } - LocatedBlock location0 = locations.get(0); return bestNode(location0.getLocations(), excludes); } } } - if (dns == null) { - dns = getDatanodeReport(router); - } - return getRandomDatanode(dns, excludes); } @@ -556,28 +557,4 @@ public Credentials createCredentials( renewer != null? renewer: ugi.getShortUserName()); return c; } - - /** - * Get the datanode report from all namespaces that are registered - * and active in the federation. - * @param router Router from which to get the report. - * @return List of datanodes. - * @throws IOException If it cannot get the RPC Server. - */ - private static DatanodeInfo[] getDatanodeReport( - final Router router) throws IOException { - // We need to get the DNs as a privileged user - final RouterRpcServer rpcServer = getRPCServer(router); - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - RouterRpcServer.setCurrentUser(loginUser); - - try { - return rpcServer.getDatanodeReport(DatanodeReportType.LIVE); - } catch (IOException e) { - LOG.error("Cannot get the datanodes from the RPC server", e); - } finally { - // Reset ugi to remote user for remaining operations. - RouterRpcServer.resetCurrentUser(); - } - } } From 53723474f41259315f1f9919b8c989b73a8b20a3 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Fri, 19 Jun 2020 13:21:58 -0700 Subject: [PATCH 04/12] Update RouterRpcServer.java --- .../federation/router/RouterRpcServer.java | 137 +++++++++++++++++- 1 file changed, 135 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 4f1310bb25911..774dcf2147c45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.federation.router; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; +import static org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE; +import static org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT; @@ -41,7 +43,21 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; - +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import 
java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; @@ -219,6 +235,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, private static final ThreadLocal CUR_USER = new ThreadLocal<>(); + /** DN type -> full DN report. */ + private final LoadingCache dnCache; + /** * Construct a router RPC server. * @@ -361,6 +380,23 @@ public RouterRpcServer(Configuration configuration, Router router, this.nnProto = new RouterNamenodeProtocol(this); this.clientProto = new RouterClientProtocol(conf, this); this.routerProto = new RouterUserProtocol(this); + + long dnCacheExpire = conf.getTimeDuration( + DN_REPORT_CACHE_EXPIRE, + DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS); + this.dnCache = CacheBuilder.newBuilder() + .build(new DatanodeReportCacheLoader()); + + // Actively refresh the dn cache in a configured interval + Executors + .newSingleThreadScheduledExecutor() + .scheduleWithFixedDelay(() -> this.dnCache + .asMap() + .keySet() + .parallelStream() + .forEach((key) -> this.dnCache.refresh(key)), + 0, + dnCacheExpire, TimeUnit.MILLISECONDS); } @Override @@ -868,6 +904,49 @@ public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) return clientProto.getDatanodeReport(type); } + /** + * Get the datanode report from cache. + * + * @param type Type of the datanode. + * @return List of datanodes. + * @throws IOException If it cannot get the report. + */ + public DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type) + throws IOException { + try { + DatanodeInfo[] dns = this.dnCache.get(type); + if (dns == null) { + LOG.debug("Get null DN report from cache"); + dns = getCachedDatanodeReportImpl(type); + } + return dns; + } catch (ExecutionException e) { + LOG.error("Cannot get the DN report for {}", type, e); + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new IOException(cause); + } + } + } + + private DatanodeInfo[] getCachedDatanodeReportImpl + (final DatanodeReportType type) throws IOException{ + // We need to get the DNs as a privileged user + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + RouterRpcServer.setCurrentUser(loginUser); + + try { + DatanodeInfo[] dns = clientProto.getDatanodeReport(type); + LOG.debug("Refresh cached DN report with {} datanodes", dns.length); + return dns; + } finally { + // Reset ugi to remote user for remaining operations. + RouterRpcServer.resetCurrentUser(); + } + } + /** * Get the datanode report with a timeout. * @param type Type of the datanode. 
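Aside on the caching pattern the hunks above introduce: they combine three Guava-cache idioms — building a LoadingCache, proactively refreshing every cached key on a fixed delay, and unwrapping ExecutionException back into IOException on lookup — so that the request path normally reads a pre-computed datanode report instead of issuing the expensive RPC. A minimal, self-contained sketch of that combination is below; the class, key and method names are illustrative stand-ins, not the RouterRpcServer code itself.

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class ReportCacheSketch {
      private final LoadingCache<String, long[]> cache = CacheBuilder.newBuilder()
          .build(new CacheLoader<String, long[]>() {
            @Override
            public long[] load(String key) throws IOException {
              return fetchReport(key);                    // the expensive lookup stands in here
            }
          });

      ReportCacheSketch(long refreshMs) {
        // Actively refresh every key already in the cache on a fixed delay,
        // mirroring the scheduleWithFixedDelay loop in the hunk above.
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
            () -> cache.asMap().keySet().forEach(cache::refresh),
            0, refreshMs, TimeUnit.MILLISECONDS);
      }

      long[] get(String key) throws IOException {
        try {
          return cache.get(key);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          // Surface the original IOException instead of the cache wrapper.
          throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
        }
      }

      private long[] fetchReport(String key) throws IOException {
        return new long[] {System.currentTimeMillis()};   // placeholder for a real report
      }
    }

The first access of a key still pays for a synchronous load, which is why the patch also handles a null cache result explicitly; after that, readers only ever see values produced by the background refresh.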
@@ -1748,4 +1827,58 @@ public void refreshSuperUserGroupsConfiguration() throws IOException { public String[] getGroupsForUser(String user) throws IOException { return routerProto.getGroupsForUser(user); } -} \ No newline at end of file + + /** + * Deals with loading datanode report into the cache and refresh. + */ + private class DatanodeReportCacheLoader + extends CacheLoader { + + private ListeningExecutorService executorService; + + DatanodeReportCacheLoader() { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("DatanodeReport-Cache-Reload") + .setDaemon(true) + .build(); + + // Only use 1 thread to refresh cache. + // With coreThreadCount == maxThreadCount we effectively + // create a fixed size thread pool. As allowCoreThreadTimeOut + // has been set, all threads will die after 60 seconds of non use. + ThreadPoolExecutor parentExecutor = new ThreadPoolExecutor( + 1, + 1, + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue(), + threadFactory); + parentExecutor.allowCoreThreadTimeOut(true); + executorService = MoreExecutors.listeningDecorator(parentExecutor); + } + + @Override + public DatanodeInfo[] load(DatanodeReportType type) throws Exception { + return getCachedDatanodeReportImpl(type); + } + + /** + * Override the reload method to provide an asynchronous implementation, + * so that the query will not be slowed down by the cache refresh. It + * will return the old cache value and schedule a background refresh. + */ + @Override + public ListenableFuture reload( + final DatanodeReportType type, DatanodeInfo[] oldValue) + throws Exception { + ListenableFuture listenableFuture = + executorService.submit(new Callable() { + @Override + public DatanodeInfo[] call() throws Exception { + return load(type); + } + }); + return listenableFuture; + } + } +} From 66b2b33ab275a6819ca1494ec3b272392134d5e1 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Fri, 19 Jun 2020 13:45:50 -0700 Subject: [PATCH 05/12] Update RouterRpcServer.java --- .../hadoop/hdfs/server/federation/router/RouterRpcServer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 774dcf2147c45..e74c04721d504 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -918,6 +918,7 @@ public DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type) if (dns == null) { LOG.debug("Get null DN report from cache"); dns = getCachedDatanodeReportImpl(type); + this.dnCache.put(type, dns); } return dns; } catch (ExecutionException e) { From 801216bc6b2ab05b0be9f4fc82bb6714c8120200 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Mon, 22 Jun 2020 12:01:57 -0700 Subject: [PATCH 06/12] Update RouterRpcServer.java --- .../hdfs/server/federation/router/RouterRpcServer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 
e74c04721d504..7af5e3fde2cb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hdfs.server.federation.router; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; -import static org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE; -import static org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT; @@ -28,6 +26,8 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT; import java.io.FileNotFoundException; import java.io.IOException; @@ -383,7 +383,7 @@ public RouterRpcServer(Configuration configuration, Router router, long dnCacheExpire = conf.getTimeDuration( DN_REPORT_CACHE_EXPIRE, - DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS); + DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS); this.dnCache = CacheBuilder.newBuilder() .build(new DatanodeReportCacheLoader()); From 6bbbf46eb96ab315daf6d00d87c716e9543d04b8 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Mon, 22 Jun 2020 13:47:08 -0700 Subject: [PATCH 07/12] Remove whitespace-eol --- .../hadoop/hdfs/server/federation/router/RouterRpcServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 7af5e3fde2cb2..a358b6e0a97c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1828,7 +1828,7 @@ public void refreshSuperUserGroupsConfiguration() throws IOException { public String[] getGroupsForUser(String user) throws IOException { return routerProto.getGroupsForUser(user); } - + /** * Deals with loading datanode report into the cache and refresh. 
*/ From 33f104b75cdb61cd453c07cc356116e4231edb42 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Wed, 24 Jun 2020 00:01:23 -0700 Subject: [PATCH 08/12] Address comments --- .../federation/router/RouterRpcServer.java | 35 ++++++------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index a358b6e0a97c7..e5f7796f5e03d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -46,9 +46,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.google.common.cache.CacheBuilder; @@ -911,7 +909,7 @@ public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) * @return List of datanodes. * @throws IOException If it cannot get the report. */ - public DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type) + DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type) throws IOException { try { DatanodeInfo[] dns = this.dnCache.get(type); @@ -933,7 +931,7 @@ public DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type) } private DatanodeInfo[] getCachedDatanodeReportImpl - (final DatanodeReportType type) throws IOException{ + (final DatanodeReportType type) throws IOException { // We need to get the DNs as a privileged user UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); RouterRpcServer.setCurrentUser(loginUser); @@ -1843,19 +1841,8 @@ private class DatanodeReportCacheLoader .setDaemon(true) .build(); - // Only use 1 thread to refresh cache. - // With coreThreadCount == maxThreadCount we effectively - // create a fixed size thread pool. As allowCoreThreadTimeOut - // has been set, all threads will die after 60 seconds of non use. 
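For context on the simplification in this hunk: MoreExecutors.listeningDecorator wrapped around Executors.newSingleThreadExecutor gives equivalent single-background-thread behaviour with far less setup (at the cost of the idle-timeout the removed pool had), and a CacheLoader.reload override that submits to it lets the cache keep serving the old value while the refresh runs. A hedged sketch of just that piece, with illustrative names rather than the patch's own:

    import java.util.concurrent.Executors;

    import com.google.common.cache.CacheLoader;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class AsyncReloadLoaderSketch extends CacheLoader<String, String> {
      // One daemon thread is enough: refreshes are serialized and never block callers.
      private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
          Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
              .setNameFormat("cache-reload")
              .setDaemon(true)
              .build()));

      @Override
      public String load(String key) {
        return expensiveLookup(key);               // synchronous load on a cache miss
      }

      @Override
      public ListenableFuture<String> reload(String key, String oldValue) {
        // Return a future so the LoadingCache hands out oldValue until it completes.
        return executor.submit(() -> load(key));
      }

      private String expensiveLookup(String key) {
        return key + "@" + System.nanoTime();      // stand-in for the real lookup
      }
    }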
- ThreadPoolExecutor parentExecutor = new ThreadPoolExecutor( - 1, - 1, - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - threadFactory); - parentExecutor.allowCoreThreadTimeOut(true); - executorService = MoreExecutors.listeningDecorator(parentExecutor); + executorService = MoreExecutors.listeningDecorator( + Executors.newSingleThreadExecutor(threadFactory)); } @Override @@ -1872,14 +1859,12 @@ public DatanodeInfo[] load(DatanodeReportType type) throws Exception { public ListenableFuture reload( final DatanodeReportType type, DatanodeInfo[] oldValue) throws Exception { - ListenableFuture listenableFuture = - executorService.submit(new Callable() { - @Override - public DatanodeInfo[] call() throws Exception { - return load(type); - } - }); - return listenableFuture; + return executorService.submit(new Callable() { + @Override + public DatanodeInfo[] call() throws Exception { + return load(type); + } + }); } } } From 8970a0ed39d645fe47c2ec31ce5a39a490f7989e Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Wed, 24 Jun 2020 11:00:20 -0700 Subject: [PATCH 09/12] Fix checkstyle --- .../hadoop/hdfs/server/federation/router/RouterRpcServer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index e5f7796f5e03d..5905a1dbbd370 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -930,8 +930,8 @@ DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type) } } - private DatanodeInfo[] getCachedDatanodeReportImpl - (final DatanodeReportType type) throws IOException { + private DatanodeInfo[] getCachedDatanodeReportImpl( + final DatanodeReportType type) throws IOException { // We need to get the DNs as a privileged user UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); RouterRpcServer.setCurrentUser(loginUser); From 7eeb2b527dbce451e3006b5f3b0698132ad8b2ad Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Sun, 5 Jul 2020 01:19:15 -0700 Subject: [PATCH 10/12] Add testGetCachedDatanodeReport --- .../federation/router/TestRouterRpc.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 2b7669d26af42..05254358a9ac3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -1777,6 +1777,43 @@ public void testgetGroupsForUser() throws IOException { assertArrayEquals(group, result); } + @Test + public void testGetCachedDatanodeReport() throws Exception { + final DatanodeInfo[] datanodeReport = + routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + + // We should have 12 nodes in total + assertEquals(12, datanodeReport.length); + + // We should be caching this information + 
DatanodeInfo[] datanodeReport1 = + routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + assertArrayEquals(datanodeReport1, datanodeReport); + + // Add one datanode + getCluster().getCluster().startDataNodes(getCluster().getCluster().getConfiguration(0), + 1, true, null, null, null); + + // We wait until the cached value is updated + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + DatanodeInfo[] dn = null; + try { + dn = routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + } catch (IOException ex) { + LOG.error("Error on getDatanodeReport"); + } + return !Arrays.equals(datanodeReport, dn); + } + }, 500, 5 * 1000); + + // The cache should be updated now + final DatanodeInfo[] datanodeReport2 = + routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + assertFalse(Arrays.equals(datanodeReport, datanodeReport2)); + } + /** * Check the erasure coding policies in the Router and the Namenode. * @return The erasure coding policies. @@ -1814,4 +1851,4 @@ private DFSClient getFileDFSClient(final String path) { } return null; } -} \ No newline at end of file +} From f4d4b9ac945f5983c7d7bb9f5295594b39959980 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Sun, 5 Jul 2020 18:38:54 -0700 Subject: [PATCH 11/12] Update TestRouterRpc.java --- .../federation/router/TestRouterRpc.java | 46 +++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 05254358a9ac3..154fd01369d6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; @@ -216,6 +217,12 @@ public static void globalSetUp() throws Exception { // Register and verify all NNs with all routers cluster.registerNamenodes(); cluster.waitNamenodeRegistration(); + + // We decrease the DN heartbeat expire interval to make them dead faster + cluster.getCluster().getNamesystem(0).getBlockManager() + .getDatanodeManager().setHeartbeatExpireInterval(5000); + cluster.getCluster().getNamesystem(1).getBlockManager() + .getDatanodeManager().setHeartbeatExpireInterval(5000); } @AfterClass @@ -1779,20 +1786,22 @@ public void testgetGroupsForUser() throws IOException { @Test public void testGetCachedDatanodeReport() throws Exception { + RouterRpcServer rpcServer = router.getRouter().getRpcServer(); final DatanodeInfo[] datanodeReport = - routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); // We should have 12 nodes in total assertEquals(12, datanodeReport.length); // We should be caching this information DatanodeInfo[] datanodeReport1 = - routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); 
assertArrayEquals(datanodeReport1, datanodeReport); // Add one datanode - getCluster().getCluster().startDataNodes(getCluster().getCluster().getConfiguration(0), - 1, true, null, null, null); + MiniDFSCluster cluster = getCluster().getCluster(); + cluster.startDataNodes( + cluster.getConfiguration(0), 1, true, null, null, null); // We wait until the cached value is updated GenericTestUtils.waitFor(new Supplier() { @@ -1800,9 +1809,9 @@ public void testGetCachedDatanodeReport() throws Exception { public Boolean get() { DatanodeInfo[] dn = null; try { - dn = routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + dn = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); } catch (IOException ex) { - LOG.error("Error on getDatanodeReport"); + LOG.error("Error on getCachedDatanodeReport"); } return !Arrays.equals(datanodeReport, dn); } @@ -1810,8 +1819,29 @@ public Boolean get() { // The cache should be updated now final DatanodeInfo[] datanodeReport2 = - routerProtocol.getDatanodeReport(DatanodeReportType.ALL); - assertFalse(Arrays.equals(datanodeReport, datanodeReport2)); + rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); + assertEquals(datanodeReport.length + 1, datanodeReport2.length); + + // Remove the DN we just added + cluster.stopDataNode(0); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + DatanodeInfo[] dn = null; + try { + dn = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); + } catch (IOException ex) { + LOG.error("Error on getCachedDatanodeReport"); + } + return datanodeReport.length == dn.length; + } + }, 500, 5 * 1000); + + // The cache should be updated now + final DatanodeInfo[] datanodeReport3 = + rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); + assertEquals(datanodeReport.length, datanodeReport3.length); } /** From dfb68119968e1601c7e0dbc0e7701f49d37c88a9 Mon Sep 17 00:00:00 2001 From: Ye Ni <141253+NickyYe@users.noreply.github.com> Date: Sun, 5 Jul 2020 22:54:51 -0700 Subject: [PATCH 12/12] Update TestRouterRpc.java --- .../server/federation/router/TestRouterRpc.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 154fd01369d6b..b9a17ac9bdd5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; @@ -1798,10 +1799,9 @@ public void testGetCachedDatanodeReport() throws Exception { rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); assertArrayEquals(datanodeReport1, datanodeReport); - // Add one datanode - MiniDFSCluster cluster = getCluster().getCluster(); - cluster.startDataNodes( - cluster.getConfiguration(0), 1, true, null, null, null); + // Stop one datanode + MiniDFSCluster miniDFSCluster = getCluster().getCluster(); + 
DataNodeProperties dnprop = miniDFSCluster.stopDataNode(0); // We wait until the cached value is updated GenericTestUtils.waitFor(new Supplier() { @@ -1820,10 +1820,11 @@ public Boolean get() { // The cache should be updated now final DatanodeInfo[] datanodeReport2 = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); - assertEquals(datanodeReport.length + 1, datanodeReport2.length); + assertEquals(datanodeReport.length - 1, datanodeReport2.length); - // Remove the DN we just added - cluster.stopDataNode(0); + // Restart the DN we just stopped + miniDFSCluster.restartDataNode(dnprop); + miniDFSCluster.waitActive(); GenericTestUtils.waitFor(new Supplier() { @Override
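The test hunks above lean on GenericTestUtils.waitFor(...) to poll until the router's cached report catches up with the cluster change. As a rough stand-in for that idiom (this is not the actual Hadoop helper, just the shape of it):

    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    public final class WaitForSketch {
      private WaitForSketch() {
      }

      // Re-evaluate the condition every checkEveryMs until it returns true,
      // failing with TimeoutException once waitForMs has elapsed.
      public static void waitFor(Supplier<Boolean> check, long checkEveryMs, long waitForMs)
          throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + waitForMs;
        while (!Boolean.TRUE.equals(check.get())) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException("Condition not met after " + waitForMs + " ms");
          }
          Thread.sleep(checkEveryMs);
        }
      }
    }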