Skip to content

Commit 1c2c678

Browse files
virajjasanisteveloughran
authored andcommitted
HADOOP-18186. s3a prefetching to use SemaphoredDelegatingExecutor for submitting work (#4796)
Contributed by Viraj Jasani
1 parent f00d77f commit 1c2c678

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -786,9 +786,11 @@ private void initThreadPools(Configuration conf) {
786786
DEFAULT_KEEPALIVE_TIME, 0);
787787
int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
788788

789+
int activeTasksForBoundedThreadPool = maxThreads;
790+
int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
789791
boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
790-
maxThreads,
791-
maxThreads + totalTasks + numPrefetchThreads,
792+
activeTasksForBoundedThreadPool,
793+
waitingTasksForBoundedThreadPool,
792794
keepAliveTime, TimeUnit.SECONDS,
793795
name + "-bounded");
794796
unboundedThreadPool = new ThreadPoolExecutor(
@@ -800,8 +802,15 @@ private void initThreadPools(Configuration conf) {
800802
unboundedThreadPool.allowCoreThreadTimeOut(true);
801803
executorCapacity = intOption(conf,
802804
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
803-
if (this.prefetchEnabled) {
804-
this.futurePool = new ExecutorServiceFuturePool(boundedThreadPool);
805+
if (prefetchEnabled) {
806+
final S3AInputStreamStatistics s3AInputStreamStatistics =
807+
statisticsContext.newInputStreamStatistics();
808+
futurePool = new ExecutorServiceFuturePool(
809+
new SemaphoredDelegatingExecutor(
810+
boundedThreadPool,
811+
activeTasksForBoundedThreadPool + waitingTasksForBoundedThreadPool,
812+
true,
813+
s3AInputStreamStatistics));
805814
}
806815
}
807816

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
3939
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
4040
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
41+
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
4142
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
4243
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
44+
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
4345
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
4446

4547
/**
@@ -130,6 +132,8 @@ public void testReadLargeFileFully() throws Throwable {
130132
}
131133
// Verify that once stream is closed, all memory is freed
132134
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
135+
assertThatStatisticMaximum(ioStats,
136+
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
133137
}
134138

135139
@Test
@@ -159,6 +163,8 @@ public void testRandomReadLargeFile() throws Throwable {
159163
}
160164
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
161165
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
166+
assertThatStatisticMaximum(ioStats,
167+
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
162168
}
163169

164170
@Test
@@ -183,6 +189,9 @@ public void testRandomReadSmallFile() throws Throwable {
183189
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
184190
// The buffer pool is not used
185191
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
192+
// no prefetch ops, so no action_executor_acquired
193+
assertThatStatisticMaximum(ioStats,
194+
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
186195
}
187196
}
188197

0 commit comments

Comments
 (0)