Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -667,39 +668,28 @@ public void rename2(final String src, final String dst,
public void concat(String trg, String[] src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

// See if the src and target files are all in the same namespace
LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1);
if (targetBlocks == null) {
throw new IOException("Cannot locate blocks for target file - " + trg);
}
LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock();
String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId();
for (String source : src) {
LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1);
if (sourceBlocks == null) {
throw new IOException(
"Cannot located blocks for source file " + source);
}
String sourceBlockPoolId =
sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId();
if (!sourceBlockPoolId.equals(targetBlockPoolId)) {
throw new IOException("Cannot concatenate source file " + source
+ " because it is located in a different namespace"
+ " with block pool id " + sourceBlockPoolId
+ " from the target file with block pool id "
+ targetBlockPoolId);
}
// Concat only effects when all files in the same namespace.
RemoteLocation targetDestination = getFileRemoteLocation(trg);
if (targetDestination == null) {
throw new IOException("Cannot find target file - " + trg);
}
String targetNameService = targetDestination.getNameserviceId();

// Find locations in the matching namespace.
final RemoteLocation targetDestination =
rpcServer.getLocationForPath(trg, true, targetBlockPoolId);
String[] sourceDestinations = new String[src.length];
for (int i = 0; i < src.length; i++) {
String sourceFile = src[i];
RemoteLocation location =
rpcServer.getLocationForPath(sourceFile, true, targetBlockPoolId);
sourceDestinations[i] = location.getDest();
RemoteLocation srcLocation = getFileRemoteLocation(sourceFile);
if (srcLocation == null) {
throw new IOException("Cannot find source file - " + sourceFile);
}
sourceDestinations[i] = srcLocation.getDest();

if (!targetNameService.equals(srcLocation.getNameserviceId())) {
throw new IOException("Cannot concatenate source file " + sourceFile
+ " because it is located in a different namespace" + " with nameservice "
+ srcLocation.getNameserviceId() + " from the target file with nameservice "
+ targetNameService);
}
}
// Invoke
RemoteMethod method = new RemoteMethod("concat",
Expand Down Expand Up @@ -1009,6 +999,28 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
return ret;
}

public RemoteLocation getFileRemoteLocation(String path) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
if (locations.size() == 1) {
return locations.get(0);
}
RemoteLocation remoteLocation = null;
for (RemoteLocation location : locations) {
RemoteMethod method =
new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new RemoteParam());
HdfsFileStatus ret = rpcClient.invokeSequential(Collections.singletonList(location), method,
HdfsFileStatus.class, null);
if (ret != null) {
remoteLocation = location;
break;
}
}

return remoteLocation;
}

@Override
public boolean isFileClosed(String src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1697,42 +1697,6 @@ public Long getNextSPSPath() throws IOException {
return nnProto.getNextSPSPath();
}

/**
* Locate the location with the matching block pool id.
*
* @param path Path to check.
* @param failIfLocked Fail the request if locked (top mount point).
* @param blockPoolId The block pool ID of the namespace to search for.
* @return Prioritized list of locations in the federated cluster.
* @throws IOException if the location for this path cannot be determined.
*/
protected RemoteLocation getLocationForPath(
String path, boolean failIfLocked, String blockPoolId)
throws IOException {

final List<RemoteLocation> locations =
getLocationsForPath(path, failIfLocked);

String nameserviceId = null;
Set<FederationNamespaceInfo> namespaces =
this.namenodeResolver.getNamespaces();
for (FederationNamespaceInfo namespace : namespaces) {
if (namespace.getBlockPoolId().equals(blockPoolId)) {
nameserviceId = namespace.getNameserviceId();
break;
}
}
if (nameserviceId != null) {
for (RemoteLocation location : locations) {
if (location.getNameserviceId().equals(nameserviceId)) {
return location;
}
}
}
throw new IOException(
"Cannot locate a nameservice for block pool " + blockPoolId);
}

/**
* Get the possible locations of a path in the federated cluster.
* During the get operation, it will do the quota verification.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,21 @@ public void testProxyGetPreferedBlockSize() throws Exception {
routerProtocol, nnProtocol, m, new Object[] {badPath});
}

private void testConcat(
String source, String target, boolean failureExpected, boolean verfiyException, String msg) {
boolean failure = false;
try {
// Concat test file with fill block length file via router
routerProtocol.concat(target, new String[] {source});
} catch (IOException ex) {
failure = true;
if (verfiyException) {
assertExceptionContains(msg, ex);
}
}
assertEquals(failureExpected, failure);
}

private void testConcat(
String source, String target, boolean failureExpected) {
boolean failure = false;
Expand Down Expand Up @@ -1224,6 +1239,27 @@ public void testProxyConcatFile() throws Exception {
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, new String[] {routerFile}});

// Test when concat trg is a empty file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also need to check the empty source file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there is a empty source file, it will throw exception in namenode. This behaiver is as same as with dfsrouter.
And when trg is a empty file, it is diffrent . Without dfsrouter ,it success. And with dfsrouter, it will throw Exception.
So I think there is no need to check the empty source file. Or implement it in another PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the namenode and rbf throw the same Exception?

Maybe RBF throws NPE, but NN throws org.apache.hadoop.HadoopIllegalArgumentException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you modify the UT to cover the case that one or more source files are empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test for a empty src file

createFile(routerFS, existingFile, existingFileSize);
String sameRouterEmptyFile =
cluster.getFederatedTestDirectoryForNS(sameNameservice) +
"_newemptyfile";
createFile(routerFS, sameRouterEmptyFile, 0);
// Concat in same namespaces, succeeds
testConcat(existingFile, sameRouterEmptyFile, false);
FileStatus mergedStatus = getFileStatus(routerFS, sameRouterEmptyFile);
assertEquals(existingFileSize, mergedStatus.getLen());

// Test when concat srclist has some empty file, namenode will throw IOException.
String srcEmptyFile = cluster.getFederatedTestDirectoryForNS(sameNameservice) + "_srcEmptyFile";
createFile(routerFS, srcEmptyFile, 0);
String targetFile = cluster.getFederatedTestDirectoryForNS(sameNameservice) + "_targetFile";
createFile(routerFS, targetFile, existingFileSize);
// Concat in same namespaces, succeeds
testConcat(srcEmptyFile, targetFile, true, true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here please check if the exception is org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks

"org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): concat: source file "
+ srcEmptyFile + " is invalid or empty or underConstruction");
}

@Test
Expand Down