Skip to content

Commit ef6a113

Browse files
authored
HBASE-24781 Clean up peer metrics when disabling peer (#4997)
Co-authored-by: Yuta Imazu <[email protected]> Signed-off-by: Duo Zhang <[email protected]
1 parent d1fede7 commit ef6a113

File tree

5 files changed

+50
-5
lines changed

5 files changed

+50
-5
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,14 @@ public void incrLogReadInBytes(long readInBytes) {
272272

273273
/** Removes all metrics about this Source. */
274274
public void clear() {
275+
terminate();
276+
singleSourceSource.clear();
277+
}
278+
279+
public void terminate() {
275280
int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
276281
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
277282
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
278-
singleSourceSource.clear();
279283
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
280284
lastShippedTimeStamps.clear();
281285
lastHFileRefsQueueSize = 0;

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -706,10 +706,13 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
706706
}
707707
}
708708
}
709-
if (clearMetrics) {
710-
// Can be null in test context.
711-
if (this.metrics != null) {
709+
710+
// Can be null in test context.
711+
if (this.metrics != null) {
712+
if (clearMetrics) {
712713
this.metrics.clear();
714+
} else {
715+
this.metrics.terminate();
713716
}
714717
}
715718
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,8 @@ public void refreshSources(String peerId) throws IOException {
467467
ReplicationSourceInterface toRemove = this.sources.remove(peerId);
468468
if (toRemove != null) {
469469
LOG.info("Terminate replication source for " + toRemove.getPeerId());
470-
toRemove.terminate(terminateMessage, null, true);
470+
// Do not clear metrics
471+
toRemove.terminate(terminateMessage, null, false);
471472
}
472473
src = createSource(peerId, peer);
473474
this.sources.put(peerId, src);

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ public void terminate(String reason, Exception e) {
9494
public void terminate(String reason, Exception e, boolean clearMetrics) {
9595
if (clearMetrics) {
9696
this.metrics.clear();
97+
} else {
98+
this.metrics.terminate();
9799
}
98100
}
99101

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,41 @@ public void testRemovePeerMetricsCleanup() throws Exception {
572572
}
573573
}
574574

575+
@Test
576+
public void testDisablePeerMetricsCleanup() throws Exception {
577+
final String peerId = "DummyPeer";
578+
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
579+
.setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
580+
try {
581+
MetricsReplicationSourceSource globalSource = getGlobalSource();
582+
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
583+
final long sizeOfLatestPath = getSizeOfLatestPath();
584+
addPeerAndWait(peerId, peerConfig, true);
585+
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
586+
ReplicationSourceInterface source = manager.getSource(peerId);
587+
// Sanity check
588+
assertNotNull(source);
589+
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
590+
// Enqueue log and check if metrics updated
591+
source.enqueueLog(new Path("abc"));
592+
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
593+
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
594+
globalSource.getSizeOfLogQueue());
595+
596+
// Refreshing the peer should decrement the global and single source metrics
597+
manager.refreshSources(peerId);
598+
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
599+
600+
source = manager.getSource(peerId);
601+
assertNotNull(source);
602+
assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
603+
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
604+
globalSource.getSizeOfLogQueue());
605+
} finally {
606+
removePeerAndWait(peerId);
607+
}
608+
}
609+
575610
private ReplicationSourceInterface mockReplicationSource(String peerId) {
576611
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
577612
when(source.getPeerId()).thenReturn(peerId);

0 commit comments

Comments
 (0)