diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 80f15053005f..9eaf6a2862a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util

 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
+import java.time.LocalDateTime
+import java.time.temporal.ChronoField
 import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.{Function => JFunction}
@@ -143,6 +145,12 @@ object DateTimeUtils {
     millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
   }

+  def dateTimeToMicroseconds(localDateTime: LocalDateTime, timeZone: TimeZone): Long = {
+    val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND)
+    val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond
+    epochSecond * 1000000L + microOfSecond
+  }
+
   def dateToString(days: SQLDate): String =
     getThreadLocalDateFormat.format(toJavaDate(days))

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index cbf6106697f3..cd1b7395b97d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.util

 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
 import java.util.{Calendar, Locale, TimeZone}

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.unsafe.types.UTF8String
+import org.junit.Assert.assertEquals

 class DateTimeUtilsSuite extends SparkFunSuite {

@@ -645,6 +648,18 @@ class DateTimeUtilsSuite extends SparkFunSuite {
     }
   }

+  test("Java 8 LocalDateTime to microseconds") {
+    val nanos = "2015-05-09 00:10:23.999750987"
+    var formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
+    val localDateTimeInNanos = LocalDateTime.parse(nanos, formatter)
+    val timeInMicros = dateTimeToMicroseconds(localDateTimeInNanos, TimeZonePST)
+    assertEquals(1431155423999750L, timeInMicros)
+    val micros = "2015-05-09 00:10:23.999750"
+    formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
+    val localDateTimeInMicros = LocalDateTime.parse(micros, formatter)
+    assertEquals(timeInMicros, dateTimeToMicroseconds(localDateTimeInMicros, TimeZonePST))
+  }
+
   test("daysToMillis and millisToDays") {
     val c = Calendar.getInstance(TimeZonePST)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index a585cbed2551..6239f5666cd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -90,6 +90,7 @@ private[csv] object CSVInferSchema {
         // DecimalTypes have different precisions and scales, so we try to find the common type.
         findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
       case DoubleType => tryParseDouble(field, options)
+      case DateType => tryParseDate(field, options)
       case TimestampType => tryParseTimestamp(field, options)
       case BooleanType => tryParseBoolean(field, options)
       case StringType => StringType
@@ -140,14 +141,23 @@ private[csv] object CSVInferSchema {
   private def tryParseDouble(field: String, options: CSVOptions): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
       DoubleType
+    } else {
+      tryParseDate(field, options)
+    }
+  }
+
+  private def tryParseDate(field: String, options: CSVOptions): DataType = {
+    // This case applies when a custom `dateFormat` is set.
+    if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestamp(field, options)
     }
   }

   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+    // This case applies when a custom `timestampFormat` is set.
+    if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
       TimestampType
     } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
       // We keep this for backwards compatibility.
@@ -216,6 +226,9 @@ private[csv] object CSVInferSchema {
       } else {
         Some(DecimalType(range + scale, scale))
      }
+    // By design 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes).
+    case (_: DateType, _: TimestampType) => Some(TimestampType)
+    case (_: TimestampType, _: DateType) => Some(TimestampType)

     case _ => None
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 1066d156acd7..fae56744feb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.sql.execution.datasources.csv

 import java.nio.charset.StandardCharsets
+import java.time.format.{DateTimeFormatter, ResolverStyle}
 import java.util.{Locale, TimeZone}

 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
-import org.apache.commons.lang3.time.FastDateFormat

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -119,7 +119,6 @@ class CSVOptions(
   val positiveInf = parameters.getOrElse("positiveInf", "Inf")
   val negativeInf = parameters.getOrElse("negativeInf", "-Inf")

-
   val compressionCodec: Option[String] = {
     val name = parameters.get("compression").orElse(parameters.get("codec"))
     name.map(CompressionCodecs.getCodecClassName)
@@ -128,13 +127,20 @@ class CSVOptions(
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
+  val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
+
+  val timestampFormat: String = parameters
+    .getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
+  @transient lazy val dateFormatter: DateTimeFormatter = {
+    DateTimeFormatter.ofPattern(dateFormat)
+      .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
+  }
+
+  @transient lazy val timestampFormatter: DateTimeFormatter = {
+    DateTimeFormatter.ofPattern(timestampFormat)
+      .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
+  }

   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 99557a1ceb0c..2ddd0d6dad08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv

 import java.io.InputStream
 import java.math.BigDecimal
+import java.time.{LocalDate, LocalDateTime}
+import java.time.temporal.ChronoField

 import scala.util.Try
 import scala.util.control.NonFatal
@@ -131,9 +133,8 @@ class UnivocityParser(

     case _: TimestampType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
+        Try(DateTimeUtils.dateTimeToMicroseconds(
+          LocalDateTime.parse(datum, options.timestampFormatter), options.timeZone))
           .getOrElse {
             // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
             // compatibility.
@@ -143,9 +144,8 @@ class UnivocityParser(

     case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.x
-        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
+        Try(Math.toIntExact(LocalDate.parse(datum, options.dateFormatter)
+          .getLong(ChronoField.EPOCH_DAY)))
           .getOrElse {
             // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
             // compatibility.
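[Reviewer note, not part of the patch] A minimal standalone sketch of the two java.time conversions the new parsing path relies on: epoch microseconds for timestamps and epoch days for dates. The object name `ConversionSketch` and the hard-coded PST zone are illustrative only; the expected values mirror the DateTimeUtilsSuite and UnivocityParser cases above.

import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoField
import java.util.TimeZone

object ConversionSketch {
  def main(args: Array[String]): Unit = {
    val pst = TimeZone.getTimeZone("America/Los_Angeles")

    // Timestamp path: epoch seconds scaled to microseconds plus the sub-second
    // microsecond field, matching dateTimeToMicroseconds in the patch above.
    val tsFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
    val ldt = LocalDateTime.parse("2015-05-09 00:10:23.999750", tsFormatter)
    val micros = ldt.atZone(pst.toZoneId).toInstant.getEpochSecond * 1000000L +
      ldt.getLong(ChronoField.MICRO_OF_SECOND)
    println(micros) // 1431155423999750, as asserted in DateTimeUtilsSuite

    // Date path: days since the epoch, matching the UnivocityParser DateType case.
    val dateFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy")
    val days = Math.toIntExact(
      LocalDate.parse("31/01/2015", dateFormatter).getLong(ChronoField.EPOCH_DAY))
    println(days) // 16466
  }
}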
diff --git a/sql/core/src/test/resources/test-data/dates-and-timestamps.csv b/sql/core/src/test/resources/test-data/dates-and-timestamps.csv
new file mode 100644
index 000000000000..0a9a4c2f8566
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/dates-and-timestamps.csv
@@ -0,0 +1,4 @@
+timestamp,date
+26/08/2015 22:31:46.913,27/09/2015
+27/10/2014 22:33:31.601,26/12/2016
+28/01/2016 22:33:52.888,28/01/2017
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index 661742087112..c8cfc07a4a1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -59,13 +59,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
     assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne)
   }

-  test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
+  test("Timestamp field types are inferred correctly via custom date format") {
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
     options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }

+  test("Date field types are inferred correctly via custom date and timestamp format") {
+    val options = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy",
+      "timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS"), "GMT")
+    assert(CSVInferSchema.inferField(TimestampType,
+      "28/01/2017 22:31:46.913", options) == TimestampType)
+    assert(CSVInferSchema.inferField(DateType, "16/12/2012", options) == DateType)
+  }
+
   test("Timestamp field types are inferred correctly from other types") {
     val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
@@ -111,7 +119,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }

   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
+    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-MM"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 07e6c74b14d0..b571b9430d95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val datesAndTimestampsFile = "test-data/dates-and-timestamps.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"

@@ -565,6 +566,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(results.toSeq.map(_.toSeq) === expected)
   }

+  test("inferring timestamp types and date types via custom formats") {
+    val options = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS",
+      "dateFormat" -> "dd/MM/yyyy")
+    val results = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+    assert(results.schema(0).dataType === TimestampType)
+    assert(results.schema(1).dataType === DateType)
+    val timestamps = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+      .select("timestamp")
+      .collect()
+    val timestampFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS", Locale.US)
+    val timestampExpected =
+      Seq(Seq(new Timestamp(timestampFormat.parse("26/08/2015 22:31:46.913").getTime)),
+        Seq(new Timestamp(timestampFormat.parse("27/10/2014 22:33:31.601").getTime)),
+        Seq(new Timestamp(timestampFormat.parse("28/01/2016 22:33:52.888").getTime)))
+    assert(timestamps.toSeq.map(_.toSeq) === timestampExpected)
+    val dates = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+      .select("date")
+      .collect()
+    val dateFormat = new SimpleDateFormat("dd/MM/yyyy", Locale.US)
+    val dateExpected =
+      Seq(Seq(new Date(dateFormat.parse("27/09/2015").getTime)),
+        Seq(new Date(dateFormat.parse("26/12/2016").getTime)),
+        Seq(new Date(dateFormat.parse("28/01/2017").getTime)))
+    assert(dates.toSeq.map(_.toSeq) === dateExpected)
+  }
+
   test("load date types via custom date format") {
     val customSchema = new StructType(Array(StructField("date", DateType, true)))
     val options = Map(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index efbf73534bd1..257a683ee9d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.csv

 import java.math.BigDecimal
-import java.util.Locale
+import java.time.{LocalDate, LocalDateTime}

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -107,20 +107,26 @@ class UnivocityParserSuite extends SparkFunSuite {
     assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)

     val timestampsOptions =
-      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
+      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy HH:mm"), "GMT")
     val customTimestamp = "31/01/2015 00:00"
-    val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+
+    val expectedTime = LocalDateTime.parse(customTimestamp, timestampsOptions.timestampFormatter)
+      .atZone(timestampsOptions.timeZone.toZoneId)
+      .toInstant.toEpochMilli
     val castedTimestamp =
       parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
         .apply(customTimestamp)
     assert(castedTimestamp == expectedTime * 1000L)

-    val customDate = "31/01/2015"
     val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
-    val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+    val customDate = "31/01/2015"
+
+    val expectedDate = LocalDate.parse(customDate, dateOptions.dateFormatter)
+      .atStartOfDay(dateOptions.timeZone.toZoneId)
+      .toInstant.toEpochMilli
     val castedDate =
       parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
-        .apply(customTimestamp)
+        .apply(customDate)
     assert(castedDate == DateTimeUtils.millisToDays(expectedDate))

     val timestamp = "2015-01-01 00:00:00"
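[Reviewer note, not part of the patch] A usage sketch of the end-to-end behavior under test, assuming a local Spark build with this patch applied. The object name, master setting, and file path are illustrative only; the path points at the new test resource added above.

import org.apache.spark.sql.SparkSession

object CsvDateInferenceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-date-inference")
      .master("local[*]")
      .getOrCreate()
    // With both formats supplied, schema inference should type the first column
    // as TimestampType and the second as DateType instead of StringType.
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("timestampFormat", "dd/MM/yyyy HH:mm:ss.SSS")
      .option("dateFormat", "dd/MM/yyyy")
      .csv("sql/core/src/test/resources/test-data/dates-and-timestamps.csv")
    df.printSchema()
    df.show(truncate = false)
    spark.stop()
  }
}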