@@ -138,4 +138,10 @@ default String getNameAsString() {

/** Returns the compaction state of this region */
CompactionState getCompactionState();

/** Returns the total size of the hfiles in the region */
Size getRegionSizeMB();

/** Returns the current cached ratio of this region on this server */
float getCurrentRegionCachedRatio();
}
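
Together, the two new RegionMetrics getters let a client see how large a region is and what fraction of it is currently cached on its hosting server. Below is a minimal sketch of reading them through the Admin/ClusterMetrics API; it assumes an existing Connection, and the derived "cached MB" value assumes the ratio is cachedSize/regionSize, which the interface itself does not state.

```java
import java.util.EnumSet;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;

public class RegionCacheMetricsExample {
  // Sketch only: walk live servers and print the new per-region metrics.
  static void printRegionCacheMetrics(Connection conn) throws Exception {
    try (Admin admin = conn.getAdmin()) {
      ClusterMetrics cm = admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
      for (ServerMetrics sm : cm.getLiveServerMetrics().values()) {
        for (RegionMetrics rm : sm.getRegionMetrics().values()) {
          double sizeMb = rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
          float cachedRatio = rm.getCurrentRegionCachedRatio();
          // Derived value, assuming the ratio is cachedBytes / regionSize:
          double cachedMb = sizeMb * cachedRatio;
          System.out.printf("%s size=%.0fMB cached~%.1fMB (ratio=%.2f)%n",
            rm.getNameAsString(), sizeMb, cachedMb, cachedRatio);
        }
      }
    }
  }
}
```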
@@ -80,7 +80,8 @@ public static RegionMetrics toRegionMetrics(ClusterStatusProtos.RegionLoad regio
ClusterStatusProtos.StoreSequenceId::getSequenceId)))
.setUncompressedStoreFileSize(
new Size(regionLoadPB.getStoreUncompressedSizeMB(), Size.Unit.MEGABYTE))
.build();
.setRegionSizeMB(new Size(regionLoadPB.getRegionSizeMB(), Size.Unit.MEGABYTE))
.setCurrentRegionCachedRatio(regionLoadPB.getCurrentRegionCachedRatio()).build();
}

private static List<ClusterStatusProtos.StoreSequenceId>
@@ -120,7 +121,8 @@ public static ClusterStatusProtos.RegionLoad toRegionLoad(RegionMetrics regionMe
.addAllStoreCompleteSequenceId(toStoreSequenceId(regionMetrics.getStoreSequenceId()))
.setStoreUncompressedSizeMB(
(int) regionMetrics.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE))
.build();
.setRegionSizeMB((int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE))
.setCurrentRegionCachedRatio(regionMetrics.getCurrentRegionCachedRatio()).build();
}

public static RegionMetricsBuilder newBuilder(byte[] name) {
@@ -154,6 +156,8 @@ public static RegionMetricsBuilder newBuilder(byte[] name) {
private long blocksLocalWithSsdWeight;
private long blocksTotalWeight;
private CompactionState compactionState;
private Size regionSizeMB = Size.ZERO;
private float currentRegionCachedRatio;

private RegionMetricsBuilder(byte[] name) {
this.name = name;
@@ -289,14 +293,24 @@ public RegionMetricsBuilder setCompactionState(CompactionState compactionState)
return this;
}

public RegionMetricsBuilder setRegionSizeMB(Size value) {
this.regionSizeMB = value;
return this;
}

public RegionMetricsBuilder setCurrentRegionCachedRatio(float value) {
this.currentRegionCachedRatio = value;
return this;
}

public RegionMetrics build() {
return new RegionMetricsImpl(name, storeCount, storeFileCount, storeRefCount,
maxCompactedStoreFileRefCount, compactingCellCount, compactedCellCount, storeFileSize,
memStoreSize, indexSize, rootLevelIndexSize, uncompressedDataIndexSize, bloomFilterSize,
uncompressedStoreFileSize, writeRequestCount, readRequestCount, cpRequestCount,
filteredReadRequestCount, completedSequenceId, storeSequenceIds, dataLocality,
lastMajorCompactionTimestamp, dataLocalityForSsd, blocksLocalWeight, blocksLocalWithSsdWeight,
blocksTotalWeight, compactionState);
blocksTotalWeight, compactionState, regionSizeMB, currentRegionCachedRatio);
}

private static class RegionMetricsImpl implements RegionMetrics {
@@ -327,6 +341,8 @@ private static class RegionMetricsImpl implements RegionMetrics {
private final long blocksLocalWithSsdWeight;
private final long blocksTotalWeight;
private final CompactionState compactionState;
private final Size regionSizeMB;
private final float currentRegionCachedRatio;

RegionMetricsImpl(byte[] name, int storeCount, int storeFileCount, int storeRefCount,
int maxCompactedStoreFileRefCount, final long compactingCellCount, long compactedCellCount,
@@ -336,7 +352,7 @@ private static class RegionMetricsImpl implements RegionMetrics {
long filteredReadRequestCount, long completedSequenceId, Map<byte[], Long> storeSequenceIds,
float dataLocality, long lastMajorCompactionTimestamp, float dataLocalityForSsd,
long blocksLocalWeight, long blocksLocalWithSsdWeight, long blocksTotalWeight,
CompactionState compactionState) {
CompactionState compactionState, Size regionSizeMB, float currentRegionCachedRatio) {
this.name = Preconditions.checkNotNull(name);
this.storeCount = storeCount;
this.storeFileCount = storeFileCount;
@@ -364,6 +380,8 @@ private static class RegionMetricsImpl implements RegionMetrics {
this.blocksLocalWithSsdWeight = blocksLocalWithSsdWeight;
this.blocksTotalWeight = blocksTotalWeight;
this.compactionState = compactionState;
this.regionSizeMB = regionSizeMB;
this.currentRegionCachedRatio = currentRegionCachedRatio;
}

@Override
@@ -501,6 +519,16 @@ public CompactionState getCompactionState() {
return compactionState;
}

@Override
public Size getRegionSizeMB() {
return regionSizeMB;
}

@Override
public float getCurrentRegionCachedRatio() {
return currentRegionCachedRatio;
}

@Override
public String toString() {
StringBuilder sb =
@@ -541,6 +569,8 @@ public String toString() {
Strings.appendKeyValue(sb, "blocksLocalWithSsdWeight", blocksLocalWithSsdWeight);
Strings.appendKeyValue(sb, "blocksTotalWeight", blocksTotalWeight);
Strings.appendKeyValue(sb, "compactionState", compactionState);
Strings.appendKeyValue(sb, "regionSizeMB", regionSizeMB);
Strings.appendKeyValue(sb, "currentRegionCachedRatio", currentRegionCachedRatio);
return sb.toString();
}
}
@@ -106,4 +106,10 @@ default String getVersion() {
@Nullable
List<ServerTask> getTasks();

/**
* Returns the region cache information for the regions hosted on this server
* @return map of region encoded name to the size of the region cached on this region server,
* rounded to MB
*/
Map<String, Integer> getRegionCachedInfo();
}
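
ServerMetrics adds a server-level companion to the per-region ratio: a map from region encoded name to the cached size in MB. A small consumer sketch, assuming an existing Admin handle:

```java
import java.util.EnumSet;
import java.util.Map;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;

public class RegionCachedInfoExample {
  // Sketch only: report how many MB of each region are cached on each live server.
  static void printRegionCachedInfo(Admin admin) throws Exception {
    ClusterMetrics cm = admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
    for (Map.Entry<ServerName, ServerMetrics> e : cm.getLiveServerMetrics().entrySet()) {
      Map<String, Integer> cached = e.getValue().getRegionCachedInfo();
      cached.forEach((regionEncodedName, cachedMb) ->
        System.out.println(e.getKey() + " " + regionEncodedName + " cached=" + cachedMb + "MB"));
    }
  }
}
```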
@@ -85,6 +85,7 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu
: null)
.setTasks(serverLoadPB.getTasksList().stream().map(ProtobufUtil::getServerTask)
.collect(Collectors.toList()))
.setRegionCachedInfo(serverLoadPB.getRegionCachedInfoMap())
.setReportTimestamp(serverLoadPB.getReportEndTime())
.setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
.setVersion(version).build();
@@ -111,6 +112,7 @@ public static ClusterStatusProtos.ServerLoad toServerLoad(ServerMetrics metrics)
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.addAllTasks(
metrics.getTasks().stream().map(ProtobufUtil::toServerTask).collect(Collectors.toList()))
.putAllRegionCachedInfo(metrics.getRegionCachedInfo())
.setReportStartTime(metrics.getLastReportTimestamp())
.setReportEndTime(metrics.getReportTimestamp());
if (metrics.getReplicationLoadSink() != null) {
@@ -142,6 +144,7 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) {
private long reportTimestamp = EnvironmentEdgeManager.currentTime();
private long lastReportTimestamp = 0;
private final List<ServerTask> tasks = new ArrayList<>();
private Map<String, Integer> regionCachedInfo = new HashMap<>();

private ServerMetricsBuilder(ServerName serverName) {
this.serverName = serverName;
@@ -232,11 +235,16 @@ public ServerMetricsBuilder setTasks(List<ServerTask> tasks) {
return this;
}

public ServerMetricsBuilder setRegionCachedInfo(Map<String, Integer> value) {
this.regionCachedInfo = value;
return this;
}

public ServerMetrics build() {
return new ServerMetricsImpl(serverName, versionNumber, version, requestCountPerSecond,
requestCount, readRequestCount, writeRequestCount, usedHeapSize, maxHeapSize, infoServerPort,
sources, sink, regionStatus, coprocessorNames, reportTimestamp, lastReportTimestamp,
userMetrics, tasks);
userMetrics, tasks, regionCachedInfo);
}

private static class ServerMetricsImpl implements ServerMetrics {
@@ -259,13 +267,15 @@ private static class ServerMetricsImpl implements ServerMetrics {
private final long lastReportTimestamp;
private final Map<byte[], UserMetrics> userMetrics;
private final List<ServerTask> tasks;
private final Map<String, Integer> regionCachedInfo;

ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
long requestCountPerSecond, long requestCount, long readRequestsCount,
long writeRequestsCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort,
List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks) {
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks,
Map<String, Integer> regionCachedInfo) {
this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.version = version;
@@ -284,6 +294,7 @@ private static class ServerMetricsImpl implements ServerMetrics {
this.reportTimestamp = reportTimestamp;
this.lastReportTimestamp = lastReportTimestamp;
this.tasks = tasks;
this.regionCachedInfo = regionCachedInfo;
}

@Override
@@ -386,6 +397,11 @@ public List<ServerTask> getTasks() {
return tasks;
}

@Override
public Map<String, Integer> getRegionCachedInfo() {
return Collections.unmodifiableMap(regionCachedInfo);
}

@Override
public String toString() {
int storeCount = 0;
36 changes: 0 additions & 36 deletions hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto

This file was deleted.

11 changes: 11 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto
@@ -177,6 +177,12 @@ message RegionLoad {
MAJOR = 2;
MAJOR_AND_MINOR = 3;
}

/** Total region size in MB */
optional uint32 region_size_MB = 28;

/** Current region cache ratio on this server */
optional float current_region_cached_ratio = 29;
}

message UserLoad {
@@ -315,6 +321,11 @@ message ServerLoad {
* The active monitored tasks
*/
repeated ServerTask tasks = 15;

/**
* The metrics for region cached on this region server
*/
map<string, uint32> regionCachedInfo = 16;
}

message LiveServerInfo {
@@ -32,7 +32,7 @@ message BucketCacheEntry {
map<int32, string> deserializers = 4;
required BackingMap backing_map = 5;
optional bytes checksum = 6;
map<string, bool> prefetched_files = 7;
map<string, RegionFileSizeMap> cached_files = 7;
}

message BackingMap {
@@ -81,3 +81,9 @@ enum BlockPriority {
multi = 1;
memory = 2;
}

message RegionFileSizeMap {
required string region_name = 1;
required uint64 region_cached_size = 2;
}

@@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -167,7 +168,7 @@ default boolean isMetaBlock(BlockType blockType) {
/**
* Returns the map of fully cached files
*/
default Optional<Map<String, Boolean>> getFullyCachedFiles() {
default Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
return Optional.empty();
}
}
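
The default method now returns, per fully cached file, a Pair instead of a Boolean. Based on the RegionFileSizeMap message above, the pair presumably carries the region name and the cached size; a hedged consumer sketch under that assumption (verify the ordering against the actual implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.util.Pair;

public class FullyCachedFilesExample {
  // Assumes the pair is (regionEncodedName, cachedBytes); this ordering is an assumption.
  static Map<String, Long> cachedBytesPerRegion(BlockCache cache) {
    Map<String, Long> perRegion = new HashMap<>();
    Optional<Map<String, Pair<String, Long>>> files = cache.getFullyCachedFiles();
    files.ifPresent(m -> m.forEach((fileName, p) ->
      perRegion.merge(p.getFirst(), p.getSecond(), Long::sum)));
    return perRegion;
  }
}
```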
@@ -22,6 +22,7 @@
import java.util.Optional;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -431,7 +432,7 @@ public BlockCache[] getBlockCaches() {
* Returns the map of fully cached files
*/
@Override
public Optional<Map<String, Boolean>> getFullyCachedFiles() {
public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
return this.l2Cache.getFullyCachedFiles();
}

@@ -115,7 +115,8 @@ public void run() {
block.release();
}
}
bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path.getName()));
final long fileSize = offset;
bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path, fileSize));
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
if (LOG.isTraceEnabled()) {
@@ -132,7 +133,6 @@ public void run() {
LOG.warn("Close prefetch stream reader failed, path: " + path, e);
}
}
String regionName = getRegionName(path);
PrefetchExecutor.complete(path);
}
}
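
The prefetch runner now reports the file's Path and the total number of bytes it streamed (the final read offset) once the file is fully cached, rather than just the file name, so the receiving cache can attribute the cached bytes to a region. A rough, illustrative sketch of that bookkeeping; the class, field, and helper names below are assumptions, not the actual BucketCache implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;

// Illustrative only: track cached bytes per region from fully prefetched files.
class RegionCachedSizeTracker {
  private final Map<String, Long> cachedBytesPerRegion = new ConcurrentHashMap<>();

  // Called when a store file has been fully prefetched into the cache.
  void fileCacheCompleted(Path filePath, long size) {
    // Store file layout is .../<table>/<region-encoded-name>/<family>/<file>,
    // so the region directory sits two levels above the file.
    String regionEncodedName = filePath.getParent().getParent().getName();
    cachedBytesPerRegion.merge(regionEncodedName, size, Long::sum);
  }
}
```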

This file was deleted.
