Skip to content

Commit 02fc9f1

Browse files
committed
HADOOP-19047: Support InMemory Tracking Of S3A Magic Commits
1 parent da34ecd commit 02fc9f1

File tree

12 files changed

+578
-105
lines changed

12 files changed

+578
-105
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.concurrent.atomic.AtomicBoolean;
5353
import javax.annotation.Nullable;
5454

55+
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
5556
import software.amazon.awssdk.core.ResponseInputStream;
5657
import software.amazon.awssdk.core.exception.SdkException;
5758
import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -235,6 +236,7 @@
235236
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
236237
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
237238
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
239+
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
238240
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
239241
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
240242
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
@@ -3906,6 +3908,21 @@ public void access(final Path f, final FsAction mode)
39063908
@Retries.RetryTranslated
39073909
public FileStatus getFileStatus(final Path f) throws IOException {
39083910
Path path = qualify(f);
3911+
if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {
3912+
// Some downstream apps might call getFileStatus for a magic path to get the file size.
3913+
// when commit data is stored in memory construct the dummy S3AFileStatus with correct
3914+
// file size fetched from the memory.
3915+
if (InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().containsKey(path)) {
3916+
long len = InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().get(path);
3917+
return new S3AFileStatus(len,
3918+
0L,
3919+
path,
3920+
getDefaultBlockSize(path),
3921+
username,
3922+
null,
3923+
null);
3924+
}
3925+
}
39093926
return trackDurationAndSpan(
39103927
INVOCATION_GET_FILE_STATUS, path, () ->
39113928
innerGetFileStatus(path, false, StatusProbeEnum.ALL));

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,13 @@ private CommitConstants() {
242242
*/
243243
public static final int DEFAULT_COMMITTER_THREADS = 32;
244244

245+
246+
public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =
247+
"fs.s3a.committer.magic.track.commits.in.memory.enabled";
248+
249+
public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
250+
false;
251+
245252
/**
246253
* Path in the cluster filesystem for temporary data: {@value}.
247254
* This is for HDFS, not the local filesystem.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,19 @@
2020

2121
import java.util.List;
2222

23+
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
24+
import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
2527

2628
import org.apache.hadoop.fs.Path;
2729
import org.apache.hadoop.fs.s3a.S3AFileSystem;
2830
import org.apache.hadoop.fs.s3a.Statistic;
29-
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
3031
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
3132
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
3233

3334
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
35+
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
3436

3537
/**
3638
* Adds the code needed for S3A to support magic committers.
@@ -105,13 +107,15 @@ public PutTracker createTracker(Path path, String key,
105107
String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
106108
getStoreContext().incrementStatistic(
107109
Statistic.COMMITTER_MAGIC_FILES_CREATED);
108-
tracker = new MagicCommitTracker(path,
109-
getStoreContext().getBucket(),
110-
key,
111-
destKey,
112-
pendingsetPath,
113-
owner.getWriteOperationHelper(),
114-
trackerStatistics);
110+
if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) {
111+
tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(),
112+
key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
113+
trackerStatistics);
114+
} else {
115+
tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(),
116+
key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
117+
trackerStatistics);
118+
}
115119
LOG.debug("Created {}", tracker);
116120
} else {
117121
LOG.warn("File being created has a \"magic\" path, but the filesystem"
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.commit.magic;
20+
21+
import org.apache.commons.lang3.StringUtils;
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
24+
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
25+
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
26+
import org.apache.hadoop.fs.statistics.IOStatistics;
27+
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
28+
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
29+
import software.amazon.awssdk.services.s3.model.CompletedPart;
30+
31+
import java.io.IOException;
32+
import java.util.ArrayList;
33+
import java.util.Collections;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
38+
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;
39+
40+
/**
41+
* InMemoryMagicCommitTracker stores the commit data in memory.
42+
* The commit data and related data stores are flushed out from
43+
* the memory when the task is committed or aborted.
44+
*/
45+
public class InMemoryMagicCommitTracker extends MagicCommitTracker {
46+
47+
// stores taskAttemptId to commit data mapping
48+
private static Map<String, List<SinglePendingCommit>>
49+
taskAttemptIdToMpuMetdadataMap = new ConcurrentHashMap<>();
50+
51+
// stores the path to its length/size mapping
52+
private static Map<Path, Long> taskAttemptIdToBytesWritten = new ConcurrentHashMap<>();
53+
54+
// stores taskAttemptId to path mapping
55+
private static Map<String, List<Path>> taskAttemptIdToPath = new ConcurrentHashMap<>();
56+
57+
public InMemoryMagicCommitTracker(Path path,
58+
String bucket,
59+
String originalDestKey,
60+
String destKey,
61+
String pendingsetKey,
62+
WriteOperationHelper writer,
63+
PutTrackerStatistics trackerStatistics) {
64+
super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
65+
}
66+
67+
@Override
68+
public boolean aboutToComplete(String uploadId,
69+
List<CompletedPart> parts,
70+
long bytesWritten,
71+
final IOStatistics iostatistics)
72+
throws IOException {
73+
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
74+
"empty/null upload ID: " + uploadId);
75+
Preconditions.checkArgument(parts != null, "No uploaded parts list");
76+
Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save");
77+
78+
// build the commit summary
79+
SinglePendingCommit commitData = new SinglePendingCommit();
80+
commitData.touch(System.currentTimeMillis());
81+
commitData.setDestinationKey(getDestKey());
82+
commitData.setBucket(getBucket());
83+
commitData.setUri(getPath().toUri().toString());
84+
commitData.setUploadId(uploadId);
85+
commitData.setText("");
86+
commitData.setLength(bytesWritten);
87+
commitData.bindCommitData(parts);
88+
commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics));
89+
90+
// extract the taskAttemptId from the path
91+
String taskAttemptId = extractTaskAttemptIdFromPath(getPath());
92+
93+
// store the commit data with taskAttemptId as the key
94+
taskAttemptIdToMpuMetdadataMap.computeIfAbsent(taskAttemptId,
95+
k -> Collections.synchronizedList(new ArrayList<>())).add(commitData);
96+
97+
// store the byteswritten(length) for the corresponding file
98+
taskAttemptIdToBytesWritten.put(getPath(), bytesWritten);
99+
100+
// store the mapping between taskAttemptId and path
101+
// This information is used for removing entries from
102+
// the map once the taskAttempt is completed/committed.
103+
taskAttemptIdToPath.computeIfAbsent(taskAttemptId,
104+
k -> Collections.synchronizedList(new ArrayList<>())).add(getPath());
105+
106+
LOG.info("commit metadata for {} parts in {}. size: {} byte(s) "
107+
+ "for the taskAttemptId: {} is stored in memory",
108+
parts.size(), getPendingPartKey(), bytesWritten, taskAttemptId);
109+
LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
110+
getPath(), getPendingPartKey(), commitData);
111+
112+
return false;
113+
}
114+
115+
public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetdadataMap() {
116+
return taskAttemptIdToMpuMetdadataMap;
117+
}
118+
119+
public static Map<Path, Long> getTaskAttemptIdToBytesWritten() {
120+
return taskAttemptIdToBytesWritten;
121+
}
122+
123+
public static Map<String, List<Path>> getTaskAttemptIdToPath() {
124+
return taskAttemptIdToPath;
125+
}
126+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java

Lines changed: 30 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,22 @@
1818

1919
package org.apache.hadoop.fs.s3a.commit.magic;
2020

21-
import java.io.ByteArrayInputStream;
2221
import java.io.IOException;
23-
import java.io.InputStream;
24-
import java.util.HashMap;
2522
import java.util.List;
26-
import java.util.Map;
2723

2824
import software.amazon.awssdk.services.s3.model.CompletedPart;
29-
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3025
import org.slf4j.Logger;
3126
import org.slf4j.LoggerFactory;
3227

3328

34-
import org.apache.commons.lang3.StringUtils;
3529
import org.apache.hadoop.classification.InterfaceAudience;
3630
import org.apache.hadoop.fs.Path;
37-
import org.apache.hadoop.fs.s3a.Retries;
38-
import org.apache.hadoop.fs.s3a.S3ADataBlocks;
3931
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
4032
import org.apache.hadoop.fs.s3a.commit.PutTracker;
41-
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
42-
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
4333
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
4434
import org.apache.hadoop.fs.statistics.IOStatistics;
45-
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
46-
import org.apache.hadoop.util.Preconditions;
4735

4836
import static java.util.Objects.requireNonNull;
49-
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
50-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
51-
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
5237

5338
/**
5439
* Put tracker for Magic commits.
@@ -65,7 +50,7 @@ public class MagicCommitTracker extends PutTracker {
6550
private final Path path;
6651
private final WriteOperationHelper writer;
6752
private final String bucket;
68-
private static final byte[] EMPTY = new byte[0];
53+
protected static final byte[] EMPTY = new byte[0];
6954
private final PutTrackerStatistics trackerStatistics;
7055

7156
/**
@@ -118,76 +103,21 @@ public boolean outputImmediatelyVisible() {
118103

119104
/**
120105
* Complete operation: generate the final commit data, put it.
121-
* @param uploadId Upload ID
122-
* @param parts list of parts
106+
*
107+
* @param uploadId Upload ID
108+
* @param parts list of parts
123109
* @param bytesWritten bytes written
124110
* @param iostatistics nullable IO statistics
125111
* @return false, indicating that the commit must fail.
126-
* @throws IOException any IO problem.
112+
* @throws IOException any IO problem.
127113
* @throws IllegalArgumentException bad argument
128114
*/
129115
@Override
130116
public boolean aboutToComplete(String uploadId,
131117
List<CompletedPart> parts,
132118
long bytesWritten,
133-
final IOStatistics iostatistics)
134-
throws IOException {
135-
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
136-
"empty/null upload ID: "+ uploadId);
137-
Preconditions.checkArgument(parts != null,
138-
"No uploaded parts list");
139-
Preconditions.checkArgument(!parts.isEmpty(),
140-
"No uploaded parts to save");
141-
142-
// put a 0-byte file with the name of the original under-magic path
143-
// Add the final file length as a header
144-
// this is done before the task commit, so its duration can be
145-
// included in the statistics
146-
Map<String, String> headers = new HashMap<>();
147-
headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
148-
PutObjectRequest originalDestPut = writer.createPutObjectRequest(
149-
originalDestKey,
150-
0,
151-
new PutObjectOptions(true, null, headers), false);
152-
upload(originalDestPut, new ByteArrayInputStream(EMPTY));
153-
154-
// build the commit summary
155-
SinglePendingCommit commitData = new SinglePendingCommit();
156-
commitData.touch(System.currentTimeMillis());
157-
commitData.setDestinationKey(getDestKey());
158-
commitData.setBucket(bucket);
159-
commitData.setUri(path.toUri().toString());
160-
commitData.setUploadId(uploadId);
161-
commitData.setText("");
162-
commitData.setLength(bytesWritten);
163-
commitData.bindCommitData(parts);
164-
commitData.setIOStatistics(
165-
new IOStatisticsSnapshot(iostatistics));
166-
167-
byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());
168-
LOG.info("Uncommitted data pending to file {};"
169-
+ " commit metadata for {} parts in {}. size: {} byte(s)",
170-
path.toUri(), parts.size(), pendingPartKey, bytesWritten);
171-
LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
172-
path, pendingPartKey, commitData);
173-
PutObjectRequest put = writer.createPutObjectRequest(
174-
pendingPartKey,
175-
bytes.length, null, false);
176-
upload(put, new ByteArrayInputStream(bytes));
119+
final IOStatistics iostatistics) throws IOException {
177120
return false;
178-
179-
}
180-
/**
181-
* PUT an object.
182-
* @param request the request
183-
* @param inputStream input stream of data to be uploaded
184-
* @throws IOException on problems
185-
*/
186-
@Retries.RetryTranslated
187-
private void upload(PutObjectRequest request, InputStream inputStream) throws IOException {
188-
trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
189-
() -> writer.putObject(request, PutObjectOptions.keepingDirs(),
190-
new S3ADataBlocks.BlockUploadData(inputStream), false, null));
191121
}
192122

193123
@Override
@@ -201,4 +131,28 @@ public String toString() {
201131
sb.append('}');
202132
return sb.toString();
203133
}
134+
135+
public String getOriginalDestKey() {
136+
return originalDestKey;
137+
}
138+
139+
public String getPendingPartKey() {
140+
return pendingPartKey;
141+
}
142+
143+
public Path getPath() {
144+
return path;
145+
}
146+
147+
public String getBucket() {
148+
return bucket;
149+
}
150+
151+
public WriteOperationHelper getWriter() {
152+
return writer;
153+
}
154+
155+
public PutTrackerStatistics getTrackerStatistics() {
156+
return trackerStatistics;
157+
}
204158
}

0 commit comments

Comments
 (0)