Skip to content

Commit a1fc483

Browse files
authored
HBASE-27313: Persist list of Hfiles names for which prefetch is done (#4771)
Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent e298782 commit a1fc483

File tree

8 files changed

+527
-1
lines changed

8 files changed

+527
-1
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
syntax = "proto2";
19+
20+
package hbase.pb;
21+
22+
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
23+
option java_outer_classname = "PersistentPrefetchProtos";
24+
option java_generic_services = true;
25+
option java_generate_equals_and_hash = true;
26+
option optimize_for = SPEED;
27+
28+
29+
// On-disk record of the HFiles for which prefetch has completed. PrefetchExecutor
// writes one length-delimited instance of this message to the prefetch persistence
// file and reads it back with parseDelimitedFrom.
message PrefetchedHfileName {
30+
// Keyed by HFile name. PrefetchExecutor stores `true` for every file whose
// prefetch completed; only key presence is checked when the map is consulted.
map<string, bool> prefetched_files = 1;
31+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
2929
private static final long serialVersionUID = -5199992013113130534L;
3030
private final String hfileName;
3131
private final long offset;
32-
private final BlockType blockType;
32+
private BlockType blockType;
3333
private final boolean isPrimaryReplicaBlock;
3434

3535
/**
@@ -98,4 +98,8 @@ public long getOffset() {
9898
public BlockType getBlockType() {
9999
return blockType;
100100
}
101+
102+
public void setBlockType(BlockType blockType) {
103+
this.blockType = blockType;
104+
}
101105
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public class CacheConfig {
9393
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
9494
"hbase.hfile.drop.behind.compaction";
9595

96+
public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file-list.path";
97+
9698
// Defaults
9799
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
98100
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile;
1919

20+
import java.io.File;
21+
import java.io.FileInputStream;
22+
import java.io.FileOutputStream;
23+
import java.io.IOException;
24+
import java.util.HashMap;
2025
import java.util.Map;
2126
import java.util.concurrent.ConcurrentSkipListMap;
2227
import java.util.concurrent.Future;
@@ -37,19 +42,25 @@
3742
import org.slf4j.Logger;
3843
import org.slf4j.LoggerFactory;
3944

45+
import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
46+
4047
@InterfaceAudience.Private
4148
public final class PrefetchExecutor {
4249

4350
private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
4451

4552
/** Futures for tracking block prefetch activity */
4653
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
54+
/** Set of files for which prefetch is completed */
55+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
56+
private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
4757
/** Executor pool shared among all HFiles for block prefetch */
4858
private static final ScheduledExecutorService prefetchExecutorPool;
4959
/** Delay before beginning prefetch */
5060
private static final int prefetchDelayMillis;
5161
/** Variation in prefetch delay times, to mitigate stampedes */
5262
private static final float prefetchDelayVariation;
63+
static String prefetchedFileListPath;
5364
static {
5465
// Consider doing this on demand with a configuration passed in rather
5566
// than in a static initializer.
@@ -79,6 +90,13 @@ public Thread newThread(Runnable r) {
7990
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
8091

8192
public static void request(Path path, Runnable runnable) {
93+
if (prefetchCompleted != null) {
94+
if (isFilePrefetched(path.getName())) {
95+
LOG.info(
96+
"File has already been prefetched before the restart, so skipping prefetch : " + path);
97+
return;
98+
}
99+
}
82100
if (!prefetchPathExclude.matcher(path.toString()).find()) {
83101
long delay;
84102
if (prefetchDelayMillis > 0) {
@@ -104,6 +122,7 @@ public static void request(Path path, Runnable runnable) {
104122

105123
public static void complete(Path path) {
106124
prefetchFutures.remove(path);
125+
prefetchCompleted.put(path.getName(), true);
107126
LOG.debug("Prefetch completed for {}", path);
108127
}
109128

@@ -115,6 +134,7 @@ public static void cancel(Path path) {
115134
prefetchFutures.remove(path);
116135
LOG.debug("Prefetch cancelled for {}", path);
117136
}
137+
prefetchCompleted.remove(path.getName());
118138
}
119139

120140
public static boolean isCompleted(Path path) {
@@ -125,6 +145,68 @@ public static boolean isCompleted(Path path) {
125145
return true;
126146
}
127147

148+
public static void persistToFile(String path) throws IOException {
149+
prefetchedFileListPath = path;
150+
if (prefetchedFileListPath == null) {
151+
LOG.info("Exception while persisting prefetch!");
152+
throw new IOException("Error persisting prefetched HFiles set!");
153+
}
154+
if (!prefetchCompleted.isEmpty()) {
155+
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
156+
PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
157+
}
158+
}
159+
}
160+
161+
public static void retrieveFromFile(String path) throws IOException {
162+
prefetchedFileListPath = path;
163+
File prefetchPersistenceFile = new File(prefetchedFileListPath);
164+
if (!prefetchPersistenceFile.exists()) {
165+
LOG.warn("Prefetch persistence file does not exist!");
166+
return;
167+
}
168+
LOG.info("Retrieving from prefetch persistence file " + path);
169+
assert (prefetchedFileListPath != null);
170+
try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
171+
PersistentPrefetchProtos.PrefetchedHfileName proto =
172+
PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
173+
Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
174+
prefetchCompleted.putAll(protoPrefetchedFilesMap);
175+
}
176+
}
177+
178+
private static FileInputStream deleteFileOnClose(final File file) throws IOException {
179+
return new FileInputStream(file) {
180+
private File myFile;
181+
182+
private FileInputStream init(File file) {
183+
myFile = file;
184+
return this;
185+
}
186+
187+
@Override
188+
public void close() throws IOException {
189+
if (myFile == null) {
190+
return;
191+
}
192+
193+
super.close();
194+
if (!myFile.delete()) {
195+
throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
196+
}
197+
myFile = null;
198+
}
199+
}.init(file);
200+
}
201+
202+
public static void removePrefetchedFileWhileEvict(String hfileName) {
203+
prefetchCompleted.remove(hfileName);
204+
}
205+
206+
public static boolean isFilePrefetched(String hfileName) {
207+
return prefetchCompleted.containsKey(hfileName);
208+
}
209+
128210
private PrefetchExecutor() {
129211
}
130212
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.io.hfile;
19+
20+
import java.util.Map;
21+
22+
import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
23+
24+
final class PrefetchProtoUtils {
25+
private PrefetchProtoUtils() {
26+
}
27+
28+
static PersistentPrefetchProtos.PrefetchedHfileName
29+
toPB(Map<String, Boolean> prefetchedHfileNames) {
30+
return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
31+
.putAllPrefetchedFiles(prefetchedHfileNames).build();
32+
}
33+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile.bucket;
1919

20+
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
21+
2022
import java.io.File;
2123
import java.io.FileInputStream;
2224
import java.io.FileOutputStream;
@@ -65,6 +67,7 @@
6567
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
6668
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
6769
import org.apache.hadoop.hbase.io.hfile.HFileContext;
70+
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
6871
import org.apache.hadoop.hbase.nio.ByteBuff;
6972
import org.apache.hadoop.hbase.nio.RefCnt;
7073
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@@ -235,6 +238,8 @@ public class BucketCache implements BlockCache, HeapSize {
235238
/** In-memory bucket size */
236239
private float memoryFactor;
237240

241+
private String prefetchedFileListPath;
242+
238243
private static final String FILE_VERIFY_ALGORITHM =
239244
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
240245
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
@@ -273,6 +278,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
273278
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
274279
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
275280
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
281+
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
276282

277283
sanityCheckConfigs();
278284

@@ -452,6 +458,9 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
452458
if (!cacheEnabled) {
453459
return;
454460
}
461+
if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
462+
cacheKey.setBlockType(cachedItem.getBlockType());
463+
}
455464
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
456465
// Stuff the entry into the RAM cache so it can get drained to the persistent store
457466
RAMQueueEntry re =
@@ -1187,6 +1196,9 @@ private void persistToFile() throws IOException {
11871196
fos.write(ProtobufMagic.PB_MAGIC);
11881197
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
11891198
}
1199+
if (prefetchedFileListPath != null) {
1200+
PrefetchExecutor.persistToFile(prefetchedFileListPath);
1201+
}
11901202
}
11911203

11921204
/**
@@ -1198,6 +1210,9 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
11981210
return;
11991211
}
12001212
assert !cacheEnabled;
1213+
if (prefetchedFileListPath != null) {
1214+
PrefetchExecutor.retrieveFromFile(prefetchedFileListPath);
1215+
}
12011216

12021217
try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
12031218
int pblen = ProtobufMagic.lengthOfPBMagic();
@@ -1402,6 +1417,7 @@ protected String getAlgorithm() {
14021417
*/
14031418
@Override
14041419
public int evictBlocksByHfileName(String hfileName) {
1420+
PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName);
14051421
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
14061422
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
14071423

0 commit comments

Comments
 (0)