From 978ba12a06efd82719c804ec587ceaf606e01489 Mon Sep 17 00:00:00 2001 From: binlijin Date: Wed, 30 Oct 2019 17:28:34 +0800 Subject: [PATCH 1/2] HBASE-23231 ReplicationSource do not update metrics after refresh --- .../hbase/replication/regionserver/MetricsSource.java | 8 +++++++- .../hbase/replication/regionserver/ReplicationSource.java | 3 +++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 92ab070d6dc5..23d4c078f71d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -47,7 +47,7 @@ public class MetricsSource implements BaseSource { private String id; private long timeStampNextToReplicate; - private final MetricsReplicationSourceSource singleSourceSource; + private MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; private Map singleSourceSourceByTable; @@ -439,4 +439,10 @@ public String getMetricsName() { public Map getSingleSourceSourceByTable() { return singleSourceSourceByTable; } + + public void initMetric() { + singleSourceSource = + CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getSource(id); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 948c24dc4f86..791fb4ffbada 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -531,6 +531,9 @@ private void initialize() { @Override public void startup() { + if (metrics != null) { + metrics.initMetric(); + } // mark we are running now this.sourceRunning = true; initThread = new Thread(this::initialize); From 8b810a73fb8d80b7cf9083bbe21a3c47eb3df30e Mon Sep 17 00:00:00 2001 From: binlijin Date: Wed, 30 Oct 2019 19:44:32 +0800 Subject: [PATCH 2/2] Do not remove metrics when refresh --- .../regionserver/MetricsSource.java | 8 +------- .../regionserver/ReplicationSource.java | 18 ++++++++++++------ .../ReplicationSourceInterface.java | 8 ++++++++ .../regionserver/ReplicationSourceManager.java | 3 ++- .../replication/ReplicationSourceDummy.java | 9 ++++++++- 5 files changed, 31 insertions(+), 15 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 23d4c078f71d..92ab070d6dc5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -47,7 +47,7 @@ public class MetricsSource implements BaseSource { private String id; private long timeStampNextToReplicate; - private MetricsReplicationSourceSource singleSourceSource; + private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; private Map singleSourceSourceByTable; @@ -439,10 +439,4 @@ public String getMetricsName() { public Map getSingleSourceSourceByTable() { return singleSourceSourceByTable; } - - public void initMetric() { - singleSourceSource = - CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) - .getSource(id); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 791fb4ffbada..78edffaf471a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -531,9 +531,6 @@ private void initialize() { @Override public void startup() { - if (metrics != null) { - metrics.initMetric(); - } // mark we are running now this.sourceRunning = true; initThread = new Thread(this::initialize); @@ -552,7 +549,12 @@ public void terminate(String reason, Exception cause) { terminate(reason, cause, true); } - public void terminate(String reason, Exception cause, boolean join) { + @Override + public void terminate(String reason, Exception cause, boolean clearMetrics) { + terminate(reason, cause, clearMetrics, true); + } + + public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) { if (cause == null) { LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason); } else { @@ -598,7 +600,9 @@ public void terminate(String reason, Exception cause, boolean join) { if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } - metrics.clear(); + if (clearMetrics) { + metrics.clear(); + } if (join) { for (ReplicationSourceShipper worker : workers) { Threads.shutdown(worker, this.sleepForRetries); @@ -614,7 +618,9 @@ public void terminate(String reason, Exception cause, boolean join) { } } } - this.metrics.clear(); + if (clearMetrics) { + this.metrics.clear(); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index df7a8cc7b2ff..0bd90cf1ee89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -91,6 +91,14 @@ void addHFileRefs(TableName tableName, byte[] family, List> pai */ void terminate(String reason, Exception cause); + /** + * End the replication + * @param reason why it's terminating + * @param cause the error that's causing it + * @param clearMetrics removes all metrics about this Source + */ + void terminate(String reason, Exception cause, boolean clearMetrics); + /** * Get the current log that's replicated * @return the current log diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index de37cc862e7e..43afa798f08b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -482,7 +482,8 @@ public void refreshSources(String peerId) throws IOException { ReplicationSourceInterface toRemove = this.sources.put(peerId, src); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); - toRemove.terminate(terminateMessage); + // Do not clear metrics + toRemove.terminate(terminateMessage, null, false); } for (NavigableSet walsByGroup : walsById.get(peerId).values()) { walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 67f793d628c5..a361c4470604 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -86,7 +86,14 @@ public void terminate(String reason) { @Override public void terminate(String reason, Exception e) { - this.metrics.clear(); + terminate(reason, e, true); + } + + @Override + public void terminate(String reason, Exception e, boolean clearMetrics) { + if (clearMetrics) { + this.metrics.clear(); + } } @Override