diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 2e6b8ff359512..5583bb7ad05ec 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -435,12 +435,6 @@ aws-java-sdk-bundle compile - - com.twitter - util-core_2.11 - 21.2.0 - compile - org.assertj assertj-core diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java index 34dd6d7ba3b8d..a855a1c2c390c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java @@ -22,10 +22,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Future; import java.util.zip.CRC32; -import com.twitter.util.Awaitable.CanAwait; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,8 +262,6 @@ public boolean stateEqualsOneOf(State... states) { return false; } - private static final CanAwait CAN_AWAIT = () -> false; - public String toString() { return String.format( @@ -281,7 +278,7 @@ private String getFutureStr(Future f) { if (f == null) { return "--"; } else { - return this.action.isReady(CAN_AWAIT) ? "done" : "not done"; + return this.action.isDone() ? "done" : "not done"; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java index 91798e550064a..259f9834cea82 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java @@ -27,8 +27,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; +import java.util.concurrent.Future; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,7 +233,7 @@ public synchronized void close() { for (BufferData data : this.getAll()) { Future actionFuture = data.getActionFuture(); if (actionFuture != null) { - actionFuture.raise(new CancellationException("BufferPool is closing.")); + actionFuture.cancel(true); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 93417f3fe61e9..078b9a894e070 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -21,13 +21,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; -import com.twitter.util.Await; -import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.Future; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +38,7 @@ public abstract class CachingBlockManager extends BlockManager { private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class); // Asynchronous tasks are performed in this pool. - private final FuturePool futurePool; + private final ExecutorServiceFuturePool futurePool; // Pool of shared ByteBuffer instances. private BufferPool bufferPool; @@ -78,7 +77,7 @@ public abstract class CachingBlockManager extends BlockManager { * @throws IllegalArgumentException if bufferPoolSize is zero or negative. */ public CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, BlockData blockData, int bufferPoolSize) { super(blockData); @@ -344,7 +343,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... /** * Read task that is submitted to the future pool. */ - private static class PrefetchTask extends ExceptionalFunction0 { + private static class PrefetchTask implements Supplier { private final BufferData data; private final CachingBlockManager blockManager; @@ -354,7 +353,7 @@ private static class PrefetchTask extends ExceptionalFunction0 { } @Override - public Void applyE() { + public Void get() { try { this.blockManager.prefetch(data); } catch (Exception e) { @@ -412,7 +411,9 @@ public void requestCaching(BufferData data) { if (state == BufferData.State.PREFETCHING) { blockFuture = data.getActionFuture(); } else { - blockFuture = Future.value(null); + CompletableFuture cf = new CompletableFuture<>(); + cf.complete(null); + blockFuture = cf; } CachePutTask task = new CachePutTask(data, blockFuture, this); @@ -433,13 +434,13 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture) { } try { - Await.result(blockFuture); + blockFuture.get(); //TODO consider calling get(long timeout, TimeUnit unit) instead if (data.stateEqualsOneOf(BufferData.State.DONE)) { // There was an error during prefetch. return; } } catch (Exception e) { - String message = String.format("error waitng on blockFuture: %s", data); + String message = String.format("error waiting on blockFuture: %s", data); LOG.error(message, e); data.setDone(); return; @@ -500,7 +501,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { this.cache.put(blockNumber, buffer); } - private static class CachePutTask extends ExceptionalFunction0 { + private static class CachePutTask implements Supplier { private final BufferData data; // Block being asynchronously fetched. @@ -519,7 +520,7 @@ private static class CachePutTask extends ExceptionalFunction0 { } @Override - public Void applyE() { + public Void get() { this.blockManager.addToCacheAndRelease(this.data, this.blockFuture); return null; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java new file mode 100644 index 0000000000000..bc8142219d5ce --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.common; + +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Supplier; + +public class ExecutorServiceFuturePool { + private ExecutorService executor; + + public ExecutorServiceFuturePool(ExecutorService executor) { + this.executor = executor; + } + + /** + * @param f function to run in future on executor pool + * @return future + * @throws java.util.concurrent.RejectedExecutionException can be thrown + * @throws NullPointerException if f param is null + */ + public Future apply(final Supplier f) { + return executor.submit(f::get); + } + + public String toString() { + return String.format(Locale.ROOT,"ExecutorServiceFuturePool(executor=%s)", executor); + } + + public int poolSize() { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getPoolSize(); + } else { + return -1; + } + } + + public int numActiveTasks() { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } else { + return -1; + } + } + + public long numCompletedTasks() { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getCompletedTaskCount(); + } else { + return -1; + } + } + + public long numPendingTasks() { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getQueue().size(); + } else { + return -1; + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 35c26067fcca1..76fe678073df3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -76,8 +76,7 @@ import com.amazonaws.services.s3.transfer.model.UploadResult; import com.amazonaws.event.ProgressListener; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,7 +282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private ThreadPoolExecutor unboundedThreadPool; // S3 reads are prefetched asynchronously using this future pool. - private FuturePool futurePool; + private ExecutorServiceFuturePool futurePool; // If true, the prefetching input stream is used for reads. private boolean prefetchEnabled; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java index acfe6a415f1e3..23eb4691738af 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -18,11 +18,10 @@ package org.apache.hadoop.fs.s3a; -import com.twitter.util.FuturePool; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.store.audit.AuditSpan; @@ -61,7 +60,7 @@ public class S3AReadOpContext extends S3AOpContext { private final AuditSpan auditSpan; // S3 reads are prefetched asynchronously using this future pool. - private FuturePool futurePool; + private ExecutorServiceFuturePool futurePool; // Size in bytes of a single prefetch block. private final int prefetchBlockSize; @@ -94,7 +93,7 @@ public S3AReadOpContext( ChangeDetectionPolicy changeDetectionPolicy, final long readahead, final AuditSpan auditSpan, - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, int prefetchBlockSize, int prefetchBlockCount) { @@ -161,11 +160,11 @@ public AuditSpan getAuditSpan() { } /** - * Gets the {@code FuturePool} used for asynchronous prefetches. + * Gets the {@code ExecutorServiceFuturePool2} used for asynchronous prefetches. * - * @return the {@code FuturePool} used for asynchronous prefetches. + * @return the {@code ExecutorServiceFuturePool2} used for asynchronous prefetches. */ - public FuturePool getFuturePool() { + public ExecutorServiceFuturePool getFuturePool() { return this.futurePool; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java index c4fafd56f1d9b..674a5ccbdd8bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java @@ -22,12 +22,12 @@ import java.io.IOException; import java.nio.ByteBuffer; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.CachingBlockManager; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.Validate; /** @@ -52,7 +52,7 @@ public class S3CachingBlockManager extends CachingBlockManager { * @throws IllegalArgumentException if reader is null. */ public S3CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java index 1117002526838..a1a9a22448ae3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java @@ -21,13 +21,13 @@ import java.io.IOException; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BlockManager; import org.apache.hadoop.fs.common.BufferData; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -186,7 +186,7 @@ public String toString() { } protected BlockManager createBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java index 119e90ffebad5..4855d4c958184 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java @@ -24,8 +24,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; -import com.twitter.util.Future; import org.junit.Test; import org.apache.hadoop.test.AbstractHadoopTestBase; @@ -83,13 +83,15 @@ public void testValidStateUpdates() { assertEquals(BufferData.State.BLANK, data.getState()); - Future actionFuture = Future.value(null); + CompletableFuture actionFuture = new CompletableFuture<>(); + actionFuture.complete(null); data.setPrefetch(actionFuture); assertEquals(BufferData.State.PREFETCHING, data.getState()); assertNotNull(data.getActionFuture()); assertSame(actionFuture, data.getActionFuture()); - Future actionFuture2 = Future.value(null); + CompletableFuture actionFuture2 = new CompletableFuture<>(); + actionFuture.complete(null); data.setCaching(actionFuture2); assertEquals(BufferData.State.CACHING, data.getState()); assertNotNull(data.getActionFuture()); @@ -117,7 +119,8 @@ public void testValidStateUpdates() { @Test public void testInvalidStateUpdates() throws Exception { - Future actionFuture = Future.value(null); + CompletableFuture actionFuture = new CompletableFuture<>(); + actionFuture.complete(null); testInvalidStateUpdatesHelper( (d) -> d.setPrefetch(actionFuture), BufferData.State.BLANK, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java index 5c2f7eb224151..1b02c495bc477 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java @@ -34,11 +34,11 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.twitter.util.FuturePool; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.common.BlockCache; import org.apache.hadoop.fs.common.BlockData; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.SingleFilePerBlockCache; import org.apache.hadoop.fs.common.Validate; import org.apache.hadoop.fs.s3a.Invoker; @@ -109,7 +109,7 @@ public static S3ObjectAttributes createObjectAttributes( } public static S3AReadOpContext createReadContext( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String key, int fileSize, int prefetchBlockSize, @@ -195,7 +195,7 @@ public void close() { public static S3InputStream createInputStream( Class clazz, - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize, @@ -225,7 +225,7 @@ public static S3InputStream createInputStream( } public static TestS3InMemoryInputStream createS3InMemoryInputStream( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize) { @@ -235,7 +235,7 @@ public static TestS3InMemoryInputStream createS3InMemoryInputStream( } public static TestS3CachingInputStream createS3CachingInputStream( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize, @@ -322,7 +322,7 @@ private static void randomDelay(int delay) { public static class TestS3CachingBlockManager extends S3CachingBlockManager { public TestS3CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { @@ -359,7 +359,7 @@ protected S3File getS3File() { @Override protected S3CachingBlockManager createBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java index 99836793decba..3f84e2e028339 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java @@ -24,13 +24,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; import org.junit.Test; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BufferData; import org.apache.hadoop.fs.common.ExceptionAsserts; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.junit.Assert.assertEquals; @@ -41,7 +40,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { static final int POOL_SIZE = 3; private final ExecutorService threadPool = Executors.newFixedThreadPool(4); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE); @@ -106,7 +105,7 @@ public void testArgChecks() throws Exception { */ static class TestBlockManager extends S3CachingBlockManager { TestBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java index 2f555d2b62c47..d7754354db825 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java @@ -22,8 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.junit.Test; import org.apache.hadoop.fs.common.ExceptionAsserts; @@ -36,7 +35,7 @@ public class TestS3File extends AbstractHadoopTestBase { private final ExecutorService threadPool = Executors.newFixedThreadPool(1); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket"); @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java index 503cd699002c7..318a789cb6889 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java @@ -24,8 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.junit.Test; import org.apache.hadoop.fs.FSExceptionMessages; @@ -45,7 +44,7 @@ public class TestS3InputStream extends AbstractHadoopTestBase { private static final int FILE_SIZE = 10; private final ExecutorService threadPool = Executors.newFixedThreadPool(4); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket"); @Test