diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java index df932df43aebd..aa231554eb0cb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java @@ -146,4 +146,22 @@ private CommonPathCapabilities() { */ public static final String ABORTABLE_STREAM = "fs.capability.outputstream.abortable"; + + /** + * Does this FS support etags? + * That is: will FileStatus entries from listing/getFileStatus + * probes implement EtagSource and return real values? + */ + public static final String ETAGS_AVAILABLE = + "fs.capability.etags.available"; + + /** + * Are etags guaranteed to be preserved across rename() operations? + * FileSystems MUST NOT declare support for this feature + * unless this holds. + */ + public static final String ETAGS_PRESERVED_IN_RENAME = + "fs.capability.etags.preserved.in.rename"; + + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EtagSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EtagSource.java new file mode 100644 index 0000000000000..d7efdc705d8e5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EtagSource.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +/** + * An optional interface for {@link FileStatus} subclasses to implement + * to provide access to etags. + * If available, the FS SHOULD also declare the matching PathCapabilities + * -- etag supported: {@link CommonPathCapabilities#ETAGS_AVAILABLE}. + * -- etag consistent over rename: + * {@link CommonPathCapabilities#ETAGS_PRESERVED_IN_RENAME}. + */ +public interface EtagSource { + + /** + * Return the etag of this file status. + * A return value of null or "" means "no etag". + * @return a possibly null or empty etag. + */ + String getEtag(); + +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index 2eb0bc07196d6..9991ed0990be8 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -116,6 +116,64 @@ for both files and directories, MUST always return `true` to the `isEncrypted()` predicate. This can be done by setting the `encrypted` flag to true when creating the `FileStatus` instance.
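+For illustration only (this sketch is not part of the specification): a
+status entry can be marked as encrypted through the multi-argument
+`FileStatus` constructor; every value here other than the encrypted flag
+is a placeholder.
+
+```java
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+// Hypothetical helper: build a FileStatus whose isEncrypted() is true.
+FileStatus encryptedStatus(Path path, long length) {
+  return new FileStatus(length, false, 1, 128 << 20,
+      System.currentTimeMillis(), 0L,
+      FsPermission.getFileDefault(), "owner", "group",
+      null,   // no symlink
+      path,
+      false,  // no ACLs
+      true,   // encrypted: isEncrypted() will return true
+      false); // not erasure coded
+}
+```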
+#### Interface `EtagSource` + +FileSystem implementations MAY support querying HTTP etags from `FileStatus` +entries. If so, the requirements are: + +##### Etags MUST BE different for different file contents + +Two different arrays of data written to the same path MUST have different etag +values when probed. +This is a requirement of the HTTP specification. + +##### Etags MUST BE consistent across listing operations + +The value of `EtagSource.getEtag()` MUST be the same for list* queries as + for `getFileStatus()`. + +```java + ((EtagSource)getFileStatus(path)).getEtag() == ((EtagSource)listStatus(path)[0]).getEtag() +``` + +The same value is returned for `listFiles()`, `listStatusIterator()` of the path +and, when listing the parent path, of all files in the listing. + +##### Etags SHOULD BE preserved across rename operations + +The value of `EtagSource.getEtag()` SHOULD be the same after a file is renamed. +This is an implementation detail of the store; it does not hold for AWS S3. + +##### `FileStatus` subclass MUST BE `Serializable`; MAY BE `Writable` + +The base `FileStatus` class implements `Serializable` and `Writable` and marshalls +its fields appropriately. + +Subclasses MUST support Java serialization (some Apache Spark applications use it), +preserving the etag. This is a matter of making the etag field non-static and +adding a `serialVersionUID`. + +The `Writable` support was used for marshalling status data over Hadoop IPC calls; +now that is implemented through `org.apache.hadoop.fs.protocolPB.PBHelper` +and the methods are deprecated. +Subclasses MAY override the deprecated methods to add etag marshalling, but there +is no expectation of this. + +##### Appropriate etag path capabilities MUST BE declared + +1. `hasPathCapability(path, "fs.capability.etags.available")` MUST return true if and only if + the filesystem returns valid (non-empty) etags. +2. `hasPathCapability(path, "fs.capability.etags.preserved.in.rename")` MUST return true + if and only if etags are preserved across renames. + + +##### Non-requirements of etag support + +* There is no requirement/expectation that `FileSystem.getFileChecksum(Path)` returns a + checksum value related to the etag of an object, if any value is returned. +* If the same data is uploaded twice to the same or a different path, + the etag of the second upload is not guaranteed to match that of the first upload. + ### `msync()` @@ -1240,7 +1298,7 @@ Renaming a file where the destination is a directory moves the file as a child FS' where: not exists(FS', src) and exists(FS', dest) - and data(FS', dest) == data (FS, dest) + and data(FS', dest) == data (FS, src) result = True diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractEtagTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractEtagTest.java new file mode 100644 index 0000000000000..8a6717edb6e9b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractEtagTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract; + +import java.nio.charset.StandardCharsets; + +import org.assertj.core.api.Assertions; +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.EtagSource; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME; + +/** + * For filesystems which support etags, validate correctness + * of their implementation. + */ +public abstract class AbstractContractEtagTest extends + AbstractFSContractTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContractEtagTest.class); + + /** + * Basic consistency across operations, as well as etags being non-empty. + */ + @Test + public void testEtagConsistencyAcrossListAndHead() throws Throwable { + describe("Etag values must be non-empty and consistent across LIST and HEAD calls."); + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + + Assertions.assertThat(fs.hasPathCapability(path, ETAGS_AVAILABLE)) + .describedAs("path capability %s of %s", + ETAGS_AVAILABLE, path) + .isTrue(); + + ContractTestUtils.touch(fs, path); + + final FileStatus st = fs.getFileStatus(path); + final String etag = etagFromStatus(st); + Assertions.assertThat(etag) + .describedAs("Etag of %s", st) + .isNotBlank(); + LOG.info("etag of empty file is \"{}\"", etag); + + final FileStatus[] statuses = fs.listStatus(path); + Assertions.assertThat(statuses) + .describedAs("List(%s)", path) + .hasSize(1); + final FileStatus lsStatus = statuses[0]; + Assertions.assertThat(etagFromStatus(lsStatus)) + .describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st) + .isEqualTo(etag); + } + + /** + * Get an etag from a FileStatus which MUST BE + * an implementation of EtagSource and + * whose etag MUST NOT BE null/empty. + * @param st the status + * @return the etag + */ + String etagFromStatus(FileStatus st) { + Assertions.assertThat(st) + .describedAs("FileStatus %s", st) + .isInstanceOf(EtagSource.class); + final String etag = ((EtagSource) st).getEtag(); + Assertions.assertThat(etag) + .describedAs("Etag of %s", st) + .isNotBlank(); + return etag; + } + + /** + * Overwritten data has different etags.
+ */ + @Test + public void testEtagsOfDifferentDataDifferent() throws Throwable { + describe("Verify that two different blocks of data written have different etags"); + + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + Path src = new Path(path, "src"); + + ContractTestUtils.createFile(fs, src, true, + "data1234".getBytes(StandardCharsets.UTF_8)); + final FileStatus srcStatus = fs.getFileStatus(src); + final String srcTag = etagFromStatus(srcStatus); + LOG.info("etag of file 1 is \"{}\"", srcTag); + + // now overwrite with data of same length + // (ensure that path or length aren't used exclusively as the tag) + ContractTestUtils.createFile(fs, src, true, + "1234data".getBytes(StandardCharsets.UTF_8)); + + // validate + final String tag2 = etagFromStatus(fs.getFileStatus(src)); + LOG.info("etag of file 2 is \"{}\"", tag2); + + Assertions.assertThat(tag2) + .describedAs("etag of updated file") + .isNotEqualTo(srcTag); + } + + /** + * If supported, rename preserves etags. + */ + @Test + public void testEtagConsistencyAcrossRename() throws Throwable { + describe("Verify that when a file is renamed, the etag remains unchanged"); + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + Assume.assumeTrue( + "Filesystem does not declare that etags are preserved across renames", + fs.hasPathCapability(path, ETAGS_PRESERVED_IN_RENAME)); + Path src = new Path(path, "src"); + Path dest = new Path(path, "dest"); + + ContractTestUtils.createFile(fs, src, true, + "sample data".getBytes(StandardCharsets.UTF_8)); + final FileStatus srcStatus = fs.getFileStatus(src); + final String srcTag = etagFromStatus(srcStatus); + LOG.info("etag of source file is \"{}\"", srcTag); + + Assertions.assertThat(srcTag) + .describedAs("Etag of %s", srcStatus) + .isNotBlank(); + + // rename + fs.rename(src, dest); + + // validate + FileStatus destStatus = fs.getFileStatus(dest); + final String destTag = etagFromStatus(destStatus); + Assertions.assertThat(destTag) + .describedAs("etag of renamed file (%s) compared to source etag of %s", destStatus, srcStatus) + .isEqualTo(srcTag); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 877d73c5a59de..8c1a0a94b20ee 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -20,6 +20,14 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -29,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import
org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -38,12 +47,13 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** An {@link OutputCommitter} that commits files specified +/** An {@link OutputCommitter} that commits files specified * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. **/ @InterfaceAudience.Public @@ -100,11 +110,31 @@ public class FileOutputCommitter extends PathOutputCommitter { public static final boolean FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false; + public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS = + "mapreduce.fileoutputcommitter.algorithm.version.v1.experimental.mv.threads"; + public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS_DEFAULT = 1; + + public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT = + "mapreduce.fileoutputcommitter.algorithm.version.v1.experimental.parallel.task.commit"; + public static final boolean + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT_DEFAULT = false; + + /** + * Attempt to recover from rename failures if the store supports etags. + */ + public static final String FILEOUTPUTCOMMITTER_PARALLEL_RENAME_RECOVERY = + "mapreduce.fileoutputcommitter.algorithm.version.v1.experimental.parallel.rename.recovery"; + public static final boolean FILEOUTPUTCOMMITTER_PARALLEL_RENAME_RECOVERY_DEFAULT = false; + private Path outputPath = null; private Path workPath = null; private final int algorithmVersion; private final boolean skipCleanup; private final boolean ignoreCleanupFailures; + private final int moveThreads; + private final AtomicInteger numberOfTasks = new AtomicInteger(); + private final boolean isParallelTaskCommitEnabled; + private ResilientCommitByRenameHelper resilientCommitHelper; /** * Create a file output committer @@ -139,6 +169,14 @@ public FileOutputCommitter(Path outputPath, algorithmVersion = conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT); + int configuredThreads = + context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS_DEFAULT); + int minThreads = Math.max(1, configuredThreads); + moveThreads = Math.min(minThreads, (Runtime.getRuntime().availableProcessors() * 16)); + isParallelTaskCommitEnabled = context.getConfiguration().getBoolean( + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT, + FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT_DEFAULT); LOG.info("File Output Committer Algorithm version is " + algorithmVersion); if (algorithmVersion != 1 && algorithmVersion != 2) { throw new IOException("Only 1 or 2 algorithm version is supported"); @@ -396,6 +434,11 @@ public void commitJob(JobContext context) throws IOException { */ @VisibleForTesting protected void commitJobInternal(JobContext context) throws IOException { + if (isParallelMoveEnabled()) { + // Note: code paths are intentionally copied + parallelCommitJobInternal(context); + return; + } if (hasOutputPath()) { Path
finalOutput = getOutputPath(); FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); @@ -497,6 +540,306 @@ private void mergePaths(FileSystem fs, final FileStatus from, } } + @VisibleForTesting + public int getMoveThreads() { + return moveThreads; + } + + public boolean isParallelMoveEnabled() { + // Only available for algo v1 + return (moveThreads > 1 && algorithmVersion == 1); + } + + void validateParallelMove() throws IOException { + if (!isParallelMoveEnabled()) { + throw new IOException("Parallel file move is not enabled. " + + "moveThreads=" + moveThreads + + ", algo=" + algorithmVersion); + } + } + + void validateThreadPool(ExecutorService pool, BlockingQueue<Future<?>> futures) + throws IOException { + boolean threadPoolEnabled = isParallelMoveEnabled(); + if (!threadPoolEnabled || pool == null || futures == null) { + String errorMsg = "Thread pool is not configured correctly. " + + "threadPoolEnabled: " + threadPoolEnabled + + ", pool: " + pool + + ", futures: " + futures; + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + } + + /** + * Get executor service for moving files for v1 algorithm. + * @return executor service + * @throws IOException on error + */ + private ExecutorService createExecutorService() throws IOException { + // intentional validation + validateParallelMove(); + + ExecutorService pool = new ThreadPoolExecutor(moveThreads, moveThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("FileCommitter-v1-move-thread-%d") + .build(), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + LOG.info("Size of move thread pool: {}, pool: {}", moveThreads, pool); + return pool; + } + + private void parallelCommitJobInternal(JobContext context) throws IOException { + // validate to be on the safer side. + validateParallelMove(); + + if (hasOutputPath()) { + Path finalOutput = getOutputPath(); + final Configuration conf = context.getConfiguration(); + FileSystem fs = finalOutput.getFileSystem(conf); + // create the resilient commit helper bound to the destination FS/path + resilientCommitHelper = new ResilientCommitByRenameHelper(fs, + finalOutput, + conf.getBoolean(FILEOUTPUTCOMMITTER_PARALLEL_RENAME_RECOVERY, + FILEOUTPUTCOMMITTER_PARALLEL_RENAME_RECOVERY_DEFAULT)); + if (resilientCommitHelper.isRenameRecoveryAvailable()) { + LOG.info("Using resilient commit API to move files"); + } + + // No need to check for algo ver=1, as this entire code path is for V1. + try (DurationInfo d = new DurationInfo(LOG, true, + "Merging data from committed tasks %s", finalOutput)) { + mergeCommittedTasksInParallel(fs, context, finalOutput); + } + + if (resilientCommitHelper.isRenameRecoveryAvailable()) { + LOG.info("Number of rename recoveries: {}", + resilientCommitHelper.getRecoveryCount()); + } + + if (skipCleanup) { + LOG.info("Skip cleanup of the _temporary folders under the job's output " + + "directory in commitJob."); + } else { + // delete the _temporary folder. + try { + cleanupJob(context); + } catch (IOException e) { + if (ignoreCleanupFailures) { + // swallow exceptions in cleanup, as the user configured, to make + // sure commitJob can succeed even when cleanup fails. + LOG.error("Error in cleanup job, manual cleanup is needed.", e); + } else { + // throw back exception to fail commitJob. + throw e; + } + } + } + // True if the job requires output.dir marked on successful job. + // Note that by default it is set to true.
+ if (conf.getBoolean( + SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) { + Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME); + // If job commit is repeatable and a previous/another AM could have + // written the marker file already, we need to set overwrite to true + // explicitly in case other FS implementations don't overwrite by default. + fs.create(markerPath, true).close(); + } + } else { + LOG.warn("Output Path is null in commitJob()"); + } + } + + private void mergeCommittedTasksInParallel(FileSystem fs, JobContext context, + Path finalOutput) throws IOException { + + // validate to be on the safer side. + validateParallelMove(); + + ExecutorService pool = createExecutorService(); + BlockingQueue<Future<?>> futures = new LinkedBlockingQueue<>(); + try { + for (FileStatus stat : getAllCommittedTaskPaths(context)) { + LOG.info("Merging in parallel, from: {}, to: {}, parallelTaskCommitEnabled: {}", + stat.getPath(), finalOutput, isParallelTaskCommitEnabled); + if (isParallelTaskCommitEnabled) { + futures.add(pool.submit(() -> { + mergePathsInParallel(fs, stat, finalOutput, context, pool, futures); + return null; + })); + numberOfTasks.getAndIncrement(); + } else { + mergePathsInParallel(fs, stat, finalOutput, context, pool, futures); + } + } + drainFutures(pool, futures); + } finally { + shutdownThreadPool(pool); + } + } + + /** + * Drain all future tasks and clean up the thread pool. + * + * @param pool thread pool + * @param futures queue of submitted futures + * @throws IOException if an exception was raised by any task. + */ + private void drainFutures(ExecutorService pool, BlockingQueue<Future<?>> futures) + throws IOException { + if (futures == null) { + return; + } + try { + int i = 0; + while (!futures.isEmpty()) { + futures.take().get(); + if (i % 1000 == 0) { + LOG.info("Drained task id: {}, overall: {}", i, numberOfTasks.get()); + } + i++; + } + } catch (InterruptedException | ExecutionException e) { + throw cancelTasks(pool, e); + } + } + + @VisibleForTesting + public int getNumCompletedTasks() { + return numberOfTasks.get(); + } + + /** + * Cancel pending tasks in case of exception. + * + * @param pool threadpool + * @param e exception + * @return IOException + */ + private IOException cancelTasks(ExecutorService pool, Exception e) { + if (e == null) { + // shouldn't land here + return new IOException("exception was null"); + } + LOG.error("Cancelling all tasks and shutting down thread pool.", e); + pool.shutdownNow(); + if (e.getCause() instanceof IOException) { + return (IOException) e.getCause(); + } + return new IOException(e); + } + + /** + * Shutdown the thread pool. + * @param pool thread pool to shut down + */ + private void shutdownThreadPool(ExecutorService pool) { + if (pool != null && !pool.isTerminated()) { + LOG.info("Shutting down thread pool"); + pool.shutdown(); + numberOfTasks.set(0); + } + } + + /** + * Merge two paths together with parallel threads. + * Anything in from will be moved into to; if there + * are any name conflicts while merging, the files or directories in from win. + * @param fs the File System to use + * @param from the path data is coming from. + * @param to the path data is going to.
+ * @param context job context, used for progress reporting + * @param pool thread pool to submit work to + * @param futures queue of pending futures + * @throws IOException on any error + */ + private void mergePathsInParallel(FileSystem fs, final FileStatus from, + final Path to, JobContext context, ExecutorService pool, + BlockingQueue<Future<?>> futures) throws IOException { + validateThreadPool(pool, futures); + + try (DurationInfo d = new DurationInfo(LOG, false, + "Merging data from %s to %s", from.getPath(), to)) { + reportProgress(context); + FileStatus toStat; + try { + toStat = fs.getFileStatus(to); + } catch (FileNotFoundException fnfe) { + toStat = null; + } + + if (from.isFile()) { + if (toStat != null) { + if (!fs.delete(to, true) && fs.exists(to)) { + throw new IOException("Failed to delete " + to); + } + } + + // do the rename + moveFileInParallelCommit(from, to); + } else if (from.isDirectory()) { + if (toStat != null) { + if (!toStat.isDirectory()) { + if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to); + } + boolean dirCreated = fs.mkdirs(to); + LOG.debug("Merging from:{} to:{}, destCreated: {}", from.getPath(), to, dirCreated); + mergePathsInParallel(fs, from, to, context, pool, futures); + } else { + // It is a directory, so merge everything in the directories + LOG.debug("Dir merge from : {} to: {}", from.getPath(), to); + mergeDirInParallel(fs, from, to, context, pool, futures); + } + } else { + // Create the dir up front to avoid conflicting multi-threaded renames + boolean dirCreated = fs.mkdirs(to); + LOG.debug("Created dir upfront. from: {} --> to:{}, totalTasks:{}, destCreated: {}", + from.getPath(), to, futures.size(), dirCreated); + mergePathsInParallel(fs, from, to, context, pool, futures); + } + } + } catch (RuntimeException e) { + throw new IOException(e); + } + } + + /** + * Rename the file via the resilient commit helper. + * @param from source filestatus + * @param to destination path + * @throws IOException failure to commit or rename. + */ + private void moveFileInParallelCommit(final FileStatus from, final Path to) + throws IOException { + resilientCommitHelper.commitFile(from, to); + } + + /** + * Merge a directory in parallel fashion.
+ * + * @param fs the filesystem to use + * @param from status of the source directory + * @param to destination path + * @param context job context, used for progress reporting + * @param pool thread pool to submit work to + * @param futures queue of pending futures + * @throws IOException on any error + */ + private void mergeDirInParallel(FileSystem fs, FileStatus from, Path to, + JobContext context, ExecutorService pool, + BlockingQueue<Future<?>> futures) throws IOException { + + validateThreadPool(pool, futures); + RemoteIterator<FileStatus> it = fs.listStatusIterator(from.getPath()); + while (it.hasNext()) { + FileStatus subFrom = it.next(); + Path subTo = new Path(to, subFrom.getPath().getName()); + LOG.debug("Merge from: {}, to: {} in parallel", subFrom.getPath(), subTo); + futures.add(pool.submit(() -> { + mergePathsInParallel(fs, subFrom, subTo, context, pool, futures); + return null; + })); + numberOfTasks.getAndIncrement(); + } + } + private void reportProgress(JobContext context) { if (context instanceof Progressable) { ((Progressable) context).progress(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/ResilientCommitByRenameHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/ResilientCommitByRenameHelper.java new file mode 100644 index 0000000000000..16b450bed2925 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/ResilientCommitByRenameHelper.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.CommonPathCapabilities; +import org.apache.hadoop.fs.EtagSource; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.util.DurationInfo; + +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +/** + * Support for committing work using etags to recover from failure. + * This is for internal use only. + */ +@VisibleForTesting +public class ResilientCommitByRenameHelper { + + private static final Logger LOG = + LoggerFactory.getLogger(ResilientCommitByRenameHelper.class); + + /** + * IO callbacks. + */ + private final FileSystemOperations operations; + + /** + * Is rename recovery enabled? + */ + private final boolean renameRecoveryAvailable; + + /** + * Counter of times recovery took place.
+ */ + private final AtomicInteger recoveryCount = new AtomicInteger(); + + /** + * Instantiate. + * @param fileSystem filesystem to work with. + * @param finalOutput output path under which renames take place + * @param attemptRecovery attempt recovery if the store has etags. + */ + public ResilientCommitByRenameHelper(final FileSystem fileSystem, + final Path finalOutput, + final boolean attemptRecovery) { + this(new FileSystemOperations(requireNonNull(fileSystem)), + finalOutput, attemptRecovery); + } + + /** + * Instantiate. + * @param operations store operations + * @param finalOutput output path under which renames take place + * @param attemptRecovery attempt recovery if the store has etags. + */ + @VisibleForTesting + public ResilientCommitByRenameHelper( + final FileSystemOperations operations, + final Path finalOutput, + final boolean attemptRecovery) { + this.operations = operations; + // enable recovery if requested and the store supports it. + this.renameRecoveryAvailable = attemptRecovery + && operations.storePreservesEtagsThroughRenames(finalOutput); + } + + /** + * Is resilient commit available? + * @return true if the resilient commit API can be used + */ + public boolean isRenameRecoveryAvailable() { + return renameRecoveryAvailable; + } + + /** + * Get count of rename failures recovered from. + * @return count of recoveries. + */ + public int getRecoveryCount() { + return recoveryCount.get(); + } + + /** + * What is the resilience of this filesystem? + * @param fs filesystem + * @param path path to use + * @return true if the conditions of use are met. + */ + public static boolean filesystemHasResilientCommit( + final FileSystem fs, + final Path path) { + try { + return fs.hasPathCapability(path, + CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME); + } catch (IOException ignored) { + return false; + } + } + + /** + * Commit a file. + * Rename a file from source to dest with recovery attempted + * if the operation raises an exception/returns false, and + * recovery is enabled. + * @param sourceStatus status of the source file + * @param dest destination path + * @return the outcome + * @throws IOException failure + * @throws PathIOException if the rename() call returned false. + */ + public CommitOutcome commitFile( + final FileStatus sourceStatus, final Path dest) + throws IOException { + final Path source = sourceStatus.getPath(); + String operation = String.format("rename(%s, %s)", source, dest); + + try (DurationInfo du = new DurationInfo(LOG, false, + "%s with status %s", operation, sourceStatus)) { + if (operations.renameFile(source, dest)) { + // success + return new CommitOutcome(); + } else { + // no caught exception; generate one with status info. + // if this triggers a FNFE from the missing source, + // it is caught by the handler below. + throw escalateRenameFailure(source, dest); + } + } catch (FileNotFoundException caughtException) { + // any other IOE is passed up; + // recovery only handles reports of + // missing files + LOG.debug("{} raised a FileNotFoundException", operation, caughtException); + // failure. + // Start with etag checking of the source entry and + // the destination file status. + String sourceEtag = getEtag(sourceStatus); + + if (renameRecoveryAvailable && !isEmpty(sourceEtag)) { + LOG.info("{}: failure; starting etag checking with source etag {}", + operation, sourceEtag); + final FileStatus currentSourceStatus = operations.getFileStatusOrNull(source); + if (currentSourceStatus != null) { + // source is still there so whatever happened, the rename + // hasn't taken place.
+ // (for example, dest parent path not present) + LOG.info("{}: source is still present; not checking destination", operation); + throw caughtException; + } + + // the source is missing, we have an etag passed down, so + // probe for a destination + LOG.debug("{}: source is missing; checking destination", operation); + + // get the destination status and its etag, if any. + final FileStatus destStatus = operations.getFileStatusOrNull(dest); + String destEtag = getEtag(destStatus); + if (sourceEtag.equals(destEtag)) { + // rename failed somehow, + // but the etag comparison implies all was good. + LOG.info("{} failed but etag comparison of" + + " source {} and destination status {} determined the rename had succeeded", + operation, sourceStatus, destStatus); + + // and so return successfully with a report which can be used by + // the committer for its statistics + recoveryCount.incrementAndGet(); + return new CommitOutcome(true, caughtException); + } else { + // failure of etag checking: either dest is absent + // or the tags don't match. Report and fall through + // to the exception rethrow. + + LOG.info("{}: etag comparison of" + + " source {} and destination status {} did not match; failing", + operation, sourceStatus, destStatus); + } + } + + // etag comparison failure/unsupported. Fail the operation. + // throw the caught exception + throw caughtException; + } + + } + + /** + * Get an etag from a FileStatus if it provides one. + * @param status the status; may be null. + * @return the etag or null/empty if not provided + */ + private String getEtag(FileStatus status) { + if (status instanceof EtagSource) { + return ((EtagSource) status).getEtag(); + } else { + return null; + } + } + + /** + * Escalate a rename failure to an exception. + * @param source source path + * @param dest dest path + * @return an exception to throw + * @throws FileNotFoundException if source is absent + * @throws IOException other getFileStatus failure + */ + private PathIOException escalateRenameFailure(Path source, Path dest) + throws IOException { + // rename just returned false. + // collect information for a meaningful error message + // and include it in an exception raised. + + // get the source status; this will implicitly raise + // a FNFE. + final FileStatus sourceStatus = operations.getFileStatus(source); + + LOG.error("Failure to rename {} to {} with" + + " source status {}", + source, dest, + sourceStatus); + + return new PathIOException(source.toString(), + "Failed to rename to " + dest); + } + + @Override + public String toString() { + return "ResilientCommitByRenameHelper{" + + "renameRecoveryAvailable=" + renameRecoveryAvailable + + ", recoveries=" + recoveryCount.get() + + '}'; + } + + /** + * Outcome from the commit. + */ + public static final class CommitOutcome { + + /** + * Rename failed but etag checking concluded it finished. + */ + private final boolean renameFailureResolvedThroughEtags; + + /** + * Any exception caught before etag checking succeeded. + */ + private final IOException caughtException; + + /** + * Success constructor. + */ + CommitOutcome() { + this(false, null); + } + + /** + * Full constructor; used on failures. + * @param renameFailureResolvedThroughEtags was a rename failure recovered? + * @param caughtException any exception caught before etag checking succeeded.
+ */ + CommitOutcome( + boolean renameFailureResolvedThroughEtags, + IOException caughtException) { + this.renameFailureResolvedThroughEtags = renameFailureResolvedThroughEtags; + this.caughtException = caughtException; + } + + public boolean isRenameFailureResolvedThroughEtags() { + return renameFailureResolvedThroughEtags; + } + + public IOException getCaughtException() { + return caughtException; + } + + @Override + public String toString() { + return "CommitOutcome{" + + "renameFailureResolvedThroughEtags=" + renameFailureResolvedThroughEtags + + ", caughtException=" + caughtException + + '}'; + } + } + + /** + * Class for FS callbacks; designed to be overridden + * for tests simulating etag mismatch. + */ + @VisibleForTesting + public static class FileSystemOperations { + + /** + * Target FS. + */ + private final FileSystem fileSystem; + + /** + * Constructor. + * @param fileSystem filesystem to invoke. + */ + public FileSystemOperations(final FileSystem fileSystem) { + this.fileSystem = fileSystem; + } + + public FileSystem getFileSystem() { + return fileSystem; + } + + /** + * Forward to {@link FileSystem#getFileStatus(Path)}. + * @param path path + * @return status + * @throws IOException failure. + */ + public FileStatus getFileStatus(Path path) throws IOException { + return fileSystem.getFileStatus(path); + } + + /** + * Get a file status value or, if the operation failed + * for any reason, return null. + * This is used for reporting/probing the files. + * @param path path + * @return status or null + */ + public FileStatus getFileStatusOrNull(final Path path) { + try { + return getFileStatus(path); + } catch (IOException e) { + return null; + } + } + + /** + * Forward to {@link FileSystem#rename(Path, Path)}. + * Usual "what does 'false' mean" ambiguity. + * @param source source file + * @param dest destination path, which must not exist. + * @return true if the file was renamed. + * @throws IOException failure. + */ + public boolean renameFile(Path source, Path dest) + throws IOException { + return fileSystem.rename(source, dest); + } + + /** + * Probe filesystem capabilities. + * @param path path to probe. + * @return true if the FS declares that etags are preserved across renames.
*/ + public boolean storePreservesEtagsThroughRenames(Path path) { + return filesystemHasResilientCommit(fileSystem, path); + } + + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java index 526485df93490..290e1c6d40a52 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java @@ -23,6 +23,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -30,7 +32,15 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.createSubdirs; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME; import static org.junit.Assert.*; + import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.Assert; @@ -39,7 +49,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; @@ -47,6 +60,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -55,6 +69,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +82,8 @@ import static org.mockito.Mockito.verify; @SuppressWarnings("unchecked") -public class TestFileOutputCommitter { +@RunWith(Parameterized.class) +public class TestFileOutputCommitter extends AbstractHadoopTestBase { private static final Path outDir = new Path( System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), @@ -72,6 +91,7 @@ public class TestFileOutputCommitter { private final static String SUB_DIR = "SUB_DIR"; private final static Path
OUT_SUB_DIR = new Path(outDir, SUB_DIR); + static final String EXCEPTION_DURING_PROGRESS = "Throwing exception during progress"; private static final Logger LOG = LoggerFactory.getLogger(TestFileOutputCommitter.class); @@ -88,14 +108,28 @@ public class TestFileOutputCommitter { private Text key2 = new Text("key2"); private Text val1 = new Text("val1"); private Text val2 = new Text("val2"); + private final int mvThreads; + + public TestFileOutputCommitter(int threads) { + this.mvThreads = threads; + } + + /** + * Test parameter is thread count. + * @return test params + */ + @Parameterized.Parameters(name="t-{0}") + public static Collection getParameters() { + // -1 is covered in a separate test case + return Arrays.asList(new Object[]{0, 1, 2, 4}); + } - private static void cleanup() throws IOException { Configuration conf = new Configuration(); FileSystem fs = outDir.getFileSystem(conf); fs.delete(outDir, true); } - + @Before public void setUp() throws IOException { cleanup(); @@ -144,6 +178,7 @@ private void testRecoveryInternal(int commitVersion, int recoveryVersion) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, @@ -204,6 +239,14 @@ private void testRecoveryInternal(int commitVersion, int recoveryVersion) FileUtil.fullyDelete(new File(outDir.toString())); } + /** + * Apply test parameters. + * @param conf configuration to patch. + */ + private void applyParameters(final Configuration conf) { + conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, mvThreads); + } + @Test public void testRecoveryV1() throws Exception { testRecoveryInternal(1, 1); @@ -237,6 +280,19 @@ private void validateContent(File dir) throws IOException { assertThat(output).isEqualTo(expectedOutput.toString()); } + private void validateSpecificFile(File expectedFile) throws IOException { + assertTrue("Could not find " + expectedFile, expectedFile.exists()); + StringBuffer expectedOutput = new StringBuffer(); + expectedOutput.append(key1).append('\t').append(val1).append("\n"); + expectedOutput.append(val1).append("\n"); + expectedOutput.append(val2).append("\n"); + expectedOutput.append(key2).append("\n"); + expectedOutput.append(key1).append("\n"); + expectedOutput.append(key2).append('\t').append(val2).append("\n"); + String output = slurp(expectedFile); + assertEquals(expectedOutput.toString(), output); + } + private void validateMapFileOutputContent( FileSystem fs, Path dir) throws IOException { // map output is a directory with index and data files @@ -266,6 +322,7 @@ private void testCommitterInternal(int version, boolean taskCleanup) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt( FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -288,7 +345,7 @@ private void testCommitterInternal(int version, boolean taskCleanup) // check task and job temp directories exist File jobOutputDir = new File( - new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME).toString()); + new Path(outDir, PENDING_DIR_NAME).toString()); File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority( committer.getWorkPath()).toString()); assertTrue("job temp dir does not exist", jobOutputDir.exists());
@@ -316,6 +373,272 @@ private void testCommitterInternal(int version, boolean taskCleanup) FileUtil.fullyDelete(new File(outDir.toString())); } + @Test + public void testNegativeThreadCount() throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, -1); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + assertFalse("Threadpool disabled for v1 with -1 thread count", + committer.isParallelMoveEnabled()); + assertEquals("Threadpool disabled for thread config of -1", + 1, committer.getMoveThreads()); + } + + @Test + public void testThreadsWithAlgoV2() throws Exception { + testThreadsWithAlgoV2(mvThreads); + } + + @Test + public void testNegativeThreadCountAlgoV2() throws Exception { + testThreadsWithAlgoV2(-1); + } + + public void testThreadsWithAlgoV2(int threads) throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2); + conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, threads); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + assertFalse("Threadpool disabled for algo v2", committer.isParallelMoveEnabled()); + } + + private void createAndCommitTask(Configuration conf, String attemptId, TaskAttemptID tID, + int version, boolean taskCleanup, final boolean setupJob) + throws IOException, InterruptedException { + + applyParameters(conf); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attemptId); + conf.setInt( + FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, + version); + conf.setBoolean( + FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED, + taskCleanup); + JobContext jContext = new JobContextImpl(conf, tID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, tID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + + // setup + if (setupJob) { + committer.setupJob(jContext); + } + committer.setupTask(tContext); + + // write output + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); + writeOutput(theRecordWriter, tContext); + + String filename = String.format("dummy-file-%s-", tID.getTaskID()); + ContractTestUtils.TreeScanResults created = + createSubdirs(FileSystem.get(conf), + committer.getWorkPath(), 3, 3, 2, 0, + "sub-dir-", filename, "0"); + LOG.info("Created subdirs: {}, toString: {}", created.getDirectories(), + created); + + // check task and job temp directories exist + File jobOutputDir = new File( + new Path(outDir, PENDING_DIR_NAME).toString()); + File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority( + committer.getWorkPath()).toString()); + assertTrue("job temp dir does not exist", jobOutputDir.exists()); + assertTrue("task temp dir does not exist", taskOutputDir.exists()); + + // do commit + committer.commitTask(tContext); + assertTrue("job temp dir does not exist", jobOutputDir.exists()); + + if (version == 1 || taskCleanup) { + // Task temp dir gets renamed in v1 and deleted if taskCleanup is + // enabled in v2 + assertFalse("task temp dir still exists", taskOutputDir.exists()); + } 
else { + // By default, in v2 the task temp dir is only deleted during commitJob + assertTrue("task temp dir does not exist", taskOutputDir.exists()); + } + } + + private void createNTasks(Configuration conf, int version, boolean taskCleanup) + throws IOException, InterruptedException { + for (int i = 0; i <= 9; i++) { + String attemptId = String.format("attempt_200707121733_0001_m_%03d_0", i); + TaskAttemptID tid = TaskAttemptID.forName(attemptId); + createAndCommitTask(conf, attemptId, tid, version, taskCleanup, i == 0); + } + } + + private void testCommitterInternalWithMultipleTasks(int version, boolean taskCleanup, + boolean parallelCommit) throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + applyParameters(conf); + conf.setBoolean(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT, parallelCommit); + + // Create multiple tasks and commit + createNTasks(conf, version, taskCleanup); + + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + FileOutputCommitter committer = new FileOutputCommitter(outDir, jContext); + try (DurationInfo du = new DurationInfo(LOG, + "commit job with mvThreads: %s", mvThreads)) { + committer.commitJob(jContext); + } + + // Verify that temp dirs are cleaned up + if (committer.hasOutputPath()) { + Path path = new Path(committer.getOutputPath(), PENDING_DIR_NAME); + final FileSystem fs = path.getFileSystem(conf); + ContractTestUtils.assertPathDoesNotExist(fs, + "Job attempt path should have been deleted", + path); + } + + RemoteIterator<LocatedFileStatus> it = FileSystem.get(conf).listFiles(outDir, true); + while (it.hasNext()) { + LocatedFileStatus fileStatus = it.next(); + Path file = fileStatus.getPath(); + if (file.getName().equals("_SUCCESS")) { + continue; + } + // Validate only real files (ignoring dummy-file-* created via createSubdirs() here).
+ if (fileStatus.isFile() && !file.getName().contains("dummy-file-")) { + LOG.info("validate file:{}", file); + validateSpecificFile(new File(file.toUri().getPath())); + } else { + LOG.info("Not validating {}", file.toString()); + } + } + FileUtil.fullyDelete(new File(outDir.toString())); + } + + private void testAbortWithMultipleTasksV1(int version, boolean taskCleanup, + boolean parallelCommit) throws IOException, InterruptedException { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + applyParameters(conf); + conf.setBoolean(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_PARALLEL_TASK_COMMIT, parallelCommit); + + // Create multiple tasks and commit + createNTasks(conf, version, taskCleanup); + + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + FileOutputCommitter committer = new FileOutputCommitter(outDir, jContext); + LOG.info("Running with mvThreads:{}", mvThreads); + // Abort the job + committer.abortJob(jContext, JobStatus.State.FAILED); + File expectedFile = new File(new Path(outDir, PENDING_DIR_NAME) + .toString()); + assertFalse("job temp dir still exists", expectedFile.exists()); + assertEquals("Output directory not empty", 0, new File(outDir.toString()) + .listFiles().length); + verifyNumScheduledTasks(committer); + FileUtil.fullyDelete(new File(outDir.toString())); + } + + @Test + public void testCommitterInternalWithMultipleTasksV1() throws Exception { + testCommitterInternalWithMultipleTasks(1, true, false); + } + + @Test + public void testCommitterInternalWithMultipleTasksV1Parallel() throws Exception { + testCommitterInternalWithMultipleTasks(1, true, true); + } + + @Test + public void testAbortWithMultipleTasksV1() throws IOException, InterruptedException { + testAbortWithMultipleTasksV1(1, true, false); + } + @Test + public void testAbortWithMultipleTasksV1Parallel() throws IOException, InterruptedException { + testAbortWithMultipleTasksV1(1, true, true); + } + + static class CustomJobContextImpl extends JobContextImpl implements Progressable { + private FileOutputCommitter committer; + + CustomJobContextImpl(Configuration conf, JobID jobId) { + super(conf, jobId); + } + + public void progress() { + if (committer != null && committer.isParallelMoveEnabled()) { + throw new RuntimeException(EXCEPTION_DURING_PROGRESS + + ". 
moveThreads " + + committer.getMoveThreads()); + } + } + + public void setCommitter(FileOutputCommitter committer) { + this.committer = committer; + } + } + + @Test + public void testV1CommitterInternalWithException() throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + applyParameters(conf); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); + conf.setInt( + FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1); + conf.setBoolean( + FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED, true); + // Custom job context which can be used for triggering exceptions + CustomJobContextImpl jContext = new CustomJobContextImpl(conf, taskID.getJobID()); + TaskAttemptContextImpl tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + //This will help in triggering exceptions when parallel threads are enabled + jContext.setCommitter(committer); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); + writeOutput(theRecordWriter, tContext); + + // check task and job temp directories exist + File jobOutputDir = new File( + new Path(outDir, PENDING_DIR_NAME).toString()); + File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority( + committer.getWorkPath()).toString()); + assertTrue("job temp dir does not exist", jobOutputDir.exists()); + assertTrue("task temp dir does not exist", taskOutputDir.exists()); + + // do commit + committer.commitTask(tContext); + assertTrue("job temp dir does not exist", jobOutputDir.exists()); + + try { + committer.commitJob(jContext); + if (committer.isParallelMoveEnabled()) { + // Exception is thrown from CustomJobContextImpl, only when parallel file moves are enabled. + Assert.fail("Commit successful: wrong behavior for version 1. 
moveThreads:" + mvThreads); + } + } catch(IOException e) { + if (committer.isParallelMoveEnabled()) { + GenericTestUtils.assertExceptionContains(EXCEPTION_DURING_PROGRESS, e); + } + } + + // Clear off output dir + FileUtil.fullyDelete(new File(outDir.toString())); + } + @Test public void testCommitterV1() throws Exception { testCommitterInternal(1, false); @@ -346,6 +669,7 @@ private void testCommitterWithDuplicatedCommitInternal(int version) throws Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -400,6 +724,7 @@ private void testCommitterWithFailureInternal(int version, int maxAttempts) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -444,6 +769,7 @@ public void testProgressDuringMerge() throws Exception { Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2); @@ -484,6 +810,7 @@ private void testCommitterRetryInternal(int version) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -539,6 +866,7 @@ private void testMapFileOutputCommitterInternal(int version) Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -587,6 +915,7 @@ public void testInvalidVersionNumber() throws IOException { Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); @@ -598,11 +927,17 @@ public void testInvalidVersionNumber() throws IOException { } } + private void verifyNumScheduledTasks(FileOutputCommitter committer) { + assertEquals("Scheduled tasks should have been 0 after shutting down thread pool", + 0, committer.getNumCompletedTasks()); + } + private void testAbortInternal(int version) throws IOException, InterruptedException { Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); + applyParameters(conf); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); @@ -626,11 +961,12 @@ private void testAbortInternal(int version) assertFalse("task temp dir still exists", expectedFile.exists()); committer.abortJob(jContext, JobStatus.State.FAILED); - expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME) + expectedFile = new File(new Path(outDir, PENDING_DIR_NAME) .toString()); 
assertFalse("job temp dir still exists", expectedFile.exists()); assertEquals("Output directory not empty", 0, new File(outDir.toString()) .listFiles().length); + verifyNumScheduledTasks(committer); FileUtil.fullyDelete(new File(outDir.toString())); } @@ -713,6 +1049,7 @@ private void testFailAbortInternal(int version) assertTrue(th.getMessage().contains("fake delete failed")); assertTrue("job temp dir does not exists", jobTmpDir.exists()); FileUtil.fullyDelete(new File(outDir.toString())); + verifyNumScheduledTasks(committer); } @Test @@ -816,6 +1153,7 @@ public Void call() throws IOException, InterruptedException { // validate output validateContent(OUT_SUB_DIR); FileUtil.fullyDelete(new File(outDir.toString())); + verifyNumScheduledTasks(amCommitter); } @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java index 19b712f3da19b..670cff63c4544 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java @@ -20,6 +20,8 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -36,16 +38,35 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; /** * A JUnit test to test Map-Reduce job committer. 
*/ +@RunWith(Parameterized.class) public class TestJobOutputCommitter extends HadoopTestCase { - public TestJobOutputCommitter() throws IOException { - super(CLUSTER_MR, LOCAL_FS, 1, 1); + public TestJobOutputCommitter(int mrMode, int fsMode, int taskTrackers, int dataNodes, + int mvThreads) throws IOException { + super(mrMode, fsMode, taskTrackers, dataNodes); + this.mvThreads = mvThreads; + } + + @Parameterized.Parameters + public static Collection getParameters() { + // CLUSTER_MR, LOCAL_FS, taskTrackers, dataNodes, mvThreads + return Arrays.asList(new Object[][] { + {2, 4, 1, 1, 1}, + {2, 4, 1, 1, 2}, + {2, 4, 1, 1, 4}, + {2, 4, 1, 1, 8}, + {2, 4, 1, 1, 10}, + }); } private static String TEST_ROOT_DIR = new File(System.getProperty( @@ -58,11 +79,13 @@ public TestJobOutputCommitter() throws IOException { private static int outDirs = 0; private FileSystem fs; private Configuration conf = null; + private int mvThreads; @Before public void setUp() throws Exception { super.setUp(); conf = createJobConf(); + conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_V1_MV_THREADS, mvThreads); fs = getFileSystem(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 5c806e10e73d8..0f9b811fbcf24 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -306,6 +306,11 @@ public class AbfsConfiguration{ FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR) private boolean enableAbfsListIterator; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_RENAME_RAISES_EXCEPTIONS, + DefaultValue = DEFAULT_FS_AZURE_RENAME_RAISES_EXCEPTIONS) + private boolean renameRaisesExceptions; + public AbfsConfiguration(final Configuration rawConfig, String accountName) throws IllegalAccessException, InvalidConfigurationValueException, IOException { this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders( @@ -779,6 +784,14 @@ public boolean shouldTrackLatency() { return this.trackLatency; } + /** + * Should rename raise meaningful exceptions on failure? + * @return true if rename is to fail meaningfully + */ + public boolean getRenameRaisesExceptions() { + return renameRaisesExceptions; + } + public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException { AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey); if (authType == AuthType.OAuth) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index d30129b7a7d68..0af07b90c01fd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -436,16 +436,23 @@ public boolean rename(final Path src, final Path dst) throws IOException { abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext); return true; } catch(AzureBlobFileSystemException ex) { - LOG.debug("Rename operation failed. 
", ex); - checkException( - src, - ex, - AzureServiceErrorCode.PATH_ALREADY_EXISTS, - AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, - AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, - AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, - AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, - AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); + LOG.debug("Rename({}, {}) operation failed.", + qualifiedSrcPath, qualifiedDstPath, ex); + if (!abfsStore.getAbfsConfiguration().getRenameRaisesExceptions()) { + // exceptions are downgraded to returning false. + checkException( + src, + ex, + AzureServiceErrorCode.PATH_ALREADY_EXISTS, + AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, + AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, + AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, + AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); + } else { + // all exceptions are raised. + checkException(src, ex); + } return false; } @@ -1498,6 +1505,8 @@ public boolean hasPathCapability(final Path path, final String capability) switch (validatePathCapabilityArgs(p, capability)) { case CommonPathCapabilities.FS_PERMISSIONS: case CommonPathCapabilities.FS_APPEND: + case CommonPathCapabilities.ETAGS_AVAILABLE: + case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME: return true; case CommonPathCapabilities.FS_ACLS: return getIsNamespaceEnabled( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 4fd194b1e01cb..656f82186a09c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -65,6 +65,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.EtagSource; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -975,7 +976,7 @@ public FileStatus getFileStatus(final Path path, final long blockSize = abfsConfiguration.getAzureBlockSize(); final AbfsHttpOperation result = op.getResult(); - final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); + String eTag = extractEtagHeader(result); final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS)); final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions); @@ -1733,8 +1734,25 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) { return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName); } - private static class VersionedFileStatus extends FileStatus { - private final String version; + /** + * A File status with version info extracted from the etag value returned + * in a LIST or HEAD requiest. + * The etag is included in the java serialization. + */ + private static class VersionedFileStatus extends FileStatus + implements EtagSource { + + /** + * The superclass is declared serializable; this subclass can also + * be serialized. + */ + private static final long serialVersionUID = -2009013240419749458L; + + /** + * The etag of an object. 
+ * Not-final so that serialization via reflection will preserve the value. + */ + private String version; VersionedFileStatus( final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl, @@ -1797,6 +1815,11 @@ public String getVersion() { return this.version; } + @Override + public String getEtag() { + return getVersion(); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( @@ -1902,4 +1925,30 @@ boolean areLeasesFreed() { } return true; } + + /** + * Get the etag header from a response, stripping any quotations. + * see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag + * @param result response to process. + * @return the quote-unwrapped etag. + */ + private static String extractEtagHeader(AbfsHttpOperation result) { + String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); + if (etag != null) { + // strip out any wrapper "" quotes which come back, for consistency with + // list calls + if (etag.startsWith("W/\"")) { + // Weak etag + etag = etag.substring(3); + } else if (etag.startsWith("\"")) { + // strong etag + etag = etag.substring(1); + } + if (etag.endsWith("\"")) { + // trailing quote + etag = etag.substring(0, etag.length() - 1); + } + } + return etag; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 12beb5a9bbabe..0e7446c2436c4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -246,5 +246,15 @@ public static String accountProperty(String property, String account) { * @see FileSystem#openFile(org.apache.hadoop.fs.Path) */ public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable"; + + /** + * Should rename raise meaningful exceptions on failure? + * This is good for reporting and error handling -but it + * does diverge from what HDFS does. 
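Since the default preserves HDFS-compatible return-false semantics, a client wanting diagnostics has to opt in per filesystem instance. A sketch using the key defined just below (`abfsUri` is a placeholder for any abfs:// filesystem URI):

```java
// Sketch: opt in to meaningful rename exceptions; the default is false.
Configuration conf = new Configuration();
conf.setBoolean("fs.azure.rename.raises.exceptions", true);
FileSystem fs = FileSystem.newInstance(abfsUri, conf); // abfsUri: placeholder
```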
+ */ + public static final String FS_AZURE_RENAME_RAISES_EXCEPTIONS = + "fs.azure.rename.raises.exceptions"; + public static final boolean DEFAULT_FS_AZURE_RENAME_RAISES_EXCEPTIONS = false; + private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 17d5c03a819ff..ef0b8f9a88ece 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -28,7 +28,6 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.time.Instant; import java.util.ArrayList; import java.util.Base64; import java.util.List; @@ -63,7 +62,6 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; -import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; @@ -507,75 +505,8 @@ public AbfsRestOperation renamePath(String source, final String destination, HTTP_METHOD_PUT, url, requestHeaders); - Instant renameRequestStartTime = Instant.now(); - try { - op.execute(tracingContext); - } catch (AzureBlobFileSystemException e) { - // If we have no HTTP response, throw the original exception. - if (!op.hasResult()) { - throw e; - } - final AbfsRestOperation idempotencyOp = renameIdempotencyCheckOp( - renameRequestStartTime, op, destination, tracingContext); - if (idempotencyOp.getResult().getStatusCode() - == op.getResult().getStatusCode()) { - // idempotency did not return different result - // throw back the exception - throw e; - } else { - return idempotencyOp; - } - } - - return op; - } - - /** - * Check if the rename request failure is post a retry and if earlier rename - * request might have succeeded at back-end. - * - * If there is a parallel rename activity happening from any other store - * interface, the logic here will detect the rename to have happened due to - * the one initiated from this ABFS filesytem instance as it was retried. This - * should be a corner case hence going ahead with LMT check. - * @param renameRequestStartTime startTime for the rename request - * @param op Rename request REST operation response with non-null HTTP response - * @param destination rename destination path - * @param tracingContext Tracks identifiers for request header - * @return REST operation response post idempotency check - * @throws AzureBlobFileSystemException if GetFileStatus hits any exception - */ - public AbfsRestOperation renameIdempotencyCheckOp( - final Instant renameRequestStartTime, - final AbfsRestOperation op, - final String destination, - TracingContext tracingContext) throws AzureBlobFileSystemException { - Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response"); - if ((op.isARetriedRequest()) - && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)) { - // Server has returned HTTP 404, which means rename source no longer - // exists. Check on destination status and if it has a recent LMT timestamp. 
- // If yes, return success, else fall back to original rename request failure response. - - try { - final AbfsRestOperation destStatusOp = getPathStatus(destination, - false, tracingContext); - if (destStatusOp.getResult().getStatusCode() - == HttpURLConnection.HTTP_OK) { - String lmt = destStatusOp.getResult().getResponseHeader( - HttpHeaderConfigurations.LAST_MODIFIED); - - if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) { - LOG.debug("Returning success response from rename idempotency logic"); - return destStatusOp; - } - } - } catch (AzureBlobFileSystemException e) { - // GetFileStatus on the destination failed, return original op - return op; - } - } - + // no attempt at recovery using timestamps as it was not reliable. + op.execute(tracingContext); return op; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index b12af5b0826ab..716c101493b3f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.azurebfs; -import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -26,36 +25,17 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.assertj.core.api.Assertions; -import org.junit.Test; import org.junit.Assert; +import org.junit.Test; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; -import org.apache.hadoop.fs.azurebfs.services.AbfsClient; -import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; -import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; -import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; -import static java.net.HttpURLConnection.HTTP_NOT_FOUND; -import static java.net.HttpURLConnection.HTTP_OK; -import static java.util.UUID.randomUUID; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome; -import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; - -import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Test rename operation. 
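With the timestamp-based probe gone from `renamePath()`, recovery moves up a layer: callers that need rename-with-recovery go through the commit helper exercised later in this patch. A sketch of that sequence, with names taken from the tests further down (`jobOutputDir` is illustrative):

```java
// Sketch: etag-based commit-by-rename, replacing LMT-based idempotency.
ResilientCommitByRenameHelper helper =
    new ResilientCommitByRenameHelper(fs, jobOutputDir, true);
FileStatus source = fs.getFileStatus(sourcePath);
// if the rename fails because the source vanished, but the destination's
// etag matches that of the captured source status, it counts as success.
ResilientCommitByRenameHelper.CommitOutcome outcome =
    helper.commitFile(source, destPath);
boolean recovered = outcome.isRenameFailureResolvedThroughEtags();
```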
@@ -63,9 +43,6 @@ public class ITestAzureBlobFileSystemRename extends AbstractAbfsIntegrationTest { - private static final int REDUCED_RETRY_COUNT = 1; - private static final int REDUCED_MAX_BACKOFF_INTERVALS_MS = 5000; - public ITestAzureBlobFileSystemRename() throws Exception { super(); } @@ -190,152 +167,4 @@ public void testPosixRenameDirectory() throws Exception { new Path(testDir2 + "/test1/test2/test3")); } - @Test - public void testRenameRetryFailureAsHTTP400() throws Exception { - // Rename failed as Bad Request - // RenameIdempotencyCheck should throw back the rename failure Op - testRenameTimeout(HTTP_BAD_REQUEST, HTTP_BAD_REQUEST, false, - "renameIdempotencyCheckOp should return rename BadRequest " - + "response itself."); - } - - @Test - public void testRenameRetryFailureAsHTTP404() throws Exception { - // Rename failed as FileNotFound and the destination LMT is - // within TimespanForIdentifyingRecentOperationThroughLMT - testRenameTimeout(HTTP_NOT_FOUND, HTTP_OK, false, - "Rename should return success response because the destination " - + "path is present and its LMT is within " - + "TimespanForIdentifyingRecentOperationThroughLMT."); - } - - @Test - public void testRenameRetryFailureWithDestOldLMT() throws Exception { - // Rename failed as FileNotFound and the destination LMT is - // older than TimespanForIdentifyingRecentOperationThroughLMT - testRenameTimeout(HTTP_NOT_FOUND, HTTP_NOT_FOUND, true, - "Rename should return original rename failure response " - + "because the destination path LMT is older than " - + "TimespanForIdentifyingRecentOperationThroughLMT."); - } - - @Test - public void testRenameIdempotencyTriggerHttpNotFound() throws Exception { - AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class); - when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND); - - AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class); - when(http200Op.getStatusCode()).thenReturn(HTTP_OK); - - // Check 1 where idempotency check fails to find dest path - // Rename should throw exception - testRenameIdempotencyTriggerChecks(http404Op); - - // Check 2 where idempotency check finds the dest path - // Renam will be successful - testRenameIdempotencyTriggerChecks(http200Op); - } - - private void testRenameIdempotencyTriggerChecks( - AbfsHttpOperation idempotencyRetHttpOp) throws Exception { - - final AzureBlobFileSystem fs = getFileSystem(); - AbfsClient client = TestAbfsClient.getMockAbfsClient( - fs.getAbfsStore().getClient(), - this.getConfiguration()); - - AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class); - when(idempotencyRetOp.getResult()).thenReturn(idempotencyRetHttpOp); - doReturn(idempotencyRetOp).when(client).renameIdempotencyCheckOp(any(), - any(), any(), any()); - when(client.renamePath(any(), any(), any(), any())).thenCallRealMethod(); - - // rename on non-existing source file will trigger idempotency check - if (idempotencyRetHttpOp.getStatusCode() == HTTP_OK) { - // idempotency check found that destination exists and is recently created - Assertions.assertThat(client.renamePath( - "/NonExistingsourcepath", - "/destpath", - null, - getTestTracingContext(fs, true)) - .getResult() - .getStatusCode()) - .describedAs("Idempotency check reports recent successful " - + "rename. 200OK should be returned") - .isEqualTo(idempotencyRetOp.getResult().getStatusCode()); - } else { - // rename dest not found. Original exception should be returned. 
- intercept(AbfsRestOperationException.class, - () -> client.renamePath( - "/NonExistingsourcepath", - "/destpath", - "", - getTestTracingContext(fs, true))); - } - } - - private void testRenameTimeout( - int renameRequestStatus, - int renameIdempotencyCheckStatus, - boolean isOldOp, - String assertMessage) throws Exception { - // Config to reduce the retry and maxBackoff time for test run - AbfsConfiguration abfsConfig - = TestAbfsConfigurationFieldsValidation.updateRetryConfigs( - getConfiguration(), - REDUCED_RETRY_COUNT, REDUCED_MAX_BACKOFF_INTERVALS_MS); - - final AzureBlobFileSystem fs = getFileSystem(); - AbfsClient abfsClient = fs.getAbfsStore().getClient(); - AbfsClient testClient = TestAbfsClient.createTestClientFromCurrentContext( - abfsClient, - abfsConfig); - - // Mock instance of AbfsRestOperation - AbfsRestOperation op = mock(AbfsRestOperation.class); - // Set retryCount to non-zero - when(op.isARetriedRequest()).thenReturn(true); - - // Mock instance of Http Operation response. This will return HTTP:Bad Request - AbfsHttpOperation http400Op = mock(AbfsHttpOperation.class); - when(http400Op.getStatusCode()).thenReturn(HTTP_BAD_REQUEST); - - // Mock instance of Http Operation response. This will return HTTP:Not Found - AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class); - when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND); - - Path destinationPath = fs.makeQualified( - new Path("destination" + randomUUID().toString())); - - Instant renameRequestStartTime = Instant.now(); - - if (renameRequestStatus == HTTP_BAD_REQUEST) { - when(op.getResult()).thenReturn(http400Op); - } else if (renameRequestStatus == HTTP_NOT_FOUND) { - // Create the file new. - fs.create(destinationPath).close(); - when(op.getResult()).thenReturn(http404Op); - - if (isOldOp) { - // instead of sleeping for DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS - // which will affect test run time - // will modify renameRequestStartTime to a future time so that - // lmt will qualify for old op - renameRequestStartTime = renameRequestStartTime.plusSeconds( - DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS); - } - - } - - Assertions.assertThat(testClient.renameIdempotencyCheckOp( - renameRequestStartTime, - op, - destinationPath.toUri().getPath(), - getTestTracingContext(fs, true)) - .getResult() - .getStatusCode()) - .describedAs(assertMessage) - .isEqualTo(renameIdempotencyCheckStatus); - } - } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemResilientCommit.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemResilientCommit.java new file mode 100644 index 0000000000000..83192b35b7e55 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemResilientCommit.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapreduce.lib.output.ResilientCommitByRenameHelper; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_RENAME_RAISES_EXCEPTIONS; +import static org.apache.hadoop.fs.contract.ContractTestUtils.toAsciiByteArray; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test the {@link ResilientCommitByRenameHelper}. + * Parameterized on whether or not the FS + * raises exceptions on rename failures. + * The outcome must be the same through the commit helper; + * exceptions and error messages will be different. + */ +@RunWith(Parameterized.class) +public class ITestAzureBlobFileSystemResilientCommit + extends AbstractAbfsIntegrationTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestAzureBlobFileSystemResilientCommit.class); + private static final byte[] DATA = toAsciiByteArray("hello"); + private static final byte[] DATA2 = toAsciiByteArray("world"); + + /** + * Is the test FS set up to raise exceptions on rename failures? + */ + private final boolean raiseExceptions; + + /** + * Error keyword from Azure storage when exceptions are being + * raised. + */ + public static final String E_NO_SOURCE = "SourcePathNotFound"; + + public ITestAzureBlobFileSystemResilientCommit( + final boolean raiseExceptions) throws Exception { + this.raiseExceptions = raiseExceptions; + } + + /** + * Test parameters: does the FS raise exceptions? + * @return test params + */ + @Parameterized.Parameters(name = "raising-{0}") + public static Collection getParameters() { + // both the raising and non-raising filesystem configurations are tested. + return Arrays.asList(true, false); + } + + /** + * The FS under test; raises exceptions on rename failures when so configured. 
+ */ + private AzureBlobFileSystem targetFS; + private Path outputPath; + private ResilientCommitByRenameHelper commitHelper; + private Path sourcePath; + private Path destPath; + + @Override + public void setup() throws Exception { + super.setup(); + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration conf = new Configuration(this.getRawConfiguration()); + conf.setBoolean(FS_AZURE_RENAME_RAISES_EXCEPTIONS, raiseExceptions); + + targetFS = (AzureBlobFileSystem) FileSystem.newInstance( + currentFs.getUri(), + conf); + Assertions.assertThat( + targetFS.getConf().getBoolean(FS_AZURE_RENAME_RAISES_EXCEPTIONS, false)) + .describedAs("FS raises exceptions on rename %s", targetFS) + .isEqualTo(raiseExceptions); + outputPath = path(getMethodName()); + sourcePath = new Path(outputPath, "source"); + destPath = new Path(outputPath, "dest"); + targetFS.mkdirs(outputPath); + + commitHelper = new ResilientCommitByRenameHelper( + targetFS, + outputPath, true); + + } + + @Override + public void teardown() throws Exception { + IOUtils.cleanupWithLogger(LOG, targetFS); + super.teardown(); + } + + /** + * Create a file; return the status. + * @param path file path + * @param data text of file + * @return the status + * @throws IOException creation failure + */ + FileStatus file(Path path, byte[] data) throws IOException { + ContractTestUtils.createFile(targetFS, path, true, + data); + return targetFS.getFileStatus(path); + } + + /** + * Make sure the filesystem resilience matches the test + * expectations. + */ + @Test + public void testVerifyResilient() { + Assertions.assertThat(commitHelper.isRenameRecoveryAvailable()) + .describedAs("recovery availability of %s", commitHelper) + .isTrue(); + } + + @Test + public void testSimpleRename() throws Throwable { + describe("simple rename succeeds and then fails"); + file(sourcePath, DATA); + targetFS.rename(sourcePath, destPath); + ContractTestUtils.verifyFileContents(targetFS, destPath, DATA); + ContractTestUtils.assertPathDoesNotExist(targetFS, + "source", sourcePath); + // attempt 2 fails differently depending + // on the FS settings + if (raiseExceptions) { + intercept(FileNotFoundException.class, + E_NO_SOURCE, + () -> targetFS.rename(sourcePath, destPath)); + } else { + Assertions.assertThat(targetFS.rename(sourcePath, destPath)) + .describedAs("return value of rename") + .isFalse(); + } + } + + @Test + public void testSimpleRenameNoSource() throws Throwable { + describe("simple rename fails when there is no source file"); + // attempt fails differently depending + // on the FS settings + if (raiseExceptions) { + intercept(FileNotFoundException.class, + E_NO_SOURCE, + () -> targetFS.rename(sourcePath, destPath)); + } else { + Assertions.assertThat(targetFS.rename(sourcePath, destPath)) + .describedAs("return value of rename") + .isFalse(); + } + } + + /** + * Commit a file twice. + * The second time the source file is missing but the dest file + * has the same etag. As a result, this is considered a success. 
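The recovery condition this test relies on can be summarised as: the source is gone and the destination now carries the source's etag. The following is illustrative only; the names and structure are not taken from the helper's actual implementation:

```java
// Illustrative only: the shape of the recovery check after a failed rename
// of sourcePath, whose captured FileStatus carried sourceEtag.
FileStatus destSt = fs.getFileStatus(destPath);
boolean recovered = !fs.exists(sourcePath)
    && destSt instanceof EtagSource
    && sourceEtag != null && !sourceEtag.isEmpty()
    && sourceEtag.equals(((EtagSource) destSt).getEtag());
```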
+ */ + @Test + public void testDoubleCommitTriggersRecovery() throws Throwable { + describe("commit a file twice; expect the second to be recovery"); + final FileStatus status = file(sourcePath, DATA); + commit(status, false); + ContractTestUtils.verifyFileContents(targetFS, destPath, DATA); + + // try again and as the status holds, expect recovery + commit(status, true); + Assertions.assertThat(commitHelper.getRecoveryCount()) + .describedAs("recovery count of %s", commitHelper) + .isEqualTo(1); + } + + /** + * commit a file twice with a status entry with no etag; + * the second attempt will fail. + */ + @Test + public void testDoubleCommitSourceHasNoEtag() throws Throwable { + describe("commit a file without an etag; expect the second to fail"); + final FileStatus status = file(sourcePath, DATA); + FileStatus st2 = new FileStatus(status); + commit(st2, false); + // try again and as the status has no tag, expect failure. + intercept(IOException.class, () -> + commit(st2, false)); + } + + /** + * overwrite a file before trying to commit it again. + */ + @Test + public void testDoubleCommitDifferentFiles() throws Throwable { + describe("commit two different files; no recovery allowed"); + final FileStatus status = file(sourcePath, DATA); + commit(status, false); + file(sourcePath, DATA2); + + // ioe raised; type will depend on whether or not FS + // is raising exceptions. + intercept(IOException.class, () -> + commit(status, false)); + } + + /** + * Commit a file, then try to commit again with a + * filestatus with a different source etag. + * Recovery MUST fail + */ + @Test + public void testDoubleCommitDifferentFiles2() throws Throwable { + describe("commit two different files; no recovery allowed"); + + // create a file to the source path with different data + // its status will not match that of the dest + final FileStatus status2 = file(sourcePath, DATA2); + + // overwrite with dataset 1; this will have a different + // etag + final FileStatus status = file(sourcePath, DATA); + + // commit the data1 dataset, which works + commit(status, false); + + // now attempt to commit with the file status of + // dataset 2. this is the file which was overwritten, + // so the etag at the dest path does not match it. + // expect a failure. + intercept(IOException.class, () -> + commit(status2, false)); + } + + /** + * Try to commit a file to a path where the destination + * directory does not exist -expect an exception to + * be raised. + */ + @Test + public void testCommitMissingDestDir() throws Throwable { + describe("commit a file under a nonexistent dir; expect an IOE"); + final FileStatus status = file(sourcePath, DATA); + final Path subpath = new Path(destPath, "subpath"); + intercept(IOException.class, () -> + commitHelper.commitFile(status, subpath)); + } + + /** + * If there is no source and no destination, recovery + * MUST fail. + */ + @Test + public void testCommitNoSource() throws Throwable { + describe("delete the source file, expect commit to fail"); + + final FileStatus status = file(sourcePath, DATA); + targetFS.delete(sourcePath, true); + intercept(FileNotFoundException.class, + raiseExceptions ? E_NO_SOURCE : "", + () -> commit(status, false)); + } + + /** + * Try to commit a file. + * @param status source status + * @param expectRecovery expect the operation to have required failure recovery. 
+ * @return the outcome + * @throws IOException any failure to rename the file + */ + private ResilientCommitByRenameHelper.CommitOutcome commit( + final FileStatus status, + boolean expectRecovery) + throws IOException { + final ResilientCommitByRenameHelper.CommitOutcome outcome = commitHelper.commitFile( + status, destPath); + Assertions.assertThat(outcome.isRenameFailureResolvedThroughEtags()) + .describedAs("resolution of %s", outcome) + .isEqualTo(expectRecovery); + return outcome; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractEtag.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractEtag.java new file mode 100644 index 0000000000000..d498ae71a4b6f --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractEtag.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractEtagTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Contract test for etag support. + */ +public class ITestAbfsFileSystemContractEtag extends AbstractContractEtagTest { + private final boolean isSecure; + private final ABFSContractTestBinding binding; + + public ITestAbfsFileSystemContractEtag() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + // The base contract test classes re-use the test folder. + // This leads to failures when the test is re-run, as the same ABFS test + // containers are re-used across runs and creation of the source and + // destination test paths fails because they are already present. 
+ binding.getFileSystem().delete(binding.getTestPath(), true); + } + + @Override + protected Configuration createConfiguration() { + return binding.getRawConfiguration(); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, isSecure); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemRenameRaisesExceptions.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemRenameRaisesExceptions.java new file mode 100644 index 0000000000000..413e57af91c0d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemRenameRaisesExceptions.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contract; + +import org.assertj.core.api.Assertions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.contract.AbstractContractRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Contract test for rename operation with abfs set to raise exceptions. + * This requires patching both the config used by the FS + * and that reported by the contract. + */ +public class ITestAbfsFileSystemRenameRaisesExceptions extends AbstractContractRenameTest { + private final ABFSContractTestBinding binding; + + public ITestAbfsFileSystemRenameRaisesExceptions() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + // Base rename contract test class re-uses the test folder + // This leads to failures when the test is re-run as same ABFS test + // containers are re-used for test run and creation of source and + // destination test paths fail, as they are already present. 
+ binding.getFileSystem().delete(binding.getTestPath(), true); + final FileSystem fs = getFileSystem(); + Assertions.assertThat(fs.getConf().getBoolean(ConfigurationKeys.FS_AZURE_RENAME_RAISES_EXCEPTIONS, false)) + .describedAs("FS raises exceptions on rename %s", fs) + .isTrue(); + } + + @Override + protected Configuration createConfiguration() { + final Configuration conf = binding.getRawConfiguration(); + conf.setBoolean(ConfigurationKeys.FS_AZURE_RENAME_RAISES_EXCEPTIONS, + true); + return conf; + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + final AbfsFileSystemContract contract = new AbfsFileSystemContract(conf, + binding.isSecureMode()); + // get the contract conf after abfs.xml is loaded, and patch it + final Configuration contractConf = contract.getConf(); + contractConf.setBoolean(FS_CONTRACT_KEY + RENAME_RETURNS_FALSE_IF_SOURCE_MISSING, + false); + contractConf.setBoolean(FS_CONTRACT_KEY + RENAME_RETURNS_FALSE_IF_DEST_EXISTS, + false); + // check it went through + Assertions.assertThat(contract.isSupported(RENAME_RETURNS_FALSE_IF_SOURCE_MISSING, true)) + .describedAs("isSupported(RENAME_RETURNS_FALSE_IF_SOURCE_MISSING)") + .isFalse(); + return contract; + } +}
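Taken together with the capability declarations earlier in the patch, a committer can decide up front whether etag-based rename recovery is even worth attempting. A closing sketch of that probe; against ABFS both checks return true, as the `hasPathCapability()` change above declares (this plausibly mirrors what `isRenameRecoveryAvailable()` reports, though that method's implementation is not part of this excerpt):

```java
// Sketch: probe the two etag capabilities added by this patch.
boolean etagsAvailable = fs.hasPathCapability(outputDir,
    CommonPathCapabilities.ETAGS_AVAILABLE);
boolean stableAcrossRename = fs.hasPathCapability(outputDir,
    CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME);
boolean recoveryPossible = etagsAvailable && stableAcrossRename;
```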