Skip to content

Commit 52db86b

Browse files
authored
HADOOP-17021. Add concat fs command (#1993)
Contributed by Jinglun
1 parent 735e85a commit 52db86b

File tree

4 files changed

+272
-0
lines changed

4 files changed

+272
-0
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.fs.shell;
19+
20+
import java.io.FileNotFoundException;
21+
import java.io.IOException;
22+
import java.util.LinkedList;
23+
24+
import com.google.common.annotations.VisibleForTesting;
25+
26+
import org.apache.hadoop.classification.InterfaceAudience;
27+
import org.apache.hadoop.classification.InterfaceStability;
28+
import org.apache.hadoop.fs.FileSystem;
29+
import org.apache.hadoop.fs.Path;
30+
import org.apache.hadoop.fs.PathIOException;
31+
32+
/**
33+
* Concat the given files.
34+
*/
35+
@InterfaceAudience.Private
36+
@InterfaceStability.Unstable
37+
public class Concat extends FsCommand {
38+
public static void registerCommands(CommandFactory factory) {
39+
factory.addClass(Concat.class, "-concat");
40+
}
41+
42+
public static final String NAME = "concat";
43+
public static final String USAGE = "<target path> <src path> <src path> ...";
44+
public static final String DESCRIPTION = "Concatenate existing source files"
45+
+ " into the target file. Target file and source files should be in the"
46+
+ " same directory.";
47+
private static FileSystem testFs; // test only.
48+
49+
@Override
50+
protected void processArguments(LinkedList<PathData> args)
51+
throws IOException {
52+
if (args.size() < 1) {
53+
throw new IOException("Target path not specified. " + USAGE);
54+
}
55+
if (args.size() < 3) {
56+
throw new IOException(
57+
"The number of source paths is less than 2. " + USAGE);
58+
}
59+
PathData target = args.removeFirst();
60+
LinkedList<PathData> srcList = args;
61+
if (!target.exists || !target.stat.isFile()) {
62+
throw new FileNotFoundException(String
63+
.format("Target path %s does not exist or is" + " not file.",
64+
target.path));
65+
}
66+
Path[] srcArray = new Path[srcList.size()];
67+
for (int i = 0; i < args.size(); i++) {
68+
PathData src = srcList.get(i);
69+
if (!src.exists || !src.stat.isFile()) {
70+
throw new FileNotFoundException(
71+
String.format("%s does not exist or is not file.", src.path));
72+
}
73+
srcArray[i] = src.path;
74+
}
75+
FileSystem fs = target.fs;
76+
if (testFs != null) {
77+
fs = testFs;
78+
}
79+
try {
80+
fs.concat(target.path, srcArray);
81+
} catch (UnsupportedOperationException exception) {
82+
throw new PathIOException("Dest filesystem '" + fs.getUri().getScheme()
83+
+ "' doesn't support concat.", exception);
84+
}
85+
}
86+
87+
@VisibleForTesting
88+
static void setTestFs(FileSystem fs) {
89+
testFs = fs;
90+
}
91+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/FsCommand.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public static void registerCommands(CommandFactory factory) {
7070
factory.registerCommands(Truncate.class);
7171
factory.registerCommands(SnapshotCommands.class);
7272
factory.registerCommands(XAttrCommands.class);
73+
factory.registerCommands(Concat.class);
7374
}
7475

7576
protected FsCommand() {}

hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,18 @@ Example:
813813
* `hadoop fs -truncate 55 /user/hadoop/file1 /user/hadoop/file2`
814814
* `hadoop fs -truncate -w 127 hdfs://nn1.example.com/user/hadoop/file1`
815815

816+
concat
817+
--------
818+
819+
Usage: `hadoop fs -concat <target file> <source file> <source file> ...` (at least two source files are required)
820+
821+
Concatenate existing source files into the target file. Target file and source
822+
files should be in the same directory.
823+
824+
Example:
825+
826+
* `hadoop fs -concat hdfs://cluster/user/hadoop/target-file hdfs://cluster/user/hadoop/file-0 hdfs://cluster/user/hadoop/file-1`
827+
816828
usage
817829
-----
818830

@@ -1092,6 +1104,7 @@ actually fail.
10921104
| `setfattr` | generally unsupported permissions model |
10931105
| `setrep`| has no effect |
10941106
| `truncate` | generally unsupported |
1107+
| `concat` | generally unsupported |
10951108

10961109
Different object store clients *may* support these commands: do consult the
10971110
documentation and test against the target store.
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.fs.shell;
19+
20+
import java.io.ByteArrayOutputStream;
21+
import java.io.InputStream;
22+
import java.io.OutputStream;
23+
import java.io.IOException;
24+
import java.io.PrintStream;
25+
import java.net.URI;
26+
import java.util.Random;
27+
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.Mockito;
31+
import org.assertj.core.api.Assertions;
32+
33+
import org.apache.hadoop.conf.Configuration;
34+
import org.apache.hadoop.fs.FileSystem;
35+
import org.apache.hadoop.fs.FsShell;
36+
import org.apache.hadoop.fs.LocalFileSystem;
37+
import org.apache.hadoop.fs.Path;
38+
import org.apache.hadoop.fs.contract.ContractTestUtils;
39+
import org.apache.hadoop.io.IOUtils;
40+
import org.apache.hadoop.test.GenericTestUtils;
41+
import org.apache.hadoop.test.AbstractHadoopTestBase;
42+
43+
import static org.mockito.ArgumentMatchers.any;
44+
import static org.junit.Assert.assertEquals;
45+
46+
/**
47+
* Test Concat.
48+
*/
49+
public class TestFsShellConcat extends AbstractHadoopTestBase {
50+
51+
private static Configuration conf;
52+
private static FsShell shell;
53+
private static LocalFileSystem lfs;
54+
private static Path testRootDir;
55+
private static Path dstPath;
56+
57+
@Before
58+
public void before() throws IOException {
59+
conf = new Configuration();
60+
shell = new FsShell(conf);
61+
lfs = FileSystem.getLocal(conf);
62+
testRootDir = lfs.makeQualified(new Path(GenericTestUtils.getTempPath(
63+
"testFsShellCopy")));
64+
65+
lfs.delete(testRootDir, true);
66+
lfs.mkdirs(testRootDir);
67+
lfs.setWorkingDirectory(testRootDir);
68+
dstPath = new Path(testRootDir, "dstFile");
69+
lfs.create(dstPath).close();
70+
71+
Random random = new Random();
72+
for (int i = 0; i < 10; i++) {
73+
OutputStream out =
74+
lfs.create(new Path(testRootDir, String.format("file-%02d", i)));
75+
out.write(random.nextInt());
76+
out.close();
77+
}
78+
}
79+
80+
@Test
81+
public void testConcat() throws Exception {
82+
// Read concatenated files to build the expected file content.
83+
ByteArrayOutputStream out = new ByteArrayOutputStream();
84+
for (int i = 0; i < 10; i++) {
85+
try (InputStream in = lfs
86+
.open(new Path(testRootDir, String.format("file-%02d", i)))) {
87+
IOUtils.copyBytes(in, out, 1024);
88+
}
89+
}
90+
byte[] expectContent = out.toByteArray();
91+
92+
// Do concat.
93+
FileSystem mockFs = Mockito.mock(FileSystem.class);
94+
Mockito.doAnswer(invocation -> {
95+
Object[] args = invocation.getArguments();
96+
Path target = (Path)args[0];
97+
Path[] src = (Path[]) args[1];
98+
mockConcat(target, src);
99+
return null;
100+
}).when(mockFs).concat(any(Path.class), any(Path[].class));
101+
Concat.setTestFs(mockFs);
102+
shellRun(0, "-concat", dstPath.toString(), testRootDir+"/file-*");
103+
104+
// Verify concat result.
105+
ContractTestUtils
106+
.assertPathExists(lfs, "The target file doesn't exist.", dstPath);
107+
Assertions.assertThat(lfs.listStatus(testRootDir).length).isEqualTo(1);
108+
assertEquals(expectContent.length, lfs.getFileStatus(dstPath).getLen());
109+
out = new ByteArrayOutputStream();
110+
try (InputStream in = lfs.open(dstPath)) {
111+
IOUtils.copyBytes(in, out, 1024);
112+
}
113+
// Verify content.
114+
byte[] concatedContent = out.toByteArray();
115+
assertEquals(expectContent.length, concatedContent.length);
116+
ContractTestUtils.compareByteArrays(expectContent, concatedContent,
117+
expectContent.length);
118+
}
119+
120+
@Test
121+
public void testUnsupportedFs() throws Exception {
122+
FileSystem mockFs = Mockito.mock(FileSystem.class);
123+
Mockito.doThrow(
124+
new UnsupportedOperationException("Mock unsupported exception."))
125+
.when(mockFs).concat(any(Path.class), any(Path[].class));
126+
Mockito.doAnswer(invocationOnMock -> new URI("mockfs:///")).when(mockFs)
127+
.getUri();
128+
Concat.setTestFs(mockFs);
129+
final ByteArrayOutputStream err = new ByteArrayOutputStream();
130+
PrintStream oldErr = System.err;
131+
System.setErr(new PrintStream(err));
132+
try {
133+
shellRun(1, "-concat", dstPath.toString(), testRootDir + "/file-*");
134+
} finally {
135+
System.setErr(oldErr);
136+
}
137+
System.err.print(err.toString());
138+
String expectedErrMsg = "Dest filesystem 'mockfs' doesn't support concat";
139+
Assertions.assertThat(err.toString().contains(expectedErrMsg))
140+
.withFailMessage("The err message should contain \"" + expectedErrMsg
141+
+ "\" message.").isTrue();
142+
}
143+
144+
private void shellRun(int n, String... args) {
145+
assertEquals(n, shell.run(args));
146+
}
147+
148+
/**
149+
* Simple simulation of concat.
150+
*/
151+
private void mockConcat(Path target, Path[] srcArray) throws IOException {
152+
Path tmp = new Path(target.getParent(), target.getName() + ".bak");
153+
lfs.rename(target, tmp);
154+
try (OutputStream out = lfs.create(target)) {
155+
try (InputStream in = lfs.open(tmp)) {
156+
IOUtils.copyBytes(in, out, 1024);
157+
}
158+
lfs.delete(tmp, true);
159+
for (int i = 0; i < srcArray.length; i++) {
160+
try (InputStream iin = lfs.open(srcArray[i])) {
161+
IOUtils.copyBytes(iin, out, 1024);
162+
}
163+
lfs.delete(srcArray[i], true);
164+
}
165+
}
166+
}
167+
}

0 commit comments

Comments
 (0)