diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 7917f834937fc..a0c67958b053e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -128,7 +128,7 @@ public class RouterRpcClient { /** Connection pool to the Namenodes per user for performance. */ private final ConnectionManager connectionManager; /** Service to run asynchronous calls. */ - private final ThreadPoolExecutor executorService; + private ThreadPoolExecutor executorService; /** Retry policy for router -> NN communication. */ private final RetryPolicy retryPolicy; /** Optional perf monitor. */ @@ -184,24 +184,7 @@ public RouterRpcClient(Configuration conf, Router router, this.connectionManager.start(); this.routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf); - - int numThreads = conf.getInt( - RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, - RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("RPC Router Client-%d") - .build(); - BlockingQueue workQueue; - if (conf.getBoolean( - RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, - RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) { - workQueue = new ArrayBlockingQueue<>(numThreads); - } else { - workQueue = new LinkedBlockingQueue<>(); - } - this.executorService = new ThreadPoolExecutor(numThreads, numThreads, - 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); - + initConcurrentCallExecutorService(conf); this.rpcMonitor = monitor; int maxFailoverAttempts = conf.getInt( @@ -245,6 +228,25 @@ public RouterRpcClient(Configuration conf, Router router, this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>(); } + protected void initConcurrentCallExecutorService(Configuration conf) { + int numThreads = conf.getInt( + RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, + RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("RPC Router Client-%d") + .build(); + BlockingQueue workQueue; + if (conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, + RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) { + workQueue = new ArrayBlockingQueue<>(numThreads); + } else { + workQueue = new LinkedBlockingQueue<>(); + } + this.executorService = new ThreadPoolExecutor(numThreads, numThreads, + 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); + } + /** * Get the configuration for the RPC client. It takes the Router * configuration and transforms it into regular RPC Client configuration. @@ -278,6 +280,15 @@ public ActiveNamenodeResolver getNamenodeResolver() { return this.namenodeResolver; } + /** + * Get the executor service used by invoking concurrent calls. + * @return the executor service. + */ + @VisibleForTesting + public ThreadPoolExecutor getExecutorService() { + return executorService; + } + /** * Shutdown the client. */ @@ -364,9 +375,11 @@ public String getJSON() { */ public String getAsyncCallerPoolJson() { final Map info = new LinkedHashMap<>(); - info.put("active", executorService.getActiveCount()); - info.put("total", executorService.getPoolSize()); - info.put("max", executorService.getMaximumPoolSize()); + if (executorService != null) { + info.put("active", executorService.getActiveCount()); + info.put("total", executorService.getPoolSize()); + info.put("max", executorService.getMaximumPoolSize()); + } return JSON.toString(info); } @@ -2027,7 +2040,6 @@ protected boolean shouldRotateCache(IOException ioe) { return isUnavailableException(ioe); } - /** * The {@link ExecutionStatus} class is a utility class used to track the status of * execution operations performed by the {@link RouterRpcClient}. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index ea2d3b40ca527..13f6dd3b952eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -116,6 +116,11 @@ public RouterAsyncRpcClient(Configuration conf, this.rpcMonitor = monitor; } + @Override + protected void initConcurrentCallExecutorService(Configuration conf) { + // No need to initialize the thread pool for concurrent call. + } + /** * Invoke method in all locations and return success if any succeeds. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 98551fea3d157..ddbfdc9727c3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -2426,4 +2426,9 @@ public void testCallerContextNotResetByAsyncHandler() throws IOException { // The audit log should not contain async:true. assertFalse(auditLog.getOutput().contains("async:true")); } + + @Test + public void testConcurrentCallExecutorInitial() { + assertNotNull(router.getRouterRpcClient().getExecutorService()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java index bfc72b2c47324..42ab49affd0f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java @@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNull; /** * Testing the asynchronous RPC functionality of the router. @@ -83,4 +84,9 @@ public void testgetGroupsForUser() throws Exception { assertArrayEquals(group, result); } + @Test + @Override + public void testConcurrentCallExecutorInitial() { + assertNull(rndRouter.getRouterRpcClient().getExecutorService()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java index ab82afba371b4..5783107d927ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java @@ -34,6 +34,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNull; /** * Testing the asynchronous RPC functionality of the router with multiple mounts. @@ -70,4 +71,11 @@ public void testgetGroupsForUser() throws Exception { String[] result = syncReturn(String[].class); assertArrayEquals(group, result); } + + @Test + @Override + public void testConcurrentCallExecutorInitial() { + MiniRouterDFSCluster.RouterContext rndRouter = super.getRouterContext(); + assertNull(rndRouter.getRouterRpcClient().getExecutorService()); + } }