3 changes: 2 additions & 1 deletion python/pyspark/sql/readwriter.py
@@ -329,7 +329,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
being read should be skipped. If None is set, it uses
the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
-        the default value, empty string.
+        the default value, empty string. Since 2.0.1, this ``nullValue`` param
Member commented:
I think you can omit the "since x.y.z" in this PR. The new text will be in the docs for the version it applies to and not earlier ones.

Contributor Author replied:
This patch introduces a behavior change, i.e. how we deal with ``nullValue`` for the string type, so let's keep the "since x.y.z" note to give people a clue?

+        applies to all supported types including the string type.
:param nanValue: sets the string representation of a non-number value. If None is set, it
uses the default value, ``NaN``.
:param positiveInf: sets the string representation of a positive infinity value. If None
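For readers skimming the diff, here is a minimal sketch of the user-visible effect of this change; the file name, data, and `spark` session are illustrative, not part of the PR:

// people.csv (hypothetical):
//   name,age
//   -,30
//   alice,-
val df = spark.read
  .option("header", "true")
  .option("nullValue", "-")
  .csv("people.csv")
// Before this patch, nullValue was only honored for non-string columns:
// row 1 would keep the literal string "-" in `name`.
// After this patch, `name` is NULL in row 1 and `age` is NULL in row 2.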
3 changes: 2 additions & 1 deletion python/pyspark/sql/streaming.py
@@ -497,7 +497,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
being read should be skipped. If None is set, it uses
the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
-        the default value, empty string.
+        the default value, empty string. Since 2.0.1, this ``nullValue`` param
+        applies to all supported types including the string type.
:param nanValue: sets the string representation of a non-number value. If None is set, it
uses the default value, ``NaN``.
:param positiveInf: sets the string representation of a positive infinity value. If None
@@ -375,7 +375,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* from values being read should be skipped.</li>
* <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
* whitespaces from values being read should be skipped.</li>
-   * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
+   * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
HyukjinKwon (Member) commented on Aug 18, 2016:
BTW, there would be the same documentation in readwriter.py. I guess we should fix them too.

Member replied:
And streaming.py as well if I remember correctly.

lw-lin (Contributor Author) replied on Aug 19, 2016:
Oh thanks! Indeed there are two occurrences (one in readwriter.py, one in streaming.py) that need fixing. I'll fix them.

+   * 2.0.1, this applies to all supported types including the string type.</li>
   * <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
* value.</li>
@@ -232,66 +232,58 @@ private[csv] object CSVTypeCast {
nullable: Boolean = true,
options: CSVOptions = CSVOptions()): Any = {

-    castType match {
-      case _: ByteType => if (datum == options.nullValue && nullable) null else datum.toByte
-      case _: ShortType => if (datum == options.nullValue && nullable) null else datum.toShort
-      case _: IntegerType => if (datum == options.nullValue && nullable) null else datum.toInt
-      case _: LongType => if (datum == options.nullValue && nullable) null else datum.toLong
-      case _: FloatType =>
-        if (datum == options.nullValue && nullable) {
-          null
-        } else if (datum == options.nanValue) {
-          Float.NaN
-        } else if (datum == options.negativeInf) {
-          Float.NegativeInfinity
-        } else if (datum == options.positiveInf) {
-          Float.PositiveInfinity
-        } else {
-          Try(datum.toFloat)
-            .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
-        }
-      case _: DoubleType =>
-        if (datum == options.nullValue && nullable) {
-          null
-        } else if (datum == options.nanValue) {
-          Double.NaN
-        } else if (datum == options.negativeInf) {
-          Double.NegativeInfinity
-        } else if (datum == options.positiveInf) {
-          Double.PositiveInfinity
-        } else {
-          Try(datum.toDouble)
-            .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
-        }
-      case _: BooleanType => datum.toBoolean
-      case dt: DecimalType =>
-        if (datum == options.nullValue && nullable) {
-          null
-        } else {
-          val value = new BigDecimal(datum.replaceAll(",", ""))
-          Decimal(value, dt.precision, dt.scale)
-        }
-      case _: TimestampType =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
-          .getOrElse {
-            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
-            DateTimeUtils.stringToTime(datum).getTime * 1000L
-          }
-      case _: DateType =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
-          .getOrElse {
-            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
-            DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
-          }
-      case _: StringType => UTF8String.fromString(datum)
-      case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
-      case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
-    }
+    if (nullable && datum == options.nullValue) {
+      null
+    } else {
+      castType match {
+        case _: ByteType => datum.toByte
+        case _: ShortType => datum.toShort
+        case _: IntegerType => datum.toInt
+        case _: LongType => datum.toLong
+        case _: FloatType =>
+          datum match {
+            case options.nanValue => Float.NaN
+            case options.negativeInf => Float.NegativeInfinity
+            case options.positiveInf => Float.PositiveInfinity
+            case _ =>
+              Try(datum.toFloat)
+                .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
+          }
+        case _: DoubleType =>
+          datum match {
+            case options.nanValue => Double.NaN
+            case options.negativeInf => Double.NegativeInfinity
+            case options.positiveInf => Double.PositiveInfinity
+            case _ =>
+              Try(datum.toDouble)
+                .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
+          }
+        case _: BooleanType => datum.toBoolean
+        case dt: DecimalType =>
+          val value = new BigDecimal(datum.replaceAll(",", ""))
+          Decimal(value, dt.precision, dt.scale)
+        case _: TimestampType =>
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.
+          Try(options.timestampFormat.parse(datum).getTime * 1000L)
+            .getOrElse {
+              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              DateTimeUtils.stringToTime(datum).getTime * 1000L
+            }
+        case _: DateType =>
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.
+          Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
+            .getOrElse {
+              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+            }
+        case _: StringType => UTF8String.fromString(datum)
+        case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
+        case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
+      }
+    }
   }

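The core of this hunk is that the per-branch `datum == options.nullValue && nullable` checks are hoisted into one guard at the top of `castTo`, so every branch, now including `StringType`, respects `nullValue`. One detail worth noting in the rewritten `FloatType`/`DoubleType` branches: patterns such as `case options.nanValue =>` are stable-identifier patterns, so Scala compares `datum` against the option's value instead of binding a fresh variable. A self-contained sketch of the same shape (the `Opts` class and `toDoubleOrNull` helper are illustrative, not Spark's actual API):

case class Opts(nullValue: String = "", nanValue: String = "NaN")

def toDoubleOrNull(datum: String, nullable: Boolean, opts: Opts = Opts()): Any =
  if (nullable && datum == opts.nullValue) {
    null // single guard replaces the repeated per-type checks
  } else {
    datum match {
      // `opts.nanValue` is a path (it contains a dot), so this pattern
      // tests datum == opts.nanValue rather than binding a new name.
      case opts.nanValue => Double.NaN
      case _ => datum.toDouble
    }
  }

// toDoubleOrNull("", nullable = true)    => null
// toDoubleOrNull("NaN", nullable = true) => Double.NaN
// toDoubleOrNull("1.5", nullable = true) => 1.5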
@@ -228,7 +228,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* from values being read should be skipped.</li>
* <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
* whitespaces from values being read should be skipped.</li>
-   * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
+   * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
+   * 2.0.1, this applies to all supported types including the string type.</li>
   * <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
* value.</li>
@@ -554,7 +554,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

verifyCars(cars, withHeader = true, checkValues = false)
val results = cars.collect()
-    assert(results(0).toSeq === Array(2012, "Tesla", "S", "null", "null"))
+    assert(results(0).toSeq === Array(2012, "Tesla", "S", null, null))
assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
}

@@ -68,16 +68,46 @@ class CSVTypeCastSuite extends SparkFunSuite {
}

test("Nullable types are handled") {
-    assert(CSVTypeCast.castTo("", IntegerType, nullable = true, CSVOptions()) == null)
+    assertNull(
+      CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", BooleanType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", TimestampType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", DateType, nullable = true, CSVOptions("nullValue", "-")))
+    assertNull(
+      CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-")))
}

test("String type should always return the same as the input") {
test("String type should also respect `nullValue`") {
assertNull(
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()))
assert(
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()) ==
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) ==
UTF8String.fromString(""))

assert(
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) ==
CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions("nullValue", "null")) ==
UTF8String.fromString(""))
assert(
CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions("nullValue", "null")) ==
UTF8String.fromString(""))

assertNull(
CSVTypeCast.castTo(null, StringType, nullable = true, CSVOptions("nullValue", "null")))
}
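A detail the rewritten test pins down: the null substitution is gated on `nullable`. Two illustrative calls (same internal `CSVTypeCast.castTo` API as above, values chosen for contrast):

CSVTypeCast.castTo("-", StringType, nullable = true, CSVOptions("nullValue", "-"))
// => null: the token matches nullValue and the field is nullable
CSVTypeCast.castTo("-", StringType, nullable = false, CSVOptions("nullValue", "-"))
// => UTF8String "-": a non-nullable field keeps the matching token as data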

test("Throws exception for empty string with non null type") {
@@ -170,20 +200,4 @@ class CSVTypeCastSuite extends SparkFunSuite {
assert(doubleVal2 == Double.PositiveInfinity)
}

test("Type-specific null values are used for casting") {
assertNull(
CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-")))
assertNull(
CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-")))
assertNull(
CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-")))
assertNull(
CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-")))
assertNull(
CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-")))
assertNull(
CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-")))
assertNull(
CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-")))
}
}