deadServers = new HashSet<>();
- for(ServerName server: master.getServerManager().getDeadServers().copyServerNames()) {
- deadServers.add(server.getAddress());
- }
-
- for (Address address: servers) {
- if (onlineServers.contains(address)) {
- throw new ConstraintException(
- "Server " + address + " is an online server, not allowed to remove.");
- }
- if (deadServers.contains(address)) {
- throw new ConstraintException(
- "Server " + address + " is on the dead servers list,"
- + " Maybe it will come back again, not allowed to remove.");
- }
- }
- }
-}
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
deleted file mode 100644
index b2d168a4ff29..000000000000
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ /dev/null
@@ -1,801 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.rsgroup;
-
-import com.google.protobuf.ServiceException;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.OptionalLong;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.constraint.ConstraintException;
-import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.ServerListener;
-import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
-import org.apache.hadoop.hbase.net.Address;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
-import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-
-/**
- * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
- * persistence store for the group information. It also makes use of zookeeper to store group
- * information needed for bootstrapping during offline mode.
- * Concurrency
RSGroup state is kept locally in Maps. There is a rsgroup name to cached
- * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
- * too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
- * zk) on each modification.
- *
- * Mutations on state are synchronized but reads can continue without having to wait on an instance
- * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
- * state are read-only, just-in-case (see flushConfig).
- *
- * Reads must not block else there is a danger we'll deadlock.
- *
- * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
- * on the results of the query modifying cache in zookeeper without another thread making
- * intermediate modifications. These clients synchronize on the 'this' instance so no other has
- * access concurrently. Reads must be able to continue concurrently.
- */
-@InterfaceAudience.Private
-final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
- private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
-
- /** Table descriptor for hbase:rsgroup catalog table */
- private static final TableDescriptor RSGROUP_TABLE_DESC;
- static {
- TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
- .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
- try {
- builder.setCoprocessor(
- CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
- .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
- } catch (IOException ex) {
- throw new Error(ex);
- }
- RSGROUP_TABLE_DESC = builder.build();
- }
-
- // There two Maps are immutable and wholesale replaced on each modification
- // so are safe to access concurrently. See class comment.
- private volatile Map rsGroupMap = Collections.emptyMap();
- private volatile Map tableMap = Collections.emptyMap();
-
- private final MasterServices masterServices;
- private final Connection conn;
- private final ZKWatcher watcher;
- private final RSGroupStartupWorker rsGroupStartupWorker;
- // contains list of groups that were last flushed to persistent store
- private Set prevRSGroups = new HashSet<>();
- private final ServerEventsListenerThread serverEventsListenerThread =
- new ServerEventsListenerThread();
-
- private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
- this.masterServices = masterServices;
- this.watcher = masterServices.getZooKeeper();
- this.conn = masterServices.getConnection();
- this.rsGroupStartupWorker = new RSGroupStartupWorker();
- }
-
-
- private synchronized void init() throws IOException {
- refresh();
- serverEventsListenerThread.start();
- masterServices.getServerManager().registerListener(serverEventsListenerThread);
- }
-
- static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
- RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
- instance.init();
- return instance;
- }
-
- public void start() {
- // create system table of rsgroup
- rsGroupStartupWorker.start();
- }
-
- @Override
- public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
- checkGroupName(rsGroupInfo.getName());
- if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
- rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
- throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
- }
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
- newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
- flushConfig(newGroupMap);
- }
-
- private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
- RSGroupInfo rsGroupInfo = getRSGroup(groupName);
- if (rsGroupInfo == null) {
- throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
- }
- return rsGroupInfo;
- }
-
- /**
- * @param master the master to get online servers for
- * @return Set of online Servers named for their hostname and port (not ServerName).
- */
- private static Set getOnlineServers(final MasterServices master) {
- Set onlineServers = new HashSet();
- if (master == null) {
- return onlineServers;
- }
-
- for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
- onlineServers.add(server.getAddress());
- }
- return onlineServers;
- }
-
- @Override
- public synchronized Set moveServers(Set servers, String srcGroup,
- String dstGroup) throws IOException {
- RSGroupInfo src = getRSGroupInfo(srcGroup);
- RSGroupInfo dst = getRSGroupInfo(dstGroup);
- // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
- // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
- // rsgroup of dead servers that are to come back later).
- Set onlineServers =
- dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
- : null;
- for (Address el : servers) {
- src.removeServer(el);
- if (onlineServers != null) {
- if (!onlineServers.contains(el)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
- }
- continue;
- }
- }
- dst.addServer(el);
- }
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
- newGroupMap.put(src.getName(), src);
- newGroupMap.put(dst.getName(), dst);
- flushConfig(newGroupMap);
- return dst.getServers();
- }
-
- @Override
- public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
- for (RSGroupInfo info : rsGroupMap.values()) {
- if (info.containsServer(serverHostPort)) {
- return info;
- }
- }
- return null;
- }
-
- @Override
- public RSGroupInfo getRSGroup(String groupName) {
- return rsGroupMap.get(groupName);
- }
-
- @Override
- public String getRSGroupOfTable(TableName tableName) {
- return tableMap.get(tableName);
- }
-
- @Override
- public synchronized void moveTables(Set tableNames, String groupName)
- throws IOException {
- // Check if rsGroupMap contains the destination rsgroup
- if (groupName != null && !rsGroupMap.containsKey(groupName)) {
- throw new DoNotRetryIOException("Group " + groupName + " does not exist");
- }
-
- // Make a copy of rsGroupMap to update
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
-
- // Remove tables from their original rsgroups
- // and update the copy of rsGroupMap
- for (TableName tableName : tableNames) {
- if (tableMap.containsKey(tableName)) {
- RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
- src.removeTable(tableName);
- newGroupMap.put(src.getName(), src);
- }
- }
-
- // Add tables to the destination rsgroup
- // and update the copy of rsGroupMap
- if (groupName != null) {
- RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName));
- dstGroup.addAllTables(tableNames);
- newGroupMap.put(dstGroup.getName(), dstGroup);
- }
-
- // Flush according to the updated copy of rsGroupMap
- flushConfig(newGroupMap);
- }
-
- @Override
- public synchronized void removeRSGroup(String groupName) throws IOException {
- if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
- throw new DoNotRetryIOException(
- "Group " + groupName + " does not exist or is a reserved " + "group");
- }
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
- newGroupMap.remove(groupName);
- flushConfig(newGroupMap);
- }
-
- @Override
- public List listRSGroups() {
- return Lists.newLinkedList(rsGroupMap.values());
- }
-
- @Override
- public boolean isOnline() {
- return rsGroupStartupWorker.isOnline();
- }
-
- @Override
- public void moveServersAndTables(Set servers, Set tables, String srcGroup,
- String dstGroup) throws IOException {
- // get server's group
- RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
- RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
-
- // move servers
- for (Address el : servers) {
- srcGroupInfo.removeServer(el);
- dstGroupInfo.addServer(el);
- }
- // move tables
- for (TableName tableName : tables) {
- srcGroupInfo.removeTable(tableName);
- dstGroupInfo.addTable(tableName);
- }
-
- // flush changed groupinfo
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
- newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
- newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
- flushConfig(newGroupMap);
- }
-
- @Override
- public synchronized void removeServers(Set servers) throws IOException {
- Map rsGroupInfos = new HashMap();
- for (Address el : servers) {
- RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
- if (rsGroupInfo != null) {
- RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
- if (newRsGroupInfo == null) {
- rsGroupInfo.removeServer(el);
- rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
- } else {
- newRsGroupInfo.removeServer(el);
- rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
- }
- } else {
- LOG.warn("Server " + el + " does not belong to any rsgroup.");
- }
- }
-
- if (rsGroupInfos.size() > 0) {
- Map newGroupMap = Maps.newHashMap(rsGroupMap);
- newGroupMap.putAll(rsGroupInfos);
- flushConfig(newGroupMap);
- }
- }
-
- List retrieveGroupListFromGroupTable() throws IOException {
- List rsGroupInfoList = Lists.newArrayList();
- try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
- ResultScanner scanner = table.getScanner(new Scan())) {
- for (Result result;;) {
- result = scanner.next();
- if (result == null) {
- break;
- }
- RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
- .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
- rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
- }
- }
- return rsGroupInfoList;
- }
-
- List retrieveGroupListFromZookeeper() throws IOException {
- String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
- List RSGroupInfoList = Lists.newArrayList();
- // Overwrite any info stored by table, this takes precedence
- try {
- if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
- List children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
- if (children == null) {
- return RSGroupInfoList;
- }
- for (String znode : children) {
- byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
- if (data.length > 0) {
- ProtobufUtil.expectPBMagicPrefix(data);
- ByteArrayInputStream bis =
- new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
- RSGroupInfoList
- .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
- }
- }
- LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
- }
- } catch (KeeperException | DeserializationException | InterruptedException e) {
- throw new IOException("Failed to read rsGroupZNode", e);
- }
- return RSGroupInfoList;
- }
-
- @Override
- public void refresh() throws IOException {
- refresh(false);
- }
-
- /**
- * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
- * startup of the manager.
- */
- private synchronized void refresh(boolean forceOnline) throws IOException {
- List groupList = new LinkedList<>();
-
- // Overwrite anything read from zk, group table is source of truth
- // if online read from GROUP table
- if (forceOnline || isOnline()) {
- LOG.debug("Refreshing in Online mode.");
- groupList.addAll(retrieveGroupListFromGroupTable());
- } else {
- LOG.debug("Refreshing in Offline mode.");
- groupList.addAll(retrieveGroupListFromZookeeper());
- }
-
- // refresh default group, prune
- NavigableSet orphanTables = new TreeSet<>();
- for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
- orphanTables.add(TableName.valueOf(entry));
- }
- for (RSGroupInfo group : groupList) {
- if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
- orphanTables.removeAll(group.getTables());
- }
- }
-
- // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
- // from region group table or zk
- groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables));
-
- // populate the data
- HashMap newGroupMap = Maps.newHashMap();
- HashMap newTableMap = Maps.newHashMap();
- for (RSGroupInfo group : groupList) {
- newGroupMap.put(group.getName(), group);
- for (TableName table : group.getTables()) {
- newTableMap.put(table, group.getName());
- }
- }
- resetRSGroupAndTableMaps(newGroupMap, newTableMap);
- updateCacheOfRSGroups(rsGroupMap.keySet());
- }
-
- private synchronized Map flushConfigTable(Map groupMap)
- throws IOException {
- Map newTableMap = Maps.newHashMap();
- List mutations = Lists.newArrayList();
-
- // populate deletes
- for (String groupName : prevRSGroups) {
- if (!groupMap.containsKey(groupName)) {
- Delete d = new Delete(Bytes.toBytes(groupName));
- mutations.add(d);
- }
- }
-
- // populate puts
- for (RSGroupInfo RSGroupInfo : groupMap.values()) {
- RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
- Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
- p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
- mutations.add(p);
- for (TableName entry : RSGroupInfo.getTables()) {
- newTableMap.put(entry, RSGroupInfo.getName());
- }
- }
-
- if (mutations.size() > 0) {
- multiMutate(mutations);
- }
- return newTableMap;
- }
-
- private synchronized void flushConfig() throws IOException {
- flushConfig(this.rsGroupMap);
- }
-
- private synchronized void flushConfig(Map newGroupMap) throws IOException {
- Map newTableMap;
-
- // For offline mode persistence is still unavailable
- // We're refreshing in-memory state but only for servers in default group
- if (!isOnline()) {
- if (newGroupMap == this.rsGroupMap) {
- // When newGroupMap is this.rsGroupMap itself,
- // do not need to check default group and other groups as followed
- return;
- }
-
- Map oldGroupMap = Maps.newHashMap(rsGroupMap);
- RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
- RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
- if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ ||
- !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())
- /* compare tables in default group */) {
- throw new IOException("Only servers in default group can be updated during offline mode");
- }
-
- // Restore newGroupMap by putting its default group back
- newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
-
- // Refresh rsGroupMap
- // according to the inputted newGroupMap (an updated copy of rsGroupMap)
- rsGroupMap = newGroupMap;
-
- // Do not need to update tableMap
- // because only the update on servers in default group is allowed above,
- // or IOException will be thrown
- return;
- }
-
- /* For online mode, persist to Zookeeper */
- newTableMap = flushConfigTable(newGroupMap);
-
- // Make changes visible after having been persisted to the source of truth
- resetRSGroupAndTableMaps(newGroupMap, newTableMap);
-
- try {
- String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
- ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
-
- List zkOps = new ArrayList<>(newGroupMap.size());
- for (String groupName : prevRSGroups) {
- if (!newGroupMap.containsKey(groupName)) {
- String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
- zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
- }
- }
-
- for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
- String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
- RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
- LOG.debug("Updating znode: " + znode);
- ZKUtil.createAndFailSilent(watcher, znode);
- zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
- zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
- ProtobufUtil.prependPBMagic(proto.toByteArray())));
- }
- LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
-
- ZKUtil.multiOrSequential(watcher, zkOps, false);
- } catch (KeeperException e) {
- LOG.error("Failed to write to rsGroupZNode", e);
- masterServices.abort("Failed to write to rsGroupZNode", e);
- throw new IOException("Failed to write to rsGroupZNode", e);
- }
- updateCacheOfRSGroups(newGroupMap.keySet());
- }
-
- /**
- * Make changes visible. Caller must be synchronized on 'this'.
- */
- private void resetRSGroupAndTableMaps(Map newRSGroupMap,
- Map newTableMap) {
- // Make maps Immutable.
- this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
- this.tableMap = Collections.unmodifiableMap(newTableMap);
- }
-
- /**
- * Update cache of rsgroups. Caller must be synchronized on 'this'.
- * @param currentGroups Current list of Groups.
- */
- private void updateCacheOfRSGroups(final Set currentGroups) {
- this.prevRSGroups.clear();
- this.prevRSGroups.addAll(currentGroups);
- }
-
- // Called by getDefaultServers. Presume it has lock in place.
- private List getOnlineRS() throws IOException {
- if (masterServices != null) {
- return masterServices.getServerManager().getOnlineServersList();
- }
- LOG.debug("Reading online RS from zookeeper");
- List servers = new LinkedList<>();
- try {
- for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
- servers.add(ServerName.parseServerName(el));
- }
- } catch (KeeperException e) {
- throw new IOException("Failed to retrieve server list from zookeeper", e);
- }
- return servers;
- }
-
- // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
- private SortedSet getDefaultServers() throws IOException {
- // Build a list of servers in other groups than default group, from rsGroupMap
- Set serversInOtherGroup = new HashSet<>();
- for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) {
- if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
- serversInOtherGroup.addAll(group.getServers());
- }
- }
-
- // Get all online servers from Zookeeper and find out servers in default group
- SortedSet defaultServers = Sets.newTreeSet();
- for (ServerName serverName : getOnlineRS()) {
- Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
- if (!serversInOtherGroup.contains(server)) { // not in other groups
- defaultServers.add(server);
- }
- }
- return defaultServers;
- }
-
- // Called by ServerEventsListenerThread. Synchronize on this because redoing
- // the rsGroupMap then writing it out.
- private synchronized void updateDefaultServers(SortedSet servers) throws IOException {
- RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
- RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
- HashMap newGroupMap = Maps.newHashMap(rsGroupMap);
- newGroupMap.put(newInfo.getName(), newInfo);
- flushConfig(newGroupMap);
- }
-
- /**
- * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
- * servers. Notifications about server changes are received by registering {@link ServerListener}.
- * As a listener, we need to return immediately, so the real work of updating the servers is done
- * asynchronously in this thread.
- */
- private class ServerEventsListenerThread extends Thread implements ServerListener {
- private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
- private boolean changed = false;
-
- ServerEventsListenerThread() {
- setDaemon(true);
- }
-
- @Override
- public void serverAdded(ServerName serverName) {
- serverChanged();
- }
-
- @Override
- public void serverRemoved(ServerName serverName) {
- serverChanged();
- }
-
- private synchronized void serverChanged() {
- changed = true;
- this.notify();
- }
-
- @Override
- public void run() {
- setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
- SortedSet prevDefaultServers = new TreeSet<>();
- while (isMasterRunning(masterServices)) {
- try {
- LOG.info("Updating default servers.");
- SortedSet servers = RSGroupInfoManagerImpl.this.getDefaultServers();
- if (!servers.equals(prevDefaultServers)) {
- RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
- prevDefaultServers = servers;
- LOG.info("Updated with servers: " + servers.size());
- }
- try {
- synchronized (this) {
- while (!changed) {
- wait();
- }
- changed = false;
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted", e);
- }
- } catch (IOException e) {
- LOG.warn("Failed to update default servers", e);
- }
- }
- }
- }
-
- private class RSGroupStartupWorker extends Thread {
- private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
- private volatile boolean online = false;
-
- RSGroupStartupWorker() {
- super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
- setDaemon(true);
- }
-
- @Override
- public void run() {
- if (waitForGroupTableOnline()) {
- LOG.info("GroupBasedLoadBalancer is now online");
- } else {
- LOG.warn("Quit without making region group table online");
- }
- }
-
- private boolean waitForGroupTableOnline() {
- while (isMasterRunning(masterServices)) {
- try {
- TableStateManager tsm = masterServices.getTableStateManager();
- if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
- createRSGroupTable();
- }
- // try reading from the table
- try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
- table.get(new Get(ROW_KEY));
- }
- LOG.info(
- "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
- RSGroupInfoManagerImpl.this.refresh(true);
- online = true;
- // flush any inconsistencies between ZK and HTable
- RSGroupInfoManagerImpl.this.flushConfig();
- return true;
- } catch (Exception e) {
- LOG.warn("Failed to perform check", e);
- // 100ms is short so let's just ignore the interrupt
- Threads.sleepWithoutInterrupt(100);
- }
- }
- return false;
- }
-
- private void createRSGroupTable() throws IOException {
- OptionalLong optProcId = masterServices.getProcedures().stream()
- .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
- .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
- .findFirst();
- long procId;
- if (optProcId.isPresent()) {
- procId = optProcId.getAsLong();
- } else {
- procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
- }
- // wait for region to be online
- int tries = 600;
- while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
- masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- throw new IOException("Wait interrupted ", e);
- }
- tries--;
- }
- if (tries <= 0) {
- throw new IOException("Failed to create group table in a given time.");
- } else {
- Procedure> result = masterServices.getMasterProcedureExecutor().getResult(procId);
- if (result != null && result.isFailed()) {
- throw new IOException(
- "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
- }
- }
- }
-
- public boolean isOnline() {
- return online;
- }
- }
-
- private static boolean isMasterRunning(MasterServices masterServices) {
- return !masterServices.isAborted() && !masterServices.isStopped();
- }
-
- private void multiMutate(List mutations) throws IOException {
- try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
- CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
- MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
- MultiRowMutationProtos.MutateRowsRequest.newBuilder();
- for (Mutation mutation : mutations) {
- if (mutation instanceof Put) {
- mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
- org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
- mutation));
- } else if (mutation instanceof Delete) {
- mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
- org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
- mutation));
- } else {
- throw new DoNotRetryIOException(
- "multiMutate doesn't support " + mutation.getClass().getName());
- }
- }
-
- MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
- MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
- try {
- service.mutateRows(null, mmrBuilder.build());
- } catch (ServiceException ex) {
- ProtobufUtil.toIOException(ex);
- }
- }
- }
-
- private void checkGroupName(String groupName) throws ConstraintException {
- if (!groupName.matches("[a-zA-Z0-9_]+")) {
- throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
- }
- }
-}
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java
deleted file mode 100644
index 56e35e76197c..000000000000
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.rsgroup;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.net.Address;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
-import org.apache.yetus.audience.InterfaceAudience;
-
-@InterfaceAudience.Private
-final class RSGroupProtobufUtil {
- private RSGroupProtobufUtil() {
- }
-
- static RSGroupInfo toGroupInfo(RSGroupProtos.RSGroupInfo proto) {
- RSGroupInfo RSGroupInfo = new RSGroupInfo(proto.getName());
- for(HBaseProtos.ServerName el: proto.getServersList()) {
- RSGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort()));
- }
- for(HBaseProtos.TableName pTableName: proto.getTablesList()) {
- RSGroupInfo.addTable(ProtobufUtil.toTableName(pTableName));
- }
- return RSGroupInfo;
- }
-
- static RSGroupProtos.RSGroupInfo toProtoGroupInfo(RSGroupInfo pojo) {
- List tables = new ArrayList<>(pojo.getTables().size());
- for(TableName arg: pojo.getTables()) {
- tables.add(ProtobufUtil.toProtoTableName(arg));
- }
- List hostports = new ArrayList<>(pojo.getServers().size());
- for(Address el: pojo.getServers()) {
- hostports.add(HBaseProtos.ServerName.newBuilder()
- .setHostName(el.getHostname())
- .setPort(el.getPort())
- .build());
- }
- return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName())
- .addAllServers(hostports)
- .addAllTables(tables).build();
- }
-}
diff --git a/hbase-rsgroup/src/test/resources/log4j.properties b/hbase-rsgroup/src/test/resources/log4j.properties
deleted file mode 100644
index c322699ced24..000000000000
--- a/hbase-rsgroup/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,68 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Define some default values that can be overridden by system properties
-hbase.root.logger=INFO,console
-hbase.log.dir=.
-hbase.log.file=hbase.log
-
-# Define the root logger to the system property "hbase.root.logger".
-log4j.rootLogger=${hbase.root.logger}
-
-# Logging Threshold
-log4j.threshold=ALL
-
-#
-# Daily Rolling File Appender
-#
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
-
-# Rollver at midnight
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-# Debugging Pattern format
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
-
-
-#
-# console
-# Add "console" to rootlogger above if you want to use this
-#
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
-
-# Custom Logging levels
-
-#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
-
-log4j.logger.org.apache.hadoop=WARN
-log4j.logger.org.apache.zookeeper=ERROR
-log4j.logger.org.apache.hadoop.hbase=DEBUG
-
-#These settings are workarounds against spurious logs from the minicluster.
-#See HBASE-4709
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
-log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
-# Enable this to get detailed connection error/retry logging.
-# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index 2a2e66675bc2..c22b0f952d7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -1340,6 +1340,24 @@ default void preGetRSGroupInfoOfServer(final ObserverContext ctx,
final Address server) throws IOException {}
+ /**
+ * Called before setting rsgroup for tables
+ * @param ctx the environment to interact with the framework and master
+ * @param tables tables to set group
+ * @param groupName group name
+ */
+ default void preSetRSGroupForTables(final ObserverContext ctx,
+ final Set tables, final String groupName) throws IOException {}
+
+ /**
+ * Called after setting rsgroup for tables
+ * @param ctx the environment to interact with the framework and master
+ * @param tables tables to set group
+ * @param groupName group name
+ */
+ default void postSetRSGroupForTables(final ObserverContext ctx,
+ final Set tables, final String groupName) throws IOException {}
+
/**
* Called before add a replication peer
* @param ctx the environment to interact with the framework and master
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bb2aadbf74db..cab3a64004ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -186,6 +186,7 @@
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -350,6 +351,8 @@ public void run() {
// manager of assignment nodes in zookeeper
private AssignmentManager assignmentManager;
+ private RSGroupInfoManager rsGroupInfoManager;
+
// manager of replication
private ReplicationPeerManager replicationPeerManager;
@@ -772,6 +775,8 @@ protected void initializeZKBasedSystemTrackers()
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();
+ this.rsGroupInfoManager = RSGroupInfoManager.create(this);
+
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
@@ -1963,7 +1968,7 @@ private void warmUpRegion(ServerName server, RegionInfo region) {
// Replace with an async implementation from which you can get
// a success/failure result.
@VisibleForTesting
- public void move(final byte[] encodedRegionName, byte[] destServerName) throws HBaseIOException {
+ public void move(final byte[] encodedRegionName, byte[] destServerName) throws IOException {
RegionState regionState = assignmentManager.getRegionStates().
getRegionState(Bytes.toString(encodedRegionName));
@@ -3557,7 +3562,7 @@ public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplic
* @param servers Region servers to decommission.
*/
public void decommissionRegionServers(final List servers, final boolean offload)
- throws HBaseIOException {
+ throws IOException {
List serversAdded = new ArrayList<>(servers.size());
// Place the decommission marker first.
String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
@@ -3751,4 +3756,9 @@ public Map getWalGroupsReplicationStatus() {
public HbckChore getHbckChore() {
return this.hbckChore;
}
+
+ @Override
+ public RSGroupInfoManager getRSRSGroupInfoManager() {
+ return rsGroupInfoManager;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 816636f8ae07..0fc544a6aec1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -19,12 +19,12 @@
package org.apache.hadoop.hbase.master;
import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
@@ -65,95 +65,72 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
ServerName BOGUS_SERVER_NAME = ServerName.valueOf("localhost,1,1");
/**
- * Set the current cluster status. This allows a LoadBalancer to map host name to a server
- * @param st
+ * Set the current cluster status. This allows a LoadBalancer to map host name to a server
*/
void setClusterMetrics(ClusterMetrics st);
/**
* Pass RegionStates and allow balancer to set the current cluster load.
- * @param ClusterLoad
*/
void setClusterLoad(Map>> ClusterLoad);
/**
* Set the master service.
- * @param masterServices
*/
void setMasterServices(MasterServices masterServices);
/**
* Perform the major balance operation
- * @param tableName
- * @param clusterState
* @return List of plans
*/
- List balanceCluster(TableName tableName, Map> clusterState) throws HBaseIOException;
+ List balanceCluster(TableName tableName,
+ Map> clusterState) throws IOException;
/**
* Perform the major balance operation
- * @param clusterState
* @return List of plans
*/
- List balanceCluster(Map