From 17b3c580518663577b015d30d6700e32e3aa2052 Mon Sep 17 00:00:00 2001 From: Bill Chambers Date: Tue, 10 May 2016 18:44:28 -0700 Subject: [PATCH 1/5] fix SPARK-15264, add test cases --- .../execution/datasources/csv/DefaultSource.scala | 4 +++- .../src/test/resources/cars-blank-column-name.csv | 3 +++ .../sql/execution/datasources/csv/CSVSuite.scala | 12 ++++++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/resources/cars-blank-column-name.csv diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index 948fac0d58ad7..ba65f6df4f1a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -61,7 +61,9 @@ class DefaultSource extends FileFormat with DataSourceRegister { val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine) val header = if (csvOptions.headerFlag) { - firstRow + firstRow.zipWithIndex.map { case (value, index) => + if (value == "" || value == null) s"C$index" else value + } } else { firstRow.zipWithIndex.map { case (value, index) => s"C$index" } } diff --git a/sql/core/src/test/resources/cars-blank-column-name.csv b/sql/core/src/test/resources/cars-blank-column-name.csv new file mode 100644 index 0000000000000..0b804b1614d60 --- /dev/null +++ b/sql/core/src/test/resources/cars-blank-column-name.csv @@ -0,0 +1,3 @@ +"",,make,customer,comment +2012,"Tesla","S","bill","blank" +2013,"Tesla","S","c","something" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index b6cdc8cfabe3c..72b9ecbb6ae91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -38,6 +38,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { private val carsAltFile = "cars-alternative.csv" private val carsUnbalancedQuotesFile = "cars-unbalanced-quotes.csv" private val carsNullFile = "cars-null.csv" + private val carsBlankColName = "cars-blank-column-name.csv" private val emptyFile = "empty.csv" private val commentsFile = "comments.csv" private val disableCommentsFile = "disable_comments.csv" @@ -224,6 +225,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { assert(cars.select("year").collect().size === 2) } + test("test for blank column names on read and select normal column") { + val cars = spark.read + .format("csv") + .options(Map("header" -> "true", "inferSchema" -> "true")) + .load(testFile(carsBlankColName)) + + assert(cars.select("customer").collect().size == 2) + assert(cars.select("C0").collect().size == 2) + assert(cars.select("C1").collect().size == 2) + } + test("test for FAILFAST parsing mode") { val exception = intercept[SparkException]{ spark.read From 85d08434b620241d698edeaa6557e8a715816185 Mon Sep 17 00:00:00 2001 From: Bill Chambers Date: Tue, 10 May 2016 19:48:22 -0700 Subject: [PATCH 2/5] Rename test case --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 72b9ecbb6ae91..57e0778a6930f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -225,7 +225,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { assert(cars.select("year").collect().size === 2) } - test("test for blank column names on read and select normal column") { + test("test for blank column names on read and select columns") { val cars = spark.read .format("csv") .options(Map("header" -> "true", "inferSchema" -> "true")) From 0ed0b4c93b3be6e77ee8731f9c511b619c810625 Mon Sep 17 00:00:00 2001 From: Bill Chambers Date: Wed, 11 May 2016 13:35:36 -0700 Subject: [PATCH 3/5] Fix according to @andrewor14 comments, rename C0 to _c0, fix null checks --- .../spark/sql/execution/datasources/csv/DefaultSource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index ba65f6df4f1a2..f47ed76cba765 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -62,10 +62,10 @@ class DefaultSource extends FileFormat with DataSourceRegister { val header = if (csvOptions.headerFlag) { firstRow.zipWithIndex.map { case (value, index) => - if (value == "" || value == null) s"C$index" else value + if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value } } else { - firstRow.zipWithIndex.map { case (value, index) => s"C$index" } + firstRow.zipWithIndex.map { case (value, index) => s"_c$index" } } val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths) From d665bea023b5f4aa9bfe2b2ccbc46586e07bc396 Mon Sep 17 00:00:00 2001 From: Bill Chambers Date: Wed, 11 May 2016 13:41:45 -0700 Subject: [PATCH 4/5] fix tests --- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 57e0778a6930f..ae91e0f606ecc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -72,14 +72,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { if (withHeader) { assert(df.schema.fieldNames === Array("year", "make", "model", "comment", "blank")) } else { - assert(df.schema.fieldNames === Array("C0", "C1", "C2", "C3", "C4")) + assert(df.schema.fieldNames === Array("_c0", "_c1", "_c2", "_c3", "_c4")) } } if (checkValues) { val yearValues = List("2012", "1997", "2015") val actualYears = if (!withHeader) "year" :: yearValues else yearValues - val years = if (withHeader) df.select("year").collect() else df.select("C0").collect() + val years = if (withHeader) df.select("year").collect() else df.select("_c0").collect() years.zipWithIndex.foreach { case (year, index) => if (checkTypes) { @@ -232,8 +232,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .load(testFile(carsBlankColName)) assert(cars.select("customer").collect().size == 2) - assert(cars.select("C0").collect().size == 2) - assert(cars.select("C1").collect().size == 2) + assert(cars.select("_c0").collect().size == 2) + assert(cars.select("_c1").collect().size == 2) } test("test for FAILFAST parsing mode") { From d98371859d78706cb9673f253b4dccbffb9fc385 Mon Sep 17 00:00:00 2001 From: Bill Chambers Date: Wed, 11 May 2016 15:09:49 -0700 Subject: [PATCH 5/5] fix python test failure --- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index e2ee9db049489..fe666d623f335 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -358,7 +358,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non >>> df = sqlContext.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes - [('C0', 'string'), ('C1', 'string')] + [('_c0', 'string'), ('_c1', 'string')] """ if schema is not None: self.schema(schema)