diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..f23568d4c8b9f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterAsyncRpcFairnessPolicyController.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys. + DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys. + DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; + +/** + * When router async rpc enabled, it is recommended to use this fairness controller. + */ +public class RouterAsyncRpcFairnessPolicyController extends + AbstractRouterRpcFairnessPolicyController { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterAsyncRpcFairnessPolicyController.class); + + public static final String INIT_MSG = "Max async call permits per nameservice: %d"; + + public RouterAsyncRpcFairnessPolicyController(Configuration conf) { + init(conf); + } + + public void init(Configuration conf) throws IllegalArgumentException { + super.init(conf); + + int maxAsyncCallPermit = conf.getInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY, + DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT); + if (maxAsyncCallPermit <= 0) { + maxAsyncCallPermit = DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; + } + LOG.info(String.format(INIT_MSG, maxAsyncCallPermit)); + + // Get all name services configured. + Set allConfiguredNS = FederationUtil.getAllConfiguredNS(conf); + + for (String nsId : allConfiguredNS) { + LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, nsId); + insertNameServiceWithPermits(nsId, maxAsyncCallPermit); + logAssignment(nsId, maxAsyncCallPermit); + } + // Avoid NPE when router async rpc disable. + insertNameServiceWithPermits(CONCURRENT_NS, maxAsyncCallPermit); + LOG.info("Dedicated permits {} for ns {} ", maxAsyncCallPermit, CONCURRENT_NS); + } + + private static void logAssignment(String nsId, int count) { + LOG.info("Assigned {} permits to nsId {} ", count, nsId); + } + + @Override + public boolean acquirePermit(String nsId) { + if (nsId.equals(CONCURRENT_NS)) { + return true; + } + return super.acquirePermit(nsId); + } + + @Override + public void releasePermit(String nsId) { + if (nsId.equals(CONCURRENT_NS)) { + return; + } + super.releasePermit(nsId); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 955d98cfc852c..cedcddfdeb862 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -88,6 +88,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY = FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count"; public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY = + FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "max.asynccall.permit"; + public static final int DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT = 20000; public static final String DFS_ROUTER_METRICS_ENABLE = FEDERATION_ROUTER_PREFIX + "metrics.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c7c3699f33ec7..7917f834937fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1024,7 +1024,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method) throws IOException { UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(nsId, ugi, method, controller); + acquirePermit(nsId, ugi, method.getMethodName(), controller); try { boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); List nns = getOrderedNamenodes(nsId, isObserverRead); @@ -1199,7 +1199,7 @@ public RemoteResult invokeSequential( boolean isObserverRead = isObserverReadEligible(ns, m); List namenodes = getOrderedNamenodes(ns, isObserverRead); - acquirePermit(ns, ugi, remoteMethod, controller); + acquirePermit(ns, ugi, remoteMethod.getMethodName(), controller); try { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); @@ -1579,7 +1579,7 @@ protected static Map postProcessResul return invokeSingle(locations.iterator().next(), method); } RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(CONCURRENT_NS, ugi, method, controller); + acquirePermit(CONCURRENT_NS, ugi, method.getMethodName(), controller); List orderedLocations = new ArrayList<>(); List> callables = new ArrayList<>(); @@ -1758,7 +1758,7 @@ public List> invokeSingl final List namenodes = getOrderedNamenodes(ns, isObserverRead); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(ns, ugi, method, controller); + acquirePermit(ns, ugi, method.getMethodName(), controller); try { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); @@ -1829,12 +1829,12 @@ private String getNameserviceForBlockPoolId(final String bpId) * * @param nsId Identifier of the block pool. * @param ugi UserGroupIdentifier associated with the user. - * @param m Remote method that needs to be invoked. + * @param methodName The name of remote method that needs to be invoked. * @param controller fairness policy controller to acquire permit from * @throws IOException If permit could not be acquired for the nsId. */ protected void acquirePermit(final String nsId, final UserGroupInformation ugi, - final RemoteMethod m, RouterRpcFairnessPolicyController controller) + final String methodName, RouterRpcFairnessPolicyController controller) throws IOException { if (controller != null) { if (!controller.acquirePermit(nsId)) { @@ -1845,7 +1845,7 @@ protected void acquirePermit(final String nsId, final UserGroupInformation ugi, } incrRejectedPermitForNs(nsId); LOG.debug("Permit denied for ugi: {} for method: {}", - ugi, m.getMethodName()); + ugi, methodName); String msg = "Router " + router.getRouterId() + " is overloaded for NS: " + nsId; @@ -1880,7 +1880,7 @@ protected void releasePermit(final String nsId, final UserGroupInformation ugi, return routerRpcFairnessPolicyController; } - private void incrRejectedPermitForNs(String ns) { + protected void incrRejectedPermitForNs(String ns) { rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment(); } @@ -1889,7 +1889,7 @@ public Long getRejectedPermitForNs(String ns) { rejectedPermitsPerNs.get(ns).longValue() : 0L; } - private void incrAcceptedPermitForNs(String ns) { + protected void incrAcceptedPermitForNs(String ns) { acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index c214adf1f2abb..ea2d3b40ca527 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -58,7 +58,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor; @@ -178,8 +177,14 @@ public Object invokeMethod( namenodes.toString(), params); } threadLocalContext.transfer(); + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(nsid, ugi, method.getName(), controller); invokeMethodAsync(ugi, (List) namenodes, useObserver, protocol, method, params); + asyncFinally(object -> { + releasePermit(nsid, ugi, method, controller); + return object; + }); }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid, router.getRpcServer().getRouterAsyncHandlerDefaultExecutor())); return null; @@ -227,7 +232,7 @@ private void invokeMethodAsync( connection[0] = getConnection(ugi, nsId, rpcAddress, protocol); NameNodeProxiesClient.ProxyAndInfo client = connection[0].getClient(); invoke(namenode, status.isShouldUseObserver(), 0, method, - client.getProxy(), params); + client.getProxy(), params); asyncApply(res -> { status.setComplete(true); postProcessResult(method, status, namenode, nsId, client); @@ -363,7 +368,6 @@ public RemoteResult invokeSequential( Class expectedResultClass, Object expectedResultValue) throws IOException { - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); final Method m = remoteMethod.getMethod(); List thrownExceptions = new ArrayList<>(); @@ -378,7 +382,6 @@ public RemoteResult invokeSequential( boolean isObserverRead = isObserverReadEligible(ns, m); List namenodes = getOrderedNamenodes(ns, isObserverRead); - acquirePermit(ns, ugi, remoteMethod, controller); asyncTry(() -> { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); @@ -419,10 +422,6 @@ public RemoteResult invokeSequential( } return ret; }, Exception.class); - asyncFinally(ret -> { - releasePermit(ns, ugi, remoteMethod, controller); - return ret; - }); }); asyncApply(result -> { if (status.isComplete()) { @@ -498,7 +497,6 @@ public Map invokeConcurrent( protected List> getRemoteResults( RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller, List orderedLocations, List> callables) throws IOException { - final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); final Method m = method.getMethod(); final CompletableFuture[] futures = new CompletableFuture[callables.size()]; @@ -523,8 +521,6 @@ protected List> getRemot LOG.error("Unexpected error while invoking API: {}", e.getMessage()); throw warpCompletionException(new IOException( "Unexpected error while invoking API " + e.getMessage(), e)); - } finally { - releasePermit(CONCURRENT_NS, ugi, method, controller); } })); return asyncReturn(List.class); @@ -553,8 +549,6 @@ public List> invokeSingl boolean isObserverRead = isObserverReadEligible(ns, m); final List namenodes = getOrderedNamenodes(ns, isObserverRead); - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(ns, ugi, method, controller); asyncTry(() -> { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); @@ -567,10 +561,6 @@ public List> invokeSingl asyncCatch((o, ioe) -> { throw processException(ioe, location); }, IOException.class); - asyncFinally(o -> { - releasePermit(ns, ugi, method, controller); - return o; - }); return asyncReturn(List.class); } @@ -589,21 +579,13 @@ public List> invokeSingl public Object invokeSingle(final String nsId, RemoteMethod method) throws IOException { UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(nsId, ugi, method, controller); - asyncTry(() -> { - boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); - List nns = getOrderedNamenodes(nsId, isObserverRead); - RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); - Class proto = method.getProtocol(); - Method m = method.getMethod(); - Object[] params = method.getParams(loc); - invokeMethod(ugi, nns, isObserverRead, proto, m, params); - }); - asyncFinally(o -> { - releasePermit(nsId, ugi, method, controller); - return o; - }); + boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); + List nns = getOrderedNamenodes(nsId, isObserverRead); + RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); + Class proto = method.getProtocol(); + Method m = method.getMethod(); + Object[] params = method.getParams(loc); + invokeMethod(ugi, nns, isObserverRead, proto, m, params); return null; } @@ -627,4 +609,21 @@ public T invokeSingle( invokeSequential(locations, remoteMethod); return asyncReturn(clazz); } + + /** + * Release permit for specific nsId after processing against downstream + * nsId is completed. + * @param nsId Identifier of the block pool. + * @param ugi UserGroupIdentifier associated with the user. + * @param m Remote method that needs to be invoked. + * @param controller fairness policy controller to release permit from + */ + protected void releasePermit(final String nsId, final UserGroupInformation ugi, + final Method m, RouterRpcFairnessPolicyController controller) { + if (controller != null) { + controller.releasePermit(nsId); + LOG.trace("Permit released for ugi: {} for method: {}", ugi, + m.getName()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 46d273ab522bd..470dc61e8eb41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -152,6 +152,15 @@ + + dfs.federation.router.async.rpc.max.asynccall.permit + 20000 + + Maximum number of asynchronous RPC requests the Router can send to + one downstream nameservice. + + + dfs.federation.router.connection.creator.queue-size 100 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..e0e49636fda4e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterAsyncRpcFairnessPolicyController.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test functionality of {@link RouterAsyncRpcFairnessPolicyController). + */ +public class TestRouterAsyncRpcFairnessPolicyController { + + private static String nameServices = + "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2"; + private static int perNsPermits = 30; + + @Test + public void testHandlerAllocationEqualAssignment() { + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController + = getFairnessPolicyController(perNsPermits); + verifyHandlerAllocation(routerRpcFairnessPolicyController); + } + + @Test + public void testAcquireTimeout() { + Configuration conf = createConf(perNsPermits); + conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + + // Ns1 should have number of perNsPermits permits allocated. + for (int i = 0; i < perNsPermits; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + } + long acquireBeginTimeMs = Time.monotonicNow(); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs; + + // There are some other operations, so acquireTimeMs >= 100ms. + assertTrue(acquireTimeMs >= 100); + } + + @Test + public void testAllocationSuccessfullyWithZeroHandlers() { + Configuration conf = createConf(0); + verifyInstantiationStatus(conf, DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT); + } + + @Test + public void testAllocationSuccessfullyWithNegativePermits() { + Configuration conf = createConf(-1); + verifyInstantiationStatus(conf, DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT); + } + + @Test + public void testGetAvailableHandlerOnPerNs() { + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController + = getFairnessPolicyController(perNsPermits); + assertEquals("{\"concurrent\":30,\"ns2\":30,\"ns1\":30}", + routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); + routerRpcFairnessPolicyController.acquirePermit("ns1"); + assertEquals("{\"concurrent\":30,\"ns2\":30,\"ns1\":29}", + routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); + } + + @Test + public void testGetAvailableHandlerOnPerNsForNoFairness() { + Configuration conf = new Configuration(); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + assertEquals("N/A", + routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); + } + + private void verifyInstantiationStatus(Configuration conf, int permits) { + GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer + .captureLogs(LoggerFactory.getLogger( + RouterAsyncRpcFairnessPolicyController.class)); + try { + FederationUtil.newFairnessPolicyController(conf); + } catch (IllegalArgumentException e) { + // Ignore the exception as it is expected here. + } + String infoMsg = String.format( + RouterAsyncRpcFairnessPolicyController.INIT_MSG, permits); + assertTrue("Should contain info message: " + infoMsg, + logs.getOutput().contains(infoMsg)); + } + + private RouterRpcFairnessPolicyController getFairnessPolicyController( + int asyncCallPermits) { + return FederationUtil.newFairnessPolicyController(createConf(asyncCallPermits)); + } + + private void verifyHandlerAllocation( + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController) { + for (int i = 0; i < perNsPermits; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + // CONCURRENT_NS doesn't acquire permits. + assertTrue( + routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + + routerRpcFairnessPolicyController.releasePermit("ns1"); + routerRpcFairnessPolicyController.releasePermit("ns2"); + routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS); + + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + + private Configuration createConf(int asyncCallPermits) { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY, asyncCallPermits); + conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices); + conf.setClass( + RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + RouterAsyncRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); + return conf; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java index 7290c0a0aee81..0e007e6eb729d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java @@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterAsyncRpcFairnessPolicyController; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpc; import org.apache.hadoop.security.UserGroupInformation; @@ -30,6 +32,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; @@ -42,16 +45,20 @@ public class TestRouterAsyncRpc extends TestRouterRpc { @BeforeClass public static void globalSetUp() throws Exception { - // Start routers with only an RPC service + // Start routers with only an RPC service. Configuration routerConf = new RouterConfigBuilder() .metrics() .rpc() .build(); - // We decrease the DN cache times to make the test faster + // We decrease the DN cache times to make the test faster. routerConf.setTimeDuration( RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); - // use async router. + // Use async router. routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + // Use RouterAsyncRpcFairnessPolicyController as the fairness controller. + routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + RouterAsyncRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); setUp(routerConf); } @@ -59,7 +66,7 @@ public static void globalSetUp() throws Exception { public void testSetup() throws Exception { super.testSetup(); cluster = super.getCluster(); - // Random router for this test + // Random router for this test. rndRouter = cluster.getRandomRouter(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java index ec1ff0ce97b9f..0a2ee3e03595f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java @@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterAsyncRpcFairnessPolicyController; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpcMultiDestination; import org.apache.hadoop.security.UserGroupInformation; @@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; @@ -49,6 +52,10 @@ public static void globalSetUp() throws Exception { RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); // use async router. routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + // Use RouterAsyncRpcFairnessPolicyController as the fairness controller. + routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + RouterAsyncRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); setUp(routerConf); }