diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 92f7d664b66b..33adfce34dd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -382,7 +382,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.filter(_.getLen > 0).map { f =>
+        p.files.map { f =>
           PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
         }
       }.groupBy { f =>
@@ -426,7 +426,7 @@ case class FileSourceScanExec(
       s"open cost is considered as scanning $openCostInBytes bytes.")

     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.filter(_.getLen > 0).flatMap { file =>
+      partition.files.flatMap { file =>
         // getPath() is very expensive so we only want to call it once in this block:
         val filePath = file.getPath
         val isSplitable = relation.fileFormat.isSplitable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index f5ae095a6dad..29b304a1e487 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -58,15 +58,18 @@ abstract class PartitioningAwareFileIndex(

   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    def isNonEmptyFile(f: FileStatus): Boolean = {
+      isDataPath(f.getPath) && f.getLen > 0
+    }
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+      PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
     } else {
       prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
-              existingDir.filter(f => isDataPath(f.getPath))
+              existingDir.filter(isNonEmptyFile)

             case None =>
               // Directory does not exist, or has no children files
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 048e4b80c72a..7680f61b8b6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -146,13 +146,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
   }

   test("skip empty files in non bucketed read") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
-      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
-      val readback = spark.read.option("wholetext", true).text(path)
-
-      assert(readback.rdd.getNumPartitions === 1)
+    Seq("csv", "text").foreach { format =>
+      withTempDir { dir =>
+        val path = dir.getCanonicalPath
+        Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+        Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+        val readBack = spark.read.option("wholetext", true).format(format).load(path)
+
+        assert(readBack.rdd.getNumPartitions === 1)
+      }
     }
   }
 }