1717 */ 
1818package  org .apache .hadoop .hbase .util ;
1919
20+ import  static  org .junit .Assert .assertEquals ;
21+ 
22+ import  java .util .Collections ;
23+ import  java .util .List ;
24+ import  java .util .stream .Stream ;
2025import  org .apache .hadoop .hbase .HBaseClassTestRule ;
2126import  org .apache .hadoop .hbase .HBaseTestingUtil ;
27+ import  org .apache .hadoop .hbase .ServerName ;
28+ import  org .apache .hadoop .hbase .TableName ;
29+ import  org .apache .hadoop .hbase .replication .ReplicationGroupOffset ;
30+ import  org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
31+ import  org .apache .hadoop .hbase .replication .ReplicationPeerStorage ;
32+ import  org .apache .hadoop .hbase .replication .ReplicationQueueId ;
33+ import  org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
34+ import  org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
35+ import  org .apache .hadoop .hbase .replication .SyncReplicationState ;
2236import  org .apache .hadoop .hbase .testclassification .MediumTests ;
2337import  org .apache .hadoop .hbase .testclassification .ReplicationTests ;
24- import  org .junit .AfterClass ;
25- import  org .junit .BeforeClass ;
38+ import  org .apache .hadoop .hbase .util .HbckErrorReporter .ERROR_CODE ;
39+ import  org .apache .hadoop .hbase .util .hbck .HbckTestingUtil ;
40+ import  org .junit .After ;
41+ import  org .junit .Before ;
2642import  org .junit .ClassRule ;
27- import  org .junit .Ignore ;
43+ import  org .junit .Rule ;
2844import  org .junit .Test ;
2945import  org .junit .experimental .categories .Category ;
46+ import  org .junit .rules .TestName ;
3047
3148@ Category ({ ReplicationTests .class , MediumTests .class  })
3249public  class  TestHBaseFsckReplication  {
@@ -36,65 +53,78 @@ public class TestHBaseFsckReplication {
3653    HBaseClassTestRule .forClass (TestHBaseFsckReplication .class );
3754
3855  private  static  final  HBaseTestingUtil  UTIL  = new  HBaseTestingUtil ();
56+   @ Rule 
57+   public  final  TestName  name  = new  TestName ();
3958
40-   @ BeforeClass 
41-   public  static   void  setUp () throws  Exception  {
59+   @ Before 
60+   public  void  setUp () throws  Exception  {
4261    UTIL .getConfiguration ().setBoolean ("hbase.write.hbck1.lock.file" , false );
4362    UTIL .startMiniCluster (1 );
63+     TableName  tableName  = TableName .valueOf ("replication_"  + name .getMethodName ());
64+     UTIL .getAdmin ()
65+       .createTable (ReplicationStorageFactory .createReplicationQueueTableDescriptor (tableName ));
66+     UTIL .getConfiguration ().set (ReplicationStorageFactory .REPLICATION_QUEUE_TABLE_NAME ,
67+       tableName .getNameAsString ());
4468  }
4569
46-   @ AfterClass 
47-   public  static   void  tearDown () throws  Exception  {
70+   @ After 
71+   public  void  tearDown () throws  Exception  {
4872    UTIL .shutdownMiniCluster ();
4973  }
5074
51-   // TODO: reimplement 
52-   @ Ignore 
5375  @ Test 
5476  public  void  test () throws  Exception  {
55-     // ReplicationPeerStorage peerStorage = ReplicationStorageFactory 
56-     // .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 
57-     // ReplicationQueueStorage queueStorage = ReplicationStorageFactory 
58-     // .getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 
59-     // 
60-     // String peerId1 = "1"; 
61-     // String peerId2 = "2"; 
62-     // peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), 
63-     // true, SyncReplicationState.NONE); 
64-     // peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), 
65-     // true, SyncReplicationState.NONE); 
66-     // for (int i = 0; i < 10; i++) { 
67-     // queueStorage.addWAL(ServerName.valueOf("localhost", 10000 + i, 100000 + i), peerId1, 
68-     // "file-" + i); 
69-     // } 
70-     // queueStorage.addWAL(ServerName.valueOf("localhost", 10000, 100000), peerId2, "file"); 
71-     // HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true); 
72-     // HbckTestingUtil.assertNoErrors(fsck); 
73-     // 
74-     // // should not remove anything since the replication peer is still alive 
75-     // assertEquals(10, queueStorage.getListOfReplicators().size()); 
76-     // peerStorage.removePeer(peerId1); 
77-     // // there should be orphan queues 
78-     // assertEquals(10, queueStorage.getListOfReplicators().size()); 
79-     // fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false); 
80-     // HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> { 
81-     // return ERROR_CODE.UNDELETED_REPLICATION_QUEUE; 
82-     // }).limit(10).toArray(ERROR_CODE[]::new)); 
83-     // 
84-     // // should not delete anything when fix is false 
85-     // assertEquals(10, queueStorage.getListOfReplicators().size()); 
86-     // 
87-     // fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true); 
88-     // HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> { 
89-     // return ERROR_CODE.UNDELETED_REPLICATION_QUEUE; 
90-     // }).limit(10).toArray(ERROR_CODE[]::new)); 
91-     // 
92-     // List<ServerName> replicators = queueStorage.getListOfReplicators(); 
93-     // // should not remove the server with queue for peerId2 
94-     // assertEquals(1, replicators.size()); 
95-     // assertEquals(ServerName.valueOf("localhost", 10000, 100000), replicators.get(0)); 
96-     // for (String queueId : queueStorage.getAllQueues(replicators.get(0))) { 
97-     // assertEquals(peerId2, queueId); 
98-     // } 
77+     ReplicationPeerStorage  peerStorage  = ReplicationStorageFactory .getReplicationPeerStorage (
78+       UTIL .getTestFileSystem (), UTIL .getZooKeeperWatcher (), UTIL .getConfiguration ());
79+     ReplicationQueueStorage  queueStorage  = ReplicationStorageFactory 
80+       .getReplicationQueueStorage (UTIL .getConnection (), UTIL .getConfiguration ());
81+ 
82+     String  peerId1  = "1" ;
83+     String  peerId2  = "2" ;
84+     peerStorage .addPeer (peerId1 , ReplicationPeerConfig .newBuilder ().setClusterKey ("key" ).build (),
85+       true , SyncReplicationState .NONE );
86+     peerStorage .addPeer (peerId2 , ReplicationPeerConfig .newBuilder ().setClusterKey ("key" ).build (),
87+       true , SyncReplicationState .NONE );
88+     ReplicationQueueId  queueId  = null ;
89+     for  (int  i  = 0 ; i  < 10 ; i ++) {
90+       queueId  = new  ReplicationQueueId (getServerName (i ), peerId1 );
91+       queueStorage .setOffset (queueId , "group-"  + i ,
92+         new  ReplicationGroupOffset ("file-"  + i , i  * 100 ), Collections .emptyMap ());
93+     }
94+     queueId  = new  ReplicationQueueId (getServerName (0 ), peerId2 );
95+     queueStorage .setOffset (queueId , "group-"  + 0 , new  ReplicationGroupOffset ("file-"  + 0 , 100 ),
96+       Collections .emptyMap ());
97+     HBaseFsck  fsck  = HbckTestingUtil .doFsck (UTIL .getConfiguration (), true );
98+     HbckTestingUtil .assertNoErrors (fsck );
99+ 
100+     // should not remove anything since the replication peer is still alive 
101+     assertEquals (10 , queueStorage .listAllReplicators ().size ());
102+     peerStorage .removePeer (peerId1 );
103+     // there should be orphan queues 
104+     assertEquals (10 , queueStorage .listAllReplicators ().size ());
105+     fsck  = HbckTestingUtil .doFsck (UTIL .getConfiguration (), false );
106+     HbckTestingUtil .assertErrors (fsck , Stream .generate (() -> {
107+       return  ERROR_CODE .UNDELETED_REPLICATION_QUEUE ;
108+     }).limit (10 ).toArray (ERROR_CODE []::new ));
109+ 
110+     // should not delete anything when fix is false 
111+     assertEquals (10 , queueStorage .listAllReplicators ().size ());
112+ 
113+     fsck  = HbckTestingUtil .doFsck (UTIL .getConfiguration (), true );
114+     HbckTestingUtil .assertErrors (fsck , Stream .generate (() -> {
115+       return  ERROR_CODE .UNDELETED_REPLICATION_QUEUE ;
116+     }).limit (10 ).toArray (HbckErrorReporter .ERROR_CODE []::new ));
117+ 
118+     List <ServerName > replicators  = queueStorage .listAllReplicators ();
119+     // should not remove the server with queue for peerId2 
120+     assertEquals (1 , replicators .size ());
121+     assertEquals (ServerName .valueOf ("localhost" , 10000 , 100000 ), replicators .get (0 ));
122+     for  (ReplicationQueueId  qId  : queueStorage .listAllQueueIds (replicators .get (0 ))) {
123+       assertEquals (peerId2 , qId .getPeerId ());
124+     }
125+   }
126+ 
127+   private  ServerName  getServerName (int  i ) {
128+     return  ServerName .valueOf ("localhost" , 10000  + i , 100000  + i );
99129  }
100130}
0 commit comments