Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public class RouterRpcClient {
/** Connection pool to the Namenodes per user for performance. */
private final ConnectionManager connectionManager;
/** Service to run asynchronous calls. */
private final ThreadPoolExecutor executorService;
private ThreadPoolExecutor executorService;
/** Retry policy for router -> NN communication. */
private final RetryPolicy retryPolicy;
/** Optional perf monitor. */
Expand Down Expand Up @@ -184,24 +184,7 @@ public RouterRpcClient(Configuration conf, Router router,
this.connectionManager.start();
this.routerRpcFairnessPolicyController =
FederationUtil.newFairnessPolicyController(conf);

int numThreads = conf.getInt(
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router Client-%d")
.build();
BlockingQueue<Runnable> workQueue;
if (conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
workQueue = new ArrayBlockingQueue<>(numThreads);
} else {
workQueue = new LinkedBlockingQueue<>();
}
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);

initConcurrentCallExecutorService(conf);
this.rpcMonitor = monitor;

int maxFailoverAttempts = conf.getInt(
Expand Down Expand Up @@ -245,6 +228,25 @@ public RouterRpcClient(Configuration conf, Router router,
this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>();
}

protected void initConcurrentCallExecutorService(Configuration conf) {
int numThreads = conf.getInt(
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router Client-%d")
.build();
BlockingQueue<Runnable> workQueue;
if (conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
workQueue = new ArrayBlockingQueue<>(numThreads);
} else {
workQueue = new LinkedBlockingQueue<>();
}
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
}

/**
* Get the configuration for the RPC client. It takes the Router
* configuration and transforms it into regular RPC Client configuration.
Expand Down Expand Up @@ -278,6 +280,15 @@ public ActiveNamenodeResolver getNamenodeResolver() {
return this.namenodeResolver;
}

/**
* Get the executor service used by invoking concurrent calls.
* @return the executor service.
*/
@VisibleForTesting
public ThreadPoolExecutor getExecutorService() {
return executorService;
}

/**
* Shutdown the client.
*/
Expand Down Expand Up @@ -364,9 +375,11 @@ public String getJSON() {
*/
public String getAsyncCallerPoolJson() {
final Map<String, Integer> info = new LinkedHashMap<>();
info.put("active", executorService.getActiveCount());
info.put("total", executorService.getPoolSize());
info.put("max", executorService.getMaximumPoolSize());
if (executorService != null) {
info.put("active", executorService.getActiveCount());
info.put("total", executorService.getPoolSize());
info.put("max", executorService.getMaximumPoolSize());
}
return JSON.toString(info);
}

Expand Down Expand Up @@ -2027,7 +2040,6 @@ protected boolean shouldRotateCache(IOException ioe) {
return isUnavailableException(ioe);
}


/**
* The {@link ExecutionStatus} class is a utility class used to track the status of
* execution operations performed by the {@link RouterRpcClient}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public RouterAsyncRpcClient(Configuration conf,
this.rpcMonitor = monitor;
}

@Override
protected void initConcurrentCallExecutorService(Configuration conf) {
// No need to initialize the thread pool for concurrent call.
}

/**
* Invoke method in all locations and return success if any succeeds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2426,4 +2426,9 @@ public void testCallerContextNotResetByAsyncHandler() throws IOException {
// The audit log should not contain async:true.
assertFalse(auditLog.getOutput().contains("async:true"));
}

@Test
public void testConcurrentCallExecutorInitial() {
assertNotNull(router.getRouterRpcClient().getExecutorService());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

/**
* Testing the asynchronous RPC functionality of the router.
Expand Down Expand Up @@ -83,4 +84,9 @@ public void testgetGroupsForUser() throws Exception {
assertArrayEquals(group, result);
}

@Test
@Override
public void testConcurrentCallExecutorInitial() {
assertNull(rndRouter.getRouterRpcClient().getExecutorService());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

/**
* Testing the asynchronous RPC functionality of the router with multiple mounts.
Expand Down Expand Up @@ -70,4 +71,11 @@ public void testgetGroupsForUser() throws Exception {
String[] result = syncReturn(String[].class);
assertArrayEquals(group, result);
}

@Test
@Override
public void testConcurrentCallExecutorInitial() {
MiniRouterDFSCluster.RouterContext rndRouter = super.getRouterContext();
assertNull(rndRouter.getRouterRpcClient().getExecutorService());
}
}