-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-29012][SQL] Support special timestamp values #25716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f3c2f4f
dfe541b
d4751af
ad23507
59e30e3
9f7ed14
fa0037d
14ce002
a268e62
e27a450
b17d642
ec1020f
a4fae09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,12 +17,14 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.util | ||
|
|
||
| import java.nio.charset.StandardCharsets | ||
| import java.sql.{Date, Timestamp} | ||
| import java.time._ | ||
| import java.time.temporal.{ChronoField, ChronoUnit, IsoFields} | ||
| import java.util.{Locale, TimeZone} | ||
| import java.util.concurrent.TimeUnit._ | ||
|
|
||
| import scala.util.Try | ||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.sql.types.Decimal | ||
|
|
@@ -218,6 +220,8 @@ object DateTimeUtils { | |
| var i = 0 | ||
| var currentSegmentValue = 0 | ||
| val bytes = s.trim.getBytes | ||
| val specialTimestamp = convertSpecialTimestamp(bytes, timeZoneId) | ||
| if (specialTimestamp.isDefined) return specialTimestamp | ||
| var j = 0 | ||
| var digitsMilli = 0 | ||
| var justTime = false | ||
|
|
@@ -848,4 +852,67 @@ object DateTimeUtils { | |
| val sinceEpoch = BigDecimal(timestamp) / MICROS_PER_SECOND + offset | ||
| new Decimal().set(sinceEpoch, 20, 6) | ||
| } | ||
|
|
||
| def currentTimestamp(): SQLTimestamp = instantToMicros(Instant.now()) | ||
|
|
||
| private def today(zoneId: ZoneId): ZonedDateTime = { | ||
| Instant.now().atZone(zoneId).`with`(LocalTime.MIDNIGHT) | ||
| } | ||
|
|
||
| private val specialValueRe = """(\p{Alpha}+)\p{Blank}*(.*)""".r | ||
|
|
||
| /** | ||
| * Extracts the leading alphabetic word of an input string and returns it in lower case. | ||
| * This method does not compare the word against a list of known special values. | ||
| * @param input - a trimmed string | ||
| * @param zoneId - zone identifier; it is not referenced in the body of this method. | ||
| * @return the leading alphabetic word in lower case (Locale.US) when nothing follows it, | ||
| * or when the word is not "now" and the remainder is a resolvable time zone; | ||
| * None otherwise, including inputs shorter than 3 characters or not starting with a letter. | ||
| */ | ||
| private def extractSpecialValue(input: String, zoneId: ZoneId): Option[String] = { | ||
| def isValid(value: String, timeZoneId: String): Boolean = { | ||
| // A special value may appear without any time zone | ||
| if (timeZoneId.isEmpty) return true | ||
| // "now" must not be followed by a time zone field | ||
| if (value.compareToIgnoreCase("now") == 0) return false | ||
| // If a time zone field is present in the input, it must be resolvable | ||
| try { | ||
| getZoneId(timeZoneId) | ||
| true | ||
| } catch { | ||
| case NonFatal(_) => false | ||
| } | ||
| } | ||
|
|
||
| assert(input.trim.length == input.length) | ||
| if (input.length < 3 || !input(0).isLetter) return None | ||
| input match { | ||
| case specialValueRe(v, z) if isValid(v, z) => Some(v.toLowerCase(Locale.US)) | ||
| case _ => None | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Converts notational shorthands, such as "epoch", "now" and "today", to ordinary timestamps. | ||
| * @param input - a trimmed string | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about checking if an input is trimmed by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will add the assert: assert(input.trim.length == input.length) |
||
| * @param zoneId - zone identifier used to get the current date. | ||
| * @return Some of the microseconds since the epoch if the conversion completed | ||
| * successfully, otherwise None. | ||
| */ | ||
| def convertSpecialTimestamp(input: String, zoneId: ZoneId): Option[SQLTimestamp] = { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's different from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have extracted common code there https://github.com/apache/spark/pull/25716/files#diff-da60f07e1826788aaeb07f295fae4b8aR864-R890 |
||
| extractSpecialValue(input, zoneId).flatMap { | ||
| case "epoch" => Some(0) | ||
| case "now" => Some(currentTimestamp()) | ||
| case "today" => Some(instantToMicros(today(zoneId).toInstant)) | ||
| case "tomorrow" => Some(instantToMicros(today(zoneId).plusDays(1).toInstant)) | ||
| case "yesterday" => Some(instantToMicros(today(zoneId).minusDays(1).toInstant)) | ||
| case _ => None | ||
| } | ||
| } | ||
|
|
||
| private def convertSpecialTimestamp(bytes: Array[Byte], zoneId: ZoneId): Option[SQLTimestamp] = { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because I need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ur, I see. |
||
| if (bytes.length > 0 && Character.isAlphabetic(bytes(0))) { | ||
| convertSpecialTimestamp(new String(bytes, StandardCharsets.UTF_8), zoneId) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,16 +19,18 @@ package org.apache.spark.sql.catalyst.util | |
|
|
||
| import java.sql.{Date, Timestamp} | ||
| import java.text.SimpleDateFormat | ||
| import java.time.ZoneId | ||
| import java.time.{LocalDateTime, LocalTime, ZoneId} | ||
| import java.util.{Locale, TimeZone} | ||
| import java.util.concurrent.TimeUnit | ||
|
|
||
| import org.scalatest.Matchers | ||
|
|
||
| import org.apache.spark.SparkFunSuite | ||
| import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ | ||
| import org.apache.spark.sql.catalyst.util.DateTimeUtils._ | ||
| import org.apache.spark.unsafe.types.UTF8String | ||
|
|
||
| class DateTimeUtilsSuite extends SparkFunSuite { | ||
| class DateTimeUtilsSuite extends SparkFunSuite with Matchers { | ||
|
|
||
| val TimeZonePST = TimeZone.getTimeZone("PST") | ||
| private def defaultZoneId = ZoneId.systemDefault() | ||
|
|
@@ -142,10 +144,14 @@ class DateTimeUtilsSuite extends SparkFunSuite { | |
| assert(stringToDate(UTF8String.fromString("1999 08")).isEmpty) | ||
| } | ||
|
|
||
| private def toTimestamp(str: String, zoneId: ZoneId): Option[SQLTimestamp] = { | ||
| stringToTimestamp(UTF8String.fromString(str), zoneId) | ||
| } | ||
|
|
||
| test("string to timestamp") { | ||
| for (tz <- ALL_TIMEZONES) { | ||
| def checkStringToTimestamp(str: String, expected: Option[Long]): Unit = { | ||
| assert(stringToTimestamp(UTF8String.fromString(str), tz.toZoneId) === expected) | ||
| assert(toTimestamp(str, tz.toZoneId) === expected) | ||
| } | ||
|
|
||
| checkStringToTimestamp("1969-12-31 16:00:00", Option(date(1969, 12, 31, 16, tz = tz))) | ||
|
|
@@ -271,8 +277,8 @@ class DateTimeUtilsSuite extends SparkFunSuite { | |
| UTF8String.fromString("2015-02-29 00:00:00"), defaultZoneId).isEmpty) | ||
| assert(stringToTimestamp( | ||
| UTF8String.fromString("2015-04-31 00:00:00"), defaultZoneId).isEmpty) | ||
| assert(stringToTimestamp(UTF8String.fromString("2015-02-29"), defaultZoneId).isEmpty) | ||
| assert(stringToTimestamp(UTF8String.fromString("2015-04-31"), defaultZoneId).isEmpty) | ||
| assert(toTimestamp("2015-02-29", defaultZoneId).isEmpty) | ||
| assert(toTimestamp("2015-04-31", defaultZoneId).isEmpty) | ||
| } | ||
|
|
||
| test("hours") { | ||
|
|
@@ -456,8 +462,7 @@ class DateTimeUtilsSuite extends SparkFunSuite { | |
| timezone: TimeZone = DateTimeUtils.defaultTimeZone()): Unit = { | ||
| val truncated = | ||
| DateTimeUtils.truncTimestamp(inputTS, level, timezone) | ||
| val expectedTS = | ||
| DateTimeUtils.stringToTimestamp(UTF8String.fromString(expected), defaultZoneId) | ||
| val expectedTS = toTimestamp(expected, defaultZoneId) | ||
| assert(truncated === expectedTS.get) | ||
| } | ||
|
|
||
|
|
@@ -564,4 +569,21 @@ class DateTimeUtilsSuite extends SparkFunSuite { | |
| assert(DateTimeUtils.toMillis(-9223372036844776001L) === -9223372036844777L) | ||
| assert(DateTimeUtils.toMillis(-157700927876544L) === -157700927877L) | ||
| } | ||
|
|
||
| test("special timestamp values") { | ||
| DateTimeTestUtils.outstandingZoneIds.foreach { zoneId => | ||
| val tolerance = TimeUnit.SECONDS.toMicros(30) | ||
|
|
||
| assert(toTimestamp("Epoch", zoneId).get === 0) | ||
| val now = instantToMicros(LocalDateTime.now(zoneId).atZone(zoneId).toInstant) | ||
| toTimestamp("NOW", zoneId).get should be (now +- tolerance) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you check illegal cases, e.g., There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have already added the test here https://github.com/apache/spark/pull/25716/files#diff-c5655e947ce2dd3748e4cf95ebc32e8aR580 |
||
| assert(toTimestamp("now UTC", zoneId) === None) | ||
| val today = instantToMicros(LocalDateTime.now(zoneId) | ||
| .`with`(LocalTime.MIDNIGHT) | ||
| .atZone(zoneId).toInstant) | ||
| toTimestamp(" Yesterday", zoneId).get should be (today - MICROS_PER_DAY +- tolerance) | ||
| toTimestamp("Today ", zoneId).get should be (today +- tolerance) | ||
| toTimestamp(" tomorrow CET ", zoneId).get should be (today + MICROS_PER_DAY +- tolerance) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid using
`return` here? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure about the bytecode generated for this, though; is there no overhead from using
`return`?