Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import java.time.{Instant, ZoneId}
import java.util.Locale

import scala.util.Try

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.internal.SQLConf

sealed trait DateFormatter {
def parse(s: String): Int // returns days since epoch
def format(days: Int): String
}

class Iso8601DateFormatter(
pattern: String,
locale: Locale) extends DateFormatter with DateTimeFormatterHelper {

private val formatter = buildFormatter(pattern, locale)
private val UTC = ZoneId.of("UTC")

private def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
toInstantWithZoneId(temporalAccessor, UTC)
}

override def parse(s: String): Int = {
val seconds = toInstant(s).getEpochSecond
val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
days.toInt
}

override def format(days: Int): String = {
val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
formatter.withZone(UTC).format(instant)
}
}

class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
private val format = FastDateFormat.getInstance(pattern, locale)

override def parse(s: String): Int = {
val milliseconds = format.parse(s).getTime
DateTimeUtils.millisToDays(milliseconds)
}

override def format(days: Int): String = {
val date = DateTimeUtils.toJavaDate(days)
format.format(date)
}
}

class LegacyFallbackDateFormatter(
pattern: String,
locale: Locale) extends LegacyDateFormatter(pattern, locale) {
override def parse(s: String): Int = {
Try(super.parse(s)).orElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime))
}.getOrElse {
// In Spark 1.5.0, we store the data as number of days since epoch in string.
// So, we just convert it to Int.
s.toInt
}
}
}

object DateFormatter {
def apply(format: String, locale: Locale): DateFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackDateFormatter(format, locale)
} else {
new Iso8601DateFormatter(format, locale)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import java.time.{Instant, LocalDateTime, ZonedDateTime, ZoneId}
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.temporal.{ChronoField, TemporalAccessor}
import java.util.Locale

trait DateTimeFormatterHelper {

protected def buildFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
new DateTimeFormatterBuilder()
.appendPattern(pattern)
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
.parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
.parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter(locale)
}

protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor, zoneId: ZoneId): Instant = {
val localDateTime = LocalDateTime.from(temporalAccessor)
val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
Instant.from(zonedDateTime)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
package org.apache.spark.sql.catalyst.util

import java.time._
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.time.temporal.TemporalQueries
import java.util.{Locale, TimeZone}

import scala.util.Try
Expand All @@ -33,39 +32,16 @@ sealed trait TimestampFormatter {
def format(us: Long): String
}

trait FormatterUtils {
protected def zoneId: ZoneId
protected def buildFormatter(
pattern: String,
locale: Locale): java.time.format.DateTimeFormatter = {
new DateTimeFormatterBuilder()
.appendPattern(pattern)
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
.parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
.parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter(locale)
}
protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = {
val localDateTime = LocalDateTime.from(temporalAccessor)
val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
Instant.from(zonedDateTime)
}
}

class Iso8601TimestampFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends TimestampFormatter with FormatterUtils {
val zoneId = timeZone.toZoneId
val formatter = buildFormatter(pattern, locale)
locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
private val formatter = buildFormatter(pattern, locale)

def toInstant(s: String): Instant = {
private def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
if (temporalAccessor.query(TemporalQueries.offset()) == null) {
toInstantWithZoneId(temporalAccessor)
toInstantWithZoneId(temporalAccessor, timeZone.toZoneId)
} else {
Instant.from(temporalAccessor)
}
Expand All @@ -77,9 +53,9 @@ class Iso8601TimestampFormatter(
result
}

def parse(s: String): Long = instantToMicros(toInstant(s))
override def parse(s: String): Long = instantToMicros(toInstant(s))

def format(us: Long): String = {
override def format(us: Long): String = {
val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)
val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND)
val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS)
Expand All @@ -92,13 +68,13 @@ class LegacyTimestampFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends TimestampFormatter {
val format = FastDateFormat.getInstance(pattern, timeZone, locale)
private val format = FastDateFormat.getInstance(pattern, timeZone, locale)

protected def toMillis(s: String): Long = format.parse(s).getTime

def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS
override def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS

def format(us: Long): String = {
override def format(us: Long): String = {
format.format(DateTimeUtils.toJavaTimestamp(us))
}
}
Expand All @@ -121,74 +97,3 @@ object TimestampFormatter {
}
}
}

sealed trait DateFormatter {
def parse(s: String): Int // returns days since epoch
def format(days: Int): String
}

class Iso8601DateFormatter(
pattern: String,
locale: Locale) extends DateFormatter with FormatterUtils {

val zoneId = ZoneId.of("UTC")

val formatter = buildFormatter(pattern, locale)

def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
toInstantWithZoneId(temporalAccessor)
}

override def parse(s: String): Int = {
val seconds = toInstant(s).getEpochSecond
val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)

days.toInt
}

override def format(days: Int): String = {
val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
formatter.withZone(zoneId).format(instant)
}
}

class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
val format = FastDateFormat.getInstance(pattern, locale)

def parse(s: String): Int = {
val milliseconds = format.parse(s).getTime
DateTimeUtils.millisToDays(milliseconds)
}

def format(days: Int): String = {
val date = DateTimeUtils.toJavaDate(days)
format.format(date)
}
}

class LegacyFallbackDateFormatter(
pattern: String,
locale: Locale) extends LegacyDateFormatter(pattern, locale) {
override def parse(s: String): Int = {
Try(super.parse(s)).orElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime))
}.getOrElse {
// In Spark 1.5.0, we store the data as number of days since epoch in string.
// So, we just convert it to Int.
s.toInt
}
}
}

object DateFormatter {
def apply(format: String, locale: Locale): DateFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackDateFormatter(format, locale)
} else {
new Iso8601DateFormatter(format, locale)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import java.util.Locale

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf

class DateFormatterSuite extends SparkFunSuite with SQLHelper {
test("parsing dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val daysSinceEpoch = formatter.parse("2018-12-02")
assert(daysSinceEpoch === 17867)
}
}
}

test("format dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val date = formatter.format(17867)
assert(date === "2018-12-02")
}
}
}

test("roundtrip date -> days -> date") {
Seq(
"0050-01-01",
"0953-02-02",
"1423-03-08",
"1969-12-31",
"1972-08-25",
"1975-09-26",
"2018-12-12",
"2038-01-01",
"5010-11-17").foreach { date =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val days = formatter.parse(date)
val formatted = formatter.format(days)
assert(date === formatted)
}
}
}
}

test("roundtrip days -> date -> days") {
Seq(
-701265,
-371419,
-199722,
-1,
0,
967,
2094,
17877,
24837,
1110657).foreach { days =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val date = formatter.format(days)
val parsed = formatter.parse(date)
assert(days === parsed)
}
}
}
}
}
Loading