From 5048253ca6374c21074dd611ca335427d9a63f31 Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Tue, 14 Jun 2022 17:49:36 -0700
Subject: [PATCH 01/21] infer date type in csv

---
 .../sql/catalyst/csv/CSVInferSchema.scala     | 23 ++++++++--
 .../catalyst/csv/CSVInferSchemaSuite.scala    | 43 +++++++++++++++++++
 2 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 8b0c6c49b855..432c865e6b1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -24,8 +24,8 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -46,6 +46,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)
 
+  private lazy val dateFormatter = DateFormatter(
+    options.dateFormatInRead,
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT,
+    // TODO: Should isParsing be true or false?
+    isParsing = true)
+
   private val decimalParser = if (options.locale == Locale.US) {
     // Special handling the default locale for backward compatibility
     s: String => new java.math.BigDecimal(s)
@@ -117,8 +124,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
       case LongType => tryParseLong(field)
       case _: DecimalType => tryParseDecimal(field)
       case DoubleType => tryParseDouble(field)
-      case TimestampNTZType => tryParseTimestampNTZ(field)
-      case TimestampType => tryParseTimestamp(field)
+      // Temporary NOTE: DateTimeType is private to [sql] package
+      case DateType | TimestampNTZType | TimestampType => tryParseDate(field)
       case BooleanType => tryParseBoolean(field)
       case StringType => StringType
       case other: DataType =>
@@ -169,6 +176,14 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private def tryParseDouble(field: String): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
       DoubleType
+    } else {
+      tryParseDate(field)
+    }
+  }
+
+  private def tryParseDate(field: String): DataType = {
+    if ((allCatch opt dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestampNTZ(field)
     }
@@ -178,7 +193,9 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     // We can only parse the value as TimestampNTZType if it does not have zone-offset or
     // time-zone component and can be parsed with the timestamp formatter.
     // Otherwise, it is likely to be a timestamp with timezone.
+    print("BEEP PARSING TIMESTAMPNTZ\n")
     if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
+      print(s"$field is NTZ")
       SQLConf.get.timestampType
     } else {
       tryParseTimestamp(field)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index d268f8c2e721..38827705580f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -109,6 +109,12 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     assert(
       inferSchema.mergeRowTypes(Array(DoubleType),
         Array(LongType)).sameElements(Array(DoubleType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampNTZType)).sameElements(Array(TimestampNTZType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampType)).sameElements(Array(TimestampType)))
   }
 
   test("Null fields are handled properly when a nullValue is specified") {
@@ -192,4 +198,41 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     Seq("en-US").foreach(checkDecimalInfer(_, StringType))
     Seq("ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0)))
   }
+
+  test("inferring date type") {
+    var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd"), false, "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
+
+    options = new CSVOptions(Map("dateFormat" -> "MMM yyyy"), false, "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
+    // TODO: How does the DateFormatter recognize Dec as valid for MMM?
+
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+      columnPruning = false,
+      defaultTimeZoneId = "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == TimestampType)
+    assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
+  }
+
+  test("inferring date and timestamp types in a mixed column") {
+    var options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
+        "timestampNTZFormat" -> "yyyy/MM/dd"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+    assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)
+    withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
+      assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType)
+    }
+
+    // inferField will upgrade the date field to timestamp if the typeSoFar is a timestamp
+    assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
+    assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)
+  }
 }

From 5058d92a8da55b0037595572242c12c8a5efdb05 Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Tue, 14 Jun 2022 17:51:23 -0700
Subject: [PATCH 02/21] fix

---
 .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 432c865e6b1c..2b737584f6ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -193,9 +193,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     // We can only parse the value as TimestampNTZType if it does not have zone-offset or
     // time-zone component and can be parsed with the timestamp formatter.
     // Otherwise, it is likely to be a timestamp with timezone.
-    print("BEEP PARSING TIMESTAMPNTZ\n")
     if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-      print(s"$field is NTZ")
       SQLConf.get.timestampType
     } else {
       tryParseTimestamp(field)
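Note: for orientation between patches — with PATCH 01/02 applied, the new inference path is reached through the ordinary DataFrame reader once inferSchema is on. A minimal sketch (the file path and data are hypothetical, not part of the patch):

    // A CSV column whose values all match dateFormat can now be inferred as
    // DateType instead of StringType (inference is unconditional at this point;
    // the opt-in inferDate flag only appears in PATCH 09).
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("dateFormat", "yyyy/MM/dd")
      .csv("/tmp/events.csv") // hypothetical path
    df.printSchema()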
- print("BEEP PARSING TIMESTAMPNTZ\n") if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - print(s"$field is NTZ") SQLConf.get.timestampType } else { tryParseTimestamp(field) From f16e5e169b99fdeffdc0925a4cbed20b672628ce Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Wed, 15 Jun 2022 14:31:18 -0700 Subject: [PATCH 03/21] parse dates from timestamp column --- .../sql/catalyst/csv/CSVInferSchema.scala | 6 ++-- .../sql/catalyst/csv/UnivocityParser.scala | 29 ++++++++++++------- .../catalyst/csv/UnivocityParserSuite.scala | 15 ++++++++++ 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 2b737584f6ae..26349419df0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -125,7 +125,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { case _: DecimalType => tryParseDecimal(field) case DoubleType => tryParseDouble(field) // Temporary NOTE: DateTimeType is private to [sql] package - case DateType | TimestampNTZType | TimestampType => tryParseDate(field) + case DateType | TimestampNTZType | TimestampType => tryParseDateTime(field) case BooleanType => tryParseBoolean(field) case StringType => StringType case other: DataType => @@ -177,11 +177,11 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) { DoubleType } else { - tryParseDate(field) + tryParseDateTime(field) } } - private def tryParseDate(field: String): DataType = { + private def tryParseDateTime(field: String): DataType = { if ((allCatch opt dateFormatter.parse(field)).isDefined) { DateType } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 56ebfcc26c63..598778f6d040 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -150,6 +150,18 @@ class UnivocityParser( private val decimalParser = ExprUtils.getDecimalParser(options.locale) + private def convertToDate(datum: String): Int = { + try { + dateFormatter.parse(datum) + } catch { + case NonFatal(e) => + // If fails to parse, then tries the way used in 2.0 and 1.x for backwards + // compatibility. + val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) + DateTimeUtils.stringToDate(str).getOrElse(throw e) + } + } + /** * Create a converter which converts the string value to a value according to a desired type. * Currently, we do not support complex types (`ArrayType`, `MapType`, `StructType`). @@ -206,28 +218,23 @@ class UnivocityParser( // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. 
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) - DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e) + DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(convertToDate(datum)) } } case _: TimestampNTZType => (d: String) => - nullSafeDatum(d, name, nullable, options) { datum => - timestampNTZFormatter.parseWithoutTimeZone(datum, false) - } - - case _: DateType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => try { - dateFormatter.parse(datum) + timestampNTZFormatter.parseWithoutTimeZone(datum, false) } catch { case NonFatal(e) => - // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. - val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) - DateTimeUtils.stringToDate(str).getOrElse(throw e) + convertToDate(datum) } } + case _: DateType => (d: String) => + nullSafeDatum(d, name, nullable, options)(convertToDate) + case _: StringType => (d: String) => nullSafeDatum(d, name, nullable, options)(UTF8String.fromString) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 4166401d040f..0e7962a0f9eb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -358,4 +358,19 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC") check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern)) } + + test("dates should be parsed correctly in a timestamp column") { + def checkDate(dataType: DataType): Unit = { + val timestampsOptions = + new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy HH:mm", + "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"), + false, "UTC") + val dateString = "08_09_2001" + val date = days(2001, 9, 8) + val parser = new UnivocityParser(new StructType(), timestampsOptions) + assert(parser.makeConverter("d", dataType).apply(dateString) == date) + } + checkDate(TimestampType) + checkDate(TimestampNTZType) + } } From c2b5fdc968c44379dc59755428df46a4f95a4517 Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Wed, 15 Jun 2022 14:51:13 -0700 Subject: [PATCH 04/21] cleanup --- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 2 -- .../spark/sql/catalyst/csv/UnivocityParser.scala | 8 ++++++-- .../spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 11 ++++++----- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 26349419df0d..c7f566560c7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -50,7 +50,6 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { options.dateFormatInRead, options.locale, legacyFormat = FAST_DATE_FORMAT, - // TODO: Should isParsing be true or false? 
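Note: PATCH 03 centralizes date parsing in convertToDate and lets the timestamp converters fall back to it. Reduced to a sketch, the control flow is "strict parse first, lenient parse second, rethrow last" (the names below are illustrative, not the patch's API):

    import scala.util.control.NonFatal

    def parseWithFallback[A](datum: String)(strict: String => A)(
        lenient: String => Option[A]): A = {
      try strict(datum) catch {
        // On a strict-format failure, try the legacy-compatible parse before rethrowing.
        case NonFatal(e) => lenient(datum).getOrElse(throw e)
      }
    }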
From c2b5fdc968c44379dc59755428df46a4f95a4517 Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Wed, 15 Jun 2022 14:51:13 -0700
Subject: [PATCH 04/21] cleanup

---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala      |  2 --
 .../spark/sql/catalyst/csv/UnivocityParser.scala     |  8 ++++++--
 .../spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 11 ++++++-----
 3 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 26349419df0d..c7f566560c7d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -50,7 +50,6 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     options.dateFormatInRead,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
-    // TODO: Should isParsing be true or false?
     isParsing = true)
 
   private val decimalParser = if (options.locale == Locale.US) {
@@ -124,7 +123,6 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
       case LongType => tryParseLong(field)
       case _: DecimalType => tryParseDecimal(field)
       case DoubleType => tryParseDouble(field)
-      // Temporary NOTE: DateTimeType is private to [sql] package
       case DateType | TimestampNTZType | TimestampType => tryParseDateTime(field)
       case BooleanType => tryParseBoolean(field)
       case StringType => StringType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 598778f6d040..eb21a22a8925 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -218,7 +218,10 @@ class UnivocityParser(
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
           // compatibility.
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-          DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(convertToDate(datum))
+          DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
+            // There may be date type entries in timestamp column due to schema inference
+            convertToDate(datum)
+          }
       }
     }
@@ -228,7 +231,8 @@ class UnivocityParser(
           timestampNTZFormatter.parseWithoutTimeZone(datum, false)
         } catch {
           case NonFatal(e) =>
-            convertToDate(datum)
+            // There may be date type entries in timestampNTZ column due to schema inference
+            convertToDate(datum)
         }
       }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index 38827705580f..7b26f6d3a951 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -200,15 +200,15 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("inferring date type") {
+    // "yyyy/MM/dd" format
     var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd"), false, "UTC")
     var inferSchema = new CSVInferSchema(options)
     assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
-
+    // "MMM yyyy" format
     options = new CSVOptions(Map("dateFormat" -> "MMM yyyy"), false, "GMT")
     inferSchema = new CSVInferSchema(options)
     assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
-    // TODO: How does the DateFormatter recognize Dec as valid for MMM?
-
+    // Field should strictly match date format to infer as date
     options = new CSVOptions(
       Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
       columnPruning = false,
       defaultTimeZoneId = "GMT")
     inferSchema = new CSVInferSchema(options)
@@ -224,14 +224,15 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
         "timestampNTZFormat" -> "yyyy/MM/dd"),
       columnPruning = false,
       defaultTimeZoneId = "UTC")
-    var inferSchema = new CSVInferSchema(options)
+    val inferSchema = new CSVInferSchema(options)
     assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
     assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)
+    // SQL configuration must be set to default to TimestampNTZ
     withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
       assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType)
     }
 
-    // inferField will upgrade the date field to timestamp if the typeSoFar is a timestamp
+    // inferField should upgrade a date field to timestamp if the typeSoFar is a timestamp
     assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
     assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)

From 9514c2c34223cccde59f28ea9688f05ccffc483e Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Wed, 15 Jun 2022 15:58:51 -0700
Subject: [PATCH 05/21] test date and timestamp same format

---
 .../spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index 7b26f6d3a951..7b2ba7322176 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -224,7 +224,7 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
         "timestampNTZFormat" -> "yyyy/MM/dd"),
       columnPruning = false,
       defaultTimeZoneId = "UTC")
-    val inferSchema = new CSVInferSchema(options)
+    var inferSchema = new CSVInferSchema(options)
     assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
     assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)
     // SQL configuration must be set to default to TimestampNTZ
@@ -235,5 +235,13 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     // inferField should upgrade a date field to timestamp if the typeSoFar is a timestamp
     assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
     assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)
+
+    // No errors when Date and Timestamp have the same format. Inference defaults to date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy_MM_dd"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
   }
 }
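Note: the widening that PATCHES 04-05 rely on — when a column mixes dates and timestamps, merging the per-row types resolves to the wider timestamp type and never narrows back to date. Illustrative assertions mirroring the suite (assumes the suite's imports):

    val options = new CSVOptions(Map("dateFormat" -> "yyyy_MM_dd"), false, "UTC")
    val inferSchema = new CSVInferSchema(options)
    // DateType merged with TimestampType widens to TimestampType
    inferSchema.mergeRowTypes(Array(DateType), Array(TimestampType)) // Array(TimestampType)
    inferSchema.mergeRowTypes(Array(DateType), Array(DateType)) // Array(DateType)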
From be7aabd2a684bb69f16afda002934beff9a1f29b Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Thu, 16 Jun 2022 13:28:48 -0700
Subject: [PATCH 06/21] e2e test WIP

---
 .../execution/datasources/csv/CSVSuite.scala | 32 +++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index bf92ffcf4651..00a8c0d4aa84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -74,6 +74,7 @@ abstract class CSVSuite
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val dateInferSchemaFile = "test-data/date-infer-schema.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
@@ -2788,6 +2789,37 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-39469: Infer schema for Date type") {
+    val options = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "timestampFormat" -> "dd/MM/yyyy HH:mm",
+      "dateFormat" -> "MM_dd_yyyy")
+    val results = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(dateInferSchemaFile))
+
+    val expectedSchema = StructType(List(StructField("date", DateType),
+      StructField("timestamp-date", TimestampType), StructField("date-timestamp", TimestampType)))
+    assert(results.schema == expectedSchema)
+
+    val expected =
+      Seq(
+        Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0")),
+        Seq(Date.valueOf("2020-1-12")),
+        Seq(Date.valueOf("2020-1-12"))
+      )
+
+    // val col1 = result
+    //   .select("date")
+    //   .collect()
+    // val expectedCol1 = List()
+    //   Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))
+
+    assert(results.collect().toSeq.map(_.toSeq) === expected)
+  }
 }

From db3b442a8cd56dce34ed227829e89a0a5fa0eea5 Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Thu, 16 Jun 2022 13:31:14 -0700
Subject: [PATCH 07/21] e2e test data

---
 sql/core/src/test/resources/test-data/date-infer-schema.csv | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 sql/core/src/test/resources/test-data/date-infer-schema.csv

diff --git a/sql/core/src/test/resources/test-data/date-infer-schema.csv b/sql/core/src/test/resources/test-data/date-infer-schema.csv
new file mode 100644
index 000000000000..f38553ddeeb8
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/date-infer-schema.csv
@@ -0,0 +1,4 @@
+date,timestamp-date,date-timestamp
+09_08_2001,27/10/2014 18:30,03_28_1765
+01_02_1941,14/09/2000 01:01,12/11/1423 23:41
+11_07_0293,06_25_1995,28/01/2016 20:00

From 7d9868683e3c7f11c42afc38b548d2ed9a7db362 Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Fri, 17 Jun 2022 11:37:39 -0700
Subject: [PATCH 08/21] fix e2e date test

---
 .../sql/catalyst/csv/UnivocityParser.scala    |  6 ++++--
 .../execution/datasources/csv/CSVSuite.scala  | 20 ++++++++-----------
 2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index eb21a22a8925..bbe95cbf1d26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.io.InputStream
+import java.time.ZoneOffset
 
 import scala.util.control.NonFatal
@@ -28,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -220,7 +222,7 @@ class UnivocityParser(
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
           DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
             // There may be date type entries in timestamp column due to schema inference
-            convertToDate(datum)
+            daysToMicros(convertToDate(datum), options.zoneId)
           }
       }
     }
@@ -232,7 +234,7 @@ class UnivocityParser(
         } catch {
           case NonFatal(e) =>
             // There may be date type entries in timestampNTZ column due to schema inference
-            convertToDate(datum)
+            daysToMicros(convertToDate(datum), ZoneOffset.UTC)
         }
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 00a8c0d4aa84..018ae894e4dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2790,7 +2790,7 @@ abstract class CSVSuite
     }
   }
 
-  test("SPARK-39469: Infer schema for Date type") {
+  test("SPARK-39469: Infer schema for date type") {
     val options = Map(
       "header" -> "true",
       "inferSchema" -> "true",
@@ -2807,18 +2807,14 @@ abstract class CSVSuite
 
     val expected =
       Seq(
-        Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0")),
-        Seq(Date.valueOf("2020-1-12")),
-        Seq(Date.valueOf("2020-1-12"))
+        Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"),
+          Timestamp.valueOf("1765-03-28 00:00:0.0")),
+        Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"),
+          Timestamp.valueOf("1423-11-12 23:41:0.0")),
+        Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"),
+          Timestamp.valueOf("2016-01-28 20:00:00.0"))
       )
-
-    // val col1 = result
-    //   .select("date")
-    //   .collect()
-    // val expectedCol1 = List()
-    //   Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))
-
-    assert(results.collect().toSeq.map(_.toSeq) === expected)
+    assert(results.collect().toSeq.map(_.toSeq) == expected)
   }
 }
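Note: the daysToMicros calls introduced in PATCH 08 widen a parsed date (a day count since the epoch) to the timestamp of midnight in the target zone, in microseconds. The same arithmetic in plain java.time, as a sketch of the semantics rather than Spark's implementation:

    import java.time.{LocalDate, ZoneOffset}

    // 2001-09-08 widened to microseconds at UTC midnight
    val date = LocalDate.of(2001, 9, 8)
    val micros = date.atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli * 1000L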
From 966bdb68cc27abdd853a90636c54b88398280587 Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Tue, 21 Jun 2022 16:28:33 -0700
Subject: [PATCH 09/21] added inferDate flag

---
 .../sql/catalyst/csv/CSVInferSchema.scala     | 25 ++++++++++-
 .../spark/sql/catalyst/csv/CSVOptions.scala   |  6 +++
 .../sql/catalyst/csv/UnivocityParser.scala    | 41 +++++++++++--------
 .../catalyst/csv/CSVInferSchemaSuite.scala    | 15 ++++---
 .../catalyst/csv/UnivocityParserSuite.scala   | 11 ++---
 .../resources/test-data/date-infer-schema.csv |  6 +--
 .../execution/datasources/csv/CSVSuite.scala  |  6 ++-
 7 files changed, 74 insertions(+), 36 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index c7f566560c7d..5e9bbe936b9a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -116,6 +116,23 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   def inferField(typeSoFar: DataType, field: String): DataType = {
     if (field == null || field.isEmpty || field == options.nullValue) {
       typeSoFar
+    } else
+    if (options.inferDate) {
+      val typeElemInfer = typeSoFar match {
+        case NullType => tryParseInteger(field)
+        case IntegerType => tryParseInteger(field)
+        case LongType => tryParseLong(field)
+        case _: DecimalType => tryParseDecimal(field)
+        case DoubleType => tryParseDouble(field)
+        case DateType => tryParseDateTime(field)
+        case TimestampNTZType => tryParseDateTime(field)
+        case TimestampType => tryParseDateTime(field)
+        case BooleanType => tryParseBoolean(field)
+        case StringType => StringType
+        case other: DataType =>
+          throw QueryExecutionErrors.dataTypeUnexpectedError(other)
+      }
+      compatibleType(typeSoFar, typeElemInfer).getOrElse(StringType)
     } else {
       val typeElemInfer = typeSoFar match {
         case NullType => tryParseInteger(field)
@@ -123,7 +140,9 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
         case LongType => tryParseLong(field)
         case _: DecimalType => tryParseDecimal(field)
         case DoubleType => tryParseDouble(field)
-        case DateType | TimestampNTZType | TimestampType => tryParseDateTime(field)
+        case DateType => tryParseDateTime(field)
+        case TimestampNTZType => tryParseTimestampNTZ(field)
+        case TimestampType => tryParseTimestamp(field)
         case BooleanType => tryParseBoolean(field)
         case StringType => StringType
         case other: DataType =>
@@ -174,8 +193,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private def tryParseDouble(field: String): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
       DoubleType
-    } else {
+    } else if (options.inferDate) {
       tryParseDateTime(field)
+    } else {
+      tryParseTimestampNTZ(field)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 3e92c3d25eb4..702af0d52446 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -195,6 +195,12 @@ class CSVOptions(
    */
   val enforceSchema = getBool("enforceSchema", default = true)
 
+  /**
+   * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type).
+   * Disabled by default for backwards compatibility and performance. When enabled, date entries in
+   * timestamp columns will be cast to timestamp upon parsing.
+   */
+  val inferDate = getBool("inferDate")
 
   /**
    * String representation of an empty value in read and in write.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index bbe95cbf1d26..2c219d90148f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.io.InputStream
-import java.time.ZoneOffset
 
 import scala.util.control.NonFatal
@@ -152,17 +151,6 @@ class UnivocityParser(
 
   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
 
-  private def convertToDate(datum: String): Int = {
-    try {
-      dateFormatter.parse(datum)
-    } catch {
-      case NonFatal(e) =>
-        // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-        // compatibility.
-        val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-        DateTimeUtils.stringToDate(str).getOrElse(throw e)
-    }
-  }
-
   /**
    * Create a converter which converts the string value to a value according to a desired type.
    * Currently, we do not support complex types (`ArrayType`, `MapType`, `StructType`).
@@ -211,6 +199,19 @@ class UnivocityParser(
           Decimal(decimalParser(datum), dt.precision, dt.scale)
       }
 
+    case _: DateType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          dateFormatter.parse(datum)
+        } catch {
+          case NonFatal(e) =>
+            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
+            DateTimeUtils.stringToDate(str).getOrElse(throw e)
+        }
+      }
+
     case _: TimestampType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
@@ -222,7 +223,11 @@ class UnivocityParser(
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
           DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
             // There may be date type entries in timestamp column due to schema inference
-            daysToMicros(convertToDate(datum), options.zoneId)
+            if (options.inferDate) {
+              daysToMicros(dateFormatter.parse(datum), options.zoneId)
+            } else {
+              throw(e)
+            }
           }
       }
     }
@@ -233,14 +238,14 @@ class UnivocityParser(
           timestampNTZFormatter.parseWithoutTimeZone(datum, false)
         } catch {
           case NonFatal(e) =>
-            // There may be date type entries in timestampNTZ column due to schema inference
-            daysToMicros(convertToDate(datum), ZoneOffset.UTC)
+            if (options.inferDate) {
+              daysToMicros(dateFormatter.parse(datum), options.zoneId)
+            } else {
+              throw(e)
+            }
         }
       }
 
-    case _: DateType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(convertToDate)
-
     case _: StringType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index 7b2ba7322176..8790223a680f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -199,18 +199,21 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     Seq("ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0)))
   }
 
-  test("inferring date type") {
+  test("SPARK-39469: inferring date type") {
     // "yyyy/MM/dd" format
-    var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd"), false, "UTC")
+    var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+      false, "UTC")
     var inferSchema = new CSVInferSchema(options)
     assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
     // "MMM yyyy" format
-    options = new CSVOptions(Map("dateFormat" -> "MMM yyyy"), false, "GMT")
+    options = new CSVOptions(Map("dateFormat" -> "MMM yyyy", "inferDate" -> "true"),
+      false, "GMT")
     inferSchema = new CSVInferSchema(options)
     assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
     // Field should strictly match date format to infer as date
     options = new CSVOptions(
-      Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+      Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+        "inferDate" -> "true"),
       columnPruning = false,
       defaultTimeZoneId = "GMT")
     inferSchema = new CSVInferSchema(options)
@@ -218,10 +221,10 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
   }
 
-  test("inferring date and timestamp types in a mixed column") {
+  test("SPARK-39469: inferring date and timestamp types in a mixed column with inferDate=true") {
     var options = new CSVOptions(
       Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
-        "timestampNTZFormat" -> "yyyy/MM/dd"),
+        "timestampNTZFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
       columnPruning = false,
       defaultTimeZoneId = "UTC")
     var inferSchema = new CSVInferSchema(options)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 0e7962a0f9eb..423084c8e564 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.math.BigDecimal
 import java.text.{DecimalFormat, DecimalFormatSymbols}
+import java.time.{ZoneId, ZoneOffset}
 import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
-
 import org.apache.spark.SparkFunSuite
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
@@ -359,16 +360,16 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
   }
 
-  test("dates should be parsed correctly in a timestamp column") {
+  test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
     def checkDate(dataType: DataType): Unit = {
       val timestampsOptions =
-        new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy HH:mm",
+        new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm",
           "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"),
           false, "UTC")
       val dateString = "08_09_2001"
-      val date = days(2001, 9, 8)
+      val expected = date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
       val parser = new UnivocityParser(new StructType(), timestampsOptions)
-      assert(parser.makeConverter("d", dataType).apply(dateString) == date)
+      assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
     }
     checkDate(TimestampType)
     checkDate(TimestampNTZType)
diff --git a/sql/core/src/test/resources/test-data/date-infer-schema.csv b/sql/core/src/test/resources/test-data/date-infer-schema.csv
index f38553ddeeb8..ed8317481279 100644
--- a/sql/core/src/test/resources/test-data/date-infer-schema.csv
+++ b/sql/core/src/test/resources/test-data/date-infer-schema.csv
@@ -1,4 +1,4 @@
 date,timestamp-date,date-timestamp
-09_08_2001,27/10/2014 18:30,03_28_1765
-01_02_1941,14/09/2000 01:01,12/11/1423 23:41
-11_07_0293,06_25_1995,28/01/2016 20:00
+2001-09-08,2014-10-27T18:30,1765-03-28
+1941-01-02,2000-09-14T01:01,1423-11-12T23:41
+0293-11-07,1995-06-25,2016-01-28T20:00
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 018ae894e4dd..b79e06eff410 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2794,8 +2794,10 @@ abstract class CSVSuite
     val options = Map(
       "header" -> "true",
       "inferSchema" -> "true",
-      "timestampFormat" -> "dd/MM/yyyy HH:mm",
-      "dateFormat" -> "MM_dd_yyyy")
+      "timestampFormat" -> "yyyy-MM-dd'T'HH:mm",
+      "dateFormat" -> "yyyy-MM-dd",
+      "inferDate" -> "true")
+
     val results = spark.read
       .format("csv")
       .options(options)

From 638064bb847396c749b0cf81df33c2ce03d3af46 Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Tue, 21 Jun 2022 17:28:42 -0700
Subject: [PATCH 10/21] Updated documentation and fixed tests

---
 docs/sql-data-sources-csv.md                      |  6 ++++++
 .../spark/sql/catalyst/csv/CSVInferSchema.scala   | 14 ++++++++++++--
 .../sql/catalyst/csv/UnivocityParserSuite.scala   | 10 +++++++---
 3 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 1be1d7446e80..004762e88803 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -108,6 +108,12 @@ Data source options of CSV can be set via:
     Infers the input schema automatically from data. It requires one extra pass over the data. CSV built-in functions ignore this option.
     read
 
+  inferDate
+  false
+  Whether or not to infer columns that satisfy the dateFormat option as Date. Requires inferSchema to be true. Legacy date formats in Timestamp columns cannot be parsed with this option.
+  read
+
   enforceSchema
   true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 5e9bbe936b9a..09cdd060267e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -141,8 +141,18 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
         case _: DecimalType => tryParseDecimal(field)
         case DoubleType => tryParseDouble(field)
         case DateType => tryParseDateTime(field)
-        case TimestampNTZType => tryParseTimestampNTZ(field)
-        case TimestampType => tryParseTimestamp(field)
+        case TimestampNTZType =>
+          if (options.inferDate) {
+            tryParseDateTime(field)
+          } else {
+            tryParseTimestampNTZ(field)
+          }
+        case TimestampType =>
+          if (options.inferDate) {
+            tryParseDateTime(field)
+          } else {
+            tryParseTimestamp(field)
+          }
         case BooleanType => tryParseBoolean(field)
         case StringType => StringType
         case other: DataType =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 423084c8e564..d084677bfff5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.math.BigDecimal
 import java.text.{DecimalFormat, DecimalFormatSymbols}
-import java.time.{ZoneId, ZoneOffset}
+import java.time.{ZoneOffset}
 import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
-import org.apache.spark.SparkFunSuite
 
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
@@ -367,11 +367,15 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
           "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"),
           false, "UTC")
       val dateString = "08_09_2001"
-      val expected = date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
+      val expected = dataType match {
+        case TimestampType | TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
+        case DateType => days(2001, 9, 8)
+      }
       val parser = new UnivocityParser(new StructType(), timestampsOptions)
       assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
     }
     checkDate(TimestampType)
     checkDate(TimestampNTZType)
+    checkDate(DateType)
   }
 }
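Note: from PATCH 09 onward the behavior is opt-in behind the new inferDate option, which defaults to false. Hypothetical reader usage with the flag enabled:

    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("inferDate", "true")
      .option("dateFormat", "yyyy-MM-dd")
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm")
      .csv("/tmp/mixed.csv") // hypothetical path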
From 50a91a6206873e30c1a24c93ee0267f8ac933623 Mon Sep 17 00:00:00 2001
From: Jonathan Cui
Date: Wed, 22 Jun 2022 16:45:19 -0700
Subject: [PATCH 11/21] added new error for inferDate with legacy parser and
 fixed feedback

---
 .../sql/catalyst/csv/CSVInferSchema.scala     | 23 ++-------
 .../spark/sql/catalyst/csv/CSVOptions.scala   | 12 ++++-
 .../sql/catalyst/csv/UnivocityParser.scala    |  9 +---
 .../sql/errors/QueryExecutionErrors.scala     |  9 ++++
 .../execution/datasources/csv/CSVSuite.scala  | 47 ++++++++++++-------
 5 files changed, 54 insertions(+), 46 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 09cdd060267e..1bdb2747bf45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
-import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{FAST_DATE_FORMAT, LENIENT_SIMPLE_DATE_FORMAT}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -46,10 +46,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)
 
+// val FAST_DATE_FORMAT, SIMPLE_DATE_FORMAT, LENIENT_SIMPLE_DATE_FORMAT = Value
+
   private lazy val dateFormatter = DateFormatter(
     options.dateFormatInRead,
     options.locale,
-    legacyFormat = FAST_DATE_FORMAT,
+    legacyFormat = LENIENT_SIMPLE_DATE_FORMAT,
     isParsing = true)
 
   private val decimalParser = if (options.locale == Locale.US) {
@@ -116,23 +118,6 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   def inferField(typeSoFar: DataType, field: String): DataType = {
     if (field == null || field.isEmpty || field == options.nullValue) {
       typeSoFar
-    } else
-    if (options.inferDate) {
-      val typeElemInfer = typeSoFar match {
-        case NullType => tryParseInteger(field)
-        case IntegerType => tryParseInteger(field)
-        case LongType => tryParseLong(field)
-        case _: DecimalType => tryParseDecimal(field)
-        case DoubleType => tryParseDouble(field)
-        case DateType => tryParseDateTime(field)
-        case TimestampNTZType => tryParseDateTime(field)
-        case TimestampType => tryParseDateTime(field)
-        case BooleanType => tryParseBoolean(field)
-        case StringType => StringType
-        case other: DataType =>
-          throw QueryExecutionErrors.dataTypeUnexpectedError(other)
-      }
-      compatibleType(typeSoFar, typeElemInfer).getOrElse(StringType)
     } else {
       val typeElemInfer = typeSoFar match {
         case NullType => tryParseInteger(field)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 702af0d52446..feac160af423 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -198,9 +198,17 @@ class CSVOptions(
   /**
    * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type).
    * Disabled by default for backwards compatibility and performance. When enabled, date entries in
-   * timestamp columns will be cast to timestamp upon parsing.
+   * timestamp columns will be cast to timestamp upon parsing. Not compatible with
+   * legacyTimeParserPolicy == LEGACY
    */
-  val inferDate = getBool("inferDate")
+  val inferDate = {
+    val inferDateFlag = getBool("inferDate")
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) {
+      throw QueryExecutionErrors.inferDateWithLegacyTimeParserError()
+    }
+    inferDateFlag
+  }
+
 
   /**
    * String representation of an empty value in read and in write.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 2c219d90148f..2d6eef243a7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -151,7 +151,6 @@ class UnivocityParser(
 
   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
 
-
   /**
    * Create a converter which converts the string value to a value according to a desired type.
    * Currently, we do not support complex types (`ArrayType`, `MapType`, `StructType`).
@@ -237,12 +236,8 @@ class UnivocityParser(
         try {
           timestampNTZFormatter.parseWithoutTimeZone(datum, false)
         } catch {
-          case NonFatal(e) =>
-            if (options.inferDate) {
-              daysToMicros(dateFormatter.parse(datum), options.zoneId)
-            } else {
-              throw(e)
-            }
+          case NonFatal(e) if (options.inferDate) =>
+            daysToMicros(dateFormatter.parse(datum), options.zoneId)
         }
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index a3129f249c1d..87af4b3060f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -529,6 +529,15 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       """.stripMargin)
   }
 
+  def inferDateWithLegacyTimeParserError(): Throwable = {
+    new IllegalArgumentException(
+      """
+        | Cannot infer date in schema inference when LegacyTimeParserPolicy is 'legacy'. Legacy
+        | Date formatter does not support strict date format matching which is required to avoid
+        | inferring timestamps and other non-date entries to date.
+      """.stripMargin)
+  }
+
   def streamedOperatorUnsupportedByDataSourceError(
       className: String, operator: String): Throwable = {
     new UnsupportedOperationException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b79e06eff410..6bd4a244ee13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, Que
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -2797,26 +2798,36 @@ abstract class CSVSuite
       "timestampFormat" -> "yyyy-MM-dd'T'HH:mm",
       "dateFormat" -> "yyyy-MM-dd",
       "inferDate" -> "true")
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      val msg = intercept[IllegalArgumentException] {
+        spark.read
+          .format("csv")
+          .options(options)
+          .load(testFile(dateInferSchemaFile))
+      }.getMessage
+      assert(msg.contains("Cannot infer date in schema inference when " +
+        "LegacyTimeParserPolicy is 'legacy'"))
+    } else {
+      val results = spark.read
+        .format("csv")
+        .options(options)
+        .load(testFile(dateInferSchemaFile))
 
-    val results = spark.read
-      .format("csv")
-      .options(options)
-      .load(testFile(dateInferSchemaFile))
-
-    val expectedSchema = StructType(List(StructField("date", DateType),
-      StructField("timestamp-date", TimestampType), StructField("date-timestamp", TimestampType)))
-    assert(results.schema == expectedSchema)
+      val expectedSchema = StructType(List(StructField("date", DateType),
+        StructField("timestamp-date", TimestampType), StructField("date-timestamp",
+        TimestampType)))
+      assert(results.schema == expectedSchema)
 
-    val expected =
-      Seq(
-        Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"),
-          Timestamp.valueOf("1765-03-28 00:00:0.0")),
-        Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"),
-          Timestamp.valueOf("1423-11-12 23:41:0.0")),
-        Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"),
-          Timestamp.valueOf("2016-01-28 20:00:00.0"))
-      )
-    assert(results.collect().toSeq.map(_.toSeq) == expected)
+      val expected =
+        Seq(
+          Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"),
+            Timestamp.valueOf("1765-03-28 00:00:0.0")),
+          Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"),
+            Timestamp.valueOf("1423-11-12 23:41:0.0")),
+          Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"),
+            Timestamp.valueOf("2016-01-28 20:00:00.0"))
+        )
+      assert(results.collect().toSeq.map(_.toSeq) == expected)
+    }
   }
 }
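Note: the guard PATCH 11 adds to CSVOptions rejects the one configuration the feature cannot support. Sketched, the conflicting setup is:

    // The legacy time parser cannot do strict date-format matching, so
    // inferDate=true now fails fast instead of mis-inferring timestamps as dates.
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    spark.read
      .option("inferSchema", "true")
      .option("inferDate", "true")
      .csv("/tmp/any.csv") // hypothetical path; throws the new error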
+ """.stripMargin) + } + def streamedOperatorUnsupportedByDataSourceError( className: String, operator: String): Throwable = { new UnsupportedOperationException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index b79e06eff410..6bd4a244ee13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, Que import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils} import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -2797,26 +2798,36 @@ abstract class CSVSuite "timestampFormat" -> "yyyy-MM-dd'T'HH:mm", "dateFormat" -> "yyyy-MM-dd", "inferDate" -> "true") + if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { + val msg = intercept[IllegalArgumentException] { + spark.read + .format("csv") + .options(options) + .load(testFile(dateInferSchemaFile)) + }.getMessage + assert(msg.contains("Cannot infer date in schema inference when " + + "LegacyTimeParserPolicy is 'legacy'")) + } else { + val results = spark.read + .format("csv") + .options(options) + .load(testFile(dateInferSchemaFile)) - val results = spark.read - .format("csv") - .options(options) - .load(testFile(dateInferSchemaFile)) - - val expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", TimestampType), StructField("date-timestamp", TimestampType))) - assert(results.schema == expectedSchema) + val expectedSchema = StructType(List(StructField("date", DateType), + StructField("timestamp-date", TimestampType), StructField("date-timestamp", TimestampType))) + assert(results.schema == expectedSchema) - val expected = - Seq( - Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), - Timestamp.valueOf("1765-03-28 00:00:0.0")), - Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), - Timestamp.valueOf("1423-11-12 23:41:0.0")), - Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), - Timestamp.valueOf("2016-01-28 20:00:00.0")) - ) - assert(results.collect().toSeq.map(_.toSeq) == expected) + val expected = + Seq( + Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), + Timestamp.valueOf("1765-03-28 00:00:0.0")), + Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), + Timestamp.valueOf("1423-11-12 23:41:0.0")), + Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), + Timestamp.valueOf("2016-01-28 20:00:00.0")) + ) + assert(results.collect().toSeq.map(_.toSeq) == expected) + } } } From d71558d0e794b504dabbcc930a42e83c40329e2f Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Wed, 22 Jun 2022 17:01:48 -0700 Subject: [PATCH 12/21] Update CSVInferSchema.scala --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 1bdb2747bf45..c92a50652160 
100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -46,8 +46,6 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { isParsing = true, forTimestampNTZ = true) -// val FAST_DATE_FORMAT, SIMPLE_DATE_FORMAT, LENIENT_SIMPLE_DATE_FORMAT = Value - private lazy val dateFormatter = DateFormatter( options.dateFormatInRead, options.locale, From 601dfc8655260c29bbe3e47fe3c5b2ab2bbf8885 Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Wed, 22 Jun 2022 17:02:41 -0700 Subject: [PATCH 13/21] Update CSVOptions.scala --- .../scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index feac160af423..d3036ecd714c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -209,7 +209,6 @@ class CSVOptions( inferDateFlag } - /** * String representation of an empty value in read and in write. */ From 5aa4ab68e3c5f183bcc2f94ca4aae7414248992e Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Thu, 23 Jun 2022 11:52:38 -0700 Subject: [PATCH 14/21] change inferDate error type and improve docs --- core/src/main/resources/error/error-classes.json | 7 +++++++ docs/sql-data-sources-csv.md | 2 +- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 16 ++++------------ .../spark/sql/errors/QueryExecutionErrors.scala | 13 +++++-------- .../sql/execution/datasources/csv/CSVSuite.scala | 3 +-- 5 files changed, 18 insertions(+), 23 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index a159202d99c4..7d6ef2a319c0 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -17,6 +17,13 @@ ], "sqlState" : "22005" }, + "CANNOT_INFER_DATE" : { + "message" : [ + "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". ", + "Legacy Date formatter does not support strict date format matching which is required ", + "to avoid inferring timestamps and other non-date entries to date." + ] + }, "CANNOT_CHANGE_DECIMAL_PRECISION" : { "message" : [ " cannot be represented as Decimal(, ). If necessary set to \"false\" to bypass this error." diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index 004762e88803..04c4e81dee40 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -111,7 +111,7 @@ Data source options of CSV can be set via: inferDate false - Whether or not to infer columns that satisfy the dateFormat option as Date. Requires inferSchema to be true. Legacy date formats in Timestamp columns cannot be parsed with this option. + Whether or not to infer columns that satisfy the dateFormat option as Date. Requires inferSchema to be true. When false, columns with dates will be inferred as String (or as Timestamp if it fits the timestampFormat) Legacy date formats in Timestamp columns cannot be parsed with this option. 
read diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index c92a50652160..b1634723f950 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -124,18 +124,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { case _: DecimalType => tryParseDecimal(field) case DoubleType => tryParseDouble(field) case DateType => tryParseDateTime(field) - case TimestampNTZType => - if (options.inferDate) { - tryParseDateTime(field) - } else { - tryParseTimestampNTZ(field) - } - case TimestampType => - if (options.inferDate) { - tryParseDateTime(field) - } else { - tryParseTimestamp(field) - } + case TimestampNTZType if options.inferDate => tryParseDateTime(field) + case TimestampNTZType => tryParseTimestampNTZ(field) + case TimestampType if options.inferDate => tryParseDateTime(field) + case TimestampType => tryParseTimestamp(field) case BooleanType => tryParseBoolean(field) case StringType => StringType case other: DataType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 87af4b3060f7..62e3f95940b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission import org.codehaus.commons.compiler.CompileException import org.codehaus.janino.InternalCompilerException -import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkUnsupportedOperationException, SparkUpgradeException} +import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkThrowable, SparkUnsupportedOperationException, SparkUpgradeException} import org.apache.spark.executor.CommitDeniedException import org.apache.spark.launcher.SparkLauncher import org.apache.spark.memory.SparkOutOfMemoryError @@ -529,13 +529,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { """.stripMargin) } - def inferDateWithLegacyTimeParserError(): Throwable = { - new IllegalArgumentException( - """ - | Cannot infer date in schema inference when LegacyTimeParserPolicy is 'legacy'. 
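Note: after PATCH 14 the failure carries a structured error class, so a test can match on the class name instead of the message prose. A sketch, assuming the suite's intercept helper and imports:

    val e = intercept[SparkIllegalArgumentException] {
      spark.read.options(options).csv(testFile(dateInferSchemaFile)).collect()
    }
    assert(e.getErrorClass == "CANNOT_INFER_DATE")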
Legacy - | Date formatter does not support strict date format matching which is required to avoid - | inferring timestamps and other non-date entries to date. - """.stripMargin) + def inferDateWithLegacyTimeParserError(): Throwable with SparkThrowable = { + new SparkIllegalArgumentException(errorClass = "CANNOT_INFER_DATE", + messageParameters = Array() + ) } def streamedOperatorUnsupportedByDataSourceError( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 6bd4a244ee13..b71aef8bbef8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2805,8 +2805,7 @@ abstract class CSVSuite .options(options) .load(testFile(dateInferSchemaFile)) }.getMessage - assert(msg.contains("Cannot infer date in schema inference when " + - "LegacyTimeParserPolicy is 'legacy'")) + assert(msg.contains("CANNOT_INFER_DATE")) } else { val results = spark.read .format("csv") From 2282c5988a21a93230fbed058f85d621ebfe34a4 Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Thu, 23 Jun 2022 16:59:58 -0700 Subject: [PATCH 15/21] fix error class --- core/src/main/resources/error/error-classes.json | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 7d6ef2a319c0..a15b2c4d060d 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -17,19 +17,18 @@ ], "sqlState" : "22005" }, - "CANNOT_INFER_DATE" : { - "message" : [ - "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". ", - "Legacy Date formatter does not support strict date format matching which is required ", - "to avoid inferring timestamps and other non-date entries to date." - ] - }, "CANNOT_CHANGE_DECIMAL_PRECISION" : { "message" : [ " cannot be represented as Decimal(, ). If necessary set to \"false\" to bypass this error." ], "sqlState" : "22005" }, + "CANNOT_INFER_DATE" : { + "message" : [ + "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". Legacy Date formatter does not support strict date format matching which is required to avoid inferring timestamps and other non-date entries to date." + ], + "sqlState" : "22005" + }, "CANNOT_PARSE_DECIMAL" : { "message" : [ "Cannot parse decimal" From 2484b772efcaa63297d37c95b88c9528f23f55f2 Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Fri, 24 Jun 2022 10:59:54 -0700 Subject: [PATCH 16/21] fix sqlState code and typo --- core/src/main/resources/error/error-classes.json | 2 +- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index a15b2c4d060d..dddf2bdfec1a 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -27,7 +27,7 @@ "message" : [ "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". Legacy Date formatter does not support strict date format matching which is required to avoid inferring timestamps and other non-date entries to date." 
], - "sqlState" : "22005" + "sqlState" : "22007" }, "CANNOT_PARSE_DECIMAL" : { "message" : [ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index b1634723f950..7b368286981c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -25,7 +25,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} -import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{FAST_DATE_FORMAT, LENIENT_SIMPLE_DATE_FORMAT} +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{FAST_DATE_FORMAT} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -49,7 +49,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private lazy val dateFormatter = DateFormatter( options.dateFormatInRead, options.locale, - legacyFormat = LENIENT_SIMPLE_DATE_FORMAT, + legacyFormat = FAST_DATE_FORMAT, isParsing = true) private val decimalParser = if (options.locale == Locale.US) { From 762e0d879ab6a6b1cfe99df4650e25e693a19bd2 Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Fri, 24 Jun 2022 11:04:44 -0700 Subject: [PATCH 17/21] Update CSVInferSchema.scala --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 7b368286981c..3132fea8700b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -25,7 +25,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} -import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{FAST_DATE_FORMAT} +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ From 2c93af5ebb7e20c636e212455721074760297199 Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Tue, 28 Jun 2022 16:42:54 -0700 Subject: [PATCH 18/21] updated docs and test parsing dates in TimestampNTZ column --- docs/sql-data-sources-csv.md | 2 +- .../apache/spark/sql/catalyst/csv/UnivocityParser.scala | 4 ++-- .../spark/sql/catalyst/csv/UnivocityParserSuite.scala | 7 +++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index 04c4e81dee40..8384f8332a6a 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -111,7 +111,7 @@ Data source options of CSV can be set via: inferDate false - Whether or not to infer columns that satisfy the dateFormat option as Date. Requires inferSchema to be true. 
When false, columns with dates will be inferred as String (or as Timestamp if it fits the timestampFormat) Legacy date formats in Timestamp columns cannot be parsed with this option. + Whether or not to infer columns that satisfy the dateFormat option as Date. Requires inferSchema to be true. When false, columns with dates will be inferred as String (or as Timestamp if it fits the timestampFormat). read diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 2d6eef243a7f..0237b6c454d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC} import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.errors.QueryExecutionErrors @@ -237,7 +237,7 @@ class UnivocityParser( timestampNTZFormatter.parseWithoutTimeZone(datum, false) } catch { case NonFatal(e) if (options.inferDate) => - daysToMicros(dateFormatter.parse(datum), options.zoneId) + daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index d084677bfff5..2589376bc3dc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -365,10 +365,13 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { val timestampsOptions = new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm", "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"), - false, "UTC") + false, DateTimeUtils.getZoneId("-08:00").toString) + // Use a CSVOptions zone ID of "-08:00" (PST) to verify that dates in a TimestampNTZ column + // are always converted to the equivalent UTC timestamp, regardless of the configured zone val dateString = "08_09_2001" val expected = dataType match { - case TimestampType | TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC) + case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00")) + case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC) case DateType => days(2001, 9, 8) } val parser = new UnivocityParser(new StructType(), timestampsOptions) From 41fa8eb2ff25cce402a38be40b5f0a0d3b48c6a8 Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Thu, 30 Jun 2022 11:32:30 -0700 Subject: [PATCH 19/21] Fix case where dateFormat is not specified --- .../spark/sql/catalyst/csv/CSVOptions.scala | 37 +++++++------ .../resources/test-data/date-infer-schema.csv | 6 +-- .../execution/datasources/csv/CSVSuite.scala | 52 ++++++++++++------- 3 files changed, 57 insertions(+), 38 deletions(-) diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index d3036ecd714c..a033e3a3a8d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -148,7 +148,28 @@ class CSVOptions( // A language tag in IETF BCP 47 format val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) - val dateFormatInRead: Option[String] = parameters.get("dateFormat") + /** + * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type). + * Disabled by default for backwards compatibility and performance. When enabled, date entries in + * timestamp columns will be cast to timestamp upon parsing. Not compatible with + * legacyTimeParserPolicy == LEGACY since legacy date parser will accept extra trailing characters + */ + val inferDate = { + val inferDateFlag = getBool("inferDate") + if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) { + throw QueryExecutionErrors.inferDateWithLegacyTimeParserError() + } + inferDateFlag + } + + // Provide a default value for dateFormatInRead when inferDate. This ensures that the + // Iso8601DateFormatter (with strict date parsing) is used for date inference + val dateFormatInRead: Option[String] = + if (inferDate) { + Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)) + } else { + parameters.get("dateFormat") + } val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern) val timestampFormatInRead: Option[String] = @@ -195,20 +216,6 @@ class CSVOptions( */ val enforceSchema = getBool("enforceSchema", default = true) - /** - * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type). - * Disabled by default for backwards compatibility and performance. When enabled, date entries in - * timestamp columns will be cast to timestamp upon parsing. Not compatible with - * legacyTimeParserPolicy == LEGACY - */ - val inferDate = { - val inferDateFlag = getBool("inferDate") - if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) { - throw QueryExecutionErrors.inferDateWithLegacyTimeParserError() - } - inferDateFlag - } - /** * String representation of an empty value in read and in write. 
*/ diff --git a/sql/core/src/test/resources/test-data/date-infer-schema.csv b/sql/core/src/test/resources/test-data/date-infer-schema.csv index ed8317481279..ca16ec81e6dc 100644 --- a/sql/core/src/test/resources/test-data/date-infer-schema.csv +++ b/sql/core/src/test/resources/test-data/date-infer-schema.csv @@ -1,4 +1,4 @@ date,timestamp-date,date-timestamp -2001-09-08,2014-10-27T18:30,1765-03-28 -1941-01-02,2000-09-14T01:01,1423-11-12T23:41 -0293-11-07,1995-06-25,2016-01-28T20:00 +2001-09-08,2014-10-27T18:30:00,1765-03-28 +1941-01-02,2000-09-14T01:01:00,1423-11-12T23:41:00 +0293-11-07,1995-06-25,2016-01-28T20:00:00 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index b71aef8bbef8..758f54306088 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2792,40 +2792,52 @@ abstract class CSVSuite } test("SPARK-39469: Infer schema for date type") { - val options = Map( + val options1 = Map( "header" -> "true", "inferSchema" -> "true", - "timestampFormat" -> "yyyy-MM-dd'T'HH:mm", + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "dateFormat" -> "yyyy-MM-dd", "inferDate" -> "true") + val options2 = Map( + "header" -> "true", + "inferSchema" -> "true", + "inferDate" -> "true") + + // Error should be thrown when attempting to inferDate with Legacy parser if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { val msg = intercept[IllegalArgumentException] { spark.read .format("csv") - .options(options) + .options(options1) .load(testFile(dateInferSchemaFile)) }.getMessage assert(msg.contains("CANNOT_INFER_DATE")) } else { - val results = spark.read - .format("csv") - .options(options) - .load(testFile(dateInferSchemaFile)) + // 1. Specify date format and timestamp format + // 2. 
Date inference should work with default date format when dateFormat is not provided + Seq(options1, options2).foreach {options => + val results = spark.read + .format("csv") + .options(options) + .load(testFile(dateInferSchemaFile)) - val expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", TimestampType), StructField("date-timestamp", TimestampType))) - assert(results.schema == expectedSchema) + val expectedSchema = StructType(List(StructField("date", DateType), + StructField("timestamp-date", TimestampType), + StructField("date-timestamp", TimestampType))) + assert(results.schema == expectedSchema) + + val expected = + Seq( + Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), + Timestamp.valueOf("1765-03-28 00:00:0.0")), + Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), + Timestamp.valueOf("1423-11-12 23:41:0.0")), + Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), + Timestamp.valueOf("2016-01-28 20:00:00.0")) + ) + assert(results.collect().toSeq.map(_.toSeq) == expected) + } - val expected = - Seq( - Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), - Timestamp.valueOf("1765-03-28 00:00:0.0")), - Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), - Timestamp.valueOf("1423-11-12 23:41:0.0")), - Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), - Timestamp.valueOf("2016-01-28 20:00:00.0")) - ) - assert(results.collect().toSeq.map(_.toSeq) == expected) } } } From e1170d0ee2027d810d2b23243602de147748838b Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Tue, 12 Jul 2022 11:11:20 -0700 Subject: [PATCH 20/21] allow legacy parser with inferDate --- .../main/resources/error/error-classes.json | 6 -- docs/sql-data-sources-csv.md | 2 +- .../sql/catalyst/csv/CSVInferSchema.scala | 3 +- .../spark/sql/catalyst/csv/CSVOptions.scala | 14 ++--- .../sql/catalyst/csv/UnivocityParser.scala | 3 +- .../sql/catalyst/json/JacksonParser.scala | 3 +- .../sql/catalyst/util/DateFormatter.scala | 10 ++-- .../sql/errors/QueryExecutionErrors.scala | 8 +-- .../execution/datasources/csv/CSVSuite.scala | 58 +++++++++---------- 9 files changed, 44 insertions(+), 63 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index dddf2bdfec1a..a159202d99c4 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -23,12 +23,6 @@ ], "sqlState" : "22005" }, - "CANNOT_INFER_DATE" : { - "message" : [ - "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". Legacy Date formatter does not support strict date format matching which is required to avoid inferring timestamps and other non-date entries to date." - ], - "sqlState" : "22007" - }, "CANNOT_PARSE_DECIMAL" : { "message" : [ "Cannot parse decimal" diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index 8384f8332a6a..b1eebb918ca9 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -111,7 +111,7 @@ Data source options of CSV can be set via: inferDate false - Whether or not to infer columns that satisfy the dateFormat option as Date. Requires inferSchema to be true. When false, columns with dates will be inferred as String (or as Timestamp if it fits the timestampFormat). + Whether or not to infer columns that satisfy the dateFormat option as Date. 
Requires inferSchema to be true and cannot infer from legacy formats. When false, columns with dates will be inferred as String (or as Timestamp if it fits the timestampFormat). read diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 3132fea8700b..fcde5213cc07 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -50,7 +50,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { options.dateFormatInRead, options.locale, legacyFormat = FAST_DATE_FORMAT, - isParsing = true) + isParsing = true, + isInferring = true) private val decimalParser = if (options.locale == Locale.US) { // Special handling the default locale for backward compatibility diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index a033e3a3a8d7..a402ae72fdd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -150,17 +150,11 @@ class CSVOptions( /** * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type). - * Disabled by default for backwards compatibility and performance. When enabled, date entries in - * timestamp columns will be cast to timestamp upon parsing. Not compatible with - * legacyTimeParserPolicy == LEGACY since legacy date parser will accept extra trailing characters + * Disabled by default for performance. When enabled, date entries in timestamp columns + * will be cast to timestamp upon parsing. Cannot infer legacy date formats since the legacy date + * parser will accept extra trailing characters */ - val inferDate = { - val inferDateFlag = getBool("inferDate") - if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) { - throw QueryExecutionErrors.inferDateWithLegacyTimeParserError() - } - inferDateFlag - } + val inferDate = getBool("inferDate") // Provide a default value for dateFormatInRead when inferDate. 
This ensures that the // Iso8601DateFormatter (with strict date parsing) is used for date inference diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 0237b6c454d0..4e0584939891 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -112,7 +112,8 @@ class UnivocityParser( options.dateFormatInRead, options.locale, legacyFormat = FAST_DATE_FORMAT, - isParsing = true) + isParsing = true, + isInferring = false) private val csvFilters = if (SQLConf.get.csvFilterPushDown) { new OrderedFilters(filters, requiredSchema) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 7004d2a8f162..ab5a2781d8ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -76,7 +76,8 @@ class JacksonParser( options.dateFormatInRead, options.locale, legacyFormat = FAST_DATE_FORMAT, - isParsing = true) + isParsing = true, + isInferring = false) /** * Create a converter which converts the JSON documents held by the `JsonParser` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 0aa64bcf5b7f..deacfaa063f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -175,8 +175,9 @@ object DateFormatter { format: Option[String], locale: Locale = defaultLocale, legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT, - isParsing: Boolean): DateFormatter = { - if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { + isParsing: Boolean, + isInference: Boolean = false): DateFormatter = { + if (SQLConf.get.legacyTimeParserPolicy == LEGACY && !isInference) { getLegacyFormatter(format.getOrElse(defaultPattern), locale, legacyFormat) } else { val df = format @@ -203,8 +204,9 @@ object DateFormatter { format: Option[String], locale: Locale, legacyFormat: LegacyDateFormat, - isParsing: Boolean): DateFormatter = { - getFormatter(format, locale, legacyFormat, isParsing) + isParsing: Boolean, + isInferring: Boolean): DateFormatter = { + getFormatter(format, locale, legacyFormat, isParsing, isInferring) } def apply( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 62e3f95940b8..a3129f249c1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission import org.codehaus.commons.compiler.CompileException import org.codehaus.janino.InternalCompilerException -import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, 
SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkThrowable, SparkUnsupportedOperationException, SparkUpgradeException} +import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkUnsupportedOperationException, SparkUpgradeException} import org.apache.spark.executor.CommitDeniedException import org.apache.spark.launcher.SparkLauncher import org.apache.spark.memory.SparkOutOfMemoryError @@ -529,12 +529,6 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { """.stripMargin) } - def inferDateWithLegacyTimeParserError(): Throwable with SparkThrowable = { - new SparkIllegalArgumentException(errorClass = "CANNOT_INFER_DATE", - messageParameters = Array() - ) - } - def streamedOperatorUnsupportedByDataSourceError( className: String, operator: String): Throwable = { new UnsupportedOperationException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 758f54306088..8c5b59590776 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2803,41 +2803,35 @@ abstract class CSVSuite "inferSchema" -> "true", "inferDate" -> "true") - // Error should be thrown when attempting to inferDate with Legacy parser + // We should still be able to parse legacy formats when the schema is provided if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { - val msg = intercept[IllegalArgumentException] { - spark.read - .format("csv") - .options(options1) - .load(testFile(dateInferSchemaFile)) - }.getMessage - assert(msg.contains("CANNOT_INFER_DATE")) - } else { - // 1. Specify date format and timestamp format - // 2. Date inference should work with default date format when dateFormat is not provided - Seq(options1, options2).foreach {options => - val results = spark.read - .format("csv") - .options(options) - .load(testFile(dateInferSchemaFile)) - - val expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", TimestampType), - StructField("date-timestamp", TimestampType))) - assert(results.schema == expectedSchema) + val ds = Seq("1500-02-29").toDS() + val csv = spark.read.option("header", false).schema("d date").csv(ds) + checkAnswer(csv, Row(Date.valueOf("1500-03-01"))) + } + // 1. Specify date format and timestamp format + // 2. 
Date inference should work with default date format when dateFormat is not provided + Seq(options1, options2).foreach {options => + val results = spark.read + .format("csv") + .options(options) + .load(testFile(dateInferSchemaFile)) - val expected = - Seq( - Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), - Timestamp.valueOf("1765-03-28 00:00:0.0")), - Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), - Timestamp.valueOf("1423-11-12 23:41:0.0")), - Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), - Timestamp.valueOf("2016-01-28 20:00:00.0")) - ) - assert(results.collect().toSeq.map(_.toSeq) == expected) - } + val expectedSchema = StructType(List(StructField("date", DateType), + StructField("timestamp-date", TimestampType), + StructField("date-timestamp", TimestampType))) + assert(results.schema == expectedSchema) + val expected = + Seq( + Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), + Timestamp.valueOf("1765-03-28 00:00:0.0")), + Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), + Timestamp.valueOf("1423-11-12 23:41:0.0")), + Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), + Timestamp.valueOf("2016-01-28 20:00:00.0")) + ) + assert(results.collect().toSeq.map(_.toSeq) == expected) } } } From 1e8f9384163e7f90fed69cc890a2ff6ef6323dab Mon Sep 17 00:00:00 2001 From: Jonathan Cui Date: Tue, 19 Jul 2022 09:34:43 -0700 Subject: [PATCH 21/21] Revert "allow legacy parser with inferDate" This reverts commit e1170d0ee2027d810d2b23243602de147748838b. --- .../main/resources/error/error-classes.json | 6 ++ docs/sql-data-sources-csv.md | 2 +- .../sql/catalyst/csv/CSVInferSchema.scala | 3 +- .../spark/sql/catalyst/csv/CSVOptions.scala | 14 +++-- .../sql/catalyst/csv/UnivocityParser.scala | 3 +- .../sql/catalyst/json/JacksonParser.scala | 3 +- .../sql/catalyst/util/DateFormatter.scala | 10 ++-- .../sql/errors/QueryExecutionErrors.scala | 8 ++- .../execution/datasources/csv/CSVSuite.scala | 58 ++++++++++--------- 9 files changed, 63 insertions(+), 44 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index a159202d99c4..dddf2bdfec1a 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -23,6 +23,12 @@ ], "sqlState" : "22005" }, + "CANNOT_INFER_DATE" : { + "message" : [ + "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". Legacy Date formatter does not support strict date format matching which is required to avoid inferring timestamps and other non-date entries to date." + ], + "sqlState" : "22007" + }, "CANNOT_PARSE_DECIMAL" : { "message" : [ "Cannot parse decimal" diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index b1eebb918ca9..8384f8332a6a 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -111,7 +111,7 @@ Data source options of CSV can be set via: inferDate false - Whether or not to infer columns that satisfy the dateFormat option as Date. Requires inferSchema to be true and cannot infer from legacy formats. When false, columns with dates will be inferred as String (or as Timestamp if it fits the timestampFormat). + Whether or not to infer columns that satisfy the dateFormat option as Date. Requires inferSchema to be true. When false, columns with dates will be inferred as String (or as Timestamp if it fits the timestampFormat). 
read diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index fcde5213cc07..3132fea8700b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -50,8 +50,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { options.dateFormatInRead, options.locale, legacyFormat = FAST_DATE_FORMAT, - isParsing = true, - isInferring = true) + isParsing = true) private val decimalParser = if (options.locale == Locale.US) { // Special handling the default locale for backward compatibility diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index a402ae72fdd5..a033e3a3a8d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -150,11 +150,17 @@ class CSVOptions( /** * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type). - * Disabled by default for performance. When enabled, date entries in timestamp columns - * will be cast to timestamp upon parsing. Cannot infer legacy date formats since the legacy date - * parser will accept extra trailing characters + * Disabled by default for backwards compatibility and performance. When enabled, date entries in + * timestamp columns will be cast to timestamp upon parsing. Not compatible with + * legacyTimeParserPolicy == LEGACY since legacy date parser will accept extra trailing characters */ - val inferDate = getBool("inferDate") + val inferDate = { + val inferDateFlag = getBool("inferDate") + if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) { + throw QueryExecutionErrors.inferDateWithLegacyTimeParserError() + } + inferDateFlag + } // Provide a default value for dateFormatInRead when inferDate. 
This ensures that the // Iso8601DateFormatter (with strict date parsing) is used for date inference diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 4e0584939891..0237b6c454d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -112,8 +112,7 @@ class UnivocityParser( options.dateFormatInRead, options.locale, legacyFormat = FAST_DATE_FORMAT, - isParsing = true, - isInferring = false) + isParsing = true) private val csvFilters = if (SQLConf.get.csvFilterPushDown) { new OrderedFilters(filters, requiredSchema) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index ab5a2781d8ea..7004d2a8f162 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -76,8 +76,7 @@ class JacksonParser( options.dateFormatInRead, options.locale, legacyFormat = FAST_DATE_FORMAT, - isParsing = true, - isInferring = false) + isParsing = true) /** * Create a converter which converts the JSON documents held by the `JsonParser` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index deacfaa063f1..0aa64bcf5b7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -175,9 +175,8 @@ object DateFormatter { format: Option[String], locale: Locale = defaultLocale, legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT, - isParsing: Boolean, - isInference: Boolean = false): DateFormatter = { - if (SQLConf.get.legacyTimeParserPolicy == LEGACY && !isInference) { + isParsing: Boolean): DateFormatter = { + if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { getLegacyFormatter(format.getOrElse(defaultPattern), locale, legacyFormat) } else { val df = format @@ -204,9 +203,8 @@ object DateFormatter { format: Option[String], locale: Locale, legacyFormat: LegacyDateFormat, - isParsing: Boolean, - isInferring: Boolean): DateFormatter = { - getFormatter(format, locale, legacyFormat, isParsing, isInferring) + isParsing: Boolean): DateFormatter = { + getFormatter(format, locale, legacyFormat, isParsing) } def apply( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index a3129f249c1d..62e3f95940b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission import org.codehaus.commons.compiler.CompileException import org.codehaus.janino.InternalCompilerException -import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, 
SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkUnsupportedOperationException, SparkUpgradeException} +import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkThrowable, SparkUnsupportedOperationException, SparkUpgradeException} import org.apache.spark.executor.CommitDeniedException import org.apache.spark.launcher.SparkLauncher import org.apache.spark.memory.SparkOutOfMemoryError @@ -529,6 +529,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { """.stripMargin) } + def inferDateWithLegacyTimeParserError(): Throwable with SparkThrowable = { + new SparkIllegalArgumentException(errorClass = "CANNOT_INFER_DATE", + messageParameters = Array() + ) + } + def streamedOperatorUnsupportedByDataSourceError( className: String, operator: String): Throwable = { new UnsupportedOperationException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 8c5b59590776..758f54306088 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2803,35 +2803,41 @@ abstract class CSVSuite "inferSchema" -> "true", "inferDate" -> "true") - // We should still be able to parse legacy formats when the schema is provided + // Error should be thrown when attempting to inferDate with Legacy parser if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { - val ds = Seq("1500-02-29").toDS() - val csv = spark.read.option("header", false).schema("d date").csv(ds) - checkAnswer(csv, Row(Date.valueOf("1500-03-01"))) - } - // 1. Specify date format and timestamp format - // 2. Date inference should work with default date format when dateFormat is not provided - Seq(options1, options2).foreach {options => - val results = spark.read - .format("csv") - .options(options) - .load(testFile(dateInferSchemaFile)) + val msg = intercept[IllegalArgumentException] { + spark.read + .format("csv") + .options(options1) + .load(testFile(dateInferSchemaFile)) + }.getMessage + assert(msg.contains("CANNOT_INFER_DATE")) + } else { + // 1. Specify date format and timestamp format + // 2. 
Date inference should work with default date format when dateFormat is not provided + Seq(options1, options2).foreach {options => + val results = spark.read + .format("csv") + .options(options) + .load(testFile(dateInferSchemaFile)) - val expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", TimestampType), - StructField("date-timestamp", TimestampType))) - assert(results.schema == expectedSchema) + val expectedSchema = StructType(List(StructField("date", DateType), + StructField("timestamp-date", TimestampType), + StructField("date-timestamp", TimestampType))) + assert(results.schema == expectedSchema) + + val expected = + Seq( + Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), + Timestamp.valueOf("1765-03-28 00:00:0.0")), + Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), + Timestamp.valueOf("1423-11-12 23:41:0.0")), + Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), + Timestamp.valueOf("2016-01-28 20:00:00.0")) + ) + assert(results.collect().toSeq.map(_.toSeq) == expected) + } - val expected = - Seq( - Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), - Timestamp.valueOf("1765-03-28 00:00:0.0")), - Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), - Timestamp.valueOf("1423-11-12 23:41:0.0")), - Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), - Timestamp.valueOf("2016-01-28 20:00:00.0")) - ) - assert(results.collect().toSeq.map(_.toSeq) == expected) } } }
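
After the final revert, the series lands in its patch-19 shape: date inference is opt-in via the inferDate option, a missing dateFormat falls back to the default pattern so the strict Iso8601DateFormatter is used, and enabling inferDate under the legacy time parser policy raises CANNOT_INFER_DATE. The following is a minimal, hypothetical sketch of the resulting user-facing behavior, assuming a Spark build with these patches applied; the file name, column names, and data are invented for illustration and are not part of the patches.

import org.apache.spark.sql.SparkSession

object InferDateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("InferDateSketch")
      .getOrCreate()

    // dates.csv (hypothetical contents):
    //   date,date-timestamp
    //   2001-09-08,1765-03-28
    //   1941-01-02,1423-11-12T23:41:00
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("inferDate", "true") // off by default; requires inferSchema
      // dateFormat may be omitted: when inferDate is set, dateFormatInRead falls
      // back to DateFormatter.defaultPattern, keeping strict date matching.
      .option("dateFormat", "yyyy-MM-dd")
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .csv("dates.csv")

    // Per the CSVSuite expectations above: the all-date column is inferred as
    // DateType, while the mixed column widens to TimestampType and its date
    // entries are cast to timestamps during parsing.
    df.printSchema()

    spark.stop()
  }
}

Under spark.sql.legacy.timeParserPolicy=LEGACY, the same read should instead fail with CANNOT_INFER_DATE (sqlState 22007), since the legacy date formatters cannot perform the strict format matching that date inference requires.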