diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index 817f9e2d4b1b..69871d9dc2fc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -316,4 +316,12 @@ default boolean matchReplicationScope(boolean enabled) { } return !enabled; } + + /** + * Checks whether row caching is enabled for this table. Note that row caching is applied at the + * entire row level, not at the column family level. + * @return {@code true} if row caching is enabled, {@code false} if disabled, or {@code null} if + * not explicitly set + */ + Boolean getRowCacheEnabled(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 8636b006e83d..99692b36fa4f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -227,6 +227,15 @@ public class TableDescriptorBuilder { private final static Map DEFAULT_VALUES = new HashMap<>(); private final static Set RESERVED_KEYWORDS = new HashSet<>(); + /** + * Used by the HBase Shell to access this metadata attribute, which denotes whether the row cache + * is enabled. + */ + @InterfaceAudience.Private + public static final String ROW_CACHE_ENABLED = "ROW_CACHE_ENABLED"; + private static final Bytes ROW_CACHE_ENABLED_KEY = new Bytes(Bytes.toBytes(ROW_CACHE_ENABLED)); + private static final boolean DEFAULT_ROW_CACHE_ENABLED = false; + static { DEFAULT_VALUES.put(MAX_FILESIZE, String.valueOf(HConstants.DEFAULT_MAX_FILE_SIZE)); DEFAULT_VALUES.put(READONLY, String.valueOf(DEFAULT_READONLY)); @@ -236,6 +245,7 @@ public class TableDescriptorBuilder { DEFAULT_VALUES.put(PRIORITY, String.valueOf(DEFAULT_PRIORITY)); // Setting ERASURE_CODING_POLICY to NULL so that it is not considered as metadata DEFAULT_VALUES.put(ERASURE_CODING_POLICY, String.valueOf(DEFAULT_ERASURE_CODING_POLICY)); + DEFAULT_VALUES.put(ROW_CACHE_ENABLED, String.valueOf(DEFAULT_ROW_CACHE_ENABLED)); DEFAULT_VALUES.keySet().stream().map(s -> new Bytes(Bytes.toBytes(s))) .forEach(RESERVED_KEYWORDS::add); RESERVED_KEYWORDS.add(IS_META_KEY); @@ -565,6 +575,11 @@ public TableDescriptor build() { return new ModifyableTableDescriptor(desc); } + public TableDescriptorBuilder setRowCacheEnabled(boolean rowCacheEnabled) { + desc.setRowCacheEnabled(rowCacheEnabled); + return this; + } + private static final class ModifyableTableDescriptor implements TableDescriptor, Comparable { @@ -1510,6 +1525,16 @@ public Optional getRegionServerGroup() { return Optional.empty(); } } + + @Override + public Boolean getRowCacheEnabled() { + Bytes value = getValue(ROW_CACHE_ENABLED_KEY); + return value == null ?
null : Boolean.valueOf(Bytes.toString(value.get())); + } + + public ModifyableTableDescriptor setRowCacheEnabled(boolean enabled) { + return setValue(ROW_CACHE_ENABLED_KEY, Boolean.toString(enabled)); + } } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 1051686d32e8..e3bb13e6ac8f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1017,6 +1017,24 @@ public enum OperationStatusCode { public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f; + /** + * Configuration key for the size of the row cache + */ + public static final String ROW_CACHE_SIZE_KEY = "row.cache.size"; + public static final float ROW_CACHE_SIZE_DEFAULT = 0.0f; + + /** + * Configuration key for evicting the row cache on region close + */ + public static final String ROW_CACHE_EVICT_ON_CLOSE_KEY = "row.cache.evictOnClose"; + public static final boolean ROW_CACHE_EVICT_ON_CLOSE_DEFAULT = false; + + /** + * Configuration key for enabling the row cache + */ + public static final String ROW_CACHE_ENABLED_KEY = "row.cache.enabled"; + public static final boolean ROW_CACHE_ENABLED_DEFAULT = false; + /** * Configuration key for the memory size of the block cache */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 5ec39fa5803d..54505dfce955 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -547,8 +547,8 @@ public void setTimestamp(byte[] ts) throws IOException { @Override public ExtendedCell deepClone() { - // This is not used in actual flow. Throwing UnsupportedOperationException - throw new UnsupportedOperationException(); + // Deep clone so that the objects referenced by this cell can be garbage collected + return ExtendedCell.super.deepClone(); } } @@ -796,8 +796,8 @@ public void write(ByteBuffer buf, int offset) { @Override public ExtendedCell deepClone() { - // This is not used in actual flow. Throwing UnsupportedOperationException - throw new UnsupportedOperationException(); + // To cache the row, we need to deep clone this cell + return super.deepClone(); } } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index d015ceb2d457..5e97b9541eba 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1027,6 +1027,12 @@ possible configurations would overwhelm and obscure the important. This configuration allows setting an absolute memory size instead of a percentage of the maximum heap. Takes precedence over hfile.block.cache.size if both are specified. + + row.cache.size + 0.0 + Percentage of maximum heap (-Xmx setting) to allocate to the row cache. + The default of 0.0 means the row cache is disabled.
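Reviewer note: the pieces above are the whole enable path, the ROW_CACHE_ENABLED table attribute plus the row.cache.size sizing key. A minimal client-side sketch of turning the feature on for one table follows; the table name t1, the family cf1, and a server-side row.cache.size of 0.01 are illustrative assumptions, while setRowCacheEnabled is the builder method added by this patch.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.TableDescriptor;
  import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

  public final class EnableRowCacheSketch {
    public static void main(String[] args) throws Exception {
      // Assumes hbase-site.xml on the cluster sets row.cache.size to a non-zero fraction, e.g. 0.01
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
        TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf1"))
          .setRowCacheEnabled(true) // table-level switch added by this patch
          .build();
        admin.createTable(td);
      }
    }
  }

Per determineRowCacheEnabled() in the HRegion changes below, the table-level attribute, when set, takes priority over the global row.cache.enabled configuration.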
+ hfile.block.index.cacheonwrite false diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index c88a77b51407..166484fe8991 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -430,6 +430,13 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo String L2_CACHE_HIT_RATIO_DESC = "L2 cache hit ratio."; String L2_CACHE_MISS_RATIO = "l2CacheMissRatio"; String L2_CACHE_MISS_RATIO_DESC = "L2 cache miss ratio."; + + String ROW_CACHE_HIT_COUNT = "rowCacheHitCount"; + String ROW_CACHE_MISS_COUNT = "rowCacheMissCount"; + String ROW_CACHE_EVICTED_ROW_COUNT = "rowCacheEvictedRowCount"; + String ROW_CACHE_SIZE = "rowCacheSize"; + String ROW_CACHE_COUNT = "rowCacheCount"; + String RS_START_TIME_NAME = "regionServerStartTime"; String ZOOKEEPER_QUORUM_NAME = "zookeeperQuorum"; String SERVER_NAME_NAME = "serverName"; diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index b214c8f8f4e7..90ea2a1165c8 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -452,6 +452,12 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) { .addCounter(Interns.info(BLOCK_CACHE_DELETE_FAMILY_BLOOM_HIT_COUNT, ""), rsWrap.getDeleteFamilyBloomHitCount()) .addCounter(Interns.info(BLOCK_CACHE_TRAILER_HIT_COUNT, ""), rsWrap.getTrailerHitCount()) + .addCounter(Interns.info(ROW_CACHE_HIT_COUNT, ""), rsWrap.getRowCacheHitCount()) + .addCounter(Interns.info(ROW_CACHE_MISS_COUNT, ""), rsWrap.getRowCacheMissCount()) + .addCounter(Interns.info(ROW_CACHE_EVICTED_ROW_COUNT, ""), + rsWrap.getRowCacheEvictedRowCount()) + .addGauge(Interns.info(ROW_CACHE_SIZE, ""), rsWrap.getRowCacheSize()) + .addGauge(Interns.info(ROW_CACHE_COUNT, ""), rsWrap.getRowCacheCount()) .addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC), rsWrap.getUpdatesBlockedTime()) .addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC), rsWrap.getFlushedCellsCount()) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index 5b957d9bf08f..68e43b276ee2 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@ -635,6 +635,16 @@ public interface MetricsRegionServerWrapper { long getTrailerHitCount(); + long getRowCacheHitCount(); + + long getRowCacheMissCount(); + + long getRowCacheSize(); + + long getRowCacheCount(); + + long getRowCacheEvictedRowCount(); + long getTotalRowActionRequestCount(); long getByteBuffAllocatorHeapAllocationBytes(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java index 7ada303d2939..6ad962a46348 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java @@ -93,25 +93,29 @@ public static void validateRegionServerHeapMemoryAllocation(Configuration conf) } float memStoreFraction = getGlobalMemStoreHeapPercent(conf, false); float blockCacheFraction = getBlockCacheHeapPercent(conf); + float rowCacheFraction = + conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT); float minFreeHeapFraction = getRegionServerMinFreeHeapFraction(conf); int memStorePercent = (int) (memStoreFraction * 100); int blockCachePercent = (int) (blockCacheFraction * 100); + int rowCachePercent = (int) (rowCacheFraction * 100); int minFreeHeapPercent = (int) (minFreeHeapFraction * 100); - int usedPercent = memStorePercent + blockCachePercent; + int usedPercent = memStorePercent + blockCachePercent + rowCachePercent; int maxAllowedUsed = 100 - minFreeHeapPercent; if (usedPercent > maxAllowedUsed) { throw new RuntimeException(String.format( "RegionServer heap memory allocation is invalid: total memory usage exceeds 100%% " - + "(memStore + blockCache + requiredFreeHeap). " - + "Check the following configuration values:%n" + " - %s = %.2f%n" + " - %s = %s%n" - + " - %s = %s%n" + " - %s = %s", + + "(memStore + blockCache + rowCache + requiredFreeHeap). " + + "Check the following configuration values:" + "%n - %s = %.2f" + "%n - %s = %s" + + "%n - %s = %s" + "%n - %s = %s" + "%n - %s = %s", MEMSTORE_SIZE_KEY, memStoreFraction, HConstants.HFILE_BLOCK_CACHE_MEMORY_SIZE_KEY, conf.get(HConstants.HFILE_BLOCK_CACHE_MEMORY_SIZE_KEY), HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, conf.get(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY), HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY, - conf.get(HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY))); + conf.get(HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY), HConstants.ROW_CACHE_SIZE_KEY, + conf.get(HConstants.ROW_CACHE_SIZE_KEY))); } } @@ -313,4 +317,15 @@ public static long getBucketCacheSize(final Configuration conf) { } return (long) (bucketCacheSize * 1024 * 1024); } + + public static long getRowCacheSize(Configuration conf) { + long max = -1L; + final MemoryUsage usage = safeGetHeapMemoryUsage(); + if (usage != null) { + max = usage.getMax(); + } + float globalRowCachePercent = + conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT); + return ((long) (max * globalRowCachePercent)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9b7daee0f668..3e55fadf9a4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_DEFAULT; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY; import static 
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ROW_LOCK_READ_LOCK_KEY; @@ -65,6 +67,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -433,6 +436,16 @@ public MetricsTableRequests getMetricsTableRequests() { */ private long openSeqNum = HConstants.NO_SEQNUM; + /** + * Basically the same as openSeqNum, but it is updated when bulk load is done. + */ + private final AtomicLong rowCacheSeqNum = new AtomicLong(HConstants.NO_SEQNUM); + + /** + * The setting for whether to enable row cache for this region. + */ + private final boolean isRowCacheEnabled; + /** * The default setting for whether to enable on-demand CF loading for scan requests to this * region. Requests can override it. @@ -929,6 +942,16 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies()) .mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE); + + this.isRowCacheEnabled = determineRowCacheEnabled(); + } + + boolean determineRowCacheEnabled() { + Boolean fromDescriptor = htableDescriptor.getRowCacheEnabled(); + // The setting from TableDescriptor has higher priority than the global configuration + return fromDescriptor != null + ? fromDescriptor + : conf.getBoolean(HConstants.ROW_CACHE_ENABLED_KEY, HConstants.ROW_CACHE_ENABLED_DEFAULT); } private void setHTableSpecificConf() { @@ -1940,6 +1963,8 @@ public Pair> call() throws IOException { } } + evictRowCache(); + status.setStatus("Writing region close event to WAL"); // Always write close marker to wal even for read only table. This is not a big problem as we // do not write any data into the region; it is just a meta edit in the WAL file. @@ -1980,6 +2005,22 @@ public Pair> call() throws IOException { } } + private void evictRowCache() { + boolean evictOnClose = getReadOnlyConfiguration().getBoolean(ROW_CACHE_EVICT_ON_CLOSE_KEY, + ROW_CACHE_EVICT_ON_CLOSE_DEFAULT); + + if (!evictOnClose) { + return; + } + + if (!(rsServices instanceof HRegionServer regionServer)) { + return; + } + + RowCacheService rowCacheService = regionServer.getRSRpcServices().getRowCacheService(); + rowCacheService.evictRowsByRegion(this); + } + /** Wait for all current flushes and compactions of the region to complete */ // TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for // Phoenix needs. @@ -7881,6 +7922,7 @@ private HRegion openHRegion(final CancelableProgressable reporter) throws IOExce LOG.debug("checking classloading for " + this.getRegionInfo().getEncodedName()); TableDescriptorChecker.checkClassLoading(cConfig, htableDescriptor); this.openSeqNum = initialize(reporter); + this.rowCacheSeqNum.set(this.openSeqNum); this.mvcc.advanceTo(openSeqNum); // The openSeqNum must be increased every time when a region is assigned, as we rely on it to // determine whether a region has been successfully reopened. 
So here we always write open @@ -8709,6 +8751,22 @@ public long getOpenSeqNum() { return this.openSeqNum; } + public long getRowCacheSeqNum() { + return this.rowCacheSeqNum.get(); + } + + @Override + public boolean isRowCacheEnabled() { + return isRowCacheEnabled; + } + + /** + * This is used to invalidate the row cache of the bulk-loaded region. + */ + public void increaseRowCacheSeqNum() { + this.rowCacheSeqNum.incrementAndGet(); + } + @Override public Map getMaxStoreSeqId() { return this.maxSeqIdInStores; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index c8f7f96a033b..d5dd2ca74de6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -68,6 +68,7 @@ class MetricsRegionServerWrapperImpl implements MetricsRegionServerWrapper { private BlockCache l1Cache = null; private BlockCache l2Cache = null; private MobFileCache mobFileCache; + private final RowCache rowCache; private CacheStats cacheStats; private CacheStats l1Stats = null; private CacheStats l2Stats = null; @@ -99,6 +100,8 @@ public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) { this.regionServer = regionServer; initBlockCache(); initMobFileCache(); + RSRpcServices rsRpcServices = this.regionServer.getRSRpcServices(); + this.rowCache = rsRpcServices == null ? null : rsRpcServices.getRowCacheService().getRowCache(); this.excludeDatanodeManager = this.regionServer.getWalFactory().getExcludeDatanodeManager(); this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD, @@ -1194,6 +1197,31 @@ public long getTrailerHitCount() { return this.cacheStats != null ? this.cacheStats.getTrailerHitCount() : 0L; } + @Override + public long getRowCacheHitCount() { + return this.rowCache != null ? this.rowCache.getHitCount() : 0L; + } + + @Override + public long getRowCacheMissCount() { + return this.rowCache != null ? this.rowCache.getMissCount() : 0L; + } + + @Override + public long getRowCacheSize() { + return this.rowCache != null ? this.rowCache.getSize() : 0L; + } + + @Override + public long getRowCacheCount() { + return this.rowCache != null ? this.rowCache.getCount() : 0L; + } + + @Override + public long getRowCacheEvictedRowCount() { + return this.rowCache != null ? this.rowCache.getEvictedRowCount() : 0L; + } + @Override public long getByteBuffAllocatorHeapAllocationBytes() { return ByteBuffAllocator.getHeapAllocationBytes(allocator, ByteBuffAllocator.HEAP); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index fdfea375e096..ef01c62d726e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -354,6 +354,11 @@ public class RSRpcServices extends HBaseRpcServicesBase public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG = "hbase.regionserver.bootstrap.nodes.executorService"; + /** + * The row cache service + */ + private final RowCacheService rowCacheService = new RowCacheService(getConfiguration()); + /** * An Rpc callback for closing a RegionScanner. 
*/ @@ -668,7 +673,8 @@ private CheckAndMutateResult checkAndMutate(HRegion region, List Row.COMPARATOR.compare(v1, v2)); } - OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce); + OperationStatus[] codes = + rowCacheService.batchMutate(region, mArray, atomic, nonceGroup, nonce); // When atomic is true, it indicates that the mutateRow API or the batch API with // RowMutations is called. In this case, we need to merge the results of the @@ -2336,6 +2343,11 @@ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, @Override public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, final BulkLoadHFileRequest request) throws ServiceException { + return rowCacheService.bulkLoadHFile(this, request); + } + + BulkLoadHFileResponse bulkLoadHFileInternal(final BulkLoadHFileRequest request) + throws ServiceException { long start = EnvironmentEdgeManager.currentTime(); List clusterIds = new ArrayList<>(request.getClusterIdsList()); if (clusterIds.contains(this.server.getClusterId())) { @@ -2592,8 +2604,7 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal RegionScannerImpl scanner = null; long blockBytesScannedBefore = context.getBlockBytesScanned(); try { - scanner = region.getScanner(scan); - scanner.next(results); + scanner = rowCacheService.getScanner(region, get, scan, results, context); } finally { if (scanner != null) { if (closeCallBack == null) { @@ -3002,33 +3013,10 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics())); } } else { - Result r = null; - Boolean processed = null; - MutationType type = mutation.getMutateType(); - switch (type) { - case APPEND: - // TODO: this doesn't actually check anything. - r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, - context); - break; - case INCREMENT: - // TODO: this doesn't actually check anything. - r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, - context); - break; - case PUT: - put(region, quota, mutation, cellScanner, spaceQuotaEnforcement); - processed = Boolean.TRUE; - break; - case DELETE: - delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement); - processed = Boolean.TRUE; - break; - default: - throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); - } - if (processed != null) { - builder.setProcessed(processed); + Result r = rowCacheService.mutate(this, region, mutation, quota, cellScanner, nonceGroup, + spaceQuotaEnforcement, context); + if (r == Result.EMPTY_RESULT) { + builder.setProcessed(true); } boolean clientCellBlockSupported = isClientCellBlockSupport(context); addResult(builder, r, controller, clientCellBlockSupported); @@ -3047,10 +3035,41 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque } } + Result mutateInternal(MutationProto mutation, HRegion region, OperationQuota quota, + CellScanner cellScanner, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, + RpcCallContext context) throws IOException { + MutationType type = mutation.getMutateType(); + return switch (type) { + case APPEND -> + // TODO: this doesn't actually check anything. + append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, context); + case INCREMENT -> + // TODO: this doesn't actually check anything. 
+ increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, + context); + case PUT -> { + put(region, quota, mutation, cellScanner, spaceQuotaEnforcement); + yield Result.EMPTY_RESULT; + } + case DELETE -> { + delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement); + yield Result.EMPTY_RESULT; + } + }; + } + private void put(HRegion region, OperationQuota quota, MutationProto mutation, CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException { long before = EnvironmentEdgeManager.currentTime(); Put put = ProtobufUtil.toPut(mutation, cellScanner); + // Put with TTL is not allowed on tables with row cache enabled, because cached rows cannot + // track TTL expiration + if (region.isRowCacheEnabled()) { + if (put.getTTL() != Long.MAX_VALUE) { + throw new DoNotRetryIOException( + "Tables with row cache enabled do not allow setting TTL on Puts"); + } + } checkCellSizeLimit(region, put); spaceQuota.getPolicyEnforcement(region).check(put); quota.addMutation(put); @@ -3095,7 +3114,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); } if (result == null) { - result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); + result = rowCacheService.checkAndMutate(region, checkAndMutate, nonceGroup, nonce); if (region.getCoprocessorHost() != null) { result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); } @@ -4079,4 +4098,9 @@ RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request, Pair pair = newRegionScanner(request, region, builder); return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota); } + + // Exposed for tests and for the RegionServer status pages (see cache.jsp below) + public RowCacheService getRowCacheService() { + return rowCacheService; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 42069e58092e..48d4add69ee6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -577,4 +577,10 @@ void requestCompaction(byte[] family, String why, int priority, boolean major, * estimating quota consumption. */ int getMinBlockSizeBytes(); + + /** + * Returns whether the row cache is enabled for this region. + * @return true if the row cache is enabled for this region + */ + boolean isRowCacheEnabled(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java new file mode 100644 index 000000000000..a977c7d59cf2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +/** + * Interface for caching rows retrieved by Get operations. + */ +@org.apache.yetus.audience.InterfaceAudience.Private +public interface RowCache { + /** + * Cache the specified row. + * @param key the key of the row to cache + * @param value the cells of the row to cache + */ + void cacheRow(RowCacheKey key, RowCells value); + + /** + * Evict the specified row. + * @param key the key of the row to evict + */ + void evictRow(RowCacheKey key); + + /** + * Evict all rows belonging to the specified region. This is a heavy operation as it iterates the + * entire RowCache key set. + * @param region the region whose rows should be evicted + */ + void evictRowsByRegion(HRegion region); + + /** + * Get the number of rows in the cache. + * @return the number of rows in the cache + */ + long getCount(); + + /** + * Get the number of rows evicted from the cache. + * @return the number of rows evicted from the cache + */ + long getEvictedRowCount(); + + /** + * Get the hit count. + * @return the hit count + */ + long getHitCount(); + + /** + * Get the maximum size of the cache in bytes. + * @return the maximum size of the cache in bytes + */ + long getMaxSize(); + + /** + * Get the miss count. + * @return the miss count + */ + long getMissCount(); + + /** + * Get the specified row from the cache. + * @param key the key of the row to get + * @param caching whether caching is enabled for this request + * @return the cells of the row, or null if not found or caching is disabled + */ + RowCells getRow(RowCacheKey key, boolean caching); + + /** + * Get the current size of the cache in bytes. + * @return the current size of the cache in bytes + */ + long getSize(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java new file mode 100644 index 000000000000..09ec68194ea9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Arrays; +import java.util.Objects; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +@org.apache.yetus.audience.InterfaceAudience.Private +public class RowCacheKey implements HeapSize { + public static final long FIXED_OVERHEAD = ClassSize.estimateBase(RowCacheKey.class, false); + + private final String encodedRegionName; + private final byte[] rowKey; + + // When a region is reopened or bulk-loaded, its rowCacheSeqNum is used to generate new keys that + // bypass the existing cache. This mechanism is effective when ROW_CACHE_EVICT_ON_CLOSE is set to + // false. + private final long rowCacheSeqNum; + + public RowCacheKey(HRegion region, byte[] rowKey) { + this.encodedRegionName = region.getRegionInfo().getEncodedName(); + this.rowKey = Objects.requireNonNull(rowKey, "rowKey cannot be null"); + this.rowCacheSeqNum = region.getRowCacheSeqNum(); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + RowCacheKey that = (RowCacheKey) o; + return rowCacheSeqNum == that.rowCacheSeqNum + && Objects.equals(encodedRegionName, that.encodedRegionName) + && Objects.deepEquals(rowKey, that.rowKey); + } + + @Override + public int hashCode() { + return Objects.hash(encodedRegionName, Arrays.hashCode(rowKey), rowCacheSeqNum); + } + + @Override + public String toString() { + return encodedRegionName + '_' + Bytes.toStringBinary(rowKey) + '_' + rowCacheSeqNum; + } + + @Override + public long heapSize() { + return FIXED_OVERHEAD + ClassSize.align(rowKey.length); + } + + boolean isSameRegion(HRegion region) { + return this.encodedRegionName.equals(region.getRegionInfo().getEncodedName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java new file mode 100644 index 000000000000..f0549fdda54b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; +import org.apache.hadoop.hbase.quotas.OperationQuota; + +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; + +/** + * It is responsible for populating the row cache and retrieving rows from it. + */ +@org.apache.yetus.audience.InterfaceAudience.Private +public class RowCacheService { + /** + * A barrier that prevents the row cache from being populated during region operations, such as + * bulk loads. It is implemented as a counter to address issues that arise when the same region is + * updated concurrently. + */ + private final Map regionLevelBarrierMap = new ConcurrentHashMap<>(); + /** + * A barrier that prevents the row cache from being populated during row mutations. It is + * implemented as a counter to address issues that arise when the same row is mutated + * concurrently. 
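+   * For illustration (not part of this patch): both barrier maps use the standard
+   * ConcurrentHashMap counting idiom, raising the counter on entry and lowering it on exit so
+   * that the entry disappears only when the last concurrent writer leaves:
+   * <pre>{@code
+   * barriers.computeIfAbsent(key, k -> new AtomicInteger(0)).incrementAndGet(); // raise
+   * barriers.computeIfPresent(key, (k, c) -> c.decrementAndGet() <= 0 ? null : c); // lower
+   * }</pre>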
+ */ + private final Map rowLevelBarrierMap = new ConcurrentHashMap<>(); + + private final boolean enabledByConf; + private final RowCache rowCache; + + @FunctionalInterface + interface RowOperation { + R execute() throws IOException; + } + + RowCacheService(Configuration conf) { + enabledByConf = + conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT) > 0; + // Currently we only support the TinyLFU implementation + rowCache = new TinyLfuRowCache(MemorySizeUtil.getRowCacheSize(conf)); + } + + RegionScannerImpl getScanner(HRegion region, Get get, Scan scan, List results, + RpcCallContext context) throws IOException { + if (!canCacheRow(get, region)) { + return getScannerInternal(region, scan, results); + } + + RowCacheKey key = new RowCacheKey(region, get.getRow()); + + // Try to serve the Get from the row cache + if (tryGetFromCache(region, key, get, results)) { + // Cache hit; no scanner needs to be created + return null; + } + + RegionScannerImpl scanner = getScannerInternal(region, scan, results); + + // When the results come from the memstore only, do not populate the row cache + boolean readFromMemStoreOnly = context.getBlockBytesScanned() < 1; + if (!readFromMemStoreOnly) { + populateCache(region, results, key); + } + + return scanner; + } + + private RegionScannerImpl getScannerInternal(HRegion region, Scan scan, List results) + throws IOException { + RegionScannerImpl scanner = region.getScanner(scan); + scanner.next(results); + return scanner; + } + + private boolean tryGetFromCache(HRegion region, RowCacheKey key, Get get, List results) { + RowCells row = rowCache.getRow(key, get.getCacheBlocks()); + + if (row == null) { + return false; + } + + results.addAll(row.getCells()); + region.addReadRequestsCount(1); + if (region.getMetrics() != null) { + region.getMetrics().updateReadRequestCount(); + } + return true; + } + + private void populateCache(HRegion region, List results, RowCacheKey key) { + // The row cache is populated only when no region-level barriers remain + regionLevelBarrierMap.computeIfAbsent(region, t -> { + // The row cache is populated only when no row-level barriers remain + rowLevelBarrierMap.computeIfAbsent(key, k -> { + try { + rowCache.cacheRow(key, new RowCells(results)); + } catch (CloneNotSupportedException ignored) { + // Unable to cache the row cells; ignore + } + return null; + }); + return null; + }); + } + + BulkLoadHFileResponse bulkLoadHFile(RSRpcServices rsRpcServices, BulkLoadHFileRequest request) + throws ServiceException { + HRegion region; + try { + region = rsRpcServices.getRegion(request.getRegion()); + } catch (IOException ie) { + throw new ServiceException(ie); + } + + if (!region.isRowCacheEnabled()) { + return bulkLoad(rsRpcServices, request); + } + + // Since a bulk load modifies the store files, the row cache should be disabled until the bulk + // load is finished. + createRegionLevelBarrier(region); + try { + // We do not invalidate the entire row cache directly, as it contains a large number of + // entries and takes a long time. Instead, we increment rowCacheSeqNum, which is used when + // constructing a RowCacheKey, thereby making the existing row cache entries stale.
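+ // For example (hypothetical values): an entry cached under the key "region1_row1_5" is no
+ // longer reachable once rowCacheSeqNum becomes 6, because subsequent lookups use keys ending
+ // in "_6"; the stale entries then age out of the cache through normal eviction.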
increaseRowCacheSeqNum(region); + return bulkLoad(rsRpcServices, request); + } finally { + // Remove the barrier so the row cache for the region can be populated again + removeRegionLevelBarrier(region); + } + } + + BulkLoadHFileResponse bulkLoad(RSRpcServices rsRpcServices, BulkLoadHFileRequest request) + throws ServiceException { + return rsRpcServices.bulkLoadHFileInternal(request); + } + + void increaseRowCacheSeqNum(HRegion region) { + region.increaseRowCacheSeqNum(); + } + + void removeRegionLevelBarrier(HRegion region) { + regionLevelBarrierMap.computeIfPresent(region, (k, counter) -> { + int remaining = counter.decrementAndGet(); + return (remaining <= 0) ? null : counter; + }); + } + + void createRegionLevelBarrier(HRegion region) { + regionLevelBarrierMap.computeIfAbsent(region, k -> new AtomicInteger(0)).incrementAndGet(); + } + + // @formatter:off + /** + * The row cache is used only when all of the following conditions are met: + * - Row cache is enabled at the table level. + * - Cache blocks is enabled in the Get request. + * - The cache key is derived from the row key alone, so a cacheable Get must not be + * distinguishable by anything else. We therefore require default values for the following: + * - filter + * - retrieving cells + * - TTL + * - attributes + * - CheckExistenceOnly + * - ColumnFamilyTimeRange + * - Consistency + * - MaxResultsPerColumnFamily + * - ReplicaId + * - RowOffsetPerColumnFamily + * @param get the Get request + * @param region the Region + * @return true if the row can be cached, false otherwise + */ + // @formatter:on + boolean canCacheRow(Get get, Region region) { + return enabledByConf && region.isRowCacheEnabled() && get.getCacheBlocks() + && get.getFilter() == null && isRetrieveAllCells(get, region) && isDefaultTtl(region) + && get.getAttributesMap().isEmpty() && !get.isCheckExistenceOnly() + && get.getColumnFamilyTimeRange().isEmpty() && get.getConsistency() == Consistency.STRONG + && get.getMaxResultsPerColumnFamily() == -1 && get.getReplicaId() == -1 + && get.getRowOffsetPerColumnFamily() == 0 && get.getTimeRange().isAllTime(); + } + + private static boolean isRetrieveAllCells(Get get, Region region) { + if (region.getTableDescriptor().getColumnFamilyCount() != get.numFamilies()) { + return false; + } + + boolean hasQualifier = get.getFamilyMap().values().stream().anyMatch(Objects::nonNull); + return !hasQualifier; + } + + private static boolean isDefaultTtl(Region region) { + return Arrays.stream(region.getTableDescriptor().getColumnFamilies()) + .allMatch(cfd -> cfd.getTimeToLive() == ColumnFamilyDescriptorBuilder.DEFAULT_TTL); + } + + private R mutateWithRowCacheBarrier(HRegion region, List mutations, + RowOperation operation) throws IOException { + if (!region.isRowCacheEnabled()) { + return operation.execute(); + } + + Set rowCacheKeys = new HashSet<>(mutations.size()); + try { + // Collect the cache keys of all rows being mutated + mutations.forEach(mutation -> rowCacheKeys.add(new RowCacheKey(region, mutation.getRow()))); + rowCacheKeys.forEach(key -> { + // Creates a barrier that prevents the row cache from being populated for this row + // during mutation. Reads for the row can instead be served from HFiles or the block cache.
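+ // The ordering matters: if the row were evicted before the barrier existed, a concurrent
+ // Get could repopulate the cache with the pre-mutation row in between, and nothing would
+ // evict that stale entry after the mutation completes.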
+ createRowLevelBarrier(key); + + // After creating the barrier, evict the existing row cache for this row, + // as it becomes invalid after the mutation + evictRow(key); + }); + + return execute(operation); + } finally { + // Remove the barrier after mutation to allow the row cache to be populated again + rowCacheKeys.forEach(this::removeRowLevelBarrier); + } + } + + private R mutateWithRowCacheBarrier(HRegion region, byte[] row, RowOperation operation) + throws IOException { + if (!region.isRowCacheEnabled()) { + return operation.execute(); + } + + RowCacheKey key = new RowCacheKey(region, row); + try { + // Creates a barrier that prevents the row cache from being populated for this row + // during mutation. Reads for the row can instead be served from HFiles or the block cache. + createRowLevelBarrier(key); + + // After creating the barrier, evict the existing row cache for this row, + // as it becomes invalid after the mutation + evictRow(key); + + return execute(operation); + } finally { + // Remove the barrier after mutation to allow the row cache to be populated again + removeRowLevelBarrier(key); + } + } + + R execute(RowOperation operation) throws IOException { + return operation.execute(); + } + + void evictRow(RowCacheKey key) { + rowCache.evictRow(key); + } + + /** + * Remove the barrier after mutation to allow the row cache to be populated again + * @param key the cache key of the row + */ + void removeRowLevelBarrier(RowCacheKey key) { + rowLevelBarrierMap.computeIfPresent(key, (k, counter) -> { + int remaining = counter.decrementAndGet(); + return (remaining <= 0) ? null : counter; + }); + } + + /** + * Creates a barrier to prevent the row cache from being populated for this row during mutation + * @param key the cache key of the row + */ + void createRowLevelBarrier(RowCacheKey key) { + rowLevelBarrierMap.computeIfAbsent(key, k -> new AtomicInteger(0)).incrementAndGet(); + } + + Result mutate(RSRpcServices rsRpcServices, HRegion region, ClientProtos.MutationProto mutation, + OperationQuota quota, CellScanner cellScanner, long nonceGroup, + ActivePolicyEnforcement spaceQuotaEnforcement, RpcCallContext context) throws IOException { + return mutateWithRowCacheBarrier(region, mutation.getRow().toByteArray(), + () -> rsRpcServices.mutateInternal(mutation, region, quota, cellScanner, nonceGroup, + spaceQuotaEnforcement, context)); + } + + CheckAndMutateResult checkAndMutate(HRegion region, CheckAndMutate checkAndMutate, + long nonceGroup, long nonce) throws IOException { + return mutateWithRowCacheBarrier(region, checkAndMutate.getRow(), + () -> region.checkAndMutate(checkAndMutate, nonceGroup, nonce)); + } + + CheckAndMutateResult checkAndMutate(HRegion region, List mutations, + CheckAndMutate checkAndMutate, long nonceGroup, long nonce) throws IOException { + return mutateWithRowCacheBarrier(region, mutations, + () -> region.checkAndMutate(checkAndMutate, nonceGroup, nonce)); + } + + OperationStatus[] batchMutate(HRegion region, Mutation[] mArray, boolean atomic, long nonceGroup, + long nonce) throws IOException { + return mutateWithRowCacheBarrier(region, Arrays.asList(mArray), + () -> region.batchMutate(mArray, atomic, nonceGroup, nonce)); + } + + // For testing only + AtomicInteger getRowLevelBarrier(RowCacheKey key) { + return rowLevelBarrierMap.get(key); + } + + // For testing only + AtomicInteger getRegionLevelBarrier(HRegion region) { + return regionLevelBarrierMap.get(region); + } + + public RowCache getRowCache() { + return rowCache; + } + + void 
evictRowsByRegion(HRegion region) { + rowCache.evictRowsByRegion(region); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java new file mode 100644 index 000000000000..af0a0ea4c537 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.ClassSize; + +@org.apache.yetus.audience.InterfaceAudience.Private +public class RowCells implements HeapSize { + public static final long FIXED_OVERHEAD = ClassSize.estimateBase(RowCells.class, false); + + private final List cells = new ArrayList<>(); + + public RowCells(List cells) throws CloneNotSupportedException { + for (Cell cell : cells) { + if (!(cell instanceof ExtendedCell extCell)) { + throw new CloneNotSupportedException("Cell is not an ExtendedCell"); + } + try { + // Deep clone so that the objects referenced by the original cells can be garbage collected + this.cells.add(extCell.deepClone()); + } catch (RuntimeException e) { + throw new CloneNotSupportedException("Deep clone failed"); + } + } + } + + @Override + public long heapSize() { + long cellsSize = cells.stream().mapToLong(Cell::heapSize).sum(); + return FIXED_OVERHEAD + cellsSize; + } + + public List getCells() { + return cells; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCache.java new file mode 100644 index 000000000000..a23fd171c13d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCache.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.atomic.LongAdder; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * A {@link RowCache} implementation backed by Caffeine with a TinyLFU-based eviction policy. + */ +@org.apache.yetus.audience.InterfaceAudience.Private +public class TinyLfuRowCache implements RowCache { + private final class EvictionListener + implements RemovalListener<@NonNull RowCacheKey, @NonNull RowCells> { + @Override + public void onRemoval(RowCacheKey key, RowCells value, @NonNull RemovalCause cause) { + evictedRowCount.increment(); + } + } + + private final Cache<@NonNull RowCacheKey, RowCells> cache; + + // Cache.stats() does not provide eviction count for entries, so we maintain our own counter. + private final LongAdder evictedRowCount = new LongAdder(); + + TinyLfuRowCache(long maxSizeBytes) { + if (maxSizeBytes <= 0) { + cache = Caffeine.newBuilder().maximumSize(0).build(); + return; + } + + cache = + Caffeine.newBuilder().maximumWeight(maxSizeBytes).removalListener(new EvictionListener()) + .weigher((RowCacheKey key, + RowCells value) -> (int) Math.min(key.heapSize() + value.heapSize(), Integer.MAX_VALUE)) + .recordStats().build(); + } + + @Override + public void cacheRow(RowCacheKey key, RowCells value) { + cache.put(key, value); + } + + @Override + public RowCells getRow(RowCacheKey key, boolean caching) { + if (!caching) { + return null; + } + + return cache.getIfPresent(key); + } + + @Override + public void evictRow(RowCacheKey key) { + cache.asMap().remove(key); + } + + @Override + public void evictRowsByRegion(HRegion region) { + cache.asMap().keySet().removeIf(key -> key.isSameRegion(region)); + } + + @Override + public long getHitCount() { + return cache.stats().hitCount(); + } + + @Override + public long getMissCount() { + return cache.stats().missCount(); + } + + @Override + public long getEvictedRowCount() { + return evictedRowCount.sum(); + } + + @Override + public long getSize() { + Optional result = cache.policy().eviction().map(Policy.Eviction::weightedSize); + return result.orElse(OptionalLong.of(-1L)).orElse(-1L); + } + + @Override + public long getMaxSize() { + Optional result = cache.policy().eviction().map(Policy.Eviction::getMaximum); + return result.orElse(-1L); + } + + @Override + public long getCount() { + return cache.estimatedSize(); + } +} diff --git a/hbase-server/src/main/resources/hbase-webapps/regionserver/blockCache.jsp b/hbase-server/src/main/resources/hbase-webapps/regionserver/cache.jsp similarity index 79% rename from hbase-server/src/main/resources/hbase-webapps/regionserver/blockCache.jsp rename to hbase-server/src/main/resources/hbase-webapps/regionserver/cache.jsp index 7bba000a271a..c3f9a6bb3fa3 100644 --- a/hbase-server/src/main/resources/hbase-webapps/regionserver/blockCache.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/regionserver/cache.jsp @@ -21,6 +21,8 @@ import="org.apache.hadoop.hbase.io.hfile.BlockCache" import="org.apache.hadoop.hbase.regionserver.HRegionServer" %> <%@ page 
import="org.apache.hadoop.hbase.io.hfile.CacheConfig" %> +<%@ page import="org.apache.hadoop.hbase.regionserver.RowCacheService" %> +<%@ page import="org.apache.hadoop.hbase.regionserver.RowCache" %> <%-- Template for rendering Block Cache tabs in RegionServer Status page. --%> @@ -35,15 +37,19 @@ BlockCache[] bcs = bc == null ? null : bc.getBlockCaches(); BlockCache l1 = bcs == null ? bc : bcs[0]; BlockCache l2 = bcs == null ? null : bcs.length <= 1 ? null : bcs[1]; + + RowCacheService rowCacheService = regionServer.getRSRpcServices().getRowCacheService(); + RowCache rowCache = rowCacheService == null ? null : rowCacheService.getRowCache(); %>
@@ -68,5 +74,9 @@ <% request.setAttribute("name", "L2"); %>
+
+ <% request.setAttribute("rowCache", rowCache); %> + +
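Reviewer note on the eviction backend surfaced by the page above: TinyLfuRowCache (added earlier in this patch) delegates sizing and eviction to Caffeine's weight-based TinyLFU policy. The following self-contained sketch of that Caffeine pattern is illustrative only; the class name, key/value types, and the 64 MB cap are assumptions, not part of the patch:

  import com.github.benmanes.caffeine.cache.Cache;
  import com.github.benmanes.caffeine.cache.Caffeine;

  public final class WeightedCacheSketch {
    public static void main(String[] args) {
      // Charge each entry by an approximate byte weight, mirroring how
      // TinyLfuRowCache weighs RowCacheKey.heapSize() + RowCells.heapSize().
      Cache<String, byte[]> cache = Caffeine.newBuilder()
        .maximumWeight(64L * 1024 * 1024) // cap the total weight at 64 MB
        .weigher((String key, byte[] value) -> key.length() + value.length)
        .recordStats() // backs hit/miss counters like those exported by the new metrics
        .build();

      cache.put("row-1", new byte[128]);
      System.out.println(cache.getIfPresent("row-1") != null); // prints true
      System.out.println(cache.stats().hitCount()); // prints 1
    }
  }

Once the maximum weight is exceeded, Caffeine's TinyLFU admission policy evicts the entries least likely to be reused, which is what the rowCacheEvictedRowCount metric counts via the patch's removal listener.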
diff --git a/hbase-server/src/main/resources/hbase-webapps/regionserver/regionserver.jsp b/hbase-server/src/main/resources/hbase-webapps/regionserver/regionserver.jsp index f25377e7e4d3..4489db57e215 100644 --- a/hbase-server/src/main/resources/hbase-webapps/regionserver/regionserver.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/regionserver/regionserver.jsp @@ -105,8 +105,8 @@
-

Block Cache

- +

Cache

+
diff --git a/hbase-server/src/main/resources/hbase-webapps/regionserver/rowCacheStats.jsp b/hbase-server/src/main/resources/hbase-webapps/regionserver/rowCacheStats.jsp new file mode 100644 index 000000000000..1695ca96572c --- /dev/null +++ b/hbase-server/src/main/resources/hbase-webapps/regionserver/rowCacheStats.jsp @@ -0,0 +1,72 @@ +<%-- +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--%> +<%@ page contentType="text/html;charset=UTF-8" + import="org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix" %> +<%@ page import="org.apache.hadoop.hbase.regionserver.RowCache" %> + +<% + RowCache rowCache = (RowCache) request.getAttribute("rowCache"); +if (rowCache == null) { %> +

RowCache is disabled

+<% } else { %> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
AttributeValueDescription
Size<%= TraditionalBinaryPrefix.long2String(rowCache.getSize(), "B", 1) %>Current size of the row cache in use
Free<%= TraditionalBinaryPrefix.long2String(rowCache.getMaxSize() - rowCache.getSize(), "B", 1) %>The total free memory currently available to store more cache entries
Count<%= String.format("%,d", rowCache.getCount()) %>The number of rows in the row cache
Evicted Rows<%= String.format("%,d", rowCache.getEvictedRowCount()) %>The total number of rows evicted
Hits<%= String.format("%,d", rowCache.getHitCount()) %>The number of requests that were cache hits
Misses<%= String.format("%,d", rowCache.getMissCount()) %>The number of requests that were cache misses
All Time Hit Ratio<%= String.format("%,.2f", rowCache.getHitCount() * 100.0 / (rowCache.getMissCount() + rowCache.getHitCount())) %><%= "%" %>Hit count divided by the total request count
+

RowCache is a separate cache, distinct from BlockCache.

+<% } %> diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/util/TestMemorySizeUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/util/TestMemorySizeUtil.java index 5f00c34dbcb0..3cf1fc33753a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/util/TestMemorySizeUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/util/TestMemorySizeUtil.java @@ -52,6 +52,13 @@ public void testValidateRegionServerHeapMemoryAllocation() { assertEquals(HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD, 0.2f, 0.0f); MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); + // when memstore size + block cache size + row cache size + default free heap min size == 1.0 + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.4f); + conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + assertEquals(HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD, 0.2f, 0.0f); + MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); + // when memstore size + block cache size + default free heap min size > 1.0 conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.5f); assertThrows(RuntimeException.class, @@ -60,6 +67,7 @@ public void testValidateRegionServerHeapMemoryAllocation() { // when free heap min size is set to 0, it should not throw an exception conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.5f); conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.5f); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.0f); conf.setLong(MemorySizeUtil.HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY, 0L); MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); @@ -86,4 +94,14 @@ public void testGetRegionServerMinFreeHeapFraction() { minFreeHeapFraction = MemorySizeUtil.getRegionServerMinFreeHeapFraction(conf); assertEquals(0.0f, minFreeHeapFraction, 0.0f); } + + @Test + public void testGetRowCacheSize() { + float rowCacheSizeRatio = 0.01f; + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, rowCacheSizeRatio); + long rowCacheSizeBytes = MemorySizeUtil.getRowCacheSize(conf); + + long maxMemory = MemorySizeUtil.safeGetHeapMemoryUsage().getMax(); + assertEquals((long) (maxMemory * rowCacheSizeRatio), rowCacheSizeBytes); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index f1b6efe50a99..6b677f2d1223 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@ -662,6 +662,31 @@ public long getTrailerHitCount() { return 0; } + @Override + public long getRowCacheHitCount() { + return 2; + } + + @Override + public long getRowCacheMissCount() { + return 1; + } + + @Override + public long getRowCacheEvictedRowCount() { + return 0; + } + + @Override + public long getRowCacheSize() { + return 1; + } + + @Override + public long getRowCacheCount() { + return 2; + } + @Override public int getSplitQueueSize() { return 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java index aac2a5922b9b..76c2a8ad6e42 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java @@ -148,6 +148,11 @@ public void testWrapperSource() { HELPER.assertGauge("l2CacheHitRatio", 90, serverSource); HELPER.assertGauge("l2CacheMissRatio", 10, serverSource); HELPER.assertCounter("updatesBlockedTime", 419, serverSource); + HELPER.assertCounter("rowCacheHitCount", 2, serverSource); + HELPER.assertCounter("rowCacheMissCount", 1, serverSource); + HELPER.assertCounter("rowCacheEvictedRowCount", 0, serverSource); + HELPER.assertGauge("rowCacheSize", 1, serverSource); + HELPER.assertGauge("rowCacheCount", 2, serverSource); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java new file mode 100644 index 000000000000..48138a854052 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_EVICTED_ROW_COUNT; +import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_HIT_COUNT; +import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_MISS_COUNT; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRowCache { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCache.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final byte[] CF1 = Bytes.toBytes("cf1"); + private static final byte[] CF2 = Bytes.toBytes("cf2"); + private static final byte[] Q1 = Bytes.toBytes("q1"); + private static final byte[] Q2 = Bytes.toBytes("q2"); + + private static MetricsAssertHelper metricsHelper; + private static MetricsRegionServer metricsRegionServer; + private static MetricsRegionServerSource serverSource; + + private static Admin admin; + private static RowCache rowCache; + + private TableName tableName; + private Table table; + HRegion region; + private final Map<String, Long> counterBase = new HashMap<>(); + + @Rule + public TestName testName =
new TestName(); + + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + + // Enable row cache but reduce the block cache size to fit in 80% of the heap + conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f); + conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + + SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(); + cluster.waitForActiveAndReadyMaster(); + admin = TEST_UTIL.getAdmin(); + + metricsHelper = CompatibilityFactory.getInstance(MetricsAssertHelper.class); + metricsRegionServer = cluster.getRegionServer(0).getMetrics(); + serverSource = metricsRegionServer.getMetricsSource(); + + rowCache = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRSRpcServices() + .getRowCacheService().getRowCache(); + } + + @AfterClass + public static void afterClass() throws Exception { + HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void beforeTestMethod() throws Exception { + ColumnFamilyDescriptor cf1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + // To test data block encoding + ColumnFamilyDescriptor cf2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2) + .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF).build(); + + tableName = TableName.valueOf(testName.getMethodName()); + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName).setRowCacheEnabled(true) + .setColumnFamily(cf1).setColumnFamily(cf2).build(); + admin.createTable(td); + table = admin.getConnection().getTable(tableName); + region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegions().stream() + .filter(r -> r.getRegionInfo().getTable().equals(tableName)).findFirst().orElseThrow(); + } + + @After + public void afterTestMethod() throws Exception { + counterBase.clear(); + + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + + private void setCounterBase(String metric, long value) { + counterBase.put(metric, value); + } + + private void assertCounterDiff(String metric, long diff) { + Long base = counterBase.get(metric); + if (base == null) { + throw new IllegalStateException( + "base counter of " + metric + " metric should have been set before by setCounterBase()"); + } + long newValue = base + diff; + metricsHelper.assertCounter(metric, newValue, serverSource); + counterBase.put(metric, newValue); + } + + private static void recomputeMetrics() { + metricsRegionServer.getRegionServerWrapper().forceRecompute(); + } + + @Test + public void testGetWithRowCache() throws IOException { + byte[] rowKey = "row".getBytes(); + Get get = new Get(rowKey); + Result result; + + RowCacheKey rowCacheKey = new RowCacheKey(region, rowKey); + + // Initialize metrics + recomputeMetrics(); + setCounterBase("Get_num_ops", metricsHelper.getCounter("Get_num_ops", serverSource)); + setCounterBase(ROW_CACHE_HIT_COUNT, + metricsHelper.getCounter(ROW_CACHE_HIT_COUNT, serverSource)); + setCounterBase(ROW_CACHE_MISS_COUNT, + metricsHelper.getCounter(ROW_CACHE_MISS_COUNT, serverSource)); + setCounterBase(ROW_CACHE_EVICTED_ROW_COUNT, + metricsHelper.getCounter(ROW_CACHE_EVICTED_ROW_COUNT, serverSource)); + + // Put a row + Put put = new Put(rowKey); + put.addColumn(CF1, Q1, Bytes.toBytes(0L)); + put.addColumn(CF1, Q2, "12".getBytes()); + put.addColumn(CF2, Q1, "21".getBytes()); + put.addColumn(CF2, Q2, "22".getBytes()); + table.put(put); + admin.flush(tableName); + recomputeMetrics(); + assertCounterDiff(ROW_CACHE_HIT_COUNT, 0); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 0); + 
assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // First get to populate the row cache + result = table.get(get); + recomputeMetrics(); + assertArrayEquals(rowKey, result.getRow()); + assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + assertArrayEquals("21".getBytes(), result.getValue(CF2, Q1)); + assertArrayEquals("22".getBytes(), result.getValue(CF2, Q2)); + assertCounterDiff("Get_num_ops", 1); + // Ensure the get was served from the HFile, not the row cache + assertCounterDiff(ROW_CACHE_HIT_COUNT, 0); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 1); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // Get from the row cache + result = table.get(get); + recomputeMetrics(); + assertArrayEquals(rowKey, result.getRow()); + assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + assertArrayEquals("21".getBytes(), result.getValue(CF2, Q1)); + assertArrayEquals("22".getBytes(), result.getValue(CF2, Q2)); + assertCounterDiff("Get_num_ops", 1); + // Ensure the get was served from the row cache + assertCounterDiff(ROW_CACHE_HIT_COUNT, 1); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 0); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // Row cache is invalidated by the put operation + assertNotNull(rowCache.getRow(rowCacheKey, true)); + table.put(put); + recomputeMetrics(); + assertCounterDiff(ROW_CACHE_HIT_COUNT, 1); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 0); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 1); + + // Get is executed without the row cache; however, the cache is re-populated as a result + result = table.get(get); + recomputeMetrics(); + assertArrayEquals(rowKey, result.getRow()); + assertCounterDiff("Get_num_ops", 1); + // Ensure the get was not served from the row cache + assertCounterDiff(ROW_CACHE_HIT_COUNT, 0); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 1); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // Get again with the row cache + result = table.get(get); + recomputeMetrics(); + assertArrayEquals(rowKey, result.getRow()); + assertCounterDiff("Get_num_ops", 1); + // Ensure the get was served from the row cache + assertCounterDiff(ROW_CACHE_HIT_COUNT, 1); + assertCounterDiff(ROW_CACHE_MISS_COUNT, 0); + assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0); + + // Row cache is invalidated by the increment operation + assertNotNull(rowCache.getRow(rowCacheKey, true)); + table.incrementColumnValue(rowKey, CF1, Q1, 1); + assertNull(rowCache.getRow(rowCacheKey, true)); + + // Get is executed without the row cache; however, the cache is re-populated as a result + table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey, true)); + + // Row cache is invalidated by the append operation + assertNotNull(rowCache.getRow(rowCacheKey, true)); + Append append = new Append(rowKey); + append.addColumn(CF1, Q1, Bytes.toBytes(0L)); + table.append(append); + assertNull(rowCache.getRow(rowCacheKey, true)); + + // Get is executed without the row cache; however, the cache is re-populated as a result + table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey, true)); + + // Row cache is invalidated by the delete operation + assertNotNull(rowCache.getRow(rowCacheKey, true)); + Delete delete = new Delete(rowKey); + delete.addColumn(CF1, Q1); + table.delete(delete); + assertNull(rowCache.getRow(rowCacheKey, true)); + } + + @Test(expected = DoNotRetryIOException.class) + public void testPutWithTTL() throws IOException { + // Put
with TTL is not allowed on tables with row cache enabled, because cached rows cannot + // track TTL expiration + Put put = new Put("row".getBytes()); + put.addColumn(CF1, Q1, "11".getBytes()); + put.setTTL(1); + table.put(put); + } + + @Test + public void testCheckAndMutate() throws IOException { + byte[] rowKey = "row".getBytes(); + Get get = new Get(rowKey); + Result result; + CheckAndMutate cam; + CheckAndMutateResult camResult; + + RowCacheKey rowCacheKey = new RowCacheKey(region, rowKey); + + // Put a row + Put put1 = new Put(rowKey); + put1.addColumn(CF1, Q1, "11".getBytes()); + put1.addColumn(CF1, Q2, "12".getBytes()); + table.put(put1); + admin.flush(tableName); + + // Validate that the row cache is populated + result = table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey, true)); + assertArrayEquals("11".getBytes(), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + + // The row cache is invalidated even when the checkAndMutate operation fails, since the row + // is evicted before the check is evaluated + Put put2 = new Put(rowKey); + put2.addColumn(CF1, Q2, "1212".getBytes()); + cam = CheckAndMutate.newBuilder(rowKey).ifEquals(CF1, Q2, "00".getBytes()).build(put2); + camResult = table.checkAndMutate(cam); + assertFalse(camResult.isSuccess()); + assertNull(rowCache.getRow(rowCacheKey, true)); + + // Validate that the row cache is populated + result = table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey, true)); + assertArrayEquals("11".getBytes(), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + + // The row cache is invalidated by a checkAndMutate operation + cam = CheckAndMutate.newBuilder(rowKey).ifEquals(CF1, Q2, "12".getBytes()).build(put2); + camResult = table.checkAndMutate(cam); + assertTrue(camResult.isSuccess()); + assertNull(rowCache.getRow(rowCacheKey, true)); + } + + @Test + public void testCheckAndMutates() throws IOException { + byte[] rowKey1 = "row1".getBytes(); + byte[] rowKey2 = "row2".getBytes(); + Get get1 = new Get(rowKey1); + Get get2 = new Get(rowKey2); + Result result1, result2; + List<CheckAndMutate> cams; + List<CheckAndMutateResult> camResults; + + RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1); + RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2); + + // Put rows + Put put1 = new Put(rowKey1); + put1.addColumn(CF1, Q1, "111".getBytes()); + put1.addColumn(CF1, Q2, "112".getBytes()); + table.put(put1); + Put put2 = new Put(rowKey2); + put2.addColumn(CF1, Q1, "211".getBytes()); + put2.addColumn(CF1, Q2, "212".getBytes()); + table.put(put2); + admin.flush(tableName); + + // Validate that the row caches are populated + result1 = table.get(get1); + assertNotNull(rowCache.getRow(rowCacheKey1, true)); + assertArrayEquals("111".getBytes(), result1.getValue(CF1, Q1)); + assertArrayEquals("112".getBytes(), result1.getValue(CF1, Q2)); + result2 = table.get(get2); + assertNotNull(rowCache.getRow(rowCacheKey2, true)); + assertArrayEquals("211".getBytes(), result2.getValue(CF1, Q1)); + assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2)); + + // The row caches are invalidated by checkAndMutate operations + cams = new ArrayList<>(); + cams.add(CheckAndMutate.newBuilder(rowKey1).ifEquals(CF1, Q2, "112".getBytes()).build(put1)); + cams.add(CheckAndMutate.newBuilder(rowKey2).ifEquals(CF1, Q2, "212".getBytes()).build(put2)); + camResults = table.checkAndMutate(cams); + assertTrue(camResults.get(0).isSuccess()); + assertTrue(camResults.get(1).isSuccess()); + assertNull(rowCache.getRow(rowCacheKey1, true)); +
assertNull(rowCache.getRow(rowCacheKey2, true)); + } + + @Test + public void testRowMutations() throws IOException { + byte[] rowKey1 = "row1".getBytes(); + byte[] rowKey2 = "row2".getBytes(); + Get get1 = new Get(rowKey1); + Get get2 = new Get(rowKey2); + Result result1, result2; + + RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1); + RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2); + + // Put rows + Put put1 = new Put(rowKey1); + put1.addColumn(CF1, Q1, "111".getBytes()); + put1.addColumn(CF1, Q2, "112".getBytes()); + table.put(put1); + Put put2 = new Put(rowKey2); + put2.addColumn(CF1, Q1, "211".getBytes()); + put2.addColumn(CF1, Q2, "212".getBytes()); + table.put(put2); + admin.flush(tableName); + + // Validate that the row caches are populated + result1 = table.get(get1); + assertNotNull(rowCache.getRow(rowCacheKey1, true)); + assertArrayEquals("111".getBytes(), result1.getValue(CF1, Q1)); + assertArrayEquals("112".getBytes(), result1.getValue(CF1, Q2)); + result2 = table.get(get2); + assertNotNull(rowCache.getRow(rowCacheKey2, true)); + assertArrayEquals("211".getBytes(), result2.getValue(CF1, Q1)); + assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2)); + + // The row cache for row1 is invalidated by the checkAndMutate wrapping RowMutations + Put put12 = new Put(rowKey1); + put12.addColumn(CF1, Q1, "111111".getBytes()); + Put put13 = new Put(rowKey1); + put13.addColumn(CF1, Q2, "112112".getBytes()); + RowMutations rms = new RowMutations(rowKey1); + rms.add(put12); + rms.add(put13); + CheckAndMutate cam = + CheckAndMutate.newBuilder(rowKey1).ifEquals(CF1, Q1, "111".getBytes()).build(rms); + table.checkAndMutate(cam); + assertNull(rowCache.getRow(rowCacheKey1, true)); + assertNotNull(rowCache.getRow(rowCacheKey2, true)); + + // Validate that the row caches are populated + result1 = table.get(get1); + assertNotNull(rowCache.getRow(rowCacheKey1, true)); + assertArrayEquals("111111".getBytes(), result1.getValue(CF1, Q1)); + assertArrayEquals("112112".getBytes(), result1.getValue(CF1, Q2)); + result2 = table.get(get2); + assertNotNull(rowCache.getRow(rowCacheKey2, true)); + assertArrayEquals("211".getBytes(), result2.getValue(CF1, Q1)); + assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2)); + } + + @Test + public void testBatch() throws IOException, InterruptedException { + byte[] rowKey1 = "row1".getBytes(); + byte[] rowKey2 = "row2".getBytes(); + byte[] rowKey3 = "row3".getBytes(); + Get get1 = new Get(rowKey1); + Get get2 = new Get(rowKey2); + Get get3 = new Get(rowKey3); + List<Row> batchOperations; + Object[] results; + + RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1); + RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2); + RowCacheKey rowCacheKey3 = new RowCacheKey(region, rowKey3); + + // Put rows + batchOperations = new ArrayList<>(); + Put put1 = new Put(rowKey1); + put1.addColumn(CF1, Q1, "111".getBytes()); + put1.addColumn(CF1, Q2, "112".getBytes()); + batchOperations.add(put1); + Put put2 = new Put(rowKey2); + put2.addColumn(CF1, Q1, "211".getBytes()); + put2.addColumn(CF1, Q2, "212".getBytes()); + batchOperations.add(put2); + Put put3 = new Put(rowKey3); + put3.addColumn(CF1, Q1, "311".getBytes()); + put3.addColumn(CF1, Q2, "312".getBytes()); + batchOperations.add(put3); + results = new Result[batchOperations.size()]; + table.batch(batchOperations, results); + admin.flush(tableName); + + // Validate that the row caches are populated + batchOperations = new ArrayList<>(); + batchOperations.add(get1); + batchOperations.add(get2); +
batchOperations.add(get3); + results = new Object[batchOperations.size()]; + table.batch(batchOperations, results); + assertEquals(3, results.length); + assertNotNull(rowCache.getRow(rowCacheKey1, true)); + assertArrayEquals("111".getBytes(), ((Result) results[0]).getValue(CF1, Q1)); + assertArrayEquals("112".getBytes(), ((Result) results[0]).getValue(CF1, Q2)); + assertNotNull(rowCache.getRow(rowCacheKey2, true)); + assertArrayEquals("211".getBytes(), ((Result) results[1]).getValue(CF1, Q1)); + assertArrayEquals("212".getBytes(), ((Result) results[1]).getValue(CF1, Q2)); + assertNotNull(rowCache.getRow(rowCacheKey3, true)); + assertArrayEquals("311".getBytes(), ((Result) results[2]).getValue(CF1, Q1)); + assertArrayEquals("312".getBytes(), ((Result) results[2]).getValue(CF1, Q2)); + + // The row caches are invalidated by batch operation + batchOperations = new ArrayList<>(); + batchOperations.add(put1); + Put put2New = new Put(rowKey2); + put2New.addColumn(CF1, Q1, "211211".getBytes()); + put2New.addColumn(CF1, Q2, "212".getBytes()); + CheckAndMutate cam = + CheckAndMutate.newBuilder(rowKey2).ifEquals(CF1, Q1, "211".getBytes()).build(put2New); + batchOperations.add(cam); + results = new Object[batchOperations.size()]; + table.batch(batchOperations, results); + assertEquals(2, results.length); + assertNull(rowCache.getRow(rowCacheKey1, true)); + assertNull(rowCache.getRow(rowCacheKey2, true)); + assertNotNull(rowCache.getRow(rowCacheKey3, true)); + } + + @Test + public void testGetFromMemstoreOnly() throws IOException, InterruptedException { + byte[] rowKey = "row".getBytes(); + RowCacheKey rowCacheKey = new RowCacheKey(region, rowKey); + + // Put a row into memstore only, not flushed to HFile yet + Put put = new Put(rowKey); + put.addColumn(CF1, Q1, Bytes.toBytes(0L)); + table.put(put); + + // Get from memstore only + Get get = new Get(rowKey); + table.get(get); + + // Validate that the row cache is not populated + assertNull(rowCache.getRow(rowCacheKey, true)); + + // Flush memstore to HFile, then get again + admin.flush(tableName); + get = new Get(rowKey); + table.get(get); + + // Validate that the row cache is populated now + assertNotNull(rowCache.getRow(rowCacheKey, true)); + + // Put another qualifier. And now the cells are in both memstore and HFile. + put = new Put(rowKey); + put.addColumn(CF1, Q2, Bytes.toBytes(0L)); + table.put(put); + + // Validate that the row cache is invalidated + assertNull(rowCache.getRow(rowCacheKey, true)); + + // Get from memstore and HFile + get = new Get(rowKey); + table.get(get); + assertNotNull(rowCache.getRow(rowCacheKey, true)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java new file mode 100644 index 000000000000..b774d6379f9f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.security.visibility.Authorizations; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRowCacheCanCacheRow { + private static final byte[] CF1 = "cf1".getBytes(); + private static final byte[] CF2 = "cf2".getBytes(); + private static final byte[] ROW_KEY = "row".getBytes(); + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheCanCacheRow.class); + + @Test + public void testRowCacheEnabledByTable() { + Region region = Mockito.mock(Region.class); + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableDescriptor td; + + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true).setColumnFamily(cfd) + .build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + + RowCacheService rowCacheService = new RowCacheService(conf); + Assert.assertTrue(rowCacheService.canCacheRow(get, region)); + + // Disable row cache, expect false + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(cfd) + .setRowCacheEnabled(false).build(); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + Assert.assertFalse(rowCacheService.canCacheRow(get, region)); + } + + @Test + public void testRowCacheDisabledByConfig() { + Region region = Mockito.mock(Region.class); + Configuration conf = HBaseConfiguration.create(); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + + ColumnFamilyDescriptor cfd = 
ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableDescriptor td; + + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + + // Row cache enabled at table level, but disabled by row cache size 0, expect false + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true).setColumnFamily(cfd) + .build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + + RowCacheService rowCacheService = new RowCacheService(conf); + Assert.assertFalse(rowCacheService.canCacheRow(get, region)); + } + + @Test + public void testRetrieveAllCells() { + Region region = Mockito.mock(Region.class); + ColumnFamilyDescriptor cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + ColumnFamilyDescriptor cfd2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2).build(); + TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true) + .setColumnFamily(cfd1).setColumnFamily(cfd2).build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + RowCacheService rowCacheService = new RowCacheService(conf); + + // Not all CFs, expect false + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + Assert.assertFalse(rowCacheService.canCacheRow(get, region)); + + // All CFs, expect true + get.addFamily(CF2); + Assert.assertTrue(rowCacheService.canCacheRow(get, region)); + + // Not all qualifiers, expect false + get.addColumn(CF1, "q1".getBytes()); + Assert.assertFalse(rowCacheService.canCacheRow(get, region)); + } + + @Test + public void testTtl() { + ColumnFamilyDescriptor cfd1; + ColumnFamilyDescriptor cfd2; + TableDescriptor td; + Region region = Mockito.mock(Region.class); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + RowCacheService rowCacheService = new RowCacheService(conf); + + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + get.addFamily(CF2); + + // Ttl is set, expect false + cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).setTimeToLive(1).build(); + cfd2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2).build(); + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true) + .setColumnFamily(cfd1).setColumnFamily(cfd2).build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + Assert.assertFalse(rowCacheService.canCacheRow(get, region)); + + // Ttl is not set, expect true + cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true) + .setColumnFamily(cfd1).setColumnFamily(cfd2).build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + Assert.assertTrue(rowCacheService.canCacheRow(get, region)); + } + + @Test + public void testFilter() { + testWith( + get -> get.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW_KEY)))); + } + + @Test + public void testCacheBlock() { + testWith(get -> get.setCacheBlocks(false)); + } + + @Test + public void testAttribute() { + testWith(get -> get.setAttribute("test", "value".getBytes())); + } + + @Test + public void 
testCheckExistenceOnly() { + testWith(get -> get.setCheckExistenceOnly(true)); + } + + @Test + public void testColumnFamilyTimeRange() { + testWith(get -> get.setColumnFamilyTimeRange(CF1, 1000, 2000)); + } + + @Test + public void testConsistency() { + testWith(get -> get.setConsistency(Consistency.TIMELINE)); + } + + @Test + public void testAuthorizations() { + testWith(get -> get.setAuthorizations(new Authorizations("foo"))); + } + + @Test + public void testId() { + testWith(get -> get.setId("test")); + } + + @Test + public void testIsolationLevel() { + testWith(get -> get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED)); + } + + @Test + public void testMaxResultsPerColumnFamily() { + testWith(get -> get.setMaxResultsPerColumnFamily(2)); + } + + @Test + public void testReplicaId() { + testWith(get -> get.setReplicaId(1)); + } + + @Test + public void testRowOffsetPerColumnFamily() { + testWith(get -> get.setRowOffsetPerColumnFamily(1)); + } + + @Test + public void testTimeRange() { + testWith(get -> { + try { + return get.setTimeRange(1, 2); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testTimestamp() { + testWith(get -> get.setTimestamp(1)); + } + + private static void testWith(Function<Get, Get> func) { + Region region = Mockito.mock(Region.class); + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true) + .setColumnFamily(cfd).build(); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled()); + + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + RowCacheService rowCacheService = new RowCacheService(conf); + + Get get = new Get(ROW_KEY); + get.addFamily(CF1); + Assert.assertTrue(rowCacheService.canCacheRow(get, region)); + + // noinspection unused + var unused = func.apply(get); + + // expect false + Assert.assertFalse(rowCacheService.canCacheRow(get, region)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java new file mode 100644 index 000000000000..4c70547d1812 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRowCacheConfiguration { + private static final byte[] CF1 = "cf1".getBytes(); + private static final TableName TABLE_NAME = TableName.valueOf("table"); + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + @Test + public void testDetermineRowCacheEnabled() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + + HRegion region; + + // Set global config to false + conf.setBoolean(HConstants.ROW_CACHE_ENABLED_KEY, false); + + region = createRegion(null); + assertFalse(region.determineRowCacheEnabled()); + + region = createRegion(false); + assertFalse(region.determineRowCacheEnabled()); + + region = createRegion(true); + assertTrue(region.determineRowCacheEnabled()); + + // Set global config to true + conf.setBoolean(HConstants.ROW_CACHE_ENABLED_KEY, true); + + region = createRegion(null); + assertTrue(region.determineRowCacheEnabled()); + + region = createRegion(false); + assertFalse(region.determineRowCacheEnabled()); + + region = createRegion(true); + assertTrue(region.determineRowCacheEnabled()); + } + + private HRegion createRegion(Boolean rowCacheEnabled) throws IOException { + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(cfd); + if (rowCacheEnabled != null) { + tdb.setRowCacheEnabled(rowCacheEnabled); + } + return TEST_UTIL.createLocalHRegion(tdb.build(), "".getBytes(), "1".getBytes()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java new file mode 100644 index 000000000000..e682a599d30d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@Category({ RegionServerTests.class, MediumTests.class }) +@RunWith(Parameterized.class) +public class TestRowCacheEvictOnClose { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheEvictOnClose.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final byte[] CF1 = Bytes.toBytes("cf1"); + private static final byte[] Q1 = Bytes.toBytes("q1"); + private static final byte[] Q2 = Bytes.toBytes("q2"); + + @Rule + public TestName testName = new TestName(); + + @Parameterized.Parameter + public boolean evictOnClose; + + @Parameterized.Parameters + public static List<Object[]> params() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + @Test + public void testEvictOnClose() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + + // Enable row cache + conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f); + conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + + // Set ROW_CACHE_EVICT_ON_CLOSE + conf.setBoolean(ROW_CACHE_EVICT_ON_CLOSE_KEY, evictOnClose); + + // Start cluster + SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(); + cluster.waitForActiveAndReadyMaster(); + Admin admin = TEST_UTIL.getAdmin(); + + RowCache rowCache = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRSRpcServices() + .getRowCacheService().getRowCache(); + + // Create table with row cache enabled + ColumnFamilyDescriptor cf1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build(); + TableName tableName = TableName.valueOf(testName.getMethodName().replaceAll("[\\[\\]]", "_")); + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName).setRowCacheEnabled(true) + .setColumnFamily(cf1).build(); + admin.createTable(td); + Table table = admin.getConnection().getTable(tableName); + + int numRows = 10; + + // Put rows + for (int i = 0; i < numRows; i++) { + byte[] rowKey = ("row" + i).getBytes(); + Put put = new Put(rowKey); + put.addColumn(CF1, Q1, Bytes.toBytes(0L)); + put.addColumn(CF1, Q2, "12".getBytes()); + table.put(put); + } + // Need to flush because the row cache is not populated when reading only from the memstore.
+ admin.flush(tableName); + + // Populate row caches + for (int i = 0; i < numRows; i++) { + byte[] rowKey = ("row" + i).getBytes(); + Get get = new Get(rowKey); + Result result = table.get(get); + assertArrayEquals(rowKey, result.getRow()); + assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1)); + assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2)); + } + + // Verify the row cache holds one entry per row + assertEquals(numRows, rowCache.getCount()); + + // Disable table + admin.disableTable(tableName); + + // Verify the row cache is evicted on close only when evictOnClose is true + assertEquals(evictOnClose ? 0 : numRows, rowCache.getCount()); + + admin.deleteTable(tableName); + TEST_UTIL.shutdownMiniCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java new file mode 100644 index 000000000000..a8c59dc6ccbc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRowCacheHRegion { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheHRegion.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + public static final byte[] CF = Bytes.toBytes("cf1"); + + @Rule + public TestName currentTest = new TestName(); + + @BeforeClass + public static void setupCluster() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void teardownCluster() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testOpenHRegion() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + WALFactory walFactory = new WALFactory(conf, + ServerName.valueOf(currentTest.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()) + .toString()); + WAL wal = walFactory.getWAL(null); + Path hbaseRootDir = CommonFSUtils.getRootDir(conf); + TableName tableName = TableName.valueOf(currentTest.getMethodName()); + RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build(); + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0); + HRegion region = HRegion.openHRegion(conf, FileSystem.get(conf), hbaseRootDir, hri, htd, wal, + regionServer, null); + + // Verify that rowCacheSeqNum is initialized correctly + assertNotEquals(HConstants.NO_SEQNUM, region.getRowCacheSeqNum()); + assertEquals(region.getOpenSeqNum(), region.getRowCacheSeqNum()); + + region.close(); + walFactory.close(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheKey.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheKey.java new file mode 100644 index 000000000000..ee75fd251924 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheKey.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotSame; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({ IOTests.class, SmallTests.class }) +public class TestRowCacheKey { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheKey.class); + + private static HRegion region1; + private static HRegion region2; + private static RegionInfo regionInfo1; + + @BeforeClass + public static void beforeClass() { + TableName tableName = TableName.valueOf("table1"); + + regionInfo1 = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo1.getEncodedName()).thenReturn("region1"); + Mockito.when(regionInfo1.getTable()).thenReturn(tableName); + + region1 = Mockito.mock(HRegion.class); + Mockito.when(region1.getRegionInfo()).thenReturn(regionInfo1); + + RegionInfo regionInfo2 = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo2.getEncodedName()).thenReturn("region2"); + Mockito.when(regionInfo2.getTable()).thenReturn(tableName); + + region2 = Mockito.mock(HRegion.class); + Mockito.when(region2.getRegionInfo()).thenReturn(regionInfo2); + } + + @Test + public void testEquality() { + RowCacheKey key11 = new RowCacheKey(region1, "row1".getBytes()); + RowCacheKey key12 = new RowCacheKey(region1, "row2".getBytes()); + RowCacheKey key21 = new RowCacheKey(region2, "row1".getBytes()); + RowCacheKey key22 = new RowCacheKey(region2, "row2".getBytes()); + RowCacheKey key11Another = new RowCacheKey(region1, "row1".getBytes()); + assertNotSame(key11, key11Another); + + // Ensure hashCode works well + assertNotEquals(key11.hashCode(), key12.hashCode()); + assertNotEquals(key11.hashCode(), key21.hashCode()); + assertNotEquals(key11.hashCode(), key22.hashCode()); + assertEquals(key11.hashCode(), key11Another.hashCode()); + + // Ensure equals works well + assertNotEquals(key11, key12); + assertNotEquals(key11, key21); + assertNotEquals(key11, key22); + assertEquals(key11, key11Another); + } + + @Test + public void testDifferentRowCacheSeqNum() { + RowCacheKey key1 = new RowCacheKey(region1, "row1".getBytes()); + + HRegion region1Another = Mockito.mock(HRegion.class); + Mockito.when(region1Another.getRegionInfo()).thenReturn(regionInfo1); + 
Mockito.when(region1Another.getRowCacheSeqNum()).thenReturn(1L); + RowCacheKey key1Another = new RowCacheKey(region1Another, "row1".getBytes()); + + assertNotEquals(key1.hashCode(), key1Another.hashCode()); + assertNotEquals(key1, key1Another); + } + + @Test + public void testHeapSize() { + RowCacheKey key; + long base = RowCacheKey.FIXED_OVERHEAD; + + key = new RowCacheKey(region1, "1".getBytes()); + assertEquals(base + 8, key.heapSize()); + + key = new RowCacheKey(region1, "12345678".getBytes()); + assertEquals(base + 8, key.heapSize()); + + key = new RowCacheKey(region1, "123456789".getBytes()); + assertEquals(base + 8 * 2, key.heapSize()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheService.java new file mode 100644 index 000000000000..35977aadc790 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheService.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRowCacheService { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheService.class); + + @Test + public void testBarrier() throws IOException { + // Mocking dependencies to create RowCacheService instance + RegionInfo regionInfo = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo.getEncodedName()).thenReturn("region1"); + TableName tableName = TableName.valueOf("table1"); + Mockito.when(regionInfo.getTable()).thenReturn(tableName); + + List<HStore> stores = new ArrayList<>(); + HStore hStore = Mockito.mock(HStore.class); + Mockito.when(hStore.getStorefilesCount()).thenReturn(2); + stores.add(hStore); + + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder("CF1".getBytes()).build(); + TableDescriptor td = Mockito.mock(TableDescriptor.class); + Mockito.when(td.getColumnFamilies()).thenReturn(new ColumnFamilyDescriptor[] { cfd }); + + byte[] rowKey = "row".getBytes(); + Get get = new Get(rowKey); + Scan scan = new Scan(get); + + RegionScannerImpl regionScanner = Mockito.mock(RegionScannerImpl.class); + + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + HRegion region = Mockito.mock(HRegion.class); + Mockito.when(region.getRegionInfo()).thenReturn(regionInfo); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.getStores()).thenReturn(stores); +
Mockito.when(region.getScanner(scan)).thenReturn(regionScanner); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + Mockito.when(region.isRowCacheEnabled()).thenReturn(true); + + RpcCallContext context = Mockito.mock(RpcCallContext.class); + Mockito.when(context.getBlockBytesScanned()).thenReturn(1L); + + RowCacheKey key = new RowCacheKey(region, rowKey); + List<Cell> results = new ArrayList<>(); + results.add(KeyValueTestUtil.create("row", "CF", "q1", 1, "v1")); + + // Initialize RowCacheService + RowCacheService rowCacheService = new RowCacheService(conf); + RowCache rowCache = rowCacheService.getRowCache(); + + // Verify that the row cache is populated while no row level barrier exists + rowCacheService.getScanner(region, get, scan, results, context); + assertNotNull(rowCache.getRow(key, true)); + assertNull(rowCacheService.getRowLevelBarrier(key)); + + // Evict the row cache + rowCache.evictRow(key); + assertNull(rowCache.getRow(key, true)); + + // Create a row level barrier for the row key + rowCacheService.createRowLevelBarrier(key); + assertEquals(1, rowCacheService.getRowLevelBarrier(key).get()); + + // Verify that the row cache is not populated while the row level barrier is in place + rowCacheService.getScanner(region, get, scan, results, context); + assertNull(rowCache.getRow(key, true)); + + // Remove the row level barrier + rowCacheService.removeRowLevelBarrier(key); + assertNull(rowCacheService.getRowLevelBarrier(key)); + + // Verify that the row cache is populated while no region level barrier exists + rowCacheService.getScanner(region, get, scan, results, context); + assertNotNull(rowCache.getRow(key, true)); + assertNull(rowCacheService.getRegionLevelBarrier(region)); + + // Evict the row cache + rowCache.evictRow(key); + assertNull(rowCache.getRow(key, true)); + + // Create a region level barrier for the region + rowCacheService.createRegionLevelBarrier(region); + assertEquals(1, rowCacheService.getRegionLevelBarrier(region).get()); + + // Verify that the row cache is not populated while the region level barrier is in place + rowCacheService.getScanner(region, get, scan, results, context); + assertNull(rowCache.getRow(key, true)); + + // Remove the region level barrier + rowCacheService.removeTableLevelBarrier(region); + assertNull(rowCacheService.getRegionLevelBarrier(region)); + } + + @Test + public void testMutate() throws IOException, ServiceException { + // Mocking RowCacheService and its dependencies + TableDescriptor tableDescriptor = Mockito.mock(TableDescriptor.class); + + RegionInfo regionInfo = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo.getEncodedName()).thenReturn("region1"); + + HRegion region = Mockito.mock(HRegion.class); + Mockito.when(region.getTableDescriptor()).thenReturn(tableDescriptor); + Mockito.when(region.getRegionInfo()).thenReturn(regionInfo); + Mockito.when(region.getBlockCache()).thenReturn(Mockito.mock(BlockCache.class)); + Mockito.when(region.isRowCacheEnabled()).thenReturn(true); + + RSRpcServices rsRpcServices = Mockito.mock(RSRpcServices.class); + Mockito.when(rsRpcServices.getRegion(Mockito.any())).thenReturn(region); + + MutationProto mutationProto = MutationProto.newBuilder().build(); + + OperationQuota operationQuota = Mockito.mock(OperationQuota.class); + + CellScanner cellScanner = Mockito.mock(CellScanner.class); + + ActivePolicyEnforcement activePolicyEnforcement = Mockito.mock(ActivePolicyEnforcement.class); + + RpcCallContext rpcCallContext = Mockito.mock(RpcCallContext.class); + + CheckAndMutate checkAndMutate =
CheckAndMutate.newBuilder("row".getBytes()) + .ifEquals("CF".getBytes(), "q1".getBytes(), "v1".getBytes()).build(new Put("row".getBytes())); + + Put put1 = new Put("row1".getBytes()); + put1.addColumn("CF".getBytes(), "q1".getBytes(), "v1".getBytes()); + Put put2 = new Put("row1".getBytes()); + put2.addColumn("CF".getBytes(), "q1".getBytes(), "v1".getBytes()); + List mutations = new ArrayList<>(); + mutations.add(put1); + mutations.add(put2); + + Mutation[] mutationArray = new Mutation[mutations.size()]; + mutations.toArray(mutationArray); + + RowCacheService rowCacheService; + InOrder inOrder; + + // Mutate + rowCacheService = Mockito.mock(RowCacheService.class); + Mockito.when(rowCacheService.mutate(rsRpcServices, region, mutationProto, operationQuota, + cellScanner, 0, activePolicyEnforcement, rpcCallContext)).thenCallRealMethod(); + inOrder = Mockito.inOrder(rowCacheService); + rowCacheService.mutate(rsRpcServices, region, mutationProto, operationQuota, cellScanner, 0, + activePolicyEnforcement, rpcCallContext); + // Verify the sequence of method calls + inOrder.verify(rowCacheService, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // CheckAndMutate + rowCacheService = Mockito.mock(RowCacheService.class); + Mockito.when(rowCacheService.checkAndMutate(region, checkAndMutate, 0, 0)).thenCallRealMethod(); + inOrder = Mockito.inOrder(rowCacheService); + rowCacheService.checkAndMutate(region, checkAndMutate, 0, 0); + // Verify the sequence of method calls + inOrder.verify(rowCacheService, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // RowMutations + rowCacheService = Mockito.mock(RowCacheService.class); + Mockito.when(rowCacheService.checkAndMutate(region, mutations, checkAndMutate, 0, 0)) + .thenCallRealMethod(); + inOrder = Mockito.inOrder(rowCacheService); + rowCacheService.checkAndMutate(region, mutations, checkAndMutate, 0, 0); + // Verify the sequence of method calls + inOrder.verify(rowCacheService, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // Batch + rowCacheService = Mockito.mock(RowCacheService.class); + Mockito.when(rowCacheService.batchMutate(region, mutationArray, true, 0, 0)) + .thenCallRealMethod(); + inOrder = Mockito.inOrder(rowCacheService); + rowCacheService.batchMutate(region, mutationArray, true, 0, 0); + // Verify the sequence of method calls + inOrder.verify(rowCacheService, Mockito.times(1)).createRowLevelBarrier(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).evictRow(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).execute(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).removeRowLevelBarrier(Mockito.any()); + + // Bulkload + HBaseProtos.RegionSpecifier regionSpecifier = HBaseProtos.RegionSpecifier.newBuilder() + 
.setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME) + .setValue(ByteString.copyFrom("region".getBytes())).build(); + ClientProtos.BulkLoadHFileRequest bulkLoadRequest = + ClientProtos.BulkLoadHFileRequest.newBuilder().setRegion(regionSpecifier).build(); + rowCacheService = Mockito.mock(RowCacheService.class); + Mockito.when(rowCacheService.bulkLoadHFile(rsRpcServices, bulkLoadRequest)) + .thenCallRealMethod(); + inOrder = Mockito.inOrder(rowCacheService); + rowCacheService.bulkLoadHFile(rsRpcServices, bulkLoadRequest); + // Verify the sequence of method calls + inOrder.verify(rowCacheService, Mockito.times(1)).createRegionLevelBarrier(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).increaseRowCacheSeqNum(Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).bulkLoad(Mockito.any(), Mockito.any()); + inOrder.verify(rowCacheService, Mockito.times(1)).removeTableLevelBarrier(Mockito.any()); + } + + @Test + public void testCaching() throws IOException { + // Mocking dependencies to create RowCacheService instance + RegionInfo regionInfo = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo.getEncodedName()).thenReturn("region1"); + TableName tableName = TableName.valueOf("table1"); + Mockito.when(regionInfo.getTable()).thenReturn(tableName); + + List<HStore> stores = new ArrayList<>(); + HStore hStore = Mockito.mock(HStore.class); + Mockito.when(hStore.getStorefilesCount()).thenReturn(2); + stores.add(hStore); + + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder("CF1".getBytes()).build(); + TableDescriptor td = Mockito.mock(TableDescriptor.class); + Mockito.when(td.getColumnFamilies()).thenReturn(new ColumnFamilyDescriptor[] { cfd }); + + RpcCallContext context = Mockito.mock(RpcCallContext.class); + Mockito.when(context.getBlockBytesScanned()).thenReturn(1L); + + byte[] rowKey = "row".getBytes(); + RegionScannerImpl regionScanner = Mockito.mock(RegionScannerImpl.class); + + Get get = new Get(rowKey); + Scan scan = new Scan(get); + + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + HRegion region = Mockito.mock(HRegion.class); + Mockito.when(region.getRegionInfo()).thenReturn(regionInfo); + Mockito.when(region.getTableDescriptor()).thenReturn(td); + Mockito.when(region.getStores()).thenReturn(stores); + Mockito.when(region.getScanner(scan)).thenReturn(regionScanner); + Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf); + Mockito.when(region.isRowCacheEnabled()).thenReturn(true); + + RowCacheKey key = new RowCacheKey(region, rowKey); + List<Cell> results = new ArrayList<>(); + results.add(KeyValueTestUtil.create("row", "CF", "q1", 1, "v1")); + + // Initialize RowCacheService + RowCacheService rowCacheService = new RowCacheService(conf); + RowCache rowCache = rowCacheService.getRowCache(); + + // Verify that the row cache is not populated when caching=false + // This must run first so that the row cache has not been populated yet + get.setCacheBlocks(false); + rowCacheService.getScanner(region, get, scan, results, context); + assertNull(rowCache.getRow(key, true)); + assertNull(rowCache.getRow(key, false)); + + // Verify that the row cache is populated when caching=true + get.setCacheBlocks(true); + rowCacheService.getScanner(region, get, scan, results, context); + assertNotNull(rowCache.getRow(key, true)); + assertNull(rowCache.getRow(key, false)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java new file mode 100644 index 000000000000..6abf3ca8f4bf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@Category(MediumTests.class) +@RunWith(Parameterized.class) +public class TestRowCacheWithBucketCacheAndDataBlockEncoding { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheWithBucketCacheAndDataBlockEncoding.class); + + @Parameterized.Parameter + public static boolean useBucketCache; + + @Parameterized.Parameters + public static List<Object[]> params() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + @Rule + public TestName name = new TestName(); + + private static final byte[] ROW_KEY = Bytes.toBytes("checkRow"); + private static final byte[] CF = Bytes.toBytes("CF"); + private static final byte[] QUALIFIER = Bytes.toBytes("cq"); + private static final byte[] VALUE = Bytes.toBytes("checkValue"); + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
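+ // Shared cluster state for every parameterized run; cache settings are applied in beforeClass before the mini cluster starts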
+ private static final Configuration conf = TEST_UTIL.getConfiguration(); + private static Admin admin = null; + private static RowCache rowCache; + + @BeforeClass + public static void beforeClass() throws Exception { + // Use bucket cache + if (useBucketCache) { + conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 1); + conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64); + } + + // Use row cache + conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f); + conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + TEST_UTIL.startMiniCluster(); + admin = TEST_UTIL.getAdmin(); + + rowCache = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRSRpcServices() + .getRowCacheService().getRowCache(); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRowCacheNoEncode() throws Exception { + testRowCache(name.getMethodName(), DataBlockEncoding.NONE); + } + + @Test + public void testRowCacheEncode() throws Exception { + testRowCache(name.getMethodName(), DataBlockEncoding.FAST_DIFF); + } + + private void testRowCache(String methodName, DataBlockEncoding dbe) throws Exception { + TableName tableName = TableName.valueOf(methodName.replaceAll("[\\[\\]]", "_")); + try (Table testTable = createTable(tableName, dbe)) { + Put put = new Put(ROW_KEY); + put.addColumn(CF, QUALIFIER, VALUE); + testTable.put(put); + admin.flush(testTable.getName()); + + long countBase = rowCache.getCount(); + long hitCountBase = rowCache.getHitCount(); + + Result result; + + // The first get should miss the row cache and populate it + Get get = new Get(ROW_KEY); + result = testTable.get(get); + assertArrayEquals(ROW_KEY, result.getRow()); + assertArrayEquals(VALUE, result.getValue(CF, QUALIFIER)); + assertEquals(1, rowCache.getCount() - countBase); + assertEquals(0, rowCache.getHitCount() - hitCountBase); + + // The second get should hit the row cache + result = testTable.get(get); + assertArrayEquals(ROW_KEY, result.getRow()); + assertArrayEquals(VALUE, result.getValue(CF, QUALIFIER)); + assertEquals(1, rowCache.getCount() - countBase); + assertEquals(1, rowCache.getHitCount() - hitCountBase); + } + } + + private Table createTable(TableName tableName, DataBlockEncoding dbe) throws IOException { + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).setBlocksize(100) + .setDataBlockEncoding(dbe).build()) + .setRowCacheEnabled(true).build(); + return TEST_UTIL.createTable(td, null); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCells.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCells.java new file mode 100644 index 000000000000..6307bbce3059 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCells.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRowCells { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCells.class); + + @Test + public void testDeepClone() throws CloneNotSupportedException { + List<Cell> cells = new ArrayList<>(); + KeyValue kv1 = KeyValueTestUtil.create("row", "CF", "q1", 1, "v1"); + cells.add(kv1); + KeyValue kv2 = KeyValueTestUtil.create("row", "CF", "q12", 2, "v22"); + cells.add(kv2); + RowCells rowCells = new RowCells(cells); + + // Ensure deep clone happened + assertNotSame(kv1, rowCells.getCells().get(0)); + assertEquals(kv1, rowCells.getCells().get(0)); + assertNotSame(kv2, rowCells.getCells().get(1)); + assertEquals(kv2, rowCells.getCells().get(1)); + } + + @Test + public void testHeapSize() throws CloneNotSupportedException { + List<Cell> cells; + RowCells rowCells; + + cells = new ArrayList<>(); + rowCells = new RowCells(cells); + assertEquals(RowCells.FIXED_OVERHEAD, rowCells.heapSize()); + + cells = new ArrayList<>(); + KeyValue kv1 = KeyValueTestUtil.create("row", "CF", "q1", 1, "v1"); + cells.add(kv1); + KeyValue kv2 = KeyValueTestUtil.create("row", "CF", "q22", 2, "v22"); + cells.add(kv2); + rowCells = new RowCells(cells); + assertEquals(RowCells.FIXED_OVERHEAD + kv1.heapSize() + kv2.heapSize(), rowCells.heapSize()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/http/TestRSStatusPage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/http/TestRSStatusPage.java index b4b7214b037d..fabc12ee835c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/http/TestRSStatusPage.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/http/TestRSStatusPage.java @@ -132,7 +132,7 @@ public void testStatusPage() throws Exception { assertTrue(page.contains(expectedPageHeader)); assertTrue(page.contains("<h2>Server Metrics</h2>")); assertTrue(page.contains("Requests Per Second")); - assertTrue(page.contains("<h2>Block Cache</h2>")); + assertTrue(page.contains("<h2>Cache</h2>")); assertTrue(page.contains("<h2>Regions</h2>")); assertTrue(page.contains("<h2>Replication Status</h2>")); assertTrue(page.contains("<h2>Software Attributes</h2>")); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesRowCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesRowCache.java new file mode 100644 index 000000000000..22ae124fb9aa --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesRowCache.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.Comparator; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RowCache; +import org.apache.hadoop.hbase.regionserver.RowCacheKey; +import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MiscTests.class, MediumTests.class }) +public class TestBulkLoadHFilesRowCache { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBulkLoadHFilesRowCache.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static Admin admin; + + final static int NUM_CFS = 2; + final static byte[] QUAL = Bytes.toBytes("qual"); + final static int ROWCOUNT = 10; + + private TableName tableName; + private Table table; + private
HRegion[] regions; + + @Rule + public TestName testName = new TestName(); + + static String family(int i) { + return String.format("family_%04d", i); + } + + public static void buildHFiles(FileSystem fs, Path dir) throws IOException { + byte[] val = "value".getBytes(); + for (int i = 0; i < NUM_CFS; i++) { + Path testIn = new Path(dir, family(i)); + + TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), + Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); + } + } + + private TableDescriptor createTableDesc(TableName name) { + TableDescriptorBuilder builder = + TableDescriptorBuilder.newBuilder(name).setRowCacheEnabled(true); + IntStream.range(0, NUM_CFS).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i))) + .forEachOrdered(builder::setColumnFamily); + return builder.build(); + } + + private Path buildBulkFiles(TableName table) throws Exception { + Path dir = TEST_UTIL.getDataTestDirOnTestFS(table.getNameAsString()); + Path bulk1 = new Path(dir, table.getNameAsString()); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + buildHFiles(fs, bulk1); + return bulk1; + } + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + + // Enable row cache but reduce the block cache size to fit in 80% of the heap + conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f); + conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + + TEST_UTIL.startMiniCluster(1); + admin = TEST_UTIL.getAdmin(); + } + + @AfterClass + public static void teardownCluster() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws Exception { + tableName = TableName.valueOf(testName.getMethodName()); + // Split the table into 2 regions + byte[][] splitKeys = new byte[][] { TestHRegionServerBulkLoad.rowkey(ROWCOUNT) }; + admin.createTable(createTableDesc(tableName), splitKeys); + table = TEST_UTIL.getConnection().getTable(tableName); + // Sorted by region name + regions = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegions().stream() + .filter(r -> r.getRegionInfo().getTable().equals(tableName)) + .sorted(Comparator.comparing(r -> r.getRegionInfo().getRegionNameAsString())) + .toArray(HRegion[]::new); + } + + @After + public void after() throws Exception { + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + } + + @Test + public void testRowCache() throws Exception { + RowCache rowCache = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRSRpcServices() + .getRowCacheService().getRowCache(); + + // The region to be bulk-loaded + byte[] rowKeyRegion0 = TestHRegionServerBulkLoad.rowkey(0); + // The region not to be bulk-loaded + byte[] rowKeyRegion1 = TestHRegionServerBulkLoad.rowkey(ROWCOUNT); + + // Put a row into each region to populate the row cache + Put put0 = new Put(rowKeyRegion0); + put0.addColumn(family(0).getBytes(), "q1".getBytes(), "value".getBytes()); + table.put(put0); + Put put1 = new Put(rowKeyRegion1); + put1.addColumn(family(0).getBytes(), "q1".getBytes(), "value".getBytes()); + table.put(put1); + admin.flush(tableName); + + // Ensure each region has a row cache + Get get0 = new Get(rowKeyRegion0); + Result result0 = table.get(get0); + assertNotNull(result0); + RowCacheKey keyPrev0 = new RowCacheKey(regions[0], get0.getRow()); + assertNotNull(rowCache.getRow(keyPrev0, true)); + Get get1 = new Get(rowKeyRegion1); + Result result1 = table.get(get1); + assertNotNull(result1); + RowCacheKey keyPrev1 = new RowCacheKey(regions[1], 
get1.getRow()); + assertNotNull(rowCache.getRow(keyPrev1, true)); + + // Do bulkload to region0 only + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); + Path dir = buildBulkFiles(tableName); + loader.bulkLoad(tableName, dir); + + // Ensure the row cache is removed after bulkload for region0 + RowCacheKey keyCur0 = new RowCacheKey(regions[0], get0.getRow()); + assertNotEquals(keyPrev0, keyCur0); + assertNull(rowCache.getRow(keyCur0, true)); + // Ensure the row cache for keyPrev0 still exists, but it is not used anymore. + assertNotNull(rowCache.getRow(keyPrev0, true)); + + // Ensure the row cache for region1 is not affected + RowCacheKey keyCur1 = new RowCacheKey(regions[1], get1.getRow()); + assertEquals(keyPrev1, keyCur1); + assertNotNull(rowCache.getRow(keyCur1, true)); + } +} diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 93cc312338c9..f4d14eab7d19 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -1610,6 +1610,7 @@ def update_tdb_from_arg(tdb, arg) tdb.setRegionMemStoreReplication(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::REGION_MEMSTORE_REPLICATION))) if arg.include?(TableDescriptorBuilder::REGION_MEMSTORE_REPLICATION) tdb.setRegionSplitPolicyClassName(arg.delete(TableDescriptorBuilder::SPLIT_POLICY)) if arg.include?(TableDescriptorBuilder::SPLIT_POLICY) tdb.setRegionReplication(JInteger.valueOf(arg.delete(TableDescriptorBuilder::REGION_REPLICATION))) if arg.include?(TableDescriptorBuilder::REGION_REPLICATION) + tdb.setRowCacheEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::ROW_CACHE_ENABLED))) if arg.include?(TableDescriptorBuilder::ROW_CACHE_ENABLED) set_user_metadata(tdb, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(tdb, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] end diff --git a/pom.xml b/pom.xml index 13820e0b5296..104d61a3e4cc 100644 --- a/pom.xml +++ b/pom.xml @@ -875,7 +875,7 @@ 0.15.0 0.15.0 1.11.4 - 2.8.1 + 3.2.2 1.15 1.7 2.18.0