Skip to content

Commit 399ad71

Browse files
committed
HADOOP-18399 Prefetch - SingleFilePerBlockCache to use LocalDirAllocator for file allocation
1 parent 87429f4 commit 399ad71

File tree

12 files changed

+184
-60
lines changed

12 files changed

+184
-60
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockCache.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
2525

26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.LocalDirAllocator;
28+
2629
/**
2730
* Provides functionality necessary for caching blocks of data read from FileSystem.
2831
*/
@@ -64,7 +67,10 @@ public interface BlockCache extends Closeable {
6467
*
6568
* @param blockNumber the id of the given block.
6669
* @param buffer contents of the given block to be added to this cache.
70+
* @param conf the configuration.
71+
* @param localDirAllocator the local dir allocator instance.
6772
* @throws IOException if there is an error writing the given block.
6873
*/
69-
void put(int blockNumber, ByteBuffer buffer) throws IOException;
74+
void put(int blockNumber, ByteBuffer buffer, Configuration conf,
75+
LocalDirAllocator localDirAllocator) throws IOException;
7076
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535

36+
import org.apache.hadoop.conf.Configuration;
37+
import org.apache.hadoop.fs.LocalDirAllocator;
3638
import org.apache.hadoop.fs.statistics.DurationTracker;
3739

3840
import static java.util.Objects.requireNonNull;
@@ -95,21 +97,28 @@ public abstract class CachingBlockManager extends BlockManager {
9597

9698
private final PrefetchingStatistics prefetchingStatistics;
9799

100+
private final Configuration conf;
101+
102+
private final LocalDirAllocator localDirAllocator;
103+
98104
/**
99105
* Constructs an instance of a {@code CachingBlockManager}.
100106
*
101107
* @param futurePool asynchronous tasks are performed in this pool.
102108
* @param blockData information about each block of the underlying file.
103109
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
104110
* @param prefetchingStatistics statistics for this stream.
105-
*
111+
* @param conf the configuration.
112+
* @param localDirAllocator the local dir allocator instance.
106113
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
107114
*/
108115
public CachingBlockManager(
109116
ExecutorServiceFuturePool futurePool,
110117
BlockData blockData,
111118
int bufferPoolSize,
112-
PrefetchingStatistics prefetchingStatistics) {
119+
PrefetchingStatistics prefetchingStatistics,
120+
Configuration conf,
121+
LocalDirAllocator localDirAllocator) {
113122
super(blockData);
114123

115124
Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -129,6 +138,8 @@ public CachingBlockManager(
129138

130139
this.ops = new BlockOperations();
131140
this.ops.setDebug(false);
141+
this.conf = requireNonNull(conf);
142+
this.localDirAllocator = localDirAllocator;
132143
}
133144

134145
/**
@@ -465,7 +476,8 @@ public void requestCaching(BufferData data) {
465476
blockFuture = cf;
466477
}
467478

468-
CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
479+
CachePutTask task =
480+
new CachePutTask(data, blockFuture, this, Instant.now());
469481
Future<Void> actionFuture = futurePool.executeFunction(task);
470482
data.setCaching(actionFuture);
471483
ops.end(op);
@@ -550,7 +562,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
550562
return;
551563
}
552564

553-
cache.put(blockNumber, buffer);
565+
cache.put(blockNumber, buffer, conf, localDirAllocator);
554566
}
555567

556568
private static class CachePutTask implements Supplier<Void> {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@
2727
import java.nio.file.Files;
2828
import java.nio.file.OpenOption;
2929
import java.nio.file.Path;
30+
import java.nio.file.Paths;
3031
import java.nio.file.StandardOpenOption;
31-
import java.nio.file.attribute.FileAttribute;
3232
import java.nio.file.attribute.PosixFilePermission;
33-
import java.nio.file.attribute.PosixFilePermissions;
3433
import java.util.ArrayList;
3534
import java.util.Collections;
3635
import java.util.EnumSet;
@@ -39,9 +38,13 @@
3938
import java.util.Set;
4039
import java.util.concurrent.ConcurrentHashMap;
4140

41+
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
4242
import org.slf4j.Logger;
4343
import org.slf4j.LoggerFactory;
4444

45+
import org.apache.hadoop.conf.Configuration;
46+
import org.apache.hadoop.fs.LocalDirAllocator;
47+
4548
import static java.util.Objects.requireNonNull;
4649
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
4750

@@ -176,7 +179,8 @@ private Entry getEntry(int blockNumber) {
176179
* @throws IllegalArgumentException if buffer.limit() is zero or negative.
177180
*/
178181
@Override
179-
public void put(int blockNumber, ByteBuffer buffer) throws IOException {
182+
public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
183+
LocalDirAllocator localDirAllocator) throws IOException {
180184
if (closed) {
181185
return;
182186
}
@@ -191,7 +195,7 @@ public void put(int blockNumber, ByteBuffer buffer) throws IOException {
191195

192196
Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");
193197

194-
Path blockFilePath = getCacheFilePath();
198+
Path blockFilePath = getCacheFilePath(conf, localDirAllocator);
195199
long size = Files.size(blockFilePath);
196200
if (size != 0) {
197201
String message =
@@ -221,8 +225,10 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
221225
writeChannel.close();
222226
}
223227

224-
protected Path getCacheFilePath() throws IOException {
225-
return getTempFilePath();
228+
protected Path getCacheFilePath(final Configuration conf,
229+
final LocalDirAllocator localDirAllocator)
230+
throws IOException {
231+
return getTempFilePath(conf, localDirAllocator);
226232
}
227233

228234
@Override
@@ -323,9 +329,10 @@ private String getStats() {
323329

324330
private static final String CACHE_FILE_PREFIX = "fs-cache-";
325331

326-
public static boolean isCacheSpaceAvailable(long fileSize) {
332+
public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf,
333+
LocalDirAllocator localDirAllocator) {
327334
try {
328-
Path cacheFilePath = getTempFilePath();
335+
Path cacheFilePath = getTempFilePath(conf, localDirAllocator);
329336
long freeSpace = new File(cacheFilePath.toString()).getUsableSpace();
330337
LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace);
331338
Files.deleteIfExists(cacheFilePath);
@@ -340,15 +347,17 @@ public static boolean isCacheSpaceAvailable(long fileSize) {
340347
private static final String BINARY_FILE_SUFFIX = ".bin";
341348

342349
// File attributes attached to any intermediate temporary file created during index creation.
343-
private static final FileAttribute<Set<PosixFilePermission>> TEMP_FILE_ATTRS =
344-
PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ,
345-
PosixFilePermission.OWNER_WRITE));
346-
347-
private static Path getTempFilePath() throws IOException {
348-
return Files.createTempFile(
349-
CACHE_FILE_PREFIX,
350-
BINARY_FILE_SUFFIX,
351-
TEMP_FILE_ATTRS
352-
);
350+
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
351+
ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
352+
353+
private static Path getTempFilePath(final Configuration conf,
354+
final LocalDirAllocator localDirAllocator) throws IOException {
355+
org.apache.hadoop.fs.Path path =
356+
localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, conf);
357+
File dir = new File(path.getParent().toUri().getPath());
358+
String prefix = path.getName();
359+
File tmpFile = File.createTempFile(prefix, BINARY_FILE_SUFFIX, dir);
360+
Path tmpFilePath = Paths.get(tmpFile.toURI());
361+
return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_ATTRS);
353362
}
354363
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323

2424
import org.junit.Test;
2525

26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.LocalDirAllocator;
2628
import org.apache.hadoop.test.AbstractHadoopTestBase;
2729

30+
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
2831
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
2932
import static org.junit.Assert.assertEquals;
3033
import static org.junit.Assert.assertFalse;
@@ -36,6 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase {
3639

3740
private static final int BUFFER_SIZE = 16;
3841

42+
private static final Configuration CONF = new Configuration();
43+
3944
@Test
4045
public void testArgChecks() throws Exception {
4146
// Should not throw.
@@ -46,7 +51,7 @@ public void testArgChecks() throws Exception {
4651

4752
// Verify it throws correctly.
4853
intercept(IllegalArgumentException.class, "'buffer' must not be null",
49-
() -> cache.put(42, null));
54+
() -> cache.put(42, null, null, null));
5055

5156

5257
intercept(NullPointerException.class, null,
@@ -67,7 +72,7 @@ public void testPutAndGet() throws Exception {
6772

6873
assertEquals(0, cache.size());
6974
assertFalse(cache.containsBlock(0));
70-
cache.put(0, buffer1);
75+
cache.put(0, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
7176
assertEquals(1, cache.size());
7277
assertTrue(cache.containsBlock(0));
7378
ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE);
@@ -77,7 +82,7 @@ public void testPutAndGet() throws Exception {
7782

7883
assertEquals(1, cache.size());
7984
assertFalse(cache.containsBlock(1));
80-
cache.put(1, buffer1);
85+
cache.put(1, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
8186
assertEquals(2, cache.size());
8287
assertTrue(cache.containsBlock(1));
8388
ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1561,7 +1561,8 @@ private FSDataInputStream executeOpen(
15611561
readContext.build(),
15621562
createObjectAttributes(path, fileStatus),
15631563
createInputStreamCallbacks(auditSpan),
1564-
inputStreamStats));
1564+
inputStreamStats,
1565+
getConf()));
15651566
} else {
15661567
return new FSDataInputStream(
15671568
new S3AInputStream(

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.fs.LocalDirAllocator;
2830
import org.apache.hadoop.fs.impl.prefetch.BlockData;
2931
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
3032
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
@@ -52,16 +54,20 @@ public class S3ACachingBlockManager extends CachingBlockManager {
5254
* @param blockData information about each block of the S3 file.
5355
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
5456
* @param streamStatistics statistics for this stream.
55-
*
57+
* @param conf the configuration.
58+
* @param localDirAllocator the local dir allocator instance.
5659
* @throws IllegalArgumentException if reader is null.
5760
*/
5861
public S3ACachingBlockManager(
5962
ExecutorServiceFuturePool futurePool,
6063
S3ARemoteObjectReader reader,
6164
BlockData blockData,
6265
int bufferPoolSize,
63-
S3AInputStreamStatistics streamStatistics) {
64-
super(futurePool, blockData, bufferPoolSize, streamStatistics);
66+
S3AInputStreamStatistics streamStatistics,
67+
Configuration conf,
68+
LocalDirAllocator localDirAllocator) {
69+
70+
super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator);
6571

6672
Validate.checkNotNull(reader, "reader");
6773

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

27+
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.fs.LocalDirAllocator;
2729
import org.apache.hadoop.fs.impl.prefetch.BlockData;
2830
import org.apache.hadoop.fs.impl.prefetch.BlockManager;
2931
import org.apache.hadoop.fs.impl.prefetch.BufferData;
@@ -61,7 +63,8 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
6163
* @param s3Attributes attributes of the S3 object being read.
6264
* @param client callbacks used for interacting with the underlying S3 client.
6365
* @param streamStatistics statistics for this stream.
64-
*
66+
* @param conf the configuration.
67+
* @param localDirAllocator the local dir allocator instance.
6568
* @throws IllegalArgumentException if context is null.
6669
* @throws IllegalArgumentException if s3Attributes is null.
6770
* @throws IllegalArgumentException if client is null.
@@ -70,7 +73,9 @@ public S3ACachingInputStream(
7073
S3AReadOpContext context,
7174
S3ObjectAttributes s3Attributes,
7275
S3AInputStream.InputStreamCallbacks client,
73-
S3AInputStreamStatistics streamStatistics) {
76+
S3AInputStreamStatistics streamStatistics,
77+
Configuration conf,
78+
LocalDirAllocator localDirAllocator) {
7479
super(context, s3Attributes, client, streamStatistics);
7580

7681
this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
@@ -79,7 +84,9 @@ public S3ACachingInputStream(
7984
this.getContext().getFuturePool(),
8085
this.getReader(),
8186
this.getBlockData(),
82-
bufferPoolSize);
87+
bufferPoolSize,
88+
conf,
89+
localDirAllocator);
8390
int fileSize = (int) s3Attributes.getLen();
8491
LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
8592
fileSize);
@@ -176,9 +183,15 @@ protected BlockManager createBlockManager(
176183
ExecutorServiceFuturePool futurePool,
177184
S3ARemoteObjectReader reader,
178185
BlockData blockData,
179-
int bufferPoolSize) {
180-
return new S3ACachingBlockManager(futurePool, reader, blockData,
186+
int bufferPoolSize,
187+
Configuration conf,
188+
LocalDirAllocator localDirAllocator) {
189+
return new S3ACachingBlockManager(futurePool,
190+
reader,
191+
blockData,
181192
bufferPoolSize,
182-
getS3AStreamStatistics());
193+
getS3AStreamStatistics(),
194+
conf,
195+
localDirAllocator);
183196
}
184197
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
import org.apache.hadoop.classification.InterfaceAudience;
2828
import org.apache.hadoop.classification.InterfaceStability;
2929
import org.apache.hadoop.classification.VisibleForTesting;
30+
import org.apache.hadoop.conf.Configuration;
3031
import org.apache.hadoop.fs.CanSetReadahead;
3132
import org.apache.hadoop.fs.FSExceptionMessages;
3233
import org.apache.hadoop.fs.FSInputStream;
34+
import org.apache.hadoop.fs.LocalDirAllocator;
3335
import org.apache.hadoop.fs.StreamCapabilities;
3436
import org.apache.hadoop.fs.impl.prefetch.Validate;
3537
import org.apache.hadoop.fs.s3a.S3AInputStream;
@@ -39,6 +41,9 @@
3941
import org.apache.hadoop.fs.statistics.IOStatistics;
4042
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
4143

44+
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
45+
import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
46+
4247
/**
4348
* Enhanced {@code InputStream} for reading from S3.
4449
*
@@ -79,6 +84,7 @@ public class S3APrefetchingInputStream
7984
* @param s3Attributes attributes of the S3 object being read.
8085
* @param client callbacks used for interacting with the underlying S3 client.
8186
* @param streamStatistics statistics for this stream.
87+
* @param conf the configuration.
8288
*
8389
* @throws IllegalArgumentException if context is null.
8490
* @throws IllegalArgumentException if s3Attributes is null.
@@ -88,7 +94,8 @@ public S3APrefetchingInputStream(
8894
S3AReadOpContext context,
8995
S3ObjectAttributes s3Attributes,
9096
S3AInputStream.InputStreamCallbacks client,
91-
S3AInputStreamStatistics streamStatistics) {
97+
S3AInputStreamStatistics streamStatistics,
98+
Configuration conf) {
9299

93100
Validate.checkNotNull(context, "context");
94101
Validate.checkNotNull(s3Attributes, "s3Attributes");
@@ -110,11 +117,17 @@ public S3APrefetchingInputStream(
110117
streamStatistics);
111118
} else {
112119
LOG.debug("Creating in caching input stream for {}", context.getPath());
120+
String contextCfgItemName =
121+
conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR;
122+
LocalDirAllocator localDirAllocator =
123+
new LocalDirAllocator(contextCfgItemName);
113124
this.inputStream = new S3ACachingInputStream(
114125
context,
115126
s3Attributes,
116127
client,
117-
streamStatistics);
128+
streamStatistics,
129+
conf,
130+
localDirAllocator);
118131
}
119132
}
120133

0 commit comments

Comments
 (0)