
Commit a930445

sadikovi authored and cloud-fan committed
[SPARK-39731][SQL] Fix issue in CSV and JSON data sources when parsing dates in "yyyyMMdd" format with CORRECTED time parser policy
### What changes were proposed in this pull request?

This PR fixes a correctness issue when reading a CSV or a JSON file with dates in "yyyyMMdd" format:
```
name,mydate
1,2020011
2,20201203
```
or
```
{"date": "2020011"}
{"date": "20201203"}
```

Prior to #32959, reading this CSV file would return:
```
+----+--------------+
|name|mydate        |
+----+--------------+
|1   |null          |
|2   |2020-12-03    |
+----+--------------+
```

After that patch, however, the invalid date is parsed, because the much more lenient parsing in `DateTimeUtils.stringToDate` treats `2020011` as a full year:
```
+----+--------------+
|name|mydate        |
+----+--------------+
|1   |+2020011-01-01|
|2   |2020-12-03    |
+----+--------------+
```
A similar result is observed in JSON.

This PR addresses the correctness issue by introducing a new configuration option, `enableDateTimeParsingFallback`, which makes it possible to enable or disable the backward compatible parsing. Currently, by default, we fall back to the backward compatible behavior only if the parser policy is legacy or no custom pattern was set (this is defined in `UnivocityParser` and `JacksonParser` for CSV and JSON respectively).

### Why are the changes needed?

Fixes a correctness issue in Spark 3.4.

### Does this PR introduce _any_ user-facing change?

In order to avoid correctness issues when reading CSV or JSON files with a custom pattern, a new configuration option `enableDateTimeParsingFallback` has been added to control whether or not the code falls back to the backward compatible behavior of parsing dates and timestamps in the CSV and JSON data sources.
- If the config is enabled and the date cannot be parsed, we fall back to `DateTimeUtils.stringToDate`.
- If the config is enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` is used.
- Otherwise, depending on the parser policy and a custom pattern, the value is parsed as null.

### How was this patch tested?

Added unit tests for CSV and JSON to verify the fix and the config option.

Closes #37147 from sadikovi/fix-csv-date-inference.

Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
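To make the user-facing change concrete, here is a minimal usage sketch against a build containing this patch; the local session setup, the path `/tmp/dates.csv`, and its contents are illustrative assumptions, not part of the commit:

```scala
// Sketch only: the session setup and input path are hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Suppose /tmp/dates.csv contains:
//   name,mydate
//   1,2020011
//   2,20201203
val df = spark.read
  .option("header", "true")
  .option("dateFormat", "yyyyMMdd")
  // Explicitly disable the legacy fallback; with a custom pattern and the
  // CORRECTED parser policy this is also the default after this patch.
  .option("enableDateTimeParsingFallback", "false")
  .schema("name INT, mydate DATE")
  .csv("/tmp/dates.csv")

df.show()
// Per the commit message, "2020011" no longer parses as year 2020011:
// +----+----------+
// |name|    mydate|
// +----+----------+
// |   1|      null|
// |   2|2020-12-03|
// +----+----------+
```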
1 parent 0e33195 commit a930445

File tree

9 files changed: +248 / -17 lines changed


docs/sql-data-sources-csv.md

Lines changed: 6 additions & 0 deletions

```diff
@@ -174,6 +174,12 @@ Data source options of CSV can be set via:
   <td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
   <td>read/write</td>
 </tr>
+<tr>
+  <td><code>enableDateTimeParsingFallback</code></td>
+  <td>Enabled if the time parser policy is legacy or no custom date or timestamp pattern was provided</td>
+  <td>Allows to fall back to the backward compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns.</td>
+  <td>read</td>
+</tr>
 <tr>
   <td><code>maxColumns</code></td>
   <td>20480</td>
```

docs/sql-data-sources-json.md

Lines changed: 6 additions & 0 deletions

```diff
@@ -202,6 +202,12 @@ Data source options of JSON can be set via:
   <td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
   <td>read/write</td>
 </tr>
+<tr>
+  <td><code>enableDateTimeParsingFallback</code></td>
+  <td>Enabled if the time parser policy is legacy or no custom date or timestamp pattern was provided</td>
+  <td>Allows to fall back to the backward compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns.</td>
+  <td>read</td>
+</tr>
 <tr>
   <td><code>multiLine</code></td>
   <td><code>false</code></td>
```
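The JSON reader accepts the option the same way. A brief sketch (the input path and the reuse of `spark` from the earlier snippet are assumptions):

```scala
// Illustrative only: opts back in to the lenient Spark 1.x/2.0 parsing even
// though a custom pattern is set. The input path is hypothetical.
val json = spark.read
  .schema("date DATE")
  .option("dateFormat", "yyyyMMdd")
  .option("enableDateTimeParsingFallback", "true")
  .json("/tmp/dates.json")
```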

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala

Lines changed: 11 additions & 0 deletions

```diff
@@ -190,6 +190,17 @@ class CSVOptions(
   val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
     s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
 
+  // SPARK-39731: Enables the backward compatible parsing behavior.
+  // Generally, this config should be set to false to avoid producing potentially incorrect results
+  // which is the current default (see UnivocityParser).
+  //
+  // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
+  // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
+  // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown and
+  // the value will be parsed as null.
+  val enableDateTimeParsingFallback: Option[Boolean] =
+    parameters.get("enableDateTimeParsingFallback").map(_.toBoolean)
+
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
   val maxColumns = getInt("maxColumns", 20480)
```
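The option is deliberately an `Option[Boolean]`: unset means "defer to the parser's policy-based default", set means "force". Below is a small, self-contained sketch of that three-state resolution; it is plain Scala, and the names are illustrative, not Spark APIs:

```scala
// Not Spark code: a standalone model of how UnivocityParser/JacksonParser
// resolve the flag from the option plus the session state.
object FallbackResolutionSketch {
  def resolve(
      userSetting: Option[Boolean], // from "enableDateTimeParsingFallback"
      legacyPolicy: Boolean,        // timeParserPolicy == LEGACY
      customPattern: Boolean        // a dateFormat/timestampFormat was given
  ): Boolean =
    userSetting.getOrElse(legacyPolicy || !customPattern)

  def main(args: Array[String]): Unit = {
    // Unset: legacy policy or a missing custom pattern enables the fallback.
    assert(resolve(None, legacyPolicy = true, customPattern = true))
    assert(resolve(None, legacyPolicy = false, customPattern = false))
    // Unset, corrected policy with a custom pattern: strict, no fallback.
    assert(!resolve(None, legacyPolicy = false, customPattern = true))
    // An explicit setting always wins.
    assert(!resolve(Some(false), legacyPolicy = true, customPattern = false))
    println("all cases hold")
  }
}
```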

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 28 additions & 10 deletions

```diff
@@ -120,6 +120,20 @@ class UnivocityParser(
       new NoopFilters
     }
 
+  // Flags to signal if we need to fall back to the backward compatible behavior of parsing
+  // dates and timestamps.
+  // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions.
+  private val enableParsingFallbackForTimestampType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.timestampFormatInRead.isEmpty
+    }
+  private val enableParsingFallbackForDateType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.dateFormatInRead.isEmpty
+    }
+
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
     val currentContent = tokenizer.getContext.currentParsedContent()
@@ -205,7 +219,10 @@
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
+            // compatibility if enabled.
+            if (!enableParsingFallbackForTimestampType) {
+              throw e
+            }
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
             DateTimeUtils.stringToDate(str).getOrElse(throw e)
         }
@@ -217,16 +234,17 @@
           timestampFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
-            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
-            val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
-              // There may be date type entries in timestamp column due to schema inference
-              if (options.inferDate) {
-                daysToMicros(dateFormatter.parse(datum), options.zoneId)
-              } else {
-                throw(e)
+            // There may be date type entries in timestamp column due to schema inference
+            if (options.inferDate) {
+              daysToMicros(dateFormatter.parse(datum), options.zoneId)
+            } else {
+              // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility if enabled.
+              if (!enableParsingFallbackForDateType) {
+                throw e
               }
+              val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
+              DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e))
             }
         }
       }
```
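Abstracting away the Spark internals, the control flow in the hunks above is "strict parse first, gated lenient retry". The sketch below reproduces that flow with plain `java.time`; it is not Spark code, and the `lenient` helper merely stands in for `DateTimeUtils.stringToDate`:

```scala
// Standalone sketch of the gated-fallback flow implemented above.
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.control.NonFatal

object GatedFallbackSketch {
  private val strict = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Stand-in for DateTimeUtils.stringToDate: lenient ISO parsing, None on failure.
  private def lenient(s: String): Option[LocalDate] =
    try Some(LocalDate.parse(s)) catch { case NonFatal(_) => None }

  def parseDate(s: String, enableFallback: Boolean): Option[LocalDate] =
    try Some(LocalDate.parse(s, strict)) catch {
      case NonFatal(e) =>
        // Without the fallback, the original error propagates (the row is then
        // nulled or rejected depending on the data source's parse mode).
        if (!enableFallback) throw e
        lenient(s) // legacy path; may still yield None, i.e. a null value
    }

  def main(args: Array[String]): Unit = {
    assert(parseDate("20201203", enableFallback = false).contains(LocalDate.of(2020, 12, 3)))
    assert(parseDate("2020-12-03", enableFallback = true).contains(LocalDate.of(2020, 12, 3)))
  }
}
```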

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 11 additions & 0 deletions

```diff
@@ -111,6 +111,17 @@ private[sql] class JSONOptions(
   val timestampNTZFormatInWrite: String =
     parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
 
+  // SPARK-39731: Enables the backward compatible parsing behavior.
+  // Generally, this config should be set to false to avoid producing potentially incorrect results
+  // which is the current default (see JacksonParser).
+  //
+  // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
+  // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
+  // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown and
+  // the value will be parsed as null.
+  val enableDateTimeParsingFallback: Option[Boolean] =
+    parameters.get("enableDateTimeParsingFallback").map(_.toBoolean)
+
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
   /**
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 22 additions & 2 deletions

```diff
@@ -78,6 +78,20 @@ class JacksonParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
 
+  // Flags to signal if we need to fall back to the backward compatible behavior of parsing
+  // dates and timestamps.
+  // For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions.
+  private val enableParsingFallbackForTimestampType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.timestampFormatInRead.isEmpty
+    }
+  private val enableParsingFallbackForDateType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.dateFormatInRead.isEmpty
+    }
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -257,7 +271,10 @@
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
+            // compatibility if enabled.
+            if (!enableParsingFallbackForTimestampType) {
+              throw e
+            }
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
             DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
         }
@@ -280,7 +297,10 @@
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
+            // compatibility if enabled.
+            if (!enableParsingFallbackForDateType) {
+              throw e
+            }
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
             DateTimeUtils.stringToDate(str).getOrElse {
               // In Spark 1.5.0, we store the data as number of days since epoch in string.
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala

Lines changed: 16 additions & 3 deletions

```diff
@@ -355,9 +355,22 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     val options = new CSVOptions(Map.empty[String, String], false, "UTC")
     check(new UnivocityParser(StructType(Seq.empty), options))
 
-    val optionsWithPattern = new CSVOptions(
-      Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
-    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
+    def optionsWithPattern(enableFallback: Boolean): CSVOptions = new CSVOptions(
+      Map(
+        "timestampFormat" -> "invalid",
+        "dateFormat" -> "invalid",
+        "enableDateTimeParsingFallback" -> s"$enableFallback"),
+      false,
+      "UTC")
+
+    // With fallback enabled, we are still able to parse dates and timestamps.
+    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(true)))
+
+    // With legacy parser disabled, parsing results in error.
+    val err = intercept[IllegalArgumentException] {
+      check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(false)))
+    }
+    assert(err.getMessage.contains("Illegal pattern character: n"))
   }
 
   test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 74 additions & 1 deletion

```diff
@@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.logging.log4j.Level
 
-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
@@ -2837,7 +2837,80 @@ abstract class CSVSuite
       )
       assert(results.collect().toSeq.map(_.toSeq) == expected)
     }
+    }
+  }
+
+  test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
+    withTempPath { path =>
+      Seq(
+        "1,2020011,2020011",
+        "2,20201203,20201203").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("id", IntegerType)
+        .add("date", DateType)
+        .add("ts", TimestampType)
+      val output = spark.read
+        .schema(schema)
+        .option("dateFormat", "yyyyMMdd")
+        .option("timestampFormat", "yyyyMMdd")
+        .csv(path.getAbsolutePath)
 
+      def check(mode: String, res: Seq[Row]): Unit = {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
+          checkAnswer(output, res)
+        }
+      }
+
+      check(
+        "legacy",
+        Seq(
+          Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
+          Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      check(
+        "corrected",
+        Seq(
+          Row(1, null, null),
+          Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      val err = intercept[SparkException] {
+        check("exception", Nil)
+      }.getCause
+      assert(err.isInstanceOf[SparkUpgradeException])
+    }
+  }
+
+  test("SPARK-39731: Handle date and timestamp parsing fallback") {
+    withTempPath { path =>
+      Seq("2020-01-01,2020-01-01").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+
+      def output(enableFallback: Boolean): DataFrame = spark.read
+        .schema(schema)
+        .option("dateFormat", "invalid")
+        .option("timestampFormat", "invalid")
+        .option("enableDateTimeParsingFallback", enableFallback)
+        .csv(path.getAbsolutePath)
+
+      checkAnswer(
+        output(enableFallback = true),
+        Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+      )
+
+      checkAnswer(
+        output(enableFallback = false),
+        Seq(Row(null, null))
+      )
     }
   }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 74 additions & 1 deletion

```diff
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
@@ -3249,6 +3249,79 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
+    withTempPath { path =>
+      Seq(
+        """{"date": "2020011", "ts": "2020011"}""",
+        """{"date": "20201203", "ts": "20201203"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+      val output = spark.read
+        .schema(schema)
+        .option("dateFormat", "yyyyMMdd")
+        .option("timestampFormat", "yyyyMMdd")
+        .json(path.getAbsolutePath)
+
+      def check(mode: String, res: Seq[Row]): Unit = {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
+          checkAnswer(output, res)
+        }
+      }
+
+      check(
+        "legacy",
+        Seq(
+          Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
+          Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      check(
+        "corrected",
+        Seq(
+          Row(null, null),
+          Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      val err = intercept[SparkException] {
+        check("exception", Nil)
+      }.getCause
+      assert(err.isInstanceOf[SparkUpgradeException])
+    }
+  }
+
+  test("SPARK-39731: Handle date and timestamp parsing fallback") {
+    withTempPath { path =>
+      Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+
+      def output(enableFallback: Boolean): DataFrame = spark.read
+        .schema(schema)
+        .option("dateFormat", "invalid")
+        .option("timestampFormat", "invalid")
+        .option("enableDateTimeParsingFallback", enableFallback)
+        .json(path.getAbsolutePath)
+
+      checkAnswer(
+        output(enableFallback = true),
+        Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+      )
+
+      checkAnswer(
+        output(enableFallback = false),
+        Seq(Row(null, null))
+      )
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
```
