1919
2020import  java .io .FileNotFoundException ;
2121import  java .io .IOException ;
22+ import  java .net .URLEncoder ;
23+ import  java .nio .charset .StandardCharsets ;
2224import  java .util .ArrayList ;
2325import  java .util .Arrays ;
26+ import  java .util .Collections ;
27+ import  java .util .Comparator ;
2428import  java .util .LinkedList ;
2529import  java .util .List ;
2630import  java .util .Map ;
3135import  org .apache .hadoop .conf .Configured ;
3236import  org .apache .hadoop .fs .FileStatus ;
3337import  org .apache .hadoop .fs .FileSystem ;
34- import  org .apache .hadoop .hbase . Abortable ;
38+ import  org .apache .hadoop .fs . Path ;
3539import  org .apache .hadoop .hbase .HBaseConfiguration ;
3640import  org .apache .hadoop .hbase .ServerName ;
3741import  org .apache .hadoop .hbase .client .Admin ;
4044import  org .apache .hadoop .hbase .client .replication .TableCFs ;
4145import  org .apache .hadoop .hbase .io .WALLink ;
4246import  org .apache .hadoop .hbase .procedure2 .util .StringUtils ;
47+ import  org .apache .hadoop .hbase .replication .ReplicationException ;
48+ import  org .apache .hadoop .hbase .replication .ReplicationGroupOffset ;
49+ import  org .apache .hadoop .hbase .replication .ReplicationOffsetUtil ;
4350import  org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
4451import  org .apache .hadoop .hbase .replication .ReplicationPeerDescription ;
45- import  org .apache .hadoop .hbase .replication .ReplicationQueueInfo ;
52+ import  org .apache .hadoop .hbase .replication .ReplicationQueueData ;
53+ import  org .apache .hadoop .hbase .replication .ReplicationQueueId ;
4654import  org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
47- import  org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
48- import  org .apache .hadoop .hbase .zookeeper .ZKDump ;
49- import  org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
55+ import  org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
56+ import  org .apache .hadoop .hbase .wal .AbstractFSWALProvider ;
5057import  org .apache .hadoop .util .Tool ;
5158import  org .apache .hadoop .util .ToolRunner ;
5259import  org .apache .yetus .audience .InterfaceAudience ;
5360import  org .slf4j .Logger ;
5461import  org .slf4j .LoggerFactory ;
5562
63+ import  org .apache .hbase .thirdparty .com .google .common .collect .ImmutableMap ;
5664import  org .apache .hbase .thirdparty .com .google .common .util .concurrent .AtomicLongMap ;
5765
5866/** 
59-  * TODO: reimplement this tool 
6067 * <p/> 
6168 * Provides information about the existing states of replication, replication peers and queues. 
6269 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args] 
6370 * Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS 
64-  * usage by the replication queues (note: can be overestimated). 
 * usage by the replication queues (note: can be overestimated). In the new version, the
 * DumpReplicationQueues tool was reimplemented to obtain replication information from the
 * replication table instead of ZooKeeper.
6574 */ 
6675@ InterfaceAudience .Private 
6776public  class  DumpReplicationQueues  extends  Configured  implements  Tool  {
@@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) {
185194    System .err .println ("General Options:" );
186195    System .err .println (" -h|--h|--help  Show this help and exit." );
187196    System .err .println (" --distributed  Poll each RS and print its own replication queue. " 
188-       + "Default only polls ZooKeeper " );
197+       + "Default only polls replication table. " );
189198    System .err .println (" --hdfs         Use HDFS to calculate usage of WALs by replication." 
190199      + " It could be overestimated if replicating to multiple peers." 
191200      + " --distributed flag is also needed." );
@@ -201,13 +210,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
201210    Connection  connection  = ConnectionFactory .createConnection (conf );
202211    Admin  admin  = connection .getAdmin ();
203212
204-     ZKWatcher  zkw  =
205-       new  ZKWatcher (conf , "DumpReplicationQueues"  + EnvironmentEdgeManager .currentTime (),
206-         new  WarnOnlyAbortable (), true );
207- 
208213    try  {
209-       // Our zk watcher 
210-       LOG .info ("Our Quorum: "  + zkw .getQuorum ());
211214      List <TableCFs > replicatedTableCFs  = admin .listReplicatedTableCFs ();
212215      if  (replicatedTableCFs .isEmpty ()) {
213216        LOG .info ("No tables with a configured replication peer were found." );
@@ -229,21 +232,72 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
229232        LOG .info ("Found [--distributed], will poll each RegionServer." );
230233        Set <String > peerIds  =
231234          peers .stream ().map ((peer ) -> peer .getPeerId ()).collect (Collectors .toSet ());
232-         System .out .println (dumpQueues (zkw , peerIds , opts .isHdfs ()));
235+         System .out .println (dumpQueues (connection , peerIds , opts .isHdfs (),  conf ));
233236        System .out .println (dumpReplicationSummary ());
234237      } else  {
235-         // use ZK  instead 
236-         System .out .print ("Dumping replication znodes  via ZooKeeper: " );
237-         System .out .println (ZKDump . getReplicationZnodesDump ( zkw ));
238+         // use replication table  instead 
239+         System .out .println ("Dumping replication info  via replication table. " );
240+         System .out .println (dumpReplicationViaTable ( connection ,  conf ));
238241      }
239242      return  (0 );
240243    } catch  (IOException  e ) {
241244      return  (-1 );
242245    } finally  {
243-       zkw .close ();
246+       connection .close ();
244247    }
245248  }
246249
250+   public  String  dumpReplicationViaTable (Connection  connection , Configuration  conf )
251+     throws  ReplicationException , IOException  {
252+     StringBuilder  sb  = new  StringBuilder ();
253+     ReplicationQueueStorage  queueStorage  =
254+       ReplicationStorageFactory .getReplicationQueueStorage (connection , conf );
255+ 
256+     // The dump info format is as follows: 
257+     // peers: 
258+     // peers/1: zk1:2181:/hbase 
259+     // peers/1/peer-state: ENABLED 
260+     // rs: 
261+     // rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123 
262+     // rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321 
263+     // hfile-refs: 
264+     // hfile-refs/1/hfile1,hfile2 
265+     // hfile-refs/2/hfile3,hfile4 
266+     String  peersKey  = "peers" ;
267+     sb .append (peersKey ).append (": " ).append ("\n " );
268+     List <ReplicationPeerDescription > repPeerDescs  = connection .getAdmin ().listReplicationPeers ();
269+     for  (ReplicationPeerDescription  repPeerDesc  : repPeerDescs ) {
270+       sb .append (peersKey ).append ("/" ).append (repPeerDesc .getPeerId ()).append (": " )
271+         .append (repPeerDesc .getPeerConfig ().getClusterKey ()).append ("\n " );
272+       sb .append (peersKey ).append ("/" ).append (repPeerDesc .getPeerId ()).append ("/peer-state: " )
273+         .append (repPeerDesc .isEnabled () ? "ENABLED"  : "DISABLED" ).append ("\n " );
274+     }
275+ 
276+     List <ReplicationQueueData > repQueueDataList  = queueStorage .listAllQueues ();
277+     String  rsKey  = "rs" ;
278+     sb .append (rsKey ).append (": " ).append ("\n " );
279+     for  (ReplicationQueueData  repQueueData  : repQueueDataList ) {
280+       String  peerId  = repQueueData .getId ().getPeerId ();
281+       for  (ImmutableMap .Entry <String , ReplicationGroupOffset > entry  : repQueueData .getOffsets ()
282+         .entrySet ()) {
283+         sb .append (rsKey ).append ("/" ).append (entry .getKey ()).append ("/" ).append (peerId ).append ("/" )
284+           .append (entry .getValue ().getWal ()).append (": " ).append (entry .getValue ().getOffset ())
285+           .append ("\n " );
286+       }
287+     }
288+ 
289+     List <String > peerIds  = queueStorage .getAllPeersFromHFileRefsQueue ();
290+     String  hfileKey  = "hfile-refs" ;
291+     sb .append (hfileKey ).append (": " ).append ("\n " );
292+     for  (String  peerId  : peerIds ) {
293+       List <String > hfiles  = queueStorage .getReplicableHFiles (peerId );
294+       sb .append (hfileKey ).append ("/" ).append (peerId ).append ("/" ).append (String .join ("," , hfiles ))
295+         .append ("\n " );
296+     }
297+ 
298+     return  sb .toString ();
299+   }
300+ 
247301  public  String  dumpReplicationSummary () {
248302    StringBuilder  sb  = new  StringBuilder ();
249303    if  (!deletedQueues .isEmpty ()) {
@@ -294,80 +348,111 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
294348    return  sb .toString ();
295349  }
296350
297-   public  String  dumpQueues (ZKWatcher   zkw , Set <String > peerIds , boolean  hdfs )  throws   Exception  { 
298-     ReplicationQueueStorage   queueStorage ; 
351+   public  String  dumpQueues (Connection   connection , Set <String > peerIds , boolean  hdfs , 
352+     Configuration   conf )  throws   Exception  { 
299353    StringBuilder  sb  = new  StringBuilder ();
354+     ReplicationQueueStorage  queueStorage  =
355+       ReplicationStorageFactory .getReplicationQueueStorage (connection , conf );
356+ 
357+     Set <ServerName > liveRegionServers  =
358+       connection .getAdmin ().getClusterMetrics ().getLiveServerMetrics ().keySet ();
300359
301-     // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); 
302-     // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, 
303-     // zkw.getZNodePaths().rsZNode) 
304-     // .stream().map(ServerName::parseServerName).collect(Collectors.toSet()); 
305-     // 
306-     // Loops each peer on each RS and dumps the queues 
307-     // List<ServerName> regionservers = queueStorage.getListOfReplicators(); 
308-     // if (regionservers == null || regionservers.isEmpty()) { 
309-     // return sb.toString(); 
310-     // } 
311-     // for (ServerName regionserver : regionservers) { 
312-     // List<String> queueIds = queueStorage.getAllQueues(regionserver); 
313-     // if (!liveRegionServers.contains(regionserver)) { 
314-     // deadRegionServers.add(regionserver.getServerName()); 
315-     // } 
316-     // for (String queueId : queueIds) { 
317-     // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 
318-     // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); 
319-     // Collections.sort(wals); 
320-     // if (!peerIds.contains(queueInfo.getPeerId())) { 
321-     // deletedQueues.add(regionserver + "/" + queueId); 
322-     // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); 
323-     // } else { 
324-     // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); 
325-     // } 
326-     // } 
327-     // } 
360+     List <ServerName > regionServers  = queueStorage .listAllReplicators ();
361+     if  (regionServers  == null  || regionServers .isEmpty ()) {
362+       return  sb .toString ();
363+     }
364+     for  (ServerName  regionServer  : regionServers ) {
365+       List <ReplicationQueueId > queueIds  = queueStorage .listAllQueueIds (regionServer );
366+ 
367+       if  (!liveRegionServers .contains (regionServer )) {
368+         deadRegionServers .add (regionServer .getServerName ());
369+       }
370+       for  (ReplicationQueueId  queueId  : queueIds ) {
371+         List <String > tmpWals  = new  ArrayList <>();
372+         // wals 
373+         AbstractFSWALProvider 
374+           .getWALFiles (connection .getConfiguration (), queueId .getServerWALsBelongTo ()).stream ()
375+           .map (Path ::toString ).forEach (tmpWals ::add );
376+ 
377+         // old wals 
378+         AbstractFSWALProvider .getArchivedWALFiles (connection .getConfiguration (),
379+           queueId .getServerWALsBelongTo (), URLEncoder 
380+             .encode (queueId .getServerWALsBelongTo ().toString (), StandardCharsets .UTF_8 .name ()))
381+           .stream ().map (Path ::toString ).forEach (tmpWals ::add );
382+ 
383+         Map <String , ReplicationGroupOffset > offsets  = queueStorage .getOffsets (queueId );
384+         // filter out the wal files that should replicate 
385+         List <String > wals  = new  ArrayList <>();
386+         for  (Map .Entry <String , ReplicationGroupOffset > entry  : offsets .entrySet ()) {
387+           ReplicationGroupOffset  offset  = entry .getValue ();
388+           for  (String  wal  : tmpWals ) {
389+             if  (ReplicationOffsetUtil .shouldReplicate (offset , wal )) {
390+               wals .add (wal );
391+             }
392+           }
393+         }
394+         Collections .sort (wals , Comparator .comparingLong (AbstractFSWALProvider ::getTimestamp ));
395+         if  (!peerIds .contains (queueId .getPeerId ())) {
396+           deletedQueues .add (regionServer  + "/"  + queueId );
397+           sb .append (formatQueue (regionServer , offsets , wals , queueId , true , hdfs ));
398+         } else  {
399+           sb .append (formatQueue (regionServer , offsets , wals , queueId , false , hdfs ));
400+         }
401+       }
402+     }
328403    return  sb .toString ();
329404  }
330405
331-   private  String  formatQueue (ServerName  regionserver ,  ReplicationQueueStorage   queueStorage ,
332-     ReplicationQueueInfo   queueInfo ,  String   queueId ,  List <String > wals , boolean  isDeleted ,
333-     boolean   hdfs )  throws  Exception  {
406+   private  String  formatQueue (ServerName  regionServer ,  Map < String ,  ReplicationGroupOffset >  offsets ,
407+     List <String > wals , ReplicationQueueId   queueId ,  boolean  isDeleted ,  boolean   hdfs ) 
408+     throws  Exception  {
334409    StringBuilder  sb  = new  StringBuilder ();
335410
336-     List <ServerName > deadServers ;
337- 
338-     sb .append ("Dumping replication queue info for RegionServer: ["  + regionserver  + "]"  + "\n " );
339-     sb .append ("    Queue znode: "  + queueId  + "\n " );
340-     sb .append ("    PeerID: "  + queueInfo .getPeerId () + "\n " );
341-     sb .append ("    Recovered: "  + queueInfo .isQueueRecovered () + "\n " );
342-     deadServers  = queueInfo .getDeadRegionServers ();
343-     if  (deadServers .isEmpty ()) {
344-       sb .append ("    No dead RegionServers found in this queue."  + "\n " );
411+     sb .append ("Dumping replication queue info for RegionServer: ["  + regionServer  + "]"  + "\n " );
412+     sb .append ("    Queue id: "  + queueId  + "\n " );
413+     sb .append ("    PeerID: "  + queueId .getPeerId () + "\n " );
414+     sb .append ("    Recovered: "  + queueId .isRecovered () + "\n " );
415+     // In new version, we only record the first dead RegionServer in queueId. 
416+     if  (queueId .getSourceServerName ().isPresent ()) {
417+       sb .append ("    Dead RegionServer: "  + queueId .getSourceServerName ().get () + "\n " );
345418    } else  {
346-       sb .append ("    Dead RegionServers: "   +  deadServers  + "\n " );
419+       sb .append ("    No dead RegionServer found in this queue."   + "\n " );
347420    }
348421    sb .append ("    Was deleted: "  + isDeleted  + "\n " );
349422    sb .append ("    Number of WALs in replication queue: "  + wals .size () + "\n " );
350-     peersQueueSize .addAndGet (queueInfo .getPeerId (), wals .size ());
351- 
352-     for  (String  wal  : wals ) {
353-       // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal); 
354-       // sb.append(" Replication position for " + wal + ": " 
355-       // + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n"); 
423+     peersQueueSize .addAndGet (queueId .getPeerId (), wals .size ());
424+ 
425+     for  (Map .Entry <String , ReplicationGroupOffset > entry  : offsets .entrySet ()) {
426+       String  walGroup  = entry .getKey ();
427+       ReplicationGroupOffset  offset  = entry .getValue ();
428+       for  (String  wal  : wals ) {
429+         long  position  = 0 ;
430+         if  (offset .getWal ().equals (wal )) {
431+           position  = offset .getOffset ();
432+         }
433+         sb .append (
434+           " Replication position for "  + (walGroup  != null  ? walGroup  + "/"  + wal  : wal ) + ": " );
435+         if  (position  == 0 ) {
436+           sb .append ("0 (not started or nothing to replicate)" );
437+         } else  if  (position  > 0 ) {
438+           sb .append (position );
439+         }
440+         sb .append ("\n " );
441+       }
356442    }
357443
358444    if  (hdfs ) {
359445      FileSystem  fs  = FileSystem .get (getConf ());
360446      sb .append ("    Total size of WALs on HDFS for this queue: " 
361-         + StringUtils .humanSize (getTotalWALSize (fs , wals , regionserver )) + "\n " );
447+         + StringUtils .humanSize (getTotalWALSize (fs , wals , regionServer )) + "\n " );
362448    }
363449    return  sb .toString ();
364450  }
365451
366452  /** 
367453   * return total size in bytes from a list of WALs 
368454   */ 
369-   private  long  getTotalWALSize (FileSystem  fs , List <String > wals , ServerName  server )
370-     throws  IOException  {
455+   private  long  getTotalWALSize (FileSystem  fs , List <String > wals , ServerName  server ) {
371456    long  size  = 0 ;
372457    FileStatus  fileStatus ;
373458
@@ -389,19 +474,4 @@ private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server
389474    totalSizeOfWALs  += size ;
390475    return  size ;
391476  }
392- 
393-   private  static  class  WarnOnlyAbortable  implements  Abortable  {
394-     @ Override 
395-     public  void  abort (String  why , Throwable  e ) {
396-       LOG .warn ("DumpReplicationQueue received abort, ignoring.  Reason: "  + why );
397-       if  (LOG .isDebugEnabled ()) {
398-         LOG .debug (e .toString (), e );
399-       }
400-     }
401- 
402-     @ Override 
403-     public  boolean  isAborted () {
404-       return  false ;
405-     }
406-   }
407477}
0 commit comments