From b6430f04ee70f39ab1a6e65790f4eeb9c2c46e91 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Jun 2015 13:41:37 -0700 Subject: [PATCH 1/4] Add failing regression test for SPARK-8062 --- .../metrics/InputOutputMetricsSuite.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index ca226fd4e694..efc85af94827 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -106,4 +106,28 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Shou } } } + + test("exceptions while getting IO thread statistics should not fail tasks / jobs (SPARK-8062)") { + // For some reason, the following code needs to be called in order for this regression test to + // fail and reproduce the bug. The fact that this is necessary suggests that there may be other + // bugs in our InputOutputMetrics code; SPARK-8086 tracks progress towards investigating this + // issue, since fixing it is out of scope for SPARK-8062. + val fs = FileSystem.getLocal(new Configuration()) + val outPath = new Path(fs.getWorkingDirectory, "outdir") + SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf) + + // Intentionally call this method with a null scheme, which will store an entry for a FileSystem + // with a null scheme into Hadoop's global `FileSystem.statisticsTable`. + FileSystem.getStatistics(null, classOf[FileSystem]) + + // Prior to fixing SPARK-8062, this would fail with a NullPointerException in + // SparkHadoopUtil.getFileSystemThreadStatistics + val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2) + try { + rdd.saveAsTextFile(outPath.toString) + sc.textFile(outPath.toString).count() + } finally { + fs.delete(outPath, true) + } + } } From 1d8d1250daf12e7ac5826f575b10e25cde55237c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Jun 2015 13:42:12 -0700 Subject: [PATCH 2/4] Fix SPARK-8062 with additional null checks --- .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 57f9faf5ddd1..25488e09cbe6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -175,8 +175,13 @@ class SparkHadoopUtil extends Logging { private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = { val qualifiedPath = path.getFileSystem(conf).makeQualified(path) val scheme = qualifiedPath.toUri().getScheme() - val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme)) - stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + if (scheme == null) { + Seq.empty + } else { + FileSystem.getAllStatistics + .filter { stats => scheme.equals(stats.getScheme) } + .map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + } } private def getFileSystemThreadStatisticsMethod(methodName: String): Method = { From 66fc60001e3ce1faeddf8086bcf140e06ab9fff6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Jun 2015 14:10:02 -0700 Subject: [PATCH 3/4] Fix and minimize regression test (verified that it still fails) --- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 2 +- .../spark/metrics/InputOutputMetricsSuite.scala | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 25488e09cbe6..8e126b763fa3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -179,7 +179,7 @@ class SparkHadoopUtil extends Logging { Seq.empty } else { FileSystem.getAllStatistics - .filter { stats => scheme.equals(stats.getScheme) } + .filter { stats => stats.getScheme.equals(scheme) } .map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) } } diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index efc85af94827..c12ad715c79b 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -22,6 +22,7 @@ import java.io.{FileWriter, PrintWriter, File} import org.apache.spark.SharedSparkContext import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener} +import org.apache.spark.util.Utils import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers @@ -108,13 +109,8 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Shou } test("exceptions while getting IO thread statistics should not fail tasks / jobs (SPARK-8062)") { - // For some reason, the following code needs to be called in order for this regression test to - // fail and reproduce the bug. The fact that this is necessary suggests that there may be other - // bugs in our InputOutputMetrics code; SPARK-8086 tracks progress towards investigating this - // issue, since fixing it is out of scope for SPARK-8062. - val fs = FileSystem.getLocal(new Configuration()) - val outPath = new Path(fs.getWorkingDirectory, "outdir") - SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf) + val tempDir = Utils.createTempDir() + val outPath = new File(tempDir, "outfile") // Intentionally call this method with a null scheme, which will store an entry for a FileSystem // with a null scheme into Hadoop's global `FileSystem.statisticsTable`. @@ -122,12 +118,12 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Shou // Prior to fixing SPARK-8062, this would fail with a NullPointerException in // SparkHadoopUtil.getFileSystemThreadStatistics - val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2) try { + val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2) rdd.saveAsTextFile(outPath.toString) sc.textFile(outPath.toString).count() } finally { - fs.delete(outPath, true) + Utils.deleteRecursively(tempDir) } } } From 652fa3c54ecccd3cd8e9ff4d3fed876ff5813309 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Jun 2015 14:15:32 -0700 Subject: [PATCH 4/4] Re-name test and reapply fix --- .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 2 +- .../org/apache/spark/metrics/InputOutputMetricsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 8e126b763fa3..ee725be39f93 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -179,7 +179,7 @@ class SparkHadoopUtil extends Logging { Seq.empty } else { FileSystem.getAllStatistics - .filter { stats => stats.getScheme.equals(scheme) } + .filter { stats => scheme.equals(stats.getScheme()) } .map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) } } diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index c12ad715c79b..1b17c3ad359d 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -108,7 +108,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Shou } } - test("exceptions while getting IO thread statistics should not fail tasks / jobs (SPARK-8062)") { + test("getFileSystemThreadStatistics should guard against null schemes (SPARK-8062)") { val tempDir = Utils.createTempDir() val outPath = new File(tempDir, "outfile")