From ac27e4c1a3860c4f0d4a3f1a4c7241a2a8a7e6a0 Mon Sep 17 00:00:00 2001
From: Wellington Ramos Chevreuil
Date: Thu, 29 Jun 2023 17:16:45 +0100
Subject: [PATCH 01/12] HBASE-28004 Persistent cache map can get corrupt if crash happens midway through the write

---
 .../protobuf/server/io/BucketCacheEntry.proto |   2 +
 .../hadoop/hbase/io/hfile/CacheConfig.java    |   2 -
 .../hbase/io/hfile/HFilePreadReader.java      |   6 +
 .../hbase/io/hfile/PrefetchExecutor.java      |  91 +------------
 .../hbase/io/hfile/bucket/BucketCache.java    | 121 +++++++++++++-----
 .../hbase/io/hfile/bucket/BucketEntry.java    |   8 +-
 .../io/hfile/bucket/BucketProtoUtils.java     |   4 +-
 .../hbase/io/hfile/bucket/FileIOEngine.java   |  37 +++++-
 .../io/hfile/bucket/PersistentIOEngine.java   |   1 +
 .../TestBlockEvictionOnRegionMovement.java    |   1 -
 .../hbase/io/hfile/TestPrefetchRSClose.java   |   4 -
 .../io/hfile/bucket/TestBucketCache.java      |  11 +-
 .../bucket/TestBucketCachePersister.java      |   8 +-
 .../hfile/bucket/TestPrefetchPersistence.java |   8 +-
 .../hbase/io/hfile/bucket/TestRAMCache.java   |   2 +-
 .../bucket/TestVerifyBucketCacheFile.java     |  98 ++++++++++++--
 16 files changed, 243 insertions(+), 161 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
index 038c6ca3f04d..d49ebc36f488 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
@@ -32,6 +32,7 @@ message BucketCacheEntry {
   map<int32, string> deserializers = 4;
   required BackingMap backing_map = 5;
   optional bytes checksum = 6;
+  map<string, bool> prefetched_files = 7;
 }
 
 message BackingMap {
@@ -71,6 +72,7 @@ message BucketEntry {
   required int64 access_counter = 3;
   required int32 deserialiser_index = 4;
   required BlockPriority priority = 5;
+  required int64 cachedTime = 6;
 }
 
 enum BlockPriority {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 15c64c03d5e5..57f91fa19f44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -93,8 +93,6 @@ public class CacheConfig {
   public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
     "hbase.hfile.drop.behind.compaction";
 
-  public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";
-
   /**
    * Configuration key to set interval for persisting bucket cache to disk.
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 2c71ce9f4842..70722c2ea60d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,11 @@ public void run() { block.release(); } } + cacheConf.getBlockCache().ifPresent( bc -> { + if (bc instanceof BucketCache) { + ((BucketCache) bc).fileCacheCompleted(path.getName()); + } + }); } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) if (LOG.isTraceEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index d3064e066a12..fd4c03a4383d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Future; @@ -41,9 +36,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; - @InterfaceAudience.Private public final class PrefetchExecutor { @@ -51,16 +43,12 @@ public final class PrefetchExecutor { /** Futures for tracking block prefetch activity */ private static final Map> prefetchFutures = new ConcurrentSkipListMap<>(); - /** Set of files for which prefetch is completed */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") - private static HashMap prefetchCompleted = new HashMap<>(); /** Executor pool shared among all HFiles for block prefetch */ private static final ScheduledExecutorService prefetchExecutorPool; /** Delay before beginning prefetch */ private static final int prefetchDelayMillis; /** Variation in prefetch delay times, to mitigate stampedes */ private static final float prefetchDelayVariation; - static String prefetchedFileListPath; static { // Consider doing this on demand with a configuration passed in rather // than in a static initializer. 
@@ -90,13 +78,6 @@ public Thread newThread(Runnable r) { + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); public static void request(Path path, Runnable runnable) { - if (prefetchCompleted != null) { - if (isFilePrefetched(path.getName())) { - LOG.info( - "File has already been prefetched before the restart, so skipping prefetch : " + path); - return; - } - } if (!prefetchPathExclude.matcher(path.toString()).find()) { long delay; if (prefetchDelayMillis > 0) { @@ -122,8 +103,9 @@ public static void request(Path path, Runnable runnable) { public static void complete(Path path) { prefetchFutures.remove(path); - prefetchCompleted.put(path.getName(), true); - LOG.debug("Prefetch completed for {}", path.getName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch completed for {}", path.getName()); + } } public static void cancel(Path path) { @@ -134,8 +116,6 @@ public static void cancel(Path path) { prefetchFutures.remove(path); LOG.debug("Prefetch cancelled for {}", path); } - LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName()); - removePrefetchedFileWhileEvict(path.getName()); } public static boolean isCompleted(Path path) { @@ -145,71 +125,6 @@ public static boolean isCompleted(Path path) { } return true; } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", - justification = "false positive, try-with-resources ensures close is called.") - public static void persistToFile(String path) throws IOException { - prefetchedFileListPath = path; - if (prefetchedFileListPath == null) { - LOG.info("Exception while persisting prefetch!"); - throw new IOException("Error persisting prefetched HFiles set!"); - } - if (!prefetchCompleted.isEmpty()) { - try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) { - PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos); - } - } - } - - public static void retrieveFromFile(String path) throws IOException { - prefetchedFileListPath = path; - File prefetchPersistenceFile = new File(prefetchedFileListPath); - if (!prefetchPersistenceFile.exists()) { - LOG.warn("Prefetch persistence file does not exist!"); - return; - } - LOG.info("Retrieving from prefetch persistence file " + path); - assert (prefetchedFileListPath != null); - try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) { - PersistentPrefetchProtos.PrefetchedHfileName proto = - PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis); - Map protoPrefetchedFilesMap = proto.getPrefetchedFilesMap(); - prefetchCompleted.putAll(protoPrefetchedFilesMap); - } - } - - private static FileInputStream deleteFileOnClose(final File file) throws IOException { - return new FileInputStream(file) { - private File myFile; - - private FileInputStream init(File file) { - myFile = file; - return this; - } - - @Override - public void close() throws IOException { - if (myFile == null) { - return; - } - - super.close(); - if (!myFile.delete()) { - throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath()); - } - myFile = null; - } - }.init(file); - } - - public static void removePrefetchedFileWhileEvict(String hfileName) { - prefetchCompleted.remove(hfileName); - } - - public static boolean isFilePrefetched(String hfileName) { - return prefetchCompleted.containsKey(hfileName); - } - private PrefetchExecutor() { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 14c4c44ee16f..aa85b17a3864 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; -import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -52,6 +50,7 @@ import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.io.ByteBuffAllocator; @@ -78,6 +77,7 @@ import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; import org.apache.hadoop.util.StringUtils; +import org.apache.hbase.thirdparty.com.google.protobuf.compiler.PluginProtos; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +145,10 @@ public class BucketCache implements BlockCache, HeapSize { transient final RAMCache ramCache; // In this map, store the block's meta data like offset, length transient ConcurrentHashMap backingMap; + /** Set of files for which prefetch is completed */ + final Map prefetchCompleted = new ConcurrentHashMap<>(); + + private BucketCachePersister cachePersister; /** * Flag if the cache is enabled or not... 
We shut it off if there are IO errors for some time, so @@ -177,9 +181,6 @@ public class BucketCache implements BlockCache, HeapSize { private static final int DEFAULT_CACHE_WAIT_TIME = 50; private final BucketCacheStats cacheStats = new BucketCacheStats(); - - /** BucketCache persister thread */ - private BucketCachePersister cachePersister; private final String persistencePath; static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false); private final long cacheCapacity; @@ -239,8 +240,6 @@ public class BucketCache implements BlockCache, HeapSize { /** In-memory bucket size */ private float memoryFactor; - private String prefetchedFileListPath; - private long bucketcachePersistInterval; private static final String FILE_VERIFY_ALGORITHM = @@ -293,7 +292,6 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); - this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY); this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); sanityCheckConfigs(); @@ -320,8 +318,10 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); - if (ioEngine.isPersistent() && persistencePath != null) { - startBucketCachePersisterThread(); + if (isCachePersistent()) { + if(ioEngine instanceof FileIOEngine) { + startBucketCachePersisterThread(); + } try { retrieveFromFile(bucketSizes); } catch (IOException ioex) { @@ -379,7 +379,8 @@ protected void startWriterThreads() { } void startBucketCachePersisterThread() { - cachePersister = new BucketCachePersister(this, bucketcachePersistInterval); + cachePersister = + new BucketCachePersister(this, bucketcachePersistInterval); cachePersister.setDaemon(true); cachePersister.start(); } @@ -429,7 +430,7 @@ private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String } public boolean isCachePersistenceEnabled() { - return (prefetchedFileListPath != null) && (persistencePath != null); + return persistencePath != null; } /** @@ -505,7 +506,8 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); // Stuff the entry into the RAM cache so it can get drained to the persistent store RAMQueueEntry re = - new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); + new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory, + isCachePersistent() && ioEngine instanceof FileIOEngine); /** * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same * key in ramCache, the heap size of bucket cache need to update if replacing entry from @@ -589,6 +591,12 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, } return cachedBlock; } + } catch (HBaseIOException hioex) { + //When using file io engine persistent cache, + // the cache map state might differ from the actual cache. 
If we reach this block, + // we should remove the cache key entry from the backing map + backingMap.remove(key); + LOG.debug("Failed to fetch block for cache key: {}.", key, hioex); } catch (IOException ioex) { LOG.error("Failed reading block " + key + " from bucket cache", ioex); checkIOErrorIsTolerated(); @@ -616,13 +624,15 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); } if (ioEngine.isPersistent()) { - if (prefetchedFileListPath != null) { - PrefetchExecutor.removePrefetchedFileWhileEvict(cacheKey.getHfileName()); - } + prefetchCompleted.remove(cacheKey.getHfileName()); setCacheInconsistent(true); } } + public void fileCacheCompleted(String fileName) { + prefetchCompleted.put(fileName, true); + } + /** * Free the {{@link BucketEntry} actually,which could only be invoked when the * {@link BucketEntry#refCnt} becoming 0. @@ -1251,17 +1261,21 @@ static List getRAMQueueEntries(BlockingQueue q, */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", justification = "false positive, try-with-resources ensures close is called.") - void persistToFile() throws IOException { - if (!ioEngine.isPersistent()) { + void + persistToFile() throws IOException { + if(!isCachePersistent()) { throw new IOException("Attempt to persist non-persistent cache mappings!"); } - try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) { + File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); + try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { fos.write(ProtobufMagic.PB_MAGIC); BucketProtoUtils.toPB(this).writeDelimitedTo(fos); } - if (prefetchedFileListPath != null) { - PrefetchExecutor.persistToFile(prefetchedFileListPath); - } + tempPersistencePath.renameTo(new File(persistencePath)); + } + + private boolean isCachePersistent() { + return ioEngine.isPersistent() && persistencePath != null; } /** @@ -1273,9 +1287,6 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException { return; } assert !cacheEnabled; - if (prefetchedFileListPath != null) { - PrefetchExecutor.retrieveFromFile(prefetchedFileListPath); - } try (FileInputStream in = deleteFileOnClose(persistenceFile)) { int pblen = ProtobufMagic.lengthOfPBMagic(); @@ -1341,7 +1352,7 @@ public void close() throws IOException { }.init(file); } - private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) + private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) throws IOException { if (capacitySize != cacheCapacity) { throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) @@ -1358,16 +1369,37 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String } private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { + backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), + this::createRecycler); + prefetchCompleted.clear(); + prefetchCompleted.putAll(proto.getPrefetchedFilesMap()); if (proto.hasChecksum()) { - ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), - algorithm); + try { + ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), + algorithm); + } catch (IOException e) { + LOG.warn("Checksum for cache file failed. " + + "We need to validate each cache key in the backing map. 
This may take some time..."); + long startTime = EnvironmentEdgeManager.currentTime(); + int totalKeysOriginally = backingMap.size(); + for (Map.Entry keyEntry : backingMap.entrySet()){ + try { + ((FileIOEngine)ioEngine).checkCacheTime(keyEntry.getValue()); + } catch (IOException e1) { + LOG.debug("Check for key {} failed. Removing it from map.", keyEntry.getKey()); + backingMap.remove(keyEntry.getKey()); + prefetchCompleted.remove(keyEntry.getKey().getHfileName()); + } + } + LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", + totalKeysOriginally, backingMap.size(), + (EnvironmentEdgeManager.currentTime() - startTime)); + } } else { // if has not checksum, it means the persistence file is old format LOG.info("Persistent file is old format, it does not support verifying file integrity!"); } verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); - backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), - this::createRecycler); } /** @@ -1417,7 +1449,9 @@ public void shutdown() { LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" + persistencePath); if (ioEngine.isPersistent() && persistencePath != null) { - cachePersister.interrupt(); + if(cachePersister != null) { + cachePersister.interrupt(); + } try { join(); persistToFile(); @@ -1485,7 +1519,7 @@ protected String getAlgorithm() { */ @Override public int evictBlocksByHfileName(String hfileName) { - PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName); + this.prefetchCompleted.remove(hfileName); Set keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); @@ -1556,12 +1590,15 @@ static class RAMQueueEntry { private final Cacheable data; private long accessCounter; private boolean inMemory; + private boolean isCachePersistent; - RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) { + RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, + boolean isCachePersistent) { this.key = bck; this.data = data; this.accessCounter = accessCounter; this.inMemory = inMemory; + this.isCachePersistent = isCachePersistent; } public Cacheable getData() { @@ -1591,6 +1628,9 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a if (len == 0) { return null; } + if (isCachePersistent && data instanceof HFileBlock) { + len += Long.BYTES; //we need to record the cache time for consistency check in case of recovery + } long offset = alloc.allocateBlock(len); boolean succ = false; BucketEntry bucketEntry = null; @@ -1604,7 +1644,16 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a ByteBuff sliceBuf = block.getBufferReadOnly(); block.getMetaData(metaBuff); ioEngine.write(sliceBuf, offset); - ioEngine.write(metaBuff, offset + len - metaBuff.limit()); + //adds the cache time after the block and metadata part + if (isCachePersistent) { + ioEngine.write(metaBuff, offset + len - metaBuff.limit() - Long.BYTES); + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(bucketEntry.getCachedTime()); + buffer.rewind(); + ioEngine.write(buffer, (offset + len - Long.BYTES)); + } else { + ioEngine.write(metaBuff, offset + len - metaBuff.limit()); + } } else { // Only used for testing. 
ByteBuffer bb = ByteBuffer.allocate(len); @@ -1760,6 +1809,10 @@ float getMemoryFactor() { return memoryFactor; } + public String getPersistencePath() { + return persistencePath; + } + /** * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index a04a32bfe645..730ea0ed7816 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -91,7 +91,7 @@ class BucketEntry implements HBaseReferenceCounted { /** * Time this block was cached. Presumes we are created just before we are added to the cache. */ - private final long cachedTime = System.nanoTime(); + private long cachedTime = System.nanoTime(); /** * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt} @@ -99,6 +99,11 @@ class BucketEntry implements HBaseReferenceCounted { * for test. */ BucketEntry(long offset, int length, long accessCounter, boolean inMemory, + Function createRecycler, ByteBuffAllocator allocator) { + this(offset, length, accessCounter, System.nanoTime(), inMemory, createRecycler, allocator); + } + + BucketEntry(long offset, int length, long accessCounter, long cachedTime, boolean inMemory, Function createRecycler, ByteBuffAllocator allocator) { if (createRecycler == null) { throw new IllegalArgumentException("createRecycler could not be null!"); @@ -106,6 +111,7 @@ class BucketEntry implements HBaseReferenceCounted { setOffset(offset); this.length = length; this.accessCounter = accessCounter; + this.cachedTime = cachedTime; this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI; this.refCnt = RefCnt.create(createRecycler.apply(this)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index ff4e90b88650..5c5a8683a42a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -45,6 +45,7 @@ static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) { .setIoClass(cache.ioEngine.getClass().getName()) .setMapClass(cache.backingMap.getClass().getName()) .putAllDeserializers(CacheableDeserializerIdManager.save()) + .putAllPrefetchedFiles(cache.prefetchCompleted) .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) .setChecksum(ByteString .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) @@ -99,6 +100,7 @@ private static BucketCacheProtos.BlockType toPB(BlockType blockType) { private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) + .setCachedTime(entry.getCachedTime()) .setLength(entry.getLength()).setDeserialiserIndex(entry.deserializerIndex) .setAccessCounter(entry.getAccessCounter()).setPriority(toPB(entry.getPriority())).build(); } @@ -128,7 +130,7 @@ static ConcurrentHashMap fromPB(Map // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator // which created by RpcServer elegantly. 
BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(), - protoValue.getAccessCounter(), + protoValue.getAccessCounter(), protoValue.getCachedTime(), protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler, ByteBuffAllocator.HEAP); // This is the deserializer that we stored diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 370343b1b25c..34acb2847e2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -26,6 +26,7 @@ import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.nio.ByteBuff; @@ -49,6 +50,7 @@ public class FileIOEngine extends PersistentIOEngine { private final long sizePerFile; private final long capacity; + private boolean maintainPersistence; private FileReadAccessor readAccessor = new FileReadAccessor(); private FileWriteAccessor writeAccessor = new FileWriteAccessor(); @@ -59,6 +61,7 @@ public FileIOEngine(long capacity, boolean maintainPersistence, String... filePa this.sizePerFile = capacity / filePaths.length; this.capacity = this.sizePerFile * filePaths.length; this.fileChannels = new FileChannel[filePaths.length]; + this.maintainPersistence = maintainPersistence; if (!maintainPersistence) { for (String filePath : filePaths) { File file = new File(filePath); @@ -145,10 +148,42 @@ public Cacheable read(BucketEntry be) throws IOException { throw ioe; } } - dstBuff.rewind(); + if (maintainPersistence) { + dstBuff.position(length - Long.BYTES); + long cachedNanoTime = dstBuff.getLong(); + if (be.getCachedTime() != cachedNanoTime) { + dstBuff.release(); + throw new HBaseIOException("The cached time recorded within the cached block differs " + + "from its bucket entry, so it might not be the same."); + } + dstBuff.rewind(); + dstBuff.limit(length - Long.BYTES); + dstBuff = dstBuff.slice(); + } else { + dstBuff.rewind(); + } return be.wrapAsCacheable(dstBuff); } + void checkCacheTime(BucketEntry be) throws IOException { + long offset = be.offset(); + int length = be.getLength(); + ByteBuff dstBuff = be.allocator.allocate(Long.BYTES); + try { + accessFile(readAccessor, dstBuff, (offset + length - Long.BYTES)); + } catch (IOException ioe) { + dstBuff.release(); + throw ioe; + } + dstBuff.rewind(); + long cachedNanoTime = dstBuff.getLong(); + if (be.getCachedTime() != cachedNanoTime) { + dstBuff.release(); + throw new HBaseIOException("The cached time recorded within the cached block differs " + + "from its bucket entry, so it might not be the same."); + } + } + void closeFileChannels() { for (FileChannel fileChannel : fileChannels) { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java index 495814fdc5fe..b6e64e825771 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java @@ -70,6 +70,7 @@ protected byte[] 
calculateChecksum(String algorithm) { sb.append(getFileSize(filePath)); sb.append(file.lastModified()); } + LOG.debug("Checksum for persistence cache: {}", sb); MessageDigest messageDigest = MessageDigest.getInstance(algorithm); messageDigest.update(Bytes.toBytes(sb.toString())); return messageDigest.digest(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java index 66b2ca73ded8..eb3e3cc61f4b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java @@ -79,7 +79,6 @@ public void setup() throws Exception { conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache"); conf.setInt("hbase.bucketcache.size", 400); conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence"); - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence"); conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100); conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true); zkCluster = TEST_UTIL.startMiniZKCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java index b10186996ede..64db9158333d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java @@ -75,7 +75,6 @@ public void setup() throws Exception { conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache"); conf.setInt("hbase.bucketcache.size", 400); conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence"); - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence"); zkCluster = TEST_UTIL.startMiniZKCluster(); cluster = TEST_UTIL.startMiniHBaseCluster(option); assertEquals(2, cluster.getRegionServerThreads().size()); @@ -114,18 +113,15 @@ public void testPrefetchPersistence() throws Exception { // Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files // should exist. 
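Aside (illustrative, not part of the patch): the assertions in these tests assume a file-backed, persistent bucket cache whose persister thread runs on the interval mentioned above. A minimal sketch of that configuration, using a hypothetical class name and example values for the test directory and interval, could look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class PersistentBucketCacheConfigSketch {
  // Returns a Configuration enabling a file-backed bucket cache that persists its
  // backing map, mirroring the settings used by these tests (values are examples).
  public static Configuration create(String testDir) {
    Configuration conf = new Configuration();
    // Blocks are cached in this file-based IOEngine.
    conf.set("hbase.bucketcache.ioengine", "file:" + testDir + "/bucket.cache");
    // Cache capacity, in MB.
    conf.setInt("hbase.bucketcache.size", 400);
    // Path where the backing map (and now the prefetched-files map) is persisted.
    conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence");
    // Interval for the BucketCachePersister thread; the default is 1000 ms.
    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100);
    return conf;
  }
}

The tests then sleep slightly longer than this interval before asserting that the bucket.persistence file exists.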
assertTrue(new File(testDir + "/bucket.persistence").exists()); - assertTrue(new File(testDir + "/prefetch.persistence").exists()); // Stop the RS cluster.stopRegionServer(0); LOG.info("Stopped Region Server 0."); Thread.sleep(1000); assertTrue(new File(testDir + "/bucket.persistence").exists()); - assertTrue(new File(testDir + "/prefetch.persistence").exists()); // Start the RS and validate cluster.startRegionServer(); - assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertFalse(new File(testDir + "/bucket.persistence").exists()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index ad381a665c3b..c4573e111536 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; @@ -319,7 +320,7 @@ public void testRetrieveFromPMem() throws Exception { final String ioEngineName = "pmem:" + testDir + "/bucket.cache"; testRetrievalUtils(testDir, ioEngineName); int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; - String persistencePath = testDir + "/bucket.persistence"; + String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); assertFalse(new File(persistencePath).exists()); @@ -330,7 +331,7 @@ public void testRetrieveFromPMem() throws Exception { private void testRetrievalUtils(Path testDir, String ioEngineName) throws IOException, InterruptedException { - final String persistencePath = testDir + "/bucket.persistence"; + final String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); try { @@ -701,8 +702,8 @@ public void testRAMCache() { HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); - RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false); - RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false); + RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false); + RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false); assertFalse(cache.containsKey(key1)); assertNull(cache.putIfAbsent(key1, re1)); @@ -749,7 +750,7 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { BucketAllocator allocator = new BucketAllocator(availableSpace, null); BlockCacheKey key = new BlockCacheKey("dummy", 1L); - RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true); + RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false); Assert.assertEquals(0, allocator.getUsedSize()); try { diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index dbd3d7f86646..2a7357f1f912 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -85,7 +85,6 @@ public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) thr } public BucketCache setupBucketCache(Configuration conf) throws IOException { - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, (testDir + "/prefetch.persistence")); BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); @@ -111,9 +110,7 @@ public void testPrefetchPersistenceCrash() throws Exception { readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache); Thread.sleep(bucketCachePersistInterval); - assertTrue(new File(testDir + "/prefetch.persistence").exists()); assertTrue(new File(testDir + "/bucket.persistence").exists()); - assertTrue(new File(testDir + "/prefetch.persistence").delete()); assertTrue(new File(testDir + "/bucket.persistence").delete()); cleanupBucketCache(bucketCache); } @@ -128,7 +125,6 @@ public void testPrefetchPersistenceCrashNegative() throws Exception { // Load Cache Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs); readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); - assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertFalse(new File(testDir + "/bucket.persistence").exists()); cleanupBucketCache(bucketCache); } @@ -145,9 +141,9 @@ public void testPrefetchListUponBlockEviction() throws Exception { Thread.sleep(500); // Evict Blocks from cache BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey(); - assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName())); + assertTrue(bucketCache1.prefetchCompleted.containsKey(storeFile.getName())); bucketCache1.evictBlock(bucketCacheKey); - assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName())); + assertFalse(bucketCache1.prefetchCompleted.containsKey(storeFile.getName())); } public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 771ab0158f61..64e9fbf6fc7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -106,8 +106,6 @@ public void setup() throws IOException { conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); testDir = TEST_UTIL.getDataTestDir(); TEST_UTIL.getTestFileSystem().mkdirs(testDir); - prefetchPersistencePath = testDir + "/prefetch.persistence"; - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, prefetchPersistencePath); fs = HFileSystem.get(conf); } @@ -132,10 +130,10 @@ public void testPrefetchPersistence() throws Exception { bucketCache.shutdown(); assertTrue(new File(testDir + "/bucket.persistence").exists()); 
- assertTrue(new File(testDir + "/prefetch.persistence").exists()); bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); + cacheConf = new CacheConfig(conf, bucketCache); assertFalse(new File(testDir + "/bucket.persistence").exists()); assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertTrue(usedSize != 0); @@ -148,9 +146,9 @@ public void testPrefetchPersistence() throws Exception { public void closeStoreFile(Path path) throws Exception { HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); - assertTrue(PrefetchExecutor.isFilePrefetched(path.getName())); + assertTrue(bucketCache.prefetchCompleted.containsKey(path.getName())); reader.close(true); - assertFalse(PrefetchExecutor.isFilePrefetched(path.getName())); + assertFalse(bucketCache.prefetchCompleted.containsKey(path.getName())); } public void readStoreFile(Path storeFilePath, long offset) throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java index 0e777a4a7b9f..58d9385f57e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java @@ -90,7 +90,7 @@ public void testAtomicRAMCache() throws Exception { MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, new HFileContextBuilder().build(), ByteBuffAllocator.HEAP); - RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false); + RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false); Assert.assertNull(cache.putIfAbsent(key, re)); Assert.assertEquals(cache.putIfAbsent(key, re), re); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index 3b2b9961b2b7..d928d46a7471 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; +import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.BufferedWriter; +import java.io.File; import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.nio.file.FileSystems; @@ -33,16 +37,20 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.testclassification.SmallTests; +import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.security.auth.login.ConfigurationSpi; /** * Basic test for check file's integrity before start BucketCache in fileIOEngine @@ -147,16 +155,15 @@ public void testRetrieveFromFile() throws Exception { @Test public void testRetrieveFromFileAfterDelete() throws Exception { - HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); Path testDir = TEST_UTIL.getDataTestDir(); TEST_UTIL.getTestFileSystem().mkdirs(testDir); Configuration conf = TEST_UTIL.getConfiguration(); conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); - + String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, - testDir + "/bucket.persistence", 60 * 1000, conf); + mapFileName, 60 * 1000, conf); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -171,14 +178,13 @@ public void testRetrieveFromFileAfterDelete() throws Exception { // Shutdown BucketCache bucketCache.shutdown(); // Delete the persistence file - final java.nio.file.Path mapFile = - FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence"); - assertTrue(Files.deleteIfExists(mapFile)); + File mapFile = new File(mapFileName); + assertTrue(mapFile.delete()); Thread.sleep(350); // Create BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, - testDir + "/bucket.persistence", 60 * 1000, conf); + mapFileName, 60 * 1000, conf); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } @@ -232,9 +238,17 @@ public void testModifiedBucketCacheFileData() throws Exception { /** * Test whether BucketCache is started normally after modifying the cache file's last modified * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache - * to file. Then Restart BucketCache after modify cache file's last modified time, and it can't - * restore cache from file, the cache file and persistence file would be deleted before - * BucketCache start normally. + * to file. Then Restart BucketCache after modify cache file's last modified time. + * HBASE-XXXX has modified persistence cache such that now we store extra 8 bytes + * at the end of each block in the cache, representing the nanosecond time the block has been + * cached. So in the event the cache file has failed checksum verification during loading time, + * we go through all the cached blocks in the cache map and validate the cached time long between + * what is in the map and the cache file. If that check fails, we pull the cache key entry out of + * the map. + * Since in this test we are only modifying the access time to induce a checksum error, the cache + * file content is still valid and the extra verification should validate that all cache keys in + * the map are still recoverable from the cache. 
+ * * @throws Exception the exception */ @Test @@ -249,6 +263,8 @@ public void testModifiedBucketCacheFileTime() throws Exception { long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); + Pair myPair = new Pair<>(); + CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); // Add blocks @@ -257,6 +273,8 @@ public void testModifiedBucketCacheFileTime() throws Exception { } usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); + long blockCount = bucketCache.backingMap.size(); + assertNotEquals(0, blockCount); // persist cache to file bucketCache.shutdown(); @@ -268,9 +286,65 @@ public void testModifiedBucketCacheFileTime() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + assertEquals(blockCount, bucketCache.backingMap.size()); + + + TEST_UTIL.cleanupTestDir(); + } + + /** + * When using persistent bucket cache, there may be crashes between persisting the backing map + * and syncing new blocks to the cache file itself, leading to an inconsistent state between + * the cache keys and the cached data. This is to make sure the cache keys are updated + * accordingly, and the keys that are still valid do succeed in retrieve related block data from + * the cache without any corruption. + * @throws Exception the exception + */ + @Test + public void testBucketCacheRecovery() throws Exception { + HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + Configuration conf = HBaseConfiguration.create(); + //Disables the persister thread by setting its interval to MAX_VALUE + conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); + String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, + mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); + // Add three blocks + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); + //saves the current state + bucketCache.persistToFile(); + //evicts first block + bucketCache.evictBlock(blocks[0].getBlockName()); + + //now adds a fourth block to bucket cache + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); + //Creates new bucket cache instance without persisting to file after evicting first block + // and caching fourth block. So the bucket cache file has only the last three blocks, + // but backing map (containing cache keys) was persisted when first three blocks + // were in the cache. 
So the state on this recovery is: + // - Backing map: [block0, block1, block2] + // - Cache: [block1, block2, block3] + // Therefore, this bucket cache would be able to recover only block1 and block2. + BucketCache newBucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, + mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); + assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false)); + assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false)); + assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); + assertEquals(2, newBucketCache.backingMap.size()); TEST_UTIL.cleanupTestDir(); } From 7ba88ef7f8cfbf5fdc9ab038190395305ebf80e8 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Thu, 3 Aug 2023 09:22:43 +0100 Subject: [PATCH 02/12] spotless Change-Id: I83a8902544409fa08367d5ab4b316ec319a62381 --- .../hbase/io/hfile/HFilePreadReader.java | 2 +- .../hbase/io/hfile/PrefetchExecutor.java | 2 + .../hbase/io/hfile/bucket/BucketCache.java | 35 ++++----- .../io/hfile/bucket/BucketProtoUtils.java | 6 +- .../hbase/io/hfile/bucket/FileIOEngine.java | 8 +- .../io/hfile/bucket/TestBucketCache.java | 3 +- .../bucket/TestBucketCachePersister.java | 1 - .../hfile/bucket/TestPrefetchPersistence.java | 1 - .../bucket/TestVerifyBucketCacheFile.java | 76 +++++++++---------- 9 files changed, 64 insertions(+), 70 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 70722c2ea60d..adf657f9d5d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -78,7 +78,7 @@ public void run() { block.release(); } } - cacheConf.getBlockCache().ifPresent( bc -> { + cacheConf.getBlockCache().ifPresent(bc -> { if (bc instanceof BucketCache) { ((BucketCache) bc).fileCacheCompleted(path.getName()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index fd4c03a4383d..02fbc12e85c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -36,6 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + @InterfaceAudience.Private public final class PrefetchExecutor { @@ -125,6 +126,7 @@ public static boolean isCompleted(Path path) { } return true; } + private PrefetchExecutor() { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index aa85b17a3864..0698a64368b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -66,7 +67,6 @@ import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; import org.apache.hadoop.util.StringUtils; -import org.apache.hbase.thirdparty.com.google.protobuf.compiler.PluginProtos; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -319,7 +318,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); if (isCachePersistent()) { - if(ioEngine instanceof FileIOEngine) { + if (ioEngine instanceof FileIOEngine) { startBucketCachePersisterThread(); } try { @@ -379,8 +378,7 @@ protected void startWriterThreads() { } void startBucketCachePersisterThread() { - cachePersister = - new BucketCachePersister(this, bucketcachePersistInterval); + cachePersister = new BucketCachePersister(this, bucketcachePersistInterval); cachePersister.setDaemon(true); cachePersister.start(); } @@ -505,9 +503,8 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach } LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); // Stuff the entry into the RAM cache so it can get drained to the persistent store - RAMQueueEntry re = - new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory, - isCachePersistent() && ioEngine instanceof FileIOEngine); + RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), + inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine); /** * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same * key in ramCache, the heap size of bucket cache need to update if replacing entry from @@ -592,7 +589,7 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, return cachedBlock; } } catch (HBaseIOException hioex) { - //When using file io engine persistent cache, + // When using file io engine persistent cache, // the cache map state might differ from the actual cache. 
If we reach this block, // we should remove the cache key entry from the backing map backingMap.remove(key); @@ -1261,9 +1258,8 @@ static List getRAMQueueEntries(BlockingQueue q, */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", justification = "false positive, try-with-resources ensures close is called.") - void - persistToFile() throws IOException { - if(!isCachePersistent()) { + void persistToFile() throws IOException { + if (!isCachePersistent()) { throw new IOException("Attempt to persist non-persistent cache mappings!"); } File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); @@ -1352,7 +1348,7 @@ public void close() throws IOException { }.init(file); } - private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) + private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) throws IOException { if (capacitySize != cacheCapacity) { throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) @@ -1382,9 +1378,9 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio + "We need to validate each cache key in the backing map. This may take some time..."); long startTime = EnvironmentEdgeManager.currentTime(); int totalKeysOriginally = backingMap.size(); - for (Map.Entry keyEntry : backingMap.entrySet()){ + for (Map.Entry keyEntry : backingMap.entrySet()) { try { - ((FileIOEngine)ioEngine).checkCacheTime(keyEntry.getValue()); + ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); } catch (IOException e1) { LOG.debug("Check for key {} failed. Removing it from map.", keyEntry.getKey()); backingMap.remove(keyEntry.getKey()); @@ -1449,7 +1445,7 @@ public void shutdown() { LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" + persistencePath); if (ioEngine.isPersistent() && persistencePath != null) { - if(cachePersister != null) { + if (cachePersister != null) { cachePersister.interrupt(); } try { @@ -1593,7 +1589,7 @@ static class RAMQueueEntry { private boolean isCachePersistent; RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, - boolean isCachePersistent) { + boolean isCachePersistent) { this.key = bck; this.data = data; this.accessCounter = accessCounter; @@ -1629,7 +1625,8 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a return null; } if (isCachePersistent && data instanceof HFileBlock) { - len += Long.BYTES; //we need to record the cache time for consistency check in case of recovery + len += Long.BYTES; // we need to record the cache time for consistency check in case of + // recovery } long offset = alloc.allocateBlock(len); boolean succ = false; @@ -1644,7 +1641,7 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a ByteBuff sliceBuf = block.getBufferReadOnly(); block.getMetaData(metaBuff); ioEngine.write(sliceBuf, offset); - //adds the cache time after the block and metadata part + // adds the cache time after the block and metadata part if (isCachePersistent) { ioEngine.write(metaBuff, offset + len - metaBuff.limit() - Long.BYTES); ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index 5c5a8683a42a..68984a053e53 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -100,9 +100,9 @@ private static BucketCacheProtos.BlockType toPB(BlockType blockType) { private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) - .setCachedTime(entry.getCachedTime()) - .setLength(entry.getLength()).setDeserialiserIndex(entry.deserializerIndex) - .setAccessCounter(entry.getAccessCounter()).setPriority(toPB(entry.getPriority())).build(); + .setCachedTime(entry.getCachedTime()).setLength(entry.getLength()) + .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter()) + .setPriority(toPB(entry.getPriority())).build(); } private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 34acb2847e2e..38f9db04b6d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -153,8 +153,8 @@ public Cacheable read(BucketEntry be) throws IOException { long cachedNanoTime = dstBuff.getLong(); if (be.getCachedTime() != cachedNanoTime) { dstBuff.release(); - throw new HBaseIOException("The cached time recorded within the cached block differs " + - "from its bucket entry, so it might not be the same."); + throw new HBaseIOException("The cached time recorded within the cached block differs " + + "from its bucket entry, so it might not be the same."); } dstBuff.rewind(); dstBuff.limit(length - Long.BYTES); @@ -179,8 +179,8 @@ void checkCacheTime(BucketEntry be) throws IOException { long cachedNanoTime = dstBuff.getLong(); if (be.getCachedTime() != cachedNanoTime) { dstBuff.release(); - throw new HBaseIOException("The cached time recorded within the cached block differs " + - "from its bucket entry, so it might not be the same."); + throw new HBaseIOException("The cached time recorded within the cached block differs " + + "from its bucket entry, so it might not be the same."); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index c4573e111536..bc49321ea1ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -331,7 +331,8 @@ public void testRetrieveFromPMem() throws Exception { private void testRetrievalUtils(Path testDir, String ioEngineName) throws IOException, InterruptedException { - final String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); + final String persistencePath = + testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index 
2a7357f1f912..3e51564762a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 64e9fbf6fc7a..64fa08ad857f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index d928d46a7471..6fdea844aa32 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -36,8 +36,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; @@ -50,7 +50,6 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import javax.security.auth.login.ConfigurationSpi; /** * Basic test for check file's integrity before start BucketCache in fileIOEngine @@ -161,9 +160,9 @@ public void testRetrieveFromFileAfterDelete() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); - BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, - constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, - mapFileName, 60 * 1000, conf); + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -182,9 +181,9 @@ public void 
testRetrieveFromFileAfterDelete() throws Exception { assertTrue(mapFile.delete()); Thread.sleep(350); // Create BucketCache - bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, - constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, - mapFileName, 60 * 1000, conf); + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } @@ -238,17 +237,15 @@ public void testModifiedBucketCacheFileData() throws Exception { /** * Test whether BucketCache is started normally after modifying the cache file's last modified * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache - * to file. Then Restart BucketCache after modify cache file's last modified time. - * HBASE-XXXX has modified persistence cache such that now we store extra 8 bytes - * at the end of each block in the cache, representing the nanosecond time the block has been - * cached. So in the event the cache file has failed checksum verification during loading time, - * we go through all the cached blocks in the cache map and validate the cached time long between - * what is in the map and the cache file. If that check fails, we pull the cache key entry out of - * the map. - * Since in this test we are only modifying the access time to induce a checksum error, the cache - * file content is still valid and the extra verification should validate that all cache keys in - * the map are still recoverable from the cache. - * + * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has + * modified persistence cache such that now we store extra 8 bytes at the end of each block in the + * cache, representing the nanosecond time the block has been cached. So in the event the cache + * file has failed checksum verification during loading time, we go through all the cached blocks + * in the cache map and validate the cached time long between what is in the map and the cache + * file. If that check fails, we pull the cache key entry out of the map. Since in this test we + * are only modifying the access time to induce a checksum error, the cache file content is still + * valid and the extra verification should validate that all cache keys in the map are still + * recoverable from the cache. * @throws Exception the exception */ @Test @@ -289,16 +286,15 @@ public void testModifiedBucketCacheFileTime() throws Exception { assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); assertEquals(blockCount, bucketCache.backingMap.size()); - TEST_UTIL.cleanupTestDir(); } /** - * When using persistent bucket cache, there may be crashes between persisting the backing map - * and syncing new blocks to the cache file itself, leading to an inconsistent state between - * the cache keys and the cached data. This is to make sure the cache keys are updated - * accordingly, and the keys that are still valid do succeed in retrieve related block data from - * the cache without any corruption. + * When using persistent bucket cache, there may be crashes between persisting the backing map and + * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache + * keys and the cached data. 
This is to make sure the cache keys are updated accordingly, and the + * keys that are still valid do succeed in retrieve related block data from the cache without any + * corruption. * @throws Exception the exception */ @Test @@ -307,13 +303,12 @@ public void testBucketCacheRecovery() throws Exception { Path testDir = TEST_UTIL.getDataTestDir(); TEST_UTIL.getTestFileSystem().mkdirs(testDir); Configuration conf = HBaseConfiguration.create(); - //Disables the persister thread by setting its interval to MAX_VALUE + // Disables the persister thread by setting its interval to MAX_VALUE conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); - BucketCache bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, - mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); + BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, + DEFAULT_ERROR_TOLERATION_DURATION, conf); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); @@ -321,28 +316,29 @@ public void testBucketCacheRecovery() throws Exception { cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); - //saves the current state + // saves the current state bucketCache.persistToFile(); - //evicts first block + // evicts first block bucketCache.evictBlock(blocks[0].getBlockName()); - //now adds a fourth block to bucket cache + // now adds a fourth block to bucket cache cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); - //Creates new bucket cache instance without persisting to file after evicting first block + // Creates new bucket cache instance without persisting to file after evicting first block // and caching fourth block. So the bucket cache file has only the last three blocks, // but backing map (containing cache keys) was persisted when first three blocks // were in the cache. So the state on this recovery is: // - Backing map: [block0, block1, block2] // - Cache: [block1, block2, block3] // Therefore, this bucket cache would be able to recover only block1 and block2. 
- BucketCache newBucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, - mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); + BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, + DEFAULT_ERROR_TOLERATION_DURATION, conf); assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); - assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false)); - assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false)); + assertEquals(blocks[1].getBlock(), + newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false)); + assertEquals(blocks[2].getBlock(), + newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false)); assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); assertEquals(2, newBucketCache.backingMap.size()); TEST_UTIL.cleanupTestDir(); From ba548cac7297fc94bda4c5230f6b78dee6c99421 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Wed, 9 Aug 2023 16:08:46 +0100 Subject: [PATCH 03/12] UTs polishing Change-Id: I8a7888f28c91a866184e3820e3d6fc48855755f2 --- .../hbase/io/hfile/bucket/BucketCache.java | 11 +++ .../io/hfile/bucket/TestBucketCache.java | 70 +++++++++++++------ 2 files changed, 58 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 0698a64368b9..17ae637f2eb7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1459,6 +1459,17 @@ public void shutdown() { } } + /** + * Needed mostly for UTs that might run in the same VM and create different BucketCache instances + * on different UT methods. 
+ */ + @Override + public void finalize() { + if (cachePersister != null) { + cachePersister.interrupt(); + } + } + @Override public CacheStats getStats() { return cacheStats; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index bc49321ea1ff..28917aeece3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -299,12 +299,17 @@ public void testRetrieveFromFile() throws Exception { testRetrievalUtils(testDir, ioEngineName); int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; String persistencePath = testDir + "/bucket.persistence"; - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - smallBucketSizes, writeThreads, writerQLen, persistencePath); - assertFalse(new File(persistencePath).exists()); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); - HBASE_TESTING_UTILITY.cleanupTestDir(); + BucketCache bucketCache = null; + try { + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + bucketCache.shutdown(); + HBASE_TESTING_UTILITY.cleanupTestDir(); + } } @Test @@ -321,21 +326,27 @@ public void testRetrieveFromPMem() throws Exception { testRetrievalUtils(testDir, ioEngineName); int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - smallBucketSizes, writeThreads, writerQLen, persistencePath); - assertFalse(new File(persistencePath).exists()); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); - HBASE_TESTING_UTILITY.cleanupTestDir(); + BucketCache bucketCache = null; + try { + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + bucketCache.shutdown(); + HBASE_TESTING_UTILITY.cleanupTestDir(); + } } private void testRetrievalUtils(Path testDir, String ioEngineName) throws IOException, InterruptedException { final String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath); + BucketCache bucketCache = null; try { + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); @@ -355,7 +366,9 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) assertFalse(new 
File(persistencePath).exists()); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); } finally { - bucketCache.shutdown(); + if (bucketCache != null) { + bucketCache.shutdown(); + } } assertTrue(new File(persistencePath).exists()); } @@ -384,12 +397,17 @@ public void testRetrieveFromMultipleFiles() throws Exception { testRetrievalUtils(testDirInitial, ioEngineName); int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; String persistencePath = testDirInitial + "/bucket.persistence"; - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, - smallBucketSizes, writeThreads, writerQLen, persistencePath); - assertFalse(new File(persistencePath).exists()); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); - HBASE_TESTING_UTILITY.cleanupTestDir(); + BucketCache bucketCache = null; + try { + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + bucketCache.shutdown(); + HBASE_TESTING_UTILITY.cleanupTestDir(); + } } @Test @@ -770,13 +788,14 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { */ @Test public void testFreeBucketEntryRestoredFromFile() throws Exception { + BucketCache bucketCache = null; try { final Path dataTestDir = createAndGetTestDir(); String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -811,19 +830,21 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } finally { + bucketCache.shutdown(); HBASE_TESTING_UTILITY.cleanupTestDir(); } } @Test public void testBlockAdditionWaitWhenCache() throws Exception { + BucketCache bucketCache = null; try { final Path dataTestDir = createAndGetTestDir(); String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; - BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, 1, 1, persistencePath); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -866,6 +887,9 @@ public void testBlockAdditionWaitWhenCache() throws Exception { assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } finally { + if (bucketCache != null) { + bucketCache.shutdown(); + } HBASE_TESTING_UTILITY.cleanupTestDir(); } } From 17b4f7fac03b5035f43da7c8744f4b18352a72f2 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Wed, 9 Aug 2023 17:07:02 +0100 Subject: [PATCH 04/12] additional spotbug issue not reported locally Change-Id: I8091c310c9a6583945170160c8daddd932627c00 --- 
.../org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 17ae637f2eb7..08dbfa3cddb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1464,7 +1464,7 @@ public void shutdown() { * on different UT methods. */ @Override - public void finalize() { + protected void finalize() { if (cachePersister != null) { cachePersister.interrupt(); } From 1dd8064a6f67f06489a1d80ed8a0a076599c2e60 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Thu, 10 Aug 2023 21:33:53 +0100 Subject: [PATCH 05/12] Addressing Ankit's review comments Change-Id: I9401c42d44134fce77c812414c88aa2e731e8616 --- .../org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java | 5 ++++- .../hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 08dbfa3cddb0..0bfc99a82a96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1267,7 +1267,10 @@ void persistToFile() throws IOException { fos.write(ProtobufMagic.PB_MAGIC); BucketProtoUtils.toPB(this).writeDelimitedTo(fos); } - tempPersistencePath.renameTo(new File(persistencePath)); + if (!tempPersistencePath.renameTo(new File(persistencePath))) { + LOG.warn("Failed to commit cache persistent file. 
We might lose cached blocks if " + + "RS crashes/restarts before we successfully checkpoint again."); + } } private boolean isCachePersistent() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java index b6e64e825771..495814fdc5fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java @@ -70,7 +70,6 @@ protected byte[] calculateChecksum(String algorithm) { sb.append(getFileSize(filePath)); sb.append(file.lastModified()); } - LOG.debug("Checksum for persistence cache: {}", sb); MessageDigest messageDigest = MessageDigest.getInstance(algorithm); messageDigest.update(Bytes.toBytes(sb.toString())); return messageDigest.digest(); From 58d2cd41341e30a1e8b51f7746bf9a9bdb97e4b1 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 14 Aug 2023 10:12:46 +0100 Subject: [PATCH 06/12] addressing Rahul's comments Change-Id: I3e0341c48d77f6a4f91dd12f9f74a69fc1c1ac33 --- .../org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java | 7 +++++-- .../apache/hadoop/hbase/io/hfile/bucket/BucketCache.java | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index adf657f9d5d5..69d8fbc55ba3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -79,8 +79,11 @@ public void run() { } } cacheConf.getBlockCache().ifPresent(bc -> { - if (bc instanceof BucketCache) { - ((BucketCache) bc).fileCacheCompleted(path.getName()); + if (bc instanceof CombinedBlockCache) { + BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); + if (l2 instanceof BucketCache) { + ((BucketCache) l2).fileCacheCompleted(path.getName()); + } } }); } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 0bfc99a82a96..2aac95e8a571 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -627,7 +627,9 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre } public void fileCacheCompleted(String fileName) { - prefetchCompleted.put(fileName, true); + if (isCachePersistent()) { + prefetchCompleted.put(fileName, true); + } } /** From 55b2fa3ab46b37521c7ebb9c78473d511c2f3083 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Wed, 16 Aug 2023 13:17:12 +0100 Subject: [PATCH 07/12] avoid re-read already cached blocks from the cache when prefetching Change-Id: I5f9aba5b85053f5e7750f4a152d0d321126b39ed --- .../protobuf/server/io/BucketCacheEntry.proto | 1 + .../hbase/io/hfile/HFilePreadReader.java | 45 +++- .../hbase/io/hfile/bucket/BucketCache.java | 55 +++-- .../hbase/io/hfile/bucket/BucketEntry.java | 18 +- .../io/hfile/bucket/BucketProtoUtils.java | 6 +- .../io/hfile/TestPrefetchWithBucketCache.java | 199 ++++++++++++++++++ .../io/hfile/bucket/TestBucketCache.java | 2 +- .../bucket/TestBucketCachePersister.java 
| 4 +- .../hfile/bucket/TestByteBufferIOEngine.java | 2 +- .../hfile/bucket/TestPrefetchPersistence.java | 4 +- 10 files changed, 300 insertions(+), 36 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java diff --git a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto index d49ebc36f488..ae1980fe51e6 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto @@ -73,6 +73,7 @@ message BucketEntry { required int32 deserialiser_index = 4; required BlockPriority priority = 5; required int64 cachedTime = 6; + optional int32 disk_size_with_header = 7; } enum BlockPriority { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 69d8fbc55ba3..0a805624caed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; +import java.util.Optional; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,8 +39,13 @@ public class HFilePreadReader extends HFileReaderImpl { public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf, Configuration conf) throws IOException { super(context, fileInfo, cacheConf, conf); + final MutableBoolean fileAlreadyCached = new MutableBoolean(false); + BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent( bc -> + fileAlreadyCached.setValue(bc.getFullyCachedFiles().get(path.getName())==null ? false : true) + ); // Prefetch file blocks upon open if requested - if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) { + if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() + && !fileAlreadyCached.booleanValue()) { PrefetchExecutor.request(path, new Runnable() { @Override public void run() { @@ -56,12 +64,34 @@ public void run() { if (LOG.isTraceEnabled()) { LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end)); } + Optional bucketCacheOptional = + BucketCache.getBuckedCacheFromCacheConfig(cacheConf); // Don't use BlockIterator here, because it's designed to read load-on-open section. long onDiskSizeOfNextBlock = -1; while (offset < end) { if (Thread.interrupted()) { break; } + //BucketCache can be persistent and resilient to restarts, so we check first if the + // block exists on its in-memory index, if so, we just update the offset and move on + // to the next block without actually going read all the way to the cache. 
+ if (bucketCacheOptional.isPresent()){ + BucketCache cache = bucketCacheOptional.get(); + BlockCacheKey cacheKey = new BlockCacheKey(name, offset); + BucketEntry entry = cache.getBackingMap().get(cacheKey); + if (entry != null) { + cacheKey = new BlockCacheKey(name, offset); + entry = cache.getBackingMap().get(cacheKey); + if (entry == null) { + LOG.debug("No cache key {}, we'll read and cache it", cacheKey); + } else { + offset += entry.getOnDiskSizeWithHeader(); + LOG.debug("Found cache key {}. Skipping prefetch, the block is already cached.", + cacheKey); + continue; + } + } + } // Perhaps we got our block from cache? Unlikely as this may be, if it happens, then // the internal-to-hfileblock thread local which holds the overread that gets the // next header, will not have happened...so, pass in the onDiskSize gotten from the @@ -78,20 +108,15 @@ public void run() { block.release(); } } - cacheConf.getBlockCache().ifPresent(bc -> { - if (bc instanceof CombinedBlockCache) { - BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); - if (l2 instanceof BucketCache) { - ((BucketCache) l2).fileCacheCompleted(path.getName()); - } - } - }); + BucketCache.getBuckedCacheFromCacheConfig(cacheConf). + ifPresent(bc -> bc.fileCacheCompleted(path.getName())); + } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) if (LOG.isTraceEnabled()) { LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), e); } - } catch (Exception e) { + } catch (Throwable e) { // Other exceptions are interesting LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 2aac95e8a571..57099b8d4254 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.Optional; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -62,9 +63,11 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; import org.apache.hadoop.hbase.io.hfile.BlockPriority; import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheStats; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.nio.ByteBuff; @@ -142,10 +145,12 @@ public class BucketCache implements BlockCache, HeapSize { // Store the block in this map before writing it to cache transient final RAMCache ramCache; + // In this map, store the block's meta data like offset, length - transient ConcurrentHashMap backingMap; + transient Map backingMap; + /** Set of files for which prefetch is completed */ - final Map prefetchCompleted = new ConcurrentHashMap<>(); + final Map fullyCachedFiles = new ConcurrentHashMap<>(); private BucketCachePersister cachePersister; @@ -324,6 +329,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck try { retrieveFromFile(bucketSizes); } catch (IOException 
ioex) { + backingMap.clear(); + fullyCachedFiles.clear(); LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex); } } @@ -621,15 +628,13 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); } if (ioEngine.isPersistent()) { - prefetchCompleted.remove(cacheKey.getHfileName()); + fullyCachedFiles.remove(cacheKey.getHfileName()); setCacheInconsistent(true); } } public void fileCacheCompleted(String fileName) { - if (isCachePersistent()) { - prefetchCompleted.put(fileName, true); - } + fullyCachedFiles.put(fileName, true); } /** @@ -1372,8 +1377,8 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), this::createRecycler); - prefetchCompleted.clear(); - prefetchCompleted.putAll(proto.getPrefetchedFilesMap()); + fullyCachedFiles.clear(); + fullyCachedFiles.putAll(proto.getPrefetchedFilesMap()); if (proto.hasChecksum()) { try { ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), @@ -1389,7 +1394,7 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio } catch (IOException e1) { LOG.debug("Check for key {} failed. Removing it from map.", keyEntry.getKey()); backingMap.remove(keyEntry.getKey()); - prefetchCompleted.remove(keyEntry.getKey().getHfileName()); + fullyCachedFiles.remove(keyEntry.getKey().getHfileName()); } } LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", @@ -1436,6 +1441,7 @@ private void disableCache() { if (!ioEngine.isPersistent() || persistencePath == null) { // If persistent ioengine and a path, we will serialize out the backingMap. this.backingMap.clear(); + this.fullyCachedFiles.clear(); } } @@ -1531,7 +1537,7 @@ protected String getAlgorithm() { */ @Override public int evictBlocksByHfileName(String hfileName) { - this.prefetchCompleted.remove(hfileName); + this.fullyCachedFiles.remove(hfileName); Set keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); @@ -1648,8 +1654,10 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a boolean succ = false; BucketEntry bucketEntry = null; try { - bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler, - getByteBuffAllocator()); + int diskSizeWithHeader = (data instanceof HFileBlock) ? + ((HFileBlock)data).getOnDiskSizeWithHeader() : data.getSerializedLength(); + bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, + createRecycler, getByteBuffAllocator()); bucketEntry.setDeserializerReference(data.getDeserializer()); if (data instanceof HFileBlock) { // If an instance of HFileBlock, save on some allocations. 
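A note for reviewers on how the new fullyCachedFiles map is meant to be consumed: because it is persisted and restored alongside the backing map, a caller can short-circuit prefetch for an entire HFile before scheduling any work. The sketch below is illustrative only and not code from the patch; the class name FullyCachedCheckSketch and the isFileFullyCached helper are hypothetical, and it assumes getBuckedCacheFromCacheConfig and getFullyCachedFiles carry the typed signatures Optional<BucketCache> and Map<String, Boolean> implied by their usage in this change.

import java.util.Optional;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

// Illustrative sketch, not part of the patch: decide whether prefetch can be skipped
// for a whole HFile because the persistent cache already reports it as fully cached.
public final class FullyCachedCheckSketch {
  static boolean isFileFullyCached(CacheConfig cacheConf, String hfileName) {
    Optional<BucketCache> bc = BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
    // getFullyCachedFiles() maps HFile names to a completed-prefetch flag.
    return bc.map(cache -> cache.getFullyCachedFiles().containsKey(hfileName)).orElse(false);
  }
}

In the patch itself this check happens in the HFilePreadReader constructor, through a MutableBoolean, before any prefetch task is submitted to the PrefetchExecutor.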
@@ -1903,4 +1911,27 @@ public void clear() { } } } + public Map getBackingMap() { + return backingMap; + } + + public Map getFullyCachedFiles() { + return fullyCachedFiles; + } + + public static Optional getBuckedCacheFromCacheConfig(CacheConfig cacheConf){ + if (cacheConf.getBlockCache().isPresent()) { + BlockCache bc = cacheConf.getBlockCache().get(); + if (bc instanceof CombinedBlockCache) { + BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); + if (l2 instanceof BucketCache) { + return Optional.of((BucketCache) l2); + } + } else if (bc instanceof BucketCache) { + return Optional.of((BucketCache) bc); + } + } + return Optional.empty(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index 730ea0ed7816..14a77fe94356 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -43,13 +43,15 @@ * bytes gives us 256TB or so. */ @InterfaceAudience.Private -class BucketEntry implements HBaseReferenceCounted { +public class BucketEntry implements HBaseReferenceCounted { // access counter comparator, descending order static final Comparator COMPARATOR = Comparator.comparingLong(BucketEntry::getAccessCounter).reversed(); private int offsetBase; private int length; + + private int onDiskSizeWithHeader; private byte offset1; /** @@ -98,23 +100,23 @@ class BucketEntry implements HBaseReferenceCounted { * becoming 0. NOTICE that {@link ByteBuffAllocator#NONE} could only be used * for test. */ - BucketEntry(long offset, int length, long accessCounter, boolean inMemory, + BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, boolean inMemory, Function createRecycler, ByteBuffAllocator allocator) { - this(offset, length, accessCounter, System.nanoTime(), inMemory, createRecycler, allocator); + this(offset, length, onDiskSizeWithHeader, accessCounter, System.nanoTime(), inMemory, createRecycler, allocator); } - BucketEntry(long offset, int length, long accessCounter, long cachedTime, boolean inMemory, + BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, long cachedTime, boolean inMemory, Function createRecycler, ByteBuffAllocator allocator) { if (createRecycler == null) { throw new IllegalArgumentException("createRecycler could not be null!"); } setOffset(offset); this.length = length; + this.onDiskSizeWithHeader = onDiskSizeWithHeader; this.accessCounter = accessCounter; this.cachedTime = cachedTime; this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI; this.refCnt = RefCnt.create(createRecycler.apply(this)); - this.markedAsEvicted = new AtomicBoolean(false); this.allocator = allocator; } @@ -165,10 +167,14 @@ public BlockPriority getPriority() { return this.priority; } - long getCachedTime() { + public long getCachedTime() { return cachedTime; } + public int getOnDiskSizeWithHeader() { + return onDiskSizeWithHeader; + } + /** * The {@link BucketCache} will try to release its reference to this BucketEntry many times. 
we * must make sure the idempotent, otherwise it'll decrease the RPC's reference count in advance, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index 68984a053e53..8830e5d3255a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -45,7 +45,7 @@ static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) { .setIoClass(cache.ioEngine.getClass().getName()) .setMapClass(cache.backingMap.getClass().getName()) .putAllDeserializers(CacheableDeserializerIdManager.save()) - .putAllPrefetchedFiles(cache.prefetchCompleted) + .putAllPrefetchedFiles(cache.fullyCachedFiles) .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) .setChecksum(ByteString .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) @@ -101,6 +101,7 @@ private static BucketCacheProtos.BlockType toPB(BlockType blockType) { private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) .setCachedTime(entry.getCachedTime()).setLength(entry.getLength()) + .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader()) .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter()) .setPriority(toPB(entry.getPriority())).build(); } @@ -130,7 +131,8 @@ static ConcurrentHashMap fromPB(Map // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator // which created by RpcServer elegantly. BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(), - protoValue.getAccessCounter(), protoValue.getCachedTime(), + protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(), + protoValue.getCachedTime(), protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler, ByteBuffAllocator.HEAP); // This is the deserializer that we stored diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java new file mode 100644 index 000000000000..7e26d425488e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({ IOTests.class, MediumTests.class }) +public class TestPrefetchWithBucketCache { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class); + + @Rule + public TestName name = new TestName(); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; + private static final int DATA_BLOCK_SIZE = 2048; + private static final int NUM_KV = 100; + + private Configuration conf; + private CacheConfig cacheConf; + private FileSystem fs; + private BlockCache blockCache; + + @Before + public void setUp() throws IOException { + conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + fs = HFileSystem.get(conf); + File testDir = new File(name.getMethodName()); + testDir.mkdir(); + conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache"); + conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); + blockCache = BlockCacheFactory.createBlockCache(conf); + cacheConf = new CacheConfig(conf, blockCache); + } + + @After + public void tearDown() { + File cacheFile = new File(name.getMethodName() + "/bucket.cache"); + File dir = new File(name.getMethodName()); + cacheFile.delete(); + dir.delete(); + } + + @Test + public void testPrefetchDoesntOverwork() throws Exception { + Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork"); + //Prefetches the file blocks + readStoreFile(storeFile); + BucketCache bc = BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get(); + Map snapshot = ImmutableMap.copyOf(bc.getBackingMap()); + //Reads file again and check we are not prefetching it again + readStoreFile(storeFile); + //Makes sure the cache hasn't changed + snapshot.entrySet().forEach( e -> { + BucketEntry entry = bc.getBackingMap().get(e.getKey()); + assertNotNull(entry); + assertEquals(e.getValue().getCachedTime(), entry.getCachedTime()); + }); + //forcibly 
removes first block from the bc backing map, in order to cause it to be cached again + BlockCacheKey key = snapshot.keySet().stream().findFirst().get(); + bc.getBackingMap().remove(key); + bc.getFullyCachedFiles().remove(storeFile.getName()); + assertTrue(snapshot.size() > bc.getBackingMap().size()); + readStoreFile(storeFile); + assertEquals(snapshot.size(), bc.getBackingMap().size()); + assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime()); + } + + private void readStoreFile(Path storeFilePath) throws Exception { + readStoreFile(storeFilePath, (r, o) -> { + HFileBlock block = null; + try { + block = r.readBlock(o, -1, false, true, false, true, null, null); + } catch (IOException e) { + fail(e.getMessage()); + } + return block; + }, (key, block) -> { + boolean isCached = blockCache.getBlock(key, true, false, true) != null; + if ( + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + assertTrue(isCached); + } + }); + } + + private void readStoreFile(Path storeFilePath, + BiFunction readFunction, + BiConsumer validationFunction) throws Exception { + // Open the file + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); + + while (!reader.prefetchComplete()) { + // Sleep for a bit + Thread.sleep(1000); + } + long offset = 0; + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + HFileBlock block = readFunction.apply(reader, offset); + BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); + validationFunction.accept(blockCacheKey, block); + offset += block.getOnDiskSizeWithHeader(); + } + } + + private Path writeStoreFile(String fname) throws IOException { + HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); + return writeStoreFile(fname, meta); + } + + private Path writeStoreFile(String fname, HFileContext context) throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withOutputDir(storeFileParentDir).withFileContext(context).build(); + Random rand = ThreadLocalRandom.current(); + final int rowLen = 32; + for (int i = 0; i < NUM_KV; ++i) { + byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); + byte[] v = RandomKeyValueUtil.randomValue(rand); + int cfLen = rand.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, + k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); + sfw.append(kv); + } + + sfw.close(); + return sfw.getPath(); + } + + public static KeyValue.Type generateKeyType(Random rand) { + if (rand.nextBoolean()) { + // Let's make half of KVs puts. + return KeyValue.Type.Put; + } else { + KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; + if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { + throw new RuntimeException("Generated an invalid key type: " + keyType + ". 
" + + "Probably the layout of KeyValue.Type has changed."); + } + return keyType; + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 28917aeece3c..0cbafedc7c53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -592,7 +592,7 @@ public void testOffsetProducesPositiveOutput() { // This number is picked because it produces negative output if the values isn't ensured to be // positive. See HBASE-18757 for more information. long testValue = 549888460800L; - BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true, (entry) -> { + BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> { return ByteBuffAllocator.NONE; }, ByteBuffAllocator.HEAP); assertEquals(testValue, bucketEntry.offset()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index 3e51564762a8..bd69f28e1eac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -139,10 +139,10 @@ public void testPrefetchListUponBlockEviction() throws Exception { readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1); Thread.sleep(500); // Evict Blocks from cache + assertTrue(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName())); BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey(); - assertTrue(bucketCache1.prefetchCompleted.containsKey(storeFile.getName())); bucketCache1.evictBlock(bucketCacheKey); - assertFalse(bucketCache1.prefetchCompleted.containsKey(storeFile.getName())); + assertFalse(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName())); } public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java index 820e91aa6e81..b42e7be804db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java @@ -49,7 +49,7 @@ private static class MockBucketEntry extends BucketEntry { private long off; MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) { - super(offset & 0xFF00, length, 0, false, (entry) -> { + super(offset & 0xFF00, length, length, 0, false, (entry) -> { return ByteBuffAllocator.NONE; }, allocator); this.off = offset; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 64fa08ad857f..f15874bc61c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -145,9 +145,9 @@ public void testPrefetchPersistence() 
throws Exception { public void closeStoreFile(Path path) throws Exception { HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); - assertTrue(bucketCache.prefetchCompleted.containsKey(path.getName())); + assertTrue(bucketCache.fullyCachedFiles.containsKey(path.getName())); reader.close(true); - assertFalse(bucketCache.prefetchCompleted.containsKey(path.getName())); + assertFalse(bucketCache.fullyCachedFiles.containsKey(path.getName())); } public void readStoreFile(Path storeFilePath, long offset) throws Exception { From bb73be5e64de5b22a9f4b2c64bdf05415808ae87 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Fri, 18 Aug 2023 15:55:09 +0100 Subject: [PATCH 08/12] spotless fixes Change-Id: Ica5aca33034bf391f1d76ac904449bcfdafed393 --- .../hbase/io/hfile/HFilePreadReader.java | 19 +++++---- .../hbase/io/hfile/bucket/BucketCache.java | 8 ++-- .../hbase/io/hfile/bucket/BucketEntry.java | 12 +++--- .../io/hfile/TestPrefetchWithBucketCache.java | 41 ++++++++++--------- 4 files changed, 43 insertions(+), 37 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 0a805624caed..fc35dc5876b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -40,12 +40,13 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c Configuration conf) throws IOException { super(context, fileInfo, cacheConf, conf); final MutableBoolean fileAlreadyCached = new MutableBoolean(false); - BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent( bc -> - fileAlreadyCached.setValue(bc.getFullyCachedFiles().get(path.getName())==null ? false : true) - ); + BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached + .setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true)); // Prefetch file blocks upon open if requested - if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() - && !fileAlreadyCached.booleanValue()) { + if ( + cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() + && !fileAlreadyCached.booleanValue() + ) { PrefetchExecutor.request(path, new Runnable() { @Override public void run() { @@ -72,10 +73,10 @@ public void run() { if (Thread.interrupted()) { break; } - //BucketCache can be persistent and resilient to restarts, so we check first if the + // BucketCache can be persistent and resilient to restarts, so we check first if the // block exists on its in-memory index, if so, we just update the offset and move on // to the next block without actually going read all the way to the cache. - if (bucketCacheOptional.isPresent()){ + if (bucketCacheOptional.isPresent()) { BucketCache cache = bucketCacheOptional.get(); BlockCacheKey cacheKey = new BlockCacheKey(name, offset); BucketEntry entry = cache.getBackingMap().get(cacheKey); @@ -108,8 +109,8 @@ public void run() { block.release(); } } - BucketCache.getBuckedCacheFromCacheConfig(cacheConf). - ifPresent(bc -> bc.fileCacheCompleted(path.getName())); + BucketCache.getBuckedCacheFromCacheConfig(cacheConf) + .ifPresent(bc -> bc.fileCacheCompleted(path.getName())); } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) 
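The HFilePreadReader hunk above consults the bucket cache's in-memory index before each read, so prefetch skips blocks that are already cached from a previous run. Below is a minimal sketch of that idea; it assumes getBackingMap() returns a typed Map<BlockCacheKey, BucketEntry>, and the PrefetchSkipSketch class and nextPrefetchOffset helper are hypothetical names rather than code from the patch.

import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;

// Illustrative sketch, not part of the patch: skip a block during prefetch when the
// persistent bucket cache already holds an entry for it in its backing map.
public final class PrefetchSkipSketch {
  // Returns the offset where prefetch should continue: if the block starting at
  // 'offset' is already indexed, advance past it using the on-disk size recorded
  // in the BucketEntry instead of re-reading and re-caching the block.
  static long nextPrefetchOffset(BucketCache cache, String hfileName, long offset) {
    BlockCacheKey key = new BlockCacheKey(hfileName, offset);
    BucketEntry entry = cache.getBackingMap().get(key);
    return entry == null ? offset : offset + entry.getOnDiskSizeWithHeader();
  }
}

Recording the on-disk size with header in the BucketEntry is what lets the loop advance without touching the filesystem, which is also why the new optional disk_size_with_header field was added to the BucketEntry protobuf message.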
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 57099b8d4254..797bbcef9cc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1654,8 +1654,9 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a boolean succ = false; BucketEntry bucketEntry = null; try { - int diskSizeWithHeader = (data instanceof HFileBlock) ? - ((HFileBlock)data).getOnDiskSizeWithHeader() : data.getSerializedLength(); + int diskSizeWithHeader = (data instanceof HFileBlock) + ? ((HFileBlock) data).getOnDiskSizeWithHeader() + : data.getSerializedLength(); bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, createRecycler, getByteBuffAllocator()); bucketEntry.setDeserializerReference(data.getDeserializer()); @@ -1911,6 +1912,7 @@ public void clear() { } } } + public Map getBackingMap() { return backingMap; } @@ -1919,7 +1921,7 @@ public Map getFullyCachedFiles() { return fullyCachedFiles; } - public static Optional getBuckedCacheFromCacheConfig(CacheConfig cacheConf){ + public static Optional getBuckedCacheFromCacheConfig(CacheConfig cacheConf) { if (cacheConf.getBlockCache().isPresent()) { BlockCache bc = cacheConf.getBlockCache().get(); if (bc instanceof CombinedBlockCache) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index 14a77fe94356..68620151400d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -100,13 +100,15 @@ public class BucketEntry implements HBaseReferenceCounted { * becoming 0. NOTICE that {@link ByteBuffAllocator#NONE} could only be used * for test. 
*/ - BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, boolean inMemory, - Function createRecycler, ByteBuffAllocator allocator) { - this(offset, length, onDiskSizeWithHeader, accessCounter, System.nanoTime(), inMemory, createRecycler, allocator); + BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, + boolean inMemory, Function createRecycler, ByteBuffAllocator allocator) { + this(offset, length, onDiskSizeWithHeader, accessCounter, System.nanoTime(), inMemory, + createRecycler, allocator); } - BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, long cachedTime, boolean inMemory, - Function createRecycler, ByteBuffAllocator allocator) { + BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, + long cachedTime, boolean inMemory, Function createRecycler, + ByteBuffAllocator allocator) { if (createRecycler == null) { throw new IllegalArgumentException("createRecycler could not be null!"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java index 7e26d425488e..829dac81dda3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -17,8 +17,21 @@ */ package org.apache.hadoop.hbase.io.hfile; -import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -30,7 +43,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -38,19 +50,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; -import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; @Category({ IOTests.class, MediumTests.class }) public class TestPrefetchWithBucketCache { @@ -97,19 +98,19 @@ public void tearDown() { @Test public 
void testPrefetchDoesntOverwork() throws Exception { Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork"); - //Prefetches the file blocks + // Prefetches the file blocks readStoreFile(storeFile); BucketCache bc = BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get(); Map snapshot = ImmutableMap.copyOf(bc.getBackingMap()); - //Reads file again and check we are not prefetching it again + // Reads file again and check we are not prefetching it again readStoreFile(storeFile); - //Makes sure the cache hasn't changed - snapshot.entrySet().forEach( e -> { + // Makes sure the cache hasn't changed + snapshot.entrySet().forEach(e -> { BucketEntry entry = bc.getBackingMap().get(e.getKey()); assertNotNull(entry); assertEquals(e.getValue().getCachedTime(), entry.getCachedTime()); }); - //forcibly removes first block from the bc backing map, in order to cause it to be cached again + // forcibly removes first block from the bc backing map, in order to cause it to be cached again BlockCacheKey key = snapshot.keySet().stream().findFirst().get(); bc.getBackingMap().remove(key); bc.getFullyCachedFiles().remove(storeFile.getName()); From f5878ccc99b020fdf962d94331c1b54399642382 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 21 Aug 2023 11:04:21 +0100 Subject: [PATCH 09/12] adding extra logging to debug failing UT, as I can't reproduce it locally Change-Id: If06d2a1e67d90f9ab7aaaa3b93ecaf951015f1d7 --- .../hadoop/hbase/io/hfile/HFilePreadReader.java | 2 ++ .../hbase/io/hfile/TestPrefetchWithBucketCache.java | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index fc35dc5876b9..f9c0ae592424 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -91,6 +91,8 @@ public void run() { cacheKey); continue; } + } else { + LOG.debug("No entry in the backing map for cache key {}", cacheKey); } } // Perhaps we got our block from cache? 
Unlikely as this may be, if it happens, then diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java index 829dac81dda3..637c563dd23c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry; @@ -52,10 +53,14 @@ import org.junit.rules.TestName; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Category({ IOTests.class, MediumTests.class }) public class TestPrefetchWithBucketCache { + private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class); + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class); @@ -99,10 +104,12 @@ public void tearDown() { public void testPrefetchDoesntOverwork() throws Exception { Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork"); // Prefetches the file blocks + LOG.debug("First read should prefetch the blocks."); readStoreFile(storeFile); BucketCache bc = BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get(); Map snapshot = ImmutableMap.copyOf(bc.getBackingMap()); // Reads file again and check we are not prefetching it again + LOG.debug("Second read, no prefetch should happen here."); readStoreFile(storeFile); // Makes sure the cache hasn't changed snapshot.entrySet().forEach(e -> { @@ -112,11 +119,13 @@ public void testPrefetchDoesntOverwork() throws Exception { }); // forcibly removes first block from the bc backing map, in order to cause it to be cached again BlockCacheKey key = snapshot.keySet().stream().findFirst().get(); + LOG.debug("removing block {}", key); bc.getBackingMap().remove(key); bc.getFullyCachedFiles().remove(storeFile.getName()); assertTrue(snapshot.size() > bc.getBackingMap().size()); + LOG.debug("Third read should prefetch again, as we removed one block for the file."); readStoreFile(storeFile); - assertEquals(snapshot.size(), bc.getBackingMap().size()); + Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size()); assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime()); } From 9f669c8e9ea617c99acdd9ae8f59fb22ba4cf272 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 21 Aug 2023 17:07:19 +0100 Subject: [PATCH 10/12] spotless and additional checks Change-Id: Idbc28136f6744ae9026b0899c4393bfa78123d01 --- .../hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java index 637c563dd23c..e4330308243d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -51,11 +51,11 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; - -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + @Category({ IOTests.class, MediumTests.class }) public class TestPrefetchWithBucketCache { @@ -107,6 +107,8 @@ public void testPrefetchDoesntOverwork() throws Exception { LOG.debug("First read should prefetch the blocks."); readStoreFile(storeFile); BucketCache bc = BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get(); + // Our file should have 6 DATA blocks. We should wait for all of them to be cached + Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); Map snapshot = ImmutableMap.copyOf(bc.getBackingMap()); // Reads file again and check we are not prefetching it again LOG.debug("Second read, no prefetch should happen here."); From 784869cb813dc40ead3bdc72eb2964ba3fb1c905 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Tue, 22 Aug 2023 10:24:32 +0100 Subject: [PATCH 11/12] checkstyles and javadoc fixes Change-Id: I88408fc035c8aa267020a6d36a4f0854599751a4 --- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 2 +- .../hadoop/hbase/io/hfile/bucket/BucketEntry.java | 2 +- .../io/hfile/TestPrefetchWithBucketCache.java | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 797bbcef9cc1..bc5e7e7c9b9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1476,7 +1476,7 @@ public void shutdown() { */ @Override protected void finalize() { - if (cachePersister != null) { + if (cachePersister != null && !cachePersister.isInterrupted()) { cachePersister.interrupt(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index 68620151400d..c93dac8a572b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -253,7 +253,7 @@ public BucketEntry retain() { * also release its refCnt (case.1 will do this) and no other rpc reference, then it will free the * area in bucketAllocator.
* 3.evict those block without any rpc reference if cache size exceeded. we'll only free those - * blocks with zero rpc reference count, as the {@link BucketEntry#markStaleAsEvicted()} do. + * blocks with zero rpc reference count. * @return true to indicate we've decreased to zero and do the de-allocation. */ @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java index e4330308243d..baaa65faa579 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -142,13 +142,13 @@ private void readStoreFile(Path storeFilePath) throws Exception { return block; }, (key, block) -> { boolean isCached = blockCache.getBlock(key, true, false, true) != null; - if ( - block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX - || block.getBlockType() == BlockType.INTERMEDIATE_INDEX - ) { - assertTrue(isCached); - } - }); + if ( + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + assertTrue(isCached); + } + }); } private void readStoreFile(Path storeFilePath, From e6e4162c3d974f803f99e6948416621a374ccd37 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Tue, 22 Aug 2023 11:38:48 +0100 Subject: [PATCH 12/12] spotless Change-Id: If9af5bfe3e9144217917c6bae4678420410be3c4 --- .../io/hfile/TestPrefetchWithBucketCache.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java index baaa65faa579..e4330308243d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -142,13 +142,13 @@ private void readStoreFile(Path storeFilePath) throws Exception { return block; }, (key, block) -> { boolean isCached = blockCache.getBlock(key, true, false, true) != null; - if ( - block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX - || block.getBlockType() == BlockType.INTERMEDIATE_INDEX - ) { - assertTrue(isCached); - } - }); + if ( + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + assertTrue(isCached); + } + }); } private void readStoreFile(Path storeFilePath,