2020import java .io .ByteArrayInputStream ;
2121import java .io .IOException ;
2222import java .util .ArrayList ;
23+ import java .util .Collection ;
2324import java .util .Collections ;
2425import java .util .HashMap ;
2526import java .util .HashSet ;
3435import org .apache .hadoop .hbase .DoNotRetryIOException ;
3536import org .apache .hadoop .hbase .NamespaceDescriptor ;
3637import org .apache .hadoop .hbase .ServerName ;
38+ import org .apache .hadoop .hbase .TableDescriptors ;
3739import org .apache .hadoop .hbase .TableName ;
3840import org .apache .hadoop .hbase .client .AsyncClusterConnection ;
3941import org .apache .hadoop .hbase .client .AsyncTable ;
7981import org .slf4j .LoggerFactory ;
8082
8183import org .apache .hbase .thirdparty .com .google .common .annotations .VisibleForTesting ;
84+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableMap ;
8285import org .apache .hbase .thirdparty .com .google .common .collect .Lists ;
8386import org .apache .hbase .thirdparty .com .google .common .collect .Maps ;
8487import org .apache .hbase .thirdparty .com .google .common .collect .Sets ;
@@ -123,6 +126,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
123126 @ VisibleForTesting
124127 static final byte [] META_QUALIFIER_BYTES = Bytes .toBytes ("i" );
125128
129+ @ VisibleForTesting
130+ static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables" ;
131+
126132 private static final byte [] ROW_KEY = { 0 };
127133
128134 /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
@@ -143,7 +149,30 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
143149
144150 // There two Maps are immutable and wholesale replaced on each modification
145151 // so are safe to access concurrently. See class comment.
146- private volatile Map <String , RSGroupInfo > rsGroupMap = Collections .emptyMap ();
152+ private static final class RSGroupInfoHolder {
153+ final ImmutableMap <String , RSGroupInfo > groupName2Group ;
154+ final ImmutableMap <TableName , RSGroupInfo > tableName2Group ;
155+
156+ RSGroupInfoHolder () {
157+ this (Collections .emptyMap ());
158+ }
159+
160+ RSGroupInfoHolder (Map <String , RSGroupInfo > rsGroupMap ) {
161+ ImmutableMap .Builder <String , RSGroupInfo > group2Name2GroupBuilder = ImmutableMap .builder ();
162+ ImmutableMap .Builder <TableName , RSGroupInfo > tableName2GroupBuilder = ImmutableMap .builder ();
163+ rsGroupMap .forEach ((groupName , rsGroupInfo ) -> {
164+ group2Name2GroupBuilder .put (groupName , rsGroupInfo );
165+ if (!groupName .equals (RSGroupInfo .DEFAULT_GROUP )) {
166+ rsGroupInfo .getTables ()
167+ .forEach (tableName -> tableName2GroupBuilder .put (tableName , rsGroupInfo ));
168+ }
169+ });
170+ this .groupName2Group = group2Name2GroupBuilder .build ();
171+ this .tableName2Group = tableName2GroupBuilder .build ();
172+ }
173+ }
174+
175+ private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder ();
147176
148177 private final MasterServices masterServices ;
149178 private final AsyncClusterConnection conn ;
@@ -164,12 +193,13 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException
164193
165194
166195 private synchronized void init () throws IOException {
167- refresh ();
196+ refresh (false );
168197 serverEventsListenerThread .start ();
169198 masterServices .getServerManager ().registerListener (serverEventsListenerThread );
170199 failedOpenUpdaterThread = new FailedOpenUpdaterThread (masterServices .getConfiguration ());
171200 failedOpenUpdaterThread .start ();
172201 masterServices .getServerManager ().registerListener (failedOpenUpdaterThread );
202+ migrate ();
173203 }
174204
175205 static RSGroupInfoManager getInstance (MasterServices master ) throws IOException {
@@ -186,6 +216,7 @@ public void start() {
186216 @ Override
187217 public synchronized void addRSGroup (RSGroupInfo rsGroupInfo ) throws IOException {
188218 checkGroupName (rsGroupInfo .getName ());
219+ Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
189220 if (rsGroupMap .get (rsGroupInfo .getName ()) != null ||
190221 rsGroupInfo .getName ().equals (RSGroupInfo .DEFAULT_GROUP )) {
191222 throw new DoNotRetryIOException ("Group already exists: " + rsGroupInfo .getName ());
@@ -242,7 +273,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
242273 }
243274 dst .addServer (el );
244275 }
245- Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (rsGroupMap );
276+ Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (holder . groupName2Group );
246277 newGroupMap .put (src .getName (), src );
247278 newGroupMap .put (dst .getName (), dst );
248279 flushConfig (newGroupMap );
@@ -251,7 +282,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
251282
252283 @ Override
253284 public RSGroupInfo getRSGroupOfServer (Address serverHostPort ) throws IOException {
254- for (RSGroupInfo info : rsGroupMap .values ()) {
285+ for (RSGroupInfo info : holder . groupName2Group .values ()) {
255286 if (info .containsServer (serverHostPort )) {
256287 return info ;
257288 }
@@ -261,11 +292,12 @@ public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException
261292
262293 @ Override
263294 public RSGroupInfo getRSGroup (String groupName ) {
264- return rsGroupMap .get (groupName );
295+ return holder . groupName2Group .get (groupName );
265296 }
266297
267298 @ Override
268299 public synchronized void removeRSGroup (String groupName ) throws IOException {
300+ Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
269301 if (!rsGroupMap .containsKey (groupName ) || groupName .equals (RSGroupInfo .DEFAULT_GROUP )) {
270302 throw new DoNotRetryIOException (
271303 "Group " + groupName + " does not exist or is a reserved " + "group" );
@@ -277,7 +309,7 @@ public synchronized void removeRSGroup(String groupName) throws IOException {
277309
278310 @ Override
279311 public List <RSGroupInfo > listRSGroups () {
280- return Lists .newArrayList (rsGroupMap .values ());
312+ return Lists .newArrayList (holder . groupName2Group .values ());
281313 }
282314
283315 @ Override
@@ -305,7 +337,7 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
305337 }
306338
307339 if (rsGroupInfos .size () > 0 ) {
308- Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (rsGroupMap );
340+ Map <String , RSGroupInfo > newGroupMap = Maps .newHashMap (holder . groupName2Group );
309341 newGroupMap .putAll (rsGroupInfos );
310342 flushConfig (newGroupMap );
311343 }
@@ -356,9 +388,90 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
356388 return RSGroupInfoList ;
357389 }
358390
359- @ Override
360- public void refresh () throws IOException {
361- refresh (false );
391+ private void migrate (Collection <RSGroupInfo > groupList ) {
392+ TableDescriptors tds = masterServices .getTableDescriptors ();
393+ for (RSGroupInfo groupInfo : groupList ) {
394+ if (groupInfo .getName ().equals (RSGroupInfo .DEFAULT_GROUP )) {
395+ continue ;
396+ }
397+ SortedSet <TableName > failedTables = new TreeSet <>();
398+ for (TableName tableName : groupInfo .getTables ()) {
399+ LOG .info ("Migrating {} in group {}" , tableName , groupInfo .getName ());
400+ TableDescriptor oldTd ;
401+ try {
402+ oldTd = tds .get (tableName );
403+ } catch (IOException e ) {
404+ LOG .warn ("Failed to migrate {} in group {}" , tableName , groupInfo .getName (), e );
405+ failedTables .add (tableName );
406+ continue ;
407+ }
408+ if (oldTd == null ) {
409+ continue ;
410+ }
411+ if (oldTd .getRegionServerGroup ().isPresent ()) {
412+ // either we have already migrated it or that user has set the rs group using the new
413+ // code which will set the group directly on table descriptor, skip.
414+ LOG .debug ("Skip migrating {} since it is already in group {}" , tableName ,
415+ oldTd .getRegionServerGroup ().get ());
416+ continue ;
417+ }
418+ TableDescriptor newTd = TableDescriptorBuilder .newBuilder (oldTd )
419+ .setRegionServerGroup (groupInfo .getName ()).build ();
420+ // This is a bit tricky. Since we know that the region server group config in
421+ // TableDescriptor will only be used at master side, it is fine to just update the table
422+ // descriptor on file system and also the cache, without reopening all the regions. This
423+ // will be much faster than the normal modifyTable. And when upgrading, we will update
424+ // master first and then region server, so after all the region servers has been reopened,
425+ // the new TableDescriptor will be loaded.
426+ try {
427+ tds .add (newTd );
428+ } catch (IOException e ) {
429+ LOG .warn ("Failed to migrate {} in group {}" , tableName , groupInfo .getName (), e );
430+ failedTables .add (tableName );
431+ continue ;
432+ }
433+ }
434+ LOG .info ("Done migrating {}, failed tables {}" , groupInfo .getName (), failedTables );
435+ synchronized (RSGroupInfoManagerImpl .this ) {
436+ Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
437+ RSGroupInfo currentInfo = rsGroupMap .get (groupInfo .getName ());
438+ if (currentInfo != null ) {
439+ RSGroupInfo newInfo =
440+ new RSGroupInfo (currentInfo .getName (), currentInfo .getServers (), failedTables );
441+ Map <String , RSGroupInfo > newGroupMap = new HashMap <>(rsGroupMap );
442+ newGroupMap .put (groupInfo .getName (), newInfo );
443+ try {
444+ flushConfig (newGroupMap );
445+ } catch (IOException e ) {
446+ LOG .warn ("Failed to persist rs group" , e );
447+ }
448+ }
449+ }
450+ }
451+ }
452+
453+ // Migrate the table rs group info from RSGroupInfo into the table descriptor
454+ // Notice that we do not want to block the initialize so this will be done in background, and
455+ // during the migrating, the rs group info maybe incomplete and cause region to be misplaced.
456+ private void migrate () {
457+ Thread migrateThread = new Thread (MIGRATE_THREAD_NAME ) {
458+
459+ @ Override
460+ public void run () {
461+ LOG .info ("Start migrating table rs group config" );
462+ while (!masterServices .isStopped ()) {
463+ Collection <RSGroupInfo > groups = holder .groupName2Group .values ();
464+ boolean hasTables = groups .stream ().anyMatch (r -> !r .getTables ().isEmpty ());
465+ if (!hasTables ) {
466+ break ;
467+ }
468+ migrate (groups );
469+ }
470+ LOG .info ("Done migrating table rs group info" );
471+ }
472+ };
473+ migrateThread .setDaemon (true );
474+ migrateThread .start ();
362475 }
363476
364477 /**
@@ -388,7 +501,7 @@ private synchronized void refresh(boolean forceOnline) throws IOException {
388501 newGroupMap .put (group .getName (), group );
389502 }
390503 resetRSGroupMap (newGroupMap );
391- updateCacheOfRSGroups (rsGroupMap .keySet ());
504+ updateCacheOfRSGroups (newGroupMap .keySet ());
392505 }
393506
394507 private void flushConfigTable (Map <String , RSGroupInfo > groupMap ) throws IOException {
@@ -418,20 +531,20 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOExcept
418531 }
419532
420533 private synchronized void flushConfig () throws IOException {
421- flushConfig (this . rsGroupMap );
534+ flushConfig (holder . groupName2Group );
422535 }
423536
424537 private synchronized void flushConfig (Map <String , RSGroupInfo > newGroupMap ) throws IOException {
425538 // For offline mode persistence is still unavailable
426539 // We're refreshing in-memory state but only for servers in default group
427540 if (!isOnline ()) {
428- if (newGroupMap == this . rsGroupMap ) {
541+ if (newGroupMap == holder . groupName2Group ) {
429542 // When newGroupMap is this.rsGroupMap itself,
430543 // do not need to check default group and other groups as followed
431544 return ;
432545 }
433546
434- Map <String , RSGroupInfo > oldGroupMap = Maps .newHashMap (rsGroupMap );
547+ Map <String , RSGroupInfo > oldGroupMap = Maps .newHashMap (holder . groupName2Group );
435548 RSGroupInfo oldDefaultGroup = oldGroupMap .remove (RSGroupInfo .DEFAULT_GROUP );
436549 RSGroupInfo newDefaultGroup = newGroupMap .remove (RSGroupInfo .DEFAULT_GROUP );
437550 if (!oldGroupMap .equals (newGroupMap ) /* compare both tables and servers in other groups */ ||
@@ -445,7 +558,7 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
445558
446559 // Refresh rsGroupMap
447560 // according to the inputted newGroupMap (an updated copy of rsGroupMap)
448- rsGroupMap = newGroupMap ;
561+ this . holder = new RSGroupInfoHolder ( newGroupMap ) ;
449562
450563 // Do not need to update tableMap
451564 // because only the update on servers in default group is allowed above,
@@ -502,8 +615,7 @@ private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOE
502615 * Make changes visible. Caller must be synchronized on 'this'.
503616 */
504617 private void resetRSGroupMap (Map <String , RSGroupInfo > newRSGroupMap ) {
505- // Make maps Immutable.
506- this .rsGroupMap = Collections .unmodifiableMap (newRSGroupMap );
618+ this .holder = new RSGroupInfoHolder (newRSGroupMap );
507619 }
508620
509621 /**
@@ -556,6 +668,7 @@ private SortedSet<Address> getDefaultServers() throws IOException {
556668 // Called by ServerEventsListenerThread. Synchronize on this because redoing
557669 // the rsGroupMap then writing it out.
558670 private synchronized void updateDefaultServers (SortedSet <Address > servers ) {
671+ Map <String , RSGroupInfo > rsGroupMap = holder .groupName2Group ;
559672 RSGroupInfo info = rsGroupMap .get (RSGroupInfo .DEFAULT_GROUP );
560673 RSGroupInfo newInfo = new RSGroupInfo (info .getName (), servers );
561674 HashMap <String , RSGroupInfo > newGroupMap = Maps .newHashMap (rsGroupMap );
@@ -734,6 +847,8 @@ private boolean waitForGroupTableOnline() {
734847 online = true ;
735848 // flush any inconsistencies between ZK and HTable
736849 RSGroupInfoManagerImpl .this .flushConfig ();
850+ // migrate after we are online.
851+ migrate ();
737852 return true ;
738853 } catch (Exception e ) {
739854 LOG .warn ("Failed to perform check" , e );
@@ -812,4 +927,10 @@ private void checkGroupName(String groupName) throws ConstraintException {
812927 throw new ConstraintException ("RSGroup name should only contain alphanumeric characters" );
813928 }
814929 }
930+
931+
932+ @ Override
933+ public RSGroupInfo getRSGroupForTable (TableName tableName ) throws IOException {
934+ return holder .tableName2Group .get (tableName );
935+ }
815936}
0 commit comments