Skip to content

Commit cf71e3a

Browse files
committed
add UT
1 parent 1c2d3ce commit cf71e3a

File tree

3 files changed

+249
-0
lines changed

3 files changed

+249
-0
lines changed

hbase-replication/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,16 @@
9898
<artifactId>junit</artifactId>
9999
<scope>test</scope>
100100
</dependency>
101+
<dependency>
102+
<groupId>org.hamcrest</groupId>
103+
<artifactId>hamcrest-core</artifactId>
104+
<scope>test</scope>
105+
</dependency>
106+
<dependency>
107+
<groupId>org.hamcrest</groupId>
108+
<artifactId>hamcrest-library</artifactId>
109+
<scope>test</scope>
110+
</dependency>
101111
<dependency>
102112
<groupId>org.mockito</groupId>
103113
<artifactId>mockito-core</artifactId>

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,4 +320,16 @@ public boolean hasData() throws KeeperException {
320320
public void deleteRootZNode() throws KeeperException {
321321
ZKUtil.deleteNodeRecursively(zookeeper, replicationZNode);
322322
}
323+
324+
String getQueuesZNode() {
325+
return queuesZNode;
326+
}
327+
328+
String getHfileRefsZNode() {
329+
return hfileRefsZNode;
330+
}
331+
332+
String getRegionsZNode() {
333+
return regionsZNode;
334+
}
323335
}
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.empty;
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertNotNull;
25+
import static org.junit.Assert.assertNull;
26+
import static org.junit.Assert.assertTrue;
27+
28+
import java.io.IOException;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Set;
33+
import java.util.concurrent.ThreadLocalRandom;
34+
import org.apache.hadoop.hbase.HBaseClassTestRule;
35+
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
36+
import org.apache.hadoop.hbase.ServerName;
37+
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage.MigrationIterator;
38+
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage.ZkLastPushedSeqId;
39+
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage.ZkReplicationQueueData;
40+
import org.apache.hadoop.hbase.testclassification.MediumTests;
41+
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
42+
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43+
import org.apache.hadoop.hbase.util.MD5Hash;
44+
import org.apache.hadoop.hbase.util.Pair;
45+
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
46+
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
47+
import org.junit.AfterClass;
48+
import org.junit.Before;
49+
import org.junit.BeforeClass;
50+
import org.junit.ClassRule;
51+
import org.junit.Rule;
52+
import org.junit.Test;
53+
import org.junit.experimental.categories.Category;
54+
import org.junit.rules.TestName;
55+
56+
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
57+
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
58+
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
59+
60+
@Category({ ReplicationTests.class, MediumTests.class })
61+
public class TestZKReplicationQueueStorage {
62+
63+
@ClassRule
64+
public static final HBaseClassTestRule CLASS_RULE =
65+
HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
66+
67+
private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
68+
69+
private ZKReplicationQueueStorage storage;
70+
71+
@Rule
72+
public final TestName name = new TestName();
73+
74+
@BeforeClass
75+
public static void setUpBeforeClass() throws Exception {
76+
UTIL.startMiniZKCluster();
77+
}
78+
79+
@AfterClass
80+
public static void tearDownAfterClass() throws IOException {
81+
UTIL.shutdownMiniZKCluster();
82+
}
83+
84+
@Before
85+
public void setUp() throws IOException {
86+
UTIL.getConfiguration().set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName());
87+
storage = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
88+
}
89+
90+
@Test
91+
public void testDeleteRootZNode() throws Exception {
92+
assertFalse(storage.hasData());
93+
ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), storage.replicationZNode);
94+
assertTrue(storage.hasData());
95+
storage.deleteRootZNode();
96+
assertFalse(storage.hasData());
97+
}
98+
99+
@Test
100+
public void testListAllQueues() throws Exception {
101+
String peerId = "1";
102+
ServerName deadServer =
103+
ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
104+
int nServers = 10;
105+
for (int i = 0; i < nServers; i++) {
106+
ServerName sn =
107+
ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
108+
String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
109+
String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
110+
ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), peerZNode);
111+
for (int j = 0; j < i; j++) {
112+
String wal = ZNodePaths.joinZNode(peerZNode, "wal-" + j);
113+
ZKUtil.createSetData(UTIL.getZooKeeperWatcher(), wal, ZKUtil.positionToByteArray(j));
114+
}
115+
String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
116+
ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), deadServerPeerZNode);
117+
for (int j = 0; j < i; j++) {
118+
String wal = ZNodePaths.joinZNode(deadServerPeerZNode, "wal-" + j);
119+
if (j > 0) {
120+
ZKUtil.createSetData(UTIL.getZooKeeperWatcher(), wal, ZKUtil.positionToByteArray(j));
121+
} else {
122+
ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), wal);
123+
}
124+
}
125+
}
126+
ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(),
127+
ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
128+
MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
129+
storage.listAllQueues();
130+
ServerName previousServerName = null;
131+
for (int i = 0; i < nServers + 1; i++) {
132+
Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
133+
assertNotNull(pair);
134+
if (previousServerName != null) {
135+
assertEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
136+
ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
137+
}
138+
ServerName sn = pair.getFirst();
139+
previousServerName = sn;
140+
if (sn.equals(deadServer)) {
141+
assertThat(pair.getSecond(), empty());
142+
} else {
143+
assertEquals(2, pair.getSecond().size());
144+
int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split((sn.getHostname()))));
145+
ZkReplicationQueueData data0 = pair.getSecond().get(0);
146+
assertEquals(peerId, data0.getQueueId().getPeerId());
147+
assertEquals(sn, data0.getQueueId().getServerName());
148+
assertFalse(data0.getQueueId().getSourceServerName().isPresent());
149+
assertEquals(n, data0.getWalOffsets().size());
150+
for (int j = 0; j < n; j++) {
151+
assertEquals(j, data0.getWalOffsets().get("wal-" + j).intValue());
152+
}
153+
ZkReplicationQueueData data1 = pair.getSecond().get(1);
154+
assertEquals(peerId, data1.getQueueId().getPeerId());
155+
assertEquals(sn, data1.getQueueId().getServerName());
156+
assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
157+
assertEquals(n, data1.getWalOffsets().size());
158+
for (int j = 0; j < n; j++) {
159+
assertEquals(j, data1.getWalOffsets().get("wal-" + j).intValue());
160+
}
161+
}
162+
}
163+
assertNull(iter.next());
164+
}
165+
166+
private String getLastPushedSeqIdZNode(String encodedName, String peerId) {
167+
return ZNodePaths.joinZNode(storage.getRegionsZNode(), encodedName.substring(0, 2),
168+
encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
169+
}
170+
171+
@Test
172+
public void testListAllLastPushedSeqIds() throws Exception {
173+
String peerId1 = "1";
174+
String peerId2 = "2";
175+
Map<String, Set<String>> name2PeerIds = new HashMap<>();
176+
byte[] bytes = new byte[32];
177+
for (int i = 0; i < 100; i++) {
178+
ThreadLocalRandom.current().nextBytes(bytes);
179+
String encodeName = MD5Hash.getMD5AsHex(bytes);
180+
String znode1 = getLastPushedSeqIdZNode(encodeName, peerId1);
181+
ZKUtil.createSetData(UTIL.getZooKeeperWatcher(), znode1, ZKUtil.positionToByteArray(1));
182+
String znode2 = getLastPushedSeqIdZNode(encodeName, peerId2);
183+
ZKUtil.createSetData(UTIL.getZooKeeperWatcher(), znode2, ZKUtil.positionToByteArray(2));
184+
name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
185+
}
186+
int addedEmptyZNodes = 0;
187+
for (int i = 0; i < 256; i++) {
188+
String level1ZNode =
189+
ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
190+
if (ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), level1ZNode) == -1) {
191+
ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), level1ZNode);
192+
if (addedEmptyZNodes >= 10) {
193+
ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(),
194+
ZNodePaths.joinZNode(level1ZNode, "ab"));
195+
}
196+
addedEmptyZNodes++;
197+
if (addedEmptyZNodes >= 20) {
198+
break;
199+
}
200+
}
201+
}
202+
MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
203+
int emptyListCount = 0;
204+
for (;;) {
205+
List<ZkLastPushedSeqId> list = iter.next();
206+
if (list == null) {
207+
break;
208+
}
209+
if (list.isEmpty()) {
210+
emptyListCount++;
211+
continue;
212+
}
213+
for (ZkLastPushedSeqId seqId : list) {
214+
name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
215+
if (seqId.getPeerId().equals(peerId1)) {
216+
assertEquals(1, seqId.getLastPushedSeqId());
217+
} else {
218+
assertEquals(2, seqId.getLastPushedSeqId());
219+
}
220+
}
221+
}
222+
assertEquals(10, emptyListCount);
223+
name2PeerIds.forEach((encodedRegionName, peerIds) -> {
224+
assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
225+
});
226+
}
227+
}

0 commit comments

Comments
 (0)