Skip to content

Commit 7c6370c

Browse files
committed
HBASE-29427 Merge all commits related to custom tiering into the feature branch (apache#7124)
This is the whole custom tiering implementation and involves the following individual works: * HBASE-29412 Extend date tiered compaction to allow for tiering by values other than cell timestamp * HBASE-29413 Implement a custom qualifier tiered compaction * HBASE-29414 Refactor DataTieringManager to make priority logic pluggable * HBASE-29422 Implement selectMinorCompation in CustomCellDateTieredCompactionPolicy * HBASE-29424 Implement configuration validation for custom tiering compactions * HBASE-29425 Refine and polish code * HBASE-29426 Define a tiering value provider and refactor custom tiered compaction related classes * HBASE-28463 Rebase time based priority branch (HBASE-28463) with latest master (and fix conflicts) Co-authored-by: Janardhan Hungund <[email protected]> Signed-off-by: Tak Lon (Stephen) Wu <[email protected]> Change-Id: Ib10409b23a8cb735af5210e5ae4fc843b04b2d10
1 parent f5f8481 commit 7c6370c

35 files changed

+2187
-163
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ public final class TagType {
3636
// String based tag type used in replication
3737
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
3838
public static final byte TTL_TAG_TYPE = (byte) 8;
39+
// tag with the custom cell tiering value for the row
40+
public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9;
3941
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.hadoop.conf.Configuration;
2424
import org.apache.hadoop.fs.Path;
2525
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
26-
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
2726
import org.apache.hadoop.hbase.util.Pair;
2827
import org.apache.yetus.audience.InterfaceAudience;
2928

@@ -199,13 +198,13 @@ default Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration con
199198
* not be overridden by all implementing classes. In such cases, the returned Optional will be
200199
* empty. For subclasses implementing this logic, the returned Optional would contain the boolean
201200
* value reflecting if the passed block should indeed be cached.
202-
* @param key The key representing the block to check if it should be cached.
203-
* @param timeRangeTracker the time range tracker containing the timestamps
204-
* @param conf The configuration object to use for determining caching behavior.
201+
* @param key The key representing the block to check if it should be cached.
202+
* @param maxTimeStamp The maximum timestamp for the block to check if it should be cached.
203+
* @param conf The configuration object to use for determining caching behavior.
205204
* @return An empty Optional if this method is not supported; otherwise, the returned Optional
206205
* contains the boolean value indicating if the block should be cached.
207206
*/
208-
default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
207+
default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
209208
Configuration conf) {
210209
return Optional.empty();
211210
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ public boolean shouldCacheBlockOnRead(BlockCategory category) {
282282
public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo hFileInfo,
283283
Configuration conf) {
284284
Optional<Boolean> cacheFileBlock = Optional.of(true);
285-
if (getBlockCache().isPresent()) {
285+
// For DATA blocks only, if BuckeCache is in use, we don't need to cache block again
286+
if (getBlockCache().isPresent() && category.equals(BlockCategory.DATA)) {
286287
Optional<Boolean> result = getBlockCache().get().shouldCacheFile(hFileInfo, conf);
287288
if (result.isPresent()) {
288289
cacheFileBlock = result;

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.hadoop.fs.Path;
2727
import org.apache.hadoop.hbase.io.HeapSize;
2828
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
29-
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
3029
import org.apache.hadoop.hbase.util.Pair;
3130
import org.apache.yetus.audience.InterfaceAudience;
3231
import org.slf4j.Logger;
@@ -484,10 +483,10 @@ public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf
484483
}
485484

486485
@Override
487-
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
486+
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
488487
Configuration conf) {
489-
return combineCacheResults(l1Cache.shouldCacheBlock(key, timeRangeTracker, conf),
490-
l2Cache.shouldCacheBlock(key, timeRangeTracker, conf));
488+
return combineCacheResults(l1Cache.shouldCacheBlock(key, maxTimeStamp, conf),
489+
l2Cache.shouldCacheBlock(key, maxTimeStamp, conf));
491490
}
492491

493492
private Optional<Boolean> combineCacheResults(Optional<Boolean> result1,

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.hadoop.hbase.ipc.RpcServer;
4444
import org.apache.hadoop.hbase.regionserver.CellSink;
4545
import org.apache.hadoop.hbase.regionserver.ShipperListener;
46+
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
4647
import org.apache.hadoop.hbase.util.BloomFilterWriter;
4748
import org.apache.hadoop.hbase.util.Bytes;
4849
import org.apache.hadoop.hbase.util.FSUtils;
@@ -217,6 +218,12 @@ public interface Writer extends Closeable, CellSink, ShipperListener {
217218
*/
218219
void appendTrackedTimestampsToMetadata() throws IOException;
219220

221+
/**
222+
* Add Custom cell timestamp to Metadata
223+
*/
224+
public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
225+
throws IOException;
226+
220227
/** Returns the path to this {@link HFile} */
221228
Path getPath();
222229

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,7 +1397,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
13971397
cacheConf.getBlockCache().ifPresent(cache -> {
13981398
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
13991399
// Cache the block if necessary
1400-
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
1400+
if (cacheBlock && cacheOnRead) {
14011401
cache.cacheBlock(cacheKey, blockNoChecksum, cacheConf.isInMemory(), cacheOnly);
14021402
}
14031403
});
@@ -1411,7 +1411,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
14111411
HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
14121412
// Cache the block if necessary
14131413
cacheConf.getBlockCache().ifPresent(cache -> {
1414-
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
1414+
if (cacheBlock && cacheOnRead) {
14151415
// Using the wait on cache during compaction and prefetching.
14161416
cache.cacheBlock(cacheKey,
14171417
cacheCompressed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.io.hfile;
1919

2020
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
21+
import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
2122
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
2223
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
2324

@@ -29,6 +30,7 @@
2930
import java.util.ArrayList;
3031
import java.util.List;
3132
import java.util.Optional;
33+
import java.util.function.Supplier;
3234
import org.apache.hadoop.conf.Configuration;
3335
import org.apache.hadoop.fs.FSDataOutputStream;
3436
import org.apache.hadoop.fs.FileSystem;
@@ -127,6 +129,12 @@ public class HFileWriterImpl implements HFile.Writer {
127129
/** Cache configuration for caching data on write. */
128130
protected final CacheConfig cacheConf;
129131

132+
public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
133+
this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
134+
}
135+
136+
private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;
137+
130138
/**
131139
* Name for this object used when logging or in toString. Is either the result of a toString on
132140
* stream or else name of passed file Path.
@@ -186,7 +194,9 @@ public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path pat
186194
this.path = path;
187195
this.name = path != null ? path.getName() : outputStream.toString();
188196
this.hFileContext = fileContext;
197+
// TODO: Move this back to upper layer
189198
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
199+
this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
190200
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
191201
if (encoding != DataBlockEncoding.NONE) {
192202
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -588,7 +598,8 @@ private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
588598
}
589599

590600
private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
591-
Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf);
601+
Optional<Boolean> result =
602+
cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
592603
return result.orElse(true);
593604
}
594605

@@ -899,12 +910,19 @@ public void appendTrackedTimestampsToMetadata() throws IOException {
899910
appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
900911
}
901912

913+
public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
914+
throws IOException {
915+
// TODO: The StoreFileReader always converts the byte[] to TimeRange
916+
// via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
917+
appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
918+
}
919+
902920
/**
903921
* Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker
904922
* to include the timestamp of this key
905923
*/
906924
private void trackTimestamps(final ExtendedCell cell) {
907-
if (Cell.Type.Put == cell.getType()) {
925+
if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
908926
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
909927
}
910928
timeRangeTracker.includeTimestamp(cell);

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@
8686
import org.apache.hadoop.hbase.regionserver.DataTieringManager;
8787
import org.apache.hadoop.hbase.regionserver.HRegion;
8888
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
89-
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
9089
import org.apache.hadoop.hbase.util.Bytes;
9190
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
9291
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -1182,8 +1181,9 @@ void freeSpace(final String why) {
11821181
}
11831182
}
11841183

1185-
if (bytesFreed < bytesToFreeWithExtra &&
1186-
coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
1184+
if (
1185+
bytesFreed < bytesToFreeWithExtra && coldFiles != null
1186+
&& coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
11871187
) {
11881188
int freedBlockSize = bucketEntryWithKey.getValue().getLength();
11891189
if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
@@ -2446,10 +2446,10 @@ public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf
24462446
}
24472447

24482448
@Override
2449-
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
2449+
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp,
24502450
Configuration conf) {
24512451
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
2452-
if (dataTieringManager != null && !dataTieringManager.isHotData(timeRangeTracker, conf)) {
2452+
if (dataTieringManager != null && !dataTieringManager.isHotData(maxTimestamp, conf)) {
24532453
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data",
24542454
key.getHfileName());
24552455
return Optional.of(false);

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
3737
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
3838
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
39+
import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;
3940
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
4041
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
4142
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -296,6 +297,8 @@ private boolean prepareCreate(final MasterProcedureEnv env) throws IOException {
296297
StoreFileTrackerValidationUtils.checkForCreateTable(env.getMasterConfiguration(),
297298
tableDescriptor);
298299

300+
CustomCellTieredUtils.checkForModifyTable(tableDescriptor);
301+
299302
return true;
300303
}
301304

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
4040
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
4141
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
42+
import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;
4243
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
4344
import org.apache.hadoop.hbase.replication.ReplicationException;
4445
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
@@ -449,6 +450,7 @@ private void prepareModify(final MasterProcedureEnv env) throws IOException {
449450
// check for store file tracker configurations
450451
StoreFileTrackerValidationUtils.checkForModifyTable(env.getMasterConfiguration(),
451452
unmodifiedTableDescriptor, modifiedTableDescriptor, !isTableEnabled(env));
453+
CustomCellTieredUtils.checkForModifyTable(modifiedTableDescriptor);
452454
}
453455

454456
/**

0 commit comments

Comments
 (0)