From 62cd66cbe63126af10ecf77df30e1ba25eaa1284 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 17 Apr 2020 18:31:50 +0100 Subject: [PATCH 1/3] HADOOP-16798. S3A Committer thread pool shutdown problems. Contributed by Steve Loughran. Fixes a condition which can cause job commit to fail if a task was aborted < 60s before the job commit commenced: the task abort will shut down the thread pool with a hard exit after 60s; the job commit POST requests would be scheduled through the same pool, so be interrupted and fail. At present the access is synchronized, but presumably the executor shutdown code is calling wait() and releasing locks. Task abort is triggered from the AM when task attempts succeed but there are still active speculative task attempts running. Thus it only surfaces when speculation is enabled and the final tasks are speculating, which, given they are the stragglers, is not unheard of. The fix copies and clears the threadPool field in a synchronized block, then shuts it down; job commit will encounter the empty field and demand-create a new one. As would a sequence of task aborts. Change-Id: I23fc1c4bf112d2c36bd9f1baf76cf5057e4f10e1 --- .../fs/s3a/commit/AbstractS3ACommitter.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index e82fbda63dd0c..ecf9bbbd8865f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -870,12 +870,19 @@ protected final synchronized ExecutorService buildThreadPool( * Destroy any thread pools; wait for that to finish, * but don't overreact if it doesn't finish in time. */ - protected synchronized void destroyThreadPool() { - if (threadPool != null) { + protected void destroyThreadPool() { + ExecutorService pool; + // reset the thread pool in a sync block, then shut it down + // afterwards. This allows for other threads to create a + // new thread pool on demand. + synchronized(this) { + pool = this.threadPool; + threadPool = null; + } + if (pool != null) { LOG.debug("Destroying thread pool"); - HadoopExecutors.shutdown(threadPool, LOG, + HadoopExecutors.shutdown(pool, LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); - threadPool = null; } } From 0c3ae967df2dbea37f2c9b71cc096caf1f610ae8 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 19 May 2020 20:06:02 +0100 Subject: [PATCH 2/3] HADOOP-16798 committer race condition. rather than pass around the thread pool, a "Submitter' is supplied to Tasks; there's a disabled one -which triggers the downgrade to single threaded execution, and a multithreaded one which submits work through the committer's (now private) thread pool. As submission is in a synchronized block, the race condition should be blocked Change-Id: I9dbb6f6eb5a32c3b7bc8d638eae93d38261c9d6c --- .../fs/s3a/commit/AbstractS3ACommitter.java | 121 +++++++++++++----- .../apache/hadoop/fs/s3a/commit/Tasks.java | 48 ++++++- .../staging/PartitionedStagingCommitter.java | 7 +- .../s3a/commit/staging/StagingCommitter.java | 2 +- .../hadoop/fs/s3a/commit/TestTasks.java | 24 +++- 5 files changed, 161 insertions(+), 41 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index ecf9bbbd8865f..4b98a1fa9e4f5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -24,6 +24,7 @@ import java.util.Date; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -472,7 +473,7 @@ protected void commitPendingUploads( Tasks.foreach(pending.getSourceFiles()) .stopOnFailure() .suppressExceptions(false) - .executeWith(buildThreadPool(context)) + .executeWith(buildSubmitter(context)) .abortWith(path -> loadAndAbort(commitContext, pending, path, true, false)) .revertWith(path -> @@ -502,7 +503,7 @@ protected void precommitCheckPendingFiles( Tasks.foreach(pending.getSourceFiles()) .stopOnFailure() .suppressExceptions(false) - .executeWith(buildThreadPool(context)) + .executeWith(buildSubmitter(context)) .run(path -> PendingSet.load(sourceFS, path)); } } @@ -525,7 +526,7 @@ private void loadAndCommit( Tasks.foreach(pendingSet.getCommits()) .stopOnFailure() .suppressExceptions(false) - .executeWith(singleCommitThreadPool()) + .executeWith(singleThreadSubmitter()) .onFailure((commit, exception) -> commitContext.abortSingleCommit(commit)) .abortWith(commitContext::abortSingleCommit) @@ -580,7 +581,7 @@ private void loadAndAbort( path); FileSystem fs = getDestFS(); Tasks.foreach(pendingSet.getCommits()) - .executeWith(singleCommitThreadPool()) + .executeWith(singleThreadSubmitter()) .suppressExceptions(suppressExceptions) .run(commit -> { try { @@ -674,7 +675,7 @@ protected void abortPendingUploadsInCleanup( return; } Tasks.foreach(pending) - .executeWith(buildThreadPool(getJobContext())) + .executeWith(buildSubmitter(getJobContext())) .suppressExceptions(suppressExceptions) .run(u -> commitContext.abortMultipartCommit( u.getKey(), u.getUploadId())); @@ -837,35 +838,100 @@ protected String getRole() { return role; } + /** + * Returns an {@link Tasks.Submitter} for parallel tasks. The number of + * threads in the thread-pool is set by fs.s3a.committer.threads. + * If num-threads is 0, this will a submitter instance which will + * declare itself as disabled; this is used in Tasks as a cue + * to switch to single-threaded execution. + * + * @param context the JobContext for this commit + * @return a submitter + */ + protected Tasks.Submitter buildSubmitter( + JobContext context) { + if (getThreadCount(context) > 0) { + return new PoolSubmitter(context); + } else { + return new Tasks.DisabledSubmitter(); + } + } + /** * Returns an {@link ExecutorService} for parallel tasks. The number of * threads in the thread-pool is set by fs.s3a.committer.threads. - * If num-threads is 0, this will return null; + * If num-threads is 0, this will raise an exception. * * @param context the JobContext for this commit * @return an {@link ExecutorService} or null for the number of threads */ - protected final synchronized ExecutorService buildThreadPool( + private synchronized ExecutorService buildThreadPool( JobContext context) { if (threadPool == null) { - int numThreads = context.getConfiguration().getInt( - FS_S3A_COMMITTER_THREADS, - DEFAULT_COMMITTER_THREADS); + int numThreads = getThreadCount(context); + Preconditions.checkState(numThreads > 0, + "Cannot create a thread pool with no threads"); LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads); - if (numThreads > 0) { - threadPool = HadoopExecutors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d") - .build()); - } else { - return null; - } + threadPool = HadoopExecutors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d") + .build()); } return threadPool; } + /** + * Get the thread count for this job's commit operations. + * @param context the JobContext for this commit + * @return a possibly zero thread count. + */ + private int getThreadCount(final JobContext context) { + return context.getConfiguration().getInt( + FS_S3A_COMMITTER_THREADS, + DEFAULT_COMMITTER_THREADS); + } + + /** + * Submit a runnable. + * This will demand-create the thread pool if needed. + *

+ * This is synchronized to ensure the thread pool is always valid when + * work is synchronized. See HADOOP-16798. + * @param context the JobContext for this commit + * @param task task to execute + * @return the future of the submitted task. + */ + private synchronized Future submitRunnable( + final JobContext context, + final Runnable task) { + return buildThreadPool(context).submit(task); + } + + /** + * The real task submitter, which hands off the work to + * the current thread pool. + */ + private final class PoolSubmitter implements Tasks.Submitter { + + private final JobContext context; + + private PoolSubmitter(final JobContext context) { + this.context = context; + } + + @Override + public Future submit(final Runnable task) { + return submitRunnable(context, task); + } + + @Override + public boolean enabled() { + return true; + } + } + /** * Destroy any thread pools; wait for that to finish, * but don't overreact if it doesn't finish in time. @@ -887,16 +953,13 @@ protected void destroyThreadPool() { } /** - * Get the thread pool for executing the single file commit/revert + * Get the submitter pool for executing the single file commit/revert * within the commit of all uploads of a single task. - * This is currently null; it is here to allow the Tasks class to - * provide the logic for execute/revert. - * Why not use the existing thread pool? Too much fear of deadlocking, - * and tasks are being committed in parallel anyway. - * @return null. always. + * + * @return a disabled submitter */ - protected final synchronized ExecutorService singleCommitThreadPool() { - return null; + protected final synchronized Tasks.Submitter singleThreadSubmitter() { + return new Tasks.DisabledSubmitter(); } /** @@ -939,7 +1002,7 @@ protected void abortPendingUploads(JobContext context, CommitOperations.CommitContext commitContext = initiateCommitOperation()) { Tasks.foreach(pending) - .executeWith(buildThreadPool(context)) + .executeWith(buildSubmitter(context)) .suppressExceptions(suppressExceptions) .run(commitContext::abortSingleCommit); } @@ -968,7 +1031,7 @@ protected void abortPendingUploads( CommitOperations.CommitContext commitContext = initiateCommitOperation()) { Tasks.foreach(pending.getSourceFiles()) - .executeWith(buildThreadPool(context)) + .executeWith(buildSubmitter(context)) .suppressExceptions(suppressExceptions) .run(path -> loadAndAbort(commitContext, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java index b6b6b9707ebc5..dec880744b810 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -76,7 +75,7 @@ public interface FailureTask { */ public static class Builder { private final Iterable items; - private ExecutorService service = null; + private Submitter service = new DisabledSubmitter(); private FailureTask onFailure = null; private boolean stopOnFailure = false; private boolean suppressExceptions = false; @@ -96,11 +95,11 @@ public static class Builder { /** * Declare executor service: if null, the tasks are executed in a single * thread. - * @param executorService service to schedule tasks with. + * @param submitter service to schedule tasks with. * @return this builder. */ - public Builder executeWith(ExecutorService executorService) { - this.service = executorService; + public Builder executeWith(Submitter submitter) { + this.service = submitter; return this; } @@ -144,7 +143,7 @@ public Builder stopAbortsOnFailure() { } public boolean run(Task task) throws E { - if (service != null) { + if (service.enabled()) { return runParallel(task); } else { return runSingleThreaded(task); @@ -407,4 +406,41 @@ private static void castAndThrow(Exception e) throws E { } throw (E) e; } + + /** + * Interface to whatever lets us submit tasks. + */ + public interface Submitter { + + /** + * Submit work. + * @param task task to execute + * @return the future of the submitted task. + */ + Future submit(Runnable task); + + /** + * Is this submitter enabled? + * @return true if work can be submitted to it. + */ + boolean enabled(); + } + + /** + * The disabled submitter declares itself as not enabled, which is + * used to switch Task's execution to single threaded. + */ + public static class DisabledSubmitter implements Tasks.Submitter { + + @Override + public Future submit(final Runnable task) { + throw new UnsupportedOperationException("submitter is disabled"); + } + + @Override + public boolean enabled() { + return false; + } + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java index 20aca3cf49ae0..7be54062d28f5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -187,7 +186,7 @@ private void replacePartitions( Map partitions = new ConcurrentHashMap<>(); FileSystem sourceFS = pending.getSourceFS(); - ExecutorService pool = buildThreadPool(context); + Tasks.Submitter submitter = buildSubmitter(context); try (DurationInfo ignored = new DurationInfo(LOG, "Replacing partitions")) { @@ -198,7 +197,7 @@ private void replacePartitions( Tasks.foreach(pending.getSourceFiles()) .stopOnFailure() .suppressExceptions(false) - .executeWith(pool) + .executeWith(submitter) .run(path -> { PendingSet pendingSet = PendingSet.load(sourceFS, path); Path lastParent = null; @@ -216,7 +215,7 @@ private void replacePartitions( Tasks.foreach(partitions.keySet()) .stopOnFailure() .suppressExceptions(false) - .executeWith(pool) + .executeWith(submitter) .run(partitionPath -> { LOG.debug("{}: removing partition path to be replaced: " + getRole(), partitionPath); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 7eca1b42659e5..91e68af8bb1d0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -699,7 +699,7 @@ protected int commitTaskInternal(final TaskAttemptContext context, Tasks.foreach(taskOutput) .stopOnFailure() .suppressExceptions(false) - .executeWith(buildThreadPool(context)) + .executeWith(buildSubmitter(context)) .run(stat -> { Path path = stat.getPath(); File localFile = new File(path.toUri().getPath()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java index 4ee39f1bfa08e..92c07f77b94a1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -57,6 +58,12 @@ public class TestTasks extends HadoopTestBase { * Thread pool for task execution. */ private ExecutorService threadPool; + + /** + * Task submitter bonded to the thread pool, or + * disabled for the 0-thread case.IsMeAs + */ + Tasks.Submitter submitter; private final CounterTask failingTask = new CounterTask("failing committer", FAILPOINT, Item::commit); @@ -117,6 +124,9 @@ public void setup() { .setDaemon(true) .setNameFormat(getMethodName() + "-pool-%d") .build()); + submitter = new PoolSubmitter(); + } else { + submitter = new Tasks.DisabledSubmitter(); } } @@ -129,12 +139,24 @@ public void teardown() { } } + private class PoolSubmitter implements Tasks.Submitter { + + @Override + public Future submit(final Runnable task) { + return threadPool.submit(task); + } + + @Override + public boolean enabled() { + return true; + } + } /** * create the builder. * @return pre-inited builder */ private Tasks.Builder builder() { - return Tasks.foreach(items).executeWith(threadPool); + return Tasks.foreach(items).executeWith(submitter); } private void assertRun(Tasks.Builder builder, From 64e31d082228bc2c1473674f1366eeb8ea90780d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 20 May 2020 13:29:08 +0100 Subject: [PATCH 3/3] HADOOP-16798 committer race condition. Rework previous commit for a more minimal design -one which reduces the diff with the current code. Change-Id: Ic2a1a05d29f6974eb14f9279fbb0575e3f4ddd24 --- .../fs/s3a/commit/AbstractS3ACommitter.java | 39 ++++++++++--------- .../apache/hadoop/fs/s3a/commit/Tasks.java | 27 +------------ .../hadoop/fs/s3a/commit/TestTasks.java | 9 ++--- 3 files changed, 25 insertions(+), 50 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index 4b98a1fa9e4f5..32d00a4353e98 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -841,19 +841,19 @@ protected String getRole() { /** * Returns an {@link Tasks.Submitter} for parallel tasks. The number of * threads in the thread-pool is set by fs.s3a.committer.threads. - * If num-threads is 0, this will a submitter instance which will - * declare itself as disabled; this is used in Tasks as a cue + * If num-threads is 0, this will return null; + * this is used in Tasks as a cue * to switch to single-threaded execution. * * @param context the JobContext for this commit - * @return a submitter + * @return a submitter or null */ protected Tasks.Submitter buildSubmitter( JobContext context) { if (getThreadCount(context) > 0) { return new PoolSubmitter(context); } else { - return new Tasks.DisabledSubmitter(); + return null; } } @@ -863,15 +863,14 @@ protected Tasks.Submitter buildSubmitter( * If num-threads is 0, this will raise an exception. * * @param context the JobContext for this commit - * @return an {@link ExecutorService} or null for the number of threads + * @param numThreads threads + * @return an {@link ExecutorService} for the number of threads */ private synchronized ExecutorService buildThreadPool( - JobContext context) { - + JobContext context, int numThreads) { + Preconditions.checkArgument(numThreads > 0, + "Cannot create a thread pool with no threads"); if (threadPool == null) { - int numThreads = getThreadCount(context); - Preconditions.checkState(numThreads > 0, - "Cannot create a thread pool with no threads"); LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads); threadPool = HadoopExecutors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder() @@ -906,7 +905,7 @@ private int getThreadCount(final JobContext context) { private synchronized Future submitRunnable( final JobContext context, final Runnable task) { - return buildThreadPool(context).submit(task); + return buildThreadPool(context, getThreadCount(context)).submit(task); } /** @@ -917,7 +916,12 @@ private final class PoolSubmitter implements Tasks.Submitter { private final JobContext context; + private final int numThreads; + private PoolSubmitter(final JobContext context) { + this.numThreads = getThreadCount(context); + Preconditions.checkArgument(numThreads > 0, + "Cannot create a thread pool with no threads"); this.context = context; } @@ -926,10 +930,6 @@ public Future submit(final Runnable task) { return submitRunnable(context, task); } - @Override - public boolean enabled() { - return true; - } } /** @@ -953,13 +953,14 @@ protected void destroyThreadPool() { } /** - * Get the submitter pool for executing the single file commit/revert + * Get the thread pool for executing the single file commit/revert * within the commit of all uploads of a single task. - * - * @return a disabled submitter + * This is currently null; it is here to allow the Tasks class to + * provide the logic for execute/revert. + * @return null. always. */ protected final synchronized Tasks.Submitter singleThreadSubmitter() { - return new Tasks.DisabledSubmitter(); + return null; } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java index dec880744b810..c318e86605e0c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java @@ -75,7 +75,7 @@ public interface FailureTask { */ public static class Builder { private final Iterable items; - private Submitter service = new DisabledSubmitter(); + private Submitter service = null; private FailureTask onFailure = null; private boolean stopOnFailure = false; private boolean suppressExceptions = false; @@ -143,7 +143,7 @@ public Builder stopAbortsOnFailure() { } public boolean run(Task task) throws E { - if (service.enabled()) { + if (service != null) { return runParallel(task); } else { return runSingleThreaded(task); @@ -418,29 +418,6 @@ public interface Submitter { * @return the future of the submitted task. */ Future submit(Runnable task); - - /** - * Is this submitter enabled? - * @return true if work can be submitted to it. - */ - boolean enabled(); - } - - /** - * The disabled submitter declares itself as not enabled, which is - * used to switch Task's execution to single threaded. - */ - public static class DisabledSubmitter implements Tasks.Submitter { - - @Override - public Future submit(final Runnable task) { - throw new UnsupportedOperationException("submitter is disabled"); - } - - @Override - public boolean enabled() { - return false; - } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java index 92c07f77b94a1..4211c62a77b9c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java @@ -61,7 +61,7 @@ public class TestTasks extends HadoopTestBase { /** * Task submitter bonded to the thread pool, or - * disabled for the 0-thread case.IsMeAs + * null for the 0-thread case. */ Tasks.Submitter submitter; private final CounterTask failingTask @@ -126,7 +126,7 @@ public void setup() { .build()); submitter = new PoolSubmitter(); } else { - submitter = new Tasks.DisabledSubmitter(); + submitter = null; } } @@ -146,11 +146,8 @@ public Future submit(final Runnable task) { return threadPool.submit(task); } - @Override - public boolean enabled() { - return true; - } } + /** * create the builder. * @return pre-inited builder