1515 * See the License for the specific language governing permissions and
1616 * limitations under the License.
1717 */
18-
1918package org .apache .hadoop .hbase .rsgroup ;
2019
21- import com .google .protobuf .ServiceException ;
2220import java .io .ByteArrayInputStream ;
2321import java .io .IOException ;
2422import java .util .ArrayList ;
3634import org .apache .hadoop .conf .Configuration ;
3735import org .apache .hadoop .hbase .Coprocessor ;
3836import org .apache .hadoop .hbase .DoNotRetryIOException ;
37+ import org .apache .hadoop .hbase .NamespaceDescriptor ;
3938import org .apache .hadoop .hbase .ServerName ;
4039import org .apache .hadoop .hbase .TableName ;
40+ import org .apache .hadoop .hbase .client .AsyncClusterConnection ;
41+ import org .apache .hadoop .hbase .client .AsyncTable ;
4142import org .apache .hadoop .hbase .client .ColumnFamilyDescriptorBuilder ;
42- import org .apache .hadoop .hbase .client .Connection ;
4343import org .apache .hadoop .hbase .client .CoprocessorDescriptorBuilder ;
4444import org .apache .hadoop .hbase .client .Delete ;
4545import org .apache .hadoop .hbase .client .Get ;
4848import org .apache .hadoop .hbase .client .RegionInfo ;
4949import org .apache .hadoop .hbase .client .Result ;
5050import org .apache .hadoop .hbase .client .ResultScanner ;
51- import org .apache .hadoop .hbase .client .Scan ;
52- import org .apache .hadoop .hbase .client .Table ;
5351import org .apache .hadoop .hbase .client .TableDescriptor ;
5452import org .apache .hadoop .hbase .client .TableDescriptorBuilder ;
5553import org .apache .hadoop .hbase .constraint .ConstraintException ;
5654import org .apache .hadoop .hbase .coprocessor .MultiRowMutationEndpoint ;
5755import org .apache .hadoop .hbase .exceptions .DeserializationException ;
58- import org .apache .hadoop .hbase .ipc .CoprocessorRpcChannel ;
5956import org .apache .hadoop .hbase .master .MasterServices ;
6057import org .apache .hadoop .hbase .master .ServerListener ;
6158import org .apache .hadoop .hbase .master .TableStateManager ;
6663import org .apache .hadoop .hbase .procedure2 .Procedure ;
6764import org .apache .hadoop .hbase .protobuf .ProtobufMagic ;
6865import org .apache .hadoop .hbase .protobuf .ProtobufUtil ;
69- import org .apache .hadoop .hbase .protobuf .generated .MultiRowMutationProtos ;
66+ import org .apache .hadoop .hbase .protobuf .generated .ClientProtos .MutationProto ;
67+ import org .apache .hadoop .hbase .protobuf .generated .MultiRowMutationProtos .MultiRowMutationService ;
68+ import org .apache .hadoop .hbase .protobuf .generated .MultiRowMutationProtos .MutateRowsRequest ;
69+ import org .apache .hadoop .hbase .protobuf .generated .MultiRowMutationProtos .MutateRowsResponse ;
7070import org .apache .hadoop .hbase .protobuf .generated .RSGroupProtos ;
7171import org .apache .hadoop .hbase .regionserver .DisabledRegionSplitPolicy ;
7272import org .apache .hadoop .hbase .util .Bytes ;
73+ import org .apache .hadoop .hbase .util .FutureUtils ;
7374import org .apache .hadoop .hbase .util .Threads ;
7475import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
7576import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
7980import org .slf4j .Logger ;
8081import org .slf4j .LoggerFactory ;
8182
83+ import org .apache .hbase .thirdparty .com .google .common .annotations .VisibleForTesting ;
8284import org .apache .hbase .thirdparty .com .google .common .collect .Lists ;
8385import org .apache .hbase .thirdparty .com .google .common .collect .Maps ;
8486import org .apache .hbase .thirdparty .com .google .common .collect .Sets ;
9193 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
9294 * too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
9395 * zk) on each modification.
94- * <p>
96+ * <p/ >
9597 * Mutations on state are synchronized but reads can continue without having to wait on an instance
9698 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
9799 * state are read-only, just-in-case (see flushConfig).
98- * <p>
100+ * <p/ >
99101 * Reads must not block else there is a danger we'll deadlock.
100- * <p>
102+ * <p/ >
101103 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
102104 * on the results of the query modifying cache in zookeeper without another thread making
103105 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
107109final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
108110 private static final Logger LOG = LoggerFactory .getLogger (RSGroupInfoManagerImpl .class );
109111
112+ private static final String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait" ;
113+ private static final long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L ;
114+
115+ // Assigned before user tables
116+ @ VisibleForTesting
117+ static final TableName RSGROUP_TABLE_NAME =
118+ TableName .valueOf (NamespaceDescriptor .SYSTEM_NAMESPACE_NAME_STR , "rsgroup" );
119+
120+ private static final String RS_GROUP_ZNODE = "rsgroup" ;
121+
122+ @ VisibleForTesting
123+ static final byte [] META_FAMILY_BYTES = Bytes .toBytes ("m" );
124+
125+ @ VisibleForTesting
126+ static final byte [] META_QUALIFIER_BYTES = Bytes .toBytes ("i" );
127+
128+ private static final byte [] ROW_KEY = { 0 };
129+
110130 /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
111131 private static final TableDescriptor RSGROUP_TABLE_DESC ;
112132 static {
@@ -129,7 +149,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
129149 private volatile Map <TableName , String > tableMap = Collections .emptyMap ();
130150
131151 private final MasterServices masterServices ;
132- private final Connection conn ;
152+ private final AsyncClusterConnection conn ;
133153 private final ZKWatcher watcher ;
134154 private final RSGroupStartupWorker rsGroupStartupWorker ;
135155 // contains list of groups that were last flushed to persistent store
@@ -141,7 +161,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
141161 private RSGroupInfoManagerImpl (MasterServices masterServices ) throws IOException {
142162 this .masterServices = masterServices ;
143163 this .watcher = masterServices .getZooKeeper ();
144- this .conn = masterServices .getConnection ();
164+ this .conn = masterServices .getAsyncClusterConnection ();
145165 this .rsGroupStartupWorker = new RSGroupStartupWorker ();
146166 }
147167
@@ -357,25 +377,25 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
357377 }
358378 }
359379
360- List <RSGroupInfo > retrieveGroupListFromGroupTable () throws IOException {
380+ private List <RSGroupInfo > retrieveGroupListFromGroupTable () throws IOException {
361381 List <RSGroupInfo > rsGroupInfoList = Lists .newArrayList ();
362- try ( Table table = conn .getTable (RSGROUP_TABLE_NAME );
363- ResultScanner scanner = table .getScanner (new Scan () )) {
382+ AsyncTable <?> table = conn .getTable (RSGROUP_TABLE_NAME );
383+ try ( ResultScanner scanner = table .getScanner (META_FAMILY_BYTES , META_QUALIFIER_BYTES )) {
364384 for (Result result ;;) {
365385 result = scanner .next ();
366386 if (result == null ) {
367387 break ;
368388 }
369389 RSGroupProtos .RSGroupInfo proto = RSGroupProtos .RSGroupInfo
370- .parseFrom (result .getValue (META_FAMILY_BYTES , META_QUALIFIER_BYTES ));
390+ .parseFrom (result .getValue (META_FAMILY_BYTES , META_QUALIFIER_BYTES ));
371391 rsGroupInfoList .add (ProtobufUtil .toGroupInfo (proto ));
372392 }
373393 }
374394 return rsGroupInfoList ;
375395 }
376396
377- List <RSGroupInfo > retrieveGroupListFromZookeeper () throws IOException {
378- String groupBasePath = ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , rsGroupZNode );
397+ private List <RSGroupInfo > retrieveGroupListFromZookeeper () throws IOException {
398+ String groupBasePath = ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , RS_GROUP_ZNODE );
379399 List <RSGroupInfo > RSGroupInfoList = Lists .newArrayList ();
380400 // Overwrite any info stored by table, this takes precedence
381401 try {
@@ -527,7 +547,8 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
527547 resetRSGroupAndTableMaps (newGroupMap , newTableMap );
528548
529549 try {
530- String groupBasePath = ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , rsGroupZNode );
550+ String groupBasePath =
551+ ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , RS_GROUP_ZNODE );
531552 ZKUtil .createAndFailSilent (watcher , groupBasePath , ProtobufMagic .PB_MAGIC );
532553
533554 List <ZKUtil .ZKUtilOp > zkOps = new ArrayList <>(newGroupMap .size ());
@@ -790,11 +811,8 @@ private boolean waitForGroupTableOnline() {
790811 createRSGroupTable ();
791812 }
792813 // try reading from the table
793- try (Table table = conn .getTable (RSGROUP_TABLE_NAME )) {
794- table .get (new Get (ROW_KEY ));
795- }
796- LOG .info (
797- "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information" );
814+ FutureUtils .get (conn .getTable (RSGROUP_TABLE_NAME ).get (new Get (ROW_KEY )));
815+ LOG .info ("RSGroup table={} is online, refreshing cached information" , RSGROUP_TABLE_NAME );
798816 RSGroupInfoManagerImpl .this .refresh (true );
799817 online = true ;
800818 // flush any inconsistencies between ZK and HTable
@@ -836,8 +854,8 @@ private void createRSGroupTable() throws IOException {
836854 } else {
837855 Procedure <?> result = masterServices .getMasterProcedureExecutor ().getResult (procId );
838856 if (result != null && result .isFailed ()) {
839- throw new IOException (
840- "Failed to create group table. " + MasterProcedureUtil .unwrapRemoteIOException (result ));
857+ throw new IOException ("Failed to create group table. " +
858+ MasterProcedureUtil .unwrapRemoteIOException (result ));
841859 }
842860 }
843861 }
@@ -852,33 +870,24 @@ private static boolean isMasterRunning(MasterServices masterServices) {
852870 }
853871
854872 private void multiMutate (List <Mutation > mutations ) throws IOException {
855- try (Table table = conn .getTable (RSGROUP_TABLE_NAME )) {
856- CoprocessorRpcChannel channel = table .coprocessorService (ROW_KEY );
857- MultiRowMutationProtos .MutateRowsRequest .Builder mmrBuilder =
858- MultiRowMutationProtos .MutateRowsRequest .newBuilder ();
859- for (Mutation mutation : mutations ) {
860- if (mutation instanceof Put ) {
861- mmrBuilder .addMutationRequest (org .apache .hadoop .hbase .protobuf .ProtobufUtil .toMutation (
862- org .apache .hadoop .hbase .protobuf .generated .ClientProtos .MutationProto .MutationType .PUT ,
863- mutation ));
864- } else if (mutation instanceof Delete ) {
865- mmrBuilder .addMutationRequest (org .apache .hadoop .hbase .protobuf .ProtobufUtil .toMutation (
866- org .apache .hadoop .hbase .protobuf .generated .ClientProtos .MutationProto .MutationType .DELETE ,
867- mutation ));
868- } else {
869- throw new DoNotRetryIOException (
873+ MutateRowsRequest .Builder builder = MutateRowsRequest .newBuilder ();
874+ for (Mutation mutation : mutations ) {
875+ if (mutation instanceof Put ) {
876+ builder
877+ .addMutationRequest (ProtobufUtil .toMutation (MutationProto .MutationType .PUT , mutation ));
878+ } else if (mutation instanceof Delete ) {
879+ builder .addMutationRequest (
880+ ProtobufUtil .toMutation (MutationProto .MutationType .DELETE , mutation ));
881+ } else {
882+ throw new DoNotRetryIOException (
870883 "multiMutate doesn't support " + mutation .getClass ().getName ());
871- }
872- }
873-
874- MultiRowMutationProtos .MultiRowMutationService .BlockingInterface service =
875- MultiRowMutationProtos .MultiRowMutationService .newBlockingStub (channel );
876- try {
877- service .mutateRows (null , mmrBuilder .build ());
878- } catch (ServiceException ex ) {
879- ProtobufUtil .toIOException (ex );
880884 }
881885 }
886+ MutateRowsRequest request = builder .build ();
887+ AsyncTable <?> table = conn .getTable (RSGROUP_TABLE_NAME );
888+ FutureUtils .get (table .<MultiRowMutationService , MutateRowsResponse > coprocessorService (
889+ MultiRowMutationService ::newStub ,
890+ (stub , controller , done ) -> stub .mutateRows (controller , request , done ), ROW_KEY ));
882891 }
883892
884893 private void checkGroupName (String groupName ) throws ConstraintException {
0 commit comments