Commit c2536a7

Jonathancui123 authored and cloud-fan committed
[SPARK-39469][SQL] Infer date type for CSV schema inference
### What changes were proposed in this pull request?

1. Add a new `inferDate` option to CSV options. The description is:

   > Whether or not to infer columns that satisfy the `dateFormat` option as `Date`. Requires `inferSchema` to be true. When `false`, columns with dates will be inferred as `String` (or as `Timestamp` if it fits the `timestampFormat`).

   Legacy date formats in `Timestamp` columns cannot be parsed with this option. An error is thrown if `inferDate` is true while the SQL configuration `legacyTimeParserPolicy` is `LEGACY`; this avoids incorrect schema inference from the legacy time parsers, which do not do strict parsing. The `inferDate` option should prevent performance degradation for users who don't opt in.

2. Modify `inferField` in `CSVInferSchema.scala` to include the date type. If `typeSoFar` in `inferField` is Date, Timestamp, or TimestampNTZ, we first attempt to parse the field as a date and then as a timestamp/TimestampNTZ. We attempt to parse dates even when `typeSoFar` is Timestamp/TimestampNTZ to handle the case where a column contains a timestamp entry followed by a date entry: both data types should be detected and the column inferred as timestamp type. Example:

   ```
   Seq("2010|10|10", "2010_10_10")
     .toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
   spark.read
     .option("inferSchema", "true")
     .option("header", "false")
     .option("dateFormat", "yyyy|MM|dd")
     .option("timestampFormat", "yyyy_MM_dd").csv("/tmp/foo").printSchema()
   ```

   Result:

   ```
   root
    |-- _c0: timestamp (nullable = true)
   ```

3. Modify `makeConverter` in `UnivocityParser` to handle date-type entries in a timestamp-type column, so that the example above parses correctly.

### Does this PR introduce _any_ user-facing change?

The new behavior of schema inference when `inferDate = true`:

1. If a column contains only dates, it is inferred as "date" type. If the date format and the timestamp format are identical (e.g. both are `yyyy/MM/dd`), entries default to being interpreted as dates.
2. If a column contains both dates and timestamps, it is inferred as "timestamp" type.

### How was this patch tested?

Unit tests were added to `CSVInferSchemaSuite` and `UnivocityParserSuite`. An end-to-end test was added to `CSVSuite`.

### Benchmarks

`inferDate` increases parsing/inference time in general. The impact scales with the number of rows (not the number of columns). For columns of date type (which would be inferred as timestamp when `inferDate=false`), inference and parsing take 30% longer. The performance impact is much greater on columns of timestamp type (30x slower than `inferDate=false`), because during inference each timestamp entry is first tried as a date, which throws an error.

#### Number of seconds taken to parse each CSV file with `inferDate=true` and `inferDate=false`

|                                              | inferDate=False | inferDate=True | master branch |
|----------------------------------------------|-----------------|----------------|---------------|
| Small file (<100 row/col). Mixed data types. | 0.32            | 0.33           |               |
| 100K rows. 4 columns. Mixed data types.      | 0.70            | 2.80           | 0.70          |
| 20k columns. 4 rows. Mixed data types.       | 16.32           | 15.90          | 13.5          |
| Large file. Only date type.                  | 2.15            | 3.70           | 2.10          |
| Large file. Only timestamp type.             | 2.60            | 77.00          | 2.30          |

Results are the average of 3 trials on the same machine. Over multiple runs, master branch benchmark times have also come in slower than `inferDate=false` (although the average is slightly faster). Given the +/- 20% variance between trials, master branch results are roughly similar to the `inferDate=false` results.

Closes #36871 from Jonathancui123/SPARK-39469-date-infer.

Lead-authored-by: Jonathan Cui <[email protected]>
Co-authored-by: Jonathan Cui <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 66b1f79 · commit c2536a7

File tree: 10 files changed, +219 −15 lines

core/src/main/resources/error/error-classes.json

Lines changed: 6 additions & 0 deletions
```diff
@@ -23,6 +23,12 @@
     ],
     "sqlState" : "22005"
   },
+  "CANNOT_INFER_DATE" : {
+    "message" : [
+      "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". Legacy Date formatter does not support strict date format matching which is required to avoid inferring timestamps and other non-date entries to date."
+    ],
+    "sqlState" : "22007"
+  },
   "CANNOT_PARSE_DECIMAL" : {
     "message" : [
       "Cannot parse decimal"
```

docs/sql-data-sources-csv.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -108,6 +108,12 @@ Data source options of CSV can be set via:
     <td>Infers the input schema automatically from data. It requires one extra pass over the data. CSV built-in functions ignore this option.</td>
     <td>read</td>
   </tr>
+  <tr>
+    <td><code>inferDate</code></td>
+    <td>false</td>
+    <td>Whether or not to infer columns that satisfy the <code>dateFormat</code> option as <code>Date</code>. Requires <code>inferSchema</code> to be <code>true</code>. When <code>false</code>, columns with dates will be inferred as <code>String</code> (or as <code>Timestamp</code> if it fits the <code>timestampFormat</code>).</td>
+    <td>read</td>
+  </tr>
   <tr>
     <td><code>enforceSchema</code></td>
     <td>true</td>
```
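For context, a minimal usage sketch of this documented option. Paths and data are illustrative, and it assumes a `SparkSession` named `spark` (e.g. in `spark-shell`); a column holding only dates is inferred as date type, as described in the commit message:

```scala
import spark.implicits._

// Write a one-column file containing only date strings (illustrative data).
Seq("2018-12-02", "2018-12-03").toDF
  .repartition(1).write.mode("overwrite").text("/tmp/dates")

// With inferSchema and the new inferDate option, the column is inferred
// as date instead of string (or timestamp).
spark.read
  .option("inferSchema", "true")
  .option("inferDate", "true")
  .option("dateFormat", "yyyy-MM-dd")
  .csv("/tmp/dates")
  .printSchema()
// root
//  |-- _c0: date (nullable = true)
```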

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala

Lines changed: 20 additions & 1 deletion
```diff
@@ -24,8 +24,8 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -46,6 +46,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)

+  private lazy val dateFormatter = DateFormatter(
+    options.dateFormatInRead,
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT,
+    isParsing = true)
+
   private val decimalParser = if (options.locale == Locale.US) {
     // Special handling the default locale for backward compatibility
     s: String => new java.math.BigDecimal(s)
@@ -117,7 +123,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
       case LongType => tryParseLong(field)
       case _: DecimalType => tryParseDecimal(field)
       case DoubleType => tryParseDouble(field)
+      case DateType => tryParseDateTime(field)
+      case TimestampNTZType if options.inferDate => tryParseDateTime(field)
       case TimestampNTZType => tryParseTimestampNTZ(field)
+      case TimestampType if options.inferDate => tryParseDateTime(field)
       case TimestampType => tryParseTimestamp(field)
       case BooleanType => tryParseBoolean(field)
       case StringType => StringType
@@ -169,6 +178,16 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private def tryParseDouble(field: String): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
       DoubleType
+    } else if (options.inferDate) {
+      tryParseDateTime(field)
+    } else {
+      tryParseTimestampNTZ(field)
+    }
+  }
+
+  private def tryParseDateTime(field: String): DataType = {
+    if ((allCatch opt dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestampNTZ(field)
     }
```
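The key detail in `tryParseDateTime` above is ordering: the strict date formatter is tried first, and timestamp parsing is only the fallback. A self-contained sketch of that fallback chain using plain `java.time` rather than Spark's formatters (patterns are illustrative; `CSVInferSchema` uses `options.dateFormatInRead` and the timestamp formatters instead):

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

// Illustrative patterns standing in for dateFormat / timestampFormat.
val dateFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val tsFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")

// Mirrors the fallback order: date first, then timestamp, else string.
// LocalDate.parse requires the whole input to match, so a timestamp
// string fails the date parse and falls through.
def inferDateTime(field: String): String =
  if (Try(LocalDate.parse(field, dateFmt)).isSuccess) "DateType"
  else if (Try(LocalDateTime.parse(field, tsFmt)).isSuccess) "TimestampType"
  else "StringType"

inferDateTime("2018-12-03")          // DateType: matches the date pattern
inferDateTime("2018-12-03T11:00:00") // TimestampType: strict date parse fails
```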

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala

Lines changed: 22 additions & 2 deletions
```diff
@@ -148,7 +148,28 @@ class CSVOptions(
   // A language tag in IETF BCP 47 format
   val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

-  val dateFormatInRead: Option[String] = parameters.get("dateFormat")
+  /**
+   * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type).
+   * Disabled by default for backwards compatibility and performance. When enabled, date entries in
+   * timestamp columns will be cast to timestamp upon parsing. Not compatible with
+   * legacyTimeParserPolicy == LEGACY since legacy date parser will accept extra trailing characters
+   */
+  val inferDate = {
+    val inferDateFlag = getBool("inferDate")
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) {
+      throw QueryExecutionErrors.inferDateWithLegacyTimeParserError()
+    }
+    inferDateFlag
+  }
+
+  // Provide a default value for dateFormatInRead when inferDate. This ensures that the
+  // Iso8601DateFormatter (with strict date parsing) is used for date inference
+  val dateFormatInRead: Option[String] =
+    if (inferDate) {
+      Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern))
+    } else {
+      parameters.get("dateFormat")
+    }
   val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)

   val timestampFormatInRead: Option[String] =
@@ -195,7 +216,6 @@ class CSVOptions(
    */
   val enforceSchema = getBool("enforceSchema", default = true)

-
   /**
    * String representation of an empty value in read and in write.
    */
```
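A sketch of how this guard surfaces to users; the config key and error class come from this PR, while the session setup and path are illustrative:

```scala
// With the legacy time parser policy active, enabling inferDate fails fast
// when the CSV options are constructed, instead of silently mis-inferring
// dates with the lenient legacy parser.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

// Throws SparkIllegalArgumentException with error class CANNOT_INFER_DATE
// (raised by QueryExecutionErrors.inferDateWithLegacyTimeParserError()).
spark.read
  .option("inferSchema", "true")
  .option("inferDate", "true")
  .csv("/tmp/foo") // illustrative path
```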

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 24 additions & 11 deletions
```diff
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -197,34 +198,46 @@ class UnivocityParser(
         Decimal(decimalParser(datum), dt.precision, dt.scale)
       }

-    case _: TimestampType => (d: String) =>
+    case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
-          timestampFormatter.parse(datum)
+          dateFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
             // compatibility.
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+            DateTimeUtils.stringToDate(str).getOrElse(throw e)
         }
       }

-    case _: TimestampNTZType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        timestampNTZFormatter.parseWithoutTimeZone(datum, false)
-      }
-
-    case _: DateType => (d: String) =>
+    case _: TimestampType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
-          dateFormatter.parse(datum)
+          timestampFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
             // compatibility.
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToDate(str).getOrElse(throw e)
+            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
+              // There may be date type entries in timestamp column due to schema inference
+              if (options.inferDate) {
+                daysToMicros(dateFormatter.parse(datum), options.zoneId)
+              } else {
+                throw(e)
+              }
+            }
+        }
+      }
+
+    case _: TimestampNTZType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          timestampNTZFormatter.parseWithoutTimeZone(datum, false)
+        } catch {
+          case NonFatal(e) if (options.inferDate) =>
+            daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId)
         }
       }

```
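The `daysToMicros` fallback above converts a parsed date (a count of days since the Unix epoch) into microseconds at midnight in the given time zone, which is how a date entry becomes a valid timestamp value. A rough `java.time` equivalent, for intuition only (not the actual `DateTimeUtils` implementation):

```scala
import java.time.{LocalDate, ZoneId}

// Rough equivalent of DateTimeUtils.daysToMicros: midnight of the given
// epoch day in zoneId, expressed as microseconds since the epoch.
def daysToMicrosSketch(days: Long, zoneId: ZoneId): Long = {
  val instant = LocalDate.ofEpochDay(days).atStartOfDay(zoneId).toInstant
  instant.getEpochSecond * 1000000L + instant.getNano / 1000L
}

// A date "2001-09-08" landing in a TimestampType column with session zone
// -08:00 becomes the timestamp 2001-09-08 00:00:00 -08:00, matching the
// expected value in the UnivocityParserSuite test later in this commit.
daysToMicrosSketch(LocalDate.of(2001, 9, 8).toEpochDay, ZoneId.of("-08:00"))
```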

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 7 additions & 1 deletion
```diff
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}

-import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkThrowable, SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.memory.SparkOutOfMemoryError
@@ -528,6 +528,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       """.stripMargin)
   }

+  def inferDateWithLegacyTimeParserError(): Throwable with SparkThrowable = {
+    new SparkIllegalArgumentException(errorClass = "CANNOT_INFER_DATE",
+      messageParameters = Array()
+    )
+  }
+
   def streamedOperatorUnsupportedByDataSourceError(
       className: String, operator: String): Throwable = {
     new UnsupportedOperationException(
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala

Lines changed: 55 additions & 0 deletions
```diff
@@ -109,6 +109,12 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     assert(
       inferSchema.mergeRowTypes(Array(DoubleType),
         Array(LongType)).sameElements(Array(DoubleType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampNTZType)).sameElements(Array(TimestampNTZType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampType)).sameElements(Array(TimestampType)))
   }

   test("Null fields are handled properly when a nullValue is specified") {
@@ -192,4 +198,53 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     Seq("en-US").foreach(checkDecimalInfer(_, StringType))
     Seq("ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0)))
   }
+
+  test("SPARK-39469: inferring date type") {
+    // "yyyy/MM/dd" format
+    var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+      false, "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
+    // "MMM yyyy" format
+    options = new CSVOptions(Map("dateFormat" -> "MMM yyyy", "inferDate" -> "true"),
+      false, "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
+    // Field should strictly match date format to infer as date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+        "inferDate" -> "true"),
+      columnPruning = false,
+      defaultTimeZoneId = "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == TimestampType)
+    assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
+  }
+
+  test("SPARK-39469: inferring date and timestamp types in a mixed column with inferDate=true") {
+    var options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
+        "timestampNTZFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+    assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)
+    // SQL configuration must be set to default to TimestampNTZ
+    withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
+      assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType)
+    }
+
+    // inferField should upgrade a date field to timestamp if the typeSoFar is a timestamp
+    assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
+    assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)
+
+    // No errors when Date and Timestamp have the same format. Inference defaults to date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy_MM_dd"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+  }
 }
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala

Lines changed: 23 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv

 import java.math.BigDecimal
 import java.text.{DecimalFormat, DecimalFormatSymbols}
+import java.time.{ZoneOffset}
 import java.util.{Locale, TimeZone}

 import org.apache.commons.lang3.time.FastDateFormat
@@ -358,4 +359,26 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
     check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
   }
+
+  test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
+    def checkDate(dataType: DataType): Unit = {
+      val timestampsOptions =
+        new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm",
+          "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"),
+          false, DateTimeUtils.getZoneId("-08:00").toString)
+      // Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always
+      // converted to their equivalent UTC timestamp
+      val dateString = "08_09_2001"
+      val expected = dataType match {
+        case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00"))
+        case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
+        case DateType => days(2001, 9, 8)
+      }
+      val parser = new UnivocityParser(new StructType(), timestampsOptions)
+      assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
+    }
+    checkDate(TimestampType)
+    checkDate(TimestampNTZType)
+    checkDate(DateType)
+  }
 }
```
sql/core/src/test/resources/test-data/date-infer-schema.csv

Lines changed: 4 additions & 0 deletions

```diff
@@ -0,0 +1,4 @@
+date,timestamp-date,date-timestamp
+2001-09-08,2014-10-27T18:30:00,1765-03-28
+1941-01-02,2000-09-14T01:01:00,1423-11-12T23:41:00
+0293-11-07,1995-06-25,2016-01-28T20:00:00
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 52 additions & 0 deletions
```diff
@@ -41,6 +41,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, Que
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._

@@ -74,6 +75,7 @@ abstract class CSVSuite
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val dateInferSchemaFile = "test-data/date-infer-schema.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
@@ -2788,6 +2790,56 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-39469: Infer schema for date type") {
+    val options1 = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+      "dateFormat" -> "yyyy-MM-dd",
+      "inferDate" -> "true")
+    val options2 = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "inferDate" -> "true")
+
+    // Error should be thrown when attempting to inferDate with Legacy parser
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      val msg = intercept[IllegalArgumentException] {
+        spark.read
+          .format("csv")
+          .options(options1)
+          .load(testFile(dateInferSchemaFile))
+      }.getMessage
+      assert(msg.contains("CANNOT_INFER_DATE"))
+    } else {
+      // 1. Specify date format and timestamp format
+      // 2. Date inference should work with default date format when dateFormat is not provided
+      Seq(options1, options2).foreach { options =>
+        val results = spark.read
+          .format("csv")
+          .options(options)
+          .load(testFile(dateInferSchemaFile))
+
+        val expectedSchema = StructType(List(StructField("date", DateType),
+          StructField("timestamp-date", TimestampType),
+          StructField("date-timestamp", TimestampType)))
+        assert(results.schema == expectedSchema)
+
+        val expected =
+          Seq(
+            Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"),
+              Timestamp.valueOf("1765-03-28 00:00:0.0")),
+            Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"),
+              Timestamp.valueOf("1423-11-12 23:41:0.0")),
+            Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"),
+              Timestamp.valueOf("2016-01-28 20:00:00.0"))
+          )
+        assert(results.collect().toSeq.map(_.toSeq) == expected)
+      }
+
+    }
+  }
 }

 class CSVv1Suite extends CSVSuite {
```
