Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ public interface ColumnFamilyDescriptor {
* and BLOOM type blocks).
*/
boolean isBlockCacheEnabled();

/**
* @return true if we should cache data blocks on read
*/
boolean isCacheDataOnRead();

/**
* @return true if we should cache bloomfilter blocks on write
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public class ColumnFamilyDescriptorBuilder {
public static final String BLOCKCACHE = "BLOCKCACHE";
private static final Bytes BLOCKCACHE_BYTES = new Bytes(Bytes.toBytes(BLOCKCACHE));
@InterfaceAudience.Private
public static final String CACHE_DATA_ON_READ = "CACHE_DATA_ON_READ";
private static final Bytes CACHE_DATA_ON_READ_BYTES = new Bytes(Bytes.toBytes(CACHE_DATA_ON_READ));
@InterfaceAudience.Private
public static final String CACHE_DATA_ON_WRITE = "CACHE_DATA_ON_WRITE";
private static final Bytes CACHE_DATA_ON_WRITE_BYTES = new Bytes(Bytes.toBytes(CACHE_DATA_ON_WRITE));
@InterfaceAudience.Private
Expand Down Expand Up @@ -218,6 +221,12 @@ public class ColumnFamilyDescriptorBuilder {
*/
public static final boolean DEFAULT_BLOCKCACHE = true;

/**
* Default setting for whether to cache data blocks on read if block caching
* is enabled.
*/
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;

/**
* Default setting for whether to cache data blocks on write if block caching
* is enabled.
Expand Down Expand Up @@ -296,6 +305,7 @@ public static Map<String, String> getDefaultValues() {
DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE));
DEFAULT_VALUES.put(IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY));
DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE));
DEFAULT_VALUES.put(CACHE_DATA_ON_READ, String.valueOf(DEFAULT_CACHE_DATA_ON_READ));
DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED));
DEFAULT_VALUES.put(DATA_BLOCK_ENCODING, String.valueOf(DEFAULT_DATA_BLOCK_ENCODING));
// Do NOT add this key/value by default. NEW_VERSION_BEHAVIOR is NOT defined in hbase1 so
Expand Down Expand Up @@ -433,6 +443,11 @@ public ColumnFamilyDescriptorBuilder setBloomFilterType(final BloomType value) {
return this;
}

public ColumnFamilyDescriptorBuilder setCacheDataOnRead(boolean value) {
desc.setCacheDataOnRead(value);
return this;
}

public ColumnFamilyDescriptorBuilder setCacheBloomsOnWrite(boolean value) {
desc.setCacheBloomsOnWrite(value);
return this;
Expand Down Expand Up @@ -1038,6 +1053,20 @@ public ModifyableColumnFamilyDescriptor setScope(int scope) {
return setValue(REPLICATION_SCOPE_BYTES, Integer.toString(scope));
}

@Override
public boolean isCacheDataOnRead() {
return getStringOrDefault(CACHE_DATA_ON_READ_BYTES, Boolean::valueOf,
DEFAULT_CACHE_DATA_ON_READ);
}

/**
* @param value true if we should cache data blocks on read
* @return this (for chained invocation)
*/
public ModifyableColumnFamilyDescriptor setCacheDataOnRead(boolean value) {
return setValue(CACHE_DATA_ON_READ, Boolean.toString(value));
}

@Override
public boolean isCacheDataOnWrite() {
return getStringOrDefault(CACHE_DATA_ON_WRITE_BYTES, Boolean::valueOf, DEFAULT_CACHE_DATA_ON_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class ReplicationLoadSink {
private final long timestampStarted;
private final long totalOpsProcessed;

// TODO: add the builder for this class
@InterfaceAudience.Private
public ReplicationLoadSink(long age, long timestamp, long timestampStarted,
long totalOpsProcessed) {
Expand All @@ -47,4 +46,42 @@ public long getTimestampStarted() {
public long getTotalOpsProcessed() {
return totalOpsProcessed;
}

public static ReplicationLoadSinkBuilder newBuilder() {
return new ReplicationLoadSinkBuilder();
}

public static final class ReplicationLoadSinkBuilder {
private long ageOfLastAppliedOp;
private long timestampsOfLastAppliedOp;
private long timestampStarted;
private long totalOpsProcessed;

private ReplicationLoadSinkBuilder() {}

public ReplicationLoadSinkBuilder setAgeOfLastAppliedOp(long ageOfLastAppliedOp) {
this.ageOfLastAppliedOp = ageOfLastAppliedOp;
return this;
}

public ReplicationLoadSinkBuilder setTimestampsOfLastAppliedOp(long timestampsOfLastAppliedOp) {
this.timestampsOfLastAppliedOp = timestampsOfLastAppliedOp;
return this;
}

public ReplicationLoadSinkBuilder setTimestampStarted(long timestampStarted) {
this.timestampStarted = timestampStarted;
return this;
}

public ReplicationLoadSinkBuilder setTotalOpsProcessed(long totalOpsProcessed) {
this.totalOpsProcessed = totalOpsProcessed;
return this;
}

public ReplicationLoadSink build() {
return new ReplicationLoadSink(ageOfLastAppliedOp, timestampsOfLastAppliedOp,
timestampStarted, totalOpsProcessed);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2836,10 +2836,12 @@ public static void mergeFrom(Message.Builder builder, CodedInputStream codedInpu

public static ReplicationLoadSink toReplicationLoadSink(
ClusterStatusProtos.ReplicationLoadSink rls) {
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(),
rls.getTimeStampsOfLastAppliedOp(),
rls.hasTimestampStarted()? rls.getTimestampStarted(): -1L,
rls.hasTotalOpsProcessed()? rls.getTotalOpsProcessed(): -1L);
ReplicationLoadSink.ReplicationLoadSinkBuilder builder = ReplicationLoadSink.newBuilder();
builder.setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp()).
setTimestampsOfLastAppliedOp(rls.getTimeStampsOfLastAppliedOp()).
setTimestampStarted(rls.hasTimestampStarted()? rls.getTimestampStarted(): -1L).
setTotalOpsProcessed(rls.hasTotalOpsProcessed()? rls.getTotalOpsProcessed(): -1L);
return builder.build();
}

public static ReplicationLoadSource toReplicationLoadSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ public class CacheConfig {
public static final CacheConfig DISABLED = new CacheConfig();

/**
* Configuration key to cache data blocks on read. Bloom blocks and index blocks are always be
* cached if the block cache is enabled.
* Configuration key to turn on block cache. There are separate switches for read and write.
*/
public static final String BLOCKCACHE_ENABLED = "hbase.block.enabled";

/**
* Configuration key to cache data blocks on read.
*/
public static final String CACHE_DATA_ON_READ_KEY = "hbase.block.data.cacheonread";

Expand Down Expand Up @@ -96,6 +100,7 @@ public class CacheConfig {
"hbase.hfile.drop.behind.compaction";

// Defaults
public static final boolean DEFAULT_BLOCKCACHE_ENABLED = true;
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
public static final boolean DEFAULT_IN_MEMORY = false;
Expand All @@ -114,11 +119,14 @@ public class CacheConfig {
* If off we will STILL cache meta blocks; i.e. INDEX and BLOOM types.
* This cannot be disabled.
*/
private final boolean cacheDataOnRead;
private final boolean blockCacheEnabled;

/** Whether blocks should be flagged as in-memory when being cached */
private final boolean inMemory;

/** Whether data blocks should be cached when new files are read */
private boolean cacheDataOnRead;

/** Whether data blocks should be cached when new files are written */
private boolean cacheDataOnWrite;

Expand Down Expand Up @@ -175,15 +183,17 @@ public CacheConfig(Configuration conf, BlockCache blockCache) {
*/
public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache blockCache,
ByteBuffAllocator byteBuffAllocator) {
this.cacheDataOnRead = conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ) &&
(family == null ? true : family.isBlockCacheEnabled());
this.blockCacheEnabled = conf.getBoolean(BLOCKCACHE_ENABLED, DEFAULT_BLOCKCACHE_ENABLED) &&
(family == null || family.isBlockCacheEnabled());
this.inMemory = family == null ? DEFAULT_IN_MEMORY : family.isInMemory();
this.cacheDataCompressed =
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED);
this.dropBehindCompaction =
conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY, DROP_BEHIND_CACHE_COMPACTION_DEFAULT);
// For the following flags we enable them regardless of per-schema settings
// if they are enabled in the global configuration.
this.cacheDataOnRead = conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ) ||
(family != null && family.isCacheDataOnRead());
this.cacheDataOnWrite =
conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE) ||
(family == null ? false : family.isCacheDataOnWrite());
Expand All @@ -209,8 +219,9 @@ public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache
* @param cacheConf
*/
public CacheConfig(CacheConfig cacheConf) {
this.cacheDataOnRead = cacheConf.cacheDataOnRead;
this.blockCacheEnabled = cacheConf.blockCacheEnabled;
this.inMemory = cacheConf.inMemory;
this.cacheDataOnRead = cacheConf.cacheDataOnRead;
this.cacheDataOnWrite = cacheConf.cacheDataOnWrite;
this.cacheIndexesOnWrite = cacheConf.cacheIndexesOnWrite;
this.cacheBloomsOnWrite = cacheConf.cacheBloomsOnWrite;
Expand All @@ -225,8 +236,9 @@ public CacheConfig(CacheConfig cacheConf) {
}

private CacheConfig() {
this.cacheDataOnRead = false;
this.blockCacheEnabled = false;
this.inMemory = false;
this.cacheDataOnRead = false;
this.cacheDataOnWrite = false;
this.cacheIndexesOnWrite = false;
this.cacheBloomsOnWrite = false;
Expand All @@ -245,7 +257,7 @@ private CacheConfig() {
* @return true if blocks should be cached on read, false if not
*/
public boolean shouldCacheDataOnRead() {
return cacheDataOnRead;
return blockCacheEnabled && cacheDataOnRead;
}

public boolean shouldDropBehindCompaction() {
Expand All @@ -258,8 +270,10 @@ public boolean shouldDropBehindCompaction() {
* available.
*/
public boolean shouldCacheBlockOnRead(BlockCategory category) {
return cacheDataOnRead || category == BlockCategory.INDEX || category == BlockCategory.BLOOM ||
(prefetchOnOpen && (category != BlockCategory.META && category != BlockCategory.UNKNOWN));
return blockCacheEnabled &&
(cacheDataOnRead || category == BlockCategory.INDEX || category == BlockCategory.BLOOM ||
category == BlockCategory.META) ||
(prefetchOnOpen && (category != BlockCategory.META && category != BlockCategory.UNKNOWN));
}

/**
Expand All @@ -285,6 +299,14 @@ public void setCacheDataOnWrite(boolean cacheDataOnWrite) {
this.cacheDataOnWrite = cacheDataOnWrite;
}

/**
* @param cacheDataOnRead whether data blocks should be written to the cache
* when an HFile is read.
*/
public void setCacheDataOnRead(boolean cacheDataOnRead) {
this.cacheDataOnRead = cacheDataOnRead;
}

/**
* Enable cache on write including:
* cacheDataOnWrite
Expand Down Expand Up @@ -334,7 +356,7 @@ public void setEvictOnClose(boolean evictOnClose) {
* @return true if data blocks should be compressed in the cache, false if not
*/
public boolean shouldCacheDataCompressed() {
return this.cacheDataOnRead && this.cacheDataCompressed;
return this.blockCacheEnabled && this.cacheDataCompressed;
}

/**
Expand All @@ -343,7 +365,7 @@ public boolean shouldCacheDataCompressed() {
public boolean shouldCacheCompressed(BlockCategory category) {
switch (category) {
case DATA:
return this.cacheDataOnRead && this.cacheDataCompressed;
return this.blockCacheEnabled && this.cacheDataCompressed;
default:
return false;
}
Expand Down Expand Up @@ -371,30 +393,9 @@ public long getCacheCompactedBlocksOnWriteThreshold() {
}
/**
* Return true if we may find this type of block in block cache.
* <p>
* TODO: today {@code family.isBlockCacheEnabled()} only means {@code cacheDataOnRead}, so here we
* consider lots of other configurations such as {@code cacheDataOnWrite}. We should fix this in
* the future, {@code cacheDataOnWrite} should honor the CF level {@code isBlockCacheEnabled}
* configuration.
*/
public boolean shouldReadBlockFromCache(BlockType blockType) {
if (cacheDataOnRead) {
return true;
}
if (prefetchOnOpen) {
return true;
}
if (cacheDataOnWrite) {
return true;
}
if (blockType == null) {
return true;
}
if (blockType.getCategory() == BlockCategory.BLOOM ||
blockType.getCategory() == BlockCategory.INDEX) {
return true;
}
return false;
return blockCacheEnabled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,20 @@ public void testDisableCacheDataBlock() throws IOException {
assertFalse(cacheConfig.shouldCacheBloomsOnWrite());
assertFalse(cacheConfig.shouldCacheIndexesOnWrite());

conf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, false);
cacheConfig = new CacheConfig(conf);
assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.DATA));
assertFalse(cacheConfig.shouldCacheCompressed(BlockCategory.DATA));
assertFalse(cacheConfig.shouldCacheDataCompressed());
assertFalse(cacheConfig.shouldCacheDataOnWrite());
assertFalse(cacheConfig.shouldCacheDataOnRead());
assertTrue(cacheConfig.shouldCacheBlockOnRead(BlockCategory.INDEX));
assertTrue(cacheConfig.shouldCacheBlockOnRead(BlockCategory.META));
assertTrue(cacheConfig.shouldCacheBlockOnRead(BlockCategory.BLOOM));
assertFalse(cacheConfig.shouldCacheBloomsOnWrite());
assertFalse(cacheConfig.shouldCacheIndexesOnWrite());

conf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, true);
conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, true);
Expand All @@ -221,7 +235,7 @@ public void testDisableCacheDataBlock() throws IOException {
assertTrue(cacheConfig.shouldCacheBloomsOnWrite());
assertTrue(cacheConfig.shouldCacheIndexesOnWrite());

conf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, false);
conf.setBoolean(CacheConfig.BLOCKCACHE_ENABLED, false);
conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);

cacheConfig = new CacheConfig(conf);
Expand All @@ -230,13 +244,13 @@ public void testDisableCacheDataBlock() throws IOException {
assertFalse(cacheConfig.shouldCacheDataCompressed());
assertFalse(cacheConfig.shouldCacheDataOnWrite());
assertFalse(cacheConfig.shouldCacheDataOnRead());
assertTrue(cacheConfig.shouldCacheBlockOnRead(BlockCategory.INDEX));
assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.INDEX));
assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.META));
assertTrue(cacheConfig.shouldCacheBlockOnRead(BlockCategory.BLOOM));
assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.BLOOM));
assertTrue(cacheConfig.shouldCacheBloomsOnWrite());
assertTrue(cacheConfig.shouldCacheIndexesOnWrite());

conf.setBoolean(CacheConfig.CACHE_DATA_ON_READ_KEY, true);
conf.setBoolean(CacheConfig.BLOCKCACHE_ENABLED, true);
conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);

ColumnFamilyDescriptor columnFamilyDescriptor =
Expand All @@ -251,9 +265,9 @@ public void testDisableCacheDataBlock() throws IOException {
assertFalse(cacheConfig.shouldCacheDataCompressed());
assertFalse(cacheConfig.shouldCacheDataOnWrite());
assertFalse(cacheConfig.shouldCacheDataOnRead());
assertTrue(cacheConfig.shouldCacheBlockOnRead(BlockCategory.INDEX));
assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.INDEX));
assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.META));
assertTrue(cacheConfig.shouldCacheBlockOnRead(BlockCategory.BLOOM));
assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.BLOOM));
assertTrue(cacheConfig.shouldCacheBloomsOnWrite());
assertTrue(cacheConfig.shouldCacheIndexesOnWrite());
}
Expand Down