2020import  java .io .ByteArrayInputStream ;
2121import  java .io .IOException ;
2222import  java .util .ArrayList ;
23+ import  java .util .Collection ;
2324import  java .util .Collections ;
2425import  java .util .HashMap ;
2526import  java .util .HashSet ;
3334import  org .apache .hadoop .hbase .DoNotRetryIOException ;
3435import  org .apache .hadoop .hbase .NamespaceDescriptor ;
3536import  org .apache .hadoop .hbase .ServerName ;
37+ import  org .apache .hadoop .hbase .TableDescriptors ;
3638import  org .apache .hadoop .hbase .TableName ;
3739import  org .apache .hadoop .hbase .client .AsyncClusterConnection ;
3840import  org .apache .hadoop .hbase .client .AsyncTable ;
7678import  org .slf4j .LoggerFactory ;
7779
7880import  org .apache .hbase .thirdparty .com .google .common .annotations .VisibleForTesting ;
81+ import  org .apache .hbase .thirdparty .com .google .common .collect .ImmutableMap ;
7982import  org .apache .hbase .thirdparty .com .google .common .collect .Lists ;
8083import  org .apache .hbase .thirdparty .com .google .common .collect .Maps ;
8184import  org .apache .hbase .thirdparty .com .google .common .collect .Sets ;
104107final  class  RSGroupInfoManagerImpl  implements  RSGroupInfoManager  {
105108  private  static  final  Logger  LOG  = LoggerFactory .getLogger (RSGroupInfoManagerImpl .class );
106109
107-   private  static  final  String  REASSIGN_WAIT_INTERVAL_KEY  = "hbase.rsgroup.reassign.wait" ;
108-   private  static  final  long  DEFAULT_REASSIGN_WAIT_INTERVAL  = 30  * 1000L ;
109- 
110110  // Assigned before user tables 
111111  @ VisibleForTesting 
112112  static  final  TableName  RSGROUP_TABLE_NAME  =
@@ -120,6 +120,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
120120  @ VisibleForTesting 
121121  static  final  byte [] META_QUALIFIER_BYTES  = Bytes .toBytes ("i" );
122122
123+   @ VisibleForTesting 
124+   static  final  String  MIGRATE_THREAD_NAME  = "Migrate-RSGroup-Tables" ;
125+ 
123126  private  static  final  byte [] ROW_KEY  = { 0  };
124127
125128  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */ 
@@ -140,7 +143,30 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
140143
  // These two maps are immutable and are replaced wholesale on each modification,
  // so they are safe to access concurrently. See class comment.
143-   private  volatile  Map <String , RSGroupInfo > rsGroupMap  = Collections .emptyMap ();
146+   private  static  final  class  RSGroupInfoHolder  {
147+     final  ImmutableMap <String , RSGroupInfo > groupName2Group ;
148+     final  ImmutableMap <TableName , RSGroupInfo > tableName2Group ;
149+ 
150+     RSGroupInfoHolder () {
151+       this (Collections .emptyMap ());
152+     }
153+ 
154+     RSGroupInfoHolder (Map <String , RSGroupInfo > rsGroupMap ) {
155+       ImmutableMap .Builder <String , RSGroupInfo > group2Name2GroupBuilder  = ImmutableMap .builder ();
156+       ImmutableMap .Builder <TableName , RSGroupInfo > tableName2GroupBuilder  = ImmutableMap .builder ();
157+       rsGroupMap .forEach ((groupName , rsGroupInfo ) -> {
158+         group2Name2GroupBuilder .put (groupName , rsGroupInfo );
159+         if  (!groupName .equals (RSGroupInfo .DEFAULT_GROUP )) {
160+           rsGroupInfo .getTables ()
161+             .forEach (tableName  -> tableName2GroupBuilder .put (tableName , rsGroupInfo ));
162+         }
163+       });
164+       this .groupName2Group  = group2Name2GroupBuilder .build ();
165+       this .tableName2Group  = tableName2GroupBuilder .build ();
166+     }
167+   }
168+ 
169+   private  volatile  RSGroupInfoHolder  holder  = new  RSGroupInfoHolder ();
144170
145171  private  final  MasterServices  masterServices ;
146172  private  final  AsyncClusterConnection  conn ;
@@ -160,9 +186,10 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException
160186
161187
162188  private  synchronized  void  init () throws  IOException  {
163-     refresh ();
189+     refresh (false );
164190    serverEventsListenerThread .start ();
165191    masterServices .getServerManager ().registerListener (serverEventsListenerThread );
192+     migrate ();
166193  }
167194
168195  static  RSGroupInfoManager  getInstance (MasterServices  master ) throws  IOException  {
@@ -179,6 +206,7 @@ public void start() {
179206  @ Override 
180207  public  synchronized  void  addRSGroup (RSGroupInfo  rsGroupInfo ) throws  IOException  {
181208    checkGroupName (rsGroupInfo .getName ());
209+     Map <String , RSGroupInfo > rsGroupMap  = holder .groupName2Group ;
182210    if  (rsGroupMap .get (rsGroupInfo .getName ()) != null  ||
183211      rsGroupInfo .getName ().equals (RSGroupInfo .DEFAULT_GROUP )) {
184212      throw  new  DoNotRetryIOException ("Group already exists: "  + rsGroupInfo .getName ());
@@ -235,7 +263,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
235263      }
236264      dst .addServer (el );
237265    }
238-     Map <String , RSGroupInfo > newGroupMap  = Maps .newHashMap (rsGroupMap );
266+     Map <String , RSGroupInfo > newGroupMap  = Maps .newHashMap (holder . groupName2Group );
239267    newGroupMap .put (src .getName (), src );
240268    newGroupMap .put (dst .getName (), dst );
241269    flushConfig (newGroupMap );
@@ -244,7 +272,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
244272
245273  @ Override 
246274  public  RSGroupInfo  getRSGroupOfServer (Address  serverHostPort ) throws  IOException  {
247-     for  (RSGroupInfo  info  : rsGroupMap .values ()) {
275+     for  (RSGroupInfo  info  : holder . groupName2Group .values ()) {
248276      if  (info .containsServer (serverHostPort )) {
249277        return  info ;
250278      }
@@ -254,11 +282,12 @@ public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException
254282
255283  @ Override 
256284  public  RSGroupInfo  getRSGroup (String  groupName ) {
257-     return  rsGroupMap .get (groupName );
285+     return  holder . groupName2Group .get (groupName );
258286  }
259287
260288  @ Override 
261289  public  synchronized  void  removeRSGroup (String  groupName ) throws  IOException  {
290+     Map <String , RSGroupInfo > rsGroupMap  = holder .groupName2Group ;
262291    if  (!rsGroupMap .containsKey (groupName ) || groupName .equals (RSGroupInfo .DEFAULT_GROUP )) {
263292      throw  new  DoNotRetryIOException (
264293        "Group "  + groupName  + " does not exist or is a reserved "  + "group" );
@@ -270,7 +299,7 @@ public synchronized void removeRSGroup(String groupName) throws IOException {
270299
271300  @ Override 
272301  public  List <RSGroupInfo > listRSGroups () {
273-     return  Lists .newArrayList (rsGroupMap .values ());
302+     return  Lists .newArrayList (holder . groupName2Group .values ());
274303  }
275304
276305  @ Override 
@@ -298,7 +327,7 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
298327    }
299328
300329    if  (rsGroupInfos .size () > 0 ) {
301-       Map <String , RSGroupInfo > newGroupMap  = Maps .newHashMap (rsGroupMap );
330+       Map <String , RSGroupInfo > newGroupMap  = Maps .newHashMap (holder . groupName2Group );
302331      newGroupMap .putAll (rsGroupInfos );
303332      flushConfig (newGroupMap );
304333    }
@@ -349,9 +378,90 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
349378    return  RSGroupInfoList ;
350379  }
351380
352-   @ Override 
353-   public  void  refresh () throws  IOException  {
354-     refresh (false );
381+   private  void  migrate (Collection <RSGroupInfo > groupList ) {
382+     TableDescriptors  tds  = masterServices .getTableDescriptors ();
383+     for  (RSGroupInfo  groupInfo  : groupList ) {
384+       if  (groupInfo .getName ().equals (RSGroupInfo .DEFAULT_GROUP )) {
385+         continue ;
386+       }
387+       SortedSet <TableName > failedTables  = new  TreeSet <>();
388+       for  (TableName  tableName  : groupInfo .getTables ()) {
389+         LOG .debug ("Migrating {} in group {}" , tableName , groupInfo .getName ());
390+         TableDescriptor  oldTd ;
391+         try  {
392+           oldTd  = tds .get (tableName );
393+         } catch  (IOException  e ) {
394+           LOG .warn ("Failed to migrate {} in group {}" , tableName , groupInfo .getName (), e );
395+           failedTables .add (tableName );
396+           continue ;
397+         }
398+         if  (oldTd  == null ) {
399+           continue ;
400+         }
401+         if  (oldTd .getRegionServerGroup ().isPresent ()) {
402+           // either we have already migrated it or that user has set the rs group using the new 
403+           // code which will set the group directly on table descriptor, skip. 
404+           LOG .debug ("Skip migrating {} since it is already in group {}" , tableName ,
405+             oldTd .getRegionServerGroup ().get ());
406+           continue ;
407+         }
408+         TableDescriptor  newTd  = TableDescriptorBuilder .newBuilder (oldTd )
409+           .setRegionServerGroup (groupInfo .getName ()).build ();
410+         // This is a bit tricky. Since we know that the region server group config in 
411+         // TableDescriptor will only be used at master side, it is fine to just update the table 
412+         // descriptor on file system and also the cache, without reopening all the regions. This 
413+         // will be much faster than the normal modifyTable. And when upgrading, we will update 
414+         // master first and then region server, so after all the region servers has been reopened, 
415+         // the new TableDescriptor will be loaded. 
416+         try  {
417+           tds .add (newTd );
418+         } catch  (IOException  e ) {
419+           LOG .warn ("Failed to migrate {} in group {}" , tableName , groupInfo .getName (), e );
420+           failedTables .add (tableName );
421+           continue ;
422+         }
423+       }
424+       LOG .debug ("Done migrating {}, failed tables {}" , groupInfo .getName (), failedTables );
425+       synchronized  (RSGroupInfoManagerImpl .this ) {
426+         Map <String , RSGroupInfo > rsGroupMap  = holder .groupName2Group ;
427+         RSGroupInfo  currentInfo  = rsGroupMap .get (groupInfo .getName ());
428+         if  (currentInfo  != null ) {
429+           RSGroupInfo  newInfo  =
430+             new  RSGroupInfo (currentInfo .getName (), currentInfo .getServers (), failedTables );
431+           Map <String , RSGroupInfo > newGroupMap  = new  HashMap <>(rsGroupMap );
432+           newGroupMap .put (groupInfo .getName (), newInfo );
433+           try  {
434+             flushConfig (newGroupMap );
435+           } catch  (IOException  e ) {
436+             LOG .warn ("Failed to persist rs group {}" , newInfo .getName (), e );
437+           }
438+         }
439+       }
440+     }
441+   }
442+ 
443+   // Migrate the table rs group info from RSGroupInfo into the table descriptor 
444+   // Notice that we do not want to block the initialize so this will be done in background, and 
445+   // during the migrating, the rs group info maybe incomplete and cause region to be misplaced. 
446+   private  void  migrate () {
447+     Thread  migrateThread  = new  Thread (MIGRATE_THREAD_NAME ) {
448+ 
449+       @ Override 
450+       public  void  run () {
451+         LOG .info ("Start migrating table rs group config" );
452+         while  (!masterServices .isStopped ()) {
453+           Collection <RSGroupInfo > groups  = holder .groupName2Group .values ();
454+           boolean  hasTables  = groups .stream ().anyMatch (r  -> !r .getTables ().isEmpty ());
455+           if  (!hasTables ) {
456+             break ;
457+           }
458+           migrate (groups );
459+         }
460+         LOG .info ("Done migrating table rs group info" );
461+       }
462+     };
463+     migrateThread .setDaemon (true );
464+     migrateThread .start ();
355465  }
356466
357467  /** 
@@ -381,7 +491,7 @@ private synchronized void refresh(boolean forceOnline) throws IOException {
381491      newGroupMap .put (group .getName (), group );
382492    }
383493    resetRSGroupMap (newGroupMap );
384-     updateCacheOfRSGroups (rsGroupMap .keySet ());
494+     updateCacheOfRSGroups (newGroupMap .keySet ());
385495  }
386496
387497  private  void  flushConfigTable (Map <String , RSGroupInfo > groupMap ) throws  IOException  {
@@ -411,20 +521,20 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOExcept
411521  }
412522
413523  private  synchronized  void  flushConfig () throws  IOException  {
414-     flushConfig (this . rsGroupMap );
524+     flushConfig (holder . groupName2Group );
415525  }
416526
417527  private  synchronized  void  flushConfig (Map <String , RSGroupInfo > newGroupMap ) throws  IOException  {
418528    // For offline mode persistence is still unavailable 
419529    // We're refreshing in-memory state but only for servers in default group 
420530    if  (!isOnline ()) {
421-       if  (newGroupMap  == this . rsGroupMap ) {
531+       if  (newGroupMap  == holder . groupName2Group ) {
422532        // When newGroupMap is this.rsGroupMap itself, 
423533        // do not need to check default group and other groups as followed 
424534        return ;
425535      }
426536
427-       Map <String , RSGroupInfo > oldGroupMap  = Maps .newHashMap (rsGroupMap );
537+       Map <String , RSGroupInfo > oldGroupMap  = Maps .newHashMap (holder . groupName2Group );
428538      RSGroupInfo  oldDefaultGroup  = oldGroupMap .remove (RSGroupInfo .DEFAULT_GROUP );
429539      RSGroupInfo  newDefaultGroup  = newGroupMap .remove (RSGroupInfo .DEFAULT_GROUP );
430540      if  (!oldGroupMap .equals (newGroupMap ) /* compare both tables and servers in other groups */  ||
@@ -438,7 +548,7 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
438548
439549      // Refresh rsGroupMap 
440550      // according to the inputted newGroupMap (an updated copy of rsGroupMap) 
441-       rsGroupMap  = newGroupMap ;
551+       this . holder  = new   RSGroupInfoHolder ( newGroupMap ) ;
442552
443553      // Do not need to update tableMap 
444554      // because only the update on servers in default group is allowed above, 
@@ -495,8 +605,7 @@ private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOE
495605   * Make changes visible. Caller must be synchronized on 'this'. 
496606   */ 
497607  private  void  resetRSGroupMap (Map <String , RSGroupInfo > newRSGroupMap ) {
498-     // Make maps Immutable. 
499-     this .rsGroupMap  = Collections .unmodifiableMap (newRSGroupMap );
608+     this .holder  = new  RSGroupInfoHolder (newRSGroupMap );
500609  }
501610
502611  /** 
@@ -549,6 +658,7 @@ private SortedSet<Address> getDefaultServers() throws IOException {
549658  // Called by ServerEventsListenerThread. Synchronize on this because redoing 
550659  // the rsGroupMap then writing it out. 
551660  private  synchronized  void  updateDefaultServers (SortedSet <Address > servers ) {
661+     Map <String , RSGroupInfo > rsGroupMap  = holder .groupName2Group ;
552662    RSGroupInfo  info  = rsGroupMap .get (RSGroupInfo .DEFAULT_GROUP );
553663    RSGroupInfo  newInfo  = new  RSGroupInfo (info .getName (), servers );
554664    HashMap <String , RSGroupInfo > newGroupMap  = Maps .newHashMap (rsGroupMap );
@@ -647,6 +757,8 @@ private boolean waitForGroupTableOnline() {
647757          online  = true ;
648758          // flush any inconsistencies between ZK and HTable 
649759          RSGroupInfoManagerImpl .this .flushConfig ();
760+           // migrate after we are online. 
761+           migrate ();
650762          return  true ;
651763        } catch  (Exception  e ) {
652764          LOG .warn ("Failed to perform check" , e );
@@ -725,4 +837,10 @@ private void checkGroupName(String groupName) throws ConstraintException {
725837      throw  new  ConstraintException ("RSGroup name should only contain alphanumeric characters" );
726838    }
727839  }
840+ 
841+ 
842+   @ Override 
843+   public  RSGroupInfo  getRSGroupForTable (TableName  tableName ) throws  IOException  {
844+     return  holder .tableName2Group .get (tableName );
845+   }
728846}
0 commit comments