From e782616498bcfc50398c2b560c3adf1512099d4f Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 9 Jul 2016 21:39:50 +0800 Subject: [PATCH 1/5] cast null correctly --- .../datasources/csv/CSVInferSchema.scala | 98 +++++++++---------- .../datasources/csv/CSVTypeCastSuite.scala | 42 ++++---- 2 files changed, 72 insertions(+), 68 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index de3d889621b7d..01de8ed1d6192 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -238,59 +238,55 @@ private[csv] object CSVTypeCast { nullable: Boolean = true, options: CSVOptions = CSVOptions()): Any = { - castType match { - case _: ByteType => if (datum == options.nullValue && nullable) null else datum.toByte - case _: ShortType => if (datum == options.nullValue && nullable) null else datum.toShort - case _: IntegerType => if (datum == options.nullValue && nullable) null else datum.toInt - case _: LongType => if (datum == options.nullValue && nullable) null else datum.toLong - case _: FloatType => - if (datum == options.nullValue && nullable) { - null - } else if (datum == options.nanValue) { - Float.NaN - } else if (datum == options.negativeInf) { - Float.NegativeInfinity - } else if (datum == options.positiveInf) { - Float.PositiveInfinity - } else { - Try(datum.toFloat) - .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue()) - } - case _: DoubleType => - if (datum == options.nullValue && nullable) { - null - } else if (datum == options.nanValue) { - Double.NaN - } else if (datum == options.negativeInf) { - Double.NegativeInfinity - } else if (datum == options.positiveInf) { - Double.PositiveInfinity - } else { - Try(datum.toDouble) - .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue()) - } - case _: BooleanType => datum.toBoolean - case dt: DecimalType => - if (datum == options.nullValue && nullable) { - null - } else { + if (datum == options.nullValue && nullable && (!castType.isInstanceOf[StringType])) { + null + } else { + castType match { + case _: ByteType => datum.toByte + case _: ShortType => datum.toShort + case _: IntegerType => datum.toInt + case _: LongType => datum.toLong + case _: FloatType => + if (datum == options.nanValue) { + Float.NaN + } else if (datum == options.negativeInf) { + Float.NegativeInfinity + } else if (datum == options.positiveInf) { + Float.PositiveInfinity + } else { + Try(datum.toFloat) + .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue()) + } + case _: DoubleType => + if (datum == options.nanValue) { + Double.NaN + } else if (datum == options.negativeInf) { + Double.NegativeInfinity + } else if (datum == options.positiveInf) { + Double.PositiveInfinity + } else { + Try(datum.toDouble) + .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue()) + } + case _: BooleanType => datum.toBoolean + case dt: DecimalType => val value = new BigDecimal(datum.replaceAll(",", "")) Decimal(value, dt.precision, dt.scale) - } - case _: TimestampType if options.dateFormat != null => - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - options.dateFormat.parse(datum).getTime * 1000L - case _: TimestampType => - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - DateTimeUtils.stringToTime(datum).getTime * 1000L - case _: DateType if options.dateFormat != null => - DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime) - case _: DateType => - DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime) - case _: StringType => UTF8String.fromString(datum) - case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}") + case _: TimestampType if options.dateFormat != null => + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. + options.dateFormat.parse(datum).getTime * 1000L + case _: TimestampType => + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. + DateTimeUtils.stringToTime(datum).getTime * 1000L + case _: DateType if options.dateFormat != null => + DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime) + case _: DateType => + DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime) + case _: StringType => UTF8String.fromString(datum) + case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}") + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala index 26b33b24efc3d..42bb09c45a66d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala @@ -68,7 +68,31 @@ class CSVTypeCastSuite extends SparkFunSuite { } test("Nullable types are handled") { - assert(CSVTypeCast.castTo("", IntegerType, nullable = true, CSVOptions()) == null) + assertNull( + CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", BooleanType, nullable = true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", TimestampType, nullable = true, CSVOptions("nullValue", "-"))) + assertNull( + CSVTypeCast.castTo("-", DateType, nullable = true, CSVOptions("nullValue", "-"))) + + // special treatment for StringType + assert( + CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-")) === + UTF8String.fromString("-")) } test("String type should always return the same as the input") { @@ -165,20 +189,4 @@ class CSVTypeCastSuite extends SparkFunSuite { assert(doubleVal2 == Double.PositiveInfinity) } - test("Type-specific null values are used for casting") { - assertNull( - CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-"))) - assertNull( - CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-"))) - assertNull( - CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-"))) - assertNull( - CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-"))) - assertNull( - CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-"))) - assertNull( - CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-"))) - assertNull( - CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-"))) - } } From bf01cea8273f00386ceef6459f8b8fe2c169e12a Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Mon, 1 Aug 2016 11:15:59 +0800 Subject: [PATCH 2/5] StringType should also respect `nullValue` --- .../apache/spark/sql/DataFrameReader.scala | 3 ++- .../datasources/csv/CSVInferSchema.scala | 2 +- .../sql/streaming/DataStreamReader.scala | 3 ++- .../execution/datasources/csv/CSVSuite.scala | 2 +- .../datasources/csv/CSVTypeCastSuite.scala | 22 +++++++++++++------ 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index e8c2885d7737c..6d0cf53419121 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -370,7 +370,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * from values being read should be skipped. *
  • `ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing * whitespaces from values being read should be skipped.
  • - *
  • `nullValue` (default empty string): sets the string representation of a null value.
  • + *
  • `nullValue` (default empty string): sets the string representation of a null value. Since + * 2.0.1, this applies to all supported types including the string type.
  • *
  • `nanValue` (default `NaN`): sets the string representation of a non-number" value.
  • *
  • `positiveInf` (default `Inf`): sets the string representation of a positive infinity * value.
  • diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index 01de8ed1d6192..d657b99e6a389 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -238,7 +238,7 @@ private[csv] object CSVTypeCast { nullable: Boolean = true, options: CSVOptions = CSVOptions()): Any = { - if (datum == options.nullValue && nullable && (!castType.isInstanceOf[StringType])) { + if (datum == options.nullValue && nullable) { null } else { castType match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 248247a257d94..608ac32077143 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -218,7 +218,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * from values being read should be skipped. *
  • `ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing * whitespaces from values being read should be skipped.
  • - *
  • `nullValue` (default empty string): sets the string representation of a null value.
  • + *
  • `nullValue` (default empty string): sets the string representation of a null value. Since + * 2.0.1, this applies to all supported types including the string type.
  • *
  • `nanValue` (default `NaN`): sets the string representation of a non-number" value.
  • *
  • `positiveInf` (default `Inf`): sets the string representation of a positive infinity * value.
  • diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index f170065132acd..d60f4e4a900d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -527,7 +527,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { verifyCars(cars, withHeader = true, checkValues = false) val results = cars.collect() - assert(results(0).toSeq === Array(2012, "Tesla", "S", "null", "null")) + assert(results(0).toSeq === Array(2012, "Tesla", "S", null, null)) assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala index 42bb09c45a66d..fa9ae9cf52d8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala @@ -88,20 +88,28 @@ class CSVTypeCastSuite extends SparkFunSuite { CSVTypeCast.castTo("-", TimestampType, nullable = true, CSVOptions("nullValue", "-"))) assertNull( CSVTypeCast.castTo("-", DateType, nullable = true, CSVOptions("nullValue", "-"))) - - // special treatment for StringType - assert( - CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-")) === - UTF8String.fromString("-")) + assertNull( + CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-"))) } - test("String type should always return the same as the input") { + test("String type should also respect `nullValue`") { assert( CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()) == - UTF8String.fromString("")) + null) assert( CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) == UTF8String.fromString("")) + + assert( + CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions("nullValue", "null")) == + UTF8String.fromString("")) + assert( + CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions("nullValue", "null")) == + UTF8String.fromString("")) + + assert( + CSVTypeCast.castTo(null, StringType, nullable = true, CSVOptions("nullValue", "null")) == + null) } test("Throws exception for empty string with non null type") { From f58e33d2c0815c9fd4bae2552609d8b2f3098e13 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Wed, 10 Aug 2016 11:07:13 +0800 Subject: [PATCH 3/5] Address comments --- .../execution/datasources/csv/CSVTypeCastSuite.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala index fa9ae9cf52d8e..5ffa4bf88f024 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala @@ -93,9 +93,8 @@ class CSVTypeCastSuite extends SparkFunSuite { } test("String type should also respect `nullValue`") { - assert( - CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()) == - null) + assertNull( + CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions())) assert( CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) == UTF8String.fromString("")) @@ -107,9 +106,8 @@ class CSVTypeCastSuite extends SparkFunSuite { CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions("nullValue", "null")) == UTF8String.fromString("")) - assert( - CSVTypeCast.castTo(null, StringType, nullable = true, CSVOptions("nullValue", "null")) == - null) + assertNull( + CSVTypeCast.castTo(null, StringType, nullable = true, CSVOptions("nullValue", "null"))) } test("Throws exception for empty string with non null type") { From 74b4dd8ff2f79faaf9df50c5a54e6298234137e7 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Fri, 19 Aug 2016 12:20:41 +0800 Subject: [PATCH 4/5] Fix python docs --- python/pyspark/sql/readwriter.py | 3 ++- python/pyspark/sql/streaming.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 78d992e415489..6a6983c288219 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -318,7 +318,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non being read should be skipped. If None is set, it uses the default value, ``false``. :param nullValue: sets the string representation of a null value. If None is set, it uses - the default value, empty string. + the default value, empty string. Since 2.0.1, this ``nullValue`` param + applies to all supported types including the string type. :param nanValue: sets the string representation of a non-number value. If None is set, it uses the default value, ``NaN``. :param positiveInf: sets the string representation of a positive infinity value. If None diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 8bac347e13084..ea4b760d39a6d 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -485,7 +485,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non being read should be skipped. If None is set, it uses the default value, ``false``. :param nullValue: sets the string representation of a null value. If None is set, it uses - the default value, empty string. + the default value, empty string. Since 2.0.1, this ``nullValue`` param + applies to all supported types including the string type. :param nanValue: sets the string representation of a non-number value. If None is set, it uses the default value, ``NaN``. :param positiveInf: sets the string representation of a positive infinity value. If None From 365cbfb02b58bc1992a635118ffba6b4e371cb06 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Fri, 16 Sep 2016 08:25:29 +0800 Subject: [PATCH 5/5] Address comments --- .../datasources/csv/CSVInferSchema.scala | 34 ++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index 0ef35edb0c97b..3ab775c909238 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -232,7 +232,7 @@ private[csv] object CSVTypeCast { nullable: Boolean = true, options: CSVOptions = CSVOptions()): Any = { - if (datum == options.nullValue && nullable) { + if (nullable && datum == options.nullValue) { null } else { castType match { @@ -241,26 +241,22 @@ private[csv] object CSVTypeCast { case _: IntegerType => datum.toInt case _: LongType => datum.toLong case _: FloatType => - if (datum == options.nanValue) { - Float.NaN - } else if (datum == options.negativeInf) { - Float.NegativeInfinity - } else if (datum == options.positiveInf) { - Float.PositiveInfinity - } else { - Try(datum.toFloat) - .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue()) + datum match { + case options.nanValue => Float.NaN + case options.negativeInf => Float.NegativeInfinity + case options.positiveInf => Float.PositiveInfinity + case _ => + Try(datum.toFloat) + .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue()) } case _: DoubleType => - if (datum == options.nanValue) { - Double.NaN - } else if (datum == options.negativeInf) { - Double.NegativeInfinity - } else if (datum == options.positiveInf) { - Double.PositiveInfinity - } else { - Try(datum.toDouble) - .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue()) + datum match { + case options.nanValue => Double.NaN + case options.negativeInf => Double.NegativeInfinity + case options.positiveInf => Double.PositiveInfinity + case _ => + Try(datum.toDouble) + .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue()) } case _: BooleanType => datum.toBoolean case dt: DecimalType =>