diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 787f4bcbbea82..fee0e6df7177c 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -33,6 +33,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
 
+  - Since Spark 3.0, the CSV datasource uses the java.time API for parsing and generating CSV content. The new formatting implementation supports date/timestamp patterns that conform to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
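As a reading aid for the migration note above, a minimal sketch of opting back into the pre-3.0 behaviour; the config key is the one introduced by this patch, while the session setup itself is only illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Revert CSV date/timestamp handling to the legacy (FastDateFormat-based) parser.
spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")
// Or equivalently from SQL: SET spark.sql.legacy.timeParser.enabled=true
```
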
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 94cb4b114e6b6..345dc4d41993e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -22,10 +22,16 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeFormatter
 import org.apache.spark.sql.types._
 
-class CSVInferSchema(options: CSVOptions) extends Serializable {
+class CSVInferSchema(val options: CSVOptions) extends Serializable {
+
+  @transient
+  private lazy val timeParser = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
 
   private val decimalParser = {
     ExprUtils.getDecimalParser(options.locale)
@@ -154,10 +160,7 @@ class CSVInferSchema(options: CSVOptions) extends Serializable {
 
   private def tryParseTimestamp(field: String): DataType = {
     // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
-      TimestampType
-    } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
-      // We keep this for backwards compatibility.
+    if ((allCatch opt timeParser.parse(field)).isDefined) {
       TimestampType
     } else {
       tryParseBoolean(field)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 94bdb72d675d4..90c96d1f55c91 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
 import java.util.{Locale, TimeZone}
 
 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
-import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -146,13 +145,10 @@ class CSVOptions(
 
   // A language tag in IETF BCP 47 format
   val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
 
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
+  val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
 
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
+  val timestampFormat: String =
+    parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 2ab376c0ac208..af09cd6c8449b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -22,7 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
 import org.apache.spark.sql.types._
 
 class UnivocityGenerator(
@@ -41,14 +41,18 @@ class UnivocityGenerator(
   private val valueConverters: Array[ValueConverter] =
     schema.map(_.dataType).map(makeConverter).toArray
 
+  private val timeFormatter = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case DateType =>
-      (row: InternalRow, ordinal: Int) =>
-        options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+      (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))
 
     case TimestampType =>
-      (row: InternalRow, ordinal: Int) =>
-        options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+      (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))
 
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 85e129224c913..dfdcb2325e71f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.io.InputStream
 
-import scala.util.Try
 import scala.util.control.NonFatal
 
 import com.univocity.parsers.csv.CsvParser
@@ -27,7 +26,7 @@ import com.univocity.parsers.csv.CsvParser
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -75,6 +74,12 @@ class UnivocityParser(
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
+  private val timeFormatter = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
     UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
@@ -100,7 +105,7 @@ class UnivocityParser(
   //
   //   output row - ["A", 2]
   private val valueConverters: Array[ValueConverter] = {
-    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable)).toArray
   }
 
   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
@@ -115,8 +120,7 @@ class UnivocityParser(
 
   def makeConverter(
       name: String,
       dataType: DataType,
-      nullable: Boolean = true,
-      options: CSVOptions): ValueConverter = dataType match {
+      nullable: Boolean = true): ValueConverter = dataType match {
     case _: ByteType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(_.toByte)
@@ -154,34 +158,16 @@ class UnivocityParser(
     }
 
     case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
-          .getOrElse {
-            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
-            DateTimeUtils.stringToTime(datum).getTime * 1000L
-          }
-      }
+      nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
 
     case _: DateType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.x
-        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
-          .getOrElse {
-            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
-            DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
-          }
-      }
+      nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
 
     case _: StringType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)
 
     case udt: UserDefinedType[_] => (datum: String) =>
-      makeConverter(name, udt.sqlType, nullable, options)
+      makeConverter(name, udt.sqlType, nullable)
 
     // We don't actually hit this exception though, we keep it for understandability
     case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
new file mode 100644
index 0000000000000..ad1f4131de2f6
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time._
+import java.time.format.DateTimeFormatterBuilder
+import java.time.temporal.{ChronoField, TemporalQueries}
+import java.util.{Locale, TimeZone}
+
+import scala.util.Try
+
+import org.apache.commons.lang3.time.FastDateFormat
+
+import org.apache.spark.sql.internal.SQLConf
+
+sealed trait DateTimeFormatter {
+  def parse(s: String): Long // returns microseconds since epoch
+  def format(us: Long): String
+}
+
+class Iso8601DateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateTimeFormatter {
+  val formatter = new DateTimeFormatterBuilder()
+    .appendPattern(pattern)
+    .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
+    .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+    .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+    .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+    .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+    .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+    .toFormatter(locale)
+
+  def toInstant(s: String): Instant = {
+    val temporalAccessor = formatter.parse(s)
+    if (temporalAccessor.query(TemporalQueries.offset()) == null) {
+      val localDateTime = LocalDateTime.from(temporalAccessor)
+      val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
+      Instant.from(zonedDateTime)
+    } else {
+      Instant.from(temporalAccessor)
+    }
+  }
+
+  private def instantToMicros(instant: Instant): Long = {
+    val sec = Math.multiplyExact(instant.getEpochSecond, DateTimeUtils.MICROS_PER_SECOND)
+    val result = Math.addExact(sec, instant.getNano / DateTimeUtils.NANOS_PER_MICROS)
+    result
+  }
+
+  def parse(s: String): Long = instantToMicros(toInstant(s))
+
+  def format(us: Long): String = {
+    val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)
+    val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND)
+    val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS)
+
+    formatter.withZone(timeZone.toZoneId).format(instant)
+  }
+}
+
+class LegacyDateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateTimeFormatter {
+  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+
+  protected def toMillis(s: String): Long = format.parse(s).getTime
+
+  def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS
+
+  def format(us: Long): String = {
+    format.format(DateTimeUtils.toJavaTimestamp(us))
+  }
+}
+
+class LegacyFallbackDateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
+  override def toMillis(s: String): Long = {
+    Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
+  }
+}
+
+object DateTimeFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
+    if (SQLConf.get.legacyTimeParserEnabled) {
+      new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
+    } else {
+      new Iso8601DateTimeFormatter(format, timeZone, locale)
+    }
+  }
+}
+
+sealed trait DateFormatter {
+  def parse(s: String): Int // returns days since epoch
+  def format(days: Int): String
+}
+
+class Iso8601DateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateFormatter {
+
+  val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)
+
+  override def parse(s: String): Int = {
+    val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
+    val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
+
+    days.toInt
+  }
+
+  override def format(days: Int): String = {
+    val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
+    dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
+  }
+}
+
+class LegacyDateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateFormatter {
+  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+
+  def parse(s: String): Int = {
+    val milliseconds = format.parse(s).getTime
+    DateTimeUtils.millisToDays(milliseconds)
+  }
+
+  def format(days: Int): String = {
+    val date = DateTimeUtils.toJavaDate(days)
+    format.format(date)
+  }
+}
+
+class LegacyFallbackDateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
+  override def parse(s: String): Int = {
+    Try(super.parse(s)).orElse {
+      // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+      // compatibility.
+      Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime))
+    }.getOrElse {
+      // In Spark 1.5.0, we store the data as number of days since epoch in string.
+      // So, we just convert it to Int.
+      s.toInt
+    }
+  }
+}
+
+object DateFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
+    if (SQLConf.get.legacyTimeParserEnabled) {
+      new LegacyFallbackDateFormatter(format, timeZone, locale)
+    } else {
+      new Iso8601DateFormatter(format, timeZone, locale)
+    }
+  }
+}
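A minimal usage sketch of the two factories introduced in the new file above. The patterns, time zone, and expected values mirror the DateTimeFormatterSuite added later in this patch; note these classes are internal to Catalyst rather than public API, so the sketch is illustrative only:

```scala
import java.util.{Locale, TimeZone}

import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}

// Parse a timestamp string to microseconds since the epoch, then format it back.
val timeFormatter = DateTimeFormatter(
  "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", TimeZone.getTimeZone("UTC"), Locale.US)
val micros = timeFormatter.parse("2018-12-02T10:11:12.001234") // 1543745472001234L
val text = timeFormatter.format(micros)                        // "2018-12-02T10:11:12.001234"

// Parse a date string to days since the epoch.
val dateFormatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone("UTC"), Locale.US)
val days = dateFormatter.parse("2018-12-02")                   // 17867
```

The factories consult `spark.sql.legacy.timeParser.enabled` through `SQLConf.get`, so the same call sites transparently switch between the ISO 8601 and legacy implementations.
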
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5ae75dc939303..c6dfdbf2505ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -50,7 +50,7 @@ object DateTimeUtils {
   final val MILLIS_PER_SECOND = 1000L
   final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
   final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
-
+  final val NANOS_PER_MICROS = 1000L
   final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
 
   // number of days in 400 years
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c4f00d723c252..451b051f8407e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1618,6 +1618,13 @@ object SQLConf {
       "a SparkConf entry.")
     .booleanConf
     .createWithDefault(true)
+
+  val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled")
+    .doc("When set to true, java.text.SimpleDateFormat is used for formatting and parsing " +
+      "dates/timestamps in a locale-sensitive manner. When set to false, classes from " +
+      "java.time.* packages are used for the same purpose.")
+    .booleanConf
+    .createWithDefault(false)
 }
 
 /**
@@ -2040,6 +2047,8 @@ class SQLConf extends Serializable with Logging {
 
   def setCommandRejectsSparkConfs: Boolean = getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CONFS)
 
+  def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index 1a020e67a75b0..c2b525ad1a9f8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -22,13 +22,12 @@ import java.util.Locale
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
+class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
 
   test("String fields types are inferred correctly from null types") {
-    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"), false, "GMT")
     val inferSchema = new CSVInferSchema(options)
 
     assert(inferSchema.inferField(NullType, "") == NullType)
@@ -48,7 +47,7 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("String fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"), false, "GMT")
     val inferSchema = new CSVInferSchema(options)
 
     assert(inferSchema.inferField(LongType, "1.0") == DoubleType)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 7212402ef5cff..2d0b0d3033a9c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,20 +19,17 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.math.BigDecimal
 import java.text.{DecimalFormat, DecimalFormatSymbols}
-import java.util.Locale
+import java.util.{Locale, TimeZone}
+
+import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
-  private val parser = new UnivocityParser(
-    StructType(Seq.empty),
-    new CSVOptions(Map.empty[String, String], false, "GMT"))
-
   private def assertNull(v: Any) = assert(v == null)
 
   test("Can parse decimal type values") {
@@ -43,7 +40,8 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
       val decimalValue = new BigDecimal(decimalVal.toString)
       val options = new CSVOptions(Map.empty[String, String], false, "GMT")
-      assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
+      val parser = new UnivocityParser(StructType(Seq.empty), options)
+      assert(parser.makeConverter("_1", decimalType).apply(strVal) ===
         Decimal(decimalValue, decimalType.precision, decimalType.scale))
     }
   }
@@ -56,22 +54,23 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
 
     types.foreach { t =>
       // Tests that a custom nullValue.
      val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
-      val converter =
-        parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
+      var parser = new UnivocityParser(StructType(Seq.empty), nullValueOptions)
+      val converter = parser.makeConverter("_1", t, nullable = true)
       assertNull(converter.apply("-"))
       assertNull(converter.apply(null))
 
       // Tests that the default nullValue is empty string.
       val options = new CSVOptions(Map.empty[String, String], false, "GMT")
-      assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
+      parser = new UnivocityParser(StructType(Seq.empty), options)
+      assertNull(parser.makeConverter("_1", t, nullable = true).apply(""))
     }
 
     // Not nullable field with nullValue option.
     types.foreach { t =>
       // Casts a null to not nullable field should throw an exception.
       val options = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
-      val converter =
-        parser.makeConverter("_1", t, nullable = false, options = options)
+      val parser = new UnivocityParser(StructType(Seq.empty), options)
+      val converter = parser.makeConverter("_1", t, nullable = false)
       var message = intercept[RuntimeException] {
         converter.apply("-")
       }.getMessage
@@ -86,62 +85,74 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     // null.
     Seq(true, false).foreach { b =>
       val options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT")
-      val converter =
-        parser.makeConverter("_1", StringType, nullable = b, options = options)
+      val parser = new UnivocityParser(StructType(Seq.empty), options)
+      val converter = parser.makeConverter("_1", StringType, nullable = b)
       assert(converter.apply("") == UTF8String.fromString(""))
     }
   }
 
   test("Throws exception for empty string with non null type") {
-    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val parser = new UnivocityParser(StructType(Seq.empty), options)
     val exception = intercept[RuntimeException]{
-      parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
+      parser.makeConverter("_1", IntegerType, nullable = false).apply("")
    }
     assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
   }
 
   test("Types are cast correctly") {
     val options = new CSVOptions(Map.empty[String, String], false, "GMT")
-    assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
-    assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
-    assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
-    assert(parser.makeConverter("_1", LongType, options = options).apply("10") == 10)
-    assert(parser.makeConverter("_1", FloatType, options = options).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", DoubleType, options = options).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)
-
-    val timestampsOptions =
+    var parser = new UnivocityParser(StructType(Seq.empty), options)
+    assert(parser.makeConverter("_1", ByteType).apply("10") == 10)
+    assert(parser.makeConverter("_1", ShortType).apply("10") == 10)
+    assert(parser.makeConverter("_1", IntegerType).apply("10") == 10)
+    assert(parser.makeConverter("_1", LongType).apply("10") == 10)
+    assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", BooleanType).apply("true") == true)
+
+    var timestampsOptions =
       new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), false, "GMT")
+    parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions)
     val customTimestamp = "31/01/2015 00:00"
-    val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
-    val castedTimestamp =
-      parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
+    var format = FastDateFormat.getInstance(
+      timestampsOptions.timestampFormat, timestampsOptions.timeZone, timestampsOptions.locale)
+    val expectedTime = format.parse(customTimestamp).getTime
+    val castedTimestamp = parser.makeConverter("_1", TimestampType, nullable = true)
       .apply(customTimestamp)
     assert(castedTimestamp == expectedTime * 1000L)
 
     val customDate = "31/01/2015"
     val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "GMT")
-    val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
-    val castedDate =
-      parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
-        .apply(customTimestamp)
-    assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
+    parser = new UnivocityParser(StructType(Seq.empty), dateOptions)
+    format = FastDateFormat.getInstance(
+      dateOptions.dateFormat, dateOptions.timeZone, dateOptions.locale)
+    val expectedDate = format.parse(customDate).getTime
+    val castedDate = parser.makeConverter("_1", DateType, nullable = true)
+      .apply(customDate)
+    assert(castedDate == DateTimeUtils.millisToDays(expectedDate, TimeZone.getTimeZone("GMT")))
 
     val timestamp = "2015-01-01 00:00:00"
-    assert(parser.makeConverter("_1", TimestampType, options = options).apply(timestamp) ==
-      DateTimeUtils.stringToTime(timestamp).getTime * 1000L)
-    assert(parser.makeConverter("_1", DateType, options = options).apply("2015-01-01") ==
-      DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime))
+    timestampsOptions = new CSVOptions(Map(
+      "timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
+      "dateFormat" -> "yyyy-MM-dd"), false, "UTC")
+    parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions)
+    val expected = 1420070400 * DateTimeUtils.MICROS_PER_SECOND
+    assert(parser.makeConverter("_1", TimestampType).apply(timestamp) ==
+      expected)
+    assert(parser.makeConverter("_1", DateType).apply("2015-01-01") ==
+      expected / DateTimeUtils.MICROS_PER_DAY)
   }
 
   test("Throws exception for casting an invalid string to Float and Double Types") {
     val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val parser = new UnivocityParser(StructType(Seq.empty), options)
     val types = Seq(DoubleType, FloatType)
     val input = Seq("10u000", "abc", "1 2/3")
     types.foreach { dt =>
       input.foreach { v =>
         val message = intercept[NumberFormatException] {
-          parser.makeConverter("_1", dt, options = options).apply(v)
+          parser.makeConverter("_1", dt).apply(v)
         }.getMessage
         assert(message.contains(v))
       }
     }
@@ -150,9 +161,9 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
 
   test("Float NaN values are parsed correctly") {
     val options = new CSVOptions(Map("nanValue" -> "nn"), false, "GMT")
+    val parser = new UnivocityParser(StructType(Seq.empty), options)
     val floatVal: Float = parser.makeConverter(
-      "_1", FloatType, nullable = true, options = options
-    ).apply("nn").asInstanceOf[Float]
+      "_1", FloatType, nullable = true).apply("nn").asInstanceOf[Float]
 
     // Java implements the IEEE-754 floating point standard which guarantees that any comparison
     // against NaN will return false (except != which returns true)
@@ -161,41 +172,41 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
 
   test("Double NaN values are parsed correctly") {
     val options = new CSVOptions(Map("nanValue" -> "-"), false, "GMT")
+    val parser = new UnivocityParser(StructType(Seq.empty), options)
     val doubleVal: Double = parser.makeConverter(
-      "_1", DoubleType, nullable = true, options = options
-    ).apply("-").asInstanceOf[Double]
+      "_1", DoubleType, nullable = true).apply("-").asInstanceOf[Double]
 
     assert(doubleVal.isNaN)
   }
 
   test("Float infinite values can be parsed") {
     val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
+    var parser = new UnivocityParser(StructType(Seq.empty), negativeInfOptions)
     val floatVal1 = parser.makeConverter(
-      "_1", FloatType, nullable = true, options = negativeInfOptions
-    ).apply("max").asInstanceOf[Float]
+      "_1", FloatType, nullable = true).apply("max").asInstanceOf[Float]
 
     assert(floatVal1 == Float.NegativeInfinity)
 
     val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
+    parser = new UnivocityParser(StructType(Seq.empty), positiveInfOptions)
     val floatVal2 = parser.makeConverter(
-      "_1", FloatType, nullable = true, options = positiveInfOptions
-    ).apply("max").asInstanceOf[Float]
+      "_1", FloatType, nullable = true).apply("max").asInstanceOf[Float]
 
     assert(floatVal2 == Float.PositiveInfinity)
   }
 
   test("Double infinite values can be parsed") {
     val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
+    var parser = new UnivocityParser(StructType(Seq.empty), negativeInfOptions)
     val doubleVal1 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, options = negativeInfOptions
-    ).apply("max").asInstanceOf[Double]
+      "_1", DoubleType, nullable = true).apply("max").asInstanceOf[Double]
 
     assert(doubleVal1 == Double.NegativeInfinity)
 
     val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
+    parser = new UnivocityParser(StructType(Seq.empty), positiveInfOptions)
     val doubleVal2 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, options = positiveInfOptions
-    ).apply("max").asInstanceOf[Double]
+      "_1", DoubleType, nullable = true).apply("max").asInstanceOf[Double]
 
     assert(doubleVal2 == Double.PositiveInfinity)
   }
 
@@ -211,7 +222,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       val options = new CSVOptions(Map("locale" -> langTag), false, "GMT")
       val parser = new UnivocityParser(new StructType().add("d", decimalType), options)
 
-      assert(parser.makeConverter("_1", decimalType, options = options).apply(input) === expected)
+      assert(parser.makeConverter("_1", decimalType).apply(input) === expected)
     }
 
     Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalParsing)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
index dfa0fe93a2f9c..66d8d28988f89 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
@@ -26,7 +26,7 @@ object DateTimeTestUtils {
 
   val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone)
 
-  val outstandingTimezones: Seq[TimeZone] = Seq(
+  val outstandingTimezonesIds: Seq[String] = Seq(
     "UTC",
     "PST",
     "CET",
@@ -34,7 +34,8 @@ object DateTimeTestUtils {
     "America/Los_Angeles",
     "Antarctica/Vostok",
     "Asia/Hong_Kong",
"Europe/Amsterdam").map(TimeZone.getTimeZone) + "Europe/Amsterdam") + val outstandingTimezones: Seq[TimeZone] = outstandingTimezonesIds.map(TimeZone.getTimeZone) def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = { val originalDefaultTimeZone = TimeZone.getDefault diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala new file mode 100644 index 0000000000000..02d4ee0490604 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import java.util.{Locale, TimeZone} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} + +class DateTimeFormatterSuite extends SparkFunSuite { + test("parsing dates using time zones") { + val localDate = "2018-12-02" + val expectedDays = Map( + "UTC" -> 17867, + "PST" -> 17867, + "CET" -> 17866, + "Africa/Dakar" -> 17867, + "America/Los_Angeles" -> 17867, + "Antarctica/Vostok" -> 17866, + "Asia/Hong_Kong" -> 17866, + "Europe/Amsterdam" -> 17866) + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US) + val daysSinceEpoch = formatter.parse(localDate) + assert(daysSinceEpoch === expectedDays(timeZone)) + } + } + + test("parsing timestamps using time zones") { + val localDate = "2018-12-02T10:11:12.001234" + val expectedMicros = Map( + "UTC" -> 1543745472001234L, + "PST" -> 1543774272001234L, + "CET" -> 1543741872001234L, + "Africa/Dakar" -> 1543745472001234L, + "America/Los_Angeles" -> 1543774272001234L, + "Antarctica/Vostok" -> 1543723872001234L, + "Asia/Hong_Kong" -> 1543716672001234L, + "Europe/Amsterdam" -> 1543741872001234L) + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + val formatter = DateTimeFormatter( + "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", + TimeZone.getTimeZone(timeZone), + Locale.US) + val microsSinceEpoch = formatter.parse(localDate) + assert(microsSinceEpoch === expectedMicros(timeZone)) + } + } + + test("format dates using time zones") { + val daysSinceEpoch = 17867 + val expectedDate = Map( + "UTC" -> "2018-12-02", + "PST" -> "2018-12-01", + "CET" -> "2018-12-02", + "Africa/Dakar" -> "2018-12-02", + "America/Los_Angeles" -> "2018-12-01", + "Antarctica/Vostok" -> "2018-12-02", + "Asia/Hong_Kong" -> "2018-12-02", + "Europe/Amsterdam" -> "2018-12-02") + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US) + val 
+      val date = formatter.format(daysSinceEpoch)
+      assert(date === expectedDate(timeZone))
+    }
+  }
+
+  test("format timestamps using time zones") {
+    val microsSinceEpoch = 1543745472001234L
+    val expectedTimestamp = Map(
+      "UTC" -> "2018-12-02T10:11:12.001234",
+      "PST" -> "2018-12-02T02:11:12.001234",
+      "CET" -> "2018-12-02T11:11:12.001234",
+      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
+      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
+      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
+      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
+      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateTimeFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val timestamp = formatter.format(microsSinceEpoch)
+      assert(timestamp === expectedTimestamp(timeZone))
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 1c359ce1d2014..63a3c1d883f63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -53,7 +53,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
   test("checking the columnNameOfCorruptRecord option") {
     val columnNameOfCorruptRecord = "_unparsed"
     val df = Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS()
-    val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+    val schema = new StructType().add("a", IntegerType).add("b", DateType)
     val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
     val df2 = df
       .select(from_csv($"value", schemaWithCorrField1, Map(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index bc950f2418d33..0ac959744e413 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -586,6 +586,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     val results = spark.read
       .format("csv")
       .options(Map("comment" -> "~", "header" -> "false", "inferSchema" -> "true"))
+      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
       .load(testFile(commentsFile))
       .collect()
 
@@ -622,10 +623,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     val options = Map(
       "header" -> "true",
       "inferSchema" -> "false",
-      "dateFormat" -> "dd/MM/yyyy hh:mm")
+      "dateFormat" -> "dd/MM/yyyy HH:mm")
     val results = spark.read
       .format("csv")
       .options(options)
+      .option("timeZone", "UTC")
      .schema(customSchema)
       .load(testFile(datesFile))
       .select("date")
@@ -893,36 +895,38 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   }
 
   test("Write dates correctly in ISO8601 format by default") {
-    withTempDir { dir =>
-      val customSchema = new StructType(Array(StructField("date", DateType, true)))
-      val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
-      val dates = spark.read
-        .format("csv")
-        .schema(customSchema)
-        .option("header", "true")
-        .option("inferSchema", "false")
-        .option("dateFormat", "dd/MM/yyyy HH:mm")
-        .load(testFile(datesFile))
-      dates.write
-        .format("csv")
-        .option("header", "true")
-        .save(iso8601datesPath)
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+      withTempDir { dir =>
+        val customSchema = new StructType(Array(StructField("date", DateType, true)))
+        val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
+        val dates = spark.read
+          .format("csv")
+          .schema(customSchema)
+          .option("header", "true")
+          .option("inferSchema", "false")
+          .option("dateFormat", "dd/MM/yyyy HH:mm")
+          .load(testFile(datesFile))
+        dates.write
+          .format("csv")
+          .option("header", "true")
+          .save(iso8601datesPath)
 
-      // This will load back the dates as string.
-      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
-      val iso8601dates = spark.read
-        .format("csv")
-        .schema(stringSchema)
-        .option("header", "true")
-        .load(iso8601datesPath)
+        // This will load back the dates as string.
+        val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+        val iso8601dates = spark.read
+          .format("csv")
+          .schema(stringSchema)
+          .option("header", "true")
+          .load(iso8601datesPath)
+
+        val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)
+        val expectedDates = dates.collect().map { r =>
+          // This should be ISO8601 formatted string.
+          Row(iso8501.format(r.toSeq.head))
+        }
 
-      val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)
-      val expectedDates = dates.collect().map { r =>
-        // This should be ISO8601 formatted string.
-        Row(iso8501.format(r.toSeq.head))
+        checkAnswer(iso8601dates, expectedDates)
       }
-
-      checkAnswer(iso8601dates, expectedDates)
     }
   }
 
@@ -1107,7 +1111,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
 
   test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
     Seq(false, true).foreach { multiLine =>
-      val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+      val schema = new StructType().add("a", IntegerType).add("b", DateType)
       // We use `PERMISSIVE` mode by default if invalid string is given.
       val df1 = spark
         .read
@@ -1139,7 +1143,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       val schemaWithCorrField2 = new StructType()
         .add("a", IntegerType)
         .add(columnNameOfCorruptRecord, StringType)
-        .add("b", TimestampType)
+        .add("b", DateType)
       val df3 = spark
         .read
         .option("mode", "permissive")
@@ -1325,7 +1329,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     val columnNameOfCorruptRecord = "_corrupt_record"
     val schema = new StructType()
       .add("a", IntegerType)
-      .add("b", TimestampType)
+      .add("b", DateType)
       .add(columnNameOfCorruptRecord, StringType)
     // negative cases
     val msg = intercept[AnalysisException] {
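Taken together, the changes above route both CSV schema inference and value conversion through the new java.time-based formatters. A small end-to-end sketch of the user-facing effect; the file path and column names are made up, while the `timestampFormat`, `dateFormat`, and `timeZone` options are the ones exercised in CSVSuite above:

```scala
// Assumes an active SparkSession named `spark`; the paths below are hypothetical.
val df = spark.read
  .schema("id INT, ts TIMESTAMP")
  .option("header", "true")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("timeZone", "UTC")
  .csv("/tmp/events.csv")

// Writing goes through the same formatter, so a round trip keeps the chosen pattern.
df.write
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .csv("/tmp/events_out.csv")
```

If an existing workload depends on the lenient pre-3.0 parsing, the `spark.sql.legacy.timeParser.enabled` flag shown earlier restores the old behaviour without changing the reader code.
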