diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index 3ab775c909238..5e9032b877855 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -280,7 +280,8 @@ private[csv] object CSVTypeCast {
             // compatibility.
             DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
         }
-        case _: StringType => UTF8String.fromString(datum)
+        case _: StringType =>
+          if (datum == options.emptyValue) UTF8String.EMPTY_UTF8 else UTF8String.fromString(datum)
         case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
         case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 014614eb997a5..1c123d2be653c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -90,6 +90,7 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str
   val permissive = ParseModes.isPermissiveMode(parseMode)
 
   val nullValue = parameters.getOrElse("nullValue", "")
+  val emptyValue = parameters.getOrElse("emptyValue", "")
 
   val nanValue = parameters.getOrElse("nanValue", "NaN")
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
index 332f5c8e9fb74..6d22fe3dfe680 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
@@ -43,6 +43,7 @@ private[csv] class CsvReader(params: CSVOptions) {
     settings.setInputBufferSize(params.inputBufferSize)
     settings.setMaxColumns(params.maxColumns)
     settings.setNullValue(params.nullValue)
+    settings.setEmptyValue(params.emptyValue)
    settings.setMaxCharsPerColumn(params.maxCharsPerColumn)
    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
 
@@ -74,7 +75,7 @@ private[csv] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten
   format.setComment(params.comment)
 
   writerSettings.setNullValue(params.nullValue)
-  writerSettings.setEmptyValue(params.nullValue)
+  writerSettings.setEmptyValue(params.emptyValue)
   writerSettings.setSkipEmptyLines(true)
   writerSettings.setQuoteAllFields(params.quoteAll)
   writerSettings.setHeaders(headers: _*)
diff --git a/sql/core/src/test/resources/test-data/emptystring.csv b/sql/core/src/test/resources/test-data/emptystring.csv
new file mode 100644
index 0000000000000..929069bb783bd
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/emptystring.csv
@@ -0,0 +1,5 @@
+col1,col2
+1,"-"
+2,""
+3,
+4,"A"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f7c22c6c93f7a..61c719374c6e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -52,6 +52,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
+  private val emptyStringValuesFile = "test-data/emptystring.csv"
 
   private def testFile(fileName: String): String = {
     Thread.currentThread().getContextClassLoader.getResource(fileName).toString
@@ -889,4 +890,67 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-17916 load data with empty strings and null values") {
+    val rows = spark.read
+      .format("csv")
+      .option("header", "true")
+      .option("inferSchema", "true")
+      .option("nullValue", "-")
+      .load(testFile(emptyStringValuesFile))
+
+    val expectedRows = Seq(Row(1, null), Row(2, ""), Row(3, null), Row(4, "A"))
+    checkAnswer(rows, expectedRows)
+  }
+
+  test("save and load data with empty strings using all quotes option") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
+      val data = Seq((1, "", "abcd"), (2, null, "xyz"))
+      val df = spark.createDataFrame(data)
+      df.coalesce(1).write
+        .format("csv")
+        .option("quoteAll", "true")
+        .option("nullValue", "NUL")
+        .save(csvDir)
+      val results = spark.read.format("text").load(csvDir).collect()
+      val expected = Seq(Seq("\"1\",\"\",\"abcd\""), Seq("\"2\",\"NUL\",\"xyz\""))
+      assert(results.toSeq.map(_.toSeq) === expected)
+
+      val rows = spark.read
+        .format("csv")
+        .option("nullValue", "NUL")
+        .option("inferSchema", "true")
+        .load(csvDir)
+
+      val expectedRows = Seq(Row(1, "", "abcd"), Row(2, null, "xyz"))
+      checkAnswer(rows, expectedRows)
+    }
+  }
+
+  test("save and load data using the user specified emptyValue option") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
+      val data = Seq((1, "", "abcd"), (2, null, "xyz"))
+      val df = spark.createDataFrame(data)
+      df.coalesce(1).write
+        .format("csv")
+        .option("nullValue", "NUL")
+        .option("emptyValue", "")
+        .save(csvDir)
+      val results = spark.read.format("text").load(csvDir).collect()
+      val expected = Seq(Seq("1,,abcd"), Seq("2,NUL,xyz"))
+      assert(results.toSeq.map(_.toSeq) === expected)
+
+      val rows = spark.read
+        .format("csv")
+        .option("nullValue", "NUL")
+        .option("emptyValue", "")
+        .option("inferSchema", "true")
+        .load(csvDir)
+
+      val expectedRows = Seq(Row(1, "", "abcd"), Row(2, null, "xyz"))
+      checkAnswer(rows, expectedRows)
+    }
+  }
 }
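
For reference, a minimal sketch of how the `nullValue`/`emptyValue` pair wired through `CSVOptions` above is meant to be used from the public DataFrame API. The option names come from the patch; the SparkSession setup and file paths are illustrative assumptions only, not part of the change:

```scala
// Sketch only: session setup and paths are hypothetical; the options
// "nullValue" and "emptyValue" are the ones handled by this patch.
import org.apache.spark.sql.SparkSession

object EmptyValueUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-emptyValue-sketch")
      .master("local[*]")
      .getOrCreate()

    // Reading: fields equal to "-" become null, while quoted empty
    // fields ("") are kept as empty strings instead of being read as null.
    val df = spark.read
      .format("csv")
      .option("header", "true")
      .option("nullValue", "-")
      .option("emptyValue", "")
      .load("/tmp/emptystring.csv") // hypothetical input path

    // Writing: nulls serialize as "NUL" and empty strings are written
    // out empty, so the two values survive a round trip distinctly.
    df.write
      .format("csv")
      .option("nullValue", "NUL")
      .option("emptyValue", "")
      .save("/tmp/csv-out") // hypothetical output path

    spark.stop()
  }
}
```

This mirrors the round-trip behavior exercised by the new CSVSuite tests: without a distinct `emptyValue`, the writer previously reused `nullValue` for empty fields, making null and empty string indistinguishable after a save/load cycle.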