Skip to content

Commit d99eb99

Browse files
committed
HADOOP-18797: Support Concurrent Writes With S3A Magic Committer
1 parent 1046f9c commit d99eb99

File tree

12 files changed

+93
-66
lines changed

12 files changed

+93
-66
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ private CommitConstants() {
3939
* {@value}.
4040
*/
4141
public static final String MAGIC = "__magic";
42+
public static final String MAGIC_PATH_PREFIX = MAGIC + "_job-";
4243

4344
/**
4445
* Marker of the start of a directory tree for calculating
@@ -78,6 +79,13 @@ private CommitConstants() {
7879
public static final String MAGIC_COMMITTER_ENABLED
7980
= MAGIC_COMMITTER_PREFIX + ".enabled";
8081

82+
/**
83+
* Flag to indicate whether to clean up the magic directory
84+
* during job completion.
85+
*/
86+
public static final String MAGIC_COMMITTER_CLEANUP_ENABLED
87+
= "fs.s3a.magic.cleanup.enabled";
88+
8189
/**
8290
* Flag to indicate whether a stream is a magic output stream;
8391
* returned in {@code StreamCapabilities}
@@ -117,6 +125,11 @@ private CommitConstants() {
117125
*/
118126
public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = true;
119127

128+
/**
129+
* Is magic directory cleanup enabled by default: {@value}.
130+
*/
131+
public static final boolean DEFAULT_MAGIC_COMMITTER_CLEANUP_ENABLED = true;
132+
120133
/**
121134
* This is the "Pending" directory of the {@code FileOutputCommitter};
122135
* data written here is, in that algorithm, renamed into place.
@@ -280,10 +293,12 @@ private CommitConstants() {
280293
/**
281294
* Default configuration value for
282295
* {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
296+
* It is disabled by default to support concurrent writes to the same
297+
* parent directory but to different partitions/subdirectories.
283298
* Value: {@value}.
284299
*/
285300
public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
286-
true;
301+
false;
287302

288303
/**
289304
* The limit to the number of committed objects tracked during

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
2626
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
2727

28-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
2928
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
29+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
3030

3131
/**
3232
* These are internal constants not intended for public use.
@@ -108,7 +108,7 @@ private InternalCommitterConstants() {
108108

109109
/** Error message for a path without a magic element in the list: {@value}. */
110110
public static final String E_NO_MAGIC_PATH_ELEMENT
111-
= "No " + MAGIC + " element in path";
111+
= "No " + MAGIC_PATH_PREFIX + " element in path";
112112

113113
/**
114114
* The UUID for jobs: {@value}.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.hadoop.fs.Path;
2626
import org.apache.hadoop.util.StringUtils;
2727

28+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
2829
import static org.apache.hadoop.util.Preconditions.checkArgument;
2930
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
30-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
3131
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_MAGIC_PATH_ELEMENT;
3232

3333
/**
@@ -76,7 +76,12 @@ public static List<String> splitPathToElements(Path path) {
7676
* @return true if a path is considered magic
7777
*/
7878
public static boolean isMagicPath(List<String> elements) {
79-
return elements.contains(MAGIC);
79+
for (String element : elements) {
80+
if (element.startsWith(MAGIC_PATH_PREFIX)) {
81+
return true;
82+
}
83+
}
84+
return false;
8085
}
8186

8287
/**
@@ -96,7 +101,18 @@ public static boolean containsBasePath(List<String> elements) {
96101
* @throws IllegalArgumentException if there is no magic element
97102
*/
98103
public static int magicElementIndex(List<String> elements) {
99-
int index = elements.indexOf(MAGIC);
104+
int index = 0;
105+
for (String element : elements) {
106+
if (element.startsWith(MAGIC_PATH_PREFIX)) {
107+
break;
108+
}
109+
index++;
110+
}
111+
// if the index has reached the size of the element list,
112+
// no magic path element was found
113+
if (index >= elements.size()) {
114+
index = -1;
115+
}
100116
checkArgument(index >= 0, E_NO_MAGIC_PATH_ELEMENT);
101117
return index;
102118
}
@@ -182,15 +198,6 @@ public static String lastElement(List<String> strings) {
182198
return strings.get(strings.size() - 1);
183199
}
184200

185-
/**
186-
* Get the magic subdirectory of a destination directory.
187-
* @param destDir the destination directory
188-
* @return a new path.
189-
*/
190-
public static Path magicSubdir(Path destDir) {
191-
return new Path(destDir, MAGIC);
192-
}
193-
194201
/**
195202
* Calculates the final destination of a file.
196203
* This is the parent of any {@code __magic} element, and the filename
@@ -208,8 +215,8 @@ public static List<String> finalDestination(List<String> elements) {
208215
if (isMagicPath(elements)) {
209216
List<String> destDir = magicPathParents(elements);
210217
List<String> children = magicPathChildren(elements);
211-
checkArgument(!children.isEmpty(), "No path found under " +
212-
MAGIC);
218+
checkArgument(!children.isEmpty(), "No path found under the prefix" +
219+
MAGIC_PATH_PREFIX);
213220
ArrayList<String> dest = new ArrayList<>(destDir);
214221
if (containsBasePath(children)) {
215222
// there's a base marker in the path

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitUtilsWithMR.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@ private CommitUtilsWithMR() {
4949
/**
5050
* Get the location of magic job attempts.
5151
* @param out the base output directory.
52+
* @param jobId unique Job ID.
5253
* @return the location of magic job attempts.
5354
*/
54-
public static Path getMagicJobAttemptsPath(Path out) {
55-
return new Path(out, MAGIC);
55+
public static Path getMagicJobAttemptsPath(Path out, String jobId) {
56+
return new Path(out, MAGIC + "_" + jobId);
5657
}
5758

5859
/**
@@ -76,7 +77,7 @@ public static Path getMagicJobAttemptPath(String jobUUID,
7677
int appAttemptId,
7778
Path dest) {
7879
return new Path(
79-
getMagicJobAttemptsPath(dest),
80+
getMagicJobAttemptsPath(dest, formatJobDir(jobUUID)),
8081
formatAppAttemptDir(jobUUID, appAttemptId));
8182
}
8283

@@ -88,9 +89,7 @@ public static Path getMagicJobAttemptPath(String jobUUID,
8889
*/
8990
public static Path getMagicJobPath(String jobUUID,
9091
Path dest) {
91-
return new Path(
92-
getMagicJobAttemptsPath(dest),
93-
formatJobDir(jobUUID));
92+
return getMagicJobAttemptsPath(dest, formatJobDir(jobUUID));
9493
}
9594

9695
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
6464
private static final Logger LOG =
6565
LoggerFactory.getLogger(MagicS3GuardCommitter.class);
6666

67+
private final boolean cleanupMagicDirectory;
6768
/** Name: {@value}. */
6869
public static final String NAME = CommitConstants.COMMITTER_NAME_MAGIC;
6970

@@ -78,6 +79,9 @@ public MagicS3GuardCommitter(Path outputPath,
7879
super(outputPath, context);
7980
setWorkPath(getTaskAttemptPath(context));
8081
verifyIsMagicCommitPath(getDestS3AFS(), getWorkPath());
82+
this.cleanupMagicDirectory = context.getConfiguration().getBoolean(
83+
CommitConstants.MAGIC_COMMITTER_CLEANUP_ENABLED,
84+
CommitConstants.DEFAULT_MAGIC_COMMITTER_CLEANUP_ENABLED);
8185
LOG.debug("Task attempt {} has work path {}",
8286
context.getTaskAttemptID(),
8387
getWorkPath());
@@ -105,7 +109,6 @@ public void setupJob(JobContext context) throws IOException {
105109
Path jobPath = getJobPath();
106110
final FileSystem destFS = getDestinationFS(jobPath,
107111
context.getConfiguration());
108-
destFS.delete(jobPath, true);
109112
destFS.mkdirs(jobPath);
110113
}
111114
}
@@ -131,16 +134,19 @@ protected ActiveCommit listPendingUploadsToCommit(
131134
* Delete the magic directory of this job, unless magic directory cleanup is disabled.
132135
*/
133136
public void cleanupStagingDirs() {
134-
final Path out = getOutputPath();
135-
Path path = magicSubdir(out);
136-
try(DurationInfo ignored = new DurationInfo(LOG, true,
137-
"Deleting magic directory %s", path)) {
138-
Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),
139-
() -> deleteWithWarning(getDestFS(), path, true));
140-
// and the job temp directory with manifests
141-
Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(),
142-
() -> deleteWithWarning(getDestFS(),
143-
new Path(out, TEMP_DATA), true));
137+
if (cleanupMagicDirectory) {
138+
final Path out = getOutputPath();
139+
Path path = getMagicJobPath(getUUID(), out);
140+
try (DurationInfo ignored = new DurationInfo(LOG, true,
141+
"Deleting magic directory %s", path)) {
142+
Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),
143+
() -> deleteWithWarning(getDestFS(), path, true));
144+
// and the job temp directory with manifests
145+
Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(),
146+
() -> deleteWithWarning(getDestFS(), new Path(out, TEMP_DATA), true));
147+
}
148+
} else {
149+
LOG.info("Skipping magic directory deletion. Cleaning up of magic directory is disabled");
144150
}
145151
}
146152

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterTestHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import static java.util.Objects.requireNonNull;
3535
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
3636
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
37-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
37+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
3838
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT;
3939
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER;
4040
import static org.apache.hadoop.fs.s3a.commit.impl.CommitOperations.extractMagicFileLength;
@@ -108,7 +108,7 @@ public void assertFileLacksMarkerHeader(Path path) throws IOException {
108108
*/
109109
public static Path makeMagic(Path destFile) {
110110
return new Path(destFile.getParent(),
111-
MAGIC + '/' + BASE + "/" + destFile.getName());
111+
MAGIC_PATH_PREFIX + '/' + BASE + "/" + destFile.getName());
112112
}
113113

114114
/**

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_METADATA_REQUESTS;
5252
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED;
5353
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
54-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
54+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
5555
import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.assertIsMagicStream;
5656
import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.makeMagic;
5757
import static org.apache.hadoop.fs.s3a.performance.OperationCost.LIST_FILES_LIST_OP;
@@ -123,12 +123,12 @@ protected String fileSystemIOStats() {
123123

124124
@Test
125125
public void testMagicMkdir() throws Throwable {
126-
describe("Mkdirs __magic always skips dir marker deletion");
126+
describe("Mkdirs __magic_job- always skips dir marker deletion");
127127
S3AFileSystem fs = getFileSystem();
128128
Path baseDir = methodPath();
129129
// create dest dir marker, always
130130
fs.mkdirs(baseDir);
131-
Path magicDir = new Path(baseDir, MAGIC);
131+
Path magicDir = new Path(baseDir, MAGIC_PATH_PREFIX);
132132
verifyMetrics(() -> {
133133
fs.mkdirs(magicDir);
134134
return fileSystemIOStats();
@@ -151,10 +151,10 @@ public void testMagicMkdir() throws Throwable {
151151
*/
152152
@Test
153153
public void testMagicMkdirs() throws Throwable {
154-
describe("Mkdirs __magic/subdir always skips dir marker deletion");
154+
describe("Mkdirs __magic_job-/subdir always skips dir marker deletion");
155155
S3AFileSystem fs = getFileSystem();
156156
Path baseDir = methodPath();
157-
Path magicDir = new Path(baseDir, MAGIC);
157+
Path magicDir = new Path(baseDir, MAGIC_PATH_PREFIX);
158158
fs.delete(baseDir, true);
159159

160160
Path magicSubdir = new Path(magicDir, "subdir");

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ private CommitOperations newCommitOperations()
207207
*/
208208
private static Path makeMagic(Path destFile) {
209209
return new Path(destFile.getParent(),
210-
MAGIC + '/' + destFile.getName());
210+
MAGIC_PATH_PREFIX + '/' + destFile.getName());
211211
}
212212

213213
@Test
@@ -279,7 +279,7 @@ public void testBaseRelativePath() throws Throwable {
279279
S3AFileSystem fs = getFileSystem();
280280
Path destDir = methodSubPath("testBaseRelativePath");
281281
fs.delete(destDir, true);
282-
Path pendingBaseDir = new Path(destDir, MAGIC + "/child/" + BASE);
282+
Path pendingBaseDir = new Path(destDir, MAGIC_PATH_PREFIX + "/child/" + BASE);
283283
String child = "subdir/child.txt";
284284
Path pendingChildPath = new Path(pendingBaseDir, child);
285285
Path expectedDestPath = new Path(destDir, child);

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@
3838
public class TestMagicCommitPaths extends Assert {
3939

4040
private static final List<String> MAGIC_AT_ROOT =
41-
list(MAGIC);
41+
list(MAGIC_PATH_PREFIX);
4242
private static final List<String> MAGIC_AT_ROOT_WITH_CHILD =
43-
list(MAGIC, "child");
43+
list(MAGIC_PATH_PREFIX, "child");
4444
private static final List<String> MAGIC_WITH_CHILD =
45-
list("parent", MAGIC, "child");
45+
list("parent", MAGIC_PATH_PREFIX, "child");
4646
private static final List<String> MAGIC_AT_WITHOUT_CHILD =
47-
list("parent", MAGIC);
47+
list("parent", MAGIC_PATH_PREFIX);
4848

4949
private static final List<String> DEEP_MAGIC =
50-
list("parent1", "parent2", MAGIC, "child1", "child2");
50+
list("parent1", "parent2", MAGIC_PATH_PREFIX, "child1", "child2");
5151

5252
public static final String[] EMPTY = {};
5353

@@ -161,40 +161,40 @@ public void testFinalDestinationNoMagic() {
161161
@Test
162162
public void testFinalDestinationMagic1() {
163163
assertEquals(l("first", "2"),
164-
finalDestination(l("first", MAGIC, "2")));
164+
finalDestination(l("first", MAGIC_PATH_PREFIX, "2")));
165165
}
166166

167167
@Test
168168
public void testFinalDestinationMagic2() {
169169
assertEquals(l("first", "3.txt"),
170-
finalDestination(l("first", MAGIC, "2", "3.txt")));
170+
finalDestination(l("first", MAGIC_PATH_PREFIX, "2", "3.txt")));
171171
}
172172

173173
@Test
174174
public void testFinalDestinationRootMagic2() {
175175
assertEquals(l("3.txt"),
176-
finalDestination(l(MAGIC, "2", "3.txt")));
176+
finalDestination(l(MAGIC_PATH_PREFIX, "2", "3.txt")));
177177
}
178178

179179
@Test(expected = IllegalArgumentException.class)
180180
public void testFinalDestinationMagicNoChild() {
181-
finalDestination(l(MAGIC));
181+
finalDestination(l(MAGIC_PATH_PREFIX));
182182
}
183183

184184
@Test
185185
public void testFinalDestinationBaseDirectChild() {
186-
finalDestination(l(MAGIC, BASE, "3.txt"));
186+
finalDestination(l(MAGIC_PATH_PREFIX, BASE, "3.txt"));
187187
}
188188

189189
@Test(expected = IllegalArgumentException.class)
190190
public void testFinalDestinationBaseNoChild() {
191-
assertEquals(l(), finalDestination(l(MAGIC, BASE)));
191+
assertEquals(l(), finalDestination(l(MAGIC_PATH_PREFIX, BASE)));
192192
}
193193

194194
@Test
195195
public void testFinalDestinationBaseSubdirsChild() {
196196
assertEquals(l("2", "3.txt"),
197-
finalDestination(l(MAGIC, "4", BASE, "2", "3.txt")));
197+
finalDestination(l(MAGIC_PATH_PREFIX, "4", BASE, "2", "3.txt")));
198198
}
199199

200200
/**
@@ -203,7 +203,7 @@ public void testFinalDestinationBaseSubdirsChild() {
203203
@Test
204204
public void testFinalDestinationIgnoresBaseBeforeMagic() {
205205
assertEquals(l(BASE, "home", "3.txt"),
206-
finalDestination(l(BASE, "home", MAGIC, "2", "3.txt")));
206+
finalDestination(l(BASE, "home", MAGIC_PATH_PREFIX, "2", "3.txt")));
207207
}
208208

209209
/** varargs to array. */

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
7676
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
7777
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
78-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
78+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
7979
import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
8080
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
8181
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
@@ -586,7 +586,7 @@ private MagicCommitterTestBinding() {
586586
protected void validateResult(final Path destPath,
587587
final SuccessData successData)
588588
throws Exception {
589-
Path magicDir = new Path(destPath, MAGIC);
589+
Path magicDir = new Path(destPath, MAGIC_PATH_PREFIX);
590590

591591
// if an FNFE isn't raised on getFileStatus, list out the directory
592592
// tree

0 commit comments

Comments
 (0)