From a8d27d64d45ca4ea207f0d403dba472b2f606968 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 22:57:02 +0100 Subject: [PATCH 1/7] Test for inferring the date type --- .../spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index 1a020e67a75b..b38ecc775141 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -188,4 +188,14 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0))) } + + test("inferring date type") { + var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd"), false, "GMT") + var inferSchema = new CSVInferSchema(options) + assert(inferSchema.inferField(NullType, "2018/12/02") == DateType) + + options = new CSVOptions(Map("dateFormat" -> "MMM yyyy"), false, "GMT") + inferSchema = new CSVInferSchema(options) + assert(inferSchema.inferField(NullType, "Dec 2018") == DateType) + } } From fa915fd3fcab37be2c53bc23a77b55e3024b3994 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 22:57:21 +0100 Subject: [PATCH 2/7] Inferring date type --- .../sql/catalyst/csv/CSVInferSchema.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 94cb4b114e6b..b2de308b6957 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.csv +import java.text.ParsePosition + import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD @@ -98,6 +100,7 @@ class CSVInferSchema(options: CSVOptions) extends Serializable { compatibleType(typeSoFar, tryParseDecimal(field)).getOrElse(StringType) case DoubleType => tryParseDouble(field) case TimestampType => tryParseTimestamp(field) + case DateType => tryParseDate(field) case BooleanType => tryParseBoolean(field) case StringType => StringType case other: DataType => @@ -159,6 +162,21 @@ class CSVInferSchema(options: CSVOptions) extends Serializable { } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) { // We keep this for backwards compatibility. TimestampType + } else { + tryParseDate(field) + } + } + + private def tryParseDate(field: String): DataType = { + val dateTry = allCatch opt { + val pos = new ParsePosition(0) + options.dateFormat.parse(field, pos) + if (pos.getErrorIndex != -1 || pos.getIndex != field.length) { + throw new IllegalArgumentException(s"${field} cannot be parsed as ${DateType.simpleString}") + } + } + if (dateTry.isDefined) { + DateType } else { tryParseBoolean(field) } From 45958bd7bfff98767b6f91f0a4c284412ebd62bb Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 3 Dec 2018 11:12:01 +0100 Subject: [PATCH 3/7] Additional tests --- .../spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index b38ecc775141..ecf50787502f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -197,5 +197,13 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { options = new CSVOptions(Map("dateFormat" -> "MMM yyyy"), false, "GMT") inferSchema = new CSVInferSchema(options) assert(inferSchema.inferField(NullType, "Dec 2018") == DateType) + + options = new CSVOptions( + Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), + columnPruning = false, + defaultTimeZoneId = "GMT") + inferSchema = new CSVInferSchema(options) + assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == TimestampType) + assert(inferSchema.inferField(NullType, "2018-12-03") == DateType) } } From a6723f3732712ca9a3c9dfcdd167ee05d78c956f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 3 Dec 2018 11:12:24 +0100 Subject: [PATCH 4/7] Infer date type before timestamp type --- .../sql/catalyst/csv/CSVInferSchema.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index b2de308b6957..425ecd0ac44d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -150,18 +150,6 @@ class CSVInferSchema(options: CSVOptions) extends Serializable { private def tryParseDouble(field: String): DataType = { if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) { DoubleType - } else { - tryParseTimestamp(field) - } - } - - private def tryParseTimestamp(field: String): DataType = { - // This case infers a custom `dataFormat` is set. - if ((allCatch opt options.timestampFormat.parse(field)).isDefined) { - TimestampType - } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) { - // We keep this for backwards compatibility. - TimestampType } else { tryParseDate(field) } @@ -177,6 +165,18 @@ class CSVInferSchema(options: CSVOptions) extends Serializable { } if (dateTry.isDefined) { DateType + } else { + tryParseTimestamp(field) + } + } + + private def tryParseTimestamp(field: String): DataType = { + // This case infers a custom `dataFormat` is set. + if ((allCatch opt options.timestampFormat.parse(field)).isDefined) { + TimestampType + } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) { + // We keep this for backwards compatibility. + TimestampType } else { tryParseBoolean(field) } From e99a61956b63914d0724452c9211fbad2a3e5d06 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 15 Dec 2018 17:42:26 +0100 Subject: [PATCH 5/7] Moving on new date parser --- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 40c59c965bf6..3a1d3cc51b95 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -24,7 +24,7 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils -import org.apache.spark.sql.catalyst.util.DateTimeFormatter +import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter} import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @@ -34,6 +34,11 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { options.timestampFormat, options.timeZone, options.locale) + @transient + private lazy val dateFormatter = DateFormatter( + options.dateFormat, + options.timeZone, + options.locale) private val decimalParser = { ExprUtils.getDecimalParser(options.locale) @@ -162,14 +167,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } private def tryParseDate(field: String): DataType = { - val dateTry = allCatch opt { - val pos = new ParsePosition(0) - options.dateFormat.parse(field, pos) - if (pos.getErrorIndex != -1 || pos.getIndex != field.length) { - throw new IllegalArgumentException(s"${field} cannot be parsed as ${DateType.simpleString}") - } - } - if (dateTry.isDefined) { + if ((allCatch opt dateFormatter.parse(field)).isDefined) { DateType } else { tryParseTimestamp(field) From 0ec5c767103b8a20db60a2f57386b8fd22c712d3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 15 Dec 2018 17:54:05 +0100 Subject: [PATCH 6/7] Infer TimestampType before DateType --- .../spark/sql/catalyst/csv/CSVInferSchema.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 3a1d3cc51b95..bf94f585512c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -161,23 +161,22 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private def tryParseDouble(field: String): DataType = { if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) { DoubleType - } else { - tryParseDate(field) - } - } - - private def tryParseDate(field: String): DataType = { - if ((allCatch opt dateFormatter.parse(field)).isDefined) { - DateType } else { tryParseTimestamp(field) } } private def tryParseTimestamp(field: String): DataType = { - // This case infers a custom `dataFormat` is set. if ((allCatch opt timeParser.parse(field)).isDefined) { TimestampType + } else { + tryParseDate(field) + } + } + + private def tryParseDate(field: String): DataType = { + if ((allCatch opt dateFormatter.parse(field)).isDefined) { + DateType } else { tryParseBoolean(field) } From beb69120f7b2b16121a6d87e9cf34955f1e1eb8f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 16 Dec 2018 10:50:51 +0100 Subject: [PATCH 7/7] Fix merge --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index f3a73edf0445..11f3740d99a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.csv -import java.text.ParsePosition - import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD @@ -30,14 +28,13 @@ import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @transient - private lazy val timestampParser = TimestampFormatter( + private lazy val timestampFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) @transient private lazy val dateFormatter = DateFormatter( options.dateFormat, - options.timeZone, options.locale) private val decimalParser = { @@ -167,7 +164,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } private def tryParseTimestamp(field: String): DataType = { - if ((allCatch opt timestampParser.parse(field)).isDefined) { + if ((allCatch opt timestampFormatter.parse(field)).isDefined) { TimestampType } else { tryParseDate(field)