From 664fe9b21e57840257f2a0375407442d8e37fc69 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 27 Mar 2019 15:32:56 +0800
Subject: [PATCH 1/2] fix

---
 .../sql/execution/datasources/v2/FileScan.scala   |  2 +-
 .../apache/spark/sql/sources/SaveLoadSuite.scala  | 16 +++++++++-------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index e971fd762efe..f0abd7509766 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -41,7 +41,7 @@ abstract class FileScan(
     val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.flatMap { file =>
+      partition.files.filter(_.getLen > 0).flatMap { file =>
         val filePath = file.getPath
         PartitionedFileUtil.splitFiles(
           sparkSession = sparkSession,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 048e4b80c72a..7680f61b8b6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -146,13 +146,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
   }
 
   test("skip empty files in non bucketed read") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
-      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
-      val readback = spark.read.option("wholetext", true).text(path)
-
-      assert(readback.rdd.getNumPartitions === 1)
+    Seq("csv", "text").foreach { format =>
+      withTempDir { dir =>
+        val path = dir.getCanonicalPath
+        Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+        Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+        val readBack = spark.read.option("wholetext", true).format(format).load(path)
+
+        assert(readBack.rdd.getNumPartitions === 1)
+      }
     }
   }
 }

From 4bf5cec6cd7388e1536f7439db356c076ce1f6e1 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 27 Mar 2019 22:33:48 +0800
Subject: [PATCH 2/2] filter empty files on constructing selectedPartitions

---
 .../apache/spark/sql/execution/DataSourceScanExec.scala     | 4 ++--
 .../execution/datasources/PartitioningAwareFileIndex.scala  | 7 +++++--
 .../spark/sql/execution/datasources/v2/FileScan.scala       | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 92f7d664b66b..33adfce34dd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -382,7 +382,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.filter(_.getLen > 0).map { f =>
+        p.files.map { f =>
           PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
         }
       }.groupBy { f =>
@@ -426,7 +426,7 @@ case class FileSourceScanExec(
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.filter(_.getLen > 0).flatMap { file =>
+      partition.files.flatMap { file =>
         // getPath() is very expensive so we only want to call it once in this block:
         val filePath = file.getPath
         val isSplitable = relation.fileFormat.isSplitable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index f5ae095a6dad..29b304a1e487 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -58,15 +58,18 @@ abstract class PartitioningAwareFileIndex(
 
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    def isNonEmptyFile(f: FileStatus): Boolean = {
+      isDataPath(f.getPath) && f.getLen > 0
+    }
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+      PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
     } else {
       prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
-              existingDir.filter(f => isDataPath(f.getPath))
+              existingDir.filter(isNonEmptyFile)
 
             case None =>
               // Directory does not exist, or has no children files
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index f0abd7509766..e971fd762efe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -41,7 +41,7 @@ abstract class FileScan(
     val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.filter(_.getLen > 0).flatMap { file =>
+      partition.files.flatMap { file =>
        val filePath = file.getPath
         PartitionedFileUtil.splitFiles(
           sparkSession = sparkSession,
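
Note (not part of the patches above): a minimal, self-contained sketch of the empty-file filtering that PATCH 2/2 centralizes in PartitioningAwareFileIndex.listFiles. The standalone object, the isNonEmptyDataFile helper, and the hard-coded FileStatus entries are illustrative assumptions only; in Spark the data-path check is the index class's own private isDataPath method.

import org.apache.hadoop.fs.{FileStatus, Path}

object EmptyFileFilterSketch {
  // Same idea as the isNonEmptyFile predicate added in PATCH 2/2: keep a file
  // only if it looks like a data file and actually contains bytes. The
  // hidden-file check below is a stand-in for isDataPath.
  def isNonEmptyDataFile(f: FileStatus): Boolean = {
    val name = f.getPath.getName
    !name.startsWith("_") && !name.startsWith(".") && f.getLen > 0
  }

  def main(args: Array[String]): Unit = {
    // Fabricated listing of a directory: an empty part file, a non-empty part
    // file, and a _SUCCESS marker. FileStatus arguments are
    // (length, isDir, replication, blockSize, modificationTime, path).
    val listing = Seq(
      new FileStatus(0L, false, 1, 128L * 1024 * 1024, 0L, new Path("/data/part-00000")),
      new FileStatus(42L, false, 1, 128L * 1024 * 1024, 0L, new Path("/data/part-00001")),
      new FileStatus(10L, false, 1, 128L * 1024 * 1024, 0L, new Path("/data/_SUCCESS")))

    // Only part-00001 survives, so downstream split planning never creates
    // read partitions for empty or metadata files.
    listing.filter(isNonEmptyDataFile).foreach(f => println(f.getPath))
  }
}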