diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 57f9faf5ddd1..ee725be39f93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -175,8 +175,13 @@ class SparkHadoopUtil extends Logging {
   private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
     val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
     val scheme = qualifiedPath.toUri().getScheme()
-    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
-    stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+    if (scheme == null) {
+      Seq.empty
+    } else {
+      FileSystem.getAllStatistics
+        .filter { stats => scheme.equals(stats.getScheme()) }
+        .map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+    }
   }
 
   private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index ca226fd4e694..1b17c3ad359d 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -22,6 +22,7 @@ import java.io.{FileWriter, PrintWriter, File}
 import org.apache.spark.SharedSparkContext
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
+import org.apache.spark.util.Utils
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
@@ -106,4 +107,23 @@
       }
     }
   }
+
+  test("getFileSystemThreadStatistics should guard against null schemes (SPARK-8062)") {
+    val tempDir = Utils.createTempDir()
+    val outPath = new File(tempDir, "outfile")
+
+    // Intentionally pass a null scheme to FileSystem.getStatistics(), which stores an entry for a
+    // FileSystem with a null scheme in Hadoop's global `FileSystem.statisticsTable`.
+    FileSystem.getStatistics(null, classOf[FileSystem])
+
+    // Prior to fixing SPARK-8062, this would fail with a NullPointerException in
+    // SparkHadoopUtil.getFileSystemThreadStatistics
+    try {
+      val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+      rdd.saveAsTextFile(outPath.toString)
+      sc.textFile(outPath.toString).count()
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 }