Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;

/**
* Provides functionality necessary for caching blocks of data read from FileSystem.
*/
Expand Down Expand Up @@ -64,7 +67,10 @@ public interface BlockCache extends Closeable {
*
* @param blockNumber the id of the given block.
* @param buffer contents of the given block to be added to this cache.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IOException if there is an error writing the given block.
*/
void put(int blockNumber, ByteBuffer buffer) throws IOException;
void put(int blockNumber, ByteBuffer buffer, Configuration conf,
LocalDirAllocator localDirAllocator) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.statistics.DurationTracker;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -95,21 +97,28 @@ public abstract class CachingBlockManager extends BlockManager {

private final PrefetchingStatistics prefetchingStatistics;

private final Configuration conf;

private final LocalDirAllocator localDirAllocator;

/**
* Constructs an instance of a {@code CachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param blockData information about each block of the underlying file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param prefetchingStatistics statistics for this stream.
*
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/
public CachingBlockManager(
ExecutorServiceFuturePool futurePool,
BlockData blockData,
int bufferPoolSize,
PrefetchingStatistics prefetchingStatistics) {
PrefetchingStatistics prefetchingStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
super(blockData);

Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
Expand All @@ -129,6 +138,8 @@ public CachingBlockManager(

this.ops = new BlockOperations();
this.ops.setDebug(false);
this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
}

/**
Expand Down Expand Up @@ -468,7 +479,8 @@ public void requestCaching(BufferData data) {
blockFuture = cf;
}

CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
CachePutTask task =
new CachePutTask(data, blockFuture, this, Instant.now());
Future<Void> actionFuture = futurePool.executeFunction(task);
data.setCaching(actionFuture);
ops.end(op);
Expand Down Expand Up @@ -554,7 +566,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
return;
}

cache.put(blockNumber, buffer);
cache.put(blockNumber, buffer, conf, localDirAllocator);
}

private static class CachePutTask implements Supplier<Void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
Expand All @@ -39,9 +38,13 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;

Expand All @@ -67,6 +70,12 @@ public class SingleFilePerBlockCache implements BlockCache {

private final PrefetchingStatistics prefetchingStatistics;

/**
* POSIX permissions (owner read and owner write only) that getTempFilePath
* applies to each temporary file it creates.
*/
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);

/**
* Cache entry.
* Each block is stored as a separate file.
Expand Down Expand Up @@ -172,11 +181,17 @@ private Entry getEntry(int blockNumber) {
/**
* Puts the given block in this cache.
*
* @throws IllegalArgumentException if buffer is null.
* @throws IllegalArgumentException if buffer.limit() is zero or negative.
* @param blockNumber the block number, used as a key for blocks map.
* @param buffer buffer contents of the given block to be added to this cache.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IOException if either local dir allocator fails to allocate file or if IO error
* occurs while writing the buffer content to the file.
* @throws IllegalArgumentException if buffer is null, or if buffer.limit() is zero or negative.
*/
@Override
public void put(int blockNumber, ByteBuffer buffer) throws IOException {
public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add @param tags for the new parameters in the Javadocs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

LocalDirAllocator localDirAllocator) throws IOException {
if (closed) {
return;
}
Expand All @@ -191,7 +206,7 @@ public void put(int blockNumber, ByteBuffer buffer) throws IOException {

Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");

Path blockFilePath = getCacheFilePath();
Path blockFilePath = getCacheFilePath(conf, localDirAllocator);
long size = Files.size(blockFilePath);
if (size != 0) {
String message =
Expand Down Expand Up @@ -221,8 +236,19 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
writeChannel.close();
}

protected Path getCacheFilePath() throws IOException {
return getTempFilePath();
/**
 * Obtain the path of a freshly created temporary file that will hold a cached block.
 * The work is delegated to getTempFilePath, which asks the local dir allocator
 * where the file should be placed.
 *
 * @param conf The configuration object.
 * @param localDirAllocator Local dir allocator instance.
 * @return Path of the temporary file created.
 * @throws IOException if IO error occurs while local dir allocator tries to retrieve path
 * from local FS or file creation fails or permission set fails.
 */
protected Path getCacheFilePath(final Configuration conf,
    final LocalDirAllocator localDirAllocator)
    throws IOException {
  final Path cacheFile = getTempFilePath(conf, localDirAllocator);
  return cacheFile;
}

@Override
Expand Down Expand Up @@ -323,9 +349,19 @@ private String getStats() {

private static final String CACHE_FILE_PREFIX = "fs-cache-";

public static boolean isCacheSpaceAvailable(long fileSize) {
/**
* Determine if the cache space is available on the local FS.
*
* @param fileSize The size of the file.
* @param conf The configuration.
* @param localDirAllocator Local dir allocator instance.
* @return True if the given file size is less than the available free space on local FS,
* False otherwise.
*/
public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf,
LocalDirAllocator localDirAllocator) {
try {
Path cacheFilePath = getTempFilePath();
Path cacheFilePath = getTempFilePath(conf, localDirAllocator);
long freeSpace = new File(cacheFilePath.toString()).getUsableSpace();
LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace);
Files.deleteIfExists(cacheFilePath);
Expand All @@ -339,16 +375,25 @@ public static boolean isCacheSpaceAvailable(long fileSize) {
// The suffix (file extension) of each serialized index file.
private static final String BINARY_FILE_SUFFIX = ".bin";

// File attributes attached to any intermediate temporary file created during index creation.
private static final FileAttribute<Set<PosixFilePermission>> TEMP_FILE_ATTRS =
PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_WRITE));

private static Path getTempFilePath() throws IOException {
return Files.createTempFile(
CACHE_FILE_PREFIX,
BINARY_FILE_SUFFIX,
TEMP_FILE_ATTRS
);
/**
* Create temporary file based on the file path retrieved from local dir allocator
* instance. The file is created with .bin suffix. The created file has been granted
* posix file permissions available in TEMP_FILE_ATTRS.
*
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @return path of the file created.
* @throws IOException if IO error occurs while local dir allocator tries to retrieve path
* from local FS or file creation fails or permission set fails.
*/
private static Path getTempFilePath(final Configuration conf,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadocs or comments in the method would be helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

final LocalDirAllocator localDirAllocator) throws IOException {
org.apache.hadoop.fs.Path path =
localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, conf);
File dir = new File(path.getParent().toUri().getPath());
String prefix = path.getName();
File tmpFile = File.createTempFile(prefix, BINARY_FILE_SUFFIX, dir);
Path tmpFilePath = Paths.get(tmpFile.toURI());
return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_ATTRS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -36,6 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase {

private static final int BUFFER_SIZE = 16;

private static final Configuration CONF = new Configuration();

@Test
public void testArgChecks() throws Exception {
// Should not throw.
Expand All @@ -46,7 +51,7 @@ public void testArgChecks() throws Exception {

// Verify it throws correctly.
intercept(IllegalArgumentException.class, "'buffer' must not be null",
() -> cache.put(42, null));
() -> cache.put(42, null, null, null));


intercept(NullPointerException.class, null,
Expand All @@ -67,7 +72,7 @@ public void testPutAndGet() throws Exception {

assertEquals(0, cache.size());
assertFalse(cache.containsBlock(0));
cache.put(0, buffer1);
cache.put(0, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Test* classes, using BUFFER_DIR is not helpful as they don't use S3ATestUtils#prepareTestConfiguration.

Hence, using HADOOP_TMP_DIR for Test* classes.

assertEquals(1, cache.size());
assertTrue(cache.containsBlock(0));
ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE);
Expand All @@ -77,7 +82,7 @@ public void testPutAndGet() throws Exception {

assertEquals(1, cache.size());
assertFalse(cache.containsBlock(1));
cache.put(1, buffer1);
cache.put(1, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
assertEquals(2, cache.size());
assertTrue(cache.containsBlock(1));
ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE);
Expand Down
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@
<exclude>**/ITestMarkerToolRootOperations.java</exclude>
<!-- leave this until the end for better statistics -->
<exclude>**/ITestAggregateIOStatistics.java</exclude>
<!-- cache-file-based assertions are not reliable under parallel
execution, so run this test sequentially -->
<exclude>**/ITestS3APrefetchingCacheFiles.java</exclude>
</excludes>
</configuration>
</execution>
Expand Down Expand Up @@ -246,6 +249,8 @@
<include>**/ITestS3AContractRootDir.java</include>
<!-- leave this until the end for better statistics -->
<include>**/ITestAggregateIOStatistics.java</include>
<!-- run sequentially for better cleanup -->
<include>**/ITestS3APrefetchingCacheFiles.java</include>
</includes>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1368,19 +1368,28 @@ public S3AEncryptionMethods getS3EncryptionAlgorithm() {
*/
File createTmpFileForWrite(String pathStr, long size,
    Configuration conf) throws IOException {
  // The allocator is created lazily; make sure it exists before it is used.
  initLocalDirAllocatorIfNotInitialized(conf);
  final Path allocated =
      directoryAllocator.getLocalPathForWrite(pathStr, size, conf);
  final String parentDir = allocated.getParent().toUri().getPath();
  // The last component of the allocated path becomes the temp file prefix;
  // the file itself is created inside the allocated path's parent directory.
  return File.createTempFile(allocated.getName(), null, new File(parentDir));
}

/**
 * Initialize dir allocator if not already initialized.
 * The allocator is keyed on BUFFER_DIR when that option is set in the
 * configuration, and on HADOOP_TMP_DIR otherwise.
 *
 * @param conf The Configuration object.
 */
private void initLocalDirAllocatorIfNotInitialized(Configuration conf) {
  if (directoryAllocator == null) {
    synchronized (this) {
      // Re-check under the lock: another thread may have created the
      // allocator between the unsynchronized check above and lock acquisition.
      // NOTE(review): the field declaration is not visible here; confirm it is
      // volatile so the unsynchronized read above sees a fully published value.
      if (directoryAllocator == null) {
        String bufferDir = conf.get(BUFFER_DIR) != null
            ? BUFFER_DIR : HADOOP_TMP_DIR;
        directoryAllocator = new LocalDirAllocator(bufferDir);
      }
    }
  }
}

/**
Expand Down Expand Up @@ -1573,12 +1582,16 @@ private FSDataInputStream executeOpen(
LOG.debug("Opening '{}'", readContext);

if (this.prefetchEnabled) {
Configuration configuration = getConf();
initLocalDirAllocatorIfNotInitialized(configuration);
return new FSDataInputStream(
new S3APrefetchingInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats));
inputStreamStats,
configuration,
directoryAllocator));
} else {
return new FSDataInputStream(
new S3AInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
Expand Down Expand Up @@ -52,16 +54,20 @@ public class S3ACachingBlockManager extends CachingBlockManager {
* @param blockData information about each block of the S3 file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param streamStatistics statistics for this stream.
*
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if reader is null.
*/
public S3ACachingBlockManager(
ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader,
BlockData blockData,
int bufferPoolSize,
S3AInputStreamStatistics streamStatistics) {
super(futurePool, blockData, bufferPoolSize, streamStatistics);
S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {

super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator);

Validate.checkNotNull(reader, "reader");

Expand Down
Loading