Closed

Commits (32)
74a76c2
New and legacy time parser
MaxGekk Nov 24, 2018
63cf611
Add config spark.sql.legacy.timeParser.enabled
MaxGekk Nov 24, 2018
2a2ab83
Fallback legacy parser
MaxGekk Nov 24, 2018
667bf9f
something
MaxGekk Nov 24, 2018
227a7bd
Using instances
MaxGekk Nov 25, 2018
73ee560
Added generator
MaxGekk Nov 25, 2018
f35f6e1
Refactoring of TimeFormatter
MaxGekk Nov 25, 2018
1c09b58
Renaming to DateTimeFormatter
MaxGekk Nov 25, 2018
7b213d5
Added DateFormatter
MaxGekk Nov 25, 2018
242ba47
Default values in parsing
MaxGekk Nov 25, 2018
db48ee6
Parse as date type because format for timestamp is not matched to…
MaxGekk Nov 25, 2018
e18841b
Fix tests
MaxGekk Nov 25, 2018
8db0238
CSVSuite passed
MaxGekk Nov 25, 2018
0b9ed92
Fix imports
MaxGekk Nov 26, 2018
799ebb3
Revert test back
MaxGekk Nov 26, 2018
5a22391
Set timeZone
MaxGekk Nov 26, 2018
f287b77
Removing default for micros because it causes conflicts in parsing
MaxGekk Nov 26, 2018
52074f7
Set timezone otherwise default is used
MaxGekk Nov 27, 2018
647b09c
Removing CSVOptions param from CsvInferSchema methods
MaxGekk Nov 29, 2018
4d6c86b
Use constants
MaxGekk Nov 29, 2018
6552dcf
Merge remote-tracking branch 'origin/master' into time-parser
MaxGekk Nov 30, 2018
f3f46c7
Merging followup
MaxGekk Nov 30, 2018
3f3ca70
Updating the migration guide
MaxGekk Dec 1, 2018
1dd9ed1
Inlining method's arguments
MaxGekk Dec 1, 2018
83bf58b
Additional fallback
MaxGekk Dec 1, 2018
00509d3
Removing unrelated changes
MaxGekk Dec 1, 2018
9b0570e
Merge remote-tracking branch 'origin/master' into time-parser
MaxGekk Dec 2, 2018
e9d6bb0
Using floorDiv to take days from seconds
MaxGekk Dec 2, 2018
1ad1184
A test for roundtrip timestamp parsing
MaxGekk Dec 2, 2018
f8097b4
Tests for DateTimeFormatter
MaxGekk Dec 2, 2018
3848795
Fix typo
MaxGekk Dec 3, 2018
60c0974
Merge remote-tracking branch 'fork/time-parser' into time-parser
MaxGekk Dec 3, 2018
docs/sql-migration-guide-upgrade.md (2 additions & 0 deletions)
@@ -33,6 +33,8 @@ displayTitle: Spark SQL Upgrading Guide

- Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.

- Since Spark 3.0, the CSV datasource uses the java.time API for parsing and generating CSV content. The new formatting implementation supports date/timestamp patterns that conform to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
@HyukjinKwon (Member), Dec 17, 2018:

@MaxGekk, can you check if this legacy configuration works or not?

I checked it as below:

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 2b8d22dde92..08795972fb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -26,6 +26,7 @@ import scala.util.Try

 import org.apache.commons.lang3.time.FastDateFormat

+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf

 sealed trait TimestampFormatter {
@@ -112,11 +113,13 @@ class LegacyFallbackTimestampFormatter(
   }
 }

-object TimestampFormatter {
+object TimestampFormatter extends Logging {
   def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
+      logError("LegacyFallbackTimestampFormatter is being used")
       new LegacyFallbackTimestampFormatter(format, timeZone, locale)
     } else {
+      logError("Iso8601TimestampFormatter is being used")
       new Iso8601TimestampFormatter(format, timeZone, locale)
     }
   }
$ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
scala> spark.conf.get("spark.sql.legacy.timeParser.enabled")
res0: String = true

scala> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")

scala> spark.read.option("inferSchema", "true").option("header", "false").option("timestampFormat", "yyyy|MM|dd").csv("/tmp/foo").printSchema()
18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is being used
root
 |-- _c0: timestamp (nullable = true)

Member:

adding @cloud-fan

Contributor:

I'm surprised it doesn't work, as this pattern of using SQLConf appears in many places.

Can you create a ticket for it? Is this only a problem when setting conf via spark shell?

@MaxGekk (Member, Author):

The flag definitely switches the behavior, since I used it in a test recently: https://github.com/apache/spark/pull/23196/files#diff-fde14032b0e6ef8086461edf79a27c5dR1454

Member:

Yea, I saw the test, but weirdly it doesn't work in the shell. Do you mind if I check it as I did? Something is weird. I want to be very sure whether it's an issue or something I did wrong myself.

@MaxGekk (Member, Author):

I see the same, but it is interesting that:

scala> spark.range(1).map(_ => org.apache.spark.sql.internal.SQLConf.get.legacyTimeParserEnabled).show
+-----+
|value|
+-----+
| true|
+-----+

It seems that when an instance of CSVInferSchema is created, SQL configs haven't been set on the executor side yet.

Member:

Ah, good that I wasn't doing something stupid alone. Let's file a JIRA.


Member:

We removed this conf in the follow-up PRs?

Member:

#23495 is the PR that removed the conf.
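
The thread above comes down to conf-propagation timing: `SQLConf.get` read on an executor can still return the default value if the session conf set in the shell has not yet been propagated to that executor's thread. A minimal sketch of one way to sidestep this, assuming a running `spark` session (an illustration only, not the fix adopted here): read the flag once on the driver and let the closure capture the plain Boolean.

import spark.implicits._

// Driver side: read the flag once; the closure captures the plain Boolean,
// so executors do not consult SQLConf.get at task time.
val legacyEnabled =
  spark.conf.get("spark.sql.legacy.timeParser.enabled", "false").toBoolean
spark.range(1).map(_ => legacyEnabled).show()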


## Upgrading From Spark SQL 2.3 to 2.4

- In Spark version 2.3 and earlier, the second parameter to the `array_contains` function is implicitly promoted to the element type of the first array type parameter. This type promotion can be lossy and may cause the `array_contains` function to return a wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some changes in behavior, which are illustrated in the table below.
CSVInferSchema.scala
@@ -22,10 +22,16 @@ import scala.util.control.Exception.allCatch
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeFormatter
import org.apache.spark.sql.types._

class CSVInferSchema(options: CSVOptions) extends Serializable {
class CSVInferSchema(val options: CSVOptions) extends Serializable {
Contributor:

Since we get the CSVOptions in the constructor, shall we remove it as a parameter of the several methods? It is pretty confusing which one is used right now...


@transient
private lazy val timeParser = DateTimeFormatter(
options.timestampFormat,
options.timeZone,
options.locale)

private val decimalParser = {
ExprUtils.getDecimalParser(options.locale)
@@ -154,10 +160,7 @@ class CSVInferSchema(options: CSVOptions) extends Serializable {

private def tryParseTimestamp(field: String): DataType = {
// This case infers a custom `dataFormat` is set.
if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
TimestampType
} else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
// We keep this for backwards compatibility.
if ((allCatch opt timeParser.parse(field)).isDefined) {
TimestampType
} else {
tryParseBoolean(field)
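A note on the `@transient private lazy val timeParser` above: `CSVInferSchema` is `Serializable` and shipped to executors, while formatter instances generally are not serializable, so the field is excluded from serialization and rebuilt lazily in each JVM that deserializes the object. A minimal sketch of the pattern under that assumption (the class and names here are illustrative, not from the PR):

import java.time.format.{DateTimeFormatter => JavaFormatter}
import scala.util.Try

class FieldInferencer(pattern: String) extends Serializable {
  // Not written out during serialization; re-created on first use per JVM.
  @transient private lazy val formatter = JavaFormatter.ofPattern(pattern)

  def looksLikeTimestamp(field: String): Boolean =
    Try(formatter.parse(field)).isSuccess
}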
CSVOptions.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
import java.util.{Locale, TimeZone}

import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
@@ -146,13 +145,10 @@ class CSVOptions(
// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")

val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
val timestampFormat: String =
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

UnivocityGenerator.scala
@@ -22,7 +22,7 @@ import java.io.Writer
import com.univocity.parsers.csv.CsvWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
import org.apache.spark.sql.types._

class UnivocityGenerator(
@@ -41,14 +41,18 @@ class UnivocityGenerator(
private val valueConverters: Array[ValueConverter] =
schema.map(_.dataType).map(makeConverter).toArray

private val timeFormatter = DateTimeFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)

private def makeConverter(dataType: DataType): ValueConverter = dataType match {
case DateType =>
(row: InternalRow, ordinal: Int) =>
options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
(row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))

case TimestampType =>
(row: InternalRow, ordinal: Int) =>
options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
(row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))

case udt: UserDefinedType[_] => makeConverter(udt.sqlType)

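The converters above operate on Spark's internal representations: a `DateType` value is the number of days since the epoch (an Int) and a `TimestampType` value is microseconds since the epoch (a Long). A quick illustration of the date half using plain java.time (the epoch-day value is just an example):

import java.time.LocalDate

LocalDate.ofEpochDay(17867L)              // 2018-12-02
LocalDate.parse("2018-12-02").toEpochDay  // 17867L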
UnivocityParser.scala
@@ -19,15 +19,14 @@ package org.apache.spark.sql.catalyst.csv

import java.io.InputStream

import scala.util.Try
import scala.util.control.NonFatal

import com.univocity.parsers.csv.CsvParser

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -75,6 +74,12 @@ class UnivocityParser(

private val row = new GenericInternalRow(requiredSchema.length)

private val timeFormatter = DateTimeFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
@@ -100,7 +105,7 @@
//
// output row - ["A", 2]
private val valueConverters: Array[ValueConverter] = {
requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable)).toArray
}

private val decimalParser = ExprUtils.getDecimalParser(options.locale)
@@ -115,8 +120,7 @@
def makeConverter(
name: String,
dataType: DataType,
nullable: Boolean = true,
options: CSVOptions): ValueConverter = dataType match {
nullable: Boolean = true): ValueConverter = dataType match {
case _: ByteType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(_.toByte)

@@ -154,34 +158,16 @@
}

case _: TimestampType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Try(options.timestampFormat.parse(datum).getTime * 1000L)
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(datum).getTime * 1000L
}
}
nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)

case _: DateType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.x
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
}
}
nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)

case _: StringType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)

case udt: UserDefinedType[_] => (datum: String) =>
makeConverter(name, udt.sqlType, nullable, options)
makeConverter(name, udt.sqlType, nullable)

// We don't actually hit this exception though, we keep it for understandability
case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}")
DateTimeFormatter.scala (new file)
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import java.time._
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.{ChronoField, TemporalQueries}
import java.util.{Locale, TimeZone}

import scala.util.Try

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.internal.SQLConf

sealed trait DateTimeFormatter {
def parse(s: String): Long // returns microseconds since epoch
def format(us: Long): String
}

class Iso8601DateTimeFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateTimeFormatter {
val formatter = new DateTimeFormatterBuilder()
.appendPattern(pattern)
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
.parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
.parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter(locale)

def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
if (temporalAccessor.query(TemporalQueries.offset()) == null) {
val localDateTime = LocalDateTime.from(temporalAccessor)
val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
Instant.from(zonedDateTime)
} else {
Instant.from(temporalAccessor)
}
}

private def instantToMicros(instant: Instant): Long = {
val sec = Math.multiplyExact(instant.getEpochSecond, DateTimeUtils.MICROS_PER_SECOND)
val result = Math.addExact(sec, instant.getNano / DateTimeUtils.NANOS_PER_MICROS)
result
}

def parse(s: String): Long = instantToMicros(toInstant(s))

def format(us: Long): String = {
val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)
val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND)
val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS)

formatter.withZone(timeZone.toZoneId).format(instant)
}
}

class LegacyDateTimeFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateTimeFormatter {
val format = FastDateFormat.getInstance(pattern, timeZone, locale)

protected def toMillis(s: String): Long = format.parse(s).getTime

def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS

def format(us: Long): String = {
format.format(DateTimeUtils.toJavaTimestamp(us))
}
}

class LegacyFallbackDateTimeFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
override def toMillis(s: String): Long = {
Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
}
}

object DateTimeFormatter {
def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
} else {
new Iso8601DateTimeFormatter(format, timeZone, locale)
}
}
}

sealed trait DateFormatter {
def parse(s: String): Int // returns days since epoch
def format(days: Int): String
}

class Iso8601DateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateFormatter {

val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)

override def parse(s: String): Int = {
val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)

days.toInt
}

override def format(days: Int): String = {
val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
}
}

class LegacyDateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateFormatter {
val format = FastDateFormat.getInstance(pattern, timeZone, locale)

def parse(s: String): Int = {
val milliseconds = format.parse(s).getTime
DateTimeUtils.millisToDays(milliseconds)
}

def format(days: Int): String = {
val date = DateTimeUtils.toJavaDate(days)
format.format(date)
}
}

class LegacyFallbackDateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
override def parse(s: String): Int = {
Try(super.parse(s)).orElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime))
}.getOrElse {
// In Spark 1.5.0, we store the data as number of days since epoch in string.
// So, we just convert it to Int.
s.toInt
}
}
}

object DateFormatter {
def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackDateFormatter(format, timeZone, locale)
} else {
new Iso8601DateFormatter(format, timeZone, locale)
}
}
}
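
For orientation, a small usage sketch of the API defined in this new file, assuming the default (non-legacy) ISO 8601 path; the pattern, zone, and values are arbitrary examples:

import java.util.{Locale, TimeZone}
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}

val utc = TimeZone.getTimeZone("UTC")

val ts = DateTimeFormatter("yyyy-MM-dd HH:mm:ss", utc, Locale.US)
val micros = ts.parse("2018-12-02 10:11:12")  // microseconds since the epoch
assert(ts.format(micros) == "2018-12-02 10:11:12")

val dates = DateFormatter("yyyy-MM-dd", utc, Locale.US)
val days = dates.parse("2018-12-02")          // days since the epoch
assert(dates.format(days) == "2018-12-02")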
DateTimeUtils.scala
@@ -50,7 +50,7 @@ object DateTimeUtils {
final val MILLIS_PER_SECOND = 1000L
final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY

final val NANOS_PER_MICROS = 1000L
final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L

// number of days in 400 years
SQLConf.scala
@@ -1618,6 +1618,13 @@
"a SparkConf entry.")
.booleanConf
.createWithDefault(true)

val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled")
.doc("When set to true, java.text.SimpleDateFormat is used for formatting and parsing " +
" dates/timestamps in a locale-sensitive manner. When set to false, classes from " +
"java.time.* packages are used for the same purpose.")
.booleanConf
.createWithDefault(false)
}

/**
@@ -2040,6 +2047,8 @@ class SQLConf extends Serializable with Logging {

def setCommandRejectsSparkConfs: Boolean = getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CONFS)

def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
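Finally, the new flag behaves like any other SQL conf and can be toggled per session; just keep in mind the executor-propagation caveat discussed in the review thread above. For example:

spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")
spark.sql("SET spark.sql.legacy.timeParser.enabled=false")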