From 0a0dd6638c9bbcc275c815e68d25f0577620f591 Mon Sep 17 00:00:00 2001 From: Yang Yun Date: Fri, 21 Aug 2020 10:59:41 +0800 Subject: [PATCH 1/4] HDFS-15484 Add new method batchRename for DistributedFileSystem and WebHdfsFileSystem --- .../org/apache/hadoop/fs/BatchOperations.java | 43 +++ .../hadoop/fs/CommonPathCapabilities.java | 7 + .../org/apache/hadoop/hdfs/DFSClient.java | 21 ++ .../hadoop/hdfs/DFSOpsCountStatistics.java | 3 +- .../hadoop/hdfs/DistributedFileSystem.java | 23 +- .../hdfs/client/DfsPathCapabilities.java | 1 + .../hdfs/protocol/BatchOpsException.java | 87 ++++++ .../hadoop/hdfs/protocol/ClientProtocol.java | 13 + .../ClientNamenodeProtocolTranslatorPB.java | 28 ++ .../hadoop/hdfs/web/WebHdfsFileSystem.java | 36 ++- .../hadoop/hdfs/web/resources/PutOpParam.java | 1 + .../main/proto/ClientNamenodeProtocol.proto | 11 + .../router/RouterClientProtocol.java | 6 + .../federation/router/RouterRpcServer.java | 6 + ...amenodeProtocolServerSideTranslatorPB.java | 33 +++ .../hdfs/server/namenode/FSNamesystem.java | 108 ++++++++ .../server/namenode/NameNodeRpcServer.java | 25 +- .../web/resources/NamenodeWebHdfsMethods.java | 8 + .../apache/hadoop/hdfs/TestBatchRename.java | 259 ++++++++++++++++++ 19 files changed, 713 insertions(+), 6 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchOpsException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java new file mode 100644 index 0000000000000..9dfe8a5780062 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; + +/** + * Interface filesystems MAY implement to offer a batched operations. + */ + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BatchOperations { + + /** + * Batched rename API that rename a batch of files. + * + * @param srcs source file list. + * @param dsts target file list. + * @throws IOException failure exception. + */ + void batchRename(String[] srcs, String[] dsts, Options.Rename... 
options) + throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java index 539b3e27c0351..9795536ba9eec 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java @@ -139,4 +139,11 @@ private CommonPathCapabilities() { public static final String FS_MULTIPART_UPLOADER = "fs.capability.multipart.uploader"; + /** + * Does the store support multipart uploading? + * Value: {@value}. + */ + public static final String FS_BATCH_RENAME = + "fs.capability.batch.rename"; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0f7e1bb963496..3d3e506573812 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BatchOpsException; import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -1609,6 +1610,26 @@ public void rename(String src, String dst, Options.Rename... options) } } + /** + * Rename a batch files or directories. + * @see ClientProtocol#batchRename(String[] , String[], Options.Rename...) + */ + public void batchRename(String[] srcs, String[] dsts, + Options.Rename... options) throws IOException { + checkOpen(); + try { + namenode.batchRename(srcs, dsts, options); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + NSQuotaExceededException.class, + DSQuotaExceededException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class, + BatchOpsException.class); + } + } + + /** * Truncate a file to an indicated size * See {@link ClientProtocol#truncate}. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java index fdd0072905fd4..8217af67c869e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java @@ -114,7 +114,8 @@ public enum OpType { GET_SNAPSHOT_LIST("op_get_snapshot_list"), TRUNCATE(CommonStatisticNames.OP_TRUNCATE), UNSET_EC_POLICY("op_unset_ec_policy"), - UNSET_STORAGE_POLICY("op_unset_storage_policy"); + UNSET_STORAGE_POLICY("op_unset_storage_policy"), + BATCH_RENAME("op_batch_rename"); private static final Map SYMBOL_MAP = new HashMap<>(OpType.values().length); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 1d5c5af8c38e1..ba9b3d1d30edd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -30,6 +30,7 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.fs.BatchListingOperations; +import org.apache.hadoop.fs.BatchOperations; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStoragePolicySpi; import org.apache.hadoop.fs.CacheFlag; @@ -50,6 +51,7 @@ import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; +import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.InvalidPathHandleException; import org.apache.hadoop.fs.PartialListing; import org.apache.hadoop.fs.MultipartUploaderBuilder; @@ -147,7 +149,7 @@ @InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" }) @InterfaceStability.Unstable public class DistributedFileSystem extends FileSystem - implements KeyProviderTokenIssuer, BatchListingOperations { + implements KeyProviderTokenIssuer, BatchListingOperations, BatchOperations{ private Path workingDir; private URI uri; @@ -965,6 +967,25 @@ public Void next(final FileSystem fs, final Path p) } } + protected String[] getBatchPathName(String[] files) { + List ret = new ArrayList<>(); + for(String f : files) { + ret.add(getPathName(new Path(f))); + } + return ret.toArray(new String[ret.size()]); + } + + @Override + public void batchRename(final String[] srcs, final String[] dsts, + final Options.Rename... 
options) throws IOException { + if (srcs.length != dsts.length) { + throw new InvalidPathException("mismatch batch path src: " + + Arrays.toString(srcs) + " dst: " + Arrays.toString(dsts)); + } + statistics.incrementWriteOps(1); + dfs.batchRename(getBatchPathName(srcs), getBatchPathName(dsts)); + } + @Override public boolean truncate(Path f, final long newLength) throws IOException { statistics.incrementWriteOps(1); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java index 30e7e00653bcc..8ffbefa033d6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java @@ -53,6 +53,7 @@ public static Optional hasPathCapability(final Path path, case CommonPathCapabilities.FS_SNAPSHOTS: case CommonPathCapabilities.FS_STORAGEPOLICY: case CommonPathCapabilities.FS_XATTRS: + case CommonPathCapabilities.FS_BATCH_RENAME: return Optional.of(true); case CommonPathCapabilities.FS_SYMLINKS: return Optional.of(FileSystem.areSymlinksEnabled()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchOpsException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchOpsException.java new file mode 100644 index 0000000000000..839e6193d6f9d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchOpsException.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; + +/** + * Thrown when break during a batch operation . + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class BatchOpsException extends IOException { + private static final long serialVersionUID = 1L; + private static final String TAG_INDEX = "index"; + private static final String TAG_TOTAL = "total"; + private static final String TAG_REASON = "reason"; + + /** + * Used by RemoteException to instantiate an BatchOpsException. + */ + public BatchOpsException(String msg) { + super(msg); + } + + public BatchOpsException(long index, long total, Throwable cause) { + this(index, total, + cause.getClass().getName() + ": " + cause.getMessage()); + } + + public BatchOpsException(long index, long total, + String cause) { + super("Batch operation break! 
" + + getTagHeader(TAG_INDEX) + index + getTagTailer(TAG_INDEX) + + getTagHeader(TAG_TOTAL) + total + getTagTailer(TAG_TOTAL) + + getTagHeader(TAG_REASON) + cause + getTagTailer(TAG_REASON)); + } + + public long getIndex() { + return Long.parseLong(getValue(TAG_INDEX)); + } + + public long getTotal() { + return Long.parseLong(getValue(TAG_TOTAL)); + } + + public String getReason() { + return getValue(TAG_REASON); + } + + private static String getTagHeader(String tag) { + return "<"+tag + ">"; + } + + private static String getTagTailer(String tag) { + return ""; + } + + private String getValue(String target) { + String msg = getMessage(); + String header = getTagHeader(target); + String tailer = getTagTailer(target); + int pos1 = msg.indexOf(header) + header.length(); + int pos2 = msg.indexOf(tailer, pos1); + + assert pos2 > pos1; + return msg.substring(pos1, pos2); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index ea90645ca082b..21f089d6e9793 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -583,6 +583,19 @@ void concat(String trg, String[] srcs) void rename2(String src, String dst, Options.Rename... options) throws IOException; + + /** + * Rename an batch items in the file system namespace. + * @param srcs existing files or directories name.216 + * @param dsts new names. + * @param options Rename options + * + * @throws IOException an I/O error occurred + */ + @AtMostOnce + void batchRename(String[] srcs, String[] dsts, + Options.Rename... options) throws IOException; + /** * Truncate file src to new size. *
    diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 0674cefe2e2df..af754fa4a832a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -104,6 +104,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.BatchRenameRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; @@ -637,6 +638,33 @@ public void rename2(String src, String dst, Rename... options) } + @Override + public void batchRename(String[] srcs, String[] dsts, Rename... options) + throws IOException { + boolean overwrite = false; + boolean toTrash = false; + if (options != null) { + for (Rename option : options) { + if (option == Rename.OVERWRITE) { + overwrite = true; + } + if (option == Rename.TO_TRASH) { + toTrash = true; + } + } + } + BatchRenameRequestProto req = BatchRenameRequestProto.newBuilder() + .addAllSrcs(Arrays.asList(srcs)) + .addAllDsts(Arrays.asList(dsts)) + .setOverwriteDest(overwrite) + .setMoveToTrash(toTrash).build(); + try { + rpcProxy.batchRename(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + @Override public void concat(String trg, String[] srcs) throws IOException { ConcatRequestProto req = ConcatRequestProto.newBuilder(). 
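Because the server-side loop (in the FSNamesystem hunk further below) only throws BatchOpsException once at least one rename has committed, and rethrows the original IOException when the very first entry fails, a caller can repair the reported cause and retry just the uncommitted tail. A sketch of that pattern; the copyOfRange-based retry helper is an assumption for illustration, not something the patch provides:

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.BatchOpsException;

    public final class BatchRenameWithRetry {
      static void renameAll(DistributedFileSystem dfs, String[] srcs, String[] dsts)
          throws IOException {
        try {
          dfs.batchRename(srcs, dsts);
        } catch (BatchOpsException e) {
          // Entries [0, done) were renamed before the batch broke; when the very
          // first rename fails the original IOException is thrown instead, so
          // done is always >= 1 here.
          int done = (int) e.getIndex();
          String[] restSrcs = Arrays.copyOfRange(srcs, done, srcs.length);
          String[] restDsts = Arrays.copyOfRange(dsts, done, dsts.length);
          // Caller repairs the cause reported by e.getReason(), then retries:
          dfs.batchRename(restSrcs, restDsts);
        }
      }
    }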
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 25e7f7373226b..4d49f10292448 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -40,6 +40,7 @@ import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; import java.util.Base64.Decoder; import java.util.Collection; @@ -61,9 +62,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; +import org.apache.hadoop.fs.BatchOperations; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.CommonPathCapabilities; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.DelegationTokenRenewer; @@ -76,9 +77,9 @@ import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; +import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.MultipartUploaderBuilder; import org.apache.hadoop.fs.QuotaUsage; -import org.apache.hadoop.fs.PathCapabilities; import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.impl.FileSystemMultipartUploaderBuilder; @@ -144,7 +145,8 @@ /** A FileSystem for HDFS over the web. */ public class WebHdfsFileSystem extends FileSystem implements DelegationTokenRenewer.Renewable, - TokenAspect.TokenManagementDelegator, KeyProviderTokenIssuer { + TokenAspect.TokenManagementDelegator, KeyProviderTokenIssuer, + BatchOperations { public static final Logger LOG = LoggerFactory .getLogger(WebHdfsFileSystem.class); /** WebHdfs version. */ @@ -1184,6 +1186,34 @@ public void rename(final Path src, final Path dst, ).run(); } + protected String[] getBatchPathName(String[] files) throws IOException{ + List ret = new ArrayList<>(); + for(String f : files) { + if(!f.startsWith(Path.SEPARATOR)) { + throw new InvalidPathException("Path is not absolute! " + f); + } + ret.add(makeQualified(new Path(f)).toUri().getPath()); + } + return ret.toArray(new String[ret.size()]); + } + + @Override + public void batchRename(final String[] srcs, final String[] dsts, + final Options.Rename... 
options) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.BATCH_RENAME); + final HttpOpParam.Op op = PutOpParam.Op.BATCH_RENAME; + if (srcs.length != dsts.length) { + throw new InvalidPathException("mismatch batch path src: " + + Arrays.toString(srcs) + " dst: " + Arrays.toString(dsts)); + } + new FsPathRunner(op, + new Path(StringUtils.join(":", getBatchPathName(srcs))), + new DestinationParam(StringUtils.join(":", getBatchPathName(dsts))), + new RenameOptionSetParam(options) + ).run(); + } + @Override public void setXAttr(Path p, String name, byte[] value, EnumSet flag) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java index 4fc00011ca1e6..1d5cd1bd3533f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java @@ -60,6 +60,7 @@ public enum Op implements HttpOpParam.Op { SETQUOTA(false, HttpURLConnection.HTTP_OK), SETQUOTABYSTORAGETYPE(false, HttpURLConnection.HTTP_OK), + BATCH_RENAME(false, HttpURLConnection.HTTP_OK), NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED); final boolean doOutputAndRedirect; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 20967cc13ab86..b396ec53d96a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -257,6 +257,16 @@ message Rename2RequestProto { message Rename2ResponseProto { // void response } +message BatchRenameRequestProto { + repeated string srcs = 1; + repeated string dsts = 2; + required bool overwriteDest = 3; + optional bool moveToTrash = 4; +} + +message BatchRenameResponseProto { +} + message DeleteRequestProto { required string src = 1; required bool recursive = 2; @@ -904,6 +914,7 @@ service ClientNamenodeProtocol { rpc truncate(TruncateRequestProto) returns(TruncateResponseProto); rpc rename(RenameRequestProto) returns(RenameResponseProto); rpc rename2(Rename2RequestProto) returns(Rename2ResponseProto); + rpc batchRename(BatchRenameRequestProto) returns(BatchRenameResponseProto); rpc delete(DeleteRequestProto) returns(DeleteResponseProto); rpc mkdirs(MkdirsRequestProto) returns(MkdirsResponseProto); rpc getListing(GetListingRequestProto) returns(GetListingResponseProto); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index e2ec0303333f0..2e2aba8da2016 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -638,6 +638,12 @@ public void rename2(final String src, final String dst, } } + @Override + public void batchRename(String[] srcs, String[] dsts, + Options.Rename... 
options) throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } + @Override public void concat(String trg, String[] src) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 97b146c947442..673a39bb620cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -831,6 +831,12 @@ public void rename2(final String src, final String dst, clientProto.rename2(src, dst, options); } + @Override // ClientProtocol + public void batchRename(String[] srcs, String[] dsts, + final Options.Rename... options) throws IOException { + clientProto.batchRename(srcs, dsts, options); + } + @Override // ClientProtocol public void concat(String trg, String[] src) throws IOException { clientProto.concat(trg, src); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 5132afaa4b15c..45ae11ab7eb6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -92,6 +92,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.BatchRenameRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.BatchRenameResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; @@ -358,6 +360,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements private static final Rename2ResponseProto VOID_RENAME2_RESPONSE = Rename2ResponseProto.newBuilder().build(); + private static final BatchRenameResponseProto VOID_BATCHRENAME_RESPONSE = + BatchRenameResponseProto.newBuilder().build(); + private static final GetListingResponseProto VOID_GETLISTING_RESPONSE = GetListingResponseProto.newBuilder().build(); @@ -712,6 +717,34 @@ public Rename2ResponseProto rename2(RpcController controller, return VOID_RENAME2_RESPONSE; } + @Override + public BatchRenameResponseProto batchRename(RpcController controller, + BatchRenameRequestProto req) throws ServiceException { + // resolve rename options + ArrayList optionList = new ArrayList(); + if(req.getOverwriteDest()) { + optionList.add(Rename.OVERWRITE); + } + if (req.hasMoveToTrash() && req.getMoveToTrash()) { + optionList.add(Rename.TO_TRASH); + } + + 
if(optionList.isEmpty()) { + optionList.add(Rename.NONE); + } + List srcs = req.getSrcsList(); + List dsts = req.getDstsList(); + try { + server.batchRename( + srcs.toArray(new String[srcs.size()]), + dsts.toArray(new String[dsts.size()]), + optionList.toArray(new Rename[optionList.size()])); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_BATCHRENAME_RESPONSE; + } + @Override public TruncateResponseProto truncate(RpcController controller, TruncateRequestProto req) throws ServiceException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 15bf6b16b569d..2f562ac23124c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -97,6 +97,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.text.CaseUtils; +import org.apache.hadoop.hdfs.protocol.BatchOpsException; import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.SnapshotStatus; @@ -3283,6 +3284,113 @@ void renameTo(final String src, final String dst, Arrays.toString(options) + ")", src, dst, res.auditStat); } + class RenameInfo { + private String src; + private String dst; + private FSDirRenameOp.RenameResult ret; + + RenameInfo(String src, String dst) throws InvalidPathException { + this.src = src; + this.dst = dst; + + if (!DFSUtil.isValidName(dst)) { + throw new InvalidPathException("Invalid name: " + dst); + } + this.ret = null; + } + + String getSrc() { + return src; + } + + String getDst(){ + return dst; + } + + void setResult(FSDirRenameOp.RenameResult r) { + this.ret = r; + } + + public void logAudit() throws IOException { + if (ret != null) { + logAuditEvent(true, "rename", src, dst, ret.auditStat); + } else { + logAuditEvent(false, "rename", src, dst, null); + } + } + } + + /** Batch rename src to dst. */ + void batchRename(final String[] srcs, final String[] dsts, + boolean logRetryCache, Options.Rename... options) + throws IOException { + final String operationName = "batchRename"; + FSDirRenameOp.RenameResult res = null; + checkOperation(OperationCategory.WRITE); + final FSPermissionChecker pc = getPermissionChecker(); + FSPermissionChecker.setOperationType(operationName); + + List infos = new ArrayList<>(); + for (int i = 0; i < srcs.length; i++) { + infos.add(new RenameInfo(srcs[i], dsts[i])); + } + BatchOpsException breakException = null; + int cnt = 0; + try { + writeLock(); + try { + checkOperation(OperationCategory.WRITE); + checkNameNodeSafeMode("Cannot rename " + Arrays.toString(srcs)); + for (RenameInfo info : infos) { + try { + res = FSDirRenameOp.renameToInt(dir, pc, info.getSrc(), + info.getDst(), logRetryCache, options); + info.setResult(res); + cnt++; + } catch (IOException ioe) { + if (cnt > 0) { + breakException = new BatchOpsException(cnt, infos.size(), ioe); + break; + } else { + throw ioe; + } + } + } + } finally { + FileStatus status = res != null ? 
res.auditStat : null; + writeUnlock(operationName, + getLockReportInfoSupplier(Arrays.toString(srcs), + Arrays.toString(srcs), status)); + } + } catch (AccessControlException e) { + logAuditEvent(false, operationName + " (options=" + + Arrays.toString(options) + ")", + Arrays.toString(srcs), Arrays.toString(srcs), null); + throw e; + } + getEditLog().logSync(); + assert res != null; + BlocksMapUpdateInfo collectedBlocks = res.collectedBlocks; + if (!collectedBlocks.getToDeleteList().isEmpty()) { + removeBlocks(collectedBlocks); + collectedBlocks.clear(); + } + int recordNum = cnt; + for (RenameInfo info : infos) { + if (recordNum < 0) { + break; + } + recordNum--; + info.logAudit(); + } + if (breakException != null) { + LOG.warn("batchRename "+ cnt + "/" + infos.size() + " success. from " + + Arrays.toString(srcs) + " to " + Arrays.toString(dsts), + breakException); + throw breakException; + } + } + /** * Remove the indicated file from namespace. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 3f1fb5a93915a..67e2ecfb05ed1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.BatchOpsException; import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; @@ -543,7 +544,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) QuotaByStorageTypeExceededException.class, AclException.class, FSLimitException.PathComponentTooLongException.class, - FSLimitException.MaxDirectoryItemsExceededException.class); + FSLimitException.MaxDirectoryItemsExceededException.class, + BatchOpsException.class); clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class, UnresolvedPathException.class); @@ -1096,6 +1098,27 @@ public void rename2(String src, String dst, Options.Rename... options) metrics.incrFilesRenamed(); } + @Override // ClientProtocol + public void batchRename(String[] srcs, String[] dsts, + Options.Rename... 
options) throws IOException { + if(stateChangeLog.isDebugEnabled()) { + stateChangeLog.debug("*DIR* NameNode.batchRename: " + + Arrays.toString(srcs) + " to " + Arrays.toString(dsts)); + } + namesystem.checkOperation(OperationCategory.WRITE); + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } + try { + namesystem.batchRename(srcs, dsts, cacheEntry != null); + } finally { + RetryCache.setState(cacheEntry, true); + } + metrics.incrFilesRenamed(); + } + + @Override // ClientProtocol public boolean truncate(String src, long newLength, String clientName) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index 9baed4f06730c..281615c2078eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -732,6 +732,14 @@ protected Response put( return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); } } + case BATCH_RENAME: + { + validateOpParams(op, destination); + final EnumSet s = renameOptions.getValue(); + cp.batchRename(fullpath.split(":"), destination.getValue().split(":"), + s.toArray(new Options.Rename[s.size()])); + return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); + } case SETREPLICATION: { final boolean b = cp.setReplication(fullpath, replication.getValue(conf)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java new file mode 100644 index 0000000000000..8a530d005fd49 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BatchOperations; +import org.apache.hadoop.fs.InvalidPathException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.BatchOpsException; +import org.apache.hadoop.hdfs.web.WebHdfsConstants; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestBatchRename { + private static MiniDFSCluster cluster; + private static Configuration conf; + private static DistributedFileSystem dfs; + private static WebHdfsFileSystem webHdfs; + private Path root = new Path("/test/batchrename/"); + + @BeforeClass + public static void beforeClass() throws Exception { + conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .build(); + dfs = cluster.getFileSystem(); + + webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, + WebHdfsConstants.WEBHDFS_SCHEME); + } + + @AfterClass + public static void afterClass() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testhasPathCapability() throws Exception { + assertTrue("DistributedFileSystem should has batch rename capbility", + dfs.hasPathCapability(root, "fs.capability.batch.rename")); + } + + private List generateBatchFiles( + int totalNum, int createNum, final Path dir, String tag) + throws IOException { + List files = new ArrayList<>(); + for (int i = 0; i < totalNum; i++) { + Path p = new Path(dir, tag + "_" + i); + if (createNum-- > 0) { + DFSTestUtil.createFile(dfs, p, 10, (short) 1, 0); + assertTrue(dfs.exists(p)); + } else { + assertFalse(dfs.exists(p)); + } + files.add(p.toString()); + } + return files; + } + + private void testBatchRename(BatchOperations batchFS) throws Exception { + Path testDir = new Path(root, "testBatchRename"); + assertTrue(dfs.mkdirs(testDir)); + + List srcs = generateBatchFiles( + 2, 2, testDir, "src"); + List dsts =generateBatchFiles( + 2, 0, testDir, "dst"); + + batchFS.batchRename( + srcs.toArray(new String[srcs.size()]), + dsts.toArray(new String[dsts.size()])); + + for (String f : srcs) { + assertFalse(dfs.exists(new Path(f))); + } + for (String f : dsts) { + assertTrue(dfs.exists(new Path(f))); + dfs.delete(new Path(f), true); + } + } + + @Test + public void testBatchRaname() throws Exception { + testBatchRename(dfs); + testBatchRename(webHdfs); + } + + private void testInvalidInput(BatchOperations batchFS) throws Exception { + List srcs = new ArrayList<>(); + srcs.add("/testInvalidInput_Mismatch"); + List dsts = new ArrayList<>(); + LambdaTestUtils.intercept(InvalidPathException.class, + "mismatch batch path", + () -> batchFS.batchRename( + srcs.toArray(new String[srcs.size()]), + dsts.toArray(new String[dsts.size()]))); + } + + @Test + public void testInvalidInput() throws Exception { + testInvalidInput(dfs); + testInvalidInput(webHdfs); + } + + // rename /src_1:/src_2(not existing) to /dst_1:/dst_2 + private void testPartialSuccess1(BatchOperations batchFS) throws Exception { + Path testDir = new Path(root, "partial_success"); + assertTrue(dfs.mkdirs(testDir)); + + List srcs = generateBatchFiles( + 
2, 1, testDir, "src"); + List dsts = generateBatchFiles( + 2, 0, testDir, "dst"); + try { + batchFS.batchRename( + srcs.toArray(new String[srcs.size()]), + dsts.toArray(new String[dsts.size()])); + } catch (BatchOpsException e) { + long index = e.getIndex(); + assertEquals(1, index); + long total = e.getTotal(); + assertEquals(2, total); + + String reason = e.getReason(); + assertTrue(reason.contains("FileNotFoundException")); + + for (int i = 0; i < index; i++) { + Path p = new Path(testDir, "src_" + i); + assertFalse(dfs.exists(p)); + } + for (int i = 0; i < index; i++) { + Path p = new Path(testDir, "dst_" + i); + assertTrue(dfs.exists(p)); + dfs.delete(p, true); + } + } + } + + // rename src_1:src_1/subdir to /dst_1:/dst_2 + private void testPartialSuccess2(BatchOperations batchFS) throws Exception { + Path testDir = new Path(root, "partial_success"); + List srcs = new ArrayList<>(); + Path src1 = new Path(testDir, "src_1"); + assertTrue(dfs.mkdirs(src1)); + srcs.add(src1.toString()); + Path src1Subdir = new Path(src1, "subdir"); + assertTrue(dfs.mkdirs(src1Subdir)); + srcs.add(src1Subdir.toString()); + + List dsts = generateBatchFiles( + 2, 0, testDir, "dst"); + try { + batchFS.batchRename( + srcs.toArray(new String[srcs.size()]), + dsts.toArray(new String[dsts.size()])); + } catch (BatchOpsException e) { + long index = e.getIndex(); + assertEquals(1, index); + long total = e.getTotal(); + assertEquals(2, total); + String reason = e.getReason(); + assertTrue(reason.contains("FileNotFoundException")); + for (int i = 0; i < index; i++) { + Path p = new Path(testDir, "src_" + i); + assertFalse(dfs.exists(p)); + } + for (int i = 0; i < index; i++) { + Path p = new Path(testDir, "dst_" + i); + assertTrue(dfs.exists(p)); + dfs.delete(p, true); + } + } + } + + // rename src_1:src_2 /dst_1:/dst_1 + private void testPartialSuccess3(BatchOperations batchFS) throws Exception { + Path testDir = new Path(root, "partial_success_3"); + List srcs = generateBatchFiles( + 2, 2, testDir, "src"); + List dsts = generateBatchFiles( + 1, 0, testDir, "dst"); + dsts.add(dsts.get(0)); + + try { + batchFS.batchRename( + srcs.toArray(new String[srcs.size()]), + dsts.toArray(new String[dsts.size()])); + } catch (BatchOpsException e) { + long index = e.getIndex(); + assertEquals(1, index); + long total = e.getTotal(); + assertEquals(2, total); + String reason = e.getReason(); + assertTrue(reason.contains("FileAlreadyExistsException")); + for (int i = 0; i < index; i++) { + Path p = new Path(testDir, "src_" + i); + assertFalse(dfs.exists(p)); + } + for (int i = 0; i < index; i++) { + Path p = new Path(testDir, "dst_" + i); + assertTrue(dfs.exists(p)); + dfs.delete(p, true); + } + } + } + + @Test + public void testPartialSuccess() throws Exception { + testPartialSuccess1(dfs); + testPartialSuccess1(webHdfs); + testPartialSuccess2(dfs); + testPartialSuccess2(webHdfs); + testPartialSuccess3(dfs); + testPartialSuccess3(webHdfs); + } + + @Test + public void testPathMustBeAbsoluteForWebFS() throws Exception { + List srcs = new ArrayList<>(); + srcs.add("hdfs://namenodelong startTime = System.currentTimeMillis();/f1"); + List dsts = new ArrayList<>(); + dsts.add("hdfs://namenode/f2"); + LambdaTestUtils.intercept(InvalidPathException.class, + "Path is not absolute", + () -> webHdfs.batchRename( + srcs.toArray(new String[srcs.size()]), + dsts.toArray(new String[dsts.size()]))); + } +} From 913c5153114a9ad9972707bb2d7d8c23c6419035 Mon Sep 17 00:00:00 2001 From: Yang Yun Date: Mon, 24 Aug 2020 15:57:59 +0800 Subject: [PATCH 
2/4] HDFS-15484 change accoring to comments --- .../org/apache/hadoop/fs/BatchOperations.java | 5 +- .../hadoop/fs/CommonPathCapabilities.java | 2 +- .../org/apache/hadoop/hdfs/DFSClient.java | 9 +- .../hadoop/hdfs/DistributedFileSystem.java | 10 +-- ...atchOpsException.java => BatchRename.java} | 20 ++--- .../hadoop/hdfs/protocol/ClientProtocol.java | 4 +- .../ClientNamenodeProtocolTranslatorPB.java | 8 +- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 11 +-- .../federation/router/RouterRpcServer.java | 2 +- ...amenodeProtocolServerSideTranslatorPB.java | 4 +- .../hdfs/server/namenode/FSNamesystem.java | 22 ++--- .../server/namenode/NameNodeRpcServer.java | 8 +- .../web/resources/NamenodeWebHdfsMethods.java | 4 +- .../apache/hadoop/hdfs/TestBatchRename.java | 90 ++++++++++--------- 14 files changed, 103 insertions(+), 96 deletions(-) rename hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/{BatchOpsException.java => BatchRename.java} (80%) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java index 9dfe8a5780062..85c8cab44d8fe 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability; import java.io.IOException; +import java.util.List; /** * Interface filesystems MAY implement to offer a batched operations. @@ -38,6 +39,6 @@ public interface BatchOperations { * @param dsts target file list. * @throws IOException failure exception. */ - void batchRename(String[] srcs, String[] dsts, Options.Rename... options) - throws IOException; + void batchRename(List srcs, List dsts, + Options.Rename... options) throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java index 9795536ba9eec..dc2818cfb9aac 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java @@ -140,7 +140,7 @@ private CommonPathCapabilities() { "fs.capability.multipart.uploader"; /** - * Does the store support multipart uploading? + * Does the store support batch rename? * Value: {@value}. 
*/ public static final String FS_BATCH_RENAME = diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3d3e506573812..16ddd9099c1d4 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -105,7 +105,7 @@ import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; -import org.apache.hadoop.hdfs.protocol.BatchOpsException; +import org.apache.hadoop.hdfs.protocol.BatchRename; import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -1612,9 +1612,10 @@ public void rename(String src, String dst, Options.Rename... options) /** * Rename a batch files or directories. - * @see ClientProtocol#batchRename(String[] , String[], Options.Rename...) + * @see ClientProtocol#batchRename(List, List, + * Options.Rename...) */ - public void batchRename(String[] srcs, String[] dsts, + public void batchRename(List srcs, List dsts, Options.Rename... options) throws IOException { checkOpen(); try { @@ -1625,7 +1626,7 @@ public void batchRename(String[] srcs, String[] dsts, DSQuotaExceededException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, - BatchOpsException.class); + BatchRename.class); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index ba9b3d1d30edd..0896279c80f50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -967,20 +967,20 @@ public Void next(final FileSystem fs, final Path p) } } - protected String[] getBatchPathName(String[] files) { + protected List getBatchPathName(List files) { List ret = new ArrayList<>(); for(String f : files) { ret.add(getPathName(new Path(f))); } - return ret.toArray(new String[ret.size()]); + return ret; } @Override - public void batchRename(final String[] srcs, final String[] dsts, + public void batchRename(List srcs, List dsts, final Options.Rename... 
options) throws IOException { - if (srcs.length != dsts.length) { + if (srcs.size() != dsts.size()) { throw new InvalidPathException("mismatch batch path src: " + - Arrays.toString(srcs) + " dst: " + Arrays.toString(dsts)); + String.join(",", srcs) + " dst: " + String.join(",", dsts)); } statistics.incrementWriteOps(1); dfs.batchRename(getBatchPathName(srcs), getBatchPathName(dsts)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchOpsException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRename.java similarity index 80% rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchOpsException.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRename.java index 839e6193d6f9d..c992e0077e2ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchOpsException.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRename.java @@ -28,27 +28,27 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public final class BatchOpsException extends IOException { - private static final long serialVersionUID = 1L; +public final class BatchRename extends IOException { + private static final long serialVersionUID = -1881850913029509889L; private static final String TAG_INDEX = "index"; private static final String TAG_TOTAL = "total"; private static final String TAG_REASON = "reason"; /** - * Used by RemoteException to instantiate an BatchOpsException. + * Used by RemoteException to instantiate an BatchRename. */ - public BatchOpsException(String msg) { + public BatchRename(String msg) { super(msg); } - public BatchOpsException(long index, long total, Throwable cause) { - this(index, total, - cause.getClass().getName() + ": " + cause.getMessage()); + public BatchRename(long index, long total, Throwable cause) { + this(index, total, (cause != null) ? cause.getClass().getName() + ": " + + cause.getMessage() : "Unknown reason"); } - public BatchOpsException(long index, long total, - String cause) { - super("Batch operation break! " + + public BatchRename(long index, long total, + String cause) { + super("Batch operation partial success. " + getTagHeader(TAG_INDEX) + index + getTagTailer(TAG_INDEX) + getTagHeader(TAG_TOTAL) + total + getTagTailer(TAG_TOTAL) + getTagHeader(TAG_REASON) + cause + getTagTailer(TAG_REASON)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 21f089d6e9793..50d3267915c7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -586,14 +586,14 @@ void rename2(String src, String dst, Options.Rename... options) /** * Rename an batch items in the file system namespace. - * @param srcs existing files or directories name.216 + * @param srcs existing files or directories name. * @param dsts new names. * @param options Rename options * * @throws IOException an I/O error occurred */ @AtMostOnce - void batchRename(String[] srcs, String[] dsts, + void batchRename(List srcs, List dsts, Options.Rename... 
options) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index af754fa4a832a..3170a2f977fdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -639,8 +639,8 @@ public void rename2(String src, String dst, Rename... options) } @Override - public void batchRename(String[] srcs, String[] dsts, Rename... options) - throws IOException { + public void batchRename(List srcs, List dsts, + Rename... options) throws IOException { boolean overwrite = false; boolean toTrash = false; if (options != null) { @@ -654,8 +654,8 @@ public void batchRename(String[] srcs, String[] dsts, Rename... options) } } BatchRenameRequestProto req = BatchRenameRequestProto.newBuilder() - .addAllSrcs(Arrays.asList(srcs)) - .addAllDsts(Arrays.asList(dsts)) + .addAllSrcs(srcs) + .addAllDsts(dsts) .setOverwriteDest(overwrite) .setMoveToTrash(toTrash).build(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 4d49f10292448..d301c61e1de66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -1186,7 +1186,8 @@ public void rename(final Path src, final Path dst, ).run(); } - protected String[] getBatchPathName(String[] files) throws IOException{ + protected List getBatchPathName(List files) + throws IOException{ List ret = new ArrayList<>(); for(String f : files) { if(!f.startsWith(Path.SEPARATOR)) { @@ -1194,18 +1195,18 @@ protected String[] getBatchPathName(String[] files) throws IOException{ } ret.add(makeQualified(new Path(f)).toUri().getPath()); } - return ret.toArray(new String[ret.size()]); + return ret; } @Override - public void batchRename(final String[] srcs, final String[] dsts, + public void batchRename(List srcs, List dsts, final Options.Rename... 
options) throws IOException { statistics.incrementWriteOps(1); storageStatistics.incrementOpCounter(OpType.BATCH_RENAME); final HttpOpParam.Op op = PutOpParam.Op.BATCH_RENAME; - if (srcs.length != dsts.length) { + if (srcs.size() != dsts.size()) { throw new InvalidPathException("mismatch batch path src: " + - Arrays.toString(srcs) + " dst: " + Arrays.toString(dsts)); + String.join(",", srcs) + " dst: " + String.join(",", dsts)); } new FsPathRunner(op, new Path(StringUtils.join(":", getBatchPathName(srcs))), diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 673a39bb620cc..dee3eba770b2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -832,7 +832,7 @@ public void rename2(final String src, final String dst, } @Override // ClientProtocol - public void batchRename(String[] srcs, String[] dsts, + public void batchRename(List srcs, List dsts, final Options.Rename... options) throws IOException { clientProto.batchRename(srcs, dsts, options); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 45ae11ab7eb6b..771393dc99dc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -736,9 +736,7 @@ public BatchRenameResponseProto batchRename(RpcController controller, List dsts = req.getDstsList(); try { server.batchRename( - srcs.toArray(new String[srcs.size()]), - dsts.toArray(new String[dsts.size()]), - optionList.toArray(new Rename[optionList.size()])); + srcs, dsts, optionList.toArray(new Rename[optionList.size()])); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 2f562ac23124c..d2fea69a47ae8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -97,7 +97,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.text.CaseUtils; -import org.apache.hadoop.hdfs.protocol.BatchOpsException; +import org.apache.hadoop.hdfs.protocol.BatchRename; import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.SnapshotStatus; @@ -3321,7 +3321,7 @@ public void logAudit() throws IOException { } /** Batch rename src to dst. */ - void batchRename(final String[] srcs, final String[] dsts, + void batchRename(List srcs, List dsts, boolean logRetryCache, Options.Rename... 
options) throws IOException { final String operationName = "batchRename"; @@ -3331,16 +3331,16 @@ void batchRename(final String[] srcs, final String[] dsts, FSPermissionChecker.setOperationType(operationName); List infos = new ArrayList<>(); - for (int i = 0; i < srcs.length; i++) { - infos.add(new RenameInfo(srcs[i], dsts[i])); + for (int i = 0; i < srcs.size(); i++) { + infos.add(new RenameInfo(srcs.get(i), dsts.get(i))); } - BatchOpsException breakException = null; + BatchRename breakException = null; int cnt = 0; try { writeLock(); try { checkOperation(OperationCategory.WRITE); - checkNameNodeSafeMode("Cannot rename " + Arrays.toString(srcs)); + checkNameNodeSafeMode("Cannot rename " + String.join(",", srcs)); for (RenameInfo info : infos) { try { res = FSDirRenameOp.renameToInt(dir, pc, info.getSrc(), @@ -3349,7 +3349,7 @@ void batchRename(final String[] srcs, final String[] dsts, cnt++; } catch (IOException ioe) { if (cnt > 0) { - breakException = new BatchOpsException(cnt, infos.size(), ioe); + breakException = new BatchRename(cnt, infos.size(), ioe); break; } else { throw ioe; @@ -3359,13 +3359,13 @@ void batchRename(final String[] srcs, final String[] dsts, } finally { FileStatus status = res != null ? res.auditStat : null; writeUnlock(operationName, - getLockReportInfoSupplier(Arrays.toString(srcs), - Arrays.toString(srcs), status)); + getLockReportInfoSupplier(String.join(",", srcs), + String.join(",", dsts), status)); } } catch (AccessControlException e) { logAuditEvent(false, operationName + " (options=" + Arrays.toString(options) + ")", - Arrays.toString(srcs), Arrays.toString(srcs), null); + String.join(",", srcs), String.join(",", dsts), null); throw e; } getEditLog().logSync(); @@ -3385,7 +3385,7 @@ void batchRename(final String[] srcs, final String[] dsts, } if (breakException != null) { LOG.warn("batchRename "+ cnt + "/" + infos.size() + " success. from " + - Arrays.toString(srcs) + " to " + Arrays.toString(dsts), + String.join(",", srcs) + " to " + String.join(",", dsts), breakException); throw breakException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 67e2ecfb05ed1..1044e8538b0de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -89,7 +89,7 @@ import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; -import org.apache.hadoop.hdfs.protocol.BatchOpsException; +import org.apache.hadoop.hdfs.protocol.BatchRename; import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; @@ -545,7 +545,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) AclException.class, FSLimitException.PathComponentTooLongException.class, FSLimitException.MaxDirectoryItemsExceededException.class, - BatchOpsException.class); + BatchRename.class); clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class, UnresolvedPathException.class); @@ -1099,11 +1099,11 @@ public void rename2(String src, String dst, Options.Rename... 
   }
 
   @Override // ClientProtocol
-  public void batchRename(String[] srcs, String[] dsts,
+  public void batchRename(List<String> srcs, List<String> dsts,
       Options.Rename... options) throws IOException {
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.batchRename: "
-          + Arrays.toString(srcs) + " to " + Arrays.toString(dsts));
+          + String.join(",", srcs) + " to " + String.join(",", dsts));
     }
     namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index 281615c2078eb..7fb2bda384410 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -28,6 +28,7 @@
 import java.net.UnknownHostException;
 import java.security.Principal;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Base64.Encoder;
 import java.util.EnumSet;
@@ -736,7 +737,8 @@ protected Response put(
     {
       validateOpParams(op, destination);
       final EnumSet<Options.Rename> s = renameOptions.getValue();
-      cp.batchRename(fullpath.split(":"), destination.getValue().split(":"),
+      cp.batchRename(Arrays.asList(fullpath.split(":")),
+          Arrays.asList(destination.getValue().split(":")),
           s.toArray(new Options.Rename[s.size()]));
       return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
index 8a530d005fd49..1785efae1cb08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
@@ -21,7 +21,8 @@
 import org.apache.hadoop.fs.BatchOperations;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.BatchOpsException;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hdfs.protocol.BatchRename;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
@@ -78,9 +79,10 @@ private List<String> generateBatchFiles(
       Path p = new Path(dir, tag + "_" + i);
       if (createNum-- > 0) {
         DFSTestUtil.createFile(dfs, p, 10, (short) 1, 0);
-        assertTrue(dfs.exists(p));
+        ContractTestUtils.assertPathExists(dfs, "Source file not created", p);
       } else {
-        assertFalse(dfs.exists(p));
+        ContractTestUtils.assertPathsDoNotExist(dfs,
+            "Destination file already exists", p);
       }
       files.add(p.toString());
     }
@@ -89,22 +91,23 @@ private List<String> generateBatchFiles(
     return files;
   }
 
   private void testBatchRename(BatchOperations batchFS) throws Exception {
     Path testDir = new Path(root, "testBatchRename");
-    assertTrue(dfs.mkdirs(testDir));
+    dfs.mkdirs(testDir);
 
     List<String> srcs = generateBatchFiles(
         2, 2, testDir, "src");
     List<String> dsts = generateBatchFiles(
         2, 0, testDir, "dst");
 
-    batchFS.batchRename(
-        srcs.toArray(new String[srcs.size()]),
-        dsts.toArray(new String[dsts.size()]));
+    batchFS.batchRename(srcs, dsts);
 
     for (String f : srcs) {
-      assertFalse(dfs.exists(new Path(f)));
+      ContractTestUtils.assertPathsDoNotExist(dfs,
+          "Source file not renamed", new Path(f));
     }
     for (String f : dsts) {
       assertTrue(dfs.exists(new Path(f)));
+      ContractTestUtils.assertPathExists(dfs,
+          "Destination file not created", new Path(f));
       dfs.delete(new Path(f), true);
     }
   }
@@ -121,9 +124,7 @@ private void testInvalidInput(BatchOperations batchFS) throws Exception {
     List<String> dsts = new ArrayList<>();
     LambdaTestUtils.intercept(InvalidPathException.class,
         "mismatch batch path",
-        () -> batchFS.batchRename(
-            srcs.toArray(new String[srcs.size()]),
-            dsts.toArray(new String[dsts.size()])));
+        () -> batchFS.batchRename(srcs, dsts));
   }
 
   @Test
@@ -135,32 +136,33 @@ public void testInvalidInput() throws Exception {
   }
 
   // rename /src_1:/src_2(not existing) to /dst_1:/dst_2
   private void testPartialSuccess1(BatchOperations batchFS) throws Exception {
     Path testDir = new Path(root, "partial_success");
-    assertTrue(dfs.mkdirs(testDir));
+    dfs.mkdirs(testDir);
 
     List<String> srcs = generateBatchFiles(
         2, 1, testDir, "src");
     List<String> dsts = generateBatchFiles(
         2, 0, testDir, "dst");
     try {
-      batchFS.batchRename(
-          srcs.toArray(new String[srcs.size()]),
-          dsts.toArray(new String[dsts.size()]));
-    } catch (BatchOpsException e) {
+      batchFS.batchRename(srcs, dsts);
+    } catch (BatchRename e) {
       long index = e.getIndex();
-      assertEquals(1, index);
+      assertEquals("Partial success number mismatch!", 1, index);
       long total = e.getTotal();
-      assertEquals(2, total);
+      assertEquals("Total number mismatch!", 2, total);
       String reason = e.getReason();
-      assertTrue(reason.contains("FileNotFoundException"));
+      assertTrue("Error message mismatch",
+          reason.contains("FileNotFoundException"));
       for (int i = 0; i < index; i++) {
         Path p = new Path(testDir, "src_" + i);
-        assertFalse(dfs.exists(p));
+        ContractTestUtils.assertPathsDoNotExist(dfs,
+            "Source file not renamed", p);
       }
       for (int i = 0; i < index; i++) {
         Path p = new Path(testDir, "dst_" + i);
-        assertTrue(dfs.exists(p));
+        ContractTestUtils.assertPathExists(dfs,
+            "Destination file not created", p);
         dfs.delete(p, true);
       }
     }
@@ -171,32 +173,33 @@ private void testPartialSuccess2(BatchOperations batchFS) throws Exception {
     Path testDir = new Path(root, "partial_success");
     List<String> srcs = new ArrayList<>();
     Path src1 = new Path(testDir, "src_1");
-    assertTrue(dfs.mkdirs(src1));
+    dfs.mkdirs(src1);
     srcs.add(src1.toString());
     Path src1Subdir = new Path(src1, "subdir");
-    assertTrue(dfs.mkdirs(src1Subdir));
+    dfs.mkdirs(src1Subdir);
     srcs.add(src1Subdir.toString());
     List<String> dsts = generateBatchFiles(
         2, 0, testDir, "dst");
     try {
-      batchFS.batchRename(
-          srcs.toArray(new String[srcs.size()]),
-          dsts.toArray(new String[dsts.size()]));
-    } catch (BatchOpsException e) {
+      batchFS.batchRename(srcs, dsts);
+    } catch (BatchRename e) {
      long index = e.getIndex();
-      assertEquals(1, index);
+      assertEquals("Partial success number mismatch!", 1, index);
      long total = e.getTotal();
-      assertEquals(2, total);
+      assertEquals("Total number mismatch!", 2, total);
      String reason = e.getReason();
-      assertTrue(reason.contains("FileNotFoundException"));
+      assertTrue("Error message mismatch",
+          reason.contains("FileNotFoundException"));
       for (int i = 0; i < index; i++) {
         Path p = new Path(testDir, "src_" + i);
-        assertFalse(dfs.exists(p));
+        ContractTestUtils.assertPathsDoNotExist(dfs,
+            "Source file not renamed", p);
       }
       for (int i = 0; i < index; i++) {
         Path p = new Path(testDir, "dst_" + i);
-        assertTrue(dfs.exists(p));
+        ContractTestUtils.assertPathExists(dfs,
+            "Destination file not created", p);
         dfs.delete(p, true);
       }
     }
@@ -212,23 +215,24 @@ private void testPartialSuccess3(BatchOperations batchFS) throws Exception {
     dsts.add(dsts.get(0));
 
     try {
-      batchFS.batchRename(
-          srcs.toArray(new String[srcs.size()]),
-          dsts.toArray(new String[dsts.size()]));
-    } catch (BatchOpsException e) {
+      batchFS.batchRename(srcs, dsts);
+    } catch (BatchRename e) {
       long index = e.getIndex();
-      assertEquals(1, index);
+      assertEquals("Partial success number mismatch!", 1, index);
       long total = e.getTotal();
-      assertEquals(2, total);
+      assertEquals("Total number mismatch!", 2, total);
       String reason = e.getReason();
-      assertTrue(reason.contains("FileAlreadyExistsException"));
+      assertTrue("Error message mismatch!",
+          reason.contains("FileAlreadyExistsException"));
       for (int i = 0; i < index; i++) {
         Path p = new Path(testDir, "src_" + i);
-        assertFalse(dfs.exists(p));
+        ContractTestUtils.assertPathsDoNotExist(dfs,
+            "Source file not renamed", p);
       }
       for (int i = 0; i < index; i++) {
         Path p = new Path(testDir, "dst_" + i);
-        assertTrue(dfs.exists(p));
+        ContractTestUtils.assertPathExists(dfs,
+            "Destination file not created", p);
         dfs.delete(p, true);
       }
     }
@@ -252,8 +256,8 @@ public void testPathMustBeAbsoluteForWebFS() throws Exception {
     dsts.add("hdfs://namenode/f2");
     LambdaTestUtils.intercept(InvalidPathException.class,
         "Path is not absolute",
+        "Error message mismatch when renaming an invalid path on WebFS!",
         () -> webHdfs.batchRename(
-            srcs.toArray(new String[srcs.size()]),
-            dsts.toArray(new String[dsts.size()])));
+            srcs, dsts));
   }
 }

From 082282ab59751057c5565ea4f2745db24239e942 Mon Sep 17 00:00:00 2001
From: Yang Yun
Date: Mon, 24 Aug 2020 18:30:52 +0800
Subject: [PATCH 3/4] HDFS-15484. fix error change

---
 ...{BatchOperations.java => BatchRename.java} |  6 +++---
 .../org/apache/hadoop/hdfs/DFSClient.java     |  4 ++--
 .../hadoop/hdfs/DistributedFileSystem.java    |  4 ++--
 ...hRename.java => BatchRenameException.java} | 12 +++++------
 .../hadoop/hdfs/web/WebHdfsFileSystem.java    |  8 ++------
 .../hdfs/server/namenode/FSNamesystem.java    |  6 +++---
 .../server/namenode/NameNodeRpcServer.java    |  4 ++--
 .../apache/hadoop/hdfs/TestBatchRename.java   | 20 +++++++++----------
 8 files changed, 30 insertions(+), 34 deletions(-)
 rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/{BatchOperations.java => BatchRename.java} (90%)
 rename hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/{BatchRename.java => BatchRenameException.java} (87%)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchRename.java
similarity index 90%
rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java
rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchRename.java
index 85c8cab44d8fe..87c3d91f2640f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchOperations.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchRename.java
@@ -30,7 +30,7 @@
 
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public interface BatchOperations {
+public interface BatchRename {
 
   /**
    * Batched rename API that rename a batch of files.
@@ -39,6 +39,6 @@ public interface BatchOperations {
    * @param dsts target file list.
    * @throws IOException failure exception.
    */
-  void batchRename(List<String> srcs, List<String> dsts,
-      Options.Rename... options) throws IOException;
+  void batchRename(List<String> srcs, List<String> dsts,
+                   Options.Rename... options) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 16ddd9099c1d4..1a96144cd3eec 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -105,7 +105,7 @@
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
-import org.apache.hadoop.hdfs.protocol.BatchRename;
+import org.apache.hadoop.hdfs.protocol.BatchRenameException;
 import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -1626,7 +1626,7 @@ public void batchRename(List<String> srcs, List<String> dsts,
           DSQuotaExceededException.class,
           UnresolvedPathException.class,
           SnapshotAccessControlException.class,
-          BatchRename.class);
+          BatchRenameException.class);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 0896279c80f50..33fad4bb38635 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -30,7 +30,7 @@
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.BatchListingOperations;
-import org.apache.hadoop.fs.BatchOperations;
+import org.apache.hadoop.fs.BatchRename;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.CacheFlag;
@@ -149,7 +149,7 @@
 @InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
 @InterfaceStability.Unstable
 public class DistributedFileSystem extends FileSystem
-    implements KeyProviderTokenIssuer, BatchListingOperations, BatchOperations{
+    implements KeyProviderTokenIssuer, BatchListingOperations, BatchRename {
   private Path workingDir;
   private URI uri;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRename.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRenameException.java
similarity index 87%
rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRename.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRenameException.java
index c992e0077e2ab..ccfd35172635a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchRenameException.java
@@ -28,26 +28,26 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class BatchRename extends IOException {
+public final class BatchRenameException extends IOException {
   private static final long serialVersionUID =
       -1881850913029509889L;
 
   private static final String TAG_INDEX = "index";
   private static final String TAG_TOTAL = "total";
   private static final String TAG_REASON = "reason";
 
   /**
-   * Used by RemoteException to instantiate an BatchRename.
+   * Used by RemoteException to instantiate a BatchRenameException.
    */
-  public BatchRename(String msg) {
+  public BatchRenameException(String msg) {
     super(msg);
   }
 
-  public BatchRename(long index, long total, Throwable cause) {
+  public BatchRenameException(long index, long total, Throwable cause) {
     this(index, total,
         (cause != null) ? cause.getClass().getName() + ": " + cause.getMessage()
             : "Unknown reason");
   }
 
-  public BatchRename(long index, long total,
-      String cause) {
+  public BatchRenameException(long index, long total,
+      String cause) {
     super("Batch operation partial success. " +
         getTagHeader(TAG_INDEX) + index + getTagTailer(TAG_INDEX) +
         getTagHeader(TAG_TOTAL) + total + getTagTailer(TAG_TOTAL) +
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index d301c61e1de66..543480e5f3b24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -40,14 +40,12 @@
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Base64;
 import java.util.Base64.Decoder;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -62,7 +60,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
-import org.apache.hadoop.fs.BatchOperations;
+import org.apache.hadoop.fs.BatchRename;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
@@ -140,13 +138,11 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
-
 /** A FileSystem for HDFS over the web. */
 public class WebHdfsFileSystem extends FileSystem
     implements DelegationTokenRenewer.Renewable,
     TokenAspect.TokenManagementDelegator, KeyProviderTokenIssuer,
-    BatchOperations {
+    BatchRename {
   public static final Logger LOG = LoggerFactory
       .getLogger(WebHdfsFileSystem.class);
   /** WebHdfs version. */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d2fea69a47ae8..3d5a5a39b570e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -97,7 +97,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.text.CaseUtils;
-import org.apache.hadoop.hdfs.protocol.BatchRename;
+import org.apache.hadoop.hdfs.protocol.BatchRenameException;
 import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
@@ -3334,7 +3334,7 @@ void batchRename(List<String> srcs, List<String> dsts,
     for (int i = 0; i < srcs.size(); i++) {
       infos.add(new RenameInfo(srcs.get(i), dsts.get(i)));
     }
-    BatchRename breakException = null;
+    BatchRenameException breakException = null;
     int cnt = 0;
     try {
       writeLock();
@@ -3349,7 +3349,7 @@ void batchRename(List<String> srcs, List<String> dsts,
             cnt++;
           } catch (IOException ioe) {
             if (cnt > 0) {
-              breakException = new BatchRename(cnt, infos.size(), ioe);
+              breakException = new BatchRenameException(cnt, infos.size(), ioe);
               break;
             } else {
               throw ioe;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1044e8538b0de..1468cecf4c466 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -89,7 +89,7 @@
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
-import org.apache.hadoop.hdfs.protocol.BatchRename;
+import org.apache.hadoop.hdfs.protocol.BatchRenameException;
 import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -545,7 +545,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
         AclException.class,
         FSLimitException.PathComponentTooLongException.class,
         FSLimitException.MaxDirectoryItemsExceededException.class,
-        BatchRename.class);
+        BatchRenameException.class);
 
     clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class,
         UnresolvedPathException.class);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
index 1785efae1cb08..2a498dbea9b5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
@@ -18,11 +18,11 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BatchOperations;
+import org.apache.hadoop.fs.BatchRename;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.hdfs.protocol.BatchRename;
+import org.apache.hadoop.hdfs.protocol.BatchRenameException;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
@@ -89,7 +89,7 @@ private List<String> generateBatchFiles(
     return files;
   }
 
-  private void testBatchRename(BatchOperations batchFS) throws Exception {
+  private void testBatchRename(BatchRename batchFS) throws Exception {
     Path testDir = new Path(root, "testBatchRename");
     dfs.mkdirs(testDir);
 
@@ -118,7 +118,7 @@ public void testBatchRaname() throws Exception {
     testBatchRename(webHdfs);
   }
 
-  private void testInvalidInput(BatchOperations batchFS) throws Exception {
+  private void testInvalidInput(BatchRename batchFS) throws Exception {
     List<String> srcs = new ArrayList<>();
     srcs.add("/testInvalidInput_Mismatch");
     List<String> dsts = new ArrayList<>();
@@ -134,7 +134,7 @@ public void testInvalidInput() throws Exception {
   }
 
   // rename /src_1:/src_2(not existing) to /dst_1:/dst_2
-  private void testPartialSuccess1(BatchOperations batchFS) throws Exception {
+  private void testPartialSuccess1(BatchRename batchFS) throws Exception {
     Path testDir = new Path(root, "partial_success");
     dfs.mkdirs(testDir);
 
@@ -144,7 +144,7 @@ private void testPartialSuccess1(BatchOperations batchFS) throws Exception {
         2, 0, testDir, "dst");
     try {
       batchFS.batchRename(srcs, dsts);
-    } catch (BatchRename e) {
+    } catch (BatchRenameException e) {
       long index = e.getIndex();
       assertEquals("Partial success number mismatch!", 1, index);
       long total = e.getTotal();
@@ -169,7 +169,7 @@ private void testPartialSuccess1(BatchOperations batchFS) throws Exception {
   }
 
   // rename src_1:src_1/subdir to /dst_1:/dst_2
-  private void testPartialSuccess2(BatchOperations batchFS) throws Exception {
+  private void testPartialSuccess2(BatchRename batchFS) throws Exception {
     Path testDir = new Path(root, "partial_success");
     List<String> srcs = new ArrayList<>();
     Path src1 = new Path(testDir, "src_1");
@@ -183,7 +183,7 @@ private void testPartialSuccess2(BatchOperations batchFS) throws Exception {
     List<String> dsts = generateBatchFiles(
         2, 0, testDir, "dst");
     try {
       batchFS.batchRename(srcs, dsts);
-    } catch (BatchRename e) {
+    } catch (BatchRenameException e) {
       long index = e.getIndex();
       assertEquals("Partial success number mismatch!", 1, index);
       long total = e.getTotal();
@@ -206,7 +206,7 @@ private void testPartialSuccess2(BatchOperations batchFS) throws Exception {
   }
 
   // rename src_1:src_2 /dst_1:/dst_1
-  private void testPartialSuccess3(BatchOperations batchFS) throws Exception {
+  private void testPartialSuccess3(BatchRename batchFS) throws Exception {
     Path testDir = new Path(root, "partial_success_3");
     List<String> srcs = generateBatchFiles(
         2, 2, testDir, "src");
@@ -216,7 +216,7 @@ private void testPartialSuccess3(BatchOperations batchFS) throws Exception {
 
     try {
       batchFS.batchRename(srcs, dsts);
-    } catch (BatchRename e) {
+    } catch (BatchRenameException e) {
       long index = e.getIndex();
       assertEquals("Partial success number mismatch!", 1, index);
       long total = e.getTotal();

From 67e197ce042239d26cfaa1d7bbf61cd9e77c1f6e Mon Sep 17 00:00:00 2001
From: Yang Yun
Date: Mon, 31 Aug 2020 18:19:31 +0800
Subject: [PATCH 4/4] add RenameBuilder

---
 .../java/org/apache/hadoop/fs/FileSystem.java | 72 ++++++++++++
 .../apache/hadoop/fs/FutureRenameBuilder.java | 51 +++++++++
 ...atchRename.java => IORenameStatistic.java} | 23 +---
 .../fs/impl/FutureRenameBuilderImpl.java      | 105 ++++++++++++++++++
 .../hadoop/hdfs/DistributedFileSystem.java    |  9 +-
 .../hadoop/hdfs/web/WebHdfsFileSystem.java    | 10 +-
 .../apache/hadoop/hdfs/TestBatchRename.java   | 66 +++++++++--
 7 files changed, 295 insertions(+), 41 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureRenameBuilder.java
 rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/{BatchRename.java => IORenameStatistic.java} (68%)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureRenameBuilderImpl.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index ab5040486dffc..dfc011a81a33c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
 import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
+import org.apache.hadoop.fs.impl.FutureRenameBuilderImpl;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -1609,6 +1610,18 @@ protected void rename(final Path src, final Path dst,
     }
   }
 
+  /** Batch rename: the default implementation renames each src/dst pair in turn. */
+  public IORenameStatistic batchRename(final List<String> srcs,
+      final List<String> dsts, final Rename... options) throws IOException {
+    if (srcs.size() != dsts.size()) {
+      throw new InvalidPathException("mismatch batch path src: " +
+          String.join(",", srcs) + " dst: " + String.join(",", dsts));
+    }
+    for (int i = 0; i < srcs.size(); i++) {
+      rename(new Path(srcs.get(i)), new Path(dsts.get(i)), options);
+    }
+    return new IORenameStatistic();
+  }
+
   /**
    * Truncate the file in the indicated path to the indicated size.
    *
@@ -4670,4 +4683,63 @@ public MultipartUploaderBuilder createMultipartUploader(Path basePath)
     methodNotSupported();
     return null;
   }
+
+  /**
+   * Builder returned by {@code #renameFile(List, List, Rename...)}.
+   */
+  private static class FSRenameBuilder
+      extends FutureRenameBuilderImpl
+      implements FutureRenameBuilder {
+
+    /**
+     * Constructor.
+     * @param fs owner filesystem.
+     * @param srcs source paths to rename.
+     * @param dsts destination paths.
+     * @param options rename options.
+     */
+    protected FSRenameBuilder(
+        @Nonnull final FileSystem fs,
+        @Nonnull final List<String> srcs,
+        @Nonnull final List<String> dsts,
+        Rename... options) {
+      super(fs, srcs, dsts, options);
+    }
+
+    /**
+     * Perform the rename operation.
+     * Returns a future which, when get() or a chained completion
+     * operation is invoked, will execute the batch rename and
+     * supply its result.
+     * @return a future to the rename statistics.
+     * @throws IOException early failure to rename
+     * @throws UnsupportedOperationException if the specific operation
+     * is not supported.
+     * @throws IllegalArgumentException if the parameters are not valid.
+     */
+    @Override
+    public CompletableFuture<IORenameStatistic> build() throws IOException {
+      return LambdaUtils.eval(new CompletableFuture<>(),
+          () -> getFS().batchRename(getSrcs(), getDsts(), getOptions()));
+    }
+  }
+
+  /**
+   * Rename one file or a batch of files through a builder API.
+   * Ultimately calls {@link #batchRename(List, List, Rename...)} unless a
+   * subclass executes the rename differently.
+   *
+   * The semantics of this call are therefore the same as that of
+   * {@link #batchRename(List, List, Rename...)} with one special point:
+   * it is in {@code FutureRenameBuilder.build()} in which the rename
+   * operation takes place -it is there where all preconditions to the
+   * operation are checked.
+   * @param srcs source file paths
+   * @param dsts destination file paths
+   * @param options rename options
+   * @return a FutureRenameBuilder object to build the rename operation
+   * @throws IOException if some early checks cause IO failures.
+   * @throws UnsupportedOperationException if support is checked early.
+   */
+  @InterfaceStability.Unstable
+  public FutureRenameBuilder renameFile(List<String> srcs, List<String> dsts,
+      Rename... options) throws IOException, UnsupportedOperationException {
+    return new FSRenameBuilder(this, srcs, dsts, options);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureRenameBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureRenameBuilder.java
new file mode 100644
index 0000000000000..464ca8e96c648
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureRenameBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Builder for batch rename operations whose execution returns a
+ * completable future: this allows for better asynchronous operation.
+ *
+ * Source/destination pairs may be added with {@link #rename(Pair)} and
+ * rename options with {@link #option(Options.Rename...)}; the rename
+ * itself only takes place when {@link #build()} is invoked and the
+ * returned future is evaluated.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface FutureRenameBuilder {
+
+  FutureRenameBuilder rename(Pair<String, String> src2dst);
+
+  FutureRenameBuilder option(Options.Rename... options);
+
+  CompletableFuture<IORenameStatistic> build()
+      throws IllegalArgumentException, UnsupportedOperationException,
+      IOException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchRename.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORenameStatistic.java
similarity index 68%
rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchRename.java
rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORenameStatistic.java
index 87c3d91f2640f..8355a5d735b8c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchRename.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IORenameStatistic.java
@@ -1,4 +1,5 @@
-/*
+/**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,30 +16,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.fs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Interface filesystems MAY implement to offer a batched operations.
- */
-
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public interface BatchRename {
-
-  /**
-   * Batched rename API that rename a batch of files.
-   *
-   * @param srcs source file list.
-   * @param dsts target file list.
-   * @throws IOException failure exception.
-   */
-  void batchRename(List<String> srcs, List<String> dsts,
-      Options.Rename... options) throws IOException;
+/** Result of a rename operation; currently an empty placeholder. */
+public class IORenameStatistic {
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureRenameBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureRenameBuilderImpl.java
new file mode 100644
index 0000000000000..bda68b065f4c3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureRenameBuilderImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureRenameBuilder;
+import org.apache.hadoop.fs.Options;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base implementation of {@link FutureRenameBuilder}: collects the
+ * source/destination pairs and rename options for a batch rename whose
+ * execution returns a completable future.
+ *
+ * Subclasses supply {@code build()}, which is where the rename
+ * operation actually takes place.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class FutureRenameBuilderImpl
+    implements FutureRenameBuilder {
+
+  private final FileSystem fileSystem;
+  private List<String> srcs = new ArrayList<>();
+  private List<String> dsts = new ArrayList<>();
+  private List<Options.Rename> options = new ArrayList<>();
+
+  /**
+   * Constructor.
+   * @param fileSystem owner FS.
+   * @param srcs source paths.
+   * @param dsts destination paths.
+   * @param options rename options.
+   */
+  protected FutureRenameBuilderImpl(@Nonnull FileSystem fileSystem,
+      @Nonnull List<String> srcs,
+      @Nonnull List<String> dsts,
+      Options.Rename... options) {
+    this.fileSystem = requireNonNull(fileSystem, "fileSystem");
+    this.srcs.addAll(srcs);
+    this.dsts.addAll(dsts);
+    for (Options.Rename opt : options) {
+      this.options.add(opt);
+    }
+  }
+
+  public FileSystem getFS() {
+    return fileSystem;
+  }
+
+  public List<String> getSrcs() {
+    return srcs;
+  }
+
+  public List<String> getDsts() {
+    return dsts;
+  }
+
+  public Options.Rename[] getOptions() {
+    return options.toArray(new Options.Rename[options.size()]);
+  }
+
+  public FutureRenameBuilder rename(Pair<String, String> src2dst) {
+    srcs.add(src2dst.getLeft());
+    dsts.add(src2dst.getRight());
+    return this;
+  }
+
+  public FutureRenameBuilder option(Options.Rename... options) {
+    for (Options.Rename opt : options) {
+      this.options.add(opt);
+    }
+    return this;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 33fad4bb38635..86003ae2da0fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.BatchListingOperations;
-import org.apache.hadoop.fs.BatchRename;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.CacheFlag;
@@ -51,6 +50,7 @@
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
+import org.apache.hadoop.fs.IORenameStatistic;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.InvalidPathHandleException;
 import org.apache.hadoop.fs.PartialListing;
@@ -149,7 +149,7 @@
 @InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
 @InterfaceStability.Unstable
 public class DistributedFileSystem extends FileSystem
-    implements KeyProviderTokenIssuer, BatchListingOperations, BatchRename {
+    implements KeyProviderTokenIssuer, BatchListingOperations {
   private Path workingDir;
   private URI uri;
 
@@ -976,14 +976,15 @@ protected List<String> getBatchPathName(List<String> files) {
   }
 
   @Override
-  public void batchRename(List<String> srcs, List<String> dsts,
-      final Options.Rename... options) throws IOException {
+  public IORenameStatistic batchRename(List<String> srcs, List<String> dsts,
+      final Options.Rename... options) throws IOException {
     if (srcs.size() != dsts.size()) {
       throw new InvalidPathException("mismatch batch path src: " +
           String.join(",", srcs) + " dst: " + String.join(",", dsts));
     }
     statistics.incrementWriteOps(1);
     dfs.batchRename(getBatchPathName(srcs), getBatchPathName(dsts));
+    return new IORenameStatistic();
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 543480e5f3b24..99fcfa9bfc43b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -60,7 +60,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
-import org.apache.hadoop.fs.BatchRename;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
@@ -75,6 +74,7 @@
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
+import org.apache.hadoop.fs.IORenameStatistic;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.MultipartUploaderBuilder;
 import org.apache.hadoop.fs.QuotaUsage;
@@ -141,8 +141,7 @@
 
 /** A FileSystem for HDFS over the web. */
 public class WebHdfsFileSystem extends FileSystem
     implements DelegationTokenRenewer.Renewable,
-    TokenAspect.TokenManagementDelegator, KeyProviderTokenIssuer,
-    BatchRename {
+    TokenAspect.TokenManagementDelegator, KeyProviderTokenIssuer {
   public static final Logger LOG = LoggerFactory
       .getLogger(WebHdfsFileSystem.class);
   /** WebHdfs version. */
@@ -1195,8 +1194,8 @@ protected List<String> getBatchPathName(List<String> files)
   }
 
   @Override
-  public void batchRename(List<String> srcs, List<String> dsts,
-      final Options.Rename... options) throws IOException {
+  public IORenameStatistic batchRename(List<String> srcs, List<String> dsts,
+      final Options.Rename... options) throws IOException {
     statistics.incrementWriteOps(1);
     storageStatistics.incrementOpCounter(OpType.BATCH_RENAME);
     final HttpOpParam.Op op = PutOpParam.Op.BATCH_RENAME;
@@ -1209,6 +1208,7 @@ public void batchRename(List<String> srcs, List<String> dsts,
         new DestinationParam(StringUtils.join(":", getBatchPathName(dsts))),
         new RenameOptionSetParam(options)
     ).run();
+    return new IORenameStatistic();
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
index 2a498dbea9b5b..2263ea90c0b73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchRename.java
@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BatchRename;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.FutureRenameBuilder;
+import org.apache.hadoop.fs.IORenameStatistic;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.hdfs.protocol.BatchRenameException;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
@@ -31,18 +35,20 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TestBatchRename {
   private static MiniDFSCluster cluster;
   private static Configuration conf;
   private static DistributedFileSystem dfs;
+  private static FileSystem fs;
   private static WebHdfsFileSystem webHdfs;
   private Path root = new Path("/test/batchrename/");
 
@@ -53,6 +59,7 @@ public static void beforeClass() throws Exception {
         .numDataNodes(1)
         .build();
     dfs = cluster.getFileSystem();
+    fs = cluster.getFileSystem();
     webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
         WebHdfsConstants.WEBHDFS_SCHEME);
 
@@ -89,7 +96,7 @@ private List<String> generateBatchFiles(
     return files;
   }
 
-  private void testBatchRename(BatchRename batchFS) throws Exception {
+  private void testBatchRename(FileSystem doFS) throws Exception {
     Path testDir = new Path(root, "testBatchRename");
     dfs.mkdirs(testDir);
 
@@ -98,7 +105,7 @@ private void testBatchRename(FileSystem doFS) throws Exception {
     List<String> dsts = generateBatchFiles(
         2, 0, testDir, "dst");
 
-    batchFS.batchRename(srcs, dsts);
+    doFS.batchRename(srcs, dsts);
     for (String f : srcs) {
       ContractTestUtils.assertPathsDoNotExist(dfs,
@@ -118,13 +125,13 @@ public void testBatchRaname() throws Exception {
     testBatchRename(webHdfs);
   }
 
-  private void testInvalidInput(BatchRename batchFS) throws Exception {
+  private void testInvalidInput(FileSystem doFS) throws Exception {
     List<String> srcs = new ArrayList<>();
     srcs.add("/testInvalidInput_Mismatch");
     List<String> dsts = new ArrayList<>();
     LambdaTestUtils.intercept(InvalidPathException.class,
         "mismatch batch path",
-        () -> batchFS.batchRename(srcs, dsts));
+        () -> doFS.batchRename(srcs, dsts));
   }
 
   @Test
@@ -134,7 +141,7 @@ public void testInvalidInput() throws Exception {
   }
 
   // rename /src_1:/src_2(not existing) to /dst_1:/dst_2
-  private void testPartialSuccess1(BatchRename batchFS) throws Exception {
+  private void testPartialSuccess1(FileSystem doFS) throws Exception {
     Path testDir = new Path(root, "partial_success");
     dfs.mkdirs(testDir);
 
@@ -143,7 +150,7 @@ private void testPartialSuccess1(FileSystem doFS) throws Exception {
     List<String> dsts = generateBatchFiles(
         2, 0, testDir, "dst");
     try {
-      batchFS.batchRename(srcs, dsts);
+      doFS.batchRename(srcs, dsts);
     } catch (BatchRenameException e) {
       long index = e.getIndex();
       assertEquals("Partial success number mismatch!", 1, index);
@@ -169,7 +176,7 @@ private void testPartialSuccess1(FileSystem doFS) throws Exception {
   }
 
   // rename src_1:src_1/subdir to /dst_1:/dst_2
-  private void testPartialSuccess2(BatchRename batchFS) throws Exception {
+  private void testPartialSuccess2(FileSystem doFS) throws Exception {
     Path testDir = new Path(root, "partial_success");
     List<String> srcs = new ArrayList<>();
     Path src1 = new Path(testDir, "src_1");
@@ -182,7 +189,7 @@ private void testPartialSuccess2(FileSystem doFS) throws Exception {
     List<String> dsts = generateBatchFiles(
         2, 0, testDir, "dst");
     try {
-      batchFS.batchRename(srcs, dsts);
+      doFS.batchRename(srcs, dsts);
     } catch (BatchRenameException e) {
       long index = e.getIndex();
       assertEquals("Partial success number mismatch!", 1, index);
@@ -206,7 +213,7 @@ private void testPartialSuccess2(FileSystem doFS) throws Exception {
   }
 
   // rename src_1:src_2 /dst_1:/dst_1
-  private void testPartialSuccess3(BatchRename batchFS) throws Exception {
+  private void testPartialSuccess3(FileSystem doFS) throws Exception {
     Path testDir = new Path(root, "partial_success_3");
     List<String> srcs = generateBatchFiles(
         2, 2, testDir, "src");
@@ -215,7 +222,7 @@ private void testPartialSuccess3(FileSystem doFS) throws Exception {
     dsts.add(dsts.get(0));
 
     try {
-      batchFS.batchRename(srcs, dsts);
+      doFS.batchRename(srcs, dsts);
     } catch (BatchRenameException e) {
       long index = e.getIndex();
       assertEquals("Partial success number mismatch!", 1, index);
       long total = e.getTotal();
@@ -260,4 +267,39 @@ public void testPathMustBeAbsoluteForWebFS() throws Exception {
         () -> webHdfs.batchRename(
             srcs, dsts));
   }
+
+  @Test
+  public void testOpenBuild() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/f1");
+    FutureDataInputStreamBuilder builder =
+        fs.openFile(p);
+    interceptFuture(FileNotFoundException.class,
+        "", builder.build());
+  }
+
+  @Test
+  public void testRenameByBuilder() throws Exception {
+    Path testDir = new Path(root, "testBatchRename");
+    dfs.mkdirs(testDir);
+
+    List<String> srcs = generateBatchFiles(
+        2, 2, testDir, "src");
+    List<String> dsts = generateBatchFiles(
+        2, 0, testDir, "dst");
+
+    FutureRenameBuilder builder = fs.renameFile(srcs, dsts);
+    IORenameStatistic ret = FutureIOSupport.awaitFuture(builder.build());
+
+    for (String f : srcs) {
+      ContractTestUtils.assertPathsDoNotExist(dfs,
+          "Source file not renamed", new Path(f));
+    }
+    for (String f : dsts) {
+      ContractTestUtils.assertPathExists(dfs,
+          "Destination file not created", new Path(f));
+      dfs.delete(new Path(f), true);
+    }
+  }
 }
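---

A quick usage sketch for reviewers, not part of the patch: it exercises the two client entry points added above, the synchronous FileSystem#batchRename and the asynchronous FutureRenameBuilder returned by FileSystem#renameFile, and shows how BatchRenameException reports partial success. The paths and configuration here are made up for illustration; on a plain FileSystem the default batchRename just loops over rename(), while DistributedFileSystem and WebHdfsFileSystem issue a single batched call.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureRenameBuilder;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.hdfs.protocol.BatchRenameException;

public class BatchRenameUsage {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at an HDFS cluster.
    FileSystem fs = FileSystem.get(new Configuration());

    List<String> srcs = Arrays.asList("/tmp/batch/src_0", "/tmp/batch/src_1");
    List<String> dsts = Arrays.asList("/tmp/batch/dst_0", "/tmp/batch/dst_1");

    // Synchronous form: on partial failure, the exception reports how many
    // leading pairs were renamed before the first error.
    try {
      fs.batchRename(srcs, dsts);
    } catch (BatchRenameException e) {
      // The first e.getIndex() pairs were renamed; the rest were not.
      System.err.printf("renamed %d of %d pairs, reason: %s%n",
          e.getIndex(), e.getTotal(), e.getReason());
    }

    // Builder form: build() returns a CompletableFuture, so the rename can
    // be composed with other asynchronous work and awaited later.
    FutureRenameBuilder builder = fs.renameFile(dsts, srcs);
    FutureIOSupport.awaitFuture(builder.build());

    fs.close();
  }
}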