2020import java .io .ByteArrayInputStream ;
2121import java .io .IOException ;
2222import java .util .ArrayList ;
23+ import java .util .Collection ;
2324import java .util .Collections ;
2425import java .util .HashMap ;
2526import java .util .HashSet ;
3334import org .apache .hadoop .hbase .DoNotRetryIOException ;
3435import org .apache .hadoop .hbase .NamespaceDescriptor ;
3536import org .apache .hadoop .hbase .ServerName ;
37+ import org .apache .hadoop .hbase .TableDescriptors ;
3638import org .apache .hadoop .hbase .TableName ;
3739import org .apache .hadoop .hbase .client .AsyncClusterConnection ;
3840import org .apache .hadoop .hbase .client .AsyncTable ;
7678import org .slf4j .LoggerFactory ;
7779
7880import org .apache .hbase .thirdparty .com .google .common .annotations .VisibleForTesting ;
81+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableMap ;
7982import org .apache .hbase .thirdparty .com .google .common .collect .Lists ;
8083import org .apache .hbase .thirdparty .com .google .common .collect .Maps ;
8184import org .apache .hbase .thirdparty .com .google .common .collect .Sets ;
104107final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
105108 private static final Logger LOG = LoggerFactory .getLogger (RSGroupInfoManagerImpl .class );
106109
107- private static final String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait" ;
108- private static final long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L ;
109-
110110 // Assigned before user tables
111111 @ VisibleForTesting
112112 static final TableName RSGROUP_TABLE_NAME =
@@ -120,6 +120,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
120120 @ VisibleForTesting
121121 static final byte [] META_QUALIFIER_BYTES = Bytes .toBytes ("i" );
122122
123+ @ VisibleForTesting
124+ static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables" ;
125+
123126 private static final byte [] ROW_KEY = { 0 };
124127
125128 /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
@@ -140,7 +143,30 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
140143
141144 // There two Maps are immutable and wholesale replaced on each modification
142145 // so are safe to access concurrently. See class comment.
143- private volatile Map <String , RSGroupInfo > rsGroupMap = Collections .emptyMap ();
146+ private static final class RSGroupInfoHolder {
147+ final ImmutableMap <String , RSGroupInfo > groupName2Group ;
148+ final ImmutableMap <TableName , RSGroupInfo > tableName2Group ;
149+
150+ RSGroupInfoHolder () {
151+ this (Collections .emptyMap ());
152+ }
153+
154+ RSGroupInfoHolder (Map <String , RSGroupInfo > rsGroupMap ) {
155+ ImmutableMap .Builder <String , RSGroupInfo > group2Name2GroupBuilder = ImmutableMap .builder ();
156+ ImmutableMap .Builder <TableName , RSGroupInfo > tableName2GroupBuilder = ImmutableMap .builder ();
157+ rsGroupMap .forEach ((groupName , rsGroupInfo ) -> {
158+ group2Name2GroupBuilder .put (groupName , rsGroupInfo );
159+ if (!groupName .equals (RSGroupInfo .DEFAULT_GROUP )) {
160+ rsGroupInfo .getTables ()
161+ .forEach (tableName -> tableName2GroupBuilder .put (tableName , rsGroupInfo ));
162+ }
163+ });
164+ this .groupName2Group = group2Name2GroupBuilder .build ();
165+ this .tableName2Group = tableName2GroupBuilder .build ();
166+ }
167+ }
168+
169+ private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder ();
144170
145171 private final MasterServices masterServices ;
146172 private final AsyncClusterConnection conn ;
@@ -160,9 +186,10 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException
160186
161187
162188 private synchronized void init () throws IOException {
163- refresh ();
189+ refresh (false );
164190 serverEventsListenerThread .start ();
165191 masterServices .getServerManager ().registerListener (serverEventsListenerThread );
192+ migrate ();
166193 }
167194
168195 static RSGroupInfoManager getInstance (MasterServices master ) throws IOException {
@@ -179,6 +206,7 @@ public void start() {
179206 @ Override
180207 public synchronized void addRSGroup (RSGroupInfo rsGroupInfo ) throws IOException {
181208 checkGroupName (rsGroupInfo .getName ());
209+ Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
182210 if (rsGroupMap .get (rsGroupInfo .getName ()) != null ||
183211 rsGroupInfo .getName ().equals (RSGroupInfo .DEFAULT_GROUP )) {
184212 throw new DoNotRetryIOException ("Group already exists: " + rsGroupInfo .getName ());
@@ -235,7 +263,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
235263 }
236264 dst .addServer (el );
237265 }
238- Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (rsGroupMap );
266+ Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (holder . groupName2Group );
239267 newGroupMap .put (src .getName (), src );
240268 newGroupMap .put (dst .getName (), dst );
241269 flushConfig (newGroupMap );
@@ -244,7 +272,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
244272
245273 @ Override
246274 public RSGroupInfo getRSGroupOfServer (Address serverHostPort ) throws IOException {
247- for (RSGroupInfo info : rsGroupMap .values ()) {
275+ for (RSGroupInfo info : holder . groupName2Group .values ()) {
248276 if (info .containsServer (serverHostPort )) {
249277 return info ;
250278 }
@@ -254,11 +282,12 @@ public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException
254282
255283 @ Override
256284 public RSGroupInfo getRSGroup (String groupName ) {
257- return rsGroupMap .get (groupName );
285+ return holder . groupName2Group .get (groupName );
258286 }
259287
260288 @ Override
261289 public synchronized void removeRSGroup (String groupName ) throws IOException {
290+ Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
262291 if (!rsGroupMap .containsKey (groupName ) || groupName .equals (RSGroupInfo .DEFAULT_GROUP )) {
263292 throw new DoNotRetryIOException (
264293 "Group " + groupName + " does not exist or is a reserved " + "group" );
@@ -270,7 +299,7 @@ public synchronized void removeRSGroup(String groupName) throws IOException {
270299
271300 @ Override
272301 public List <RSGroupInfo > listRSGroups () {
273- return Lists .newArrayList (rsGroupMap .values ());
302+ return Lists .newArrayList (holder . groupName2Group .values ());
274303 }
275304
276305 @ Override
@@ -298,7 +327,7 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
298327 }
299328
300329 if (rsGroupInfos .size () > 0 ) {
301- Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (rsGroupMap );
330+ Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (holder . groupName2Group );
302331 newGroupMap .putAll (rsGroupInfos );
303332 flushConfig (newGroupMap );
304333 }
@@ -349,9 +378,90 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
349378 return RSGroupInfoList ;
350379 }
351380
352- @ Override
353- public void refresh () throws IOException {
354- refresh (false );
381+ private void migrate (Collection <RSGroupInfo > groupList ) {
382+ TableDescriptors tds = masterServices .getTableDescriptors ();
383+ for (RSGroupInfo groupInfo : groupList ) {
384+ if (groupInfo .getName ().equals (RSGroupInfo .DEFAULT_GROUP )) {
385+ continue ;
386+ }
387+ SortedSet <TableName > failedTables = new TreeSet <>();
388+ for (TableName tableName : groupInfo .getTables ()) {
389+ LOG .debug ("Migrating {} in group {}" , tableName , groupInfo .getName ());
390+ TableDescriptor oldTd ;
391+ try {
392+ oldTd = tds .get (tableName );
393+ } catch (IOException e ) {
394+ LOG .warn ("Failed to migrate {} in group {}" , tableName , groupInfo .getName (), e );
395+ failedTables .add (tableName );
396+ continue ;
397+ }
398+ if (oldTd == null ) {
399+ continue ;
400+ }
401+ if (oldTd .getRegionServerGroup ().isPresent ()) {
402+ // either we have already migrated it or that user has set the rs group using the new
403+ // code which will set the group directly on table descriptor, skip.
404+ LOG .debug ("Skip migrating {} since it is already in group {}" , tableName ,
405+ oldTd .getRegionServerGroup ().get ());
406+ continue ;
407+ }
408+ TableDescriptor newTd = TableDescriptorBuilder .newBuilder (oldTd )
409+ .setRegionServerGroup (groupInfo .getName ()).build ();
410+ // This is a bit tricky. Since we know that the region server group config in
411+ // TableDescriptor will only be used at master side, it is fine to just update the table
412+ // descriptor on file system and also the cache, without reopening all the regions. This
413+ // will be much faster than the normal modifyTable. And when upgrading, we will update
414+ // master first and then region server, so after all the region servers has been reopened,
415+ // the new TableDescriptor will be loaded.
416+ try {
417+ tds .add (newTd );
418+ } catch (IOException e ) {
419+ LOG .warn ("Failed to migrate {} in group {}" , tableName , groupInfo .getName (), e );
420+ failedTables .add (tableName );
421+ continue ;
422+ }
423+ }
424+ LOG .debug ("Done migrating {}, failed tables {}" , groupInfo .getName (), failedTables );
425+ synchronized (RSGroupInfoManagerImpl .this ) {
426+ Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
427+ RSGroupInfo currentInfo = rsGroupMap .get (groupInfo .getName ());
428+ if (currentInfo != null ) {
429+ RSGroupInfo newInfo =
430+ new RSGroupInfo (currentInfo .getName (), currentInfo .getServers (), failedTables );
431+ Map <String , RSGroupInfo > newGroupMap = new HashMap <>(rsGroupMap );
432+ newGroupMap .put (groupInfo .getName (), newInfo );
433+ try {
434+ flushConfig (newGroupMap );
435+ } catch (IOException e ) {
436+ LOG .warn ("Failed to persist rs group {}" , newInfo .getName (), e );
437+ }
438+ }
439+ }
440+ }
441+ }
442+
443+ // Migrate the table rs group info from RSGroupInfo into the table descriptor
444+ // Notice that we do not want to block the initialize so this will be done in background, and
445+ // during the migrating, the rs group info maybe incomplete and cause region to be misplaced.
446+ private void migrate () {
447+ Thread migrateThread = new Thread (MIGRATE_THREAD_NAME ) {
448+
449+ @ Override
450+ public void run () {
451+ LOG .info ("Start migrating table rs group config" );
452+ while (!masterServices .isStopped ()) {
453+ Collection <RSGroupInfo > groups = holder .groupName2Group .values ();
454+ boolean hasTables = groups .stream ().anyMatch (r -> !r .getTables ().isEmpty ());
455+ if (!hasTables ) {
456+ break ;
457+ }
458+ migrate (groups );
459+ }
460+ LOG .info ("Done migrating table rs group info" );
461+ }
462+ };
463+ migrateThread .setDaemon (true );
464+ migrateThread .start ();
355465 }
356466
357467 /**
@@ -381,7 +491,7 @@ private synchronized void refresh(boolean forceOnline) throws IOException {
381491 newGroupMap .put (group .getName (), group );
382492 }
383493 resetRSGroupMap (newGroupMap );
384- updateCacheOfRSGroups (rsGroupMap .keySet ());
494+ updateCacheOfRSGroups (newGroupMap .keySet ());
385495 }
386496
387497 private void flushConfigTable (Map <String , RSGroupInfo > groupMap ) throws IOException {
@@ -411,20 +521,20 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOExcept
411521 }
412522
413523 private synchronized void flushConfig () throws IOException {
414- flushConfig (this . rsGroupMap );
524+ flushConfig (holder . groupName2Group );
415525 }
416526
417527 private synchronized void flushConfig (Map <String , RSGroupInfo > newGroupMap ) throws IOException {
418528 // For offline mode persistence is still unavailable
419529 // We're refreshing in-memory state but only for servers in default group
420530 if (!isOnline ()) {
421- if (newGroupMap == this . rsGroupMap ) {
531+ if (newGroupMap == holder . groupName2Group ) {
422532 // When newGroupMap is this.rsGroupMap itself,
423533 // do not need to check default group and other groups as followed
424534 return ;
425535 }
426536
427- Map <String , RSGroupInfo > oldGroupMap = Maps .newHashMap (rsGroupMap );
537+ Map <String , RSGroupInfo > oldGroupMap = Maps .newHashMap (holder . groupName2Group );
428538 RSGroupInfo oldDefaultGroup = oldGroupMap .remove (RSGroupInfo .DEFAULT_GROUP );
429539 RSGroupInfo newDefaultGroup = newGroupMap .remove (RSGroupInfo .DEFAULT_GROUP );
430540 if (!oldGroupMap .equals (newGroupMap ) /* compare both tables and servers in other groups */ ||
@@ -438,7 +548,7 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
438548
439549 // Refresh rsGroupMap
440550 // according to the inputted newGroupMap (an updated copy of rsGroupMap)
441- rsGroupMap = newGroupMap ;
551+ this . holder = new RSGroupInfoHolder ( newGroupMap ) ;
442552
443553 // Do not need to update tableMap
444554 // because only the update on servers in default group is allowed above,
@@ -495,8 +605,7 @@ private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOE
495605 * Make changes visible. Caller must be synchronized on 'this'.
496606 */
497607 private void resetRSGroupMap (Map <String , RSGroupInfo > newRSGroupMap ) {
498- // Make maps Immutable.
499- this .rsGroupMap = Collections .unmodifiableMap (newRSGroupMap );
608+ this .holder = new RSGroupInfoHolder (newRSGroupMap );
500609 }
501610
502611 /**
@@ -549,6 +658,7 @@ private SortedSet<Address> getDefaultServers() throws IOException {
549658 // Called by ServerEventsListenerThread. Synchronize on this because redoing
550659 // the rsGroupMap then writing it out.
551660 private synchronized void updateDefaultServers (SortedSet <Address > servers ) {
661+ Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
552662 RSGroupInfo info = rsGroupMap .get (RSGroupInfo .DEFAULT_GROUP );
553663 RSGroupInfo newInfo = new RSGroupInfo (info .getName (), servers );
554664 HashMap <String , RSGroupInfo > newGroupMap = Maps .newHashMap (rsGroupMap );
@@ -647,6 +757,8 @@ private boolean waitForGroupTableOnline() {
647757 online = true ;
648758 // flush any inconsistencies between ZK and HTable
649759 RSGroupInfoManagerImpl .this .flushConfig ();
760+ // migrate after we are online.
761+ migrate ();
650762 return true ;
651763 } catch (Exception e ) {
652764 LOG .warn ("Failed to perform check" , e );
@@ -725,4 +837,10 @@ private void checkGroupName(String groupName) throws ConstraintException {
725837 throw new ConstraintException ("RSGroup name should only contain alphanumeric characters" );
726838 }
727839 }
840+
841+
842+ @ Override
843+ public RSGroupInfo getRSGroupForTable (TableName tableName ) throws IOException {
844+ return holder .tableName2Group .get (tableName );
845+ }
728846}
0 commit comments