From c94267a96e6c6ed152bf5265625dd2c32d8141ea Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 23 Aug 2022 19:46:31 -0700 Subject: [PATCH 1/5] HADOOP-18186. s3a prefetching to use SemaphoredDelegatingExecutor for submitting work --- .../java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index ad94e20e7129e..0e7f305e627d4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -776,7 +776,14 @@ private void initThreadPools(Configuration conf) { executorCapacity = intOption(conf, EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1); if (this.prefetchEnabled) { - this.futurePool = new ExecutorServiceFuturePool(boundedThreadPool); + final S3AInputStreamStatistics s3AInputStreamStatistics = + statisticsContext.newInputStreamStatistics(); + this.futurePool = new ExecutorServiceFuturePool( + new SemaphoredDelegatingExecutor( + boundedThreadPool, + executorCapacity, + true, + s3AInputStreamStatistics)); } } From 7be5a61009e255fb88aa6f27e4076f841bf44133 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Thu, 25 Aug 2022 12:11:25 -0700 Subject: [PATCH 2/5] addendum --- .../src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 0e7f305e627d4..00600b479cab2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -775,10 +775,10 @@ private void initThreadPools(Configuration conf) { unboundedThreadPool.allowCoreThreadTimeOut(true); executorCapacity = intOption(conf, EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1); - if (this.prefetchEnabled) { + if (prefetchEnabled) { final S3AInputStreamStatistics s3AInputStreamStatistics = statisticsContext.newInputStreamStatistics(); - this.futurePool = new ExecutorServiceFuturePool( + futurePool = new ExecutorServiceFuturePool( new SemaphoredDelegatingExecutor( boundedThreadPool, executorCapacity, From bbad0da1e47420b313e686490294cc38d680d902 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 30 Aug 2022 13:07:27 -0700 Subject: [PATCH 3/5] test for ACTION_EXECUTOR_ACQUIRED --- .../apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index 36d049cedf10b..738311e8ad44c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -38,6 +38,7 @@ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; @@ -130,6 +131,7 @@ public void testReadLargeFileFully() throws Throwable { } // Verify that once stream is closed, all memory is freed verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + assertDurationRange(ioStats, StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED, 0, 10); } @Test @@ -159,6 +161,7 @@ public void testRandomReadLargeFile() throws Throwable { } verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + assertDurationRange(ioStats, StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED, 0, 10); } @Test @@ -183,6 +186,8 @@ public void testRandomReadSmallFile() throws Throwable { verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0); // The buffer pool is not used verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + // no prefetch ops, so no action_executor_acquired + assertDurationRange(ioStats, StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED, -1, 0); } } From 121e58ebdd9ec749b9ea9082cd90994cd17776b4 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 30 Aug 2022 15:18:09 -0700 Subject: [PATCH 4/5] update permit --- .../main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 9227be3c9822f..955e2430fc487 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -766,9 +766,11 @@ private void initThreadPools(Configuration conf) { DEFAULT_KEEPALIVE_TIME, 0); int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0; + int activeTasksForBoundedThreadPool = maxThreads; + int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads; boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( - maxThreads, - maxThreads + totalTasks + numPrefetchThreads, + activeTasksForBoundedThreadPool, + waitingTasksForBoundedThreadPool, keepAliveTime, TimeUnit.SECONDS, name + "-bounded"); unboundedThreadPool = new ThreadPoolExecutor( @@ -786,7 +788,7 @@ private void initThreadPools(Configuration conf) { futurePool = new ExecutorServiceFuturePool( new SemaphoredDelegatingExecutor( boundedThreadPool, - executorCapacity, + activeTasksForBoundedThreadPool + waitingTasksForBoundedThreadPool, true, s3AInputStreamStatistics)); } From 8a20615766b630fe539d8a3deda3cc2ca7f18571 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 6 Sep 2022 23:44:15 -0700 Subject: [PATCH 5/5] addendum --- .../fs/s3a/ITestS3APrefetchingInputStream.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index 738311e8ad44c..24f74b3a0212e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -38,9 +38,10 @@ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; -import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; /** @@ -131,7 +132,8 @@ public void testReadLargeFileFully() throws Throwable { } // Verify that once stream is closed, all memory is freed verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); - assertDurationRange(ioStats, StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED, 0, 10); + assertThatStatisticMaximum(ioStats, + StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } @Test @@ -161,7 +163,8 @@ public void testRandomReadLargeFile() throws Throwable { } verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); - assertDurationRange(ioStats, StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED, 0, 10); + assertThatStatisticMaximum(ioStats, + StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } @Test @@ -187,7 +190,8 @@ public void testRandomReadSmallFile() throws Throwable { // The buffer pool is not used verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); // no prefetch ops, so no action_executor_acquired - assertDurationRange(ioStats, StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED, -1, 0); + assertThatStatisticMaximum(ioStats, + StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1); } }