diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java index 450fbac6d3639..d75b9a4fef1ef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; @@ -151,12 +152,13 @@ public Iterable getFileStatuses() throws InterruptedException, } } finally { lock.unlock(); + // either the scan completed or an error was raised. + // in the case of an error shutting down the executor will interrupt all + // active threads, which can add noise to the logs. + LOG.debug("Scan complete: shutting down"); + this.exec.shutdownNow(); } - // either the scan completed or an error was raised. - // in the case of an error shutting down the executor will interrupt all - // active threads, which can add noise to the logs. - LOG.debug("Scan complete: shutting down"); - this.exec.shutdownNow(); + if (this.unknownError != null) { LOG.debug("Scan failed", this.unknownError); if (this.unknownError instanceof Error) { @@ -402,4 +404,10 @@ public void onFailure(Throwable t) { registerError(t); } } + + @VisibleForTesting + ListeningExecutorService getListeningExecutorService() { + return exec; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLocatedFileStatusFetcher.java new file mode 100644 index 0000000000000..c818a25f2f574 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLocatedFileStatusFetcher.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.test.GenericTestUtils; + +/** + * Test that the executor service has been shut down + * when the LocatedFileStatusFetcher is interrupted. + */ +public class TestLocatedFileStatusFetcher extends AbstractHadoopTestBase { + + private Configuration conf; + private FileSystem fileSys; + private boolean mkdirs; + private File dir = GenericTestUtils.getTestDir("test-lfs-fetcher"); + private static final CountDownLatch LATCH = new CountDownLatch(1); + + @Before + public void setup() throws Exception { + conf = new Configuration(false); + conf.set("fs.file.impl", MockFileSystem.class.getName()); + fileSys = FileSystem.getLocal(conf); + } + + @After + public void after() { + if (mkdirs) { + FileUtil.fullyDelete(dir); + } + } + + @Test + public void testExecutorsShutDown() throws Exception { + Path scanPath = new Path(dir.getAbsolutePath()); + mkdirs = fileSys.mkdirs(scanPath); + Path[] dirs = new Path[] {scanPath}; + final LocatedFileStatusFetcher fetcher = new LocatedFileStatusFetcher(conf, + dirs, true, new PathFilter() { + @Override + public boolean accept(Path path) { + return true; + } + }, true); + + Thread t = new Thread() { + @Override + public void run() { + try { + fetcher.getFileStatuses(); + } catch (Exception e) { + // This should interrupt condition.await() + Assert.assertTrue(e instanceof InterruptedException); + } + } + }; + + t.start(); + LATCH.await(); + + t.interrupt(); + t.join(); + // Check the status for executor service + Assert.assertTrue("The executor service should have been shut down", + fetcher.getListeningExecutorService().isShutdown()); + } + + static class MockFileSystem extends LocalFileSystem { + @Override + public FileStatus[] globStatus(Path pathPattern, PathFilter filter) + throws IOException { + // The executor service now is running tasks + LATCH.countDown(); + try { + // Try to sleep some time to + // let LocatedFileStatusFetcher#getFileStatuses be interrupted before + // the getting file info task finishes. + Thread.sleep(5000); + } catch (InterruptedException e) { + // Ignore this exception + } + return super.globStatus(pathPattern, filter); + } + + } + +}