-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-39469][SQL] Infer date type for CSV schema inference #36871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5048253
5058d92
f16e5e1
c2b5fdc
9514c2c
be7aabd
db3b442
7d98686
966bdb6
638064b
50a91a6
d71558d
601dfc8
5aa4ab6
2282c59
2484b77
762e0d8
2c93af5
41fa8eb
e1170d0
1e8f938
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,8 +24,8 @@ import scala.util.control.Exception.allCatch | |
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.catalyst.analysis.TypeCoercion | ||
| import org.apache.spark.sql.catalyst.expressions.ExprUtils | ||
| import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} | ||
| import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT | ||
| import org.apache.spark.sql.catalyst.util.TimestampFormatter | ||
| import org.apache.spark.sql.errors.QueryExecutionErrors | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types._ | ||
|
|
@@ -46,6 +46,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { | |
| isParsing = true, | ||
| forTimestampNTZ = true) | ||
|
|
||
| private lazy val dateFormatter = DateFormatter( | ||
| options.dateFormatInRead, | ||
| options.locale, | ||
| legacyFormat = FAST_DATE_FORMAT, | ||
| isParsing = true) | ||
|
|
||
| private val decimalParser = if (options.locale == Locale.US) { | ||
| // Special handling the default locale for backward compatibility | ||
| s: String => new java.math.BigDecimal(s) | ||
|
|
@@ -117,7 +123,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { | |
| case LongType => tryParseLong(field) | ||
| case _: DecimalType => tryParseDecimal(field) | ||
| case DoubleType => tryParseDouble(field) | ||
| case DateType => tryParseDateTime(field) | ||
| case TimestampNTZType if options.inferDate => tryParseDateTime(field) | ||
|
||
| case TimestampNTZType => tryParseTimestampNTZ(field) | ||
| case TimestampType if options.inferDate => tryParseDateTime(field) | ||
| case TimestampType => tryParseTimestamp(field) | ||
| case BooleanType => tryParseBoolean(field) | ||
| case StringType => StringType | ||
|
|
@@ -169,6 +178,16 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { | |
| private def tryParseDouble(field: String): DataType = { | ||
| if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) { | ||
| DoubleType | ||
| } else if (options.inferDate) { | ||
| tryParseDateTime(field) | ||
| } else { | ||
| tryParseTimestampNTZ(field) | ||
| } | ||
| } | ||
|
|
||
| private def tryParseDateTime(field: String): DataType = { | ||
| if ((allCatch opt dateFormatter.parse(field)).isDefined) { | ||
HyukjinKwon marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| DateType | ||
| } else { | ||
| tryParseTimestampNTZ(field) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging | |
| import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} | ||
| import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal} | ||
| import org.apache.spark.sql.catalyst.util._ | ||
| import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC} | ||
| import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT | ||
| import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ | ||
| import org.apache.spark.sql.errors.QueryExecutionErrors | ||
|
|
@@ -197,34 +198,46 @@ class UnivocityParser( | |
| Decimal(decimalParser(datum), dt.precision, dt.scale) | ||
| } | ||
|
|
||
| case _: TimestampType => (d: String) => | ||
| case _: DateType => (d: String) => | ||
| nullSafeDatum(d, name, nullable, options) { datum => | ||
| try { | ||
| timestampFormatter.parse(datum) | ||
| dateFormatter.parse(datum) | ||
| } catch { | ||
| case NonFatal(e) => | ||
| // If fails to parse, then tries the way used in 2.0 and 1.x for backwards | ||
| // compatibility. | ||
| val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) | ||
| DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e) | ||
| DateTimeUtils.stringToDate(str).getOrElse(throw e) | ||
| } | ||
| } | ||
|
|
||
| case _: TimestampNTZType => (d: String) => | ||
| nullSafeDatum(d, name, nullable, options) { datum => | ||
| timestampNTZFormatter.parseWithoutTimeZone(datum, false) | ||
| } | ||
|
|
||
| case _: DateType => (d: String) => | ||
| case _: TimestampType => (d: String) => | ||
| nullSafeDatum(d, name, nullable, options) { datum => | ||
| try { | ||
| dateFormatter.parse(datum) | ||
| timestampFormatter.parse(datum) | ||
| } catch { | ||
| case NonFatal(e) => | ||
| // If fails to parse, then tries the way used in 2.0 and 1.x for backwards | ||
| // compatibility. | ||
| val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) | ||
| DateTimeUtils.stringToDate(str).getOrElse(throw e) | ||
| DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the issue here is, if the timestamp parsing fails, maybe it's because this is a date, or maybe it's a legacy timestamp format. We need to define the priority here. Since cc @sadikovi
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just wonder, all issues mentioned by @HyukjinKwon in my PR #23202 (comment) have been addressed by this PR.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. We should address the order. Otherwise, it is unclear how to handle fallback. Fixed here: 10ca4a4. |
||
| // There may be date type entries in timestamp column due to schema inference | ||
| if (options.inferDate) { | ||
| daysToMicros(dateFormatter.parse(datum), options.zoneId) | ||
|
||
| } else { | ||
| throw(e) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| case _: TimestampNTZType => (d: String) => | ||
| nullSafeDatum(d, name, nullable, options) { datum => | ||
| try { | ||
| timestampNTZFormatter.parseWithoutTimeZone(datum, false) | ||
| } catch { | ||
| case NonFatal(e) if (options.inferDate) => | ||
| daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId) | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv | |
|
|
||
| import java.math.BigDecimal | ||
| import java.text.{DecimalFormat, DecimalFormatSymbols} | ||
| import java.time.{ZoneOffset} | ||
| import java.util.{Locale, TimeZone} | ||
|
|
||
| import org.apache.commons.lang3.time.FastDateFormat | ||
|
|
@@ -358,4 +359,26 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { | |
| Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC") | ||
| check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern)) | ||
| } | ||
|
|
||
| test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") { | ||
| def checkDate(dataType: DataType): Unit = { | ||
| val timestampsOptions = | ||
| new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm", | ||
| "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"), | ||
|
||
| false, DateTimeUtils.getZoneId("-08:00").toString) | ||
| // Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always | ||
| // converted to their equivalent UTC timestamp | ||
| val dateString = "08_09_2001" | ||
| val expected = dataType match { | ||
| case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00")) | ||
| case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC) | ||
|
||
| case DateType => days(2001, 9, 8) | ||
| } | ||
| val parser = new UnivocityParser(new StructType(), timestampsOptions) | ||
| assert(parser.makeConverter("d", dataType).apply(dateString) == expected) | ||
| } | ||
| checkDate(TimestampType) | ||
| checkDate(TimestampNTZType) | ||
Jonathancui123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| checkDate(DateType) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| date,timestamp-date,date-timestamp | ||
| 2001-09-08,2014-10-27T18:30:00,1765-03-28 | ||
| 1941-01-02,2000-09-14T01:01:00,1423-11-12T23:41:00 | ||
| 0293-11-07,1995-06-25,2016-01-28T20:00:00 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems changing the method
`tryParseDouble` should be enough.

Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this change is necessary:
Consider the column of a TimestampType followed by a DateType entry. We would expect this column to be inferred as a TimestampType column.
`typeSoFar` will be `TimestampType` when `inferField` is called on the second entry, which is `DateType`. We need logic in `inferField` to try and parse `DateType` even when `typeSoFar` is `TimestampType`.