1717 */
1818package org .apache .hadoop .hbase .util ;
1919
20+ import static org .junit .Assert .assertEquals ;
21+
22+ import java .util .Collections ;
23+ import java .util .List ;
24+ import java .util .stream .Stream ;
2025import org .apache .hadoop .hbase .HBaseClassTestRule ;
2126import org .apache .hadoop .hbase .HBaseTestingUtil ;
27+ import org .apache .hadoop .hbase .ServerName ;
28+ import org .apache .hadoop .hbase .TableName ;
29+ import org .apache .hadoop .hbase .replication .ReplicationGroupOffset ;
30+ import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
31+ import org .apache .hadoop .hbase .replication .ReplicationPeerStorage ;
32+ import org .apache .hadoop .hbase .replication .ReplicationQueueId ;
33+ import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
34+ import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
35+ import org .apache .hadoop .hbase .replication .SyncReplicationState ;
2236import org .apache .hadoop .hbase .testclassification .MediumTests ;
2337import org .apache .hadoop .hbase .testclassification .ReplicationTests ;
24- import org .junit .AfterClass ;
25- import org .junit .BeforeClass ;
38+ import org .apache .hadoop .hbase .util .HbckErrorReporter .ERROR_CODE ;
39+ import org .apache .hadoop .hbase .util .hbck .HbckTestingUtil ;
40+ import org .junit .After ;
41+ import org .junit .Before ;
2642import org .junit .ClassRule ;
27- import org .junit .Ignore ;
43+ import org .junit .Rule ;
2844import org .junit .Test ;
2945import org .junit .experimental .categories .Category ;
46+ import org .junit .rules .TestName ;
3047
3148@ Category ({ ReplicationTests .class , MediumTests .class })
3249public class TestHBaseFsckReplication {
@@ -36,65 +53,78 @@ public class TestHBaseFsckReplication {
3653 HBaseClassTestRule .forClass (TestHBaseFsckReplication .class );
3754
3855 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil ();
56+ @ Rule
57+ public final TestName name = new TestName ();
3958
40- @ BeforeClass
41- public static void setUp () throws Exception {
59+ @ Before
60+ public void setUp () throws Exception {
4261 UTIL .getConfiguration ().setBoolean ("hbase.write.hbck1.lock.file" , false );
4362 UTIL .startMiniCluster (1 );
63+ TableName tableName = TableName .valueOf ("replication_" + name .getMethodName ());
64+ UTIL .getAdmin ()
65+ .createTable (ReplicationStorageFactory .createReplicationQueueTableDescriptor (tableName ));
66+ UTIL .getConfiguration ().set (ReplicationStorageFactory .REPLICATION_QUEUE_TABLE_NAME ,
67+ tableName .getNameAsString ());
4468 }
4569
46- @ AfterClass
47- public static void tearDown () throws Exception {
70+ @ After
71+ public void tearDown () throws Exception {
4872 UTIL .shutdownMiniCluster ();
4973 }
5074
51- // TODO: reimplement
52- @ Ignore
5375 @ Test
5476 public void test () throws Exception {
55- // ReplicationPeerStorage peerStorage = ReplicationStorageFactory
56- // .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
57- // ReplicationQueueStorage queueStorage = ReplicationStorageFactory
58- // .getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
59- //
60- // String peerId1 = "1";
61- // String peerId2 = "2";
62- // peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
63- // true, SyncReplicationState.NONE);
64- // peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
65- // true, SyncReplicationState.NONE);
66- // for (int i = 0; i < 10; i++) {
67- // queueStorage.addWAL(ServerName.valueOf("localhost", 10000 + i, 100000 + i), peerId1,
68- // "file-" + i);
69- // }
70- // queueStorage.addWAL(ServerName.valueOf("localhost", 10000, 100000), peerId2, "file");
71- // HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
72- // HbckTestingUtil.assertNoErrors(fsck);
73- //
74- // // should not remove anything since the replication peer is still alive
75- // assertEquals(10, queueStorage.getListOfReplicators().size());
76- // peerStorage.removePeer(peerId1);
77- // // there should be orphan queues
78- // assertEquals(10, queueStorage.getListOfReplicators().size());
79- // fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false);
80- // HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
81- // return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
82- // }).limit(10).toArray(ERROR_CODE[]::new));
83- //
84- // // should not delete anything when fix is false
85- // assertEquals(10, queueStorage.getListOfReplicators().size());
86- //
87- // fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
88- // HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
89- // return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
90- // }).limit(10).toArray(ERROR_CODE[]::new));
91- //
92- // List<ServerName> replicators = queueStorage.getListOfReplicators();
93- // // should not remove the server with queue for peerId2
94- // assertEquals(1, replicators.size());
95- // assertEquals(ServerName.valueOf("localhost", 10000, 100000), replicators.get(0));
96- // for (String queueId : queueStorage.getAllQueues(replicators.get(0))) {
97- // assertEquals(peerId2, queueId);
98- // }
77+ ReplicationPeerStorage peerStorage = ReplicationStorageFactory .getReplicationPeerStorage (
78+ UTIL .getTestFileSystem (), UTIL .getZooKeeperWatcher (), UTIL .getConfiguration ());
79+ ReplicationQueueStorage queueStorage = ReplicationStorageFactory
80+ .getReplicationQueueStorage (UTIL .getConnection (), UTIL .getConfiguration ());
81+
82+ String peerId1 = "1" ;
83+ String peerId2 = "2" ;
84+ peerStorage .addPeer (peerId1 , ReplicationPeerConfig .newBuilder ().setClusterKey ("key" ).build (),
85+ true , SyncReplicationState .NONE );
86+ peerStorage .addPeer (peerId2 , ReplicationPeerConfig .newBuilder ().setClusterKey ("key" ).build (),
87+ true , SyncReplicationState .NONE );
88+ ReplicationQueueId queueId = null ;
89+ for (int i = 0 ; i < 10 ; i ++) {
90+ queueId = new ReplicationQueueId (getServerName (i ), peerId1 );
91+ queueStorage .setOffset (queueId , "group-" + i ,
92+ new ReplicationGroupOffset ("file-" + i , i * 100 ), Collections .emptyMap ());
93+ }
94+ queueId = new ReplicationQueueId (getServerName (0 ), peerId2 );
95+ queueStorage .setOffset (queueId , "group-" + 0 , new ReplicationGroupOffset ("file-" + 0 , 100 ),
96+ Collections .emptyMap ());
97+ HBaseFsck fsck = HbckTestingUtil .doFsck (UTIL .getConfiguration (), true );
98+ HbckTestingUtil .assertNoErrors (fsck );
99+
100+ // should not remove anything since the replication peer is still alive
101+ assertEquals (10 , queueStorage .listAllReplicators ().size ());
102+ peerStorage .removePeer (peerId1 );
103+ // there should be orphan queues
104+ assertEquals (10 , queueStorage .listAllReplicators ().size ());
105+ fsck = HbckTestingUtil .doFsck (UTIL .getConfiguration (), false );
106+ HbckTestingUtil .assertErrors (fsck , Stream .generate (() -> {
107+ return ERROR_CODE .UNDELETED_REPLICATION_QUEUE ;
108+ }).limit (10 ).toArray (ERROR_CODE []::new ));
109+
110+ // should not delete anything when fix is false
111+ assertEquals (10 , queueStorage .listAllReplicators ().size ());
112+
113+ fsck = HbckTestingUtil .doFsck (UTIL .getConfiguration (), true );
114+ HbckTestingUtil .assertErrors (fsck , Stream .generate (() -> {
115+ return ERROR_CODE .UNDELETED_REPLICATION_QUEUE ;
116+ }).limit (10 ).toArray (HbckErrorReporter .ERROR_CODE []::new ));
117+
118+ List <ServerName > replicators = queueStorage .listAllReplicators ();
119+ // should not remove the server with queue for peerId2
120+ assertEquals (1 , replicators .size ());
121+ assertEquals (ServerName .valueOf ("localhost" , 10000 , 100000 ), replicators .get (0 ));
122+ for (ReplicationQueueId qId : queueStorage .listAllQueueIds (replicators .get (0 ))) {
123+ assertEquals (peerId2 , qId .getPeerId ());
124+ }
125+ }
126+
127+ private ServerName getServerName (int i ) {
128+ return ServerName .valueOf ("localhost" , 10000 + i , 100000 + i );
99129 }
100130}
0 commit comments