From 47325eb1e85b11fb5a58fc1e55f6286a5276d4e0 Mon Sep 17 00:00:00 2001 From: sureshthalamati Date: Wed, 4 May 2016 11:35:59 -0700 Subject: [PATCH 1/2] This patch adds a boolean option emptyAsNull to the CSV datasource that lets the user specify whether empty quoted strings are interpreted as null or as an empty string. --- .../datasources/csv/CSVOptions.scala | 1 + .../execution/datasources/csv/CSVParser.scala | 3 +- .../src/test/resources/emptystring-values.csv | 4 + .../execution/datasources/csv/CSVSuite.scala | 75 +++++++++++++++++++ 4 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/resources/emptystring-values.csv diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 014614eb997a5..7c375bf6551d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -90,6 +90,7 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str val permissive = ParseModes.isPermissiveMode(parseMode) val nullValue = parameters.getOrElse("nullValue", "") + val emptyAsNull = getBool("emptyAsNull", true) val nanValue = parameters.getOrElse("nanValue", "NaN") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index 332f5c8e9fb74..984a74f6386bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -43,6 +43,7 @@ private[csv] class CsvReader(params: CSVOptions) { settings.setInputBufferSize(params.inputBufferSize) settings.setMaxColumns(params.maxColumns) settings.setNullValue(params.nullValue) + 
settings.setEmptyValue(if (params.emptyAsNull) null else "") settings.setMaxCharsPerColumn(params.maxCharsPerColumn) settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER) @@ -74,7 +75,7 @@ private[csv] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten format.setComment(params.comment) writerSettings.setNullValue(params.nullValue) - writerSettings.setEmptyValue(params.nullValue) + writerSettings.setEmptyValue(if (params.emptyAsNull) params.nullValue else "") writerSettings.setSkipEmptyLines(true) writerSettings.setQuoteAllFields(params.quoteAll) writerSettings.setHeaders(headers: _*) diff --git a/sql/core/src/test/resources/emptystring-values.csv b/sql/core/src/test/resources/emptystring-values.csv new file mode 100644 index 0000000000000..cc768e8b6834d --- /dev/null +++ b/sql/core/src/test/resources/emptystring-values.csv @@ -0,0 +1,4 @@ +year,make,model,comment,price +2017,Tesla,Mode 3,looks nice.,35000.99 +2016,Chevy,Bolt,"",29000.00 +2015,Porsche,"",, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index f7c22c6c93f7a..a54de2d89376f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -52,6 +52,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { private val numbersFile = "test-data/numbers.csv" private val datesFile = "test-data/dates.csv" private val unescapedQuotesFile = "test-data/unescaped-quotes.csv" + private val emptyStringValuesFile = "test-data/emptystring-values.csv" private def testFile(fileName: String): String = { Thread.currentThread().getContextClassLoader.getResource(fileName).toString @@ -889,4 +890,78 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } } + + test("load 
data with empty quoted string fields.") { + val results = sqlContext + .read + .format("csv") + .options(Map( + "header" -> "true", + "nullValue" -> null, + "inferSchema" -> "true")) + .load(testFile(emptyStringValuesFile)) + .collect() + + assert(results(0).toSeq === Array(2017, "Tesla", "Mode 3", "looks nice.", 35000.99d)) + assert(results(1).toSeq === Array(2016, "Chevy", "Bolt", null, 29000.00d)) + assert(results(2).toSeq === Array(2015, "Porsche", null, null, null)) + } + + test("load data with empty quoted string fields using user specified empty value option.") { + val results = sqlContext + .read + .format("csv") + .options(Map( + "header" -> "true", + "inferSchema" -> "true", + "nullValue" -> null, + "emptyAsNull" -> "false")) + .load(testFile(emptyStringValuesFile)) + .collect() + + assert(results(0).toSeq === Array(2017, "Tesla", "Mode 3", "looks nice.", 35000.99d)) + assert(results(1).toSeq === Array(2016, "Chevy", "Bolt", "", 29000.00d)) + assert(results(2).toSeq === Array(2015, "Porsche", "", null, null)) + } + + test("save empty string fields.") { + withTempDir { dir => + val csvDir = new File(dir, "csv").getCanonicalPath + val data = Seq((2015, "Tesla", "Model X", "", 129000.00d)) + val df = data.toDF("year", "make", "model", "comment", "price") + df.coalesce(1).write.format("csv").save(csvDir) + val results = sqlContext + .read + .format("csv") + .schema(df.schema) + .load(csvDir) + .collect() + + assert(results(0).toSeq === Array(2015, "Tesla", "Model X", "", 129000.00d)) + } + } + + test("save empty string fields with user specified emptyValue option.") { + withTempDir { dir => + val csvDir = new File(dir, "csv").getCanonicalPath + val data = Seq((2015, "Tesla", "Model X", "", 129000.00d)) + val df = data.toDF("year", "make", "model", "comment", "price") + df.coalesce(1) + .write.format("csv") + .option("nullValue", "\\N") + .option("emptyAsNull", "false") + .save(csvDir) + val results = sqlContext + .read + .format("csv") + .schema(df.schema) + 
.option("nullValue", "\\N") + .option("emptyAsNull", "false") + .load(csvDir) + .collect() + // CSV writer does not seem to distinguish between empty string value vs null value, + // null value (\\N) gets written in both the cases. + assert(results(0).toSeq === Array(2015, "Tesla", "Model X", "\\N", 129000.00d)) + } + } } From b128fbb92792ecceb67f5fec3c1feb1ae7fcff69 Mon Sep 17 00:00:00 2001 From: sureshthalamati Date: Thu, 20 Oct 2016 01:21:54 -0700 Subject: [PATCH 2/2] Adding emptyValue option to allow the user to specify the mapping for empty strings on read/write --- .../datasources/csv/CSVInferSchema.scala | 3 +- .../datasources/csv/CSVOptions.scala | 2 +- .../execution/datasources/csv/CSVParser.scala | 4 +- .../src/test/resources/emptystring-values.csv | 4 - .../test/resources/test-data/emptystring.csv | 5 + .../execution/datasources/csv/CSVSuite.scala | 97 ++++++++----------- 6 files changed, 53 insertions(+), 62 deletions(-) delete mode 100644 sql/core/src/test/resources/emptystring-values.csv create mode 100644 sql/core/src/test/resources/test-data/emptystring.csv diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index 3ab775c909238..5e9032b877855 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -280,7 +280,8 @@ private[csv] object CSVTypeCast { // compatibility. 
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime) } - case _: StringType => UTF8String.fromString(datum) + case _: StringType => + if (datum == options.emptyValue) UTF8String.EMPTY_UTF8 else UTF8String.fromString(datum) case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options) case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 7c375bf6551d8..1c123d2be653c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -90,7 +90,7 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str val permissive = ParseModes.isPermissiveMode(parseMode) val nullValue = parameters.getOrElse("nullValue", "") - val emptyAsNull = getBool("emptyAsNull", true) + val emptyValue = parameters.getOrElse("emptyValue", "") val nanValue = parameters.getOrElse("nanValue", "NaN") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index 984a74f6386bb..6d22fe3dfe680 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -43,7 +43,7 @@ private[csv] class CsvReader(params: CSVOptions) { settings.setInputBufferSize(params.inputBufferSize) settings.setMaxColumns(params.maxColumns) settings.setNullValue(params.nullValue) - settings.setEmptyValue(if (params.emptyAsNull) null else "") + settings.setEmptyValue(params.emptyValue) settings.setMaxCharsPerColumn(params.maxCharsPerColumn) 
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER) @@ -75,7 +75,7 @@ private[csv] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten format.setComment(params.comment) writerSettings.setNullValue(params.nullValue) - writerSettings.setEmptyValue(if (params.emptyAsNull) params.nullValue else "") + writerSettings.setEmptyValue(params.emptyValue) writerSettings.setSkipEmptyLines(true) writerSettings.setQuoteAllFields(params.quoteAll) writerSettings.setHeaders(headers: _*) diff --git a/sql/core/src/test/resources/emptystring-values.csv b/sql/core/src/test/resources/emptystring-values.csv deleted file mode 100644 index cc768e8b6834d..0000000000000 --- a/sql/core/src/test/resources/emptystring-values.csv +++ /dev/null @@ -1,4 +0,0 @@ -year,make,model,comment,price -2017,Tesla,Mode 3,looks nice.,35000.99 -2016,Chevy,Bolt,"",29000.00 -2015,Porsche,"",, diff --git a/sql/core/src/test/resources/test-data/emptystring.csv b/sql/core/src/test/resources/test-data/emptystring.csv new file mode 100644 index 0000000000000..929069bb783bd --- /dev/null +++ b/sql/core/src/test/resources/test-data/emptystring.csv @@ -0,0 +1,5 @@ +col1,col2 +1,"-" +2,"" +3, +4,"A" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index a54de2d89376f..61c719374c6e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -52,7 +52,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { private val numbersFile = "test-data/numbers.csv" private val datesFile = "test-data/dates.csv" private val unescapedQuotesFile = "test-data/unescaped-quotes.csv" - private val emptyStringValuesFile = "test-data/emptystring-values.csv" + private val emptyStringValuesFile = 
"test-data/emptystring.csv" private def testFile(fileName: String): String = { Thread.currentThread().getContextClassLoader.getResource(fileName).toString @@ -891,77 +891,66 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } - test("load data with empty quoted string fields.") { - val results = sqlContext - .read - .format("csv") - .options(Map( - "header" -> "true", - "nullValue" -> null, - "inferSchema" -> "true")) - .load(testFile(emptyStringValuesFile)) - .collect() - - assert(results(0).toSeq === Array(2017, "Tesla", "Mode 3", "looks nice.", 35000.99d)) - assert(results(1).toSeq === Array(2016, "Chevy", "Bolt", null, 29000.00d)) - assert(results(2).toSeq === Array(2015, "Porsche", null, null, null)) - } - - test("load data with empty quoted string fields using user specified empty value option.") { - val results = sqlContext - .read + test("SPARK-17916 load data with empty strings and null values") { + val rows = spark.read .format("csv") - .options(Map( - "header" -> "true", - "inferSchema" -> "true", - "nullValue" -> null, - "emptyAsNull" -> "false")) + .option("header", "true") + .option("inferSchema", "true") + .option("nullValue", "-") .load(testFile(emptyStringValuesFile)) - .collect() - assert(results(0).toSeq === Array(2017, "Tesla", "Mode 3", "looks nice.", 35000.99d)) - assert(results(1).toSeq === Array(2016, "Chevy", "Bolt", "", 29000.00d)) - assert(results(2).toSeq === Array(2015, "Porsche", "", null, null)) + val expectedRows = Seq(Row(1, null), Row(2, ""), Row(3, null), Row(4, "A")) + checkAnswer(rows, expectedRows) } - test("save empty string fields.") { + test("save and load data with empty strings using all quotes option") { withTempDir { dir => val csvDir = new File(dir, "csv").getCanonicalPath - val data = Seq((2015, "Tesla", "Model X", "", 129000.00d)) - val df = data.toDF("year", "make", "model", "comment", "price") - df.coalesce(1).write.format("csv").save(csvDir) - val results = sqlContext - .read + val 
data = Seq((1, "", "abcd"), (2, null, "xyz")) + val df = spark.createDataFrame(data) + df.coalesce(1).write .format("csv") - .schema(df.schema) + .option("quoteAll", "true") + .option("nullValue", "NUL") + .save(csvDir) + val results = spark.read.format("text").load(csvDir).collect() + val expected = Seq(Seq("\"1\",\"\",\"abcd\""), Seq("\"2\",\"NUL\",\"xyz\"")) + assert(results.toSeq.map(_.toSeq) === expected) + + val rows = spark.read + .format("csv") + .option("nullValue", "NUL") + .option("inferSchema", "true") .load(csvDir) - .collect() - assert(results(0).toSeq === Array(2015, "Tesla", "Model X", "", 129000.00d)) + val expectedRows = Seq(Row(1, "", "abcd"), Row(2, null, "xyz")) + checkAnswer(rows, expectedRows) } } - test("save empty string fields with user specified emptyValue option.") { + test("save and load data using the user specified emptyValue option") { withTempDir { dir => val csvDir = new File(dir, "csv").getCanonicalPath - val data = Seq((2015, "Tesla", "Model X", "", 129000.00d)) - val df = data.toDF("year", "make", "model", "comment", "price") - df.coalesce(1) - .write.format("csv") - .option("nullValue", "\\N") - .option("emptyAsNull", "false") + val data = Seq((1, "", "abcd"), (2, null, "xyz")) + val df = spark.createDataFrame(data) + df.coalesce(1).write + .format("csv") + .option("nullValue", "NUL") + .option("emptyValue", "") .save(csvDir) - val results = sqlContext - .read + val results = spark.read.format("text").load(csvDir).collect() + val expected = Seq(Seq("1,,abcd"), Seq("2,NUL,xyz")) + assert(results.toSeq.map(_.toSeq) === expected) + + val rows = spark.read .format("csv") - .schema(df.schema) - .option("nullValue", "\\N") - .option("emptyAsNull", "false") + .option("nullValue", "NUL") + .option("emptyValue", "") + .option("inferSchema", "true") .load(csvDir) - .collect() - // CSV writer does not seem to distinguish between empty string value vs null value, - // null value (\\N) gets written in both the cases. 
- assert(results(0).toSeq === Array(2015, "Tesla", "Model X", "\\N", 129000.00d)) + + val expectedRows = Seq(Row(1, "", "abcd"), Row(2, null, "xyz")) + checkAnswer(rows, expectedRows) } } }