Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
Expand All @@ -29,8 +30,10 @@
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,13 +83,26 @@ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
try {
List<String> queues = storage.getAllQueues(crashedServer);
// this is for upgrading to the new region replication framework, where we will delete the
// legacy region_replica_replication peer directly, without deleting the replication queues,
// as it may still be used by region servers which have not been upgraded yet.
for (Iterator<String> iter = queues.iterator(); iter.hasNext();) {
ReplicationQueueInfo queue = new ReplicationQueueInfo(iter.next());
if (queue.getPeerId().equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)) {
LOG.info("Found replication queue {} for legacy region replication peer, " +
"skipping claiming and removing...", queue.getQueueId());
iter.remove();
storage.removeQueue(crashedServer, queue.getQueueId());
}
}
if (queues.isEmpty()) {
LOG.debug("Finish claiming replication queues for {}", crashedServer);
storage.removeReplicatorIfQueueIsEmpty(crashedServer);
// we are done
return null;
}
LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(),
LOG.debug(
"There are {} replication queues need to be claimed for {}", queues.size(),
crashedServer);
List<ServerName> targetServers =
env.getMasterServices().getServerManager().getOnlineServersList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, St
if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
.equals(peerConfig.getReplicationEndpointImpl())) {
// we do not use this endpoint for region replication any more, see HBASE-26233
LOG.warn("Legacy region replication peer found, removing: {}", peerConfig);
LOG.info("Legacy region replication peer found, removing: {}", peerConfig);
peerStorage.removePeer(peerId);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ void addSource(String peerId) throws IOException {
if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
.equals(peer.getPeerConfig().getReplicationEndpointImpl())) {
// we do not use this endpoint for region replication any more, see HBASE-26233
LOG.warn("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
LOG.info("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
return;
}
ReplicationSourceInterface src = createSource(peerId, peer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
= "hbase.region.replica.replication.enabled";
private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;

/**
* Peer id of the legacy region replication peer. Compatibility code compares the peer id of a
* replication queue against this value to recognize queues that belong to that legacy peer.
* @deprecated Since 3.0.0, kept only for implementing compatibility code.
*/
@Deprecated
public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";

/**
* Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -63,16 +69,22 @@ public void test() throws Exception {
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey("127.0.0.1:2181:/hbase")
.setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build();
SingleProcessHBaseCluster cluster = UTIL.getMiniHBaseCluster();
HMaster master = cluster.getMaster();
// can not use Admin.addPeer as it will fail with ClassNotFound
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().addPeer("legacy", peerConfig,
true);
UTIL.getMiniHBaseCluster().stopRegionServer(0);
RegionServerThread rst = UTIL.getMiniHBaseCluster().startRegionServer();
master.getReplicationPeerManager().addPeer("legacy", peerConfig, true);
// add a wal file to the queue
ServerName rsName = cluster.getRegionServer(0).getServerName();
master.getReplicationPeerManager().getQueueStorage().addWAL(rsName,
ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, "test-wal-file");
cluster.stopRegionServer(0);
RegionServerThread rst = cluster.startRegionServer();
// we should still have this peer
assertNotNull(UTIL.getAdmin().getReplicationPeerConfig("legacy"));
// but at RS side, we should not have this peer loaded as replication source
assertTrue(rst.getRegionServer().getReplicationSourceService().getReplicationManager()
.getSources().isEmpty());

UTIL.shutdownMiniHBaseCluster();
UTIL.restartHBaseCluster(1);
// now we should have removed the peer
Expand All @@ -81,5 +93,13 @@ public void test() throws Exception {
// at the RS side, the peer itself should be gone this time, not merely its replication source
assertTrue(UTIL.getMiniHBaseCluster().getRegionServer(0).getReplicationSourceService()
.getReplicationManager().getReplicationPeers().getAllPeerIds().isEmpty());

// make sure that we can finish the SCP and delete the test-wal-file
UTIL.waitFor(15000,
() -> UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p)
.allMatch(Procedure::isSuccess));
assertTrue(UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage()
.getAllQueues(rsName).isEmpty());
}
}