Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/
@InterfaceAudience.Private
public interface ReplicationService {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid touching unnecessary file.

/**
* Initializes the replication service object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,12 +42,14 @@ public abstract class BaseReplicationEndpoint extends AbstractService
public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY
= "hbase.replication.source.custom.walentryfilters";
protected Context ctx;
protected ServerName hostServerName;

@Override
public void init(Context context) throws IOException {
this.ctx = context;

if (this.ctx != null){
hostServerName = this.ctx.getHostServerName();
ReplicationPeer peer = this.ctx.getReplicationPeer();
if (peer != null){
peer.registerPeerConfigListener(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,46 @@

package org.apache.hadoop.hbase.replication;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
Expand Down Expand Up @@ -77,9 +90,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
*/
public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;

static final float DEFAULT_REPLICATION_SOURCE_GROUP_RATIO = 1f;

// Ratio of total number of potential peer region servers to be used
private float ratio;

private float groupRatio;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here can add some descriptions/comments as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please add some explaination.


// Maximum number of times a sink can be reported as bad before the pool of
// replication sinks is refreshed
private int badSinkThreshold;
Expand All @@ -88,6 +105,17 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private List<ServerName> sinkServers = new ArrayList<>(0);

private static ThreadLocal<AtomicBoolean> threadLocal = new ThreadLocal<AtomicBoolean>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to use AtomicBoolean for a thread local variable?

@Override
protected AtomicBoolean initialValue() {
return new AtomicBoolean(false);
}
};

public boolean getIsGroup() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. redundant space

return threadLocal.get().get();
}

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* Connection implementations, or initialize it in a different way, so defining createConnection
Expand All @@ -104,6 +132,8 @@ public void init(Context context) throws IOException {
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
this.ratio =
ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
this.groupRatio = conf.getFloat("replication.source.group.ratio",
DEFAULT_REPLICATION_SOURCE_GROUP_RATIO);
this.badSinkThreshold =
ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
this.badReportCounts = Maps.newHashMap();
Expand Down Expand Up @@ -228,6 +258,10 @@ public boolean isAborted() {
* @return list of region server addresses or an empty list if the slave is unavailable
*/
protected List<ServerName> fetchSlavesAddresses() {
return fetchSlavesAddresses(hostServerName);
}

private List<ServerName> fetchSlavesAddresses(ServerName hostServerName) {
List<String> children = null;
try {
synchronized (zkwLock) {
Expand All @@ -242,21 +276,150 @@ protected List<ServerName> fetchSlavesAddresses() {
if (children == null) {
return Collections.emptyList();
}

Configuration conf = HBaseConfiguration.create();

/** if use other balancer, return all regionservers */
if (!conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On master this is not the case now. The balancer will always be a RSGroupBasedLoadBalancer, if we do not enable rs group feature, DisabledRSGroupInfoManager will be used and there will be only one group, which will act as there is no rs group. So I think here we should check for RSGroupUtil.isRSGroupEnabled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And another problem is do we need to check the configuration here? I think it is the version at the source cluster, but we need to check the target cluster?

.equals(RSGroupBasedLoadBalancer.class.getName())
|| hostServerName == null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Use replication random choose policy...");
}
return parseServerNameFromList(children);
} else {
/** if use rsgroup balancer,
* just return regionservers belong to the same rsgroup or default rsgroup */
if(LOG.isDebugEnabled()) {
LOG.debug("Use replication rsgroup choose policy...");
}
Map<String, String> serverNameHostPortMapping = new HashMap<>();
for (String serverName : children) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can reuse parseServerNameFromList(children) ? then the followings can just user ServerName#getHost and ServerName#getPort.

String mappingKey =
serverName.split(",")[0] + ServerName.SERVERNAME_SEPARATOR + serverName.split(",")[1];
serverNameHostPortMapping.put(mappingKey, serverName);
}

String groupName = null;
RSGroupInfo rsGroupInfo = null;
try {
rsGroupInfo = getRSGroupInfoOfServer(conn.toConnection(), hostServerName.getAddress());
}catch (IOException e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still neede.printStackTrace();?

LOG.error("rsGroupInfo error!", e);
}
if (rsGroupInfo != null) {
groupName = rsGroupInfo.getName();
}
try {
List<ServerName> serverList =
getGroupServerListFromTargetZkCluster(groupName, zkw, serverNameHostPortMapping);
if (serverList.size() > 0) {
// if target cluster open group balancer, serverList must has server(s)
LOG.debug("group list > 0");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can log which group > 0?

threadLocal.get().getAndSet(true);
return serverList;
}
else {
// if not, choose sinkers from all regionservers
LOG.debug("target group list <= 0");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. Please add check LOG.isDebugEnabled() {} and elsewhere

return parseServerNameFromList(children);
}
}catch (IOException | KeeperException e) {
LOG.error("Get server list from target zk error", e);
return Collections.emptyList();
}
}
}

protected List<ServerName> parseServerNameFromList(List<String> children) {
if (children == null) {
return Collections.emptyList();
}
StringBuffer sb = new StringBuffer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use StringBuilder here?

List<ServerName> addresses = new ArrayList<>(children.size());
for (String child : children) {
addresses.add(ServerName.parseServerName(child));
sb.append(ServerName.parseServerName(child)).append("/");
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Find " + addresses.size() + " child znodes from target cluster zk. " + sb.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use parameterized logging, and elsewhere.

}
return addresses;
}

protected List<ServerName> getGroupServerListFromTargetZkCluster(String groupName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On master branch we do not use zk for storing the rs group any more...

ZKWatcher zkw, Map<String, String> serverNameHostPortMapping)
throws KeeperException, IOException {
/** get group info from slave cluster zk */
List<String> groupInfos = ZKUtil.listChildrenAndWatchForNewChildren(
zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"));
/** if target cluster have same name group */
if(groupInfos == null){
if(LOG.isDebugEnabled()){
LOG.debug("groupInfos == null");
}
return Collections.emptyList();
}else{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. spaces between else {} are needed

if (groupInfos.size() > 0) {
if (groupInfos.contains(groupName)) {
return getServerListFromWithRSGroupName(groupName, zkw, serverNameHostPortMapping);
} else if (!groupInfos.contains(groupName)) {
/** if target cluster does not have same name group, return a empty list */
return Collections.emptyList();
}
} else {
/** if target cluster does not use group balancer, return a empty list */
return Collections.emptyList();
}
}

return Collections.emptyList();
}

protected List<ServerName> getServerListFromWithRSGroupName(
String groupName, ZKWatcher zkw, Map<String, String> serverNameHostPortMapping)
throws IOException {
List<ServerName> serverList = new ArrayList<>();
RSGroupInfo detail = retrieveGroupInfo(
zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"), groupName);
// choose server from rsZNode children which also in same group with local mashine
for (Address serverInfo : detail.getServers()) {
String serverPrefix =
serverInfo.getHostname() + ServerName.SERVERNAME_SEPARATOR + serverInfo.getPort();
if (serverNameHostPortMapping.containsKey(serverPrefix)) {
ServerName sn = ServerName.parseServerName(serverNameHostPortMapping.get(serverPrefix));
if(LOG.isDebugEnabled()) {
LOG.debug("Match server in " + groupName + " success " + serverPrefix + "/" + sn);
}
serverList.add(sn);
}
}
return serverList;
}

protected synchronized void chooseSinks() {
List<ServerName> slaveAddresses = fetchSlavesAddresses();
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
float actualRatio=ratio;
if(getIsGroup() && conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
.getDefaultLoadBalancerClass().getName()).equals(RSGroupBasedLoadBalancer.class.getName())
&& hostServerName != null) {
actualRatio=groupRatio;
}

int numSinks = (int) Math.ceil(slaveAddresses.size() * actualRatio);
this.sinkServers = slaveAddresses.subList(0, numSinks);
StringBuffer sb = new StringBuffer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

sb.append("choose sinker(s) of target cluster ");
for(ServerName sn : sinkServers){
sb.append(sn.getServerName()).append("/");
}
LOG.debug(sb.toString());
badReportCounts.clear();
}

Expand Down Expand Up @@ -314,6 +477,45 @@ List<ServerName> getSinkServers() {
return sinkServers;
}

public RSGroupInfo getRSGroupInfoOfServer(Connection connection, Address hostAndPort)
throws IOException {
Admin rsGroupAdmin =
connection.getAdmin();
RSGroupInfo rsGroupInfo = null;
try {
rsGroupInfo = rsGroupAdmin.getRSGroup(hostAndPort);
}catch (Exception e){
LOG.error("failed to fetch the RSGroupInfo!",e);
}
finally {
if(rsGroupAdmin != null) {
rsGroupAdmin.close();
}
}
return rsGroupInfo;
}

public RSGroupInfo retrieveGroupInfo(ZKWatcher watcher, String groupBasePath,
String groupName) throws IOException {
ByteArrayInputStream bis = null;
try {
String groupInfoPath = ZNodePaths.joinZNode(groupBasePath, groupName);
LOG.debug("---groupInfoPath: " + groupInfoPath);
if (-1 != ZKUtil.checkExists(watcher, groupInfoPath)) {
byte[] data = ZKUtil.getData(watcher, groupInfoPath);
if (data.length > 0) {
ProtobufUtil.expectPBMagicPrefix(data);
bis = new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
}
}
} catch (KeeperException | InterruptedException e) {
throw new IOException("Failed to read groupZNode", e);
} catch (DeserializationException e) {
throw new IOException("Failed to read groupZNode", e);
}
return ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis));
}

/**
* Tracks changes to the list of region servers in a peer's cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -125,6 +126,12 @@ public TableDescriptors getTableDescriptors() {
public Abortable getAbortable() {
return abortable;
}

public ServerName getHostServerName() {
if (server != null)
return server.getServerName();
return null;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
Expand Down Expand Up @@ -70,6 +71,8 @@ public class Replication implements ReplicationSourceService {

private PeerProcedureHandler peerProcedureHandler;

private ServerName serverName;

/**
* Empty constructor
*/
Expand Down Expand Up @@ -164,6 +167,7 @@ public void stopReplicationService() {
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
this.serverName = this.replicationManager.getServerName();
this.server.getChoreService().scheduleChore(
new ReplicationStatisticsChore("ReplicationSourceStatistics", server,
(int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
*/
private final List<WALEntryFilter> baseFilterOutWALEntries;

private ServerName serverName;

public ServerName getServerName() {
return serverName;
}

ReplicationSource() {
// Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
this(p -> !AbstractFSWALProvider.isMetaFile(p),
Expand Down Expand Up @@ -284,6 +290,7 @@ private ReplicationEndpoint createReplicationEndpoint()
RegionServerCoprocessorHost rsServerHost = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
this.serverName = server.getServerName();
}
String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

Expand Down
Loading