Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ protected void requestCompactionInternal(HRegion region, HStore store, String wh
+ "store size is {}",
getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
}
region.incrementCompactionsQueuedCount();
store.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
Expand Down Expand Up @@ -630,11 +630,11 @@ private void doCompaction(User user) {
} catch (IOException ex) {
LOG.error("Compaction selection failed " + this, ex);
server.checkFileSystem();
region.decrementCompactionsQueuedCount();
store.decrementCompactionsQueuedCount();
return;
}
if (!selected.isPresent()) {
region.decrementCompactionsQueuedCount();
store.decrementCompactionsQueuedCount();
return; // nothing to do
}
c = selected.get();
Expand Down Expand Up @@ -688,16 +688,16 @@ private void doCompaction(User user) {
if (remoteEx != ex) {
LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
}
region.reportCompactionRequestFailure();
store.reportCompactionRequestFailure();
server.checkFileSystem();
} catch (Exception ex) {
LOG.error("Compaction failed " + this, ex);
region.reportCompactionRequestFailure();
store.reportCompactionRequestFailure();
server.checkFileSystem();
} finally {
tracker.afterExecution(store);
completeTracker.completed(store);
region.decrementCompactionsQueuedCount();
store.decrementCompactionsQueuedCount();
LOG.debug("Status {}", CompactSplit.this);
}
}
Expand All @@ -710,7 +710,7 @@ public void run() {
server.isStopped() || (region.getTableDescriptor() != null
&& !region.getTableDescriptor().isCompactionEnabled())
) {
region.decrementCompactionsQueuedCount();
store.decrementCompactionsQueuedCount();
return;
}
doCompaction(user);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Number of requests blocked by memstore size.
private final LongAdder blockedRequestsCount = new LongAdder();

// Compaction LongAdders
final LongAdder compactionsFinished = new LongAdder();
final LongAdder compactionsFailed = new LongAdder();
final LongAdder compactionNumFilesCompacted = new LongAdder();
final LongAdder compactionNumBytesCompacted = new LongAdder();
final LongAdder compactionsQueued = new LongAdder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a bit strange that we only move this field to HStore but still keep other compaction related metrics in HRegion...

Copy link
Contributor Author

@bsglz bsglz May 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right — I thought about it too, but did not find a better way. Maybe all of those metrics should be moved to HStore?
Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think moving compaction metrics to HStore would be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think moving compaction metrics to HStore would be better.

I can move the other compaction metrics in a follow-on issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then is it possible to fix the problem here without including the metrics move? It is not a good idea to land half-done work, so I would prefer that we either do not move any of them, or move them all at once.

final LongAdder flushesQueued = new LongAdder();

private BlockCache blockCache;
Expand Down Expand Up @@ -8621,29 +8615,11 @@ public void reportCompactionRequestStart(boolean isMajor) {
(isMajor ? majorInProgress : minorInProgress).incrementAndGet();
}

public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
public void reportCompactionRequestEnd(boolean isMajor) {
int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();

// metrics
compactionsFinished.increment();
compactionNumFilesCompacted.add(numFiles);
compactionNumBytesCompacted.add(filesSizeCompacted);

assert newValue >= 0;
}

public void reportCompactionRequestFailure() {
compactionsFailed.increment();
}

public void incrementCompactionsQueuedCount() {
compactionsQueued.increment();
}

public void decrementCompactionsQueuedCount() {
compactionsQueued.decrement();
}

public void incrementFlushesQueuedCount() {
flushesQueued.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ public class HStore
// TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
private final List<HStoreFile> filesCompacting = Lists.newArrayList();

// Compaction LongAdders
final LongAdder compactionsFinishedCount = new LongAdder();
final LongAdder compactionsFailedCount = new LongAdder();
final LongAdder compactionNumFilesCompacted = new LongAdder();
final LongAdder compactionNumBytesCompacted = new LongAdder();
final LongAdder compactionsQueuedCount = new LongAdder();

// All access must be synchronized.
private final Set<ChangedReadersObserver> changedReaderObservers =
Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
Expand Down Expand Up @@ -1592,7 +1599,8 @@ public void cancelRequestedCompaction(CompactionContext compaction) {
}

private void finishCompactionRequest(CompactionRequestImpl cr) {
this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
this.region.reportCompactionRequestEnd(cr.isMajor());
reportCompactionRequestEnd(cr.getFiles().size(), cr.getSize());
if (cr.isOffPeak()) {
offPeakCompactionTracker.set(false);
cr.setOffPeak(false);
Expand Down Expand Up @@ -2065,6 +2073,22 @@ public void abort() throws IOException {

@Override
public boolean needsCompaction() {
// For some system compacting, we set selectNow to false, and the files do not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I do not fully understand what is the problem here...

The problem is that we schedule too many compactions, because the only way we know whether a compaction is running is through filesCompacting? Is this only true for system compactions? If so, why not change the logic for requesting system compactions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current logic is an improvement introduced by HBASE-8665, which could get better store file selection.
I think that patch is useful, just bring some side effect, and this patch want to fix it.
BTW, skimming that patch maybe helpful to understand.
Thanks a lot.

// be selected until compaction runs, so we should limit the compaction count here
// to avoid the length of queue grows too big.
int filesNotCompacting =
this.storeEngine.storeFileManager.getStorefileCount() - filesCompacting.size();
int maxFilesToCompact = this.storeEngine.getCompactionPolicy().getConf().getMaxFilesToCompact();
int maxCompactionsShouldBeQueued = filesNotCompacting / maxFilesToCompact;
maxCompactionsShouldBeQueued += filesNotCompacting % maxFilesToCompact == 0 ? 0 : 1;
if (this.compactionsQueuedCount.sum() >= maxCompactionsShouldBeQueued) {
LOG.debug(
"this store has too many compactions in queue, filesNotCompacting:{}, "
+ "compactionsQueuedCount:{}, maxCompactionsNeedQueued:{}",
filesNotCompacting, this.compactionsQueuedCount.sum(), maxCompactionsShouldBeQueued);
return false;
}

List<HStoreFile> filesCompactingClone = null;
synchronized (filesCompacting) {
filesCompactingClone = Lists.newArrayList(filesCompacting);
Expand Down Expand Up @@ -2471,4 +2495,47 @@ public long getBloomFilterNegativeResultsCount() {
public long getBloomFilterEligibleRequestsCount() {
return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
}

/**
 * Increments the number of compactions queued for this store; called when a
 * compaction request for this store is submitted to the compaction queue.
 */
public void incrementCompactionsQueuedCount() {
  compactionsQueuedCount.increment();
}

/**
 * Decrements the number of compactions queued for this store; called when a
 * previously queued compaction runs, fails selection, or is skipped.
 */
public void decrementCompactionsQueuedCount() {
  compactionsQueuedCount.decrement();
}

/** Returns the running total of compactions finished on this store (see reportCompactionRequestEnd). */
@Override
public long getCompactionsFinishedCount() {
  return this.compactionsFinishedCount.sum();
}

/** Returns the running total of failed compactions on this store (see reportCompactionRequestFailure). */
@Override
public long getCompactionsFailedCount() {
  return this.compactionsFailedCount.sum();
}

/** Returns the total number of store files compacted by finished compactions on this store. */
@Override
public long getCompactionsNumFiles() {
  return this.compactionNumFilesCompacted.sum();
}

/** Returns the total number of bytes compacted by finished compactions on this store. */
@Override
public long getCompactionsNumBytes() {
  return this.compactionNumBytesCompacted.sum();
}

/** Returns the number of compactions currently queued for this store. */
@Override
public long getCompactionsQueuedCount() {
  return this.compactionsQueuedCount.sum();
}

/** Records a failed compaction in this store's metrics. */
public void reportCompactionRequestFailure() {
  compactionsFailedCount.increment();
}

/**
 * Records a finished compaction in this store's metrics.
 * @param numFiles number of store files that were compacted
 * @param filesSizeCompacted total size, in bytes, of the compacted files
 */
public void reportCompactionRequestEnd(int numFiles, long filesSizeCompacted) {
  compactionsFinishedCount.increment();
  compactionNumFilesCompacted.add(numFiles);
  compactionNumBytesCompacted.add(filesSizeCompacted);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -57,6 +58,11 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
private long numReferenceFiles;
private long maxFlushQueueSize;
private long maxCompactionQueueSize;
private long compactionsFinishedCount;
private long compactionsFailedCount;
private long compactionNumFilesCompacted;
private long compactionNumBytesCompacted;
private long compactionsQueuedCount;
private Map<String, Long> readsOnlyFromMemstore;
private Map<String, Long> mixedReadsOnStore;

Expand Down Expand Up @@ -153,17 +159,17 @@ public long getWriteRequestCount() {

@Override
public long getNumFilesCompacted() {
return this.region.compactionNumFilesCompacted.sum();
return this.compactionNumFilesCompacted;
}

@Override
public long getNumBytesCompacted() {
return this.region.compactionNumBytesCompacted.sum();
return this.compactionNumBytesCompacted;
}

@Override
public long getNumCompactionsCompleted() {
return this.region.compactionsFinished.sum();
return this.compactionsFinishedCount;
}

@Override
Expand All @@ -185,12 +191,12 @@ public long getTotalRequestCount() {

@Override
public long getNumCompactionsFailed() {
return this.region.compactionsFailed.sum();
return this.compactionsFailedCount;
}

@Override
public long getNumCompactionsQueued() {
return this.region.compactionsQueued.sum();
return this.compactionsQueuedCount;
}

@Override
Expand Down Expand Up @@ -256,11 +262,21 @@ public void run() {
long tempMinStoreFileAge = Long.MAX_VALUE;
long tempNumReferenceFiles = 0;
long tempMaxCompactionQueueSize = 0;
long tempCompactionsFinishedCount = 0;
long tempCompactionsFailedCount = 0;
long tempCompactionNumFilesCompacted = 0;
long tempCompactionNumBytesCompacted = 0;
long tempCompactionsQueuedCount = 0;
long tempMaxFlushQueueSize = 0;
long avgAgeNumerator = 0;
long numHFiles = 0;
if (region.stores != null) {
for (HStore store : region.stores.values()) {
tempCompactionsFinishedCount += store.getCompactionsFinishedCount();
tempCompactionsFailedCount += store.getCompactionsFailedCount();
tempCompactionNumFilesCompacted += store.getCompactionsNumFiles();
tempCompactionNumBytesCompacted += store.getCompactionsNumBytes();
tempCompactionsQueuedCount += store.getCompactionsQueuedCount();
tempNumStoreFiles += store.getStorefilesCount();
int currentStoreRefCount = store.getStoreRefCount();
tempStoreRefCount += currentStoreRefCount;
Expand Down Expand Up @@ -339,6 +355,11 @@ public void run() {
if (tempMaxFlushQueueSize > maxFlushQueueSize) {
maxFlushQueueSize = tempMaxFlushQueueSize;
}
compactionsFinishedCount = tempCompactionsFinishedCount;
compactionsFailedCount = tempCompactionsFailedCount;
compactionNumFilesCompacted = tempCompactionNumFilesCompacted;
compactionNumBytesCompacted = tempCompactionNumBytesCompacted;
compactionsQueuedCount = tempCompactionsQueuedCount;
}
}

Expand All @@ -355,4 +376,9 @@ public int getReplicaId() {
return region.getRegionInfo().getReplicaId();
}

/**
 * Forces the metrics-aggregation task to run immediately instead of waiting
 * for its next scheduled execution, so tests can read freshly aggregated
 * per-store compaction metrics.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
    allowedOnPath = ".*/src/test/.*")
void updateMetricsAtOnce() {
  runnable.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,19 @@ public interface Store {
* loaded.
*/
long getBloomFilterEligibleRequestsCount();

/** Returns the number of compactions that have finished on this store. */
long getCompactionsFinishedCount();

/** Returns the number of compactions that have failed on this store. */
long getCompactionsFailedCount();

/** Returns the total number of store files compacted by finished compactions on this store. */
long getCompactionsNumFiles();

/** Returns the total number of bytes compacted by finished compactions on this store. */
long getCompactionsNumBytes();

/** Returns the number of compactions currently queued for this store. */
long getCompactionsQueuedCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public void testCompactionFailure() throws Exception {
null);
// wait for the latch to complete.
latch.await(120, TimeUnit.SECONDS);

metricsWrapper.updateMetricsAtOnce();
// compaction should have completed and been marked as failed due to error in split request
long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long postFailedCount = metricsWrapper.getNumCompactionsFailed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2850,6 +2850,36 @@ public void testImmutableMemStoreLABRefCnt() throws Exception {
}
}

/**
 * Verifies that needsCompaction() stops requesting new compactions once
 * enough compactions are already queued to cover the store files that are
 * not yet compacting (the queued-count limit added to HStore.needsCompaction()).
 */
@Test
public void testLimitCompactionQueuedCount() throws IOException {
  init(this.name.getMethodName());
  // Write a single store file.
  store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
  flush(1);
  assertEquals(1, store.getStorefilesCount());

  // With only 1 file there is nothing to compact.
  assertFalse(store.needsCompaction());

  // With 3 files a compaction is needed.
  addStoreFile();
  addStoreFile();
  store.refreshStoreFiles();
  assertTrue(store.needsCompaction());

  // Once one compaction is queued for this store, no further compaction
  // should be requested for the same 3 files.
  store.incrementCompactionsQueuedCount();
  assertFalse(store.needsCompaction());

  // With 13 files now and only one compaction queued, one more compaction
  // should be requested.
  int storeFileNum = 10;
  for (int s = 0; s < storeFileNum; s++) {
    addStoreFile();
  }
  store.refreshStoreFiles();
  assertTrue(store.needsCompaction());
}

private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
Expand Down