Skip to content

Commit 64e31d0

Browse files
committed
HADOOP-16798 committer race condition.
Rework previous commit for a more minimal design -one which reduces the diff with the current code. Change-Id: Ic2a1a05d29f6974eb14f9279fbb0575e3f4ddd24
1 parent 0c3ae96 commit 64e31d0

File tree

3 files changed

+25
-50
lines changed

3 files changed

+25
-50
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -841,19 +841,19 @@ protected String getRole() {
841841
/**
842842
* Returns an {@link Tasks.Submitter} for parallel tasks. The number of
843843
* threads in the thread-pool is set by fs.s3a.committer.threads.
844-
* If num-threads is 0, this will a submitter instance which will
845-
* declare itself as disabled; this is used in Tasks as a cue
844+
* If num-threads is 0, this will return null;
845+
* this is used in Tasks as a cue
846846
* to switch to single-threaded execution.
847847
*
848848
* @param context the JobContext for this commit
849-
* @return a submitter
849+
* @return a submitter or null
850850
*/
851851
protected Tasks.Submitter buildSubmitter(
852852
JobContext context) {
853853
if (getThreadCount(context) > 0) {
854854
return new PoolSubmitter(context);
855855
} else {
856-
return new Tasks.DisabledSubmitter();
856+
return null;
857857
}
858858
}
859859

@@ -863,15 +863,14 @@ protected Tasks.Submitter buildSubmitter(
863863
* If num-threads is 0, this will raise an exception.
864864
*
865865
* @param context the JobContext for this commit
866-
* @return an {@link ExecutorService} or null for the number of threads
866+
* @param numThreads threads
867+
* @return an {@link ExecutorService} for the number of threads
867868
*/
868869
private synchronized ExecutorService buildThreadPool(
869-
JobContext context) {
870-
870+
JobContext context, int numThreads) {
871+
Preconditions.checkArgument(numThreads > 0,
872+
"Cannot create a thread pool with no threads");
871873
if (threadPool == null) {
872-
int numThreads = getThreadCount(context);
873-
Preconditions.checkState(numThreads > 0,
874-
"Cannot create a thread pool with no threads");
875874
LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
876875
threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
877876
new ThreadFactoryBuilder()
@@ -906,7 +905,7 @@ private int getThreadCount(final JobContext context) {
906905
private synchronized Future<?> submitRunnable(
907906
final JobContext context,
908907
final Runnable task) {
909-
return buildThreadPool(context).submit(task);
908+
return buildThreadPool(context, getThreadCount(context)).submit(task);
910909
}
911910

912911
/**
@@ -917,7 +916,12 @@ private final class PoolSubmitter implements Tasks.Submitter {
917916

918917
private final JobContext context;
919918

919+
private final int numThreads;
920+
920921
private PoolSubmitter(final JobContext context) {
922+
this.numThreads = getThreadCount(context);
923+
Preconditions.checkArgument(numThreads > 0,
924+
"Cannot create a thread pool with no threads");
921925
this.context = context;
922926
}
923927

@@ -926,10 +930,6 @@ public Future<?> submit(final Runnable task) {
926930
return submitRunnable(context, task);
927931
}
928932

929-
@Override
930-
public boolean enabled() {
931-
return true;
932-
}
933933
}
934934

935935
/**
@@ -953,13 +953,14 @@ protected void destroyThreadPool() {
953953
}
954954

955955
/**
956-
* Get the submitter pool for executing the single file commit/revert
956+
* Get the thread pool for executing the single file commit/revert
957957
* within the commit of all uploads of a single task.
958-
*
959-
* @return a disabled submitter
958+
* This is currently null; it is here to allow the Tasks class to
959+
* provide the logic for execute/revert.
960+
* @return null. always.
960961
*/
961962
protected final synchronized Tasks.Submitter singleThreadSubmitter() {
962-
return new Tasks.DisabledSubmitter();
963+
return null;
963964
}
964965

965966
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public interface FailureTask<I, E extends Exception> {
7575
*/
7676
public static class Builder<I> {
7777
private final Iterable<I> items;
78-
private Submitter service = new DisabledSubmitter();
78+
private Submitter service = null;
7979
private FailureTask<I, ?> onFailure = null;
8080
private boolean stopOnFailure = false;
8181
private boolean suppressExceptions = false;
@@ -143,7 +143,7 @@ public Builder<I> stopAbortsOnFailure() {
143143
}
144144

145145
public <E extends Exception> boolean run(Task<I, E> task) throws E {
146-
if (service.enabled()) {
146+
if (service != null) {
147147
return runParallel(task);
148148
} else {
149149
return runSingleThreaded(task);
@@ -418,29 +418,6 @@ public interface Submitter {
418418
* @return the future of the submitted task.
419419
*/
420420
Future<?> submit(Runnable task);
421-
422-
/**
423-
* Is this submitter enabled?
424-
* @return true if work can be submitted to it.
425-
*/
426-
boolean enabled();
427-
}
428-
429-
/**
430-
* The disabled submitter declares itself as not enabled, which is
431-
* used to switch Task's execution to single threaded.
432-
*/
433-
public static class DisabledSubmitter implements Tasks.Submitter {
434-
435-
@Override
436-
public Future<?> submit(final Runnable task) {
437-
throw new UnsupportedOperationException("submitter is disabled");
438-
}
439-
440-
@Override
441-
public boolean enabled() {
442-
return false;
443-
}
444421
}
445422

446423
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class TestTasks extends HadoopTestBase {
6161

6262
/**
6363
* Task submitter bonded to the thread pool, or
64-
* disabled for the 0-thread case.IsMeAs
64+
* null for the 0-thread case.
6565
*/
6666
Tasks.Submitter submitter;
6767
private final CounterTask failingTask
@@ -126,7 +126,7 @@ public void setup() {
126126
.build());
127127
submitter = new PoolSubmitter();
128128
} else {
129-
submitter = new Tasks.DisabledSubmitter();
129+
submitter = null;
130130
}
131131

132132
}
@@ -146,11 +146,8 @@ public Future<?> submit(final Runnable task) {
146146
return threadPool.submit(task);
147147
}
148148

149-
@Override
150-
public boolean enabled() {
151-
return true;
152-
}
153149
}
150+
154151
/**
155152
* create the builder.
156153
* @return pre-inited builder

0 commit comments

Comments
 (0)