From c1623fab0f018eb0911e33545cac21648bf613f9 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Mon, 3 Aug 2020 22:04:39 -0400 Subject: [PATCH 1/6] HBASE-24779 Report on the WAL edit buffer usage/limit for replication --- .../MetricsReplicationGlobalSourceSource.java | 13 +++++++++ .../MetricsReplicationSourceSource.java | 10 +++++++ .../MetricsReplicationSourceSourceImpl.java | 11 ++++++++ .../regionserver/MetricsSource.java | 15 +++++++++++ .../replication/regionserver/Replication.java | 13 +++++++-- .../ReplicationSourceManager.java | 16 +++++++++++ .../ReplicationSourceWALReader.java | 5 ++-- .../replication/TestReplicationEndpoint.java | 27 +++++++++++++++++++ 8 files changed, 106 insertions(+), 4 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 630fdb8d2789..8d667c431f42 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -47,6 +47,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter failedRecoveryQueue; + private final MutableGaugeLong walReaderBufferUsageBytes; public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { this.rms = rms; @@ -84,6 +85,8 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); failedRecoveryQueue = rms.getMetricsRegistry() .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); + + walReaderBufferUsageBytes = 
rms.getMetricsRegistry().getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); } @Override public void setLastShippedAge(long age) { @@ -250,4 +253,14 @@ public long getShippedOps() { public long getEditsFiltered() { return this.walEditsFilteredCounter.value(); } + + @Override + public void setWALReaderEditsBufferBytes(long usage) { + this.walReaderBufferUsageBytes.set(usage); + } + + @Override + public long getWALReaderEditsBufferBytes() { + return this.walReaderBufferUsageBytes.value(); + } } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index edb786414770..3aebf97340e6 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -49,6 +49,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs"; public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues"; public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues"; + public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage"; void setLastShippedAge(long age); void incrSizeOfLogQueue(int size); @@ -76,4 +77,13 @@ public interface MetricsReplicationSourceSource extends BaseSource { long getWALEditsRead(); long getShippedOps(); long getEditsFiltered(); + /** + * Sets the total usage of memory used by edits in memory read from WALs. + * @param usage The memory used by edits in bytes + */ + void setWALReaderEditsBufferBytes(long usage); + /** + * Returns the size, in bytes, of edits held in memory to be replicated. 
+ */ + long getWALReaderEditsBufferBytes(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 8ce29937fea9..f4fbe7c57c17 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -314,4 +314,15 @@ public String getMetricsName() { @Override public long getEditsFiltered() { return this.walEditsFilteredCounter.value(); } + + @Override + public void setWALReaderEditsBufferBytes(long usage) { + //noop. Global limit, tracked globally. Do not need per-source metrics + } + + @Override + public long getWALReaderEditsBufferBytes() { + //noop. Global limit, tracked globally. Do not need per-source metrics + return 0L; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 39fe7b429d3e..03c3ac8e2896 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -454,4 +454,19 @@ public String getMetricsName() { public Map getSingleSourceSourceByTable() { return singleSourceSourceByTable; } + + /** + * Sets the amount of memory in bytes used in this RegionServer by edits pending replication. + */ + public void setWALReaderEditsBufferUsage(long usageInBytes) { + globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes); + } + + /** + * Returns the amount of memory in bytes used in this RegionServer by edits pending replication. 
+ * @return + */ + public long getWALReaderEditsBufferUsage() { + return globalSourceSource.getWALReaderEditsBufferBytes(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 4cbce8c72731..44822228c799 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; @@ -76,6 +77,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private int statsThreadPeriod; // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; + private MetricsReplicationSourceSource globalMetricsSource; private PeerProcedureHandler peerProcedureHandler; @@ -150,6 +152,8 @@ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0)); } } + this.globalMetricsSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getGlobalSource(); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod); @@ -214,7 +218,7 @@ public void startReplicationService() throws IOException { this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf); this.scheduleThreadPool.scheduleAtFixedRate( - new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), + new 
ReplicationStatisticsTask(this.replicationSink, this.replicationManager, this.globalMetricsSource), statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); LOG.info("{} started", this.server.toString()); } @@ -244,17 +248,22 @@ private final static class ReplicationStatisticsTask implements Runnable { private final ReplicationSink replicationSink; private final ReplicationSourceManager replicationManager; + private final MetricsReplicationSourceSource globalMetricsSource; public ReplicationStatisticsTask(ReplicationSink replicationSink, - ReplicationSourceManager replicationManager) { + ReplicationSourceManager replicationManager, MetricsReplicationSourceSource globalMetricsSource) { this.replicationManager = replicationManager; this.replicationSink = replicationSink; + this.globalMetricsSource = globalMetricsSource; } @Override public void run() { printStats(this.replicationManager.getStats()); printStats(this.replicationSink.getStats()); + + // Report how much data we've read off disk which is pending replication, across all sources + globalMetricsSource.setWALReaderEditsBufferBytes(replicationManager.getTotalBufferUsed().get()); } private void printStats(String stats) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 1a012bd5db42..d177dfd0f8fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -169,6 +169,8 @@ public class ReplicationSourceManager implements ReplicationListener { // Maximum number of retries before taking bold actions when deleting remote wal files for sync // replication peer. 
private final int maxRetriesMultiplier; + // Total buffer size on this RegionServer for holding batched edits to be shipped. + private final long totalBufferLimit; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -222,6 +224,8 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); + this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); } /** @@ -1069,6 +1073,14 @@ public AtomicLong getTotalBufferUsed() { return totalBufferUsed; } + /** + * Returns the maximum size in bytes of edits held in memory which are pending replication + * across all sources inside this RegionServer. + */ + public long getTotalBufferLimit() { + return totalBufferLimit; + } + /** * Get the directory where wals are archived * @return the directory where wals are archived @@ -1106,6 +1118,10 @@ public ReplicationPeers getReplicationPeers() { */ public String getStats() { StringBuilder stats = new StringBuilder(); + // Print stats that apply across all Replication Sources + stats.append("Global stats: "); + stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=") + .append(getTotalBufferLimit()).append("B\n"); for (ReplicationSourceInterface source : this.sources.values()) { stats.append("Normal source for cluster " + source.getPeerId() + ": "); stats.append(source.getStats() + "\n"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 7e0e550106ed..18268612e44c 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -104,8 +104,7 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); - this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, - HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit(); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = @@ -276,6 +275,8 @@ public Path getCurrentPath() { private boolean checkQuota() { // try not to go over total quota if (totalBufferUsed.get() > totalBufferQuota) { + LOG.warn("Can't read more edits from WAL as buffer usage {}B exceeds limit {}B", + totalBufferUsed.get(), totalBufferQuota); Threads.sleep(sleepForRetries); return false; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 4dd264cd5b2a..d97dfd15d932 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -497,6 +497,33 @@ public boolean canReplicateToSameCluster() { } } + public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { + private long duration; + public SleepingReplicationEndpointForTest() { + super(); + } + + @Override + public void init(Context context) throws IOException { + super.init(context); + if 
(this.ctx != null) { + duration = this.ctx.getConfiguration().getLong( + "test.sleep.replication.endpoint.duration.millis", 5000L); + } + } + + @Override + public boolean replicate(ReplicateContext context) { + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + return super.replicate(context); + } + } + public static class InterClusterReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint { From 41c5ae7ae661a92f275144c04a2862f416dcc380 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Tue, 4 Aug 2020 11:50:40 -0400 Subject: [PATCH 2/6] Address busbey feedback: * Fix up metrics source interface hierarchy * Docs for test replication endpoint * Push down metrics updating to mutation of the buffer usage variable (rather than on a timer) --- .../MetricsReplicationGlobalSourceSource.java | 273 +----------------- ...ricsReplicationGlobalSourceSourceImpl.java | 266 +++++++++++++++++ .../MetricsReplicationSourceFactory.java | 2 +- .../MetricsReplicationSourceFactoryImpl.java | 4 +- .../MetricsReplicationSourceSource.java | 10 - .../MetricsReplicationSourceSourceImpl.java | 11 - .../regionserver/MetricsSource.java | 4 +- .../replication/regionserver/Replication.java | 17 +- .../regionserver/ReplicationSource.java | 4 +- .../ReplicationSourceManager.java | 9 +- .../ReplicationSourceWALReader.java | 6 +- .../replication/TestReplicationEndpoint.java | 19 +- 12 files changed, 320 insertions(+), 305 deletions(-) create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 8d667c431f42..fab33bc8b774 100644 --- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -1,266 +1,19 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.metrics2.lib.MutableFastCounter; -import org.apache.hadoop.metrics2.lib.MutableGaugeLong; -import org.apache.hadoop.metrics2.lib.MutableHistogram; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ - private static final String KEY_PREFIX = "source."; - - private final MetricsReplicationSourceImpl rms; - - private final MutableHistogram ageOfLastShippedOpHist; - private final MutableGaugeLong sizeOfLogQueueGauge; - private final MutableFastCounter logReadInEditsCounter; - private final MutableFastCounter walEditsFilteredCounter; - private final MutableFastCounter shippedBatchesCounter; - private final MutableFastCounter shippedOpsCounter; - private final MutableFastCounter shippedBytesCounter; - private final MutableFastCounter logReadInBytesCounter; - private final MutableFastCounter shippedHFilesCounter; - private final MutableGaugeLong sizeOfHFileRefsQueueGauge; - private final MutableFastCounter unknownFileLengthForClosedWAL; - private final MutableFastCounter uncleanlyClosedWAL; - private final MutableFastCounter uncleanlyClosedSkippedBytes; - private final MutableFastCounter restartWALReading; - private final MutableFastCounter repeatedFileBytes; - private final MutableFastCounter completedWAL; - private final MutableFastCounter completedRecoveryQueue; - private final MutableFastCounter failedRecoveryQueue; - private final MutableGaugeLong walReaderBufferUsageBytes; - - public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { - this.rms = rms; - - ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); - - sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); - - shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); - - 
shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); - - shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); - - logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); - - logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); - - walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); - - shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); - - sizeOfHFileRefsQueueGauge = - rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); - - unknownFileLengthForClosedWAL = rms.getMetricsRegistry() - .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); - uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); - uncleanlyClosedSkippedBytes = rms.getMetricsRegistry() - .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); - restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); - repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); - completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); - completedRecoveryQueue = rms.getMetricsRegistry() - .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); - failedRecoveryQueue = rms.getMetricsRegistry() - .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); - - walReaderBufferUsageBytes = rms.getMetricsRegistry().getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); - } - - @Override public void setLastShippedAge(long age) { - ageOfLastShippedOpHist.add(age); - } - - @Override public void incrSizeOfLogQueue(int size) { - sizeOfLogQueueGauge.incr(size); - } - - @Override public void decrSizeOfLogQueue(int size) { - sizeOfLogQueueGauge.decr(size); - } - - @Override public void incrLogReadInEdits(long size) { - logReadInEditsCounter.incr(size); - } - - @Override public void 
incrLogEditsFiltered(long size) { - walEditsFilteredCounter.incr(size); - } - - @Override public void incrBatchesShipped(int batches) { - shippedBatchesCounter.incr(batches); - } - - @Override public void incrOpsShipped(long ops) { - shippedOpsCounter.incr(ops); - } - - @Override public void incrShippedBytes(long size) { - shippedBytesCounter.incr(size); - } - - @Override public void incrLogReadInBytes(long size) { - logReadInBytesCounter.incr(size); - } - - @Override public void clear() { - } - - @Override - public long getLastShippedAge() { - return ageOfLastShippedOpHist.getMax(); - } - - @Override public void incrHFilesShipped(long hfiles) { - shippedHFilesCounter.incr(hfiles); - } - - @Override - public void incrSizeOfHFileRefsQueue(long size) { - sizeOfHFileRefsQueueGauge.incr(size); - } - - @Override - public void decrSizeOfHFileRefsQueue(long size) { - sizeOfHFileRefsQueueGauge.decr(size); - } - - @Override - public int getSizeOfLogQueue() { - return (int)sizeOfLogQueueGauge.value(); - } - - @Override - public void incrUnknownFileLengthForClosedWAL() { - unknownFileLengthForClosedWAL.incr(1L); - } - @Override - public void incrUncleanlyClosedWALs() { - uncleanlyClosedWAL.incr(1L); - } - @Override - public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { - uncleanlyClosedSkippedBytes.incr(bytes); - } - @Override - public void incrRestartedWALReading() { - restartWALReading.incr(1L); - } - @Override - public void incrRepeatedFileBytes(final long bytes) { - repeatedFileBytes.incr(bytes); - } - @Override - public void incrCompletedWAL() { - completedWAL.incr(1L); - } - @Override - public void incrCompletedRecoveryQueue() { - completedRecoveryQueue.incr(1L); - } - @Override - public void incrFailedRecoveryQueue() { - failedRecoveryQueue.incr(1L); - } - @Override - public void init() { - rms.init(); - } - - @Override - public void setGauge(String gaugeName, long value) { - rms.setGauge(KEY_PREFIX + gaugeName, value); - } - - @Override - public void 
incGauge(String gaugeName, long delta) { - rms.incGauge(KEY_PREFIX + gaugeName, delta); - } - - @Override - public void decGauge(String gaugeName, long delta) { - rms.decGauge(KEY_PREFIX + gaugeName, delta); - } - - @Override - public void removeMetric(String key) { - rms.removeMetric(KEY_PREFIX + key); - } - - @Override - public void incCounters(String counterName, long delta) { - rms.incCounters(KEY_PREFIX + counterName, delta); - } - - @Override - public void updateHistogram(String name, long value) { - rms.updateHistogram(KEY_PREFIX + name, value); - } - - @Override - public String getMetricsContext() { - return rms.getMetricsContext(); - } - - @Override - public String getMetricsDescription() { - return rms.getMetricsDescription(); - } - - @Override - public String getMetricsJmxContext() { - return rms.getMetricsJmxContext(); - } - - @Override - public String getMetricsName() { - return rms.getMetricsName(); - } - - @Override - public long getWALEditsRead() { - return this.logReadInEditsCounter.value(); - } - - @Override - public long getShippedOps() { - return this.shippedOpsCounter.value(); - } - - @Override - public long getEditsFiltered() { - return this.walEditsFilteredCounter.value(); - } - - @Override - public void setWALReaderEditsBufferBytes(long usage) { - this.walReaderBufferUsageBytes.set(usage); - } - - @Override - public long getWALReaderEditsBufferBytes() { - return this.walReaderBufferUsageBytes.value(); - } +public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource { + + public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage"; + + /** + * Sets the total usage of memory used by edits in memory read from WALs. + * @param usage The memory used by edits in bytes + */ + void setWALReaderEditsBufferBytes(long usage); + /** + * Returns the size, in bytes, of edits held in memory to be replicated. 
+ */ + long getWALReaderEditsBufferBytes(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java new file mode 100644 index 000000000000..f41a64f9b5ff --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableHistogram; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class MetricsReplicationGlobalSourceSourceImpl implements MetricsReplicationGlobalSourceSource { + private static final String KEY_PREFIX = "source."; + + private final MetricsReplicationSourceImpl rms; + + private final MutableHistogram ageOfLastShippedOpHist; + private final MutableGaugeLong sizeOfLogQueueGauge; + private final MutableFastCounter logReadInEditsCounter; + private final MutableFastCounter walEditsFilteredCounter; + private final MutableFastCounter shippedBatchesCounter; + private final MutableFastCounter shippedOpsCounter; + private final MutableFastCounter shippedBytesCounter; + private final MutableFastCounter logReadInBytesCounter; + private final MutableFastCounter shippedHFilesCounter; + private final MutableGaugeLong sizeOfHFileRefsQueueGauge; + private final MutableFastCounter unknownFileLengthForClosedWAL; + private final MutableFastCounter uncleanlyClosedWAL; + private final MutableFastCounter uncleanlyClosedSkippedBytes; + private final MutableFastCounter restartWALReading; + private final MutableFastCounter repeatedFileBytes; + private final MutableFastCounter completedWAL; + private final MutableFastCounter completedRecoveryQueue; + private final MutableFastCounter failedRecoveryQueue; + private final MutableGaugeLong walReaderBufferUsageBytes; + + public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { + this.rms = rms; + + ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); + + sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); + + shippedBatchesCounter = 
rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); + + shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); + + shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); + + logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); + + logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); + + walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); + + shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); + + sizeOfHFileRefsQueueGauge = + rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); + + unknownFileLengthForClosedWAL = rms.getMetricsRegistry() + .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); + uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); + uncleanlyClosedSkippedBytes = rms.getMetricsRegistry() + .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); + restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); + repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); + completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); + completedRecoveryQueue = rms.getMetricsRegistry() + .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); + failedRecoveryQueue = rms.getMetricsRegistry() + .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); + + walReaderBufferUsageBytes = rms.getMetricsRegistry().getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); + } + + @Override public void setLastShippedAge(long age) { + ageOfLastShippedOpHist.add(age); + } + + @Override public void incrSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.incr(size); + } + + @Override public void decrSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.decr(size); + } + + @Override public void incrLogReadInEdits(long size) { + 
logReadInEditsCounter.incr(size); + } + + @Override public void incrLogEditsFiltered(long size) { + walEditsFilteredCounter.incr(size); + } + + @Override public void incrBatchesShipped(int batches) { + shippedBatchesCounter.incr(batches); + } + + @Override public void incrOpsShipped(long ops) { + shippedOpsCounter.incr(ops); + } + + @Override public void incrShippedBytes(long size) { + shippedBytesCounter.incr(size); + } + + @Override public void incrLogReadInBytes(long size) { + logReadInBytesCounter.incr(size); + } + + @Override public void clear() { + } + + @Override + public long getLastShippedAge() { + return ageOfLastShippedOpHist.getMax(); + } + + @Override public void incrHFilesShipped(long hfiles) { + shippedHFilesCounter.incr(hfiles); + } + + @Override + public void incrSizeOfHFileRefsQueue(long size) { + sizeOfHFileRefsQueueGauge.incr(size); + } + + @Override + public void decrSizeOfHFileRefsQueue(long size) { + sizeOfHFileRefsQueueGauge.decr(size); + } + + @Override + public int getSizeOfLogQueue() { + return (int)sizeOfLogQueueGauge.value(); + } + + @Override + public void incrUnknownFileLengthForClosedWAL() { + unknownFileLengthForClosedWAL.incr(1L); + } + @Override + public void incrUncleanlyClosedWALs() { + uncleanlyClosedWAL.incr(1L); + } + @Override + public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { + uncleanlyClosedSkippedBytes.incr(bytes); + } + @Override + public void incrRestartedWALReading() { + restartWALReading.incr(1L); + } + @Override + public void incrRepeatedFileBytes(final long bytes) { + repeatedFileBytes.incr(bytes); + } + @Override + public void incrCompletedWAL() { + completedWAL.incr(1L); + } + @Override + public void incrCompletedRecoveryQueue() { + completedRecoveryQueue.incr(1L); + } + @Override + public void incrFailedRecoveryQueue() { + failedRecoveryQueue.incr(1L); + } + @Override + public void init() { + rms.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + 
rms.setGauge(KEY_PREFIX + gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + rms.incGauge(KEY_PREFIX + gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + rms.decGauge(KEY_PREFIX + gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + rms.removeMetric(KEY_PREFIX + key); + } + + @Override + public void incCounters(String counterName, long delta) { + rms.incCounters(KEY_PREFIX + counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + rms.updateHistogram(KEY_PREFIX + name, value); + } + + @Override + public String getMetricsContext() { + return rms.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return rms.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return rms.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return rms.getMetricsName(); + } + + @Override + public long getWALEditsRead() { + return this.logReadInEditsCounter.value(); + } + + @Override + public long getShippedOps() { + return this.shippedOpsCounter.value(); + } + + @Override + public long getEditsFiltered() { + return this.walEditsFilteredCounter.value(); + } + + @Override + public void setWALReaderEditsBufferBytes(long usage) { + this.walReaderBufferUsageBytes.set(usage); + } + + @Override + public long getWALReaderEditsBufferBytes() { + return this.walReaderBufferUsageBytes.value(); + } +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java index 2816f832edf9..73d2cfd62f49 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java +++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java @@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory { public MetricsReplicationSinkSource getSink(); public MetricsReplicationSourceSource getSource(String id); public MetricsReplicationTableSource getTableSource(String tableName); - public MetricsReplicationSourceSource getGlobalSource(); + public MetricsReplicationGlobalSourceSourceImpl getGlobalSource(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java index a3b346200417..061fc58296e0 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java @@ -39,7 +39,7 @@ private static enum SourceHolder { return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName); } - @Override public MetricsReplicationSourceSource getGlobalSource() { - return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source); + @Override public MetricsReplicationGlobalSourceSourceImpl getGlobalSource() { + return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source); } } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 3aebf97340e6..edb786414770 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -49,7 +49,6 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs"; public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues"; public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues"; - public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage"; void setLastShippedAge(long age); void incrSizeOfLogQueue(int size); @@ -77,13 +76,4 @@ public interface MetricsReplicationSourceSource extends BaseSource { long getWALEditsRead(); long getShippedOps(); long getEditsFiltered(); - /** - * Sets the total usage of memory used by edits in memory read from WALs. - * @param usage The memory used by edits in bytes - */ - void setWALReaderEditsBufferBytes(long usage); - /** - * Returns the size, in bytes, of edits held in memory to be replicated. - */ - long getWALReaderEditsBufferBytes(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index f4fbe7c57c17..8ce29937fea9 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -314,15 +314,4 @@ public String getMetricsName() { @Override public long getEditsFiltered() { return this.walEditsFilteredCounter.value(); } - - @Override - public void setWALReaderEditsBufferBytes(long usage) { - //noop. Global limit, tracked globally. 
Do not need per-source metrics - } - - @Override - public long getWALReaderEditsBufferBytes() { - //noop. Global limit, tracked globally. Do not need per-source metrics - return 0L; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 03c3ac8e2896..0f73576feaf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -51,7 +51,7 @@ public class MetricsSource implements BaseSource { private long timeStampNextToReplicate; private final MetricsReplicationSourceSource singleSourceSource; - private final MetricsReplicationSourceSource globalSourceSource; + private final MetricsReplicationGlobalSourceSource globalSourceSource; private Map singleSourceSourceByTable; /** @@ -75,7 +75,7 @@ public MetricsSource(String id) { * @param globalSourceSource Class to monitor global-scoped metrics */ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, - MetricsReplicationSourceSource globalSourceSource, + MetricsReplicationGlobalSourceSource globalSourceSource, Map singleSourceSourceByTable) { this.id = id; this.singleSourceSource = singleSourceSource; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 44822228c799..207cd531971c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -77,7 +77,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private int statsThreadPeriod; // ReplicationLoad to access 
replication metrics private ReplicationLoad replicationLoad; - private MetricsReplicationSourceSource globalMetricsSource; + private MetricsReplicationGlobalSourceSource globalMetricsSource; private PeerProcedureHandler peerProcedureHandler; @@ -126,10 +126,12 @@ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir throw new IOException("Could not read cluster id", ke); } SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); + this.globalMetricsSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getGlobalSource(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), - mapping); + mapping, globalMetricsSource); this.syncReplicationPeerInfoProvider = new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); PeerActionListener peerActionListener = PeerActionListener.DUMMY; @@ -152,8 +154,6 @@ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0)); } } - this.globalMetricsSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) - .getGlobalSource(); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod); @@ -218,7 +218,7 @@ public void startReplicationService() throws IOException { this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf); this.scheduleThreadPool.scheduleAtFixedRate( - new ReplicationStatisticsTask(this.replicationSink, this.replicationManager, this.globalMetricsSource), + new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), statsThreadPeriod, 
statsThreadPeriod, TimeUnit.SECONDS); LOG.info("{} started", this.server.toString()); } @@ -248,22 +248,17 @@ private final static class ReplicationStatisticsTask implements Runnable { private final ReplicationSink replicationSink; private final ReplicationSourceManager replicationManager; - private final MetricsReplicationSourceSource globalMetricsSource; public ReplicationStatisticsTask(ReplicationSink replicationSink, - ReplicationSourceManager replicationManager, MetricsReplicationSourceSource globalMetricsSource) { + ReplicationSourceManager replicationManager) { this.replicationManager = replicationManager; this.replicationSink = replicationSink; - this.globalMetricsSource = globalMetricsSource; } @Override public void run() { printStats(this.replicationManager.getStats()); printStats(this.replicationSink.getStats()); - - // Report how much data we've read off disk which is pending replication, across all sources - globalMetricsSource.setWALReaderEditsBufferBytes(replicationManager.getTotalBufferUsed().get()); } private void printStats(String stats) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index bc1754904b89..080576e46ed4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -722,7 +722,9 @@ public void postShipEdits(List entries, int batchSize) { throttler.addPushSize(batchSize); } totalReplicatedEdits.addAndGet(entries.size()); - totalBufferUsed.addAndGet(-batchSize); + long newBufferUsed = totalBufferUsed.addAndGet(-batchSize); + // Record the new buffer usage + this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } @Override diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index d177dfd0f8fe..2cf91ed65b59 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -171,6 +171,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final int maxRetriesMultiplier; // Total buffer size on this RegionServer for holding batched edits to be shipped. private final long totalBufferLimit; + private final MetricsReplicationGlobalSourceSource globalMetrics; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -188,7 +189,8 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException { + SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, + MetricsReplicationGlobalSourceSource globalMetrics) throws IOException { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; @@ -226,6 +228,7 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.globalMetrics = globalMetrics; } /** @@ -1147,4 +1150,8 @@ public void cleanUpHFileRefs(String peerId, List files) { int 
activeFailoverTaskCount() { return executor.getActiveCount(); } + + MetricsReplicationGlobalSourceSource getGlobalMetrics() { + return this.globalMetrics; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 18268612e44c..487220292854 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -405,7 +404,10 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { * @return true if we should clear buffer and push all */ private boolean acquireBufferQuota(long size) { - return totalBufferUsed.addAndGet(size) >= totalBufferQuota; + long newBufferUsed = totalBufferUsed.addAndGet(size); + // Record the new buffer usage + this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + return newBufferUsed >= totalBufferQuota; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index d97dfd15d932..ddc8509f7f89 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; @@ -329,9 +330,9 @@ public void testMetricsSourceBaseSourcePassThrough() { MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); - MetricsReplicationSourceSource globalSourceSource = - new MetricsReplicationGlobalSourceSource(globalRms); - MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource); + MetricsReplicationGlobalSourceSource globalSourceSource = + new MetricsReplicationGlobalSourceSourceImpl(globalRms); + MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); Map singleSourceSourceByTable = @@ -497,6 +498,16 @@ public boolean canReplicateToSameCluster() { } } + /** + * Not used by unit tests, helpful for manual testing with replication. + *

+ * Snippet for `hbase shell`: + * <pre>
+   * create 't', 'f'
+   * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+   * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1}
+   * </pre>
+ */ public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { private long duration; public SleepingReplicationEndpointForTest() { @@ -508,7 +519,7 @@ public void init(Context context) throws IOException { super.init(context); if (this.ctx != null) { duration = this.ctx.getConfiguration().getLong( - "test.sleep.replication.endpoint.duration.millis", 5000L); + "hbase.test.sleep.replication.endpoint.duration.millis", 5000L); } } From 41f78b9d2d409b396449846ffe4ca1ea64e12dba Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Tue, 4 Aug 2020 13:11:13 -0400 Subject: [PATCH 3/6] Fix checkstyle and license failures --- .../MetricsReplicationGlobalSourceSource.java | 17 +++++++++++++++++ ...etricsReplicationGlobalSourceSourceImpl.java | 6 ++++-- .../replication/regionserver/Replication.java | 4 ++-- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index fab33bc8b774..c3f0bab921f8 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.yetus.audience.InterfaceAudience; diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index f41a64f9b5ff..1c04109ed628 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -24,7 +24,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class MetricsReplicationGlobalSourceSourceImpl implements MetricsReplicationGlobalSourceSource { +public class MetricsReplicationGlobalSourceSourceImpl + implements MetricsReplicationGlobalSourceSource { private static final String KEY_PREFIX = "source."; private final MetricsReplicationSourceImpl rms; @@ -86,7 +87,8 @@ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms failedRecoveryQueue = rms.getMetricsRegistry() .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); - walReaderBufferUsageBytes = rms.getMetricsRegistry().getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); + walReaderBufferUsageBytes = rms.getMetricsRegistry() + .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); } @Override public void setLastShippedAge(long age) { diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 207cd531971c..195877bf5f3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -126,8 +126,8 @@ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir throw new IOException("Could not read cluster id", ke); } SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); - this.globalMetricsSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) - .getGlobalSource(); + this.globalMetricsSource = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? 
walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), From e197e980ef20ce2ef2e0d2401a15f552140177a4 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Tue, 4 Aug 2020 13:29:31 -0400 Subject: [PATCH 4/6] Add in Wellington's feedback --- .../regionserver/MetricsReplicationGlobalSourceSource.java | 7 +++++-- .../regionserver/ReplicationSourceWALReader.java | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index c3f0bab921f8..e373a6c1349e 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -25,12 +25,15 @@ public interface MetricsReplicationGlobalSourceSource extends MetricsReplication public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage"; /** - * Sets the total usage of memory used by edits in memory read from WALs. + * Sets the total usage of memory used by edits in memory read from WALs. The memory represented + * by this usage measure is across peers/sources. For example, we may batch the same WAL edits + * multiple times for the sake of replicating them to multiple peers. * @param usage The memory used by edits in bytes */ void setWALReaderEditsBufferBytes(long usage); + /** - * Returns the size, in bytes, of edits held in memory to be replicated. + * Returns the size, in bytes, of edits held in memory to be replicated across all peers. 
*/ long getWALReaderEditsBufferBytes(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 487220292854..c71db1bf785b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -274,8 +274,8 @@ public Path getCurrentPath() { private boolean checkQuota() { // try not to go over total quota if (totalBufferUsed.get() > totalBufferQuota) { - LOG.warn("Can't read more edits from WAL as buffer usage {}B exceeds limit {}B", - totalBufferUsed.get(), totalBufferQuota); + LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", + this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota); Threads.sleep(sleepForRetries); return false; } From 13ff9c8d9edc0c91b0949ac2b087eaf3ad7563d3 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Thu, 6 Aug 2020 15:45:42 -0400 Subject: [PATCH 5/6] Fix NPE on the global source --- .../hbase/replication/regionserver/TestWALEntryStream.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 2a21660dd47b..720ef3c1eddb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import 
org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -378,6 +379,8 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); + MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(MetricsReplicationGlobalSourceSource.class); + when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); return source; } From 7695a702c96815cec8d50970b237982caf2d2e81 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Thu, 6 Aug 2020 17:10:10 -0400 Subject: [PATCH 6/6] Work around the test getting a limit of 0 due to the mock --- .../hbase/replication/regionserver/TestWALEntryStream.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 720ef3c1eddb..8b65235e114d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -372,6 +372,8 @@ public void testWALKeySerialization() throws Exception { private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + when(mockSourceManager.getTotalBufferLimit()).thenReturn( + (long) 
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); Server mockServer = Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager);