diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 8576eae160e76..254740470354f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -17,9 +17,38 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +<<<<<<<< HEAD:hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY; import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus; +======== +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Array; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; + +>>>>>>>> c804b284628 (HDFS-13215. RBF: Move Router to its own module.):hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; @@ -185,11 +214,90 @@ public class RouterClientProtocol implements ClientProtocol { this.superGroup = conf.get( DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); +<<<<<<<< HEAD:hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java this.erasureCoding = new ErasureCoding(rpcServer); this.storagePolicy = new RouterStoragePolicy(rpcServer); this.snapshotProto = new RouterSnapshot(rpcServer); this.routerCacheAdmin = new RouterCacheAdmin(rpcServer); this.securityManager = rpcServer.getRouterSecurityManager(); +======== + + // RPC server settings + int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, + DFS_ROUTER_HANDLER_COUNT_DEFAULT); + + int readerCount = this.conf.getInt(DFS_ROUTER_READER_COUNT_KEY, + DFS_ROUTER_READER_COUNT_DEFAULT); + + int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY, + DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT); + + // Override Hadoop Common IPC setting + int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY, + DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT); + this.conf.setInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, + readerQueueSize); + + RPC.setProtocolEngine(this.conf, ClientNamenodeProtocolPB.class, + ProtobufRpcEngine.class); + + ClientNamenodeProtocolServerSideTranslatorPB + clientProtocolServerTranslator = + new ClientNamenodeProtocolServerSideTranslatorPB(this); + BlockingService clientNNPbService = ClientNamenodeProtocol + .newReflectiveBlockingService(clientProtocolServerTranslator); + + InetSocketAddress confRpcAddress = conf.getSocketAddr( + RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, + RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_DEFAULT, + RBFConfigKeys.DFS_ROUTER_RPC_PORT_DEFAULT); + LOG.info("RPC server binding to {} with {} handlers for Router {}", + confRpcAddress, handlerCount, this.router.getRouterId()); + + this.rpcServer = new RPC.Builder(this.conf) + .setProtocol(ClientNamenodeProtocolPB.class) + .setInstance(clientNNPbService) + .setBindAddress(confRpcAddress.getHostName()) + .setPort(confRpcAddress.getPort()) + .setNumHandlers(handlerCount) + .setnumReaders(readerCount) + .setQueueSizePerHandler(handlerQueueSize) + .setVerbose(false) + .build(); + // We don't want the server to log the full stack trace for some exceptions + this.rpcServer.addTerseExceptions( + RemoteException.class, + StandbyException.class, + SafeModeException.class, + FileNotFoundException.class, + FileAlreadyExistsException.class, + AccessControlException.class, + LeaseExpiredException.class, + NotReplicatedYetException.class, + IOException.class); + + // The RPC-server port can be ephemeral... ensure we have the correct info + InetSocketAddress listenAddress = this.rpcServer.getListenerAddress(); + this.rpcAddress = new InetSocketAddress( + confRpcAddress.getHostName(), listenAddress.getPort()); + + // Create metrics monitor + Class rpcMonitorClass = this.conf.getClass( + RBFConfigKeys.DFS_ROUTER_METRICS_CLASS, + RBFConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT, + RouterRpcMonitor.class); + this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); + + // Create the client + this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(), + this.namenodeResolver, this.rpcMonitor); + + // Initialize modules + this.quotaCall = new Quota(this.router, this); + this.erasureCoding = new ErasureCoding(this); +>>>>>>>> c804b284628 (HDFS-13215. RBF: Move Router to its own module.):hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 896d08f2c49b6..f5e6bdca51265 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -28,8 +28,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +<<<<<<<< HEAD:hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; +======== +>>>>>>>> c804b284628 (HDFS-13215. RBF: Move Router to its own module.):hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered; @@ -45,7 +48,10 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY; +<<<<<<<< HEAD:hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE; +======== +>>>>>>>> c804b284628 (HDFS-13215. RBF: Move Router to its own module.):hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS; import static org.junit.Assert.assertEquals; diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index f1ac43ed5b38a..48fb45c63a5b1 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -379,6 +379,11 @@ hadoop-hdfs-rbf ${hadoop.version} + + org.apache.hadoop + hadoop-hdfs-rbf + ${project.version} + org.apache.hadoop hadoop-hdfs