1717 */
1818package org .apache .hadoop .hbase .rsgroup ;
1919
20- import com .google .protobuf .ServiceException ;
2120import java .io .ByteArrayInputStream ;
2221import java .io .IOException ;
2322import java .util .ArrayList ;
3433import java .util .TreeSet ;
3534import org .apache .hadoop .hbase .Coprocessor ;
3635import org .apache .hadoop .hbase .DoNotRetryIOException ;
36+ import org .apache .hadoop .hbase .NamespaceDescriptor ;
3737import org .apache .hadoop .hbase .ServerName ;
3838import org .apache .hadoop .hbase .TableName ;
39+ import org .apache .hadoop .hbase .client .AsyncClusterConnection ;
40+ import org .apache .hadoop .hbase .client .AsyncTable ;
3941import org .apache .hadoop .hbase .client .ColumnFamilyDescriptorBuilder ;
40- import org .apache .hadoop .hbase .client .Connection ;
4142import org .apache .hadoop .hbase .client .CoprocessorDescriptorBuilder ;
4243import org .apache .hadoop .hbase .client .Delete ;
4344import org .apache .hadoop .hbase .client .Get ;
4445import org .apache .hadoop .hbase .client .Mutation ;
4546import org .apache .hadoop .hbase .client .Put ;
4647import org .apache .hadoop .hbase .client .Result ;
4748import org .apache .hadoop .hbase .client .ResultScanner ;
48- import org .apache .hadoop .hbase .client .Scan ;
49- import org .apache .hadoop .hbase .client .Table ;
5049import org .apache .hadoop .hbase .client .TableDescriptor ;
5150import org .apache .hadoop .hbase .client .TableDescriptorBuilder ;
5251import org .apache .hadoop .hbase .constraint .ConstraintException ;
5352import org .apache .hadoop .hbase .coprocessor .MultiRowMutationEndpoint ;
5453import org .apache .hadoop .hbase .exceptions .DeserializationException ;
55- import org .apache .hadoop .hbase .ipc .CoprocessorRpcChannel ;
5654import org .apache .hadoop .hbase .master .MasterServices ;
5755import org .apache .hadoop .hbase .master .ServerListener ;
5856import org .apache .hadoop .hbase .master .TableStateManager ;
6260import org .apache .hadoop .hbase .procedure2 .Procedure ;
6361import org .apache .hadoop .hbase .protobuf .ProtobufMagic ;
6462import org .apache .hadoop .hbase .protobuf .ProtobufUtil ;
65- import org .apache .hadoop .hbase .protobuf .generated .MultiRowMutationProtos ;
63+ import org .apache .hadoop .hbase .protobuf .generated .ClientProtos .MutationProto ;
64+ import org .apache .hadoop .hbase .protobuf .generated .MultiRowMutationProtos .MultiRowMutationService ;
65+ import org .apache .hadoop .hbase .protobuf .generated .MultiRowMutationProtos .MutateRowsRequest ;
66+ import org .apache .hadoop .hbase .protobuf .generated .MultiRowMutationProtos .MutateRowsResponse ;
6667import org .apache .hadoop .hbase .protobuf .generated .RSGroupProtos ;
6768import org .apache .hadoop .hbase .regionserver .DisabledRegionSplitPolicy ;
6869import org .apache .hadoop .hbase .util .Bytes ;
70+ import org .apache .hadoop .hbase .util .FutureUtils ;
6971import org .apache .hadoop .hbase .util .Threads ;
7072import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
7173import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
7577import org .slf4j .Logger ;
7678import org .slf4j .LoggerFactory ;
7779
80+ import org .apache .hbase .thirdparty .com .google .common .annotations .VisibleForTesting ;
7881import org .apache .hbase .thirdparty .com .google .common .collect .Lists ;
7982import org .apache .hbase .thirdparty .com .google .common .collect .Maps ;
8083import org .apache .hbase .thirdparty .com .google .common .collect .Sets ;
8790 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
8891 * too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
8992 * zk) on each modification.
90- * <p>
93+ * <p/ >
9194 * Mutations on state are synchronized but reads can continue without having to wait on an instance
9295 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
9396 * state are read-only, just-in-case (see flushConfig).
94- * <p>
97+ * <p/ >
9598 * Reads must not block else there is a danger we'll deadlock.
96- * <p>
99+ * <p/ >
97100 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
98101 * on the results of the query modifying cache in zookeeper without another thread making
99102 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
103106final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
104107 private static final Logger LOG = LoggerFactory .getLogger (RSGroupInfoManagerImpl .class );
105108
109+ private static final String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait" ;
110+ private static final long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L ;
111+
112+ // Assigned before user tables
113+ @ VisibleForTesting
114+ static final TableName RSGROUP_TABLE_NAME =
115+ TableName .valueOf (NamespaceDescriptor .SYSTEM_NAMESPACE_NAME_STR , "rsgroup" );
116+
117+ private static final String RS_GROUP_ZNODE = "rsgroup" ;
118+
119+ @ VisibleForTesting
120+ static final byte [] META_FAMILY_BYTES = Bytes .toBytes ("m" );
121+
122+ @ VisibleForTesting
123+ static final byte [] META_QUALIFIER_BYTES = Bytes .toBytes ("i" );
124+
125+ private static final byte [] ROW_KEY = { 0 };
126+
106127 /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
107128 private static final TableDescriptor RSGROUP_TABLE_DESC ;
108129 static {
@@ -125,7 +146,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
125146 private volatile Map <TableName , String > tableMap = Collections .emptyMap ();
126147
127148 private final MasterServices masterServices ;
128- private final Connection conn ;
149+ private final AsyncClusterConnection conn ;
129150 private final ZKWatcher watcher ;
130151 private final RSGroupStartupWorker rsGroupStartupWorker ;
131152 // contains list of groups that were last flushed to persistent store
@@ -136,7 +157,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
136157 private RSGroupInfoManagerImpl (MasterServices masterServices ) throws IOException {
137158 this .masterServices = masterServices ;
138159 this .watcher = masterServices .getZooKeeper ();
139- this .conn = masterServices .getConnection ();
160+ this .conn = masterServices .getAsyncClusterConnection ();
140161 this .rsGroupStartupWorker = new RSGroupStartupWorker ();
141162 }
142163
@@ -349,25 +370,25 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
349370 }
350371 }
351372
352- List <RSGroupInfo > retrieveGroupListFromGroupTable () throws IOException {
373+ private List <RSGroupInfo > retrieveGroupListFromGroupTable () throws IOException {
353374 List <RSGroupInfo > rsGroupInfoList = Lists .newArrayList ();
354- try ( Table table = conn .getTable (RSGROUP_TABLE_NAME );
355- ResultScanner scanner = table .getScanner (new Scan () )) {
375+ AsyncTable <?> table = conn .getTable (RSGROUP_TABLE_NAME );
376+ try ( ResultScanner scanner = table .getScanner (META_FAMILY_BYTES , META_QUALIFIER_BYTES )) {
356377 for (Result result ;;) {
357378 result = scanner .next ();
358379 if (result == null ) {
359380 break ;
360381 }
361382 RSGroupProtos .RSGroupInfo proto = RSGroupProtos .RSGroupInfo
362- .parseFrom (result .getValue (META_FAMILY_BYTES , META_QUALIFIER_BYTES ));
383+ .parseFrom (result .getValue (META_FAMILY_BYTES , META_QUALIFIER_BYTES ));
363384 rsGroupInfoList .add (ProtobufUtil .toGroupInfo (proto ));
364385 }
365386 }
366387 return rsGroupInfoList ;
367388 }
368389
369- List <RSGroupInfo > retrieveGroupListFromZookeeper () throws IOException {
370- String groupBasePath = ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , rsGroupZNode );
390+ private List <RSGroupInfo > retrieveGroupListFromZookeeper () throws IOException {
391+ String groupBasePath = ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , RS_GROUP_ZNODE );
371392 List <RSGroupInfo > RSGroupInfoList = Lists .newArrayList ();
372393 // Overwrite any info stored by table, this takes precedence
373394 try {
@@ -519,7 +540,8 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
519540 resetRSGroupAndTableMaps (newGroupMap , newTableMap );
520541
521542 try {
522- String groupBasePath = ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , rsGroupZNode );
543+ String groupBasePath =
544+ ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , RS_GROUP_ZNODE );
523545 ZKUtil .createAndFailSilent (watcher , groupBasePath , ProtobufMagic .PB_MAGIC );
524546
525547 List <ZKUtil .ZKUtilOp > zkOps = new ArrayList <>(newGroupMap .size ());
@@ -702,11 +724,8 @@ private boolean waitForGroupTableOnline() {
702724 createRSGroupTable ();
703725 }
704726 // try reading from the table
705- try (Table table = conn .getTable (RSGROUP_TABLE_NAME )) {
706- table .get (new Get (ROW_KEY ));
707- }
708- LOG .info (
709- "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information" );
727+ FutureUtils .get (conn .getTable (RSGROUP_TABLE_NAME ).get (new Get (ROW_KEY )));
728+ LOG .info ("RSGroup table={} is online, refreshing cached information" , RSGROUP_TABLE_NAME );
710729 RSGroupInfoManagerImpl .this .refresh (true );
711730 online = true ;
712731 // flush any inconsistencies between ZK and HTable
@@ -748,8 +767,8 @@ private void createRSGroupTable() throws IOException {
748767 } else {
749768 Procedure <?> result = masterServices .getMasterProcedureExecutor ().getResult (procId );
750769 if (result != null && result .isFailed ()) {
751- throw new IOException (
752- "Failed to create group table. " + MasterProcedureUtil .unwrapRemoteIOException (result ));
770+ throw new IOException ("Failed to create group table. " +
771+ MasterProcedureUtil .unwrapRemoteIOException (result ));
753772 }
754773 }
755774 }
@@ -764,33 +783,24 @@ private static boolean isMasterRunning(MasterServices masterServices) {
764783 }
765784
766785 private void multiMutate (List <Mutation > mutations ) throws IOException {
767- try (Table table = conn .getTable (RSGROUP_TABLE_NAME )) {
768- CoprocessorRpcChannel channel = table .coprocessorService (ROW_KEY );
769- MultiRowMutationProtos .MutateRowsRequest .Builder mmrBuilder =
770- MultiRowMutationProtos .MutateRowsRequest .newBuilder ();
771- for (Mutation mutation : mutations ) {
772- if (mutation instanceof Put ) {
773- mmrBuilder .addMutationRequest (org .apache .hadoop .hbase .protobuf .ProtobufUtil .toMutation (
774- org .apache .hadoop .hbase .protobuf .generated .ClientProtos .MutationProto .MutationType .PUT ,
775- mutation ));
776- } else if (mutation instanceof Delete ) {
777- mmrBuilder .addMutationRequest (org .apache .hadoop .hbase .protobuf .ProtobufUtil .toMutation (
778- org .apache .hadoop .hbase .protobuf .generated .ClientProtos .MutationProto .MutationType .DELETE ,
779- mutation ));
780- } else {
781- throw new DoNotRetryIOException (
786+ MutateRowsRequest .Builder builder = MutateRowsRequest .newBuilder ();
787+ for (Mutation mutation : mutations ) {
788+ if (mutation instanceof Put ) {
789+ builder
790+ .addMutationRequest (ProtobufUtil .toMutation (MutationProto .MutationType .PUT , mutation ));
791+ } else if (mutation instanceof Delete ) {
792+ builder .addMutationRequest (
793+ ProtobufUtil .toMutation (MutationProto .MutationType .DELETE , mutation ));
794+ } else {
795+ throw new DoNotRetryIOException (
782796 "multiMutate doesn't support " + mutation .getClass ().getName ());
783- }
784- }
785-
786- MultiRowMutationProtos .MultiRowMutationService .BlockingInterface service =
787- MultiRowMutationProtos .MultiRowMutationService .newBlockingStub (channel );
788- try {
789- service .mutateRows (null , mmrBuilder .build ());
790- } catch (ServiceException ex ) {
791- ProtobufUtil .toIOException (ex );
792797 }
793798 }
799+ MutateRowsRequest request = builder .build ();
800+ AsyncTable <?> table = conn .getTable (RSGROUP_TABLE_NAME );
801+ FutureUtils .get (table .<MultiRowMutationService , MutateRowsResponse > coprocessorService (
802+ MultiRowMutationService ::newStub ,
803+ (stub , controller , done ) -> stub .mutateRows (controller , request , done ), ROW_KEY ));
794804 }
795805
796806 private void checkGroupName (String groupName ) throws ConstraintException {
0 commit comments