From 668145c3a5c55b817c7030e1c295ab8b63280702 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 2 Dec 2021 19:38:47 +0530 Subject: [PATCH 1/2] HDFS-16369. RBF: Fix the retry logic of RouterRpcServer#invokeAtAvailableNs. --- .../federation/router/RouterRpcServer.java | 69 +++++++++---------- ...MultipleDestinationMountTableResolver.java | 17 ++++- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 48d4ff0d6c555..f0c14454834af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -44,6 +44,7 @@ import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -671,8 +672,8 @@ static String getMethodName() { /** * Invokes the method at default namespace, if default namespace is not - * available then at the first available namespace. - * If the namespace is unavailable, retry once with other namespace. + * available then at the other available namespaces. + * If the namespace is unavailable, retry with other namespaces. * @param expected return type. * @param method the remote method. * @return the response received after invoking method. @@ -681,28 +682,29 @@ static String getMethodName() { T invokeAtAvailableNs(RemoteMethod method, Class clazz) throws IOException { String nsId = subclusterResolver.getDefaultNamespace(); - // If default Ns is not present return result from first namespace. 
Set nss = namenodeResolver.getNamespaces(); - try { - if (!nsId.isEmpty()) { + // If no namespace is available, then throw this IOException. + IOException io = new IOException("No namespace available."); + // If default Ns is present return result from that namespace. + if (!nsId.isEmpty()) { + try { return rpcClient.invokeSingle(nsId, method, clazz); + } catch (IOException ioe) { + if (!clientProto.isUnavailableSubclusterException(ioe)) { + LOG.debug("{} exception cannot be retried", + ioe.getClass().getSimpleName()); + throw ioe; + } + // Remove the already tried namespace. + nss.removeIf(n -> n.getNameserviceId().equals(nsId)); + return invokeOnNs(method, clazz, io, nss); } - // If no namespace is available, throw IOException. - IOException io = new IOException("No namespace available."); - return invokeOnNs(method, clazz, io, nss); - } catch (IOException ioe) { - if (!clientProto.isUnavailableSubclusterException(ioe)) { - LOG.debug("{} exception cannot be retried", - ioe.getClass().getSimpleName()); - throw ioe; - } - Set nssWithoutFailed = getNameSpaceInfo(nss, nsId); - return invokeOnNs(method, clazz, ioe, nssWithoutFailed); } + return invokeOnNs(method, clazz, io, nss); } /** - * Invoke the method on first available namespace, + * Invoke the method sequentially on available namespaces, * throw no namespace available exception, if no namespaces are available. * @param method the remote method. * @param clazz Class for the return type. @@ -716,26 +718,23 @@ T invokeOnNs(RemoteMethod method, Class clazz, IOException ioe, if (nss.isEmpty()) { throw ioe; } - String nsId = nss.iterator().next().getNameserviceId(); - return rpcClient.invokeSingle(nsId, method, clazz); - } - - /** - * Get set of namespace info's removing the already invoked namespaceinfo. - * @param nss List of namespaces in the federation. - * @param nsId Already invoked namespace id. - * @return List of name spaces in the federation on - * removing the already invoked namespaceinfo. 
- */ - private static Set getNameSpaceInfo( - final Set nss, final String nsId) { - Set namespaceInfos = new HashSet<>(); - for (FederationNamespaceInfo ns : nss) { - if (!nsId.equals(ns.getNameserviceId())) { - namespaceInfos.add(ns); + for (Iterator it = nss.iterator(); it + .hasNext(); ) { + String nsId = it.next().getNameserviceId(); + LOG.debug("Invoking {} on namespace {}", method, nsId); + try { + return rpcClient.invokeSingle(nsId, method, clazz); + } catch (IOException e) { + LOG.debug("Failed to invoke {} on namespace {}", method, nsId, e); + // If the tried namespace is unavailable, ignore the exception and + // try the next namespace; otherwise rethrow the received exception. + if (!clientProto.isUnavailableSubclusterException(e)) { + throw e; + } } } - return namespaceInfos; + // Couldn't get a response from any of the namespaces, throw ioe. + throw ioe; } @Override // ClientProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java index ee92ec4f771b4..a72b84b21b95e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java @@ -63,26 +63,30 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.federation.RouterAdmin; import
org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; /** * Tests router rpc with multiple destination mount table resolver. */ public class TestRouterRPCMultipleDestinationMountTableResolver { - private static final List NS_IDS = Arrays.asList("ns0", "ns1"); + private static final List NS_IDS = Arrays.asList("ns0", "ns1", "ns2"); private static StateStoreDFSCluster cluster; private static RouterContext routerContext; private static MountTableResolver resolver; private static DistributedFileSystem nnFs0; private static DistributedFileSystem nnFs1; + private static DistributedFileSystem nnFs2; private static DistributedFileSystem routerFs; private static RouterRpcServer rpcServer; @@ -90,7 +94,7 @@ public class TestRouterRPCMultipleDestinationMountTableResolver { public static void setUp() throws Exception { // Build and start a federated cluster - cluster = new StateStoreDFSCluster(false, 2, + cluster = new StateStoreDFSCluster(false, 3, MultipleDestinationMountTableResolver.class); Configuration routerConf = new RouterConfigBuilder().stateStore().admin().quota().rpc().build(); @@ -111,6 +115,8 @@ public static void setUp() throws Exception { .getNamenode(cluster.getNameservices().get(0), null).getFileSystem(); nnFs1 = (DistributedFileSystem) cluster .getNamenode(cluster.getNameservices().get(1), null).getFileSystem(); + nnFs2 = (DistributedFileSystem) cluster + .getNamenode(cluster.getNameservices().get(2), null).getFileSystem(); routerFs = (DistributedFileSystem) routerContext.getFileSystem(); rpcServer =routerContext.getRouter().getRpcServer(); } @@ -668,6 +674,7 @@ public void testInvokeAtAvailableNs() throws IOException { // Make one subcluster unavailable. 
MiniDFSCluster dfsCluster = cluster.getCluster(); dfsCluster.shutdownNameNode(0); + dfsCluster.shutdownNameNode(1); try { // Verify that #invokeAtAvailableNs works by calling #getServerDefaults. RemoteMethod method = new RemoteMethod("getServerDefaults"); @@ -675,7 +682,8 @@ public void testInvokeAtAvailableNs() throws IOException { rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class); assertNotNull(serverDefaults); } finally { - dfsCluster.restartNameNode(0); + dfsCluster.restartNameNode(0, false); + dfsCluster.restartNameNode(1); } } @@ -893,6 +901,9 @@ private static FileSystem getFileSystem(final String nsId) { if (nsId.equals("ns1")) { return nnFs1; } + if (nsId.equals("ns2")) { + return nnFs2; + } return null; } From be36f312e9274c730d235b1181df095d8c055335 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Fri, 3 Dec 2021 10:22:06 +0530 Subject: [PATCH 2/2] Fix Comments & Checkstyles. --- .../hdfs/server/federation/router/RouterRpcServer.java | 7 ++----- ...TestRouterRPCMultipleDestinationMountTableResolver.java | 3 --- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index f0c14454834af..2b6c4a1f2f4c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -43,8 +43,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; -import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -718,9 +716,8 @@ T invokeOnNs(RemoteMethod method, Class clazz, IOException ioe, if (nss.isEmpty()) { throw 
ioe; } - for (Iterator it = nss.iterator(); it - .hasNext(); ) { - String nsId = it.next().getNameserviceId(); + for (FederationNamespaceInfo fnInfo : nss) { + String nsId = fnInfo.getNameserviceId(); LOG.debug("Invoking {} on namespace {}", method, nsId); try { return rpcClient.invokeSingle(nsId, method, clazz); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java index a72b84b21b95e..aa29e8d15e55c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java @@ -63,17 +63,14 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.federation.RouterAdmin; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; /** * Tests router rpc with multiple destination mount table resolver.