@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util

import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.time.LocalDateTime
import java.time.temporal.ChronoField
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
@@ -143,6 +145,12 @@ object DateTimeUtils {
millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
}

def dateTimeToMicroseconds(localDateTime: LocalDateTime, timeZone: TimeZone): Long = {
val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND)
val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond
epochSecond * 1000000L + microOfSecond
Review (Member): 1000000L -> MICROS_PER_SECOND?
Reply (Author): thanks

}

def dateToString(days: SQLDate): String =
getThreadLocalDateFormat.format(toJavaDate(days))
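For context, a minimal standalone sketch (not part of the patch, hypothetical demo object) showing what the new helper computes; note that ChronoField.MICRO_OF_SECOND truncates any nanosecond digits beyond microsecond precision:

import java.time.LocalDateTime
import java.time.temporal.ChronoField
import java.util.TimeZone

object DateTimeToMicrosDemo {
  // Inlined copy of the helper above, for illustration only.
  def dateTimeToMicroseconds(localDateTime: LocalDateTime, timeZone: TimeZone): Long = {
    // MICRO_OF_SECOND drops nanosecond digits beyond the microsecond.
    val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND)
    val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond
    epochSecond * 1000000L + microOfSecond
  }

  def main(args: Array[String]): Unit = {
    val t = LocalDateTime.parse("2015-05-09T00:10:23.999750987")
    // Prints 1431130223999750 for UTC: the trailing 987 nanoseconds are dropped.
    println(dateTimeToMicroseconds(t, TimeZone.getTimeZone("UTC")))
  }
}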

@@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.util

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.{Calendar, Locale, TimeZone}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.unsafe.types.UTF8String
import org.junit.Assert.assertEquals

class DateTimeUtilsSuite extends SparkFunSuite {

@@ -645,6 +648,18 @@ class DateTimeUtilsSuite extends SparkFunSuite {
}
}

test("Java 8 LocalDateTime to microseconds") {
val nanos = "2015-05-09 00:10:23.999750987"
var formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
val localDateTimeInNanos = LocalDateTime.parse(nanos, formatter)
val timeInMicros = dateTimeToMicroseconds(localDateTimeInNanos, TimeZonePST)
assertEquals(1431155423999750L, timeInMicros)
val micros = "2015-05-09 00:10:23.999750"
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
val localDateTimeInMicros = LocalDateTime.parse(micros, formatter)
assertEquals(timeInMicros, dateTimeToMicroseconds(localDateTimeInMicros, TimeZonePST))
}

test("daysToMillis and millisToDays") {
val c = Calendar.getInstance(TimeZonePST)

@@ -90,6 +90,7 @@ private[csv] object CSVInferSchema {
// DecimalTypes have different precisions and scales, so we try to find the common type.
findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
case DoubleType => tryParseDouble(field, options)
case DateType => tryParseDate(field, options)
Review (Contributor): This is also a behavior change. Shall we document it?
Reply (Author): I can do it, but where exactly should it be documented?

case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
@@ -140,14 +141,23 @@ private[csv] object CSVInferSchema {
private def tryParseDouble(field: String, options: CSVOptions): DataType = {
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
DoubleType
} else {
tryParseDate(field, options)
Review (Member): Is this a behavior change? Previously timestamp type, now date type/timestamp type?
Reply (Author): Suppose that by mistake the "timestampFormat" and "dateFormat" options are identical, say "yyyy-MM-dd". 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes), so when a field matches both formats we should try to parse it as a date first: both types fit, but the more compact one should win by default, and that yields the correct inferred type.
Reply (Author): At the moment, DateType is ignored here entirely; I'm not sure it was ever handled when this type was introduced.

}
}

private def tryParseDate(field: String, options: CSVOptions): DataType = {
// This case infers a custom `dateFormat` is set.
if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
DateType
} else {
tryParseTimestamp(field, options)
}
}

private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
// This case infers a custom `dataFormat` is set.
if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
// This case infers a custom `timestampFormat` is set.
if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
TimestampType
} else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
// We keep this for backwards compatibility.
@@ -216,6 +226,9 @@ private[csv] object CSVInferSchema {
} else {
Some(DecimalType(range + scale, scale))
}
// By design 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes).
case (t1: DateType, t2: TimestampType) => Some(TimestampType)
case (t1: TimestampType, t2: DateType) => Some(TimestampType)

case _ => None
}
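To illustrate the widening rule the two new cases add, here is a simplified stand-in for findTightestCommonType (the real method handles many more type pairs; this sketch can be pasted into a Spark REPL). Merging a chunk inferred as DateType with one inferred as TimestampType must yield the wider TimestampType:

import org.apache.spark.sql.types._

// Simplified stand-in covering only the date/timestamp widening introduced above.
def widenDateTime(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match {
  case (a, b) if a == b => Some(a)
  // TimestampType (8 bytes) is wider than DateType (4 bytes), so it wins.
  case (DateType, TimestampType) | (TimestampType, DateType) => Some(TimestampType)
  case _ => None
}

assert(widenDateTime(DateType, TimestampType).contains(TimestampType))
assert(widenDateTime(TimestampType, DateType).contains(TimestampType))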
@@ -18,10 +18,10 @@
package org.apache.spark.sql.execution.datasources.csv

import java.nio.charset.StandardCharsets
import java.time.format.{DateTimeFormatter, ResolverStyle}
import java.util.{Locale, TimeZone}

import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
@@ -119,7 +119,6 @@ class CSVOptions(
val positiveInf = parameters.getOrElse("positiveInf", "Inf")
val negativeInf = parameters.getOrElse("negativeInf", "-Inf")


Review (Member): Sounds like an unrelated change.

val compressionCodec: Option[String] = {
val name = parameters.get("compression").orElse(parameters.get("codec"))
name.map(CompressionCodecs.getCodecClassName)
@@ -128,13 +127,20 @@
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")

val timestampFormat: String = parameters
.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
@transient lazy val dateFormatter: DateTimeFormatter = {
DateTimeFormatter.ofPattern(dateFormat)
.withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
}

@transient lazy val timestampFormatter: DateTimeFormatter = {
DateTimeFormatter.ofPattern(timestampFormat)
.withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
}

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
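As a rough usage sketch (assumed patterns and values, outside of Spark): unlike SimpleDateFormat, a DateTimeFormatter built this way is immutable and thread-safe, so the single lazily created instance can be shared across all parsing tasks:

import java.time.{LocalDate, LocalDateTime}
import java.time.format.{DateTimeFormatter, ResolverStyle}
import java.util.{Locale, TimeZone}

val timeZone = TimeZone.getTimeZone("GMT")

// Built the same way CSVOptions now builds its formatters.
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
  .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
val timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
  .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)

println(LocalDate.parse("2015-05-09", dateFormatter))                              // 2015-05-09
println(LocalDateTime.parse("2015-05-09T00:10:23.999+00:00", timestampFormatter))  // 2015-05-09T00:10:23.999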

@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv

import java.io.InputStream
import java.math.BigDecimal
import java.time.{LocalDate, LocalDateTime}
import java.time.temporal.ChronoField

import scala.util.Try
import scala.util.control.NonFatal
@@ -131,9 +133,8 @@ class UnivocityParser(

case _: TimestampType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Try(options.timestampFormat.parse(datum).getTime * 1000L)
Try(DateTimeUtils.dateTimeToMicroseconds(LocalDateTime
.parse(datum, options.timestampFormatter), options.timeZone))
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
@@ -143,9 +144,8 @@

case _: DateType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.x
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
Try(Math.toIntExact(LocalDate.parse(datum, options.dateFormatter)
.getLong(ChronoField.EPOCH_DAY)))
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
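A minimal sketch (standalone, with assumed sample values) of the two conversions the parser now performs: timestamps become microseconds since the epoch and dates become epoch days, matching Catalyst's internal representations for TimestampType and DateType:

import java.time.{LocalDate, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoField

val zone = ZoneId.of("GMT")
val tsFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss.SSS")
val dateFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy")

// TimestampType: microseconds since the epoch (what dateTimeToMicroseconds returns).
val ldt = LocalDateTime.parse("26/08/2015 22:31:46.913", tsFormatter)
val micros = ldt.atZone(zone).toInstant.getEpochSecond * 1000000L +
  ldt.getLong(ChronoField.MICRO_OF_SECOND)

// DateType: days since the epoch, stored as an Int.
val days = Math.toIntExact(
  LocalDate.parse("27/09/2015", dateFormatter).getLong(ChronoField.EPOCH_DAY))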
@@ -0,0 +1,4 @@ (new test-data file: dates-and-timestamps.csv)
timestamp,date
26/08/2015 22:31:46.913,27/09/2015
27/10/2014 22:33:31.601,26/12/2016
28/01/2016 22:33:52.888,28/01/2017
@@ -59,13 +59,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne)
}

test("Timestamp field types are inferred correctly via custom data format") {
var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
test("Timestamp field types are inferred correctly via custom date format") {
var options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM"), "GMT")
Review (Member, viirya, May 21, 2018): Why do we need to change this? The format string change is also a behavior change for users. We might need a config for that.
Reply (Author, sergey-rubtsov, May 21, 2018): "yyyy-mm" means years and minutes, which is not a sensible timestamp pattern; "yyyy-MM" means years and months. But I do not insist on this change.

assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
}

test("Date field types are inferred correctly via custom date and timestamp format") {
val options = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy",
"timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS"), "GMT")
assert(CSVInferSchema.inferField(TimestampType,
"28/01/2017 22:31:46.913", options) == TimestampType)
assert(CSVInferSchema.inferField(DateType, "16/12/2012", options) == DateType)
}

test("Timestamp field types are inferred correctly from other types") {
val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
@@ -111,7 +119,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}

test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-MM"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
}

@@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
private val simpleSparseFile = "test-data/simple_sparse.csv"
private val numbersFile = "test-data/numbers.csv"
private val datesFile = "test-data/dates.csv"
private val datesAndTimestampsFile = "test-data/dates-and-timestamps.csv"
private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
private val valueMalformedFile = "test-data/value-malformed.csv"

@@ -565,6 +566,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
assert(results.toSeq.map(_.toSeq) === expected)
}

test("inferring timestamp types and date types via custom formats") {
val options = Map(
"header" -> "true",
"inferSchema" -> "true",
"timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS",
"dateFormat" -> "dd/MM/yyyy")
val results = spark.read
.format("csv")
.options(options)
.load(testFile(datesAndTimestampsFile))
assert(results.schema(0).dataType === TimestampType)
assert(results.schema(1).dataType === DateType)
val timestamps = spark.read
.format("csv")
.options(options)
.load(testFile(datesAndTimestampsFile))
.select("timestamp")
.collect()
val timestampFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS", Locale.US)
val timestampExpected =
Seq(Seq(new Timestamp(timestampFormat.parse("26/08/2015 22:31:46.913").getTime)),
Seq(new Timestamp(timestampFormat.parse("27/10/2014 22:33:31.601").getTime)),
Seq(new Timestamp(timestampFormat.parse("28/01/2016 22:33:52.888").getTime)))
assert(timestamps.toSeq.map(_.toSeq) === timestampExpected)
val dates = spark.read
.format("csv")
.options(options)
.load(testFile(datesAndTimestampsFile))
.select("date")
.collect()
val dateFormat = new SimpleDateFormat("dd/MM/yyyy", Locale.US)
val dateExpected =
Seq(Seq(new Date(dateFormat.parse("27/09/2015").getTime)),
Seq(new Date(dateFormat.parse("26/12/2016").getTime)),
Seq(new Date(dateFormat.parse("28/01/2017").getTime)))
assert(dates.toSeq.map(_.toSeq) === dateExpected)
}

test("load date types via custom date format") {
val customSchema = new StructType(Array(StructField("date", DateType, true)))
val options = Map(
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.csv

import java.math.BigDecimal
import java.util.Locale
import java.time.{LocalDate, LocalDateTime}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -107,20 +107,26 @@ class UnivocityParserSuite extends SparkFunSuite {
assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)

val timestampsOptions =
new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy HH:mm"), "GMT")
val customTimestamp = "31/01/2015 00:00"
val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime

val expectedTime = LocalDateTime.parse(customTimestamp, timestampsOptions.timestampFormatter)
.atZone(options.timeZone.toZoneId)
.toInstant.toEpochMilli
val castedTimestamp =
parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions)
.apply(customTimestamp)
assert(castedTimestamp == expectedTime * 1000L)

val customDate = "31/01/2015"
val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
val customDate = "31/01/2015"

val expectedDate = LocalDate.parse(customDate, dateOptions.dateFormatter)
.atStartOfDay(options.timeZone.toZoneId)
.toInstant.toEpochMilli
val castedDate =
parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
Review (Member): I would keep this line as it was.
Reply (Author): ok

.apply(customTimestamp)
parser.makeConverter("_1", DateType, nullable = true, dateOptions)
.apply(customDate)
assert(castedDate == DateTimeUtils.millisToDays(expectedDate))

val timestamp = "2015-01-01 00:00:00"