6 changes: 6 additions & 0 deletions docs/sql-data-sources-csv.md
@@ -174,6 +174,12 @@ Data source options of CSV can be set via:
<td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to the timestamp without timezone type; note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>enableDateTimeParsingFallback</code></td>
<td>Enabled if the time parser policy is legacy or if no custom date or timestamp pattern was provided</td>
<td>Allows falling back to the backward-compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns.</td>
<td>read</td>
</tr>
<tr>
<td><code>maxColumns</code></td>
<td>20480</td>
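As a usage sketch for the option documented above (the path, schema, and data below are illustrative placeholders, not part of this PR):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-fallback-demo").getOrCreate()

// A custom pattern plus an explicit fallback: values that do not match
// "yyyyMMdd" may still be parsed by the legacy (Spark 1.x/2.0) code path.
val df = spark.read
  .schema("id INT, d DATE, ts TIMESTAMP")
  .option("dateFormat", "yyyyMMdd")
  .option("timestampFormat", "yyyyMMdd")
  .option("enableDateTimeParsingFallback", "true")
  .csv("/tmp/example.csv") // placeholder path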
6 changes: 6 additions & 0 deletions docs/sql-data-sources-json.md
@@ -202,6 +202,12 @@ Data source options of JSON can be set via:
<td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to the timestamp without timezone type; note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>enableDateTimeParsingFallback</code></td>
<td>Enabled if the time parser policy is legacy or if no custom date or timestamp pattern was provided</td>
<td>Allows falling back to the backward-compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns.</td>
<td>read</td>
</tr>
<tr>
<td><code>multiLine</code></td>
<td><code>false</code></td>
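Conversely, disabling the option pins parsing strictly to the configured pattern; a sketch (again with a placeholder path and schema):

// With the fallback disabled, a value that does not match the pattern is not
// retried with the legacy parser; in the default PERMISSIVE mode it becomes null.
val strict = spark.read
  .schema("d DATE, ts TIMESTAMP")
  .option("dateFormat", "yyyyMMdd")
  .option("timestampFormat", "yyyyMMdd")
  .option("enableDateTimeParsingFallback", "false")
  .json("/tmp/example.json") // placeholder path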
@@ -190,6 +190,17 @@ class CSVOptions(
val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")

// SPARK-39731: Enables the backward compatible parsing behavior.
// Generally, this config should be set to false to avoid producing potentially incorrect
// results; the effective default is derived from the parser policy and any custom
// date/timestamp patterns (see UnivocityParser).
//
// If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
// If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
// Otherwise, depending on the parser policy and the custom pattern, an exception may be
// thrown or the value will be parsed as null.
val enableDateTimeParsingFallback: Option[Boolean] =
parameters.get("enableDateTimeParsingFallback").map(_.toBoolean)

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

val maxColumns = getInt("maxColumns", 20480)
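The option is kept as an Option[Boolean] rather than a plain Boolean so that "unset" stays distinguishable from an explicit true or false; the parsers then supply the conditional default described in the comment above. A standalone sketch of that resolution (names here are illustrative, not Spark's internals):

// Tri-state resolution: an explicit user setting always wins; otherwise the
// default depends on the parser policy and whether a custom pattern was set.
def resolveFallback(
    userSetting: Option[Boolean],  // parsed from "enableDateTimeParsingFallback"
    legacyPolicy: Boolean,         // spark.sql.legacy.timeParserPolicy == LEGACY
    customPattern: Option[String]  // e.g. the "dateFormat" option
): Boolean =
  userSetting.getOrElse(legacyPolicy || customPattern.isEmpty)

resolveFallback(Some(false), legacyPolicy = true, Some("yyyyMMdd"))  // false: explicit wins
resolveFallback(None, legacyPolicy = false, None)                    // true: no custom pattern
resolveFallback(None, legacyPolicy = false, Some("yyyyMMdd"))        // false: strict by default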
@@ -120,6 +120,20 @@ class UnivocityParser(
new NoopFilters
}

// Flags to signal if we need to fall back to the backward compatible behavior of parsing
// dates and timestamps.
// For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions.
private val enableParsingFallbackForTimestampType =
options.enableDateTimeParsingFallback.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.timestampFormatInRead.isEmpty
}
private val enableParsingFallbackForDateType =
options.enableDateTimeParsingFallback.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.dateFormatInRead.isEmpty
}

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
val currentContent = tokenizer.getContext.currentParsedContent()
@@ -205,7 +219,10 @@ class UnivocityParser(
} catch {
case NonFatal(e) =>
          // If parsing fails, fall back to the way used in 2.0 and 1.x for backward
          // compatibility, if enabled.
          if (!enableParsingFallbackForDateType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
DateTimeUtils.stringToDate(str).getOrElse(throw e)
}
@@ -217,16 +234,17 @@
timestampFormatter.parse(datum)
} catch {
case NonFatal(e) =>
            // There may be date type entries in timestamp column due to schema inference
            if (options.inferDate) {
              daysToMicros(dateFormatter.parse(datum), options.zoneId)
            } else {
              // If parsing fails, fall back to the way used in 2.0 and 1.x for backward
              // compatibility, if enabled.
              if (!enableParsingFallbackForTimestampType) {
                throw e
              }
              val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
              DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
}
}
}
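The control flow above, strict formatter first, legacy fallback second, rethrow when the fallback is disabled, can be seen in miniature with plain java.time (an illustration of the shape of the logic, not Spark's internal DateTimeUtils API):

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.control.NonFatal

// Try the configured pattern first; if that fails and the fallback is enabled,
// retry with lenient ISO-8601 parsing, mirroring the branch structure above.
def parseDateWithFallback(datum: String, pattern: String, fallbackEnabled: Boolean): LocalDate = {
  val strict = DateTimeFormatter.ofPattern(pattern)
  try {
    LocalDate.parse(datum, strict)
  } catch {
    case NonFatal(e) =>
      if (!fallbackEnabled) throw e
      LocalDate.parse(datum) // ISO-8601 fallback, e.g. "2020-01-01"
  }
}

parseDateWithFallback("20201203", "yyyyMMdd", fallbackEnabled = true)    // 2020-12-03
parseDateWithFallback("2020-12-03", "yyyyMMdd", fallbackEnabled = true)  // falls back and parses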
@@ -111,6 +111,17 @@ private[sql] class JSONOptions(
val timestampNTZFormatInWrite: String =
parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")

// SPARK-39731: Enables the backward compatible parsing behavior.
// Generally, this config should be set to false to avoid producing potentially incorrect
// results; the effective default is derived from the parser policy and any custom
// date/timestamp patterns (see JacksonParser).
//
// If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
// If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
// Otherwise, depending on the parser policy and the custom pattern, an exception may be
// thrown or the value will be parsed as null.
val enableDateTimeParsingFallback: Option[Boolean] =
parameters.get("enableDateTimeParsingFallback").map(_.toBoolean)

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

/**
@@ -78,6 +78,20 @@ class JacksonParser(
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)

// Flags to signal if we need to fall back to the backward compatible behavior of parsing
// dates and timestamps.
// For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions.
private val enableParsingFallbackForTimestampType =
options.enableDateTimeParsingFallback.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.timestampFormatInRead.isEmpty
}
private val enableParsingFallbackForDateType =
options.enableDateTimeParsingFallback.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.dateFormatInRead.isEmpty
}

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@@ -257,7 +271,10 @@ class JacksonParser(
} catch {
case NonFatal(e) =>
        // If parsing fails, fall back to the way used in 2.0 and 1.x for backward
        // compatibility, if enabled.
if (!enableParsingFallbackForTimestampType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
}
@@ -280,7 +297,10 @@
} catch {
case NonFatal(e) =>
        // If parsing fails, fall back to the way used in 2.0 and 1.x for backward
        // compatibility, if enabled.
if (!enableParsingFallbackForDateType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
DateTimeUtils.stringToDate(str).getOrElse {
// In Spark 1.5.0, we store the data as number of days since epoch in string.
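The getOrElse branch truncated above refers to the oldest legacy encoding: Spark 1.5.0 stored dates as a days-since-epoch count serialized as a string. Decoding that representation looks like this (plain java.time, for illustration only):

import java.time.LocalDate

// A date serialized the Spark 1.5.0 way: days since 1970-01-01, as a string.
val legacyDatum = "18262"
val decoded = LocalDate.ofEpochDay(legacyDatum.toLong) // 2020-01-01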
@@ -355,9 +355,22 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
val options = new CSVOptions(Map.empty[String, String], false, "UTC")
check(new UnivocityParser(StructType(Seq.empty), options))

def optionsWithPattern(enableFallback: Boolean): CSVOptions = new CSVOptions(
Map(
"timestampFormat" -> "invalid",
"dateFormat" -> "invalid",
"enableDateTimeParsingFallback" -> s"$enableFallback"),
false,
"UTC")

// With fallback enabled, we are still able to parse dates and timestamps.
check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(true)))

// With fallback disabled, parsing results in an error.
val err = intercept[IllegalArgumentException] {
check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(false)))
}
assert(err.getMessage.contains("Illegal pattern character: n"))
}

test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
@@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.logging.log4j.Level

import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
@@ -2837,7 +2837,80 @@ abstract class CSVSuite
)
assert(results.collect().toSeq.map(_.toSeq) == expected)
}
}
}

test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
withTempPath { path =>
Seq(
"1,2020011,2020011",
"2,20201203,20201203").toDF()
.repartition(1)
.write.text(path.getAbsolutePath)
val schema = new StructType()
.add("id", IntegerType)
.add("date", DateType)
.add("ts", TimestampType)
val output = spark.read
.schema(schema)
.option("dateFormat", "yyyyMMdd")
.option("timestampFormat", "yyyyMMdd")
.csv(path.getAbsolutePath)

def check(mode: String, res: Seq[Row]): Unit = {
withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
checkAnswer(output, res)
}
}

check(
"legacy",
Seq(
Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
)
)

check(
"corrected",
Seq(
Row(1, null, null),
Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
)
Comment on lines +2866 to +2879

Contributor: For completeness, would you consider adding a check for LEGACY_TIME_PARSER_POLICY = EXCEPTION? Similar to the following?

    val msg = intercept[SparkException] {
      csv.collect()
    }.getCause.getMessage
    assert(msg.contains("Fail to parse"))

Contributor (author): Done!

)

val err = intercept[SparkException] {
check("exception", Nil)
}.getCause
assert(err.isInstanceOf[SparkUpgradeException])
}
}

test("SPARK-39731: Handle date and timestamp parsing fallback") {
withTempPath { path =>
Seq("2020-01-01,2020-01-01").toDF()
.repartition(1)
.write.text(path.getAbsolutePath)
val schema = new StructType()
.add("date", DateType)
.add("ts", TimestampType)

def output(enableFallback: Boolean): DataFrame = spark.read
.schema(schema)
.option("dateFormat", "invalid")
.option("timestampFormat", "invalid")
.option("enableDateTimeParsingFallback", enableFallback)
.csv(path.getAbsolutePath)

checkAnswer(
output(enableFallback = true),
Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
)

checkAnswer(
output(enableFallback = false),
Seq(Row(null, null))
Contributor: Sorry, I'm a bit confused. Why does date parsing fail here? "2020-01-01" is a valid date string.

Contributor: Ah, because a format pattern is given but it is invalid.

)
}
}
}
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json._
@@ -3249,6 +3249,79 @@ abstract class JsonSuite
}
}
}

test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
withTempPath { path =>
Seq(
"""{"date": "2020011", "ts": "2020011"}""",
"""{"date": "20201203", "ts": "20201203"}""").toDF()
.repartition(1)
.write.text(path.getAbsolutePath)
val schema = new StructType()
.add("date", DateType)
.add("ts", TimestampType)
val output = spark.read
.schema(schema)
.option("dateFormat", "yyyyMMdd")
.option("timestampFormat", "yyyyMMdd")
.json(path.getAbsolutePath)

def check(mode: String, res: Seq[Row]): Unit = {
withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
checkAnswer(output, res)
}
}

check(
"legacy",
Seq(
Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
)
)

check(
"corrected",
Seq(
Row(null, null),
Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
)
)

val err = intercept[SparkException] {
check("exception", Nil)
}.getCause
assert(err.isInstanceOf[SparkUpgradeException])
}
}

test("SPARK-39731: Handle date and timestamp parsing fallback") {
withTempPath { path =>
Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF()
.repartition(1)
.write.text(path.getAbsolutePath)
val schema = new StructType()
.add("date", DateType)
.add("ts", TimestampType)

def output(enableFallback: Boolean): DataFrame = spark.read
.schema(schema)
.option("dateFormat", "invalid")
.option("timestampFormat", "invalid")
.option("enableDateTimeParsingFallback", enableFallback)
.json(path.getAbsolutePath)

checkAnswer(
output(enableFallback = true),
Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
)

checkAnswer(
output(enableFallback = false),
Seq(Row(null, null))
)
}
}
}

class JsonV1Suite extends JsonSuite {