diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index 33b3321755fa..ebf8e62b509e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -32,6 +32,7 @@ */ @InterfaceAudience.Private public interface ReplicationService { + /** * Initializes the replication service object. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 56576a6cf3e1..797d68cfd807 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; +import org.apache.hadoop.hbase.ServerName; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +42,14 @@ public abstract class BaseReplicationEndpoint extends AbstractService public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY = "hbase.replication.source.custom.walentryfilters"; protected Context ctx; + protected ServerName hostServerName; @Override public void init(Context context) throws IOException { this.ctx = context; if (this.ctx != null){ + hostServerName = this.ctx.getHostServerName(); ReplicationPeer peer = this.ctx.getReplicationPeer(); if (peer != null){ peer.registerPeerConfigListener(this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 86786856f214..b1bb2bbee3a2 
100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -18,25 +18,39 @@ package org.apache.hadoop.hbase.replication; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.zookeeper.ZKListener; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import 
org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.AuthFailedException; @@ -44,7 +58,6 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.Maps; /** @@ -77,9 +90,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint */ public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f; + static final float DEFAULT_REPLICATION_SOURCE_GROUP_RATIO = 1f; + // Ratio of total number of potential peer region servers to be used private float ratio; + private float groupRatio; + // Maximum number of times a sink can be reported as bad before the pool of // replication sinks is refreshed private int badSinkThreshold; @@ -88,6 +105,17 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private List sinkServers = new ArrayList<>(0); + private static ThreadLocal threadLocal = new ThreadLocal() { + @Override + protected AtomicBoolean initialValue() { + return new AtomicBoolean(false); + } + }; + + public boolean getIsGroup() { + return threadLocal.get().get(); + } + /* * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different * Connection implementations, or initialize it in a different way, so defining createConnection @@ -104,6 +132,8 @@ public void init(Context context) throws IOException { this.conf = HBaseConfiguration.create(ctx.getConfiguration()); this.ratio = ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); + this.groupRatio = conf.getFloat("replication.source.group.ratio", + DEFAULT_REPLICATION_SOURCE_GROUP_RATIO); this.badSinkThreshold = ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD); this.badReportCounts = Maps.newHashMap(); @@ -228,6 +258,10 @@ public boolean 
isAborted() {
    * @return list of region server addresses or an empty list if the slave is unavailable
    */
   protected List<ServerName> fetchSlavesAddresses() {
+    return fetchSlavesAddresses(hostServerName);
+  }
+
+  private List<ServerName> fetchSlavesAddresses(ServerName hostServerName) {
     List<String> children = null;
     try {
       synchronized (zkwLock) {
@@ -242,21 +276,166 @@ protected List<ServerName> fetchSlavesAddresses() {
     if (children == null) {
       return Collections.emptyList();
     }
+
+    // A fresh lookup starts unfiltered. Clearing the per-thread marker here keeps a later
+    // fallback to the full server list from being sized with the group ratio in chooseSinks().
+    threadLocal.get().set(false);
+
+    if (hostServerName == null || !isRSGroupBalancerConfigured()) {
+      // Any other balancer, or an unknown local server: every peer region server qualifies.
+      LOG.debug("Use replication random choose policy...");
+      return parseServerNameFromList(children);
+    }
+
+    // rsgroup balancer: only offer peer region servers from the rsgroup named like ours.
+    LOG.debug("Use replication rsgroup choose policy...");
+    // Key each peer server znode by "host<SEP>port" so it can be matched against group members.
+    Map<String, String> serverNameHostPortMapping = new HashMap<>();
+    for (String serverName : children) {
+      String[] parts = serverName.split(",");
+      if (parts.length < 2) {
+        LOG.warn("Skip unparseable region server znode name {}", serverName);
+        continue;
+      }
+      serverNameHostPortMapping.put(parts[0] + ServerName.SERVERNAME_SEPARATOR + parts[1],
+        serverName);
+    }
+
+    String groupName = null;
+    try {
+      // NOTE(review): conn is declared outside this hunk; confirm it reaches the cluster that
+      // knows the rsgroup of the local server.
+      RSGroupInfo rsGroupInfo =
+        getRSGroupInfoOfServer(conn.toConnection(), hostServerName.getAddress());
+      if (rsGroupInfo != null) {
+        groupName = rsGroupInfo.getName();
+      }
+    } catch (IOException e) {
+      LOG.error("rsGroupInfo error!", e);
+    }
+    if (groupName == null) {
+      // Without a local group name there is nothing to filter on; use every region server.
+      return parseServerNameFromList(children);
+    }
+
+    try {
+      // NOTE(review): zkw is read under zkwLock above but not here - confirm this is safe.
+      List<ServerName> serverList =
+        getGroupServerListFromTargetZkCluster(groupName, zkw, serverNameHostPortMapping);
+      if (!serverList.isEmpty()) {
+        // The peer has a same-named group with live servers: replicate only to those.
+        LOG.debug("group list > 0");
+        threadLocal.get().set(true);
+        return serverList;
+      }
+      // No usable group on the peer: choose sinks from all of its region servers.
+      LOG.debug("target group list <= 0");
+      return parseServerNameFromList(children);
+    } catch (IOException | KeeperException e) {
+      LOG.error("Get server list from target zk error", e);
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Tells whether this endpoint's configuration selects {@link RSGroupBasedLoadBalancer}.
+   * An unset key falls back to the default balancer, so the comparison never sees null.
+   */
+  private boolean isRSGroupBalancerConfigured() {
+    String balancer = conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+      LoadBalancerFactory.getDefaultLoadBalancerClass().getName());
+    return RSGroupBasedLoadBalancer.class.getName().equals(balancer);
+  }
+
+  /**
+   * Parses region server znode names into {@link ServerName}s.
+   * @param children znode names to parse, may be null
+   * @return the parsed server names, or an empty list when {@code children} is null
+   */
+  protected List<ServerName> parseServerNameFromList(List<String> children) {
+    if (children == null) {
+      return Collections.emptyList();
+    }
     List<ServerName> addresses = new ArrayList<>(children.size());
     for (String child : children) {
       addresses.add(ServerName.parseServerName(child));
     }
+    if (LOG.isDebugEnabled()) {
+      // Build the listing only when it will be logged, reusing the parsed names.
+      StringBuilder sb = new StringBuilder();
+      for (ServerName address : addresses) {
+        sb.append(address).append("/");
+      }
+      LOG.debug("Find {} child znodes from target cluster zk. {}", addresses.size(), sb);
+    }
     return addresses;
   }
 
+  /**
+   * Returns the peer region servers that belong to the peer's rsgroup named {@code groupName}.
+   * @param groupName name of the rsgroup to look for
+   * @param zkw watcher used to read the rsgroup znodes
+   * @param serverNameHostPortMapping peer server znode names keyed by host and port
+   * @return the matching servers; empty when the peer has no rsgroup znodes or no such group
+   */
+  protected List<ServerName> getGroupServerListFromTargetZkCluster(String groupName,
+    ZKWatcher zkw, Map<String, String> serverNameHostPortMapping)
+    throws KeeperException, IOException {
+    List<String> groupInfos = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
+      ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"));
+    if (groupInfos == null || !groupInfos.contains(groupName)) {
+      LOG.debug("No rsgroup named {} on the target cluster", groupName);
+      return Collections.emptyList();
+    }
+    return getServerListFromWithRSGroupName(groupName, zkw, serverNameHostPortMapping);
+  }
+
+  /**
+   * Reads the named rsgroup through {@code zkw} and keeps those of its members that
+   * currently appear in {@code serverNameHostPortMapping}.
+   * @throws IOException if the rsgroup znode cannot be read
+   */
+  protected List<ServerName> getServerListFromWithRSGroupName(String groupName, ZKWatcher zkw,
+    Map<String, String> serverNameHostPortMapping) throws IOException {
+    RSGroupInfo detail = retrieveGroupInfo(zkw,
+      ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"), groupName);
+    List<ServerName> serverList = new ArrayList<>();
+    // Keep the group members that are also present in the supplied znode-name mapping.
+    for (Address serverInfo : detail.getServers()) {
+      String serverPrefix =
+        serverInfo.getHostname() + ServerName.SERVERNAME_SEPARATOR + serverInfo.getPort();
+      String znodeName = serverNameHostPortMapping.get(serverPrefix);
+      if (znodeName != null) {
+        ServerName sn = ServerName.parseServerName(znodeName);
+        LOG.debug("Match server in {} success {}/{}", groupName, serverPrefix, sn);
+        serverList.add(sn);
+      }
+    }
+    return serverList;
+  }
+
   protected synchronized void chooseSinks() {
     List<ServerName> slaveAddresses = fetchSlavesAddresses();
     if (slaveAddresses.isEmpty()) {
       LOG.warn("No sinks available at peer. Will not be able to replicate");
     }
     Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
-    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+    // The group ratio applies only when the list above was narrowed to an rsgroup.
+    float actualRatio = ratio;
+    if (getIsGroup() && hostServerName != null && isRSGroupBalancerConfigured()) {
+      actualRatio = groupRatio;
+    }
+    // Clamp so that a configured ratio above 1 cannot push subList() past the end.
+    int numSinks =
+      Math.min(slaveAddresses.size(), (int) Math.ceil(slaveAddresses.size() * actualRatio));
     this.sinkServers = slaveAddresses.subList(0, numSinks);
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("choose sinker(s) of target cluster ");
+      for (ServerName sn : sinkServers) {
+        sb.append(sn.getServerName()).append("/");
+      }
+      LOG.debug(sb.toString());
+    }
     badReportCounts.clear();
   }
 
@@ -314,6 +493,61 @@ List<ServerName> getSinkServers() {
     return sinkServers;
   }
 
+  /**
+   * Asks the cluster behind {@code connection} which rsgroup contains the given server.
+   * @param connection connection used to obtain an {@link Admin}
+   * @param hostAndPort address of the server to look up
+   * @return the group information, or null when the lookup itself fails (the failure is logged)
+   * @throws IOException if the {@link Admin} cannot be obtained or closed
+   */
+  public RSGroupInfo getRSGroupInfoOfServer(Connection connection, Address hostAndPort)
+    throws IOException {
+    try (Admin rsGroupAdmin = connection.getAdmin()) {
+      try {
+        return rsGroupAdmin.getRSGroup(hostAndPort);
+      } catch (IOException | RuntimeException e) {
+        // Best effort: callers treat null as "group unknown".
+        LOG.error("failed to fetch the RSGroupInfo!", e);
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Reads and deserializes one rsgroup znode.
+   * @param watcher ZooKeeper watcher to read through
+   * @param groupBasePath parent znode of all rsgroup znodes
+   * @param groupName name of the group to read
+   * @return the deserialized group
+   * @throws IOException if the znode is missing, empty or unreadable
+   */
+  public RSGroupInfo retrieveGroupInfo(ZKWatcher watcher, String groupBasePath,
+    String groupName) throws IOException {
+    String groupInfoPath = ZNodePaths.joinZNode(groupBasePath, groupName);
+    LOG.debug("---groupInfoPath: {}", groupInfoPath);
+    try {
+      if (ZKUtil.checkExists(watcher, groupInfoPath) == -1) {
+        throw new IOException("rsgroup znode " + groupInfoPath + " does not exist");
+      }
+      byte[] data = ZKUtil.getData(watcher, groupInfoPath);
+      if (data == null || data.length == 0) {
+        throw new IOException("rsgroup znode " + groupInfoPath + " holds no data");
+      }
+      ProtobufUtil.expectPBMagicPrefix(data);
+      // The third constructor argument is a length, not an end offset.
+      int magicLength = ProtobufUtil.lengthOfPBMagic();
+      ByteArrayInputStream bis =
+        new ByteArrayInputStream(data, magicLength, data.length - magicLength);
+      return ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis));
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the calling thread.
+      Thread.currentThread().interrupt();
+      throw new IOException("Failed to read groupZNode", e);
+    } catch (KeeperException | DeserializationException e) {
+      throw new IOException("Failed to read groupZNode", e);
+    }
+  }
+
   /**
    * Tracks changes to the list of region servers in a peer's cluster.
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 3fec8131d090..d190d90406f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ServerName; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -125,6 +126,12 @@ public TableDescriptors getTableDescriptors() { public Abortable getAbortable() { return abortable; } + + public ServerName getHostServerName() { + if (server != null) + return server.getServerName(); + return null; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 4cf2b495fa1a..71f2d4df1c97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; @@ -70,6 +71,8 @@ public class Replication implements ReplicationSourceService { private PeerProcedureHandler peerProcedureHandler; + private ServerName serverName; + /** * Empty constructor */ @@ -164,6 +167,7 @@ public void stopReplicationService() { 
@Override public void startReplicationService() throws IOException { this.replicationManager.init(); + this.serverName = this.replicationManager.getServerName(); this.server.getChoreService().scheduleChore( new ReplicationStatisticsChore("ReplicationSourceStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond))); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fb13abb8da04..ca78d6f8a328 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -163,6 +163,12 @@ public class ReplicationSource implements ReplicationSourceInterface { */ private final List baseFilterOutWALEntries; + private ServerName serverName; + + public ServerName getServerName() { + return serverName; + } + ReplicationSource() { // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables. 
this(p -> !AbstractFSWALProvider.isMetaFile(p), @@ -284,6 +290,7 @@ private ReplicationEndpoint createReplicationEndpoint() RegionServerCoprocessorHost rsServerHost = null; if (server instanceof HRegionServer) { rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); + this.serverName = server.getServerName(); } String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 73efcfe6b5fa..cbde5e47b8a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -192,6 +192,12 @@ public class ReplicationSourceManager { */ AtomicReference catalogReplicationSource = new AtomicReference<>(); + private ServerName serverName; + + public ServerName getServerName() { + return serverName; + } + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param queueStorage the interface for manipulating replication queues