Skip to content

Commit d2ed686

Browse files
author
sergei.rubtcov
committed
[SPARK-19228][SQL] Introduce a tryParseDate method to process CSV date values; add a type-widening rule between DateType and TimestampType in findTightestCommonType; use java.time.format.DateTimeFormatter to infer time types more accurately; add an end-to-end test case and a unit test
1 parent a6fc300 commit d2ed686

File tree

5 files changed

+80
-4
lines changed

5 files changed

+80
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ private[csv] object CSVInferSchema {
9090
// DecimalTypes have different precisions and scales, so we try to find the common type.
9191
findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
9292
case DoubleType => tryParseDouble(field, options)
93-
case TimestampType => tryParseTimestamp(field, options)
93+
case DateType => tryParseDate(field, options)
94+
case TimestampType =>
95+
findTightestCommonType(typeSoFar, tryParseTimestamp(field, options)).getOrElse(
96+
tryParseBoolean(field, options))
9497
case BooleanType => tryParseBoolean(field, options)
9598
case StringType => StringType
9699
case other: DataType =>
@@ -140,14 +143,23 @@ private[csv] object CSVInferSchema {
140143
private def tryParseDouble(field: String, options: CSVOptions): DataType = {
141144
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
142145
DoubleType
146+
} else {
147+
tryParseDate(field, options)
148+
}
149+
}
150+
151+
private def tryParseDate(field: String, options: CSVOptions): DataType = {
152+
// This case infers a custom `dateFormat` is set.
153+
if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
154+
DateType
143155
} else {
144156
tryParseTimestamp(field, options)
145157
}
146158
}
147159

148160
private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
149-
// This case infers a custom `dataFormat` is set.
150-
if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
161+
// This case infers a custom `timestampFormat` is set.
162+
if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
151163
TimestampType
152164
} else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
153165
// We keep this for backwards compatibility.
@@ -216,6 +228,8 @@ private[csv] object CSVInferSchema {
216228
} else {
217229
Some(DecimalType(range + scale, scale))
218230
}
231+
// By design 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes).
232+
case (t1: DateType, t2: TimestampType) => Some(TimestampType)
219233

220234
case _ => None
221235
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.execution.datasources.csv
1919

2020
import java.nio.charset.StandardCharsets
21+
import java.time.format.{DateTimeFormatter, ResolverStyle}
2122
import java.util.{Locale, TimeZone}
2223

2324
import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
@@ -150,6 +151,16 @@ class CSVOptions(
150151

151152
val isCommentSet = this.comment != '\u0000'
152153

154+
def dateFormatter: DateTimeFormatter = {
155+
DateTimeFormatter.ofPattern(dateFormat.getPattern)
156+
.withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
157+
}
158+
159+
def timestampFormatter: DateTimeFormatter = {
160+
DateTimeFormatter.ofPattern(timestampFormat.getPattern)
161+
.withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
162+
}
163+
153164
def asWriterSettings: CsvWriterSettings = {
154165
val writerSettings = new CsvWriterSettings()
155166
val format = writerSettings.getFormat
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
timestamp,date
2+
26/08/2015 22:31:46.913,27/09/2015
3+
27/10/2014 22:33:31.601,26/12/2016
4+
28/01/2016 22:33:52.888,28/01/2017

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
5959
assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne)
6060
}
6161

62-
test("Timestamp field types are inferred correctly via custom data format") {
62+
test("Timestamp field types are inferred correctly via custom date format") {
6363
var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
6464
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
6565
options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
6666
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
6767
}
6868

69+
test("Date field types are inferred correctly via custom date and timestamp format") {
70+
val options = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy",
71+
"timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS"), "GMT")
72+
assert(CSVInferSchema.inferField(TimestampType,
73+
"28/01/2017 22:31:46.913", options) == TimestampType)
74+
assert(CSVInferSchema.inferField(DateType, "16/12/2012", options) == DateType)
75+
}
76+
6977
test("Timestamp field types are inferred correctly from other types") {
7078
val options = new CSVOptions(Map.empty[String, String], "GMT")
7179
assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
5454
private val simpleSparseFile = "test-data/simple_sparse.csv"
5555
private val numbersFile = "test-data/numbers.csv"
5656
private val datesFile = "test-data/dates.csv"
57+
private val datesAndTimestampsFile = "test-data/dates-and-timestamps.csv"
5758
private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
5859
private val valueMalformedFile = "test-data/value-malformed.csv"
5960

@@ -566,6 +567,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
566567
assert(results.toSeq.map(_.toSeq) === expected)
567568
}
568569

570+
test("inferring timestamp types and date types via custom formats") {
571+
val options = Map(
572+
"header" -> "true",
573+
"inferSchema" -> "true",
574+
"timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS",
575+
"dateFormat" -> "dd/MM/yyyy")
576+
val results = spark.read
577+
.format("csv")
578+
.options(options)
579+
.load(testFile(datesAndTimestampsFile))
580+
assert(results.schema{0}.dataType===TimestampType)
581+
assert(results.schema{1}.dataType===DateType)
582+
val timestamps = spark.read
583+
.format("csv")
584+
.options(options)
585+
.load(testFile(datesAndTimestampsFile))
586+
.select("timestamp")
587+
.collect()
588+
val timestampFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS", Locale.US)
589+
val timestampExpected =
590+
Seq(Seq(new Timestamp(timestampFormat.parse("26/08/2015 22:31:46.913").getTime)),
591+
Seq(new Timestamp(timestampFormat.parse("27/10/2014 22:33:31.601").getTime)),
592+
Seq(new Timestamp(timestampFormat.parse("28/01/2016 22:33:52.888").getTime)))
593+
assert(timestamps.toSeq.map(_.toSeq) === timestampExpected)
594+
val dates = spark.read
595+
.format("csv")
596+
.options(options)
597+
.load(testFile(datesAndTimestampsFile))
598+
.select("date")
599+
.collect()
600+
val dateFormat = new SimpleDateFormat("dd/MM/yyyy", Locale.US)
601+
val dateExpected =
602+
Seq(Seq(new Date(dateFormat.parse("27/09/2015").getTime)),
603+
Seq(new Date(dateFormat.parse("26/12/2016").getTime)),
604+
Seq(new Date(dateFormat.parse("28/01/2017").getTime)))
605+
assert(dates.toSeq.map(_.toSeq) === dateExpected)
606+
}
607+
569608
test("load date types via custom date format") {
570609
val customSchema = new StructType(Array(StructField("date", DateType, true)))
571610
val options = Map(

0 commit comments

Comments (0)