diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 4593a7d20020..926f4826265f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -405,7 +405,7 @@ protected void requestCompactionInternal(HRegion region, HStore store, String wh
         + "store size is {}", getStoreNameForUnderCompaction(store), priority,
         underCompactionStores.size());
     }
-    region.incrementCompactionsQueuedCount();
+    store.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
@@ -630,11 +630,11 @@ private void doCompaction(User user) {
       } catch (IOException ex) {
         LOG.error("Compaction selection failed " + this, ex);
         server.checkFileSystem();
-        region.decrementCompactionsQueuedCount();
+        store.decrementCompactionsQueuedCount();
         return;
       }
       if (!selected.isPresent()) {
-        region.decrementCompactionsQueuedCount();
+        store.decrementCompactionsQueuedCount();
        return; // nothing to do
       }
       c = selected.get();
@@ -688,16 +688,16 @@ private void doCompaction(User user) {
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
-       region.reportCompactionRequestFailure();
+       store.reportCompactionRequestFailure();
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
-       region.reportCompactionRequestFailure();
+       store.reportCompactionRequestFailure();
        server.checkFileSystem();
      } finally {
        tracker.afterExecution(store);
        completeTracker.completed(store);
-       region.decrementCompactionsQueuedCount();
+       store.decrementCompactionsQueuedCount();
        LOG.debug("Status {}", CompactSplit.this);
      }
    }
@@ -710,7 +710,7 @@ public void run() {
        server.isStopped()
          || (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())
      ) {
-       region.decrementCompactionsQueuedCount();
+       store.decrementCompactionsQueuedCount();
        return;
      }
      doCompaction(user);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0dc96747dd36..f49c5922eda9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -358,12 +358,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // Number of requests blocked by memstore size.
   private final LongAdder blockedRequestsCount = new LongAdder();
 
-  // Compaction LongAdders
-  final LongAdder compactionsFinished = new LongAdder();
-  final LongAdder compactionsFailed = new LongAdder();
-  final LongAdder compactionNumFilesCompacted = new LongAdder();
-  final LongAdder compactionNumBytesCompacted = new LongAdder();
-  final LongAdder compactionsQueued = new LongAdder();
   final LongAdder flushesQueued = new LongAdder();
 
   private BlockCache blockCache;
@@ -8621,29 +8615,11 @@ public void reportCompactionRequestStart(boolean isMajor) {
     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
   }
 
-  public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
+  public void reportCompactionRequestEnd(boolean isMajor) {
     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
-
-    // metrics
-    compactionsFinished.increment();
-    compactionNumFilesCompacted.add(numFiles);
-    compactionNumBytesCompacted.add(filesSizeCompacted);
-
     assert newValue >= 0;
   }
 
-  public void reportCompactionRequestFailure() {
-    compactionsFailed.increment();
-  }
-
-  public void incrementCompactionsQueuedCount() {
-    compactionsQueued.increment();
-  }
-
-  public void decrementCompactionsQueuedCount() {
-    compactionsQueued.decrement();
-  }
-
   public void incrementFlushesQueuedCount() {
     flushesQueued.increment();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5e2bf00f85be..9f3fe45b29b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -184,6 +184,13 @@ public class HStore
   // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
   private final List<HStoreFile> filesCompacting = Lists.newArrayList();
 
+  // Compaction LongAdders
+  final LongAdder compactionsFinishedCount = new LongAdder();
+  final LongAdder compactionsFailedCount = new LongAdder();
+  final LongAdder compactionNumFilesCompacted = new LongAdder();
+  final LongAdder compactionNumBytesCompacted = new LongAdder();
+  final LongAdder compactionsQueuedCount = new LongAdder();
+
   // All access must be synchronized.
   private final Set<ChangedReadersObserver> changedReaderObservers =
     Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
@@ -1592,7 +1599,8 @@ public void cancelRequestedCompaction(CompactionContext compaction) {
   }
 
   private void finishCompactionRequest(CompactionRequestImpl cr) {
-    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
+    this.region.reportCompactionRequestEnd(cr.isMajor());
+    reportCompactionRequestEnd(cr.getFiles().size(), cr.getSize());
     if (cr.isOffPeak()) {
       offPeakCompactionTracker.set(false);
       cr.setOffPeak(false);
@@ -2065,6 +2073,22 @@ public void abort() throws IOException {
 
   @Override
   public boolean needsCompaction() {
+    // For some system compactions we set selectNow to false, so the files are not
+    // selected until the compaction actually runs; limit the number of queued
+    // compactions here to keep the queue from growing too large.
+    int filesNotCompacting =
+      this.storeEngine.storeFileManager.getStorefileCount() - filesCompacting.size();
+    int maxFilesToCompact = this.storeEngine.getCompactionPolicy().getConf().getMaxFilesToCompact();
+    int maxCompactionsShouldBeQueued = filesNotCompacting / maxFilesToCompact;
+    maxCompactionsShouldBeQueued += filesNotCompacting % maxFilesToCompact == 0 ? 0 : 1;
+    if (this.compactionsQueuedCount.sum() >= maxCompactionsShouldBeQueued) {
+      LOG.debug(
+        "this store has too many compactions in queue, filesNotCompacting:{}, "
+          + "compactionsQueuedCount:{}, maxCompactionsShouldBeQueued:{}",
+        filesNotCompacting, this.compactionsQueuedCount.sum(), maxCompactionsShouldBeQueued);
+      return false;
+    }
+
     List<HStoreFile> filesCompactingClone = null;
     synchronized (filesCompacting) {
       filesCompactingClone = Lists.newArrayList(filesCompacting);
@@ -2471,4 +2495,47 @@ public long getBloomFilterNegativeResultsCount() {
   public long getBloomFilterEligibleRequestsCount() {
     return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
   }
+
+  public void incrementCompactionsQueuedCount() {
+    compactionsQueuedCount.increment();
+  }
+
+  public void decrementCompactionsQueuedCount() {
+    compactionsQueuedCount.decrement();
+  }
+
+  @Override
+  public long getCompactionsFinishedCount() {
+    return this.compactionsFinishedCount.sum();
+  }
+
+  @Override
+  public long getCompactionsFailedCount() {
+    return this.compactionsFailedCount.sum();
+  }
+
+  @Override
+  public long getCompactionsNumFiles() {
+    return this.compactionNumFilesCompacted.sum();
+  }
+
+  @Override
+  public long getCompactionsNumBytes() {
+    return this.compactionNumBytesCompacted.sum();
+  }
+
+  @Override
+  public long getCompactionsQueuedCount() {
+    return this.compactionsQueuedCount.sum();
+  }
+
+  public void reportCompactionRequestFailure() {
+    compactionsFailedCount.increment();
+  }
+
+  public void reportCompactionRequestEnd(int numFiles, long filesSizeCompacted) {
+    compactionsFinishedCount.increment();
+    compactionNumFilesCompacted.add(numFiles);
+    compactionNumBytesCompacted.add(filesSizeCompacted);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index c9595c12b5e9..be78ca350040 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
@@ -57,6 +58,11 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
   private long numReferenceFiles;
   private long maxFlushQueueSize;
   private long maxCompactionQueueSize;
+  private long compactionsFinishedCount;
+  private long compactionsFailedCount;
+  private long compactionNumFilesCompacted;
+  private long compactionNumBytesCompacted;
+  private long compactionsQueuedCount;
   private Map<String, Long> readsOnlyFromMemstore;
   private Map<String, Long> mixedReadsOnStore;
@@ -153,17 +159,17 @@ public long getWriteRequestCount() {
 
   @Override
   public long getNumFilesCompacted() {
-    return this.region.compactionNumFilesCompacted.sum();
+    return this.compactionNumFilesCompacted;
   }
 
   @Override
   public long getNumBytesCompacted() {
-    return this.region.compactionNumBytesCompacted.sum();
+    return this.compactionNumBytesCompacted;
   }
 
   @Override
   public long getNumCompactionsCompleted() {
-    return this.region.compactionsFinished.sum();
+    return this.compactionsFinishedCount;
   }
 
   @Override
@@ -185,12 +191,12 @@ public long getTotalRequestCount() {
 
   @Override
   public long getNumCompactionsFailed() {
-    return this.region.compactionsFailed.sum();
+    return this.compactionsFailedCount;
   }
 
   @Override
   public long getNumCompactionsQueued() {
-    return this.region.compactionsQueued.sum();
+    return this.compactionsQueuedCount;
   }
 
   @Override
@@ -256,11 +262,21 @@ public void run() {
       long tempMinStoreFileAge = Long.MAX_VALUE;
       long tempNumReferenceFiles = 0;
       long tempMaxCompactionQueueSize = 0;
+      long tempCompactionsFinishedCount = 0;
+      long tempCompactionsFailedCount = 0;
+      long tempCompactionNumFilesCompacted = 0;
+      long tempCompactionNumBytesCompacted = 0;
+      long tempCompactionsQueuedCount = 0;
       long tempMaxFlushQueueSize = 0;
       long avgAgeNumerator = 0;
       long numHFiles = 0;
       if (region.stores != null) {
         for (HStore store : region.stores.values()) {
+          tempCompactionsFinishedCount += store.getCompactionsFinishedCount();
+          tempCompactionsFailedCount += store.getCompactionsFailedCount();
+          tempCompactionNumFilesCompacted += store.getCompactionsNumFiles();
+          tempCompactionNumBytesCompacted += store.getCompactionsNumBytes();
+          tempCompactionsQueuedCount += store.getCompactionsQueuedCount();
           tempNumStoreFiles += store.getStorefilesCount();
           int currentStoreRefCount = store.getStoreRefCount();
           tempStoreRefCount += currentStoreRefCount;
@@ -339,6 +355,11 @@ public void run() {
       if (tempMaxFlushQueueSize > maxFlushQueueSize) {
         maxFlushQueueSize = tempMaxFlushQueueSize;
       }
+      compactionsFinishedCount = tempCompactionsFinishedCount;
+      compactionsFailedCount = tempCompactionsFailedCount;
+      compactionNumFilesCompacted = tempCompactionNumFilesCompacted;
+      compactionNumBytesCompacted = tempCompactionNumBytesCompacted;
+      compactionsQueuedCount = tempCompactionsQueuedCount;
     }
   }
@@ -355,4 +376,9 @@ public int getReplicaId() {
     return region.getRegionInfo().getReplicaId();
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  void updateMetricsAtOnce() {
+    runnable.run();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 39cf9eacd2ef..e3d5573b9146 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -252,4 +252,19 @@ public interface Store {
    * loaded.
    */
   long getBloomFilterEligibleRequestsCount();
+
+  /** Returns the number of finished compactions for this store. */
+  long getCompactionsFinishedCount();
+
+  /** Returns the number of failed compactions for this store. */
+  long getCompactionsFailedCount();
+
+  /** Returns the number of files compacted for this store. */
+  long getCompactionsNumFiles();
+
+  /** Returns the number of bytes compacted for this store. */
+  long getCompactionsNumBytes();
+
+  /** Returns the number of compactions queued for this store. */
+  long getCompactionsQueuedCount();
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index c0bc72079cb7..e7defa9e2e84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -422,7 +422,7 @@ public void testCompactionFailure() throws Exception {
       null);
     // wait for the latch to complete.
     latch.await(120, TimeUnit.SECONDS);
-
+    metricsWrapper.updateMetricsAtOnce();
     // compaction should have completed and been marked as failed due to error in split request
     long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
     long postFailedCount = metricsWrapper.getNumCompactionsFailed();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 2e999dfaa455..d863a6b3386b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -2850,6 +2850,36 @@ public void testImmutableMemStoreLABRefCnt() throws Exception {
     }
   }
 
+  @Test
+  public void testLimitCompactionQueuedCount() throws IOException {
+    init(this.name.getMethodName());
+    // Write a store file.
+    store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
+    flush(1);
+    assertEquals(1, store.getStorefilesCount());
+
+    // With a single file, no compaction is needed.
+    assertFalse(store.needsCompaction());
+
+    // With 3 files, a compaction is needed.
+    addStoreFile();
+    addStoreFile();
+    store.refreshStoreFiles();
+    assertTrue(store.needsCompaction());
+
+    // One compaction is already queued, so no further compaction is needed.
+    store.incrementCompactionsQueuedCount();
+    assertFalse(store.needsCompaction());
+
+    // With 13 files we now need one more compaction.
+    int storeFileNum = 10;
+    for (int s = 0; s < storeFileNum; s++) {
+      addStoreFile();
+    }
+    store.refreshStoreFiles();
+    assertTrue(store.needsCompaction());
+  }
+
   private HStoreFile mockStoreFileWithLength(long length) {
     HStoreFile sf = mock(HStoreFile.class);
     StoreFileReader sfr = mock(StoreFileReader.class);
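For reference, the guard added to HStore.needsCompaction() caps the queue with a ceiling division: with N store files not under compaction and at most M files per compaction, no more than ceil(N / M) compaction requests ever need to sit in the queue for the store. A minimal standalone sketch of that arithmetic follows; the class and method names here are illustrative only and are not part of the patch.

// Illustrative sketch only (not HBase code): the queue-cap arithmetic used by
// the new HStore.needsCompaction() guard.
public class CompactionQueueCapSketch {

  // ceil(filesNotCompacting / maxFilesToCompact) via integer arithmetic,
  // mirroring the division-plus-remainder form in the patch.
  static int maxCompactionsShouldBeQueued(int filesNotCompacting, int maxFilesToCompact) {
    int cap = filesNotCompacting / maxFilesToCompact;
    if (filesNotCompacting % maxFilesToCompact != 0) {
      cap += 1;
    }
    return cap;
  }

  public static void main(String[] args) {
    // 3 files with the default hbase.hstore.compaction.max of 10 -> cap of 1,
    // so a single queued request blocks further ones, as the new test asserts.
    System.out.println(maxCompactionsShouldBeQueued(3, 10)); // 1
    // 13 files -> cap of 2, so one more request may be queued.
    System.out.println(maxCompactionsShouldBeQueued(13, 10)); // 2
  }
}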