Skip to content

Commit 6ad7d9e

Browse files
committed
HBASE-28521 Use standard ConnectionRegistry and Client API to get region server list in replication (#5825)
Signed-off-by: Guanghao Zhang <[email protected]> Reviewed-by: Andor Molnár <[email protected]> (cherry picked from commit 3d66866)
1 parent 951f335 commit 6ad7d9e

8 files changed

+91
-140
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java

Lines changed: 58 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,25 @@
1919

2020
import java.io.IOException;
2121
import java.util.ArrayList;
22+
import java.util.Collection;
2223
import java.util.Collections;
24+
import java.util.EnumSet;
2325
import java.util.List;
2426
import java.util.Map;
2527
import java.util.UUID;
2628
import java.util.concurrent.ThreadLocalRandom;
2729
import org.apache.hadoop.conf.Configuration;
2830
import org.apache.hadoop.hbase.Abortable;
31+
import org.apache.hadoop.hbase.ClusterMetrics;
2932
import org.apache.hadoop.hbase.HBaseConfiguration;
3033
import org.apache.hadoop.hbase.ServerName;
3134
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
3235
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
3336
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
3437
import org.apache.hadoop.hbase.security.User;
35-
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
36-
import org.apache.hadoop.hbase.zookeeper.ZKListener;
37-
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
38-
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
38+
import org.apache.hadoop.hbase.util.FutureUtils;
39+
import org.apache.hadoop.hbase.util.ReservoirSample;
3940
import org.apache.yetus.audience.InterfaceAudience;
40-
import org.apache.zookeeper.KeeperException;
41-
import org.apache.zookeeper.KeeperException.AuthFailedException;
42-
import org.apache.zookeeper.KeeperException.ConnectionLossException;
43-
import org.apache.zookeeper.KeeperException.SessionExpiredException;
4441
import org.slf4j.Logger;
4542
import org.slf4j.LoggerFactory;
4643

@@ -56,12 +53,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
5653

5754
private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
5855

59-
private ZKWatcher zkw = null;
60-
private final Object zkwLock = new Object();
61-
6256
protected Configuration conf;
6357

64-
private AsyncClusterConnection conn;
58+
private final Object connLock = new Object();
59+
60+
private volatile AsyncClusterConnection conn;
6561

6662
/**
6763
* Default maximum number of times a replication sink can be reported as bad before it will no
@@ -106,36 +102,15 @@ public void init(Context context) throws IOException {
106102
this.badReportCounts = Maps.newHashMap();
107103
}
108104

109-
protected void disconnect() {
110-
synchronized (zkwLock) {
111-
if (zkw != null) {
112-
zkw.close();
113-
}
114-
}
115-
if (this.conn != null) {
116-
try {
117-
this.conn.close();
118-
this.conn = null;
119-
} catch (IOException e) {
120-
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
121-
}
122-
}
123-
}
124-
125-
/**
126-
* A private method used to re-establish a zookeeper session with a peer cluster.
127-
*/
128-
private void reconnect(KeeperException ke) {
129-
if (
130-
ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
131-
|| ke instanceof AuthFailedException
132-
) {
133-
String clusterKey = ctx.getPeerConfig().getClusterKey();
134-
LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
135-
try {
136-
reloadZkWatcher();
137-
} catch (IOException io) {
138-
LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
105+
private void disconnect() {
106+
synchronized (connLock) {
107+
if (this.conn != null) {
108+
try {
109+
this.conn.close();
110+
this.conn = null;
111+
} catch (IOException e) {
112+
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
113+
}
139114
}
140115
}
141116
}
@@ -152,13 +127,7 @@ public void stop() {
152127

153128
@Override
154129
protected void doStart() {
155-
try {
156-
reloadZkWatcher();
157-
connectPeerCluster();
158-
notifyStarted();
159-
} catch (IOException e) {
160-
notifyFailed(e);
161-
}
130+
notifyStarted();
162131
}
163132

164133
@Override
@@ -168,44 +137,40 @@ protected void doStop() {
168137
}
169138

170139
@Override
171-
// Synchronize peer cluster connection attempts to avoid races and rate
172-
// limit connections when multiple replication sources try to connect to
173-
// the peer cluster. If the peer cluster is down we can get out of control
174-
// over time.
175140
public UUID getPeerUUID() {
176-
UUID peerUUID = null;
177141
try {
178-
synchronized (zkwLock) {
179-
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
180-
}
181-
} catch (KeeperException ke) {
182-
reconnect(ke);
142+
AsyncClusterConnection conn = connect();
143+
String clusterId = FutureUtils
144+
.get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)))
145+
.getClusterId();
146+
return UUID.fromString(clusterId);
147+
} catch (IOException e) {
148+
LOG.warn("Failed to get cluster id for cluster", e);
149+
return null;
183150
}
184-
return peerUUID;
185151
}
186152

187-
/**
188-
* Closes the current ZKW (if not null) and creates a new one
189-
* @throws IOException If anything goes wrong connecting
190-
*/
191-
private void reloadZkWatcher() throws IOException {
192-
synchronized (zkwLock) {
193-
if (zkw != null) {
194-
zkw.close();
195-
}
196-
zkw =
197-
new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this);
198-
zkw.registerListener(new PeerRegionServerListener(this));
153+
// do not call this method in doStart method, only initialize the connection to remote cluster
154+
// when you actually want to make use of it. The problem here is that, starting the replication
155+
// endpoint is part of the region server initialization work, so if the peer cluster is fully
156+
// down and we can not connect to it, we will cause the initialization to fail and crash the
157+
// region server, as we need the cluster id while setting up the AsyncClusterConnection, which
158+
// needs to at least connect to zookeeper or some other servers in the peer cluster based on
159+
// the connection registry implementation in use
160+
private AsyncClusterConnection connect() throws IOException {
161+
AsyncClusterConnection c = this.conn;
162+
if (c != null) {
163+
return c;
199164
}
200-
}
201-
202-
private void connectPeerCluster() throws IOException {
203-
try {
204-
conn = createConnection(this.conf);
205-
} catch (IOException ioe) {
206-
LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
207-
throw ioe;
165+
synchronized (connLock) {
166+
c = this.conn;
167+
if (c != null) {
168+
return c;
169+
}
170+
c = createConnection(this.conf);
171+
conn = c;
208172
}
173+
return c;
209174
}
210175

211176
@Override
@@ -224,36 +189,27 @@ public boolean isAborted() {
224189
* Get the list of all the region servers from the specified peer
225190
* @return list of region server addresses or an empty list if the slave is unavailable
226191
*/
227-
protected List<ServerName> fetchSlavesAddresses() {
228-
List<String> children = null;
192+
// will be overridden in tests, so it is protected
193+
protected Collection<ServerName> fetchPeerAddresses() {
229194
try {
230-
synchronized (zkwLock) {
231-
children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
232-
}
233-
} catch (KeeperException ke) {
234-
if (LOG.isDebugEnabled()) {
235-
LOG.debug("Fetch slaves addresses failed", ke);
236-
}
237-
reconnect(ke);
238-
}
239-
if (children == null) {
195+
return FutureUtils.get(connect().getAdmin().getRegionServers(true));
196+
} catch (IOException e) {
197+
LOG.debug("Fetch peer addresses failed", e);
240198
return Collections.emptyList();
241199
}
242-
List<ServerName> addresses = new ArrayList<>(children.size());
243-
for (String child : children) {
244-
addresses.add(ServerName.parseServerName(child));
245-
}
246-
return addresses;
247200
}
248201

249202
protected synchronized void chooseSinks() {
250-
List<ServerName> slaveAddresses = fetchSlavesAddresses();
203+
Collection<ServerName> slaveAddresses = fetchPeerAddresses();
251204
if (slaveAddresses.isEmpty()) {
252205
LOG.warn("No sinks available at peer. Will not be able to replicate");
206+
this.sinkServers = Collections.emptyList();
207+
} else {
208+
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
209+
ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks);
210+
sample.add(slaveAddresses.iterator());
211+
this.sinkServers = sample.getSamplingResult();
253212
}
254-
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
255-
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
256-
this.sinkServers = slaveAddresses.subList(0, numSinks);
257213
badReportCounts.clear();
258214
}
259215

@@ -275,7 +231,7 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
275231
}
276232
ServerName serverName =
277233
sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
278-
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
234+
return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName));
279235
}
280236

281237
/**
@@ -307,29 +263,6 @@ List<ServerName> getSinkServers() {
307263
return sinkServers;
308264
}
309265

310-
/**
311-
* Tracks changes to the list of region servers in a peer's cluster.
312-
*/
313-
public static class PeerRegionServerListener extends ZKListener {
314-
315-
private final HBaseReplicationEndpoint replicationEndpoint;
316-
private final String regionServerListNode;
317-
318-
public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
319-
super(endpoint.zkw);
320-
this.replicationEndpoint = endpoint;
321-
this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
322-
}
323-
324-
@Override
325-
public synchronized void nodeChildrenChanged(String path) {
326-
if (path.equals(regionServerListNode)) {
327-
LOG.info("Detected change to peer region servers, fetching updated list");
328-
replicationEndpoint.chooseSinks();
329-
}
330-
}
331-
}
332-
333266
/**
334267
* Wraps a replication region server sink to provide the ability to identify it.
335268
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
106106
private boolean isSerial = false;
107107
// Initialising as 0 to guarantee at least one logging message
108108
private long lastSinkFetchTime = 0;
109-
private volatile boolean stopping = false;
110109

111110
@Override
112111
public void init(Context context) throws IOException {
@@ -449,7 +448,7 @@ public boolean replicate(ReplicateContext replicateContext) {
449448
}
450449

451450
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
452-
while (this.isRunning() && !this.stopping) {
451+
while (this.isRunning()) {
453452
if (!isPeerEnabled()) {
454453
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
455454
sleepMultiplier++;
@@ -514,14 +513,6 @@ protected boolean isPeerEnabled() {
514513
return ctx.getReplicationPeer().isPeerEnabled();
515514
}
516515

517-
@Override
518-
protected void doStop() {
519-
// Allow currently running replication tasks to finish
520-
this.stopping = true;
521-
disconnect(); // don't call super.doStop()
522-
notifyStopped();
523-
}
524-
525516
protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
526517
int timeout) {
527518
int entriesHashCode = System.identityHashCode(entries);

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.mockito.Mockito.mock;
2222

2323
import java.io.IOException;
24+
import java.util.Collection;
2425
import java.util.List;
2526
import org.apache.hadoop.conf.Configuration;
2627
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -166,6 +167,9 @@ public void testReportBadSinkDownToZeroSinks() {
166167
ServerName serverNameA = endpoint.getSinkServers().get(0);
167168
ServerName serverNameB = endpoint.getSinkServers().get(1);
168169

170+
serverNames.remove(serverNameA);
171+
serverNames.remove(serverNameB);
172+
169173
SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
170174
SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
171175

@@ -191,7 +195,7 @@ public void setRegionServers(List<ServerName> regionServers) {
191195
}
192196

193197
@Override
194-
public List<ServerName> fetchSlavesAddresses() {
198+
protected Collection<ServerName> fetchPeerAddresses() {
195199
return regionServers;
196200
}
197201

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import java.util.ArrayList;
2626
import java.util.List;
2727
import java.util.concurrent.atomic.AtomicInteger;
28+
import java.util.stream.Collectors;
2829
import org.apache.hadoop.conf.Configuration;
2930
import org.apache.hadoop.fs.FileSystem;
3031
import org.apache.hadoop.fs.Path;
3132
import org.apache.hadoop.hbase.HBaseConfiguration;
3233
import org.apache.hadoop.hbase.HBaseTestingUtil;
3334
import org.apache.hadoop.hbase.HConstants;
35+
import org.apache.hadoop.hbase.ServerName;
3436
import org.apache.hadoop.hbase.TableName;
3537
import org.apache.hadoop.hbase.client.Admin;
3638
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -369,6 +371,14 @@ protected static void runSmallBatchTest() throws IOException, InterruptedExcepti
369371
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
370372
}
371373

374+
protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
375+
List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
376+
.map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
377+
for (ServerName rs : rses) {
378+
util.getMiniHBaseCluster().stopRegionServer(rs);
379+
}
380+
}
381+
372382
@AfterClass
373383
public static void tearDownAfterClass() throws Exception {
374384
if (htable2 != null) {

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli
4444

4545
@Test
4646
public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
47-
UTIL2.shutdownMiniHBaseCluster();
47+
// stop all region servers, we need to keep the master up as the below assertions need to get
48+
// cluster id from remote cluster, if master is also down, we can not get any information from
49+
// the remote cluster after source cluster restarts
50+
stopAllRegionServers(UTIL2);
51+
4852
// add some values to cluster 1
4953
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
5054
Put p = new Put(Bytes.toBytes("row" + i));

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe
4545

4646
@Test
4747
public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
48-
UTIL2.shutdownMiniHBaseCluster();
48+
// stop all region servers, we need to keep the master up as the below assertions need to get
49+
// cluster id from remote cluster, if master is also down, we can not get any information from
50+
// the remote cluster after source cluster restarts
51+
stopAllRegionServers(UTIL2);
4952
restartSourceCluster(1);
5053
Admin hbaseAdmin = UTIL1.getAdmin();
5154
// add some values to source cluster

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe
4242

4343
@Test
4444
public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
45-
UTIL2.shutdownMiniHBaseCluster();
45+
// stop all region servers, we need to keep the master up as the below assertions need to get
46+
// cluster id from remote cluster, if master is also down, we can not get any information from
47+
// the remote cluster after source cluster restarts
48+
stopAllRegionServers(UTIL2);
4649
restartSourceCluster(1);
4750
Admin hbaseAdmin = UTIL1.getAdmin();
4851
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();

0 commit comments

Comments
 (0)