JacksonParser.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -53,6 +54,8 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)

@transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@@ -213,15 +216,12 @@
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING =>
val stringValue = parser.getText
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Long.box {
Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(stringValue).getTime * 1000L
}
Try(timestampParser.parse(stringValue)).getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(stringValue).getTime * 1000L
}
}

case VALUE_NUMBER_INT =>
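A side note on the microsecond loss called out in the removed comment (SPARK-10681): the sketch below is not part of the patch; it is a minimal standalone demonstration of how a SimpleDateFormat-style fraction pattern misreads six fractional digits as whole milliseconds. The FastDateFormat behind the old options.timestampFormat path should behave analogously.

import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

object MicrosLossDemo extends App {
  val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", Locale.US)
  fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
  // The six fraction digits are parsed as the literal MILLISECOND value, so
  // ".123456" becomes 123456 ms (~2 minutes), not 123 ms + 456 micros.
  println(fmt.parse("1970-01-01T00:00:00.123456").getTime) // prints 123456
}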
DateTimeUtils.scala
@@ -18,15 +18,18 @@
package org.apache.spark.sql.catalyst.util

import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.text.{DateFormat, ParsePosition, SimpleDateFormat}
import java.time.Instant
import java.util.{Calendar, Locale, TimeZone}
import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
import javax.xml.bind.DatatypeConverter

import scala.annotation.tailrec

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.UTF8String

/**
@@ -1164,4 +1167,43 @@ object DateTimeUtils {
threadLocalTimestampFormat.remove()
threadLocalDateFormat.remove()
}

/**
* This custom sub-class of `GregorianCalendar` is needed to get access to the
* protected `fields` array immediately after parsing. We cannot use the
* `get()` method because it performs normalization of the fraction part;
* after that, the `MILLISECOND` field no longer contains the original parsed value.
*/
private class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
// Converts the parsed `MILLISECOND` field to a seconds fraction in microsecond precision.
// For example, if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and an
// input fraction like `.1234` leaves the raw value `1234` in the `MILLISECOND` field.
def getMicros(digitsInFraction: Int): SQLTimestamp = {
// Append 6 zeros to the field: 1234 -> 1234000000
val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
// Take the first 6 digits from `d`: 1234000000 -> 123400
// The dropped remainder has exactly `digitsInFraction` digits: `0000` = 10 ^ digitsInFraction.
// So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction)`.
d / Decimal.POW_10(digitsInFraction)
}
}

/**
* An instance of this class is meant to be reused many times. It holds the helper
* objects `cal` and `digitsInFraction` that are reused across `parse()` invocations.
*/
class TimestampParser(format: FastDateFormat) {
private val digitsInFraction = format.getPattern.count(_ == 'S')
private val cal = new MicrosCalendar(format.getTimeZone)

def parse(s: String): SQLTimestamp = {
cal.clear() // Clear the calendar because it can be re-used many times
if (!format.parse(s, new ParsePosition(0), cal)) {
throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
}
val micros = cal.getMicros(digitsInFraction)
cal.set(Calendar.MILLISECOND, 0)
cal.getTimeInMillis * MICROS_PER_MILLIS + micros
}
}
}
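Two properties above are easy to verify in isolation: (1) Calendar.get() normalizes an overflowing MILLISECOND field, which is why MicrosCalendar reads the protected fields array instead, and (2) the getMicros arithmetic scales the raw fraction digits to microseconds. A self-contained sketch of both, with a local POW_10 standing in for Decimal.POW_10 (all names below are local to the sketch):

import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}

object MicrosSketch extends App {
  // (1) get() normalizes: 1234 ms becomes 1 s + 234 ms, losing the raw digits.
  val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"), Locale.US)
  cal.clear()
  cal.set(Calendar.MILLISECOND, 1234)
  assert(cal.get(Calendar.MILLISECOND) == 234)
  assert(cal.get(Calendar.SECOND) == 1)

  // (2) the fraction arithmetic of getMicros, applied to the raw (un-normalized) value.
  val MICROS_PER_SECOND = 1000000L
  val POW_10 = Array.tabulate[Long](19)(i => math.pow(10, i).toLong)
  def fractionToMicros(rawFraction: Long, digitsInFraction: Int): Long =
    rawFraction * MICROS_PER_SECOND / POW_10(digitsInFraction)

  assert(fractionToMicros(1234L, 4) == 123400L)   // pattern SSSS,   input .1234
  assert(fractionToMicros(1L, 1) == 100000L)      // pattern S,      input .1
  assert(fractionToMicros(123456L, 6) == 123456L) // pattern SSSSSS, input .123456

  // TimestampParser.parse then zeroes MILLISECOND and returns
  // cal.getTimeInMillis * 1000 + micros.
}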
Decimal.scala
@@ -432,7 +432,7 @@ object Decimal {
/** Maximum number of decimal digits a Long can represent */
val MAX_LONG_DIGITS = 18

private val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)
val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)

private val BIG_DEC_ZERO = BigDecimal(0)

DateTimeUtilsSuite.scala
@@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.unsafe.types.UTF8String
@@ -692,4 +694,41 @@ class DateTimeUtilsSuite extends SparkFunSuite {
}
}
}

test("parsing timestamp strings up to microsecond precision") {
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
def check(pattern: String, input: String, reference: String): Unit = {
val parser = new TimestampParser(FastDateFormat.getInstance(pattern, timeZone, Locale.US))
val expected = DateTimeUtils.stringToTimestamp(
UTF8String.fromString(reference), timeZone).get
val actual = parser.parse(input)
assert(actual === expected)
}

check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX",
"2019-10-14T09:39:07.3220000Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
"2019-10-14T09:39:07.322000", "2019-10-14T09:39:07.322")
check("yyyy-MM-dd'T'HH:mm:ss.SSSX",
"2019-10-14T09:39:07.322Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
"2019-10-14T09:39:07.123456Z", "2019-10-14T09:39:07.123456Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
"2019-10-14T09:39:07.000010Z", "2019-10-14T09:39:07.00001Z")
check("yyyy-MM-dd'T'HH:mm:ss.S",
"2019-10-14T09:39:07.1", "2019-10-14T09:39:07.1")
check("yyyy-MM-dd'T'HH:mm:ss.SS",
"2019-10-14T09:39:07.10", "2019-10-14T09:39:07.1")

try {
new TimestampParser(
FastDateFormat.getInstance("yyyy/MM/dd HH_mm_ss.SSSSSS", timeZone, Locale.US))
.parse("2019/11/14 20#25#30.123456")
fail("Expected to throw an exception for the invalid input")
} catch {
case e: IllegalArgumentException =>
assert(e.getMessage.contains("is an invalid timestamp"))
}
}
}
}
UnivocityParser.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
import org.apache.spark.sql.execution.datasources.FailureSafeParser
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -77,6 +78,8 @@ class UnivocityParser(

private val row = new GenericInternalRow(requiredSchema.length)

@transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
@@ -156,10 +159,7 @@

case _: TimestampType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Try(options.timestampFormat.parse(datum).getTime * 1000L)
.getOrElse {
Try(timestampParser.parse(datum)).getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(datum).getTime * 1000L
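Both JacksonParser and UnivocityParser hold the new parser as a @transient private lazy val. A minimal sketch of that pattern (the class below is hypothetical, for illustration only): the field is skipped by Java serialization and rebuilt lazily on first use after deserialization, so each deserialized instance gets its own copy of the mutable parser state.

class RecordTimestampParser(pattern: String) extends Serializable {
  // Excluded from serialization; re-created lazily after deserialization.
  @transient private lazy val format =
    new java.text.SimpleDateFormat(pattern, java.util.Locale.US)

  def toMillis(s: String): Long = format.parse(s).getTime
}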
JsonFunctionsSuite.scala
@@ -518,4 +518,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("from_json - timestamp in micros") {
val df = Seq("""{"time": "1970-01-01T00:00:00.123456"}""").toDS()
val schema = new StructType().add("time", TimestampType)
val options = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")

checkAnswer(
df.select(from_json($"value", schema, options)),
Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
}
}
CSVSuite.scala
@@ -1875,4 +1875,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
}
}
}

test("parse timestamp in microsecond precision") {
withTempPath { path =>
val t = "2019-11-14 20:35:30.123456"
Seq(t).toDF("t").write.text(path.getAbsolutePath)
val readback = spark.read
.schema("t timestamp")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.csv(path.getAbsolutePath)
checkAnswer(readback, Row(Timestamp.valueOf(t)))
}
}
}
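The JSON datasource goes through JacksonParser and picks up the same TimestampParser, so the equivalent end-to-end read should work there too. A hedged usage sketch (the SparkSession `spark`, the path, and the sample record are assumed for illustration):

val readback = spark.read
  .schema("t timestamp")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
  .json("/tmp/micros.json") // e.g. a file of lines like {"t": "2019-11-14T20:35:30.123456"}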
UnivocityParserSuite.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

class UnivocityParserSuite extends SparkFunSuite {
private val parser = new UnivocityParser(
StructType(Seq.empty),
new CSVOptions(Map.empty[String, String], false, "GMT"))
private def getParser(options: CSVOptions) = {
new UnivocityParser(StructType(Seq.empty), options)
}

private def assertNull(v: Any) = assert(v == null)

@@ -40,8 +40,10 @@ class UnivocityParserSuite extends SparkFunSuite {
stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
val decimalValue = new BigDecimal(decimalVal.toString)
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
Decimal(decimalValue, decimalType.precision, decimalType.scale))
assert(
getParser(options)
.makeConverter("_1", decimalType, options = options)
.apply(strVal) === Decimal(decimalValue, decimalType.precision, decimalType.scale))
}
}

@@ -53,22 +55,23 @@
types.foreach { t =>
// Tests that a custom nullValue works.
val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
val converter =
parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
val converter = getParser(nullValueOptions)
.makeConverter("_1", t, nullable = true, options = nullValueOptions)
assertNull(converter.apply("-"))
assertNull(converter.apply(null))

// Tests that the default nullValue is empty string.
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
val parser = getParser(options)
assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
}

// Not nullable field with nullValue option.
types.foreach { t =>
// Casting a null to a not-nullable field should throw an exception.
val options = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
val converter =
parser.makeConverter("_1", t, nullable = false, options = options)
val converter = getParser(options)
.makeConverter("_1", t, nullable = false, options = options)
var message = intercept[RuntimeException] {
converter.apply("-")
}.getMessage
@@ -83,22 +86,25 @@
// null.
Seq(true, false).foreach { b =>
val options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT")
val converter =
parser.makeConverter("_1", StringType, nullable = b, options = options)
val converter = getParser(options)
.makeConverter("_1", StringType, nullable = b, options = options)
assert(converter.apply("") == UTF8String.fromString(""))
}
}

test("Throws exception for empty string with non null type") {
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
val exception = intercept[RuntimeException]{
parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
getParser(options)
.makeConverter("_1", IntegerType, nullable = false, options = options)
.apply("")
}
assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
}

test("Types are cast correctly") {
val options = new CSVOptions(Map.empty[String, String], false, "GMT")
val parser = getParser(options)
assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
@@ -111,17 +117,17 @@
new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), false, "GMT")
val customTimestamp = "31/01/2015 00:00"
val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
val castedTimestamp =
parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
.apply(customTimestamp)
val castedTimestamp = getParser(timestampsOptions)
.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
.apply(customTimestamp)
assert(castedTimestamp == expectedTime * 1000L)

val customDate = "31/01/2015"
val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "GMT")
val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
val castedDate =
parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
.apply(customTimestamp)
val castedDate = getParser(dateOptions)
.makeConverter("_1", DateType, nullable = true, options = dateOptions)
.apply(customTimestamp)
assert(castedDate == DateTimeUtils.millisToDays(expectedDate))

val timestamp = "2015-01-01 00:00:00"
@@ -138,7 +144,7 @@
types.foreach { dt =>
input.foreach { v =>
val message = intercept[NumberFormatException] {
parser.makeConverter("_1", dt, options = options).apply(v)
getParser(options).makeConverter("_1", dt, options = options).apply(v)
}.getMessage
assert(message.contains(v))
}
@@ -147,7 +153,7 @@

test("Float NaN values are parsed correctly") {
val options = new CSVOptions(Map("nanValue" -> "nn"), false, "GMT")
val floatVal: Float = parser.makeConverter(
val floatVal: Float = getParser(options).makeConverter(
"_1", FloatType, nullable = true, options = options
).apply("nn").asInstanceOf[Float]

@@ -158,7 +164,7 @@

test("Double NaN values are parsed correctly") {
val options = new CSVOptions(Map("nanValue" -> "-"), false, "GMT")
val doubleVal: Double = parser.makeConverter(
val doubleVal: Double = getParser(options).makeConverter(
"_1", DoubleType, nullable = true, options = options
).apply("-").asInstanceOf[Double]

@@ -167,14 +173,14 @@

test("Float infinite values can be parsed") {
val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
val floatVal1 = parser.makeConverter(
val floatVal1 = getParser(negativeInfOptions).makeConverter(
"_1", FloatType, nullable = true, options = negativeInfOptions
).apply("max").asInstanceOf[Float]

assert(floatVal1 == Float.NegativeInfinity)

val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
val floatVal2 = parser.makeConverter(
val floatVal2 = getParser(positiveInfOptions).makeConverter(
"_1", FloatType, nullable = true, options = positiveInfOptions
).apply("max").asInstanceOf[Float]

@@ -183,18 +189,17 @@

test("Double infinite values can be parsed") {
val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
val doubleVal1 = parser.makeConverter(
val doubleVal1 = getParser(negativeInfOptions).makeConverter(
"_1", DoubleType, nullable = true, options = negativeInfOptions
).apply("max").asInstanceOf[Double]

assert(doubleVal1 == Double.NegativeInfinity)

val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
val doubleVal2 = parser.makeConverter(
val doubleVal2 = getParser(positiveInfOptions).makeConverter(
"_1", DoubleType, nullable = true, options = positiveInfOptions
).apply("max").asInstanceOf[Double]

assert(doubleVal2 == Double.PositiveInfinity)
}

}