From 8dc7b5c2063f883e476a2ca150056148cf1e30d0 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Thu, 10 Nov 2016 10:26:34 +0000
Subject: [PATCH 1/2] HADOOP-13600 starting on parallel rename, still designing
 code for max parallelism.

Even the listing and delete calls should be in parallel threads. Indeed:
listing could consider doing a pre-emptive call to grab all of the list,
though for a bucket with a few million files this would be too expensive.
Really we only need to be collecting keys at the same rate as copies
complete, which is implicitly defined by the rate at which keys are added
to the delete queue.

Change-Id: I906a1a15f3a7567cbff1999236549627859319a5
---
 .../BlockingThreadPoolExecutorService.java    |  19 +-
 .../hadoop/fs/s3a/RenameFailedException.java  |  60 ++++
 .../apache/hadoop/fs/s3a/S3AFileSystem.java   | 306 +++++++++++++-----
 3 files changed, 293 insertions(+), 92 deletions(-)
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RenameFailedException.java

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index eb40c3ab1a314..fb1d269f3b057 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -50,8 +50,13 @@ final class BlockingThreadPoolExecutorService

   private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);

+  private final int maxActiveTasks;
+
   private final ThreadPoolExecutor eventProcessingExecutor;

+  public int getMaxActiveTasks() {
+    return maxActiveTasks;
+  }
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each
    * created thread uniquely,
@@ -104,10 +109,17 @@ public Thread newThread(Runnable r) {
     };
   }

+  /**
+   * Create an instance.
+   * @param permitCount total permit count
+   * @param maxActiveTasks maximum number of active tasks (for lookup only)
+   * @param eventProcessingExecutor the executor doing the real work.
+   */
   private BlockingThreadPoolExecutorService(int permitCount,
-      ThreadPoolExecutor eventProcessingExecutor) {
+      int maxActiveTasks, ThreadPoolExecutor eventProcessingExecutor) {
     super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
         permitCount, false);
+    this.maxActiveTasks = maxActiveTasks;
     this.eventProcessingExecutor = eventProcessingExecutor;
   }

@@ -131,8 +143,9 @@ public static BlockingThreadPoolExecutorService newInstance(

     /* Although we generally only expect up to waitingTasks tasks in the
     queue, we need to be able to buffer all tasks in case dequeueing is
     slower than enqueueing.
     */
+    int totalTasks = waitingTasks + activeTasks;
     final BlockingQueue<Runnable> workQueue =
-        new LinkedBlockingQueue<>(waitingTasks + activeTasks);
+        new LinkedBlockingQueue<>(totalTasks);
     ThreadPoolExecutor eventProcessingExecutor =
         new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
             workQueue, newDaemonThreadFactory(prefixName),
@@ -146,7 +159,7 @@ public void rejectedExecution(Runnable r,
           }
         });
     eventProcessingExecutor.allowCoreThreadTimeOut(true);
-    return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
+    return new BlockingThreadPoolExecutorService(totalTasks, activeTasks,
         eventProcessingExecutor);
   }

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RenameFailedException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RenameFailedException.java
new file mode 100644
index 0000000000000..b37f142fd973b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RenameFailedException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Error to indicate that a specific rename failed.
+ * Target path is set to destination.
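+ * <p>
+ * A typical use (illustrative only; this exact pattern appears in
+ * {@code S3AFileSystem.innerRename()}):
+ * <pre>
+ *   throw new RenameFailedException(src, dst,
+ *       "source and dest refer to the same file or directory")
+ *       .withExitCode(srcStatus.isFile());
+ * </pre>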
+ */
+public class RenameFailedException extends PathIOException {
+
+  private boolean exitCode = false;
+
+  public RenameFailedException(String src, String dest, Throwable cause) {
+    super(src, cause);
+    setOperation("rename");
+    setTargetPath(dest);
+  }
+
+  public RenameFailedException(String src, String dest, String error) {
+    super(src, error);
+    setOperation("rename");
+    setTargetPath(dest);
+  }
+
+  public RenameFailedException(Path src, Path optionalDest, String error) {
+    super(src.toString(), error);
+    setOperation("rename");
+    if (optionalDest != null) {
+      setTargetPath(optionalDest.toString());
+    }
+  }
+
+  public boolean getExitCode() {
+    return exitCode;
+  }
+
+  public RenameFailedException withExitCode(boolean exitCode) {
+    this.exitCode = exitCode;
+    return this;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index b9b88104c12dc..5859f9bcf6695 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -25,11 +25,18 @@
 import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

@@ -59,6 +66,7 @@
 import com.amazonaws.services.s3.transfer.Upload;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -149,6 +157,12 @@ public class S3AFileSystem extends FileSystem {
   private S3ADataBlocks.BlockFactory blockFactory;
   private int blockOutputActiveBlocks;

+  /**
+   * Pool of threads which await copy operations to complete.
+   * These do no actual work other than block awaiting the completion
+   * of the copy operations.
+   */
+  private BlockingThreadPoolExecutorService copyWaitingThreadPool;
+
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.
    *   for this FileSystem
@@ -208,7 +222,13 @@ public StorageStatistics provide() {
         DEFAULT_KEEPALIVE_TIME, 0);
     threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
         maxThreads,
-        maxThreads + totalTasks,
+        Math.max(totalTasks - maxThreads, 0),
+        keepAliveTime, TimeUnit.SECONDS,
+        "s3a-transfer-shared");
+
+    copyWaitingThreadPool = BlockingThreadPoolExecutorService.newInstance(
+        maxThreads,
+        0,
         keepAliveTime, TimeUnit.SECONDS,
         "s3a-transfer-shared");

@@ -643,15 +663,27 @@ public boolean rename(Path src, Path dst) throws IOException {
       return innerRename(src, dst);
     } catch (AmazonClientException e) {
       throw translateException("rename(" + src +", " + dst + ")", src, e);
+    } catch (FileNotFoundException e) {
+      LOG.error("rename: src not found {}", src);
+      return false;
+    } catch (RenameFailedException e) {
+      LOG.debug(e.getMessage());
+      return e.getExitCode();
     }
   }

   /**
    * The inner rename operation. See {@link #rename(Path, Path)} for
    * the description of the operation.
+   * This operation throws an exception on any failure which needs to be
+   * reported and downgraded to a "false" return value. That is: if the
+   * rename could not take place, an exception is raised rather than
+   * false being returned.
    * @param src path to be renamed
    * @param dst new path after rename
-   * @return true if rename is successful
+   * @throws RenameFailedException if some criteria for a state-changing
+   * rename were not met. This means work didn't happen; it's not something
+   * which is reported upstream to the FileSystem APIs, for which the semantics
+   * of "false" are pretty vague.
+   * @throws FileNotFoundException there's no source file.
    * @throws IOException on IO failure.
    * @throws AmazonClientException on failures inside the AWS SDK
    */
@@ -659,27 +691,24 @@ private boolean innerRename(Path src, Path dst) throws IOException,
       AmazonClientException {
     LOG.debug("Rename path {} to {}", src, dst);
     incrementStatistic(INVOCATION_RENAME);
-
     String srcKey = pathToKey(src);
     String dstKey = pathToKey(dst);

-    if (srcKey.isEmpty() || dstKey.isEmpty()) {
-      LOG.debug("rename: source {} or dest {}, is empty", srcKey, dstKey);
-      return false;
+    if (srcKey.isEmpty()) {
+      throw new RenameFailedException(src, dst, "source is root directory");
     }
-
-    S3AFileStatus srcStatus;
-    try {
-      srcStatus = getFileStatus(src);
-    } catch (FileNotFoundException e) {
-      LOG.error("rename: src not found {}", src);
-      return false;
+    if (dstKey.isEmpty()) {
+      throw new RenameFailedException(src, dst, "dest is root directory");
     }

+    S3AFileStatus srcStatus = getFileStatus(src);
+
     if (srcKey.equals(dstKey)) {
-      LOG.debug("rename: src and dst refer to the same file or directory: {}",
+      LOG.debug("rename: src and dest refer to the same file or directory: {}",
           dst);
-      return srcStatus.isFile();
+      throw new RenameFailedException(src, dst,
+          "source and dest refer to the same file or directory")
+          .withExitCode(srcStatus.isFile());
     }

     S3AFileStatus dstStatus = null;
@@ -687,11 +716,10 @@ private boolean innerRename(Path src, Path dst) throws IOException,
       dstStatus = getFileStatus(dst);

       if (srcStatus.isDirectory() && dstStatus.isFile()) {
-        LOG.debug("rename: src {} is a directory and dst {} is a file",
-            src, dst);
-        return false;
+        throw new RenameFailedException(src, dst,
+            "source is a directory and dest is a file")
+            .withExitCode(srcStatus.isFile());
       }
-
       if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) {
         return false;
       }
@@ -703,12 +731,12 @@ private boolean innerRename(Path src, Path dst) throws IOException,
       try {
         S3AFileStatus dstParentStatus = getFileStatus(dst.getParent());
         if (!dstParentStatus.isDirectory()) {
-          return false;
+          throw new RenameFailedException(src, dst,
+              "destination parent is not a directory");
         }
       } catch (FileNotFoundException e2) {
-        LOG.debug("rename: destination path {} has no parent {}",
-            dst, parent);
-        return false;
+        throw new RenameFailedException(src, dst,
+            "destination has no parent");
       }
     }
   }
@@ -743,9 +771,8 @@ private boolean innerRename(Path src, Path dst) throws IOException,

       //Verify dest is not a child of the source directory
       if (dstKey.startsWith(srcKey)) {
-        LOG.debug("cannot rename a directory {}" +
-            " to a subdirectory of self: {}", srcKey, dstKey);
-        return false;
+        throw new RenameFailedException(srcKey, dstKey,
+            "cannot rename a directory to a subdirectory of itself");
       }

       List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
@@ -754,42 +781,131 @@ private boolean innerRename(Path src, Path dst) throws IOException,
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
       }

-      ListObjectsRequest request = new ListObjectsRequest();
-      request.setBucketName(bucket);
-      request.setPrefix(srcKey);
-      request.setMaxKeys(maxKeys);
+      renameFilesUnderDirectory(srcKey, dstKey, keysToDelete);
+    }

-      ObjectListing objects = listObjects(request);
+    if (src.getParent() != dst.getParent()) {
+      deleteUnnecessaryFakeDirectories(dst.getParent());
+      createFakeDirectoryIfNecessary(src.getParent());
+    }
+    return true;
+  }

-      while (true) {
-        for (S3ObjectSummary summary : objects.getObjectSummaries()) {
-          keysToDelete.add(
-              new DeleteObjectsRequest.KeyVersion(summary.getKey()));
-          String newDstKey =
-              dstKey + summary.getKey().substring(srcKey.length());
-          copyFile(summary.getKey(), newDstKey, summary.getSize());
+  /**
+   * Rename files under a directory.
+   * @param srcKey key of the source directory
+   * @param dstKey key of the destination directory
+   * @param keysToDelete possibly non-empty list of keys to delete
+   * @throws IOException on IO failure
+   * @throws AmazonClientException on failures inside the AWS SDK
+   */
+  private void renameFilesUnderDirectory(final String srcKey,
+      final String dstKey,
+      final List<DeleteObjectsRequest.KeyVersion> keysToDelete)
+      throws IOException {
+    ListObjectsRequest request = new ListObjectsRequest();
+    request.setBucketName(bucket);
+    request.setPrefix(srcKey);
+    request.setMaxKeys(maxKeys);

-          if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
-            removeKeys(keysToDelete, true, false);
-          }
+    ObjectListing objects = listObjects(request);
+
+    Comparator<S3ObjectSummary> compareObjectSizes =
+        new Comparator<S3ObjectSummary>() {
+      @Override
+      public int compare(S3ObjectSummary o1, S3ObjectSummary o2) {
+        long result = o1.getSize() - o2.getSize();
+        if (result < 0) {
+          return -1;
+        } else if (result > 0) {
+          return 1;
         }
+        return 0;
+      }
+    };

-        if (objects.isTruncated()) {
-          objects = continueListObjects(objects);
-        } else {
-          if (!keysToDelete.isEmpty()) {
-            removeKeys(keysToDelete, false, false);
+
+    // THIS IS WRONG: need to have a queue with a pool of threads taking from
+    // a blocking queue and doing copy + wait. Threads finish when some event
+    // signals that (push in an end-of-queue marker; the recipient resubmits
+    // it, then exits?). The primary thread then just needs to
+    // wait for the entire pool to finish.
+    // Will need to do the listing and delete operations in a thread too
+    // for max performance; schedule them first.
+    // Maybe also add a metric on the number of copies queued, for monitoring.
+    // This is ... complicated...
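+    //
+    // A possible shape for that design (an illustrative sketch only;
+    // nothing below is wired up, and the names are hypothetical):
+    //
+    //   BlockingQueue<S3ObjectSummary> copyQueue =
+    //       new LinkedBlockingQueue<>(maxKeys);
+    //   listing thread: enqueue each summary, then one end-of-queue
+    //     marker per copy worker.
+    //   copy worker: take(), scheduleFileCopy(), waitForCopyResult(),
+    //     push the source key onto the delete queue; exit on the marker.
+    //   delete thread: drain the delete queue in batches of
+    //     MAX_ENTRIES_TO_DELETE via removeKeys().
+    //   primary thread: block until every worker has finished.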
+    CompletionService<CopyResult> ecs
+        = new ExecutorCompletionService<>(copyWaitingThreadPool);
+    while (true) {
+      List<S3ObjectSummary> objectSummaries = objects.getObjectSummaries();
+      Collections.sort(objectSummaries, compareObjectSizes);
+      for (S3ObjectSummary summary : objectSummaries) {
+        final String copySourceKey = summary.getKey();
+        String newDstKey =
+            dstKey + copySourceKey.substring(srcKey.length());
+        final long size = summary.getSize();
+        final Copy copy = scheduleFileCopy(copySourceKey,
+            newDstKey,
+            size);
+        // submit the next piece of work
+        ecs.submit(new Callable<CopyResult>() {
+          @Override
+          public CopyResult call() throws Exception {
+            CopyResult result = copy.waitForCopyResult();
+            incrementWriteOperations();
+            instrumentation.filesCopied(1, size);
+            keysToDelete.add(
+                new DeleteObjectsRequest.KeyVersion(copySourceKey));
+            return result;
+          }
+        });
+        // now, once that first has been submitted, we can await them finishing
+        try {
+          ecs.take().get();
+        } catch (CancellationException e) {
+          LOG.info("Cancelled copy {} to {}", srcKey, newDstKey, e);
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException("Interrupted copying " + srcKey
+              + " to " + newDstKey + ", cancelling");
+        } catch (ExecutionException e) {
+          Throwable cause = e.getCause();
+          if (cause != null) {
+            // rethrow the cause so as to use Java lang exception mapping
+            try {
+              throw cause;
+            } catch (AmazonClientException | IOException ex) {
+              throw ex;
+            } catch (InterruptedException ex) {
+              throw (IOException) new InterruptedIOException("interrupted")
+                  .initCause(ex);
+            } catch (Throwable ex) {
+              throw new IOException(ex);
+            }
+          } else {
+            throw new IOException(e);
+          }
         }
-          break;
         }
+      }

-      }
-      if (src.getParent() != dst.getParent()) {
-        deleteUnnecessaryFakeDirectories(dst.getParent());
-        createFakeDirectoryIfNecessary(src.getParent());
+      // old code
+      for (S3ObjectSummary summary : objectSummaries) {
+        keysToDelete.add(
+            new DeleteObjectsRequest.KeyVersion(summary.getKey()));
+
+        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
+          removeKeys(keysToDelete, true, false);
+        }
+        copyFile(summary.getKey(),
+            dstKey + summary.getKey().substring(srcKey.length()),
+            summary.getSize());
+      }
+
+      if (objects.isTruncated()) {
+        objects = continueListObjects(objects);
+      } else {
+        if (!keysToDelete.isEmpty()) {
+          removeKeys(keysToDelete, false, false);
+        }
+        break;
+      }
     }
-    return true;
   }

   /**
@@ -1699,55 +1815,67 @@ public String getCanonicalServiceName() {
     return null;
   }

+  /**
+   * A progress listener called during copy operations.
+   */
+  private final ProgressListener copyProgressListener = new ProgressListener() {
+    public void progressChanged(ProgressEvent progressEvent) {
+      switch (progressEvent.getEventType()) {
+      case TRANSFER_PART_COMPLETED_EVENT:
+        incrementWriteOperations();
+        break;
+      default:
+        break;
+      }
+    }
+  };
+
   /**
    * Copy a single object in the bucket via a COPY operation.
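+   * The copy is only scheduled here: the caller must block on the returned
+   * {@link Copy} handle, for example via
+   * {@code scheduleFileCopy(srcKey, dstKey, size).waitForCopyResult()},
+   * as the blocking copyFile() below does.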
    * @param srcKey source object path
    * @param dstKey destination object path
    * @param size object size
    * @throws AmazonClientException on failures inside the AWS SDK
+   * @throws IOException Other IO problems
+   */
+  private Copy scheduleFileCopy(String srcKey, String dstKey, long size)
+      throws IOException, AmazonClientException {
+    LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
+
+    ObjectMetadata srcom = getObjectMetadata(srcKey);
+    ObjectMetadata dstom = cloneObjectMetadata(srcom);
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
+    }
+    CopyObjectRequest copyObjectRequest =
+        new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+    copyObjectRequest.setCannedAccessControlList(cannedACL);
+    copyObjectRequest.setNewObjectMetadata(dstom);
+
+    Copy copy = transfers.copy(copyObjectRequest);
+    copy.addProgressListener(copyProgressListener);
+    return copy;
+  }
+
+  /**
+   * Copy a single object in the bucket via a COPY operation,
+   * blocking until the copy completes.
+   * @param srcKey source object path
+   * @param dstKey destination object path
+   * @param size object size
+   * @throws AmazonClientException on failures inside the AWS SDK
    * @throws InterruptedIOException the operation was interrupted
    * @throws IOException Other IO problems
    */
   private void copyFile(String srcKey, String dstKey, long size)
       throws IOException, InterruptedIOException, AmazonClientException {
-    LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
     try {
-      ObjectMetadata srcom = getObjectMetadata(srcKey);
-      ObjectMetadata dstom = cloneObjectMetadata(srcom);
-      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-      }
-      CopyObjectRequest copyObjectRequest =
-          new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
-      copyObjectRequest.setCannedAccessControlList(cannedACL);
-      copyObjectRequest.setNewObjectMetadata(dstom);
-
-      ProgressListener progressListener = new ProgressListener() {
-        public void progressChanged(ProgressEvent progressEvent) {
-          switch (progressEvent.getEventType()) {
-          case TRANSFER_PART_COMPLETED_EVENT:
-            incrementWriteOperations();
-            break;
-          default:
-            break;
-          }
-        }
-      };
-
-      Copy copy = transfers.copy(copyObjectRequest);
-      copy.addProgressListener(progressListener);
-      try {
-        copy.waitForCopyResult();
-        incrementWriteOperations();
-        instrumentation.filesCopied(1, size);
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("Interrupted copying " + srcKey
-            + " to " + dstKey + ", cancelling");
-      }
-    } catch (AmazonClientException e) {
-      throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")",
-          srcKey, e);
+      scheduleFileCopy(srcKey, dstKey, size).waitForCopyResult();
+      incrementWriteOperations();
+      instrumentation.filesCopied(1, size);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted copying " + srcKey
+          + " to " + dstKey + ", cancelling");
     }
   }

From b33b28c82363b76a53f9dc9f443f23c7ada20dcf Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 21 Nov 2016 12:10:19 +0000
Subject: [PATCH 2/2] HADOOP-13600 moving parallel copy into its own class

Change-Id: Ibfe08fb289deaa3b2fa21a8281f58651edb6d2ab
---
 .../apache/hadoop/fs/s3a/ParallelCopier.java  | 26 +++++++++++++
 .../apache/hadoop/fs/s3a/S3AFileSystem.java   | 38 +++++++++++++------
 2 files changed, 53 insertions(+), 11 deletions(-)
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelCopier.java

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelCopier.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelCopier.java
new file mode 100644
index 0000000000000..3f17656856266
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelCopier.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Executes copy operations in parallel, blocking until all operations
+ * have completed.
+ */
+class ParallelCopier {
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 5859f9bcf6695..b03ee7f969340 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -663,9 +663,6 @@ public boolean rename(Path src, Path dst) throws IOException {
       return innerRename(src, dst);
     } catch (AmazonClientException e) {
       throw translateException("rename(" + src +", " + dst + ")", src, e);
-    } catch (FileNotFoundException e) {
-      LOG.error("rename: src not found {}", src);
-      return false;
     } catch (RenameFailedException e) {
       LOG.debug(e.getMessage());
       return e.getExitCode();
@@ -701,6 +698,8 @@ private boolean innerRename(Path src, Path dst) throws IOException,
       throw new RenameFailedException(src, dst, "dest is root directory");
     }

+    // get the source file status; this raises a FNFE if there is no source
+    // file.
     S3AFileStatus srcStatus = getFileStatus(src);

@@ -714,15 +713,31 @@ private boolean innerRename(Path src, Path dst) throws IOException,
     S3AFileStatus dstStatus = null;
     try {
       dstStatus = getFileStatus(dst);
-
-      if (srcStatus.isDirectory() && dstStatus.isFile()) {
-        throw new RenameFailedException(src, dst,
-            "source is a directory and dest is a file")
-            .withExitCode(srcStatus.isFile());
-      }
-      if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) {
-        return false;
+      // if there is no destination entry, an exception is raised.
+      // Hence this code sequence can assume that there is something
+      // at the end of the path; the only detail being what it is and
+      // whether or not it can be the destination of the rename.
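+      // For reference, the checks below resolve to:
+      //   src is dir,  dst is file           -> fail
+      //   src is dir,  dst is non-empty dir  -> fail
+      //   src is dir,  dst is empty dir      -> rename allowed
+      //   src is file, dst is file           -> fail
+      //   src is file, dst is dir            -> rename allowed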
+      if (srcStatus.isDirectory()) {
+        if (dstStatus.isFile()) {
+          throw new RenameFailedException(src, dst,
+              "source is a directory and dest is a file")
+              .withExitCode(srcStatus.isFile());
+        } else if (!dstStatus.isEmptyDirectory()) {
+          throw new RenameFailedException(src, dst,
+              "Destination is a non-empty directory")
+              .withExitCode(false);
+        }
+        // at this point the destination is an empty directory
+      } else {
+        // source is a file. Look at the destination
+        if (dstStatus.isFile()) {
+          throw new RenameFailedException(src, dst,
+              "destination file for rename already exists")
+              .withExitCode(false);
+        }
+        // destination exists and is a directory; this is allowed
+      }
+
     } catch (FileNotFoundException e) {
       LOG.debug("rename: destination path {} not found", dst);
       // Parent must exist
@@ -880,6 +895,7 @@ public CopyResult call() throws Exception {
               throw new IOException(ex);
             }
           } else {
+            // no cause
             throw new IOException(e);
           }
         }
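
Below is an illustrative sketch of how the empty ParallelCopier class might
be fleshed out, reusing the CompletionService pattern already written inline
in renameFilesUnderDirectory(). It is an assumption about the design
direction, not part of the patches above; every name except ParallelCopier
and the AWS SDK types is hypothetical.

    package org.apache.hadoop.fs.s3a;

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;

    import com.amazonaws.services.s3.transfer.Copy;
    import com.amazonaws.services.s3.transfer.model.CopyResult;

    /**
     * Execute copy operations in parallel; block until all operations
     * have completed.
     */
    class ParallelCopier {

      private final CompletionService<CopyResult> completionService;
      private int submitted;

      ParallelCopier(ExecutorService executor) {
        this.completionService = new ExecutorCompletionService<>(executor);
      }

      /** Queue a scheduled copy; its completion is awaited later. */
      void submit(final Copy copy) {
        completionService.submit(new Callable<CopyResult>() {
          @Override
          public CopyResult call() throws Exception {
            return copy.waitForCopyResult();
          }
        });
        submitted++;
      }

      /** Block until every submitted copy has finished, rethrowing failures. */
      void awaitCompletion() throws IOException {
        for (int i = 0; i < submitted; i++) {
          try {
            completionService.take().get();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw (IOException) new InterruptedIOException(
                "interrupted awaiting copies").initCause(e);
          } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
              throw (IOException) cause;
            }
            throw new IOException(cause != null ? cause : e);
          }
        }
      }
    }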