From 1193ce78d3efcbe1395305b4b7deb0a195fa09d9 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 11 Jul 2022 13:11:30 +1200 Subject: [PATCH 01/16] fix issue --- .../sql/catalyst/csv/UnivocityParser.scala | 12 +++++- .../execution/datasources/csv/CSVSuite.scala | 41 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 56ebfcc26c63..e048dccd8a8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -204,7 +204,11 @@ class UnivocityParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. + // compatibility only if no custom pattern has been set. If there is a custom pattern, + // fail since it may be different from the default pattern. + if (options.timestampFormatInRead.isDefined) { + throw e + } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e) } @@ -222,7 +226,11 @@ class UnivocityParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. + // compatibility only if no custom pattern has been set. If there is a custom pattern, + // fail since it may be different from the default pattern. + if (options.dateFormatInRead.isDefined) { + throw e + } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) DateTimeUtils.stringToDate(str).getOrElse(throw e) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index bf92ffcf4651..66640ad80691 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2788,6 +2788,47 @@ abstract class CSVSuite } } } + + test("SPARK-39731: Correctly parse dates with yyyyMMdd pattern") { + withTempPath { path => + Seq( + "1,2020011,2020011", + "2,20201203,20201203").toDF("data") + .repartition(1) + .write.text(path.getAbsolutePath) + val schema = new StructType() + .add("id", IntegerType) + .add("date", DateType) + .add("ts", TimestampType) + val output = spark.read + .schema(schema) + .option("dateFormat", "yyyyMMdd") + .option("timestampFormat", "yyyyMMdd") + .csv(path.getAbsolutePath) + + def check(mode: String, res: Seq[Row]): Unit = { + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) { + checkAnswer(output, res) + } + } + + check( + "legacy", + Seq( + Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")), + Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) + ) + ) + + check( + "corrected", + Seq( + Row(1, null, null), + Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) + ) + ) + } + } } class CSVv1Suite extends CSVSuite { From b714b7fdf29ff77920e2b10acad806548477bca9 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Tue, 12 Jul 2022 18:54:25 +1200 Subject: [PATCH 02/16] update code --- .../sql/catalyst/csv/UnivocityParser.scala | 22 ++++++++++++++----- .../catalyst/csv/UnivocityParserSuite.scala | 12 
+++++++++- .../execution/datasources/csv/CSVSuite.scala | 7 +++++- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index e048dccd8a8a..b073c6c3b9fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -119,6 +119,12 @@ class UnivocityParser( new NoopFilters } + // Flag is needed to distinguish parsing mode when inferring timestamp and date types. + // For more information, see the comments for TimestampType and DateType converter functions. + // Available for testing. + val isLegacyParserPolicy = + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY + // Retrieve the raw record string. private def getCurrentInput: UTF8String = { val currentContent = tokenizer.getContext.currentParsedContent() @@ -204,9 +210,11 @@ class UnivocityParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set. If there is a custom pattern, - // fail since it may be different from the default pattern. - if (options.timestampFormatInRead.isDefined) { + // compatibility only if no custom pattern has been set. + // + // If a custom pattern was provided and parser policy is not legacy, throw exception + // without applying legacy behavior to avoid producing incorrect results. + if (!isLegacyParserPolicy && options.timestampFormatInRead.isDefined) { throw e } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) @@ -226,9 +234,11 @@ class UnivocityParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set. If there is a custom pattern, - // fail since it may be different from the default pattern. - if (options.dateFormatInRead.isDefined) { + // compatibility only if no custom pattern has been set. + // + // If a custom pattern was provided and parser policy is not legacy, throw exception + // without applying legacy behavior to avoid producing incorrect results. + if (!isLegacyParserPolicy && options.dateFormatInRead.isDefined) { throw e } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 4166401d040f..2aa7f4c2fa0b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -356,6 +356,16 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { val optionsWithPattern = new CSVOptions( Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC") - check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern)) + + // With legacy parser enabled, we are still able to parse dates and timestamps. + check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern) { + override val isLegacyParserPolicy: Boolean = true + }) + + // With legacy parser disabled, parsing results in error. 
+ val err = intercept[IllegalArgumentException] { + check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern)) + } + assert(err.getMessage.contains("Illegal pattern character: n")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 66640ad80691..e5c99a185e4a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec import org.apache.logging.log4j.Level -import org.apache.spark.{SparkConf, SparkException, TestUtils} +import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils} import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row} import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils} import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite @@ -2827,6 +2827,11 @@ abstract class CSVSuite Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) ) ) + + val err = intercept[SparkException] { + check("exception", Nil) + }.getCause + assert(err.isInstanceOf[SparkUpgradeException]) } } } From 8a10a6898de48da7bbb5288fe70e00e924842e31 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 14 Jul 2022 18:23:04 +1200 Subject: [PATCH 03/16] fix issue in json --- .../sql/catalyst/json/JacksonParser.scala | 18 +++++++ .../execution/datasources/csv/CSVSuite.scala | 4 +- .../datasources/json/JsonSuite.scala | 47 ++++++++++++++++++- 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 7004d2a8f162..76a9f1a6d78c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -78,6 +78,12 @@ class JacksonParser( legacyFormat = FAST_DATE_FORMAT, isParsing = true) + // Flag is needed to distinguish parsing mode when inferring timestamp and date types. + // For more information, see the comments for TimestampType and DateType converter functions. + // Available for testing. + val isLegacyParserPolicy = + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY + /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. This is a wrapper for the method @@ -258,6 +264,12 @@ class JacksonParser( case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. + // + // If a custom pattern was provided and parser policy is not legacy, throw exception + // without applying legacy behavior to avoid producing incorrect results. + if (!isLegacyParserPolicy && options.timestampFormatInRead.isDefined) { + throw e + } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText)) DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e) } @@ -281,6 +293,12 @@ class JacksonParser( case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. 
+ // + // If a custom pattern was provided and parser policy is not legacy, throw exception + // without applying legacy behavior to avoid producing incorrect results. + if (!isLegacyParserPolicy && options.dateFormatInRead.isDefined) { + throw e + } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText)) DateTimeUtils.stringToDate(str).getOrElse { // In Spark 1.5.0, we store the data as number of days since epoch in string. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index e5c99a185e4a..90918d9d34fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2789,11 +2789,11 @@ abstract class CSVSuite } } - test("SPARK-39731: Correctly parse dates with yyyyMMdd pattern") { + test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") { withTempPath { path => Seq( "1,2020011,2020011", - "2,20201203,20201203").toDF("data") + "2,20201203,20201203").toDF() .repartition(1) .write.text(path.getAbsolutePath) val schema = new StructType() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 3fe9c58c957c..b2d0544dd72a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.{SparkConf, SparkException, TestUtils} +import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{functions => F, _} import org.apache.spark.sql.catalyst.json._ @@ -3249,6 +3249,51 @@ abstract class JsonSuite } } } + + test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") { + withTempPath { path => + Seq( + """{"date": "2020011", "ts": "2020011"}""", + """{"date": "20201203", "ts": "20201203"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + val schema = new StructType() + .add("date", DateType) + .add("ts", TimestampType) + val output = spark.read + .schema(schema) + .option("dateFormat", "yyyyMMdd") + .option("timestampFormat", "yyyyMMdd") + .json(path.getAbsolutePath) + + def check(mode: String, res: Seq[Row]): Unit = { + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) { + checkAnswer(output, res) + } + } + + check( + "legacy", + Seq( + Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")), + Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) + ) + ) + + check( + "corrected", + Seq( + Row(null, null), + Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) + ) + ) + + val err = intercept[SparkException] { + check("exception", Nil) + }.getCause + assert(err.isInstanceOf[SparkUpgradeException]) + } + } } class JsonV1Suite extends JsonSuite { From 9b65761b347dff878d8b70d3eef117d68294882c Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 14 Jul 2022 18:29:21 +1200 Subject: [PATCH 04/16] update code --- 
.../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 76a9f1a6d78c..638f0d20edf1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -263,7 +263,7 @@ class JacksonParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. + // compatibility only if no custom pattern has been set. // // If a custom pattern was provided and parser policy is not legacy, throw exception // without applying legacy behavior to avoid producing incorrect results. @@ -292,7 +292,7 @@ class JacksonParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. + // compatibility only if no custom pattern has been set. // // If a custom pattern was provided and parser policy is not legacy, throw exception // without applying legacy behavior to avoid producing incorrect results. From 45011a06afbd5af19bdfb6e328dcfc1f764a1194 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 18 Jul 2022 12:17:11 +1200 Subject: [PATCH 05/16] add a config option to control legacy behavior --- .../spark/sql/catalyst/csv/CSVOptions.scala | 10 ++++++ .../sql/catalyst/csv/UnivocityParser.scala | 32 ++++++++++--------- .../catalyst/csv/UnivocityParserSuite.scala | 17 ++++++---- .../execution/datasources/csv/CSVSuite.scala | 28 ++++++++++++++++ 4 files changed, 65 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 3e92c3d25eb4..9555c507c440 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -169,6 +169,16 @@ class CSVOptions( val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") + // SPARK-39731: Enables the backward compatible parsing behavior. + // Generally, this config should be set to false to avoid producing potentially incorrect results + // which is the current default (see UnivocityParser). + // + // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`. + // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used. + // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown. 
+ val enableDateTimeParsingFallback: Option[Boolean] = + parameters.get("enableDateTimeParsingFallback").map(_.toBoolean) + val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) val maxColumns = getInt("maxColumns", 20480) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index b073c6c3b9fc..05db7ce7e506 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -119,11 +119,19 @@ class UnivocityParser( new NoopFilters } - // Flag is needed to distinguish parsing mode when inferring timestamp and date types. - // For more information, see the comments for TimestampType and DateType converter functions. - // Available for testing. - val isLegacyParserPolicy = - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY + // Flags to signal if we need to fall back to the backward compatible behavior of parsing + // dates and timestamps. + // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions. + private val enableParsingFallbackForDateType = + options.enableDateTimeParsingFallback.getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.dateFormatInRead.isEmpty + } + private val enableParsingFallbackForTimestampType = + options.enableDateTimeParsingFallback.getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.timestampFormatInRead.isEmpty + } // Retrieve the raw record string. private def getCurrentInput: UTF8String = { @@ -210,11 +218,8 @@ class UnivocityParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set. - // - // If a custom pattern was provided and parser policy is not legacy, throw exception - // without applying legacy behavior to avoid producing incorrect results. - if (!isLegacyParserPolicy && options.timestampFormatInRead.isDefined) { + // compatibility only if no custom pattern has been set or parser policy is legacy. + if (!enableParsingFallbackForTimestampType) { throw e } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) @@ -234,11 +239,8 @@ class UnivocityParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set. - // - // If a custom pattern was provided and parser policy is not legacy, throw exception - // without applying legacy behavior to avoid producing incorrect results. - if (!isLegacyParserPolicy && options.dateFormatInRead.isDefined) { + // compatibility only if no custom pattern has been set or parser policy is legacy. 
+ if (!enableParsingFallbackForDateType) { throw e } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 2aa7f4c2fa0b..460ea82bc148 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -354,17 +354,20 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { val options = new CSVOptions(Map.empty[String, String], false, "UTC") check(new UnivocityParser(StructType(Seq.empty), options)) - val optionsWithPattern = new CSVOptions( - Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC") + def optionsWithPattern(enableFallback: Boolean) = new CSVOptions( + Map( + "timestampFormat" -> "invalid", + "dateFormat" -> "invalid", + "enableDateTimeParsingFallback" -> s"$enableFallback"), + false, + "UTC") - // With legacy parser enabled, we are still able to parse dates and timestamps. - check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern) { - override val isLegacyParserPolicy: Boolean = true - }) + // With fallback enabled, we are still able to parse dates and timestamps. + check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(true))) // With legacy parser disabled, parsing results in error. val err = intercept[IllegalArgumentException] { - check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern)) + check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(false))) } assert(err.getMessage.contains("Illegal pattern character: n")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 90918d9d34fc..9a8275db0bb5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2834,6 +2834,34 @@ abstract class CSVSuite assert(err.isInstanceOf[SparkUpgradeException]) } } + + test("SPARK-39731: Handle date and timestamp parsing fallback") { + withTempPath { path => + Seq("2020-01-01,2020-01-01").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + val schema = new StructType() + .add("date", DateType) + .add("ts", TimestampType) + + def output(enableFallback: Boolean) = spark.read + .schema(schema) + .option("dateFormat", "invalid") + .option("timestampFormat", "invalid") + .option("enableDateTimeParsingFallback", enableFallback) + .csv(path.getAbsolutePath) + + checkAnswer( + output(enableFallback = true), + Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00"))) + ) + + checkAnswer( + output(enableFallback = false), + Seq(Row(null, null)) + ) + } + } } class CSVv1Suite extends CSVSuite { From 40d07bd046433317f9384ad042eb601593616fae Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 18 Jul 2022 12:27:19 +1200 Subject: [PATCH 06/16] add a config for json --- .../sql/catalyst/csv/UnivocityParser.scala | 8 ++--- .../spark/sql/catalyst/json/JSONOptions.scala | 10 ++++++ .../sql/catalyst/json/JacksonParser.scala | 32 ++++++++++--------- .../datasources/json/JsonSuite.scala | 28 ++++++++++++++++ 4 files changed, 59 insertions(+), 19 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 05db7ce7e506..8c33b73cd1fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -122,15 +122,15 @@ class UnivocityParser( // Flags to signal if we need to fall back to the backward compatible behavior of parsing // dates and timestamps. // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions. - private val enableParsingFallbackForDateType = + private val enableParsingFallbackForTimestampType = options.enableDateTimeParsingFallback.getOrElse { SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.dateFormatInRead.isEmpty + options.timestampFormatInRead.isEmpty } - private val enableParsingFallbackForTimestampType = + private val enableParsingFallbackForDateType = options.enableDateTimeParsingFallback.getOrElse { SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.timestampFormatInRead.isEmpty + options.dateFormatInRead.isEmpty } // Retrieve the raw record string. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 5f90dbc49c9d..7fe1a393bce5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -111,6 +111,16 @@ private[sql] class JSONOptions( val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") + // SPARK-39731: Enables the backward compatible parsing behavior. + // Generally, this config should be set to false to avoid producing potentially incorrect results + // which is the current default (see UnivocityParser). + // + // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`. + // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used. + // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown. + val enableDateTimeParsingFallback: Option[Boolean] = + parameters.get("enableDateTimeParsingFallback").map(_.toBoolean) + val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 638f0d20edf1..a6b6093216d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -78,11 +78,19 @@ class JacksonParser( legacyFormat = FAST_DATE_FORMAT, isParsing = true) - // Flag is needed to distinguish parsing mode when inferring timestamp and date types. - // For more information, see the comments for TimestampType and DateType converter functions. - // Available for testing. - val isLegacyParserPolicy = - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY + // Flags to signal if we need to fall back to the backward compatible behavior of parsing + // dates and timestamps. 
+ // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions. + private val enableParsingFallbackForTimestampType = + options.enableDateTimeParsingFallback.getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.timestampFormatInRead.isEmpty + } + private val enableParsingFallbackForDateType = + options.enableDateTimeParsingFallback.getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.dateFormatInRead.isEmpty + } /** * Create a converter which converts the JSON documents held by the `JsonParser` @@ -263,11 +271,8 @@ class JacksonParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set. - // - // If a custom pattern was provided and parser policy is not legacy, throw exception - // without applying legacy behavior to avoid producing incorrect results. - if (!isLegacyParserPolicy && options.timestampFormatInRead.isDefined) { + // compatibility only if no custom pattern has been set or parser policy is legacy. + if (!enableParsingFallbackForTimestampType) { throw e } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText)) @@ -292,11 +297,8 @@ class JacksonParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set. - // - // If a custom pattern was provided and parser policy is not legacy, throw exception - // without applying legacy behavior to avoid producing incorrect results. - if (!isLegacyParserPolicy && options.dateFormatInRead.isDefined) { + // compatibility only if no custom pattern has been set or parser policy is legacy. 
+ if (!enableParsingFallbackForDateType) { throw e } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index b2d0544dd72a..19967abcc69a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3294,6 +3294,34 @@ abstract class JsonSuite assert(err.isInstanceOf[SparkUpgradeException]) } } + + test("SPARK-39731: Handle date and timestamp parsing fallback") { + withTempPath { path => + Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + val schema = new StructType() + .add("date", DateType) + .add("ts", TimestampType) + + def output(enableFallback: Boolean) = spark.read + .schema(schema) + .option("dateFormat", "invalid") + .option("timestampFormat", "invalid") + .option("enableDateTimeParsingFallback", enableFallback) + .json(path.getAbsolutePath) + + checkAnswer( + output(enableFallback = true), + Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00"))) + ) + + checkAnswer( + output(enableFallback = false), + Seq(Row(null, null)) + ) + } + } } class JsonV1Suite extends JsonSuite { From 55c5579703afb9d92e5a0091b45bfcc0691fb203 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 18 Jul 2022 12:43:01 +1200 Subject: [PATCH 07/16] update docs and comments --- docs/sql-data-sources-csv.md | 6 ++++++ docs/sql-data-sources-json.md | 6 ++++++ .../org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 3 ++- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 3 ++- 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index 1be1d7446e80..5d3f37750173 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -168,6 +168,12 @@ Data source options of CSV can be set via: Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type. read/write + + enableDateTimeParsingFallback + Enabled if the time parser policy is legacy or no custom date or timestamp pattern was provided + Allows to fall back to the backward compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns. + read + maxColumns 20480 diff --git a/docs/sql-data-sources-json.md b/docs/sql-data-sources-json.md index 27d89875623c..500cd65b58b8 100644 --- a/docs/sql-data-sources-json.md +++ b/docs/sql-data-sources-json.md @@ -202,6 +202,12 @@ Data source options of JSON can be set via: Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type. 
read/write + + enableDateTimeParsingFallback + Enabled if the time parser policy is legacy or no custom date or timestamp pattern was provided + Allows to fall back to the backward compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns. + read + multiLine false diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 9555c507c440..c299397685a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -175,7 +175,8 @@ class CSVOptions( // // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`. // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used. - // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown. + // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown and + // the value will be parsed as null. val enableDateTimeParsingFallback: Option[Boolean] = parameters.get("enableDateTimeParsingFallback").map(_.toBoolean) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 7fe1a393bce5..306e7e2189dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -117,7 +117,8 @@ private[sql] class JSONOptions( // // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`. // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used. - // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown. + // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown and + // the value will be parsed as null. 
val enableDateTimeParsingFallback: Option[Boolean] = parameters.get("enableDateTimeParsingFallback").map(_.toBoolean) From ef91606aec7120bea2e7bd9e13031b49c00f1dae Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 18 Jul 2022 18:30:54 +1200 Subject: [PATCH 08/16] fix scalastyle --- .../apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala | 2 +- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- .../apache/spark/sql/execution/datasources/json/JsonSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 460ea82bc148..37605e14b926 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -354,7 +354,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { val options = new CSVOptions(Map.empty[String, String], false, "UTC") check(new UnivocityParser(StructType(Seq.empty), options)) - def optionsWithPattern(enableFallback: Boolean) = new CSVOptions( + def optionsWithPattern(enableFallback: Boolean): CSVOptions = new CSVOptions( Map( "timestampFormat" -> "invalid", "dateFormat" -> "invalid", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 9a8275db0bb5..e934a47c278c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2844,7 +2844,7 @@ abstract class CSVSuite .add("date", DateType) .add("ts", TimestampType) - def output(enableFallback: Boolean) = spark.read + def output(enableFallback: Boolean): DataFrame = spark.read .schema(schema) .option("dateFormat", "invalid") .option("timestampFormat", "invalid") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 19967abcc69a..1ecaf748f5da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3304,7 +3304,7 @@ abstract class JsonSuite .add("date", DateType) .add("ts", TimestampType) - def output(enableFallback: Boolean) = spark.read + def output(enableFallback: Boolean): DataFrame = spark.read .schema(schema) .option("dateFormat", "invalid") .option("timestampFormat", "invalid") From 15c07f7a243860caa27d83001e9822492d68a641 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Tue, 19 Jul 2022 09:47:33 +1200 Subject: [PATCH 09/16] trigger ci From a83288b023050a2b1aef2740e8ca98e5ac76436c Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 21 Jul 2022 09:50:23 +1200 Subject: [PATCH 10/16] update comments --- .../scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala | 2 +- .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 306e7e2189dd..66fd22894f93 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -113,7 +113,7 @@ private[sql] class JSONOptions( // SPARK-39731: Enables the backward compatible parsing behavior. // Generally, this config should be set to false to avoid producing potentially incorrect results - // which is the current default (see UnivocityParser). + // which is the current default (see JacksonParser). // // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`. // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index a6b6093216d0..639b8a74d80a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -80,7 +80,7 @@ class JacksonParser( // Flags to signal if we need to fall back to the backward compatible behavior of parsing // dates and timestamps. - // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions. + // For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions. private val enableParsingFallbackForTimestampType = options.enableDateTimeParsingFallback.getOrElse { SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || From bf9351d6628cd614d54a7fce5fe5471cb1b773a3 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Thu, 21 Jul 2022 18:42:37 +1200 Subject: [PATCH 11/16] trigger ci From a447b0895cb961a69e9845e0e2fd63bfcebc98fd Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Fri, 22 Jul 2022 16:49:21 +1200 Subject: [PATCH 12/16] fix tests for SPARK-39469 --- .../apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala | 4 +++- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 381ec57fcd13..b1b319fca617 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -377,7 +377,9 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { def checkDate(dataType: DataType): Unit = { val timestampsOptions = new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm", - "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"), + "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy", + // Required for date string to be parsed as timestamp since it misses the time component + "enableDateTimeParsingFallback" -> "true"), false, DateTimeUtils.getZoneId("-08:00").toString) // Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always // converted to their equivalent UTC timestamp diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 0e5718103902..2c5662a54ea9 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2797,6 +2797,7 @@ abstract class CSVSuite "inferSchema" -> "true", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "dateFormat" -> "yyyy-MM-dd", + "enableDateTimeParsingFallback" -> "true", "inferDate" -> "true") val options2 = Map( "header" -> "true", From fbdf9d8e7eca3deda0e3fbbbe047980dc7fff2db Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 25 Jul 2022 10:57:12 +1200 Subject: [PATCH 13/16] update comments --- .../org/apache/spark/sql/catalyst/csv/UnivocityParser.scala | 4 ++-- .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 9c31b97259fc..12446ab01bce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -219,7 +219,7 @@ class UnivocityParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set or parser policy is legacy. + // compatibility if enabled. if (!enableParsingFallbackForTimestampType) { throw e } @@ -235,7 +235,7 @@ class UnivocityParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set or parser policy is legacy. + // compatibility if enabled. if (!enableParsingFallbackForDateType) { throw e } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 639b8a74d80a..06133d44c13a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -271,7 +271,7 @@ class JacksonParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set or parser policy is legacy. + // compatibility if enabled. if (!enableParsingFallbackForTimestampType) { throw e } @@ -297,7 +297,7 @@ class JacksonParser( } catch { case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility only if no custom pattern has been set or parser policy is legacy. + // compatibility if enabled. if (!enableParsingFallbackForDateType) { throw e } From 2962cd9e1338323756113b04dfce5c8ed964a277 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 25 Jul 2022 10:58:46 +1200 Subject: [PATCH 14/16] Revert "fix tests for SPARK-39469" This reverts commit a447b0895cb961a69e9845e0e2fd63bfcebc98fd. 
--- .../apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala | 4 +--- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index b1b319fca617..381ec57fcd13 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -377,9 +377,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { def checkDate(dataType: DataType): Unit = { val timestampsOptions = new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm", - "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy", - // Required for date string to be parsed as timestamp since it misses the time component - "enableDateTimeParsingFallback" -> "true"), + "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"), false, DateTimeUtils.getZoneId("-08:00").toString) // Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always // converted to their equivalent UTC timestamp diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 2c5662a54ea9..0e5718103902 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2797,7 +2797,6 @@ abstract class CSVSuite "inferSchema" -> "true", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "dateFormat" -> "yyyy-MM-dd", - "enableDateTimeParsingFallback" -> "true", "inferDate" -> "true") val options2 = Map( "header" -> "true", From 10ca4a4a09623ef9ea217b0fdf33bbdf2e22c227 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 25 Jul 2022 11:01:51 +1200 Subject: [PATCH 15/16] update the priority order for SPARK-39469 --- .../sql/catalyst/csv/UnivocityParser.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 12446ab01bce..a6b4d7ea6679 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -234,19 +234,17 @@ class UnivocityParser( timestampFormatter.parse(datum) } catch { case NonFatal(e) => - // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility if enabled. 
- if (!enableParsingFallbackForDateType) { - throw e - } - val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) - DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse { - // There may be date type entries in timestamp column due to schema inference - if (options.inferDate) { - daysToMicros(dateFormatter.parse(datum), options.zoneId) - } else { - throw(e) + // There may be date type entries in timestamp column due to schema inference + if (options.inferDate) { + daysToMicros(dateFormatter.parse(datum), options.zoneId) + } else { + // If fails to parse, then tries the way used in 2.0 and 1.x for backwards + // compatibility if enabled. + if (!enableParsingFallbackForDateType) { + throw e } + val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) + DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e)) } } } From b2a3db2033d28afd5e05182bf8b66ffde2de6525 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Mon, 25 Jul 2022 19:05:32 +1200 Subject: [PATCH 16/16] trigger ci
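Taken together, the series converges on one rule, introduced in PATCH 05/06 and kept through PATCH 16: the legacy parsing fallback is derived from the parser policy and the presence of a custom pattern, unless the new `enableDateTimeParsingFallback` option overrides it. Below is a minimal standalone Scala sketch of that decision as it appears in UnivocityParser and JacksonParser; the object name, the `ParserPolicy` encoding, and the `main` harness are illustrative scaffolding rather than Spark source (the non-legacy policies `CORRECTED` and `EXCEPTION` behave identically for this check, so only one is modeled).

```scala
// Standalone model of the fallback decision added to UnivocityParser and
// JacksonParser. Mirrors:
//   options.enableDateTimeParsingFallback.getOrElse {
//     SQLConf.get.legacyTimeParserPolicy == LEGACY || options.<fmt>InRead.isEmpty
//   }
object FallbackResolution {
  sealed trait ParserPolicy
  case object Legacy extends ParserPolicy
  case object Corrected extends ParserPolicy

  def enableFallback(
      explicitOption: Option[Boolean], // "enableDateTimeParsingFallback", if set
      policy: ParserPolicy,            // spark.sql.legacy.timeParserPolicy
      customPattern: Option[String]    // dateFormat / timestampFormat, if set
  ): Boolean =
    explicitOption.getOrElse(policy == Legacy || customPattern.isEmpty)

  def main(args: Array[String]): Unit = {
    // The SPARK-39731 fix: a custom pattern under a non-legacy policy no
    // longer falls back, so "2020011" fails "yyyyMMdd" instead of being
    // silently parsed as 2020-01-01 by the legacy path.
    assert(!enableFallback(None, Corrected, Some("yyyyMMdd")))
    // Without a custom pattern the fallback stays on for compatibility.
    assert(enableFallback(None, Corrected, None))
    // The legacy policy keeps the old behavior regardless of the pattern.
    assert(enableFallback(None, Legacy, Some("yyyyMMdd")))
    // The new option overrides the derived default in either direction.
    assert(enableFallback(Some(true), Corrected, Some("yyyyMMdd")))
    assert(!enableFallback(Some(false), Legacy, None))
  }
}
```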
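A usage sketch distilled from the CSVSuite test added in PATCH 05 ("SPARK-39731: Handle date and timestamp parsing fallback"): the SparkSession setup and the output path are assumptions made for illustration, and the expected results follow the suite's `checkAnswer` rows under the default PERMISSIVE mode.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DateType, StructType, TimestampType}

object FallbackUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // Input that does not match the custom "yyyyMMdd" pattern.
    val path = "/tmp/spark-39731-dates.csv" // hypothetical location
    Seq("2020-01-01,2020-01-01").toDF().repartition(1).write.text(path)

    val schema = new StructType()
      .add("date", DateType)
      .add("ts", TimestampType)

    def read(enableFallback: Boolean): DataFrame = spark.read
      .schema(schema)
      .option("dateFormat", "yyyyMMdd")
      .option("timestampFormat", "yyyyMMdd")
      .option("enableDateTimeParsingFallback", enableFallback)
      .csv(path)

    // Fallback on: the legacy stringToDate/stringToTimestamp path still
    // accepts the ISO-style strings -> 2020-01-01, 2020-01-01 00:00:00.
    read(enableFallback = true).show()
    // Fallback off: the values fail the strict pattern and become null
    // under the default PERMISSIVE mode.
    read(enableFallback = false).show()
  }
}
```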