22 commits
14ddf92
HADOOP-15183: previous patch coalesced into a new PR
steveloughran Apr 20, 2019
0b1902c
HADOOP-15183 getting ITestPartialRenamesDeletes tests to work by putt…
steveloughran Apr 24, 2019
72cdc87
HADOOP-15183 evolving the tests to simulate the real failure scenario
steveloughran Apr 24, 2019
9162aa3
HADOOP-15183: parallel submission of delete operations to DDB.
steveloughran Apr 25, 2019
81f87cd
HADOOP-15183 peformance boost of prune on on-demand DDB tables:
steveloughran Apr 25, 2019
a7f2425
HADOOP-15183: improve parallel execution of metastore deletes
steveloughran Apr 25, 2019
733dc4e
HADOOP-15183. Minor code cleanup before going more aggressively into …
steveloughran Apr 25, 2019
3fa8882
HADOOP-15183: fix up findbugs warnings (and failure to create directo…
steveloughran Apr 26, 2019
7362257
HADOOP-15183: ITestMagiCommitMRJob is failing with __magic still bein…
steveloughran Apr 26, 2019
ea55c76
HADOOP-15183 exploring making the copy parallelized; need to be clear…
steveloughran Apr 26, 2019
ad5bb48
HADOOP-15183 S3Guard and rename failures.
steveloughran May 1, 2019
db463aa
HADOOP-15183: copies are parallelised
steveloughran May 2, 2019
4bda57d
HADOOP-15183: rename operation failure handling works in the test cases
steveloughran May 3, 2019
bfe8b4f
HADOOP-15183: progressive rename tracker
steveloughran May 6, 2019
a11135f
HADOOP-15183 rename all happy.
steveloughran May 7, 2019
d497214
HADOOP-15183: tracking ancestors across move operations.
steveloughran May 8, 2019
9247bca
HADOOP-15183: bulk metastore operation performance/scale
steveloughran May 15, 2019
301a7d4
HADOOP-15183: address github review and yetus comments
steveloughran May 16, 2019
1be6883
HADOOP-15183: full end-to-end test to verify that >1 commit will only…
steveloughran May 17, 2019
37d597b
HADOOP-15183: teardown improvements in ITestPartialRenamesDeletes
steveloughran May 18, 2019
6e5a5a1
HADOOP-15183 checkstyle
steveloughran May 18, 2019
3982ceb
HADOOP-15183 remove diff between this branch and trunk; checkstyle
steveloughran May 19, 2019
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Map;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -108,20 +109,55 @@ public static <T> T awaitFuture(final Future<T> future,
    */
   public static <T> T raiseInnerCause(final ExecutionException e)
       throws IOException {
+    throw unwrapInnerException(e);
+  }
+
+  /**
+   * Extract the cause of a completion failure and rethrow it if an IOE
+   * or RTE.
+   * @param e exception.
+   * @param <T> type of return value.
+   * @return nothing, ever.
+   * @throws IOException either the inner IOException, or a wrapper around
+   * any non-Runtime-Exception
+   * @throws RuntimeException if that is the inner cause.
+   */
+  public static <T> T raiseInnerCause(final CompletionException e)
+      throws IOException {
+    throw unwrapInnerException(e);
+  }
+
+  /**
+   * From the inner cause of an execution exception, extract the inner cause.
+   * If it is an RTE: throw immediately.
+   * If it is an IOE: return.
+   * If it is a WrappedIOException: unwrap and return.
+   * Else: create a new IOException.
+   *
+   * Recursively handles wrapped Execution and Completion Exceptions in
+   * case something very complicated has happened.
+   * @param e exception.
+   * @return an IOException extracted or built from the cause.
+   * @throws RuntimeException if that is the inner cause.
+   */
+  private static IOException unwrapInnerException(final Throwable e) {
     Throwable cause = e.getCause();
     if (cause instanceof IOException) {
-      throw (IOException) cause;
+      return (IOException) cause;
     } else if (cause instanceof WrappedIOException) {
-      throw ((WrappedIOException) cause).getCause();
+      return ((WrappedIOException) cause).getCause();
+    } else if (cause instanceof CompletionException) {
+      return unwrapInnerException(cause);
+    } else if (cause instanceof ExecutionException) {
+      return unwrapInnerException(cause);
     } else if (cause instanceof RuntimeException) {
       throw (RuntimeException) cause;
     } else if (cause != null) {
       // other type: wrap with a new IOE
-      throw new IOException(cause);
+      return new IOException(cause);
     } else {
-      // this only happens if somebody deliberately raises
-      // an ExecutionException
-      throw new IOException(e);
+      // this only happens if there was no cause.
+      return new IOException(e);
     }
   }
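The net effect of this change: callers blocking on a future see the extracted inner IOException or RuntimeException rather than a raw ExecutionException, with nested Completion/Execution wrappers unwrapped recursively. A self-contained sketch of the same pattern (class and method names here are illustrative, not the Hadoop helper itself):

import java.io.IOException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureUnwrapSketch {

  // Block on a future; failures surface as IOException or
  // RuntimeException rather than as a raw ExecutionException.
  public static <T> T await(Future<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("interrupted waiting for " + future, e);
    } catch (ExecutionException e) {
      throw unwrap(e);
    }
  }

  // Simplified version of the unwrapping logic in the diff above;
  // recurses through nested Execution/Completion exceptions.
  private static IOException unwrap(Throwable e) {
    Throwable cause = e.getCause();
    if (cause instanceof IOException) {
      return (IOException) cause;
    } else if (cause instanceof CompletionException
        || cause instanceof ExecutionException) {
      return unwrap(cause);
    } else if (cause instanceof RuntimeException) {
      throw (RuntimeException) cause;   // RTEs are rethrown immediately
    } else if (cause != null) {
      return new IOException(cause);    // wrap any other checked cause
    } else {
      return new IOException(e);        // no cause at all: wrap the wrapper
    }
  }
}

Note the design choice mirrored from the diff: the helper returns the IOException instead of throwing it, so a call site can write "throw unwrap(e);" and the compiler can see that every branch terminates.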
@@ -1213,8 +1213,12 @@
 
 <property>
   <name>fs.s3a.connection.maximum</name>
-  <value>15</value>
-  <description>Controls the maximum number of simultaneous connections to S3.</description>
+  <value>72</value>
+  <description>Controls the maximum number of simultaneous connections to S3.
+    This must be bigger than the value of fs.s3a.threads.max so as to stop
+    threads being blocked waiting for new HTTPS connections.
+    Why not equal? The AWS SDK transfer manager also uses these connections.
+  </description>
 </property>
 
 <property>
@@ -1312,7 +1316,7 @@
 
 <property>
   <name>fs.s3a.threads.max</name>
-  <value>10</value>
+  <value>64</value>
   <description>The total number of threads available in the filesystem for data
     uploads *or any other queued filesystem operation*.</description>
 </property>
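The two values are sized together: the connection pool (72) is deliberately larger than the thread pool (64), since the AWS SDK transfer manager draws on the same connections. A minimal sketch of applying the same sizing programmatically, assuming only the standard Hadoop Configuration API (values are the new defaults from this diff):

import org.apache.hadoop.conf.Configuration;

public class S3APoolSizing {
  public static Configuration sized() {
    Configuration conf = new Configuration();
    // worker threads shared by uploads and other queued operations
    conf.setInt("fs.s3a.threads.max", 64);
    // keep connections > threads: the SDK transfer manager also needs
    // connections, so an equal count could still leave threads blocked
    conf.setInt("fs.s3a.connection.maximum", 72);
    return conf;
  }
}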
@@ -1326,8 +1330,25 @@
 
 <property>
   <name>fs.s3a.max.total.tasks</name>
-  <value>5</value>
-  <description>The number of operations which can be queued for execution</description>
+  <value>32</value>
+  <description>The number of operations which can be queued for execution.
+    This is in addition to the number of active threads in fs.s3a.threads.max.
+  </description>
 </property>
+
+<property>
+  <name>fs.s3a.executor.capacity</name>
+  <value>16</value>
+  <description>The maximum number of submitted tasks which a single
+    operation (e.g. rename(), delete()) may submit simultaneously for
+    execution, excluding the IO-heavy block uploads, whose capacity
+    is set in "fs.s3a.fast.upload.active.blocks".
+
+    All tasks are submitted to the shared thread pool whose size is
+    set in "fs.s3a.threads.max"; the value of capacity should be less
+    than that of the thread pool itself, as the goal is to stop a single
+    operation from overloading that thread pool.
+  </description>
+</property>
 
 <property>
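The description above amounts to a per-operation admission limit on the shared thread pool; the S3A code achieves this by wrapping the pool in a semaphored delegating executor. A simplified, hypothetical sketch of that pattern (not the actual S3A class): once an operation holds `capacity` permits, further submissions block in the calling thread, so no single rename() or delete() can monopolise the pool.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

public final class BoundedSubmitter {
  private final ExecutorService sharedPool;
  private final Semaphore permits;

  public BoundedSubmitter(ExecutorService sharedPool, int capacity) {
    this.sharedPool = sharedPool;
    this.permits = new Semaphore(capacity);
  }

  // Blocks the caller once `capacity` tasks from this operation are
  // in flight, leaving the rest of the pool free for other operations.
  public <T> Future<T> submit(Callable<T> task) throws InterruptedException {
    permits.acquire();
    return sharedPool.submit(() -> {
      try {
        return task.call();
      } finally {
        permits.release();   // free the permit whether the task succeeds or fails
      }
    });
  }
}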
@@ -268,7 +268,7 @@ public void testRenamePopulatesFileAncestors() throws IOException {
    * @param dst the destination root to move
    * @param nestedPath the nested path to move
    */
-  private void validateAncestorsMoved(Path src, Path dst, String nestedPath)
+  protected void validateAncestorsMoved(Path src, Path dst, String nestedPath)
       throws IOException {
     assertIsDirectory(dst);
     assertPathDoesNotExist("src path should not exist", path(src + nestedPath));
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -422,6 +422,11 @@
       <artifactId>wildfly-openssl</artifactId>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
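assertj-core is pulled in at test scope for fluent assertions in the new test suites. A purely illustrative example of the style it enables (test class and data here are hypothetical):

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import org.junit.Test;

public class TestAssertJStyle {
  @Test
  public void testDeletedPathsAssertion() {
    List<String> deleted = Arrays.asList("dir/file1", "dir/file2");
    // fluent, described assertions, unlike bare JUnit asserts
    assertThat(deleted)
        .describedAs("paths deleted by the failed rename")
        .hasSize(2)
        .contains("dir/file1")
        .doesNotContain("dir/file3");
  }
}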
@@ -139,9 +139,15 @@ private Constants() {
   public static final String ASSUMED_ROLE_CREDENTIALS_DEFAULT =
       SimpleAWSCredentialsProvider.NAME;
 
+
+  // the maximum number of tasks cached if all threads are already uploading
+  public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
+
+  public static final int DEFAULT_MAX_TOTAL_TASKS = 32;
+
   // number of simultaneous connections to s3
   public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
-  public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
+  public static final int DEFAULT_MAXIMUM_CONNECTIONS = 48;
 
   // connect to s3 over ssl?
   public static final String SECURE_CONNECTIONS =
@@ -200,10 +206,6 @@ private Constants() {
   public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
   public static final int DEFAULT_KEEPALIVE_TIME = 60;
 
-  // the maximum number of tasks cached if all threads are already uploading
-  public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
-  public static final int DEFAULT_MAX_TOTAL_TASKS = 5;
-
   // size of each multipart piece in bytes
   public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
   public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
@@ -289,6 +291,22 @@ private Constants() {
   @InterfaceStability.Unstable
   public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;
 
+  /**
+   * The capacity of executor queues for operations other than block
+   * upload, where {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is used instead.
+   * This should be less than {@link #MAX_THREADS} for fair
+   * submission.
+   * Value: {@value}.
+   */
+  public static final String EXECUTOR_CAPACITY = "fs.s3a.executor.capacity";
+
+  /**
+   * The default value of {@link #EXECUTOR_CAPACITY}.
+   * Value: {@value}.
+   */
+  public static final int DEFAULT_EXECUTOR_CAPACITY = 16;
+
   // Private | PublicRead | PublicReadWrite | AuthenticatedRead |
   // LogDeliveryWrite | BucketOwnerRead | BucketOwnerFullControl
   public static final String CANNED_ACL = "fs.s3a.acl.default";
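A short sketch of how such a constant pair is typically consumed at filesystem initialization, assuming only the Hadoop Configuration API (simplified and hypothetical, not the actual S3AFileSystem code path):

import org.apache.hadoop.conf.Configuration;

public final class ExecutorCapacitySketch {
  public static final String EXECUTOR_CAPACITY = "fs.s3a.executor.capacity";
  public static final int DEFAULT_EXECUTOR_CAPACITY = 16;

  // Read the per-operation capacity, falling back to the default
  // and guarding against zero or negative values.
  public static int executorCapacity(Configuration conf) {
    int capacity = conf.getInt(EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY);
    return capacity > 0 ? capacity : DEFAULT_EXECUTOR_CAPACITY;
  }
}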