1919
2020import java .io .FileNotFoundException ;
2121import java .io .IOException ;
22+ import java .net .URLEncoder ;
23+ import java .nio .charset .StandardCharsets ;
2224import java .util .ArrayList ;
2325import java .util .Arrays ;
2426import java .util .Collections ;
4446import org .apache .hadoop .hbase .procedure2 .util .StringUtils ;
4547import org .apache .hadoop .hbase .replication .ReplicationException ;
4648import org .apache .hadoop .hbase .replication .ReplicationGroupOffset ;
49+ import org .apache .hadoop .hbase .replication .ReplicationOffsetUtil ;
4750import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
4851import org .apache .hadoop .hbase .replication .ReplicationPeerDescription ;
4952import org .apache .hadoop .hbase .replication .ReplicationQueueData ;
5053import org .apache .hadoop .hbase .replication .ReplicationQueueId ;
5154import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
5255import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
53- import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
5456import org .apache .hadoop .hbase .wal .AbstractFSWALProvider ;
55- import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
56- import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
5757import org .apache .hadoop .util .Tool ;
5858import org .apache .hadoop .util .ToolRunner ;
5959import org .apache .yetus .audience .InterfaceAudience ;
@@ -210,13 +210,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
210210 Connection connection = ConnectionFactory .createConnection (conf );
211211 Admin admin = connection .getAdmin ();
212212
213- ZKWatcher zkw =
214- new ZKWatcher (conf , "DumpReplicationQueues" + EnvironmentEdgeManager .currentTime (),
215- new WarnOnlyAbortable (), true );
216-
217213 try {
218- // Our zk watcher
219- LOG .info ("Our Quorum: " + zkw .getQuorum ());
220214 List <TableCFs > replicatedTableCFs = admin .listReplicatedTableCFs ();
221215 if (replicatedTableCFs .isEmpty ()) {
222216 LOG .info ("No tables with a configured replication peer were found." );
@@ -238,7 +232,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
238232 LOG .info ("Found [--distributed], will poll each RegionServer." );
239233 Set <String > peerIds =
240234 peers .stream ().map ((peer ) -> peer .getPeerId ()).collect (Collectors .toSet ());
241- System .out .println (dumpQueues (zkw , connection , peerIds , opts .isHdfs ()));
235+ System .out .println (dumpQueues (connection , peerIds , opts .isHdfs ()));
242236 System .out .println (dumpReplicationSummary ());
243237 } else {
244238 // use replication table instead
@@ -253,7 +247,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
253247 }
254248 }
255249
256- public String dumpReplicationViaTable (Connection connection ) throws ReplicationException {
250+ private String dumpReplicationViaTable (Connection connection ) throws ReplicationException {
257251 StringBuilder sb = new StringBuilder ();
258252 ReplicationQueueStorage queueStorage =
259253 ReplicationStorageFactory .getReplicationQueueStorage (connection , getConf ());
@@ -327,13 +321,14 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
327321 return sb .toString ();
328322 }
329323
330- public String dumpQueues (ZKWatcher zkw , Connection connection , Set <String > peerIds , boolean hdfs )
324+ public String dumpQueues (Connection connection , Set <String > peerIds , boolean hdfs )
331325 throws Exception {
332326 StringBuilder sb = new StringBuilder ();
333327 ReplicationQueueStorage queueStorage =
334328 ReplicationStorageFactory .getReplicationQueueStorage (connection , getConf ());
335- Set <ServerName > liveRegionServers = ZKUtil .listChildrenNoWatch (zkw , zkw .getZNodePaths ().rsZNode )
336- .stream ().map (ServerName ::parseServerName ).collect (Collectors .toSet ());
329+
330+ Set <ServerName > liveRegionServers =
331+ connection .getAdmin ().getClusterMetrics ().getLiveServerMetrics ().keySet ();
337332
338333 List <ServerName > regionServers = queueStorage .listAllReplicators ();
339334 if (regionServers == null || regionServers .isEmpty ()) {
@@ -346,37 +341,51 @@ public String dumpQueues(ZKWatcher zkw, Connection connection, Set<String> peerI
346341 deadRegionServers .add (regionServer .getServerName ());
347342 }
348343 for (ReplicationQueueId queueId : queueIds ) {
349- List <String > wals = null ;
350- if (queueId .isRecovered ()) {
351- wals = AbstractFSWALProvider
352- .getArchivedWALFiles (connection .getConfiguration (), queueId .getSourceServerName ().get (),
353- queueId .getSourceServerName ().get ().toString ())
354- .stream ().map (Path ::toString ).collect (Collectors .toList ());
355- } else {
356- wals = AbstractFSWALProvider
357- .getArchivedWALFiles (connection .getConfiguration (), queueId .getServerName (),
358- queueId .getServerName ().toString ())
359- .stream ().map (Path ::toString ).collect (Collectors .toList ());
344+ // wals
345+ List <String > tmpWals = AbstractFSWALProvider
346+ .getWALFiles (connection .getConfiguration (),
347+ URLEncoder .encode (queueId .getServerWALsBelongTo ().toString (),
348+ StandardCharsets .UTF_8 .name ()))
349+ .stream ().map (Path ::toString ).collect (Collectors .toList ());
350+
351+ // old wals
352+ tmpWals .addAll (AbstractFSWALProvider
353+ .getArchivedWALFiles (connection .getConfiguration (), queueId .getServerWALsBelongTo (),
354+ URLEncoder .encode (queueId .getServerWALsBelongTo ().toString (),
355+ StandardCharsets .UTF_8 .name ()))
356+ .stream ().map (Path ::toString ).collect (Collectors .toList ()));
357+
358+ Map <String , ReplicationGroupOffset > offsets = queueStorage .getOffsets (queueId );
359+ // filter out the wal files that should replicate
360+ List <String > wals = new ArrayList <>();
361+ for (Map .Entry <String , ReplicationGroupOffset > entry : offsets .entrySet ()) {
362+ ReplicationGroupOffset offset = entry .getValue ();
363+ for (String wal : tmpWals ) {
364+ if (ReplicationOffsetUtil .shouldReplicate (offset , wal )) {
365+ wals .add (wal );
366+ }
367+ }
360368 }
369+
361370 Collections .sort (wals );
362371 if (!peerIds .contains (queueId .getPeerId ())) {
363372 deletedQueues .add (regionServer + "/" + queueId );
364- sb .append (formatQueue (regionServer , queueStorage , wals , queueId , true , hdfs ));
373+ sb .append (formatQueue (regionServer , offsets , wals , queueId , true , hdfs ));
365374 } else {
366- sb .append (formatQueue (regionServer , queueStorage , wals , queueId , false , hdfs ));
375+ sb .append (formatQueue (regionServer , offsets , wals , queueId , false , hdfs ));
367376 }
368377 }
369378 }
370379 return sb .toString ();
371380 }
372381
373- private String formatQueue (ServerName regionServer , ReplicationQueueStorage queueStorage ,
382+ private String formatQueue (ServerName regionServer , Map < String , ReplicationGroupOffset > offsets ,
374383 List <String > wals , ReplicationQueueId queueId , boolean isDeleted , boolean hdfs )
375384 throws Exception {
376385 StringBuilder sb = new StringBuilder ();
377386
378387 sb .append ("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n " );
379- sb .append (" Queue znode : " + queueId + "\n " );
388+ sb .append (" Queue id : " + queueId + "\n " );
380389 sb .append (" PeerID: " + queueId .getPeerId () + "\n " );
381390 sb .append (" Recovered: " + queueId .isRecovered () + "\n " );
382391 // In new version, we only record the first dead RegionServer in queueId.
@@ -389,19 +398,14 @@ private String formatQueue(ServerName regionServer, ReplicationQueueStorage queu
389398 sb .append (" Number of WALs in replication queue: " + wals .size () + "\n " );
390399 peersQueueSize .addAndGet (queueId .getPeerId (), wals .size ());
391400
392- Set <Map .Entry <String , ReplicationGroupOffset >> offsets =
393- queueStorage .getOffsets (queueId ).entrySet ();
394-
395- for (Map .Entry <String , ReplicationGroupOffset > entry : offsets ) {
396- String walGroup = null ;
397- walGroup = entry .getKey ();
401+ for (Map .Entry <String , ReplicationGroupOffset > entry : offsets .entrySet ()) {
402+ String walGroup = entry .getKey ();
403+ ReplicationGroupOffset offset = entry .getValue ();
398404 for (String wal : wals ) {
399- ReplicationGroupOffset offset = entry .getValue ();
400405 if (offset .getWal ().equals (wal )) {
401406 long position = offset .getOffset ();
402407 sb .append (
403408 " Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal ) + ": " );
404-
405409 // Position is -1, which means that the file has already been fully replicated,
406410 // the logic here is different from the previous version.
407411 if (position == -1 ) {
0 commit comments