1919
2020import java .io .FileNotFoundException ;
2121import java .io .IOException ;
22+ import java .net .URLEncoder ;
23+ import java .nio .charset .StandardCharsets ;
2224import java .util .ArrayList ;
2325import java .util .Arrays ;
26+ import java .util .Collections ;
27+ import java .util .Comparator ;
2428import java .util .LinkedList ;
2529import java .util .List ;
2630import java .util .Map ;
3135import org .apache .hadoop .conf .Configured ;
3236import org .apache .hadoop .fs .FileStatus ;
3337import org .apache .hadoop .fs .FileSystem ;
34- import org .apache .hadoop .hbase . Abortable ;
38+ import org .apache .hadoop .fs . Path ;
3539import org .apache .hadoop .hbase .HBaseConfiguration ;
3640import org .apache .hadoop .hbase .ServerName ;
3741import org .apache .hadoop .hbase .client .Admin ;
4044import org .apache .hadoop .hbase .client .replication .TableCFs ;
4145import org .apache .hadoop .hbase .io .WALLink ;
4246import org .apache .hadoop .hbase .procedure2 .util .StringUtils ;
47+ import org .apache .hadoop .hbase .replication .ReplicationException ;
48+ import org .apache .hadoop .hbase .replication .ReplicationGroupOffset ;
49+ import org .apache .hadoop .hbase .replication .ReplicationOffsetUtil ;
4350import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
4451import org .apache .hadoop .hbase .replication .ReplicationPeerDescription ;
45- import org .apache .hadoop .hbase .replication .ReplicationQueueInfo ;
52+ import org .apache .hadoop .hbase .replication .ReplicationQueueData ;
53+ import org .apache .hadoop .hbase .replication .ReplicationQueueId ;
4654import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
47- import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
48- import org .apache .hadoop .hbase .zookeeper .ZKDump ;
49- import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
55+ import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
56+ import org .apache .hadoop .hbase .wal .AbstractFSWALProvider ;
5057import org .apache .hadoop .util .Tool ;
5158import org .apache .hadoop .util .ToolRunner ;
5259import org .apache .yetus .audience .InterfaceAudience ;
5360import org .slf4j .Logger ;
5461import org .slf4j .LoggerFactory ;
5562
63+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableMap ;
5664import org .apache .hbase .thirdparty .com .google .common .util .concurrent .AtomicLongMap ;
5765
5866/**
59- * TODO: reimplement this tool
6067 * <p/>
6168 * Provides information about the existing states of replication, replication peers and queues.
6269 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
6370 * Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS
64- * usage by the replication queues (note: can be overestimated).
 * usage by the replication queues (note: can be overestimated). In the new version, the
 * DumpReplicationQueues tool was reimplemented to obtain its information from the
 * replication table instead of ZooKeeper.
6574 */
6675@ InterfaceAudience .Private
6776public class DumpReplicationQueues extends Configured implements Tool {
@@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) {
185194 System .err .println ("General Options:" );
186195 System .err .println (" -h|--h|--help Show this help and exit." );
187196 System .err .println (" --distributed Poll each RS and print its own replication queue. "
188- + "Default only polls ZooKeeper " );
197+ + "Default only polls replication table. " );
189198 System .err .println (" --hdfs Use HDFS to calculate usage of WALs by replication."
190199 + " It could be overestimated if replicating to multiple peers."
191200 + " --distributed flag is also needed." );
@@ -201,13 +210,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
201210 Connection connection = ConnectionFactory .createConnection (conf );
202211 Admin admin = connection .getAdmin ();
203212
204- ZKWatcher zkw =
205- new ZKWatcher (conf , "DumpReplicationQueues" + EnvironmentEdgeManager .currentTime (),
206- new WarnOnlyAbortable (), true );
207-
208213 try {
209- // Our zk watcher
210- LOG .info ("Our Quorum: " + zkw .getQuorum ());
211214 List <TableCFs > replicatedTableCFs = admin .listReplicatedTableCFs ();
212215 if (replicatedTableCFs .isEmpty ()) {
213216 LOG .info ("No tables with a configured replication peer were found." );
@@ -229,21 +232,72 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
229232 LOG .info ("Found [--distributed], will poll each RegionServer." );
230233 Set <String > peerIds =
231234 peers .stream ().map ((peer ) -> peer .getPeerId ()).collect (Collectors .toSet ());
232- System .out .println (dumpQueues (zkw , peerIds , opts .isHdfs ()));
235+ System .out .println (dumpQueues (connection , peerIds , opts .isHdfs (), conf ));
233236 System .out .println (dumpReplicationSummary ());
234237 } else {
235- // use ZK instead
236- System .out .print ("Dumping replication znodes via ZooKeeper: " );
237- System .out .println (ZKDump . getReplicationZnodesDump ( zkw ));
238+ // use replication table instead
239+ System .out .println ("Dumping replication info via replication table. " );
240+ System .out .println (dumpReplicationViaTable ( connection , conf ));
238241 }
239242 return (0 );
240243 } catch (IOException e ) {
241244 return (-1 );
242245 } finally {
243- zkw .close ();
246+ connection .close ();
244247 }
245248 }
246249
250+ public String dumpReplicationViaTable (Connection connection , Configuration conf )
251+ throws ReplicationException , IOException {
252+ StringBuilder sb = new StringBuilder ();
253+ ReplicationQueueStorage queueStorage =
254+ ReplicationStorageFactory .getReplicationQueueStorage (connection , conf );
255+
256+ // The dump info format is as follows:
257+ // peers:
258+ // peers/1: zk1:2181:/hbase
259+ // peers/1/peer-state: ENABLED
260+ // rs:
261+ // rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123
262+ // rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321
263+ // hfile-refs:
264+ // hfile-refs/1/hfile1,hfile2
265+ // hfile-refs/2/hfile3,hfile4
266+ String peersKey = "peers" ;
267+ sb .append (peersKey ).append (": " ).append ("\n " );
268+ List <ReplicationPeerDescription > repPeerDescs = connection .getAdmin ().listReplicationPeers ();
269+ for (ReplicationPeerDescription repPeerDesc : repPeerDescs ) {
270+ sb .append (peersKey ).append ("/" ).append (repPeerDesc .getPeerId ()).append (": " )
271+ .append (repPeerDesc .getPeerConfig ().getClusterKey ()).append ("\n " );
272+ sb .append (peersKey ).append ("/" ).append (repPeerDesc .getPeerId ()).append ("/peer-state: " )
273+ .append (repPeerDesc .isEnabled () ? "ENABLED" : "DISABLED" ).append ("\n " );
274+ }
275+
276+ List <ReplicationQueueData > repQueueDataList = queueStorage .listAllQueues ();
277+ String rsKey = "rs" ;
278+ sb .append (rsKey ).append (": " ).append ("\n " );
279+ for (ReplicationQueueData repQueueData : repQueueDataList ) {
280+ String peerId = repQueueData .getId ().getPeerId ();
281+ for (ImmutableMap .Entry <String , ReplicationGroupOffset > entry : repQueueData .getOffsets ()
282+ .entrySet ()) {
283+ sb .append (rsKey ).append ("/" ).append (entry .getKey ()).append ("/" ).append (peerId ).append ("/" )
284+ .append (entry .getValue ().getWal ()).append (": " ).append (entry .getValue ().getOffset ())
285+ .append ("\n " );
286+ }
287+ }
288+
289+ List <String > peerIds = queueStorage .getAllPeersFromHFileRefsQueue ();
290+ String hfileKey = "hfile-refs" ;
291+ sb .append (hfileKey ).append (": " ).append ("\n " );
292+ for (String peerId : peerIds ) {
293+ List <String > hfiles = queueStorage .getReplicableHFiles (peerId );
294+ sb .append (hfileKey ).append ("/" ).append (peerId ).append ("/" ).append (String .join ("," , hfiles ))
295+ .append ("\n " );
296+ }
297+
298+ return sb .toString ();
299+ }
300+
247301 public String dumpReplicationSummary () {
248302 StringBuilder sb = new StringBuilder ();
249303 if (!deletedQueues .isEmpty ()) {
@@ -294,80 +348,111 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
294348 return sb .toString ();
295349 }
296350
297- public String dumpQueues (ZKWatcher zkw , Set <String > peerIds , boolean hdfs ) throws Exception {
298- ReplicationQueueStorage queueStorage ;
351+ public String dumpQueues (Connection connection , Set <String > peerIds , boolean hdfs ,
352+ Configuration conf ) throws Exception {
299353 StringBuilder sb = new StringBuilder ();
354+ ReplicationQueueStorage queueStorage =
355+ ReplicationStorageFactory .getReplicationQueueStorage (connection , conf );
356+
357+ Set <ServerName > liveRegionServers =
358+ connection .getAdmin ().getClusterMetrics ().getLiveServerMetrics ().keySet ();
300359
301- // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
302- // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
303- // zkw.getZNodePaths().rsZNode)
304- // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
305- //
306- // Loops each peer on each RS and dumps the queues
307- // List<ServerName> regionservers = queueStorage.getListOfReplicators();
308- // if (regionservers == null || regionservers.isEmpty()) {
309- // return sb.toString();
310- // }
311- // for (ServerName regionserver : regionservers) {
312- // List<String> queueIds = queueStorage.getAllQueues(regionserver);
313- // if (!liveRegionServers.contains(regionserver)) {
314- // deadRegionServers.add(regionserver.getServerName());
315- // }
316- // for (String queueId : queueIds) {
317- // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
318- // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
319- // Collections.sort(wals);
320- // if (!peerIds.contains(queueInfo.getPeerId())) {
321- // deletedQueues.add(regionserver + "/" + queueId);
322- // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
323- // } else {
324- // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
325- // }
326- // }
327- // }
360+ List <ServerName > regionServers = queueStorage .listAllReplicators ();
361+ if (regionServers == null || regionServers .isEmpty ()) {
362+ return sb .toString ();
363+ }
364+ for (ServerName regionServer : regionServers ) {
365+ List <ReplicationQueueId > queueIds = queueStorage .listAllQueueIds (regionServer );
366+
367+ if (!liveRegionServers .contains (regionServer )) {
368+ deadRegionServers .add (regionServer .getServerName ());
369+ }
370+ for (ReplicationQueueId queueId : queueIds ) {
371+ List <String > tmpWals = new ArrayList <>();
372+ // wals
373+ AbstractFSWALProvider
374+ .getWALFiles (connection .getConfiguration (), queueId .getServerWALsBelongTo ()).stream ()
375+ .map (Path ::toString ).forEach (tmpWals ::add );
376+
377+ // old wals
378+ AbstractFSWALProvider .getArchivedWALFiles (connection .getConfiguration (),
379+ queueId .getServerWALsBelongTo (), URLEncoder
380+ .encode (queueId .getServerWALsBelongTo ().toString (), StandardCharsets .UTF_8 .name ()))
381+ .stream ().map (Path ::toString ).forEach (tmpWals ::add );
382+
383+ Map <String , ReplicationGroupOffset > offsets = queueStorage .getOffsets (queueId );
384+ // filter out the wal files that should replicate
385+ List <String > wals = new ArrayList <>();
386+ for (Map .Entry <String , ReplicationGroupOffset > entry : offsets .entrySet ()) {
387+ ReplicationGroupOffset offset = entry .getValue ();
388+ for (String wal : tmpWals ) {
389+ if (ReplicationOffsetUtil .shouldReplicate (offset , wal )) {
390+ wals .add (wal );
391+ }
392+ }
393+ }
394+ Collections .sort (wals , Comparator .comparingLong (AbstractFSWALProvider ::getTimestamp ));
395+ if (!peerIds .contains (queueId .getPeerId ())) {
396+ deletedQueues .add (regionServer + "/" + queueId );
397+ sb .append (formatQueue (regionServer , offsets , wals , queueId , true , hdfs ));
398+ } else {
399+ sb .append (formatQueue (regionServer , offsets , wals , queueId , false , hdfs ));
400+ }
401+ }
402+ }
328403 return sb .toString ();
329404 }
330405
331- private String formatQueue (ServerName regionserver , ReplicationQueueStorage queueStorage ,
332- ReplicationQueueInfo queueInfo , String queueId , List <String > wals , boolean isDeleted ,
333- boolean hdfs ) throws Exception {
406+ private String formatQueue (ServerName regionServer , Map < String , ReplicationGroupOffset > offsets ,
407+ List <String > wals , ReplicationQueueId queueId , boolean isDeleted , boolean hdfs )
408+ throws Exception {
334409 StringBuilder sb = new StringBuilder ();
335410
336- List <ServerName > deadServers ;
337-
338- sb .append ("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n " );
339- sb .append (" Queue znode: " + queueId + "\n " );
340- sb .append (" PeerID: " + queueInfo .getPeerId () + "\n " );
341- sb .append (" Recovered: " + queueInfo .isQueueRecovered () + "\n " );
342- deadServers = queueInfo .getDeadRegionServers ();
343- if (deadServers .isEmpty ()) {
344- sb .append (" No dead RegionServers found in this queue." + "\n " );
411+ sb .append ("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n " );
412+ sb .append (" Queue id: " + queueId + "\n " );
413+ sb .append (" PeerID: " + queueId .getPeerId () + "\n " );
414+ sb .append (" Recovered: " + queueId .isRecovered () + "\n " );
415+ // In new version, we only record the first dead RegionServer in queueId.
416+ if (queueId .getSourceServerName ().isPresent ()) {
417+ sb .append (" Dead RegionServer: " + queueId .getSourceServerName ().get () + "\n " );
345418 } else {
346- sb .append (" Dead RegionServers: " + deadServers + "\n " );
419+ sb .append (" No dead RegionServer found in this queue." + "\n " );
347420 }
348421 sb .append (" Was deleted: " + isDeleted + "\n " );
349422 sb .append (" Number of WALs in replication queue: " + wals .size () + "\n " );
350- peersQueueSize .addAndGet (queueInfo .getPeerId (), wals .size ());
351-
352- for (String wal : wals ) {
353- // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
354- // sb.append(" Replication position for " + wal + ": "
355- // + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
423+ peersQueueSize .addAndGet (queueId .getPeerId (), wals .size ());
424+
425+ for (Map .Entry <String , ReplicationGroupOffset > entry : offsets .entrySet ()) {
426+ String walGroup = entry .getKey ();
427+ ReplicationGroupOffset offset = entry .getValue ();
428+ for (String wal : wals ) {
429+ long position = 0 ;
430+ if (offset .getWal ().equals (wal )) {
431+ position = offset .getOffset ();
432+ }
433+ sb .append (
434+ " Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal ) + ": " );
435+ if (position == 0 ) {
436+ sb .append ("0 (not started or nothing to replicate)" );
437+ } else if (position > 0 ) {
438+ sb .append (position );
439+ }
440+ sb .append ("\n " );
441+ }
356442 }
357443
358444 if (hdfs ) {
359445 FileSystem fs = FileSystem .get (getConf ());
360446 sb .append (" Total size of WALs on HDFS for this queue: "
361- + StringUtils .humanSize (getTotalWALSize (fs , wals , regionserver )) + "\n " );
447+ + StringUtils .humanSize (getTotalWALSize (fs , wals , regionServer )) + "\n " );
362448 }
363449 return sb .toString ();
364450 }
365451
366452 /**
367453 * return total size in bytes from a list of WALs
368454 */
369- private long getTotalWALSize (FileSystem fs , List <String > wals , ServerName server )
370- throws IOException {
455+ private long getTotalWALSize (FileSystem fs , List <String > wals , ServerName server ) {
371456 long size = 0 ;
372457 FileStatus fileStatus ;
373458
@@ -389,19 +474,4 @@ private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server
389474 totalSizeOfWALs += size ;
390475 return size ;
391476 }
392-
393- private static class WarnOnlyAbortable implements Abortable {
394- @ Override
395- public void abort (String why , Throwable e ) {
396- LOG .warn ("DumpReplicationQueue received abort, ignoring. Reason: " + why );
397- if (LOG .isDebugEnabled ()) {
398- LOG .debug (e .toString (), e );
399- }
400- }
401-
402- @ Override
403- public boolean isAborted () {
404- return false ;
405- }
406- }
407477}
0 commit comments