8585import  org .apache .hadoop .hdfs .server .federation .resolver .MountTableResolver ;
8686import  org .apache .hadoop .hdfs .server .federation .resolver .RemoteLocation ;
8787import  org .apache .hadoop .hdfs .server .federation .resolver .RouterResolveException ;
88+ import  org .apache .hadoop .hdfs .server .federation .router .async .AsyncErasureCoding ;
89+ import  org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncCacheAdmin ;
90+ import  org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncSnapshot ;
91+ import  org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncStoragePolicy ;
8892import  org .apache .hadoop .hdfs .server .federation .router .security .RouterSecurityManager ;
8993import  org .apache .hadoop .hdfs .server .federation .store .records .MountTable ;
9094import  org .apache .hadoop .hdfs .server .namenode .NameNode ;
@@ -166,7 +170,7 @@ public class RouterClientProtocol implements ClientProtocol {
166170  /** Router security manager to handle token operations. */ 
167171  private  RouterSecurityManager  securityManager  = null ;
168172
169-   RouterClientProtocol (Configuration  conf , RouterRpcServer  rpcServer ) {
173+   public   RouterClientProtocol (Configuration  conf , RouterRpcServer  rpcServer ) {
170174    this .rpcServer  = rpcServer ;
171175    this .rpcClient  = rpcServer .getRPCClient ();
172176    this .subclusterResolver  = rpcServer .getSubclusterResolver ();
@@ -194,10 +198,17 @@ public class RouterClientProtocol implements ClientProtocol {
194198    this .superGroup  = conf .get (
195199        DFSConfigKeys .DFS_PERMISSIONS_SUPERUSERGROUP_KEY ,
196200        DFSConfigKeys .DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT );
197-     this .erasureCoding  = new  ErasureCoding (rpcServer );
198-     this .storagePolicy  = new  RouterStoragePolicy (rpcServer );
199-     this .snapshotProto  = new  RouterSnapshot (rpcServer );
200-     this .routerCacheAdmin  = new  RouterCacheAdmin (rpcServer );
201+     if  (rpcServer .isAsync ()) {
202+       this .erasureCoding  = new  AsyncErasureCoding (rpcServer );
203+       this .storagePolicy  = new  RouterAsyncStoragePolicy (rpcServer );
204+       this .snapshotProto  = new  RouterAsyncSnapshot (rpcServer );
205+       this .routerCacheAdmin  = new  RouterAsyncCacheAdmin (rpcServer );
206+     } else  {
207+       this .erasureCoding  = new  ErasureCoding (rpcServer );
208+       this .storagePolicy  = new  RouterStoragePolicy (rpcServer );
209+       this .snapshotProto  = new  RouterSnapshot (rpcServer );
210+       this .routerCacheAdmin  = new  RouterCacheAdmin (rpcServer );
211+     }
201212    this .securityManager  = rpcServer .getRouterSecurityManager ();
202213    this .rbfRename  = new  RouterFederationRename (rpcServer , conf );
203214    this .defaultNameServiceEnabled  = conf .getBoolean (
@@ -347,7 +358,7 @@ protected static boolean isUnavailableSubclusterException(
347358   * @throws IOException If this path is not fault tolerant or the exception 
348359   *                     should not be retried (e.g., NSQuotaExceededException). 
349360   */ 
350-   private  List <RemoteLocation > checkFaultTolerantRetry (
361+   protected  List <RemoteLocation > checkFaultTolerantRetry (
351362      final  RemoteMethod  method , final  String  src , final  IOException  ioe ,
352363      final  RemoteLocation  excludeLoc , final  List <RemoteLocation > locations )
353364          throws  IOException  {
@@ -820,7 +831,7 @@ public void renewLease(String clientName, List<String> namespaces)
820831  /** 
821832   * For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results. 
822833   */ 
823-   private  static  class  GetListingComparator 
834+   protected  static  class  GetListingComparator 
824835      implements  Comparator <byte []>, Serializable  {
825836    @ Override 
826837    public  int  compare (byte [] o1 , byte [] o2 ) {
@@ -831,6 +842,10 @@ public int compare(byte[] o1, byte[] o2) {
831842  private  static  GetListingComparator  comparator  =
832843      new  GetListingComparator ();
833844
845+   public  static  GetListingComparator  getComparator () {
846+     return  comparator ;
847+   }
848+ 
834849  @ Override 
835850  public  DirectoryListing  getListing (String  src , byte [] startAfter ,
836851      boolean  needLocation ) throws  IOException  {
@@ -1104,7 +1119,7 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
11041119    return  mergeDtanodeStorageReport (dnSubcluster );
11051120  }
11061121
1107-   private  DatanodeStorageReport [] mergeDtanodeStorageReport (
1122+   protected  DatanodeStorageReport [] mergeDtanodeStorageReport (
11081123      Map <String , DatanodeStorageReport []> dnSubcluster ) {
11091124    // Avoid repeating machines in multiple subclusters 
11101125    Map <String , DatanodeStorageReport > datanodesMap  = new  LinkedHashMap <>();
@@ -1335,20 +1350,23 @@ Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOExceptio
13351350  }
13361351
13371352  /** 
1338-    * Get all the locations of the path for {@link this #getContentSummary(String)}. 
1353+    * Get all the locations of the path for {@link RouterClientProtocol #getContentSummary(String)}. 
13391354   * For example, there are some mount points: 
1340-    *   /a -> ns0 -> /a 
1341-    *   /a/b -> ns0 -> /a/b 
1342-    *   /a/b/c -> ns1 -> /a/b/c 
1355+    * <p> 
1356+    *   /a - [ns0 - /a] 
1357+    *   /a/b - [ns0 - /a/b] 
1358+    *   /a/b/c - [ns1 - /a/b/c] 
1359+    * </p> 
13431360   * When the path is '/a', the result of locations should be 
13441361   * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')] 
13451362   * When the path is '/b', will throw NoLocationException. 
1363+    * 
13461364   * @param path the path to get content summary 
13471365   * @return one list contains all the remote location 
1348-    * @throws IOException 
1366+    * @throws IOException if an I/O error occurs  
13491367   */ 
13501368  @ VisibleForTesting 
1351-   List <RemoteLocation > getLocationsForContentSummary (String  path ) throws  IOException  {
1369+   protected   List <RemoteLocation > getLocationsForContentSummary (String  path ) throws  IOException  {
13521370    // Try to get all the locations of the path. 
13531371    final  Map <String , List <RemoteLocation >> ns2Locations  = getAllLocations (path );
13541372    if  (ns2Locations .isEmpty ()) {
@@ -2039,7 +2057,7 @@ public HAServiceProtocol.HAServiceState getHAServiceState() {
20392057   *         replacement value. 
20402058   * @throws IOException If the dst paths could not be determined. 
20412059   */ 
2042-   private  RemoteParam  getRenameDestinations (
2060+   protected  RemoteParam  getRenameDestinations (
20432061      final  List <RemoteLocation > srcLocations ,
20442062      final  List <RemoteLocation > dstLocations ) throws  IOException  {
20452063
@@ -2087,7 +2105,7 @@ private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
20872105   * @param summaries Collection of individual summaries. 
20882106   * @return Aggregated content summary. 
20892107   */ 
2090-   private  ContentSummary  aggregateContentSummary (
2108+   protected  ContentSummary  aggregateContentSummary (
20912109      Collection <ContentSummary > summaries ) {
20922110    if  (summaries .size () == 1 ) {
20932111      return  summaries .iterator ().next ();
@@ -2142,7 +2160,7 @@ private ContentSummary aggregateContentSummary(
21422160   *         everywhere. 
21432161   * @throws IOException If all the locations throw an exception. 
21442162   */ 
2145-   private  HdfsFileStatus  getFileInfoAll (final  List <RemoteLocation > locations ,
2163+   protected  HdfsFileStatus  getFileInfoAll (final  List <RemoteLocation > locations ,
21462164      final  RemoteMethod  method ) throws  IOException  {
21472165    return  getFileInfoAll (locations , method , -1 );
21482166  }
@@ -2157,7 +2175,7 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
21572175   *         everywhere. 
21582176   * @throws IOException If all the locations throw an exception. 
21592177   */ 
2160-   private  HdfsFileStatus  getFileInfoAll (final  List <RemoteLocation > locations ,
2178+   protected  HdfsFileStatus  getFileInfoAll (final  List <RemoteLocation > locations ,
21612179      final  RemoteMethod  method , long  timeOutMs ) throws  IOException  {
21622180
21632181    // Get the file info from everybody 
@@ -2186,12 +2204,11 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
21862204
21872205  /** 
21882206   * Get the permissions for the parent of a child with given permissions. 
2189-    * Add implicit u+wx permission for parent. This is based on 
2190-    * @{FSDirMkdirOp#addImplicitUwx}. 
2207+    * Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx. 
21912208   * @param mask The permission mask of the child. 
21922209   * @return The permission mask of the parent. 
21932210   */ 
2194-   private  static  FsPermission  getParentPermission (final  FsPermission  mask ) {
2211+   protected  static  FsPermission  getParentPermission (final  FsPermission  mask ) {
21952212    FsPermission  ret  = new  FsPermission (
21962213        mask .getUserAction ().or (FsAction .WRITE_EXECUTE ),
21972214        mask .getGroupAction (),
@@ -2208,7 +2225,7 @@ private static FsPermission getParentPermission(final FsPermission mask) {
22082225   * @return New HDFS file status representing a mount point. 
22092226   */ 
22102227  @ VisibleForTesting 
2211-   HdfsFileStatus  getMountPointStatus (
2228+   protected   HdfsFileStatus  getMountPointStatus (
22122229      String  name , int  childrenNum , long  date ) {
22132230    return  getMountPointStatus (name , childrenNum , date , true );
22142231  }
@@ -2223,7 +2240,7 @@ HdfsFileStatus getMountPointStatus(
22232240   * @return New HDFS file status representing a mount point. 
22242241   */ 
22252242  @ VisibleForTesting 
2226-   HdfsFileStatus  getMountPointStatus (
2243+   protected   HdfsFileStatus  getMountPointStatus (
22272244      String  name , int  childrenNum , long  date , boolean  setPath ) {
22282245    long  modTime  = date ;
22292246    long  accessTime  = date ;
@@ -2300,7 +2317,7 @@ HdfsFileStatus getMountPointStatus(
23002317   * @param path Name of the path to start checking dates from. 
23012318   * @return Map with the modification dates for all sub-entries. 
23022319   */ 
2303-   private  Map <String , Long > getMountPointDates (String  path ) {
2320+   protected  Map <String , Long > getMountPointDates (String  path ) {
23042321    Map <String , Long > ret  = new  TreeMap <>();
23052322    if  (subclusterResolver  instanceof  MountTableResolver ) {
23062323      try  {
@@ -2361,9 +2378,15 @@ private long getModifiedTime(Map<String, Long> ret, String path,
23612378  }
23622379
23632380  /** 
2364-    * Get listing on remote locations. 
2381+    * Get a partial listing of the indicated directory. 
2382+    * 
2383+    * @param src the directory name 
2384+    * @param startAfter the name to start after 
2385+    * @param needLocation if blockLocations need to be returned 
2386+    * @return a partial listing starting after startAfter 
2387+    * @throws IOException if other I/O error occurred 
23652388   */ 
2366-   private  List <RemoteResult <RemoteLocation , DirectoryListing >> getListingInt (
2389+   protected  List <RemoteResult <RemoteLocation , DirectoryListing >> getListingInt (
23672390      String  src , byte [] startAfter , boolean  needLocation ) throws  IOException  {
23682391    try  {
23692392      List <RemoteLocation > locations  =
@@ -2400,9 +2423,9 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
24002423   * @param startAfter starting listing from client, used to define listing 
24012424   *                   start boundary 
24022425   * @param remainingEntries how many entries left from subcluster 
2403-    * @return 
2426+    * @return true if should add mount point, otherwise false;  
24042427   */ 
2405-   private  static  boolean  shouldAddMountPoint (
2428+   protected  static  boolean  shouldAddMountPoint (
24062429      byte [] mountPoint , byte [] lastEntry , byte [] startAfter ,
24072430      int  remainingEntries ) {
24082431    if  (comparator .compare (mountPoint , startAfter ) > 0  &&
@@ -2425,7 +2448,7 @@ private static boolean shouldAddMountPoint(
24252448   * @throws IOException if unable to get the file status. 
24262449   */ 
24272450  @ VisibleForTesting 
2428-   boolean  isMultiDestDirectory (String  src ) throws  IOException  {
2451+   protected   boolean  isMultiDestDirectory (String  src ) throws  IOException  {
24292452    try  {
24302453      if  (rpcServer .isPathAll (src )) {
24312454        List <RemoteLocation > locations ;
@@ -2449,4 +2472,56 @@ boolean isMultiDestDirectory(String src) throws IOException {
24492472  public  int  getRouterFederationRenameCount () {
24502473    return  rbfRename .getRouterFederationRenameCount ();
24512474  }
2475+ 
2476+   public  RouterRpcServer  getRpcServer () {
2477+     return  rpcServer ;
2478+   }
2479+ 
2480+   public  RouterRpcClient  getRpcClient () {
2481+     return  rpcClient ;
2482+   }
2483+ 
2484+   public  FileSubclusterResolver  getSubclusterResolver () {
2485+     return  subclusterResolver ;
2486+   }
2487+ 
2488+   public  ActiveNamenodeResolver  getNamenodeResolver () {
2489+     return  namenodeResolver ;
2490+   }
2491+ 
2492+   public  long  getServerDefaultsLastUpdate () {
2493+     return  serverDefaultsLastUpdate ;
2494+   }
2495+ 
2496+   public  long  getServerDefaultsValidityPeriod () {
2497+     return  serverDefaultsValidityPeriod ;
2498+   }
2499+ 
2500+   public  boolean  isAllowPartialList () {
2501+     return  allowPartialList ;
2502+   }
2503+ 
2504+   public  long  getMountStatusTimeOut () {
2505+     return  mountStatusTimeOut ;
2506+   }
2507+ 
2508+   public  String  getSuperUser () {
2509+     return  superUser ;
2510+   }
2511+ 
2512+   public  String  getSuperGroup () {
2513+     return  superGroup ;
2514+   }
2515+ 
2516+   public  RouterStoragePolicy  getStoragePolicy () {
2517+     return  storagePolicy ;
2518+   }
2519+ 
2520+   public  void  setServerDefaultsLastUpdate (long  serverDefaultsLastUpdate ) {
2521+     this .serverDefaultsLastUpdate  = serverDefaultsLastUpdate ;
2522+   }
2523+ 
2524+   public  RouterFederationRename  getRbfRename () {
2525+     return  rbfRename ;
2526+   }
24522527}
0 commit comments