Skip to content

Commit 6fdffaf

Browse files
committed
HBASE-29427 Merge all commits related to custom tiering into the feature branch (#7124)
This is the whole custom tiering implementation and involves the following individual works: * HBASE-29412 Extend date tiered compaction to allow for tiering by values other than cell timestamp * HBASE-29413 Implement a custom qualifier tiered compaction * HBASE-29414 Refactor DataTieringManager to make priority logic pluggable * HBASE-29422 Implement selectMinorCompaction in CustomCellDateTieredCompactionPolicy * HBASE-29424 Implement configuration validation for custom tiering compactions * HBASE-29425 Refine and polish code * HBASE-29426 Define a tiering value provider and refactor custom tiered compaction related classes * HBASE-28463 Rebase time based priority branch (HBASE-28463) with latest master (and fix conflicts) Co-authored-by: Janardhan Hungund <[email protected]> Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
1 parent dc71f85 commit 6fdffaf

33 files changed

+2184
-159
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ public final class TagType {
3636
// String based tag type used in replication
3737
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
3838
public static final byte TTL_TAG_TYPE = (byte) 8;
39+
// tag with the custom cell tiering value for the row
40+
public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9;
3941
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.hadoop.conf.Configuration;
2424
import org.apache.hadoop.fs.Path;
2525
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
26-
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
2726
import org.apache.hadoop.hbase.util.Pair;
2827
import org.apache.yetus.audience.InterfaceAudience;
2928

@@ -214,13 +213,13 @@ default Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration con
214213
* not be overridden by all implementing classes. In such cases, the returned Optional will be
215214
* empty. For subclasses implementing this logic, the returned Optional would contain the boolean
216215
* value reflecting if the passed block should indeed be cached.
217-
* @param key The key representing the block to check if it should be cached.
218-
* @param timeRangeTracker the time range tracker containing the timestamps
219-
* @param conf The configuration object to use for determining caching behavior.
216+
* @param key The key representing the block to check if it should be cached.
217+
* @param maxTimeStamp The maximum timestamp for the block to check if it should be cached.
218+
* @param conf The configuration object to use for determining caching behavior.
220219
* @return An empty Optional if this method is not supported; otherwise, the returned Optional
221220
* contains the boolean value indicating if the block should be cached.
222221
*/
223-
default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
222+
default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
224223
Configuration conf) {
225224
return Optional.empty();
226225
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ public boolean shouldCacheBlockOnRead(BlockCategory category) {
282282
public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo hFileInfo,
283283
Configuration conf) {
284284
Optional<Boolean> cacheFileBlock = Optional.of(true);
285-
if (getBlockCache().isPresent()) {
285+
// For DATA blocks only, if BucketCache is in use, we don't need to cache block again
286+
if (getBlockCache().isPresent() && category.equals(BlockCategory.DATA)) {
286287
Optional<Boolean> result = getBlockCache().get().shouldCacheFile(hFileInfo, conf);
287288
if (result.isPresent()) {
288289
cacheFileBlock = result;

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.hadoop.fs.Path;
2727
import org.apache.hadoop.hbase.io.HeapSize;
2828
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
29-
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
3029
import org.apache.hadoop.hbase.util.Pair;
3130
import org.apache.yetus.audience.InterfaceAudience;
3231
import org.slf4j.Logger;
@@ -494,10 +493,10 @@ public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf
494493
}
495494

496495
@Override
497-
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
496+
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
498497
Configuration conf) {
499-
return combineCacheResults(l1Cache.shouldCacheBlock(key, timeRangeTracker, conf),
500-
l2Cache.shouldCacheBlock(key, timeRangeTracker, conf));
498+
return combineCacheResults(l1Cache.shouldCacheBlock(key, maxTimeStamp, conf),
499+
l2Cache.shouldCacheBlock(key, maxTimeStamp, conf));
501500
}
502501

503502
private Optional<Boolean> combineCacheResults(Optional<Boolean> result1,

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.hadoop.hbase.ipc.RpcServer;
4444
import org.apache.hadoop.hbase.regionserver.CellSink;
4545
import org.apache.hadoop.hbase.regionserver.ShipperListener;
46+
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
4647
import org.apache.hadoop.hbase.util.BloomFilterWriter;
4748
import org.apache.hadoop.hbase.util.Bytes;
4849
import org.apache.hadoop.hbase.util.FSUtils;
@@ -217,6 +218,12 @@ public interface Writer extends Closeable, CellSink, ShipperListener {
217218
*/
218219
void appendTrackedTimestampsToMetadata() throws IOException;
219220

221+
/**
222+
* Add Custom cell timestamp to Metadata
223+
*/
224+
public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
225+
throws IOException;
226+
220227
/** Returns the path to this {@link HFile} */
221228
Path getPath();
222229

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
13881388
cacheConf.getBlockCache().ifPresent(cache -> {
13891389
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
13901390
// Cache the block if necessary
1391-
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
1391+
if (cacheBlock && cacheOnRead) {
13921392
cache.cacheBlock(cacheKey, blockNoChecksum, cacheConf.isInMemory(), cacheOnly);
13931393
}
13941394
});
@@ -1402,7 +1402,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
14021402
HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
14031403
// Cache the block if necessary
14041404
cacheConf.getBlockCache().ifPresent(cache -> {
1405-
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
1405+
if (cacheBlock && cacheOnRead) {
14061406
// Using the wait on cache during compaction and prefetching.
14071407
cache.cacheBlock(cacheKey,
14081408
cacheCompressed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.io.hfile;
1919

2020
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
21+
import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
2122
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
2223
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
2324

@@ -29,6 +30,7 @@
2930
import java.util.ArrayList;
3031
import java.util.List;
3132
import java.util.Optional;
33+
import java.util.function.Supplier;
3234
import org.apache.hadoop.conf.Configuration;
3335
import org.apache.hadoop.fs.FSDataOutputStream;
3436
import org.apache.hadoop.fs.FileSystem;
@@ -127,6 +129,12 @@ public class HFileWriterImpl implements HFile.Writer {
127129
/** Cache configuration for caching data on write. */
128130
protected final CacheConfig cacheConf;
129131

132+
public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
133+
this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
134+
}
135+
136+
private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;
137+
130138
/**
131139
* Name for this object used when logging or in toString. Is either the result of a toString on
132140
* stream or else name of passed file Path.
@@ -186,7 +194,9 @@ public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path pat
186194
this.path = path;
187195
this.name = path != null ? path.getName() : outputStream.toString();
188196
this.hFileContext = fileContext;
197+
// TODO: Move this back to upper layer
189198
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
199+
this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
190200
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
191201
if (encoding != DataBlockEncoding.NONE) {
192202
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -588,7 +598,8 @@ private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
588598
}
589599

590600
private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
591-
Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf);
601+
Optional<Boolean> result =
602+
cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
592603
return result.orElse(true);
593604
}
594605

@@ -899,12 +910,19 @@ public void appendTrackedTimestampsToMetadata() throws IOException {
899910
appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
900911
}
901912

913+
public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
914+
throws IOException {
915+
// TODO: The StoreFileReader always converts the byte[] to TimeRange
916+
// via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
917+
appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
918+
}
919+
902920
/**
903921
* Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker
904922
* to include the timestamp of this key
905923
*/
906924
private void trackTimestamps(final ExtendedCell cell) {
907-
if (Cell.Type.Put == cell.getType()) {
925+
if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
908926
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
909927
}
910928
timeRangeTracker.includeTimestamp(cell);

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@
8686
import org.apache.hadoop.hbase.regionserver.DataTieringManager;
8787
import org.apache.hadoop.hbase.regionserver.HRegion;
8888
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
89-
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
9089
import org.apache.hadoop.hbase.util.Bytes;
9190
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
9291
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -1195,8 +1194,9 @@ void freeSpace(final String why) {
11951194
}
11961195
}
11971196

1198-
if (bytesFreed < bytesToFreeWithExtra &&
1199-
coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
1197+
if (
1198+
bytesFreed < bytesToFreeWithExtra && coldFiles != null
1199+
&& coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
12001200
) {
12011201
int freedBlockSize = bucketEntryWithKey.getValue().getLength();
12021202
if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
@@ -2458,10 +2458,10 @@ public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf
24582458
}
24592459

24602460
@Override
2461-
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
2461+
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp,
24622462
Configuration conf) {
24632463
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
2464-
if (dataTieringManager != null && !dataTieringManager.isHotData(timeRangeTracker, conf)) {
2464+
if (dataTieringManager != null && !dataTieringManager.isHotData(maxTimestamp, conf)) {
24652465
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data",
24662466
key.getHfileName());
24672467
return Optional.of(false);

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
3838
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
3939
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
40+
import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;
4041
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
4142
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
4243
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
@@ -314,6 +315,8 @@ private boolean prepareCreate(final MasterProcedureEnv env) throws IOException {
314315
StoreFileTrackerValidationUtils.checkForCreateTable(env.getMasterConfiguration(),
315316
tableDescriptor);
316317

318+
CustomCellTieredUtils.checkForModifyTable(tableDescriptor);
319+
317320
return true;
318321
}
319322

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
4141
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
4242
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
43+
import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;
4344
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
4445
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
4546
import org.apache.hadoop.hbase.util.Bytes;
@@ -420,6 +421,7 @@ private void prepareModify(final MasterProcedureEnv env) throws IOException {
420421
// check for store file tracker configurations
421422
StoreFileTrackerValidationUtils.checkForModifyTable(env.getMasterConfiguration(),
422423
unmodifiedTableDescriptor, modifiedTableDescriptor, !isTableEnabled(env));
424+
CustomCellTieredUtils.checkForModifyTable(modifiedTableDescriptor);
423425
}
424426

425427
/**

0 commit comments

Comments
 (0)