Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@
<exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
<!-- Terasort MR jobs spawn enough processes that they use up all RAM -->
<exclude>**/ITestTerasort*.java</exclude>
<!-- MR jobs spawn enough processes that they use up all RAM -->
<exclude>**/ITest*CommitMRJob.java</exclude>
<!-- operations across the metastore -->
<exclude>**/ITestS3GuardDDBRootOperations.java</exclude>
</excludes>
Expand Down Expand Up @@ -231,8 +229,6 @@
<!-- the local FS. Running them sequentially guarantees isolation -->
<!-- and that they don't conflict with the other MR jobs for RAM -->
<include>**/ITestTerasort*.java</include>
<!-- MR jobs spawn enough processes that they use up all RAM -->
<include>**/ITest*CommitMRJob.java</include>
<!-- operations across the metastore -->
<include>**/ITestS3AContractRootDir.java</include>
<include>**/ITestS3GuardDDBRootOperations.java</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,8 @@ protected int commitTaskInternal(final TaskAttemptContext context,
// we will try to abort the ones that had already succeeded.
int commitCount = taskOutput.size();
final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
LOG.info("{}: uploading from staging directory to S3", getRole());
LOG.info("{}: uploading from staging directory to S3 {}", getRole(),
attemptPath);
LOG.info("{}: Saving pending data information to {}",
getRole(), commitsAttemptPath);
if (taskOutput.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private StagingCommitterConstants() {
/**
* The temporary path for staging data, if not explicitly set.
* By using an unqualified path, this will be qualified to be relative
* to the users' home directory, so protectec from access for others.
* to the users' home directory, so protected from access for others.
*/
public static final String FILESYSTEM_TEMP_PATH = "tmp/staging";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;

import com.amazonaws.services.s3.AmazonS3;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -111,7 +112,9 @@ protected Configuration createConfiguration() {
MAGIC_COMMITTER_ENABLED,
S3A_COMMITTER_FACTORY_KEY,
FS_S3A_COMMITTER_NAME,
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
FAST_UPLOAD_BUFFER);

conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
Expand Down Expand Up @@ -209,6 +212,7 @@ public static String randomJobId() throws Exception {
*/
@Override
public void teardown() throws Exception {
Thread.currentThread().setName("teardown");
LOG.info("AbstractCommitITest::teardown");
waitForConsistency();
// make sure there are no failures any more
Expand Down Expand Up @@ -359,7 +363,7 @@ private String pathToPrefix(Path path) {
* @throws IOException IO Failure
*/
protected SuccessData verifySuccessMarker(Path dir) throws IOException {
return validateSuccessFile(dir, "", getFileSystem(), "query");
return validateSuccessFile(dir, "", getFileSystem(), "query", 0);
}

/**
Expand Down Expand Up @@ -437,13 +441,15 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId,
* @param committerName name of committer to match, or ""
* @param fs filesystem
* @param origin origin (e.g. "teragen" for messages)
* @param minimumFileCount minimum number of files to have been created
* @return the success data
* @throws IOException IO failure
*/
public static SuccessData validateSuccessFile(final Path outputPath,
final String committerName,
final S3AFileSystem fs,
final String origin) throws IOException {
final String origin,
final int minimumFileCount) throws IOException {
SuccessData successData = loadSuccessFile(fs, outputPath, origin);
String commitDetails = successData.toString();
LOG.info("Committer name " + committerName + "\n{}",
Expand All @@ -456,6 +462,9 @@ public static SuccessData validateSuccessFile(final Path outputPath,
assertEquals("Wrong committer in " + commitDetails,
committerName, successData.getCommitter());
}
Assertions.assertThat(successData.getFilenames())
.describedAs("Files committed")
.hasSizeGreaterThanOrEqualTo(minimumFileCount);
return successData;
}

Expand Down Expand Up @@ -485,8 +494,9 @@ public static SuccessData loadSuccessFile(final S3AFileSystem fs,
status.isFile());
assertTrue("0 byte success file "
+ success + " from " + origin
+ "; a s3guard committer was not used",
+ "; an S3A committer was not used",
status.getLen() > 0);
LOG.info("Loading committer success file {}", success);
return SuccessData.load(fs, success);
}
}

This file was deleted.

Loading