@@ -56,6 +56,8 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
String ROW_READS_ONLY_ON_MEMSTORE_DESC = "Row reads happening completely out of memstore";
String MIXED_ROW_READS = "mixedRowReadsCount";
String MIXED_ROW_READS_ON_STORE_DESC = "Row reads happening out of files and memstore on store";
String STOREFILES_ACCESSED_DAYS_AND_SIZE_TEMPLATE = "storeFilesAccessed%sDaysSize";
String STOREFILES_ACCESSED_DAYS_AND_SIZE_DESC_TEMPLATE = "Store file accessed %s days size.";

/**
* Close the region's metrics as this region is closing.
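For illustration, the two templates above expand once per configured threshold; a minimal sketch (the threshold values come from 'hbase.region.hfile.accessed.days.thresholds', discussed further down):

// Sketch: expanding the metric name/description templates for a 7-day threshold.
String name = String.format(MetricsRegionSource.STOREFILES_ACCESSED_DAYS_AND_SIZE_TEMPLATE, 7);
// -> "storeFilesAccessed7DaysSize"
String desc = String.format(MetricsRegionSource.STOREFILES_ACCESSED_DAYS_AND_SIZE_DESC_TEMPLATE, 7);
// -> "Store file accessed 7 days size."
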
@@ -233,6 +233,18 @@ void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
this.regionWrapper.getNumReferenceFiles());
mrb.addGauge(Interns.info(regionNamePrefix + MetricsRegionServerSource.STOREFILE_SIZE,
MetricsRegionServerSource.STOREFILE_SIZE_DESC), this.regionWrapper.getStoreFileSize());

Map<Integer, Long> sfAccessDaysAndSize = regionWrapper.getStoreFilesAccessedDaysAndSize();
if (sfAccessDaysAndSize != null) {
for (Map.Entry<Integer, Long> e : sfAccessDaysAndSize.entrySet()) {
mrb.addGauge(Interns.info(
regionNamePrefix + String
.format(MetricsRegionSource.STOREFILES_ACCESSED_DAYS_AND_SIZE_TEMPLATE, e.getKey()),
String.format(MetricsRegionSource.STOREFILES_ACCESSED_DAYS_AND_SIZE_DESC_TEMPLATE,
e.getKey())),
e.getValue());
}
}
mrb.addCounter(
Interns.info(regionNamePrefix + MetricsRegionSource.COMPACTIONS_COMPLETED_COUNT,
MetricsRegionSource.COMPACTIONS_COMPLETED_DESC),
@@ -161,4 +161,7 @@ public interface MetricsRegionWrapper {
/** Returns the number of row reads on memstore and file per store */
Map<String, Long> getMixedRowReadsCount();

/** Returns the total size of this region's store files per accessed-days threshold */
Map<Integer, Long> getStoreFilesAccessedDaysAndSize();

}
@@ -346,6 +346,18 @@ void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
Interns.info(tableNamePrefix + MetricsRegionServerSource.STATIC_INDEX_SIZE,
MetricsRegionServerSource.STATIC_INDEX_SIZE),
tableWrapperAgg.getStaticIndexSize(tableName.getNameAsString()));
Map<Integer, Long> sfAccessDaysAndSize =
tableWrapperAgg.getStoreFilesAccessedDaysAndSize(tableName.getNameAsString());
if (sfAccessDaysAndSize != null) {
for (Map.Entry<Integer, Long> e : sfAccessDaysAndSize.entrySet()) {
mrb.addGauge(Interns.info(
tableNamePrefix + String
.format(MetricsRegionSource.STOREFILES_ACCESSED_DAYS_AND_SIZE_TEMPLATE, e.getKey()),
String.format(MetricsRegionSource.STOREFILES_ACCESSED_DAYS_AND_SIZE_DESC_TEMPLATE,
e.getKey())),
e.getValue());
}
}
mrb.addCounter(
Interns.info(tableNamePrefix + MetricsRegionServerSource.BLOOM_FILTER_REQUESTS_COUNT,
MetricsRegionServerSource.BLOOM_FILTER_REQUESTS_COUNT_DESC),
@@ -128,4 +128,7 @@ public interface MetricsTableWrapperAggregate {

/** Returns number of row reads from file and memstore per store for this table */
Map<String, Long> getMixedRowReadsCount(String table);

/** Returns the total size of this table's store files per accessed-days threshold */
Map<Integer, Long> getStoreFilesAccessedDaysAndSize(String table);
}
@@ -154,4 +154,11 @@ public Map<String, Long> getMixedRowReadsCount(String table) {
map.put("table#info", 3L);
return map;
}

@Override
public Map<Integer, Long> getStoreFilesAccessedDaysAndSize(String table) {
Map<Integer, Long> map = new HashMap<>();
map.put(7, 3L);
return map;
}
}
@@ -232,5 +232,12 @@ public Map<String, Long> getMixedRowReadsCount() {
map.put("info", 0L);
return map;
}

@Override
public Map<Integer, Long> getStoreFilesAccessedDaysAndSize() {
Map<Integer, Long> map = new HashMap<>();
map.put(7, 0L);
return map;
}
}
}
@@ -42,6 +42,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -54,6 +55,7 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
@@ -211,6 +213,9 @@ public class HStore
private AtomicLong majorCompactedCellsSize = new AtomicLong();

private final StoreContext storeContext;
private long nextSFileAccessTimeLoadTime;
private long sfileAccessTimeLoadInterval;
private Map<String, Pair<Long, Long>> sfAccessTimeAndSizeMap = new HashMap<>();

// Used to track the store files which are currently being written. For compaction, if we want to
// compact store file [a, b, c] to [d], then here we will record 'd'. And we will also use it to
@@ -320,6 +325,10 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
confPrintThreshold = 10;
}
this.parallelPutCountPrintThreshold = confPrintThreshold;
int initialDelay = conf.getInt("hbase.hstore.hfile.load.initialDelay", 60 * 60 * 1000);
sfileAccessTimeLoadInterval = conf.getLong("hbase.hstore.hfile.load.interval", 60 * 60 * 1000);
nextSFileAccessTimeLoadTime =
EnvironmentEdgeManager.currentTime() + ThreadLocalRandom.current().nextInt(initialDelay);

LOG.info(
"Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
@@ -1809,6 +1818,35 @@ public OptionalDouble getAvgStoreFileAge() {
return getStoreFileAgeStream().average();
}

@Override
public Map<String, Pair<Long, Long>> getStoreFilesAccessTimeAndSize() {
long now = EnvironmentEdgeManager.currentTime();
if (now < nextSFileAccessTimeLoadTime) {
return sfAccessTimeAndSizeMap;
}
Collection<HStoreFile> storeFiles = this.storeEngine.getStoreFileManager().getStorefiles();
Map<String, Pair<Long, Long>> tmpSFAccessTimeAndSizeMap = new HashMap<>();
for (HStoreFile sf : storeFiles) {
if (sf.getReader() == null) {
continue;
}
FileStatus fileStatus;
try {
fileStatus = sf.getFileInfo().getFileStatus();
} catch (IOException e) {
LOG.warn(e.getMessage());
continue;
}
if (fileStatus != null) {
tmpSFAccessTimeAndSizeMap.put(fileStatus.getPath().getName(),
Pair.newPair(fileStatus.getAccessTime(), fileStatus.getLen()));
}
}
this.sfAccessTimeAndSizeMap = tmpSFAccessTimeAndSizeMap;
nextSFileAccessTimeLoadTime = now + sfileAccessTimeLoadInterval;
return tmpSFAccessTimeAndSizeMap;
}
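
This method refreshes at most once per sfileAccessTimeLoadInterval and otherwise returns the cached map, so repeated metrics snapshots do not issue a FileStatus lookup per store file every period; the randomized initial delay set in the constructor spreads the first load across stores. A minimal standalone sketch of this time-gated refresh pattern, with hypothetical names (not part of the patch):

import java.util.function.Supplier;

// Hypothetical sketch of the time-gated refresh used by getStoreFilesAccessTimeAndSize().
class TimeGatedCache<V> {
  private final long intervalMs;
  private long nextLoadTime;
  private V cached;

  TimeGatedCache(long intervalMs, long initialDelayMs, V initial) {
    this.intervalMs = intervalMs;
    // A jittered initial delay spreads the first (expensive) load across instances.
    this.nextLoadTime = System.currentTimeMillis() + initialDelayMs;
    this.cached = initial;
  }

  synchronized V get(Supplier<V> loader) {
    long now = System.currentTimeMillis();
    if (now < nextLoadTime) {
      return cached;          // serve the cached value between refreshes
    }
    cached = loader.get();    // expensive load, e.g. one FileStatus per store file
    nextLoadTime = now + intervalMs;
    return cached;
  }
}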

@Override
public long getNumReferenceFiles() {
return this.storeEngine.getStoreFileManager().getStorefiles().stream()
@@ -23,13 +23,15 @@
import java.util.Map;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics2.MetricsExecutor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -59,15 +61,32 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
private long maxCompactionQueueSize;
private Map<String, Long> readsOnlyFromMemstore;
private Map<String, Long> mixedReadsOnStore;
private Map<Integer, Long> storeFilesAccessedDaysAndSize;

private int[] storeFilesAccessedDaysThresholds;
private ScheduledFuture<?> regionMetricsUpdateTask;

// Aggregate the size of the region's store files by last-access time and expose the
// per-threshold totals as metrics. The accessed-days thresholds default to "7,30,90" and can
// be changed through 'hbase.region.hfile.accessed.days.thresholds'. See HBASE-27483 for details.
public static final long ONE_DAY_MS = 24 * 60 * 60 * 1000;
public static final String STOREFILES_ACCESSED_DAYS_THRESHOLDS =
"hbase.region.hfile.accessed.days.thresholds";
public static final int[] STOREFILES_ACCESSED_DAYS_THRESHOLDS_DEFAULT = { 7, 30, 90 };
Contributor: Can we add a few lines of javadoc, or a note in README.md, on how to use this?

Contributor (@taklwu, Nov 22, 2022): One more question: if we are using the cache / bucket cache heavily, will this metric stay at the time the store file was flushed or opened for caching? It's fine as is without accounting for the cache; I just want to learn more here.

Contributor Author: Yes, in this situation the metric will stay at the time the store file was flushed or opened for caching, even though the cached data of the store file is accessed at a newer time. I'm not sure whether that is a critical issue.
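
On the question of how to use this: the thresholds are read via Configuration.getInts, so the default 7/30/90 buckets can be overridden with a comma-separated list, typically in hbase-site.xml. An illustrative sketch of the equivalent programmatic override (values hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustrative only: overriding the default 7/30/90 accessed-days thresholds.
Configuration conf = HBaseConfiguration.create();
// Equivalent to setting the property to "1,7,30,90" in hbase-site.xml.
conf.set(MetricsRegionWrapperImpl.STOREFILES_ACCESSED_DAYS_THRESHOLDS, "1,7,30,90");
int[] thresholds = conf.getInts(MetricsRegionWrapperImpl.STOREFILES_ACCESSED_DAYS_THRESHOLDS); // [1, 7, 30, 90]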


public MetricsRegionWrapperImpl(HRegion region) {
this.region = region;
this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
this.runnable = new HRegionMetricsWrapperRunnable();
this.regionMetricsUpdateTask =
this.executor.scheduleWithFixedDelay(this.runnable, PERIOD, PERIOD, TimeUnit.SECONDS);

storeFilesAccessedDaysThresholds =
region.getReadOnlyConfiguration().getInts(STOREFILES_ACCESSED_DAYS_THRESHOLDS);
if (storeFilesAccessedDaysThresholds == null || storeFilesAccessedDaysThresholds.length == 0) {
storeFilesAccessedDaysThresholds = STOREFILES_ACCESSED_DAYS_THRESHOLDS_DEFAULT;
}
}

@Override
@@ -243,6 +262,11 @@ public Map<String, Long> getMixedRowReadsCount() {
return mixedReadsOnStore;
}

@Override
public Map<Integer, Long> getStoreFilesAccessedDaysAndSize() {
return storeFilesAccessedDaysAndSize;
}

public class HRegionMetricsWrapperRunnable implements Runnable {

@Override
@@ -259,8 +283,12 @@ public void run() {
long tempMaxFlushQueueSize = 0;
long avgAgeNumerator = 0;
long numHFiles = 0;
if (region.stores != null) {
for (HStore store : region.stores.values()) {
Map<Integer, Long> tmpStoreFileAccessDaysAndSize = new TreeMap<>();
for (int threshold : storeFilesAccessedDaysThresholds) {
tmpStoreFileAccessDaysAndSize.put(threshold, 0L);
}
if (region.getStores() != null) {
for (HStore store : region.getStores()) {
tempNumStoreFiles += store.getStorefilesCount();
int currentStoreRefCount = store.getStoreRefCount();
tempStoreRefCount += currentStoreRefCount;
@@ -313,9 +341,24 @@ public void run() {
tempVal += store.getMemstoreOnlyRowReadsCount();
}
readsOnlyFromMemstore.put(store.getColumnFamilyName(), tempVal);

Map<String, Pair<Long, Long>> sfAccessTimeAndSizeMap =
store.getStoreFilesAccessTimeAndSize();
long now = EnvironmentEdgeManager.currentTime();
for (Pair<Long, Long> pair : sfAccessTimeAndSizeMap.values()) {
long accessTime = pair.getFirst();
long size = pair.getSecond();
for (int threshold : storeFilesAccessedDaysThresholds) {
long sumSize = tmpStoreFileAccessDaysAndSize.get(threshold);
if ((now - accessTime) >= threshold * ONE_DAY_MS) {
sumSize = sumSize + size;
tmpStoreFileAccessDaysAndSize.put(threshold, sumSize);
}
}
}
}
}

storeFilesAccessedDaysAndSize = tmpStoreFileAccessDaysAndSize;
numStoreFiles = tempNumStoreFiles;
storeRefCount = tempStoreRefCount;
maxCompactedStoreFileRefCount = tempMaxCompactedStoreFileRefCount;
@@ -17,18 +17,25 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.MetricsRegionWrapperImpl.ONE_DAY_MS;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionWrapperImpl.STOREFILES_ACCESSED_DAYS_THRESHOLDS;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionWrapperImpl.STOREFILES_ACCESSED_DAYS_THRESHOLDS_DEFAULT;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics2.MetricsExecutor;
import org.apache.yetus.audience.InterfaceAudience;

@@ -43,6 +50,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggregate
private ScheduledFuture<?> tableMetricsUpdateTask;
private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap =
new ConcurrentHashMap<>();
private int[] storeFilesAccessedDaysThresholds;

public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) {
this.regionServer = regionServer;
@@ -52,6 +60,12 @@ public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) {
this.runnable = new TableMetricsWrapperRunnable();
this.tableMetricsUpdateTask =
this.executor.scheduleWithFixedDelay(this.runnable, period, period, TimeUnit.MILLISECONDS);

storeFilesAccessedDaysThresholds =
regionServer.getConfiguration().getInts(STOREFILES_ACCESSED_DAYS_THRESHOLDS);
if (storeFilesAccessedDaysThresholds == null || storeFilesAccessedDaysThresholds.length == 0) {
storeFilesAccessedDaysThresholds = STOREFILES_ACCESSED_DAYS_THRESHOLDS_DEFAULT;
}
}

public class TableMetricsWrapperRunnable implements Runnable {
@@ -64,6 +78,11 @@ public void run() {
MetricsTableValues mt = localMetricsTableMap.get(tbl);
if (mt == null) {
mt = new MetricsTableValues();
Map<Integer, Long> tmpStoreFileAccessDaysAndSize = new TreeMap<>();
for (int threshold : storeFilesAccessedDaysThresholds) {
tmpStoreFileAccessDaysAndSize.put(threshold, 0L);
}
mt.storeFilesAccessedDaysAndSize = tmpStoreFileAccessDaysAndSize;
localMetricsTableMap.put(tbl, mt);
}
long memstoreReadCount = 0L;
@@ -115,10 +134,23 @@ public void run() {
// accumulate the count
mt.perStoreMemstoreOnlyReadCount.put(tempKey, memstoreReadCount);
mt.perStoreMixedReadCount.put(tempKey, mixedReadCount);
Map<String, Pair<Long, Long>> sfAccessTimeAndSizeMap =
store.getStoreFilesAccessTimeAndSize();
long now = EnvironmentEdgeManager.currentTime();
for (Pair<Long, Long> pair : sfAccessTimeAndSizeMap.values()) {
Long accessTime = pair.getFirst();
Long size = pair.getSecond();
for (int threshold : storeFilesAccessedDaysThresholds) {
Long sumSize = mt.storeFilesAccessedDaysAndSize.get(threshold);
if ((now - accessTime) >= threshold * ONE_DAY_MS) {
Contributor: So here, if the accessTime is very old, we will increase the sumSize for all the thresholds? Is this by design? It seems a bit strange...

Contributor Author: @Apache9 Yes, this is by design. For instance, if an HFile has not been accessed for 90 days, it is certain that it has also not been accessed for 30 or 7 days. What is your suggestion?

Contributor: OK, so the metric here is 'the number of store files which have not been accessed for at least xx days'? Then I think we'd better have a better name for it; looking at 'storeFilesAccessedDays', I cannot get the meaning unless you explain it...

Contributor Author: OK, sir.
sumSize = sumSize + size;
mt.storeFilesAccessedDaysAndSize.put(threshold, sumSize);
}
}
}
}

mt.regionCount += 1;

mt.readRequestCount += r.getReadRequestsCount();
mt.filteredReadRequestCount += r.getFilteredReadRequestsCount();
mt.writeRequestCount += r.getWriteRequestsCount();
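
To make the cumulative accounting discussed above concrete, a worked sketch with hypothetical values: a 128 MB store file last accessed 95 days ago exceeds all three default thresholds, so its size is added to the 7-, 30-, and 90-day buckets alike.

// Hypothetical numbers illustrating the cumulative threshold accounting.
long now = EnvironmentEdgeManager.currentTime();
long accessTime = now - 95 * ONE_DAY_MS;  // store file last accessed 95 days ago
long size = 128L * 1024 * 1024;           // 128 MB store file
Map<Integer, Long> buckets = new TreeMap<>();
for (int threshold : STOREFILES_ACCESSED_DAYS_THRESHOLDS_DEFAULT) { // {7, 30, 90}
  long sum = (now - accessTime) >= threshold * ONE_DAY_MS ? size : 0L;
  buckets.put(threshold, sum);
}
// buckets -> {7=134217728, 30=134217728, 90=134217728}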
@@ -178,6 +210,16 @@ public Map<String, Long> getMixedRowReadsCount(String table) {
}
}

@Override
public Map<Integer, Long> getStoreFilesAccessedDaysAndSize(String table) {
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
if (metricsTable == null) {
return null;
} else {
return metricsTable.storeFilesAccessedDaysAndSize;
}
}

@Override
public long getCpRequestsCount(String table) {
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
@@ -419,6 +461,7 @@ private static class MetricsTableValues {
long cpRequestCount;
Map<String, Long> perStoreMemstoreOnlyReadCount = new HashMap<>();
Map<String, Long> perStoreMixedReadCount = new HashMap<>();
Map<Integer, Long> storeFilesAccessedDaysAndSize = new TreeMap<>();
}

}
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +29,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

@@ -252,4 +254,7 @@ public interface Store {
* loaded.
*/
long getBloomFilterEligibleRequestsCount();

/** Returns the access time and size of each store file in this store */
Map<String, Pair<Long, Long>> getStoreFilesAccessTimeAndSize();
}
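
A short usage sketch of the new accessor (consumer code hypothetical): the map key is the store file name, and the Pair carries the access time and length reported by the filesystem.

// Hypothetical consumer of the new accessor; 'store' is an HStore instance.
Map<String, Pair<Long, Long>> filesInfo = store.getStoreFilesAccessTimeAndSize();
for (Map.Entry<String, Pair<Long, Long>> e : filesInfo.entrySet()) {
  long accessTimeMs = e.getValue().getFirst();  // FileStatus.getAccessTime()
  long sizeBytes = e.getValue().getSecond();    // FileStatus.getLen()
  System.out.println(e.getKey() + " lastAccess=" + accessTimeMs + " size=" + sizeBytes);
}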