diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 4faa27c2c1e2..b29d5c76c5f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -368,7 +368,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
           val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
           PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
         }
@@ -418,7 +418,7 @@ case class FileSourceScanExec(
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.flatMap { file =>
+      partition.files.filter(_.getLen > 0).flatMap { file =>
         val blockLocations = getBlockLocations(file)
         if (fsRelation.fileFormat.isSplitable(
             fsRelation.sparkSession, fsRelation.options, file.getPath)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index ee5176e23e34..9d23161c1f24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1842,7 +1842,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
-        .repartition(1)
         .write
         .text(path)
 
@@ -1910,7 +1909,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           F.count($"dummy").as("valid"),
           F.count($"_corrupt_record").as("corrupt"),
           F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
+      checkAnswer(counts, Row(1, 4, 6))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 12779b46bfe8..048e4b80c72a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.sources
 
 import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
 
 import org.scalatest.BeforeAndAfter
 
@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in non bucketed read") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions === 1)
+    }
+  }
 }
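
For reference, a minimal spark-shell sketch mirroring the new SaveLoadSuite test (the `spark` session predefined by spark-shell is assumed, and the temp-directory prefix is illustrative only):

// Create one empty and one non-empty file, then read the directory back
// with the text source in wholetext mode, as the new test does.
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

val dir = Files.createTempDirectory("skipEmptyFiles").toString
Files.write(Paths.get(dir, "empty"), Array.empty[Byte])                        // zero-length file
Files.write(Paths.get(dir, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))  // one-byte file

val readback = spark.read.option("wholetext", true).text(dir)
// With the getLen > 0 filter, the zero-length file is dropped at planning
// time, so a single partition is produced; without it, the empty file is
// still bin-packed (its open cost counts toward the split size) and can
// surface as an extra, useless partition.
println(readback.rdd.getNumPartitions)  // expected: 1 with this change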