1919
2020import java .io .FileNotFoundException ;
2121import java .io .IOException ;
22+ import java .net .URLEncoder ;
23+ import java .nio .charset .StandardCharsets ;
2224import java .util .ArrayList ;
2325import java .util .Arrays ;
26+ import java .util .Collections ;
2427import java .util .LinkedList ;
2528import java .util .List ;
2629import java .util .Map ;
3134import org .apache .hadoop .conf .Configured ;
3235import org .apache .hadoop .fs .FileStatus ;
3336import org .apache .hadoop .fs .FileSystem ;
37+ import org .apache .hadoop .fs .Path ;
3438import org .apache .hadoop .hbase .Abortable ;
3539import org .apache .hadoop .hbase .HBaseConfiguration ;
3640import org .apache .hadoop .hbase .ServerName ;
4044import org .apache .hadoop .hbase .client .replication .TableCFs ;
4145import org .apache .hadoop .hbase .io .WALLink ;
4246import org .apache .hadoop .hbase .procedure2 .util .StringUtils ;
47+ import org .apache .hadoop .hbase .replication .ReplicationException ;
48+ import org .apache .hadoop .hbase .replication .ReplicationGroupOffset ;
4349import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
4450import org .apache .hadoop .hbase .replication .ReplicationPeerDescription ;
45- import org .apache .hadoop .hbase .replication .ReplicationQueueInfo ;
51+ import org .apache .hadoop .hbase .replication .ReplicationQueueData ;
52+ import org .apache .hadoop .hbase .replication .ReplicationQueueId ;
4653import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
54+ import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
4755import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
48- import org .apache .hadoop .hbase .zookeeper .ZKDump ;
56+ import org .apache .hadoop .hbase .wal .AbstractFSWALProvider ;
57+ import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
4958import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
5059import org .apache .hadoop .util .Tool ;
5160import org .apache .hadoop .util .ToolRunner ;
5261import org .apache .yetus .audience .InterfaceAudience ;
5362import org .slf4j .Logger ;
5463import org .slf4j .LoggerFactory ;
5564
65+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableMap ;
5666import org .apache .hbase .thirdparty .com .google .common .util .concurrent .AtomicLongMap ;
5767
5868/**
59- * TODO: reimplement this tool
6069 * <p/>
6170 * Provides information about the existing states of replication, replication peers and queues.
6271 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
@@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) {
185194 System .err .println ("General Options:" );
186195 System .err .println (" -h|--h|--help Show this help and exit." );
187196 System .err .println (" --distributed Poll each RS and print its own replication queue. "
188- + "Default only polls ZooKeeper " );
197+ + "Default only polls replication table. " );
189198 System .err .println (" --hdfs Use HDFS to calculate usage of WALs by replication."
190199 + " It could be overestimated if replicating to multiple peers."
191200 + " --distributed flag is also needed." );
@@ -229,21 +238,45 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
229238 LOG .info ("Found [--distributed], will poll each RegionServer." );
230239 Set <String > peerIds =
231240 peers .stream ().map ((peer ) -> peer .getPeerId ()).collect (Collectors .toSet ());
232- System .out .println (dumpQueues (zkw , peerIds , opts .isHdfs ()));
241+ System .out .println (dumpQueues (zkw , connection , peerIds , opts .isHdfs ()));
233242 System .out .println (dumpReplicationSummary ());
234243 } else {
235- // use ZK instead
236- System .out .print ("Dumping replication znodes via ZooKeeper: " );
237- System .out .println (ZKDump . getReplicationZnodesDump ( zkw ));
244+ // use replication table instead
245+ System .out .println ("Dumping replication info via replication table. " );
246+ System .out .println (dumpReplicationViaTable ( connection ));
238247 }
239248 return (0 );
240249 } catch (IOException e ) {
241250 return (-1 );
242251 } finally {
243- zkw .close ();
252+ connection .close ();
244253 }
245254 }
246255
256+ public String dumpReplicationViaTable (Connection connection ) throws ReplicationException {
257+ StringBuilder sb = new StringBuilder ();
258+ ReplicationQueueStorage queueStorage =
259+ ReplicationStorageFactory .getReplicationQueueStorage (connection , getConf ());
260+ List <ReplicationQueueData > replicationQueueDataList = queueStorage .listAllQueues ();
261+ for (ReplicationQueueData replicationQueueData : replicationQueueDataList ) {
262+ sb .append (replicationQueueData .getId ().getPeerId ()).append ("\n " );
263+ sb .append (replicationQueueData .getId ().getServerName ().getServerName ());
264+ }
265+
266+ for (ReplicationQueueData replicationQueueData : replicationQueueDataList ) {
267+ for (ImmutableMap .Entry <String , ReplicationGroupOffset > entry : replicationQueueData
268+ .getOffsets ().entrySet ()) {
269+ sb .append ("\n " ).append (entry .getKey ()).append ("/" ).append (entry .getValue ().getWal ())
270+ .append (": " ).append (entry .getValue ().getOffset ());
271+ }
272+ }
273+ Set <String > allHFileRefs = queueStorage .getAllHFileRefs ();
274+ for (String hfileRef : allHFileRefs ) {
275+ sb .append ("\n " ).append (hfileRef );
276+ }
277+ return sb .toString ();
278+ }
279+
247280 public String dumpReplicationSummary () {
248281 StringBuilder sb = new StringBuilder ();
249282 if (!deletedQueues .isEmpty ()) {
@@ -255,7 +288,7 @@ public String dumpReplicationSummary() {
255288 }
256289 if (!deadRegionServers .isEmpty ()) {
257290 sb .append ("Found " + deadRegionServers .size () + " dead regionservers"
258- + ", restart one regionserver to transfer the queues of dead regionservers\n " );
291+ + ", restart one regionServer to transfer the queues of dead regionservers\n " );
259292 for (String deadRs : deadRegionServers ) {
260293 sb .append (" " + deadRs + "\n " );
261294 }
@@ -294,80 +327,99 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
294327 return sb .toString ();
295328 }
296329
297- public String dumpQueues (ZKWatcher zkw , Set <String > peerIds , boolean hdfs ) throws Exception {
298- ReplicationQueueStorage queueStorage ;
330+ public String dumpQueues (ZKWatcher zkw , Connection connection , Set <String > peerIds , boolean hdfs )
331+ throws Exception {
299332 StringBuilder sb = new StringBuilder ();
333+ ReplicationQueueStorage queueStorage =
334+ ReplicationStorageFactory .getReplicationQueueStorage (connection , getConf ());
335+ Set <ServerName > liveRegionServers = ZKUtil .listChildrenNoWatch (zkw , zkw .getZNodePaths ().rsZNode )
336+ .stream ().map (ServerName ::parseServerName ).collect (Collectors .toSet ());
337+
338+ List <ServerName > regionServers = queueStorage .listAllReplicators ();
339+ if (regionServers == null || regionServers .isEmpty ()) {
340+ return sb .toString ();
341+ }
342+ for (ServerName regionServer : regionServers ) {
343+ List <ReplicationQueueId > queueIds = queueStorage .listAllQueueIds (regionServer );
300344
301- // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
302- // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
303- // zkw.getZNodePaths().rsZNode)
304- // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
305- //
306- // Loops each peer on each RS and dumps the queues
307- // List<ServerName> regionservers = queueStorage.getListOfReplicators();
308- // if (regionservers == null || regionservers.isEmpty()) {
309- // return sb.toString();
310- // }
311- // for (ServerName regionserver : regionservers) {
312- // List<String> queueIds = queueStorage.getAllQueues(regionserver);
313- // if (!liveRegionServers.contains(regionserver)) {
314- // deadRegionServers.add(regionserver.getServerName());
315- // }
316- // for (String queueId : queueIds) {
317- // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
318- // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
319- // Collections.sort(wals);
320- // if (!peerIds.contains(queueInfo.getPeerId())) {
321- // deletedQueues.add(regionserver + "/" + queueId);
322- // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
323- // } else {
324- // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
325- // }
326- // }
327- // }
345+ if (!liveRegionServers .contains (regionServer )) {
346+ deadRegionServers .add (regionServer .getServerName ());
347+ }
348+ for (ReplicationQueueId queueId : queueIds ) {
349+ List <String > wals = null ;
350+ if (queueId .isRecovered ()) {
351+ wals = AbstractFSWALProvider
352+ .getArchivedWALFiles (connection .getConfiguration (), queueId .getSourceServerName ().get (),
353+ URLEncoder .encode (queueId .getSourceServerName ().get ().toString (),
354+ StandardCharsets .UTF_8 .name ()))
355+ .stream ().map (Path ::toString ).collect (Collectors .toList ());
356+ } else {
357+ wals =
358+ AbstractFSWALProvider
359+ .getArchivedWALFiles (connection .getConfiguration (), queueId .getServerName (),
360+ URLEncoder .encode (queueId .getServerName ().toString (),
361+ StandardCharsets .UTF_8 .name ()))
362+ .stream ().map (Path ::toString ).collect (Collectors .toList ());
363+ }
364+ Collections .sort (wals );
365+ if (!peerIds .contains (queueId .getPeerId ())) {
366+ deletedQueues .add (regionServer + "/" + queueId );
367+ sb .append (formatQueue (regionServer , queueStorage , wals , queueId , true , hdfs ));
368+ } else {
369+ sb .append (formatQueue (regionServer , queueStorage , wals , queueId , false , hdfs ));
370+ }
371+ }
372+ }
328373 return sb .toString ();
329374 }
330375
331- private String formatQueue (ServerName regionserver , ReplicationQueueStorage queueStorage ,
332- ReplicationQueueInfo queueInfo , String queueId , List <String > wals , boolean isDeleted ,
333- boolean hdfs ) throws Exception {
376+ private String formatQueue (ServerName regionServer , ReplicationQueueStorage queueStorage ,
377+ List <String > wals , ReplicationQueueId queueId , boolean isDeleted , boolean hdfs )
378+ throws Exception {
334379 StringBuilder sb = new StringBuilder ();
335380
336- List <ServerName > deadServers ;
337-
338- sb .append ("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n " );
381+ sb .append ("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n " );
339382 sb .append (" Queue znode: " + queueId + "\n " );
340- sb .append (" PeerID: " + queueInfo .getPeerId () + "\n " );
341- sb .append (" Recovered: " + queueInfo .isQueueRecovered () + "\n " );
342- deadServers = queueInfo .getDeadRegionServers ();
343- if (deadServers .isEmpty ()) {
344- sb .append (" No dead RegionServers found in this queue." + "\n " );
383+ sb .append (" PeerID: " + queueId .getPeerId () + "\n " );
384+ sb .append (" Recovered: " + queueId .isRecovered () + "\n " );
385+ if (queueId .getSourceServerName ().isPresent ()) {
386+ sb .append (" Dead RegionServers: " + queueId .getSourceServerName ().get () + "\n " );
345387 } else {
346- sb .append (" Dead RegionServers: " + deadServers + "\n " );
388+ sb .append (" No dead RegionServers found in this queue." + "\n " );
347389 }
348390 sb .append (" Was deleted: " + isDeleted + "\n " );
349391 sb .append (" Number of WALs in replication queue: " + wals .size () + "\n " );
350- peersQueueSize .addAndGet (queueInfo .getPeerId (), wals .size ());
392+ peersQueueSize .addAndGet (queueId .getPeerId (), wals .size ());
393+
394+ Set <Map .Entry <String , ReplicationGroupOffset >> offsets =
395+ queueStorage .getOffsets (queueId ).entrySet ();
351396
352397 for (String wal : wals ) {
353- // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
354- // sb.append(" Replication position for " + wal + ": "
355- // + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
398+ long position = -1 ;
399+ String walGroup = null ;
400+ for (Map .Entry <String , ReplicationGroupOffset > entry : offsets ) {
401+ walGroup = entry .getKey ();
402+ ReplicationGroupOffset offset = entry .getValue ();
403+ if (offset .getWal ().equals (wal )) {
404+ position = offset .getOffset ();
405+ }
406+ }
407+ sb .append (" Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal )
408+ + ": " + (position > 0 ? position : "0" + " (not started or nothing to replicate)" ) + "\n " );
356409 }
357410
358411 if (hdfs ) {
359412 FileSystem fs = FileSystem .get (getConf ());
360413 sb .append (" Total size of WALs on HDFS for this queue: "
361- + StringUtils .humanSize (getTotalWALSize (fs , wals , regionserver )) + "\n " );
414+ + StringUtils .humanSize (getTotalWALSize (fs , wals , regionServer )) + "\n " );
362415 }
363416 return sb .toString ();
364417 }
365418
366419 /**
367420 * return total size in bytes from a list of WALs
368421 */
369- private long getTotalWALSize (FileSystem fs , List <String > wals , ServerName server )
370- throws IOException {
422+ private long getTotalWALSize (FileSystem fs , List <String > wals , ServerName server ) {
371423 long size = 0 ;
372424 FileStatus fileStatus ;
373425
0 commit comments