From 36c64f0a8c47763dd99f0ed07cdf5a1813b0f2b7 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 24 Nov 2018 18:29:25 +0100
Subject: [PATCH 1/7] Test checks empty files are not loaded

---
 .../apache/spark/sql/sources/SaveLoadSuite.scala   | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 12779b46bfe8..6c630a30927c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.sources
 
 import java.io.File
+import java.nio.file.{Files, Paths}
 
 import org.scalatest.BeforeAndAfter
-
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -142,4 +142,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in load") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array[Byte]())
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
+      val readback = spark.read.text(path)
+
+      assert(readback.rdd.getNumPartitions == 1)
+    }
+  }
 }

From e428b83c82623f33e5b5e5073251d73d18a3c903 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 24 Nov 2018 18:29:41 +0100
Subject: [PATCH 2/7] Fix json tests

---
 .../spark/sql/execution/datasources/json/JsonSuite.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 9ea9189cdf7f..8f0a95f07bb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1843,7 +1843,6 @@
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
-        .repartition(1)
         .write
         .text(path)
 
@@ -1898,7 +1897,7 @@
         .text(path)
 
       val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
+      assert(jsonDF.count() === corruptRecordCount)
       assert(jsonDF.schema === new StructType()
         .add("_corrupt_record", StringType)
         .add("dummy", StringType))
@@ -1911,7 +1910,7 @@
         F.count($"dummy").as("valid"),
         F.count($"_corrupt_record").as("corrupt"),
         F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
+      checkAnswer(counts, Row(1, 4, 6))
     }
   }
 

From 4212add0bcbcaaec5ee8e5483ea16d3e7929dcb6 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 24 Nov 2018 18:34:24 +0100
Subject: [PATCH 3/7] Fix test

---
 .../test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 6c630a30927c..041a6cd6946b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -148,7 +148,7 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       val path = dir.getCanonicalPath
       Files.write(Paths.get(path, "empty"), Array[Byte]())
       Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
-      val readback = spark.read.text(path)
+      val readback = spark.read.option("wholetext", true).text(path)
 
       assert(readback.rdd.getNumPartitions == 1)
     }

From 2458882037349987396e8456799c451e80566442 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 24 Nov 2018 18:36:06 +0100
Subject: [PATCH 4/7] Filtering out empty files

---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 77e381ef6e6b..d29a24a199fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -388,7 +388,7 @@
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
           val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
           PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
         }
@@ -438,7 +438,7 @@
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.flatMap { file =>
+      partition.files.filter(_.getLen > 0).flatMap { file =>
        val blockLocations = getBlockLocations(file)
        if (fsRelation.fileFormat.isSplitable(
            fsRelation.sparkSession, fsRelation.options, file.getPath)) {

From b200a50de58641d297381bec45687317bd21dfb7 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 24 Nov 2018 18:53:45 +0100
Subject: [PATCH 5/7] Fix imports

---
 .../test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 041a6cd6946b..c0f9f142d885 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.nio.file.{Files, Paths}
 
 import org.scalatest.BeforeAndAfter
+
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext

From e7871f3037591523b7849569cb5f53e916206e8d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 25 Nov 2018 16:46:35 +0100
Subject: [PATCH 6/7] Replacing Array[Byte]() by Array.empty[Byte]

---
 .../test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index c0f9f142d885..bf37485f5719 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -147,7 +147,7 @@
   test("skip empty files in load") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
-      Files.write(Paths.get(path, "empty"), Array[Byte]())
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
       Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
       val readback = spark.read.option("wholetext", true).text(path)
 

From 7057f8b57223a092241f7b0ecf284b17ca883f6d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 27 Nov 2018 16:51:19 +0100
Subject: [PATCH 7/7] Addressing Sean's review comments

---
 .../scala/org/apache/spark/sql/sources/SaveLoadSuite.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index bf37485f5719..048e4b80c72a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import java.io.File
+import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 
 import org.scalatest.BeforeAndAfter
@@ -144,14 +145,14 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
     }
   }
 
-  test("skip empty files in load") {
+  test("skip empty files in non bucketed read") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       Files.write(Paths.get(path, "empty"), Array.empty[Byte])
-      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
       val readback = spark.read.option("wholetext", true).text(path)
 
-      assert(readback.rdd.getNumPartitions == 1)
+      assert(readback.rdd.getNumPartitions === 1)
     }
   }
 }