Skip to content

Commit daaf5ad

Browse files
shahrs87 authored and apurtell committed
HBASE-26963 ReplicationSource#removePeer hangs if we try to remove bad peer. (#4413)
Signed-off-by: Andrew Purtell <[email protected]> Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
1 parent b8868f2 commit daaf5ad

File tree

2 files changed

+106
-2
lines changed

2 files changed

+106
-2
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -552,8 +552,14 @@ private void initialize() {
552552
}
553553

554554
if (!this.isSourceActive()) {
555-
retryStartup.set(!this.abortOnError);
556555
setSourceStartupStatus(false);
556+
if (Thread.currentThread().isInterrupted()) {
557+
// If source is not running and thread is interrupted this means someone has tried to
558+
// remove this peer.
559+
return;
560+
}
561+
562+
retryStartup.set(!this.abortOnError);
557563
throw new IllegalStateException("Source should be active.");
558564
}
559565

@@ -576,8 +582,13 @@ private void initialize() {
576582
}
577583

578584
if (!this.isSourceActive()) {
579-
retryStartup.set(!this.abortOnError);
580585
setSourceStartupStatus(false);
586+
if (Thread.currentThread().isInterrupted()) {
587+
// If source is not running and thread is interrupted this means someone has tried to
588+
// remove this peer.
589+
return;
590+
}
591+
retryStartup.set(!this.abortOnError);
581592
throw new IllegalStateException("Source should be active.");
582593
}
583594
LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client.replication;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.HBaseClassTestRule;
23+
import org.apache.hadoop.hbase.HBaseTestingUtility;
24+
import org.apache.hadoop.hbase.HConstants;
25+
import org.apache.hadoop.hbase.client.Admin;
26+
import org.apache.hadoop.hbase.client.Connection;
27+
import org.apache.hadoop.hbase.client.ConnectionFactory;
28+
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
29+
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
30+
import org.apache.hadoop.hbase.testclassification.ClientTests;
31+
import org.apache.hadoop.hbase.testclassification.MediumTests;
32+
import org.junit.AfterClass;
33+
import org.junit.BeforeClass;
34+
import org.junit.ClassRule;
35+
import org.junit.Rule;
36+
import org.junit.Test;
37+
import org.junit.experimental.categories.Category;
38+
import org.junit.rules.TestName;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
@Category({ MediumTests.class, ClientTests.class })
43+
public class TestBadReplicationPeer {
44+
@ClassRule
45+
public static final HBaseClassTestRule CLASS_RULE =
46+
HBaseClassTestRule.forClass(TestBadReplicationPeer.class);
47+
private static final Logger LOG = LoggerFactory.getLogger(TestBadReplicationPeer.class);
48+
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
49+
private static Configuration conf;
50+
51+
@Rule
52+
public TestName name = new TestName();
53+
54+
@BeforeClass
55+
public static void setUpBeforeClass() throws Exception {
56+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
57+
TEST_UTIL.getConfiguration().setBoolean("replication.source.regionserver.abort", false);
58+
TEST_UTIL.startMiniCluster();
59+
conf = TEST_UTIL.getConfiguration();
60+
}
61+
62+
@AfterClass
63+
public static void tearDownAfterClass() throws Exception {
64+
TEST_UTIL.shutdownMiniCluster();
65+
}
66+
67+
/*
68+
* Add dummy peer and make sure that we are able to remove that peer.
69+
*/
70+
@Test
71+
public void testRemovePeerSucceeds() throws IOException {
72+
String peerId = "dummypeer_1";
73+
try (Connection connection = ConnectionFactory.createConnection(conf);
74+
Admin admin = connection.getAdmin()) {
75+
ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
76+
String quorum = TEST_UTIL.getHBaseCluster().getMaster().getZooKeeper().getQuorum();
77+
rpcBuilder.setClusterKey(quorum + ":/1");
78+
ReplicationPeerConfig rpc = rpcBuilder.build();
79+
admin.addReplicationPeer(peerId, rpc);
80+
LOG.info("Added replication peer with peer id: {}", peerId);
81+
} finally {
82+
LOG.info("Removing replication peer with peer id: {}", peerId);
83+
cleanPeer(peerId);
84+
}
85+
}
86+
87+
private static void cleanPeer(String peerId) throws IOException {
88+
try (Connection connection = ConnectionFactory.createConnection(conf);
89+
Admin admin = connection.getAdmin()) {
90+
admin.removeReplicationPeer(peerId);
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)