diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java index eb40c3ab1a314..fb1d269f3b057 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java @@ -50,8 +50,13 @@ final class BlockingThreadPoolExecutorService private static final AtomicInteger POOLNUMBER = new AtomicInteger(1); + private final int maxActiveTasks; private final ThreadPoolExecutor eventProcessingExecutor; + public int getMaxActiveTasks() { + return maxActiveTasks; + } + /** * Returns a {@link java.util.concurrent.ThreadFactory} that names each * created thread uniquely, @@ -104,10 +109,17 @@ public Thread newThread(Runnable r) { }; } + /** + * Create an instance. + * @param permitCount total permit count + * @param maxActiveTasks maximum number of active tasks (for lookup only) + * @param eventProcessingExecutor the executor doing the real work. + */ private BlockingThreadPoolExecutorService(int permitCount, - ThreadPoolExecutor eventProcessingExecutor) { + int maxActiveTasks, ThreadPoolExecutor eventProcessingExecutor) { super(MoreExecutors.listeningDecorator(eventProcessingExecutor), permitCount, false); + this.maxActiveTasks = maxActiveTasks; this.eventProcessingExecutor = eventProcessingExecutor; } @@ -131,8 +143,9 @@ public static BlockingThreadPoolExecutorService newInstance( /* Although we generally only expect up to waitingTasks tasks in the queue, we need to be able to buffer all tasks in case dequeueing is slower than enqueueing. */ + int totalTasks = waitingTasks + activeTasks; final BlockingQueue workQueue = - new LinkedBlockingQueue<>(waitingTasks + activeTasks); + new LinkedBlockingQueue<>(totalTasks); ThreadPoolExecutor eventProcessingExecutor = new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit, workQueue, newDaemonThreadFactory(prefixName), @@ -146,7 +159,7 @@ public void rejectedExecution(Runnable r, } }); eventProcessingExecutor.allowCoreThreadTimeOut(true); - return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks, + return new BlockingThreadPoolExecutorService(totalTasks, activeTasks, eventProcessingExecutor); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelCopier.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelCopier.java new file mode 100644 index 0000000000000..3f17656856266 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelCopier.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +/** + * Execute copy operations in parallel; blocks until all operations + * are completed. + */ +class ParallelCopier { +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RenameFailedException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RenameFailedException.java new file mode 100644 index 0000000000000..b37f142fd973b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RenameFailedException.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; + +/** + * Error to indicate that a specific rename failed. + * Target path is set to destination. + */ +public class RenameFailedException extends PathIOException { + + private boolean exitCode = false; + + public RenameFailedException(String src, String dest, Throwable cause) { + super(src, cause); + setOperation("rename"); + setTargetPath(dest); + } + + public RenameFailedException(String src, String dest, String error) { + super(src, error); + setOperation("rename"); + setTargetPath(dest); + } + + public RenameFailedException(Path src, Path optionalDest, String error) { + super(src.toString(), error); + setOperation("rename"); + if (optionalDest != null) { + setTargetPath(optionalDest.toString()); + } + } + + public boolean getExitCode() { + return exitCode; + } + + public RenameFailedException withExitCode(boolean exitCode) { + this.exitCode = exitCode; + return this; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index b9b88104c12dc..b03ee7f969340 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -25,11 +25,18 @@ import java.io.InterruptedIOException; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,6 +66,7 @@ import 
com.amazonaws.services.s3.transfer.Upload; import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressEvent; +import com.amazonaws.services.s3.transfer.model.CopyResult; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListeningExecutorService; @@ -149,6 +157,12 @@ public class S3AFileSystem extends FileSystem { private S3ADataBlocks.BlockFactory blockFactory; private int blockOutputActiveBlocks; + /** + * Pool of threads which await copy operations to complete. + * These do no actual work other than await the completion of copy + * operations; the copies themselves are executed by the transfer manager. + */ + private BlockingThreadPoolExecutorService copyWaitingThreadPool; + /** Called after a new FileSystem instance is constructed. * @param name a uri whose authority section names the host, port, etc. * for this FileSystem @@ -208,7 +222,13 @@ public StorageStatistics provide() { DEFAULT_KEEPALIVE_TIME, 0); threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance( maxThreads, - maxThreads + totalTasks, + Math.max(totalTasks - maxThreads, 0), + keepAliveTime, TimeUnit.SECONDS, + "s3a-transfer-shared"); + + copyWaitingThreadPool = BlockingThreadPoolExecutorService.newInstance( + maxThreads, + 0, keepAliveTime, TimeUnit.SECONDS, "s3a-transfer-shared"); @@ -643,15 +663,24 @@ public boolean rename(Path src, Path dst) throws IOException { return innerRename(src, dst); } catch (AmazonClientException e) { throw translateException("rename(" + src +", " + dst + ")", src, e); + } catch (RenameFailedException e) { + LOG.debug(e.getMessage()); + return e.getExitCode(); } } /** * The inner rename operation. See {@link #rename(Path, Path)} for * the description of the operation. + * This operation throws an exception on any failure which needs to be + * reported and downgraded to a simple "false" return value. That is: if a + * rename cannot take place for one of the checked reasons, a + * RenameFailedException is raised rather than "false" being returned directly. * @param src path to be renamed * @param dst new path after rename - * @return true if rename is successful + * @throws RenameFailedException if some criteria for a state-changing + * rename were not met. This means work didn't happen; it's not something + * which is reported upstream to the FileSystem APIs, for which the semantics + * of "false" are pretty vague. + * @throws FileNotFoundException if there is no source file. * @throws IOException on IO failure. * @throws AmazonClientException on failures inside the AWS SDK */ @@ -659,42 +688,56 @@ private boolean innerRename(Path src, Path dst) throws IOException, AmazonClientException { LOG.debug("Rename path {} to {}", src, dst); incrementStatistic(INVOCATION_RENAME); - String srcKey = pathToKey(src); String dstKey = pathToKey(dst); - if (srcKey.isEmpty() || dstKey.isEmpty()) { - LOG.debug("rename: source {} or dest {}, is empty", srcKey, dstKey); - return false; + if (srcKey.isEmpty()) { + throw new RenameFailedException(src, dst, "source is root directory"); } - - S3AFileStatus srcStatus; - try { - srcStatus = getFileStatus(src); - } catch (FileNotFoundException e) { - LOG.error("rename: src not found {}", src); - return false; + if (dstKey.isEmpty()) { + throw new RenameFailedException(src, dst, "dest is root directory"); } + // get the source file status; this raises a FNFE if there is no source + // file.
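(Aside, not part of the patch: a worked example of the new pool sizing above, using hypothetical configuration values. BlockingThreadPoolExecutorService.newInstance takes the active-task count, the waiting-task count, the keep-alive time/unit and the thread-name prefix, and its internal permit count is the sum of the first two.)

  // Worked example only; the values are hypothetical, not defaults.
  static void sizePoolsExample() {
    int maxThreads = 10;        // e.g. fs.s3a.threads.max
    int totalTasks = 30;        // e.g. fs.s3a.max.total.tasks
    long keepAliveTime = 60;    // e.g. fs.s3a.threads.keepalivetime

    // Shared transfer pool: 10 workers plus 20 queue slots = 30 permits,
    // so the 31st concurrent submission blocks the submitting thread.
    BlockingThreadPoolExecutorService transferPool =
        BlockingThreadPoolExecutorService.newInstance(
            maxThreads,
            Math.max(totalTasks - maxThreads, 0),
            keepAliveTime, TimeUnit.SECONDS, "s3a-transfer-shared");

    // Copy-waiting pool: 10 workers and no queue slots, so at most 10
    // waitForCopyResult() tasks can be pending before submit() blocks.
    BlockingThreadPoolExecutorService copyWaiters =
        BlockingThreadPoolExecutorService.newInstance(
            maxThreads, 0, keepAliveTime, TimeUnit.SECONDS,
            "s3a-transfer-shared");
  }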
+ S3AFileStatus srcStatus = getFileStatus(src); + if (srcKey.equals(dstKey)) { - LOG.debug("rename: src and dst refer to the same file or directory: {}", + LOG.debug("rename: src and dest refer to the same file or directory: {}", dst); - return srcStatus.isFile(); + throw new RenameFailedException(src, dst, + "source and dest refer to the same file or directory") + .withExitCode(srcStatus.isFile()); } S3AFileStatus dstStatus = null; try { dstStatus = getFileStatus(dst); - - if (srcStatus.isDirectory() && dstStatus.isFile()) { - LOG.debug("rename: src {} is a directory and dst {} is a file", - src, dst); - return false; + // if there is no destination entry, an exception is raised. + // hence this code sequence can assume that there is something + // at the end of the path; the only detail being what it is and + // whether or not it can be the destination of the rename. + if (srcStatus.isDirectory()) { + if (dstStatus.isFile()) { + throw new RenameFailedException(src, dst, + "source is a directory and dest is a file") + .withExitCode(srcStatus.isFile()); + } else if (!dstStatus.isEmptyDirectory()) { + throw new RenameFailedException(src, dst, + "Destination is a non-empty directory") + .withExitCode(false); + } + // at this point the destination is an empty directory + } else { + // source is a file. Look at the destination + if (dstStatus.isFile()) { + throw new RenameFailedException(src, dst, + "destination file for rename operations already exists") + .withExitCode(false); + } + // + } - if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) { - return false; - } } catch (FileNotFoundException e) { LOG.debug("rename: destination path {} not found", dst); // Parent must exist @@ -703,12 +746,12 @@ private boolean innerRename(Path src, Path dst) throws IOException, try { S3AFileStatus dstParentStatus = getFileStatus(dst.getParent()); if (!dstParentStatus.isDirectory()) { - return false; + throw new RenameFailedException(src, dst, + "destination parent is not a directory"); } } catch (FileNotFoundException e2) { - LOG.debug("rename: destination path {} has no parent {}", - dst, parent); - return false; + throw new RenameFailedException(src, dst, + "destination has no parent"); } } } @@ -743,9 +786,8 @@ private boolean innerRename(Path src, Path dst) throws IOException, //Verify dest is not a child of the source directory if (dstKey.startsWith(srcKey)) { - LOG.debug("cannot rename a directory {}" + - " to a subdirectory of self: {}", srcKey, dstKey); - return false; + throw new RenameFailedException(srcKey, dstKey, + "cannot rename a directory to a subdirectory of itself"); } List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(); @@ -754,42 +796,132 @@ private boolean innerRename(Path src, Path dst) throws IOException, keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey)); } - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(srcKey); - request.setMaxKeys(maxKeys); + renameFilesUnderDirectory(srcKey, dstKey, keysToDelete); + } - ObjectListing objects = listObjects(request); + if (src.getParent() != dst.getParent()) { + deleteUnnecessaryFakeDirectories(dst.getParent()); + createFakeDirectoryIfNecessary(src.getParent()); + } + return true; + } - while (true) { - for (S3ObjectSummary summary : objects.getObjectSummaries()) { - keysToDelete.add( - new DeleteObjectsRequest.KeyVersion(summary.getKey())); - String newDstKey = - dstKey + summary.getKey().substring(srcKey.length()); - copyFile(summary.getKey(), newDstKey, summary.getSize()); +
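(Aside, not part of the patch: the helper introduced below replaces the serial copy loop removed above. As a reference for the pattern it builds on, here is a minimal sketch of submitting copy-wait tasks to an ExecutorCompletionService and then draining it, so the copies overlap and the first failure propagates. The class name, awaitAll and scheduledCopies are illustrative only.)

  import java.util.List;
  import java.util.concurrent.Callable;
  import java.util.concurrent.CompletionService;
  import java.util.concurrent.ExecutorCompletionService;
  import java.util.concurrent.ExecutorService;
  import com.amazonaws.services.s3.transfer.Copy;
  import com.amazonaws.services.s3.transfer.model.CopyResult;

  /** Sketch: wait for a batch of already-scheduled copies in parallel. */
  final class CopyBatchWaiter {
    static void awaitAll(ExecutorService copyWaitingThreadPool,
        List<Copy> scheduledCopies) throws Exception {
      CompletionService<CopyResult> ecs =
          new ExecutorCompletionService<>(copyWaitingThreadPool);
      // submit one waiting task per scheduled copy
      for (final Copy copy : scheduledCopies) {
        ecs.submit(new Callable<CopyResult>() {
          @Override
          public CopyResult call() throws Exception {
            return copy.waitForCopyResult();   // blocks until this copy finishes
          }
        });
      }
      // drain the completion service; the first failed copy is rethrown here
      for (int i = 0; i < scheduledCopies.size(); i++) {
        ecs.take().get();
      }
    }
  }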
/** + * Rename files under a directory. + * @param srcKey key to source directory + * @param dstKey key to destination directory + * @param keysToDelete Possibly non-empty list of keys to delete + * @throws IOException on IO failure + * @throws AmazonClientException on failures inside the AWS SDK + */ + private void renameFilesUnderDirectory(final String srcKey, + final String dstKey, + final List<DeleteObjectsRequest.KeyVersion> keysToDelete) throws IOException { + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(srcKey); + request.setMaxKeys(maxKeys); - ObjectListing objects = listObjects(request); + ObjectListing objects = listObjects(request); + + Comparator<S3ObjectSummary> compareObjectSizes = new Comparator<S3ObjectSummary>() { + @Override + public int compare(S3ObjectSummary o1, S3ObjectSummary o2) { + long result = o1.getSize() - o2.getSize(); + if (result < 0) { + return -1; + } else if (result > 0) { + return 1; } return 0; + } + }; - if (objects.isTruncated()) { - objects = continueListObjects(objects); - } else { - if (!keysToDelete.isEmpty()) { - removeKeys(keysToDelete, false, false); + + // THIS IS WRONG: need to have a queue with a pool of threads taking from + // a blocking queue and doing copy + wait. Threads finish when some event + // signals that (push in end of queue marker, recipient resubmits then exits?) + // The primary thread then just needs to + // wait for the entire pool finishing. + // will need to do the listing and delete operations in a thread too + // for max performance; schedule them first. + // maybe also add a metric on number of copies queued for monitoring + // this is ... complicated... + CompletionService<CopyResult> ecs + = new ExecutorCompletionService<>(copyWaitingThreadPool); + while (true) { + List<S3ObjectSummary> objectSummaries = objects.getObjectSummaries(); + Collections.sort(objectSummaries, compareObjectSizes); + for (S3ObjectSummary summary : objectSummaries) { + final String copySourceKey = summary.getKey(); + String newDstKey = + dstKey + copySourceKey.substring(srcKey.length()); + final long size = summary.getSize(); + final Copy copy = scheduleFileCopy(copySourceKey, + newDstKey, + size); + // submit the next piece of work + ecs.submit(new Callable<CopyResult>() { + @Override + public CopyResult call() throws Exception { + CopyResult result = copy.waitForCopyResult(); + incrementWriteOperations(); + instrumentation.filesCopied(1, size); + keysToDelete.add( + new DeleteObjectsRequest.KeyVersion(copySourceKey)); + return result; + } + }); + // now, once that task has been submitted, we can await its completion + try { + ecs.take().get(); + } catch (CancellationException e) { + LOG.info("Cancelled copy {} to {}", srcKey, newDstKey, e); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted copying " + srcKey + + " to " + newDstKey + ", cancelling"); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause != null) { + // rethrow the cause so as to use Java lang exception mapping + try { + throw cause; + } catch (AmazonClientException | IOException ex) { + throw ex; + } catch (InterruptedException ex) { + throw (IOException) new InterruptedIOException("interrupted") + .initCause(ex); + } catch (Throwable ex) { + throw new IOException(ex); + } + } else { + //no cause + throw new IOException(e); } - break; } + } - } - if (src.getParent() != dst.getParent()) { - deleteUnnecessaryFakeDirectories(dst.getParent()); - createFakeDirectoryIfNecessary(src.getParent()); + // old code + for (S3ObjectSummary summary : objectSummaries) { +
keysToDelete.add( + new DeleteObjectsRequest.KeyVersion(summary.getKey())); + + if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { + removeKeys(keysToDelete, true, false); + } + copyFile(); + } + + if (objects.isTruncated()) { + objects = continueListObjects(objects); + } else { + if (!keysToDelete.isEmpty()) { + removeKeys(keysToDelete, false, false); + } + break; + } } - return true; } /** @@ -1699,55 +1831,67 @@ public String getCanonicalServiceName() { return null; } + /** + * A progress listener called during copy operations. + */ + private final ProgressListener copyProgressListener = new ProgressListener() { + public void progressChanged(ProgressEvent progressEvent) { + switch (progressEvent.getEventType()) { + case TRANSFER_PART_COMPLETED_EVENT: + incrementWriteOperations(); + break; + default: + break; + } + } + }; + /** * Copy a single object in the bucket via a COPY operation. * @param srcKey source object path * @param dstKey destination object path * @param size object size * @throws AmazonClientException on failures inside the AWS SDK + * @throws IOException Other IO problems + */ + private Copy scheduleFileCopy(String srcKey, String dstKey, long size) + throws IOException, AmazonClientException { + LOG.debug("copyFile {} -> {} ", srcKey, dstKey); + + ObjectMetadata srcom = getObjectMetadata(srcKey); + ObjectMetadata dstom = cloneObjectMetadata(srcom); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); + } + CopyObjectRequest copyObjectRequest = + new CopyObjectRequest(bucket, srcKey, bucket, dstKey); + copyObjectRequest.setCannedAccessControlList(cannedACL); + copyObjectRequest.setNewObjectMetadata(dstom); + + Copy copy = transfers.copy(copyObjectRequest); + copy.addProgressListener(copyProgressListener); + return copy; + } + + /** + * Copy a single object in the bucket via a COPY operation, + * blocking until the copy completes. 
+ * @param srcKey source object path + * @param dstKey destination object path + * @param size object size + * @throws AmazonClientException on failures inside the AWS SDK * @throws InterruptedIOException the operation was interrupted * @throws IOException Other IO problems */ private void copyFile(String srcKey, String dstKey, long size) throws IOException, InterruptedIOException, AmazonClientException { - LOG.debug("copyFile {} -> {} ", srcKey, dstKey); - try { - ObjectMetadata srcom = getObjectMetadata(srcKey); - ObjectMetadata dstom = cloneObjectMetadata(srcom); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); - } - CopyObjectRequest copyObjectRequest = - new CopyObjectRequest(bucket, srcKey, bucket, dstKey); - copyObjectRequest.setCannedAccessControlList(cannedACL); - copyObjectRequest.setNewObjectMetadata(dstom); - - ProgressListener progressListener = new ProgressListener() { - public void progressChanged(ProgressEvent progressEvent) { - switch (progressEvent.getEventType()) { - case TRANSFER_PART_COMPLETED_EVENT: - incrementWriteOperations(); - break; - default: - break; - } - } - }; - - Copy copy = transfers.copy(copyObjectRequest); - copy.addProgressListener(progressListener); - try { - copy.waitForCopyResult(); - incrementWriteOperations(); - instrumentation.filesCopied(1, size); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted copying " + srcKey - + " to " + dstKey + ", cancelling"); - } - } catch (AmazonClientException e) { - throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")", - srcKey, e); + try { + scheduleFileCopy(srcKey, dstKey, size).waitForCopyResult(); + incrementWriteOperations(); + instrumentation.filesCopied(1, size); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted copying " + srcKey + + " to " + dstKey + ", cancelling"); }
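(Review note, not part of the patch: the old copyFile() wrapped AmazonClientException via translateException(); the refactored version above lets SDK exceptions propagate raw. A hedged sketch of how the blocking wrapper could reinstate that translation, using only methods already present in this class; whether to do so is a suggestion, not something this patch decides.)

  // Sketch: schedule, then block, translating SDK failures into IOExceptions
  // the way the previous copyFile() did. translateException() is the existing
  // S3A helper already used by the removed code.
  private void copyFileTranslated(String srcKey, String dstKey, long size)
      throws IOException {
    try {
      scheduleFileCopy(srcKey, dstKey, size).waitForCopyResult();
      incrementWriteOperations();
      instrumentation.filesCopied(1, size);
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted copying " + srcKey
          + " to " + dstKey + ", cancelling");
    } catch (AmazonClientException e) {
      throw translateException("copyFile(" + srcKey + ", " + dstKey + ")",
          srcKey, e);
    }
  }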