Skip to content

Commit 4a3ff98

Browse files
authored
HBASE-25559 Terminate threads of oldsources while RS is closing (#2938)
Signed-off-by: Viraj Jasani <[email protected]> Signed-off-by: stack <[email protected]> Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent 0353909 commit 4a3ff98

File tree

3 files changed

+113
-1
lines changed

3 files changed

+113
-1
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
112112
// Maximum number of retries before taking bold actions
113113
private int maxRetriesMultiplier;
114114
// Indicates if this particular source is running
115-
private volatile boolean sourceRunning = false;
115+
volatile boolean sourceRunning = false;
116116
// Metrics for this source
117117
private MetricsSource metrics;
118118
// WARN threshold for the number of queued logs, defaults to 2

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,9 @@ public void join() {
10201020
for (ReplicationSourceInterface source : this.sources.values()) {
10211021
source.terminate("Region server is closing");
10221022
}
1023+
for (ReplicationSourceInterface source : this.oldsources) {
1024+
source.terminate("Region server is closing");
1025+
}
10231026
}
10241027

10251028
/**
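hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerJoin.java

Lines changed: 109 additions & 0 deletions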
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication.regionserver;
19+
20+
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import java.util.Optional;
24+
import java.util.stream.Stream;
25+
26+
import org.apache.hadoop.hbase.HBaseClassTestRule;
27+
import org.apache.hadoop.hbase.HConstants;
28+
import org.apache.hadoop.hbase.TableName;
29+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
30+
import org.apache.hadoop.hbase.client.Put;
31+
import org.apache.hadoop.hbase.client.Table;
32+
import org.apache.hadoop.hbase.client.TableDescriptor;
33+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
34+
import org.apache.hadoop.hbase.regionserver.HRegionServer;
35+
import org.apache.hadoop.hbase.replication.TestReplicationBase;
36+
import org.apache.hadoop.hbase.testclassification.MediumTests;
37+
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
38+
import org.apache.hadoop.hbase.util.Bytes;
39+
import org.apache.hadoop.hbase.util.JVMClusterUtil;
40+
import org.junit.BeforeClass;
41+
import org.junit.ClassRule;
42+
import org.junit.Rule;
43+
import org.junit.Test;
44+
import org.junit.experimental.categories.Category;
45+
import org.junit.rules.TestName;
46+
47+
@Category({ ReplicationTests.class, MediumTests.class})
48+
public class TestReplicationSourceManagerJoin extends TestReplicationBase {
49+
50+
@ClassRule
51+
public static final HBaseClassTestRule CLASS_RULE =
52+
HBaseClassTestRule.forClass(TestReplicationSourceManagerJoin.class);
53+
54+
@Rule
55+
public TestName testName = new TestName();
56+
57+
@BeforeClass
58+
public static void setUpBeforeClass() throws Exception {
59+
// NUM_SLAVES1 is presumed 2 in below.
60+
NUM_SLAVES1 = 2;
61+
TestReplicationBase.setUpBeforeClass();
62+
}
63+
64+
@Test
65+
public void testReplicationSourcesTerminate() throws Exception {
66+
// Create table in source cluster only, let TableNotFoundException block peer to avoid
67+
// recovered source end.
68+
TableName tableName = TableName.valueOf(testName.getMethodName());
69+
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
70+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
71+
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
72+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
73+
hbaseAdmin.createTable(td);
74+
assertFalse(UTIL2.getAdmin().tableExists(tableName));
75+
Table table = UTIL1.getConnection().getTable(tableName);
76+
// load data
77+
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
78+
table.put(new Put(Bytes.toBytes(i)).addColumn(famName, row, row));
79+
}
80+
// Kill rs holding table region. There are only TWO servers. We depend on it.
81+
Optional<HRegionServer> server =
82+
UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
83+
.map(JVMClusterUtil.RegionServerThread::getRegionServer)
84+
.filter(rs -> !rs.getRegions(tableName).isEmpty()).findAny();
85+
assertTrue(server.isPresent());
86+
server.get().abort("stopping for test");
87+
88+
UTIL1.waitFor(60000, () -> 1 == UTIL1.getMiniHBaseCluster().getNumLiveRegionServers());
89+
UTIL1.waitTableAvailable(tableName);
90+
// Wait for recovered source running
91+
HRegionServer rs =
92+
UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
93+
ReplicationSourceManager manager = rs.getReplicationSourceService().getReplicationManager();
94+
UTIL1.waitFor(60000, () -> !manager.getOldSources().isEmpty());
95+
96+
assertFalse(manager.getSources().isEmpty());
97+
assertFalse(manager.getOldSources().isEmpty());
98+
99+
// Check all sources running before manager.join(), terminated after manager.join().
100+
Stream.concat(manager.getSources().stream(), manager.getOldSources().stream())
101+
.filter(src -> src instanceof ReplicationSource)
102+
.forEach(src -> assertTrue(((ReplicationSource) src).sourceRunning));
103+
manager.join();
104+
Stream.concat(manager.getSources().stream(), manager.getOldSources().stream())
105+
.filter(src -> src instanceof ReplicationSource)
106+
.forEach(src -> assertFalse(((ReplicationSource) src).sourceRunning));
107+
}
108+
109+
}

0 commit comments

Comments
 (0)