Skip to content

Commit 7dbd90b

Browse files
cloud-fan authored and
dongjoon-hyun committed
[SPARK-31797][SQL][FOLLOWUP] TIMESTAMP_SECONDS supports fractional input
### What changes were proposed in this pull request? This is a followup of #28534, to make the `TIMESTAMP_SECONDS` function support fractional input as well. ### Why are the changes needed? Previously the cast function can cast fractional values to timestamp. Now we suggest that users use these new functions, and we need to cover all the cast use cases. ### Does this PR introduce _any_ user-facing change? Yes, the `TIMESTAMP_SECONDS` function now accepts fractional input. ### How was this patch tested? New tests. Closes #28956 from cloud-fan/follow. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent f7d9e3d commit 7dbd90b

File tree

9 files changed

+267
-39
lines changed

9 files changed

+267
-39
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
2020
import java.text.ParseException
2121
import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId}
2222
import java.time.format.DateTimeParseException
23-
import java.time.temporal.IsoFields
2423
import java.util.Locale
2524

2625
import org.apache.commons.text.StringEscapeUtils
@@ -386,7 +385,7 @@ case class DayOfYear(child: Expression) extends GetDateField {
386385
override val funcName = "getDayInYear"
387386
}
388387

389-
abstract class NumberToTimestampBase extends UnaryExpression
388+
abstract class IntegralToTimestampBase extends UnaryExpression
390389
with ExpectsInputTypes with NullIntolerant {
391390

392391
protected def upScaleFactor: Long
@@ -408,19 +407,66 @@ abstract class NumberToTimestampBase extends UnaryExpression
408407
}
409408
}
410409

410+
// scalastyle:off line.size.limit
411411
@ExpressionDescription(
412-
usage = "_FUNC_(seconds) - Creates timestamp from the number of seconds since UTC epoch.",
412+
usage = "_FUNC_(seconds) - Creates timestamp from the number of seconds (can be fractional) since UTC epoch.",
413413
examples = """
414414
Examples:
415415
> SELECT _FUNC_(1230219000);
416416
2008-12-25 07:30:00
417+
> SELECT _FUNC_(1230219000.123);
418+
2008-12-25 07:30:00.123
417419
""",
418420
group = "datetime_funcs",
419421
since = "3.1.0")
420-
case class SecondsToTimestamp(child: Expression)
421-
extends NumberToTimestampBase {
422+
// scalastyle:on line.size.limit
423+
case class SecondsToTimestamp(child: Expression) extends UnaryExpression
424+
with ExpectsInputTypes with NullIntolerant {
425+
426+
override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
427+
428+
override def dataType: DataType = TimestampType
422429

423-
override def upScaleFactor: Long = MICROS_PER_SECOND
430+
override def nullable: Boolean = child.dataType match {
431+
case _: FloatType | _: DoubleType => true
432+
case _ => child.nullable
433+
}
434+
435+
@transient
436+
private lazy val evalFunc: Any => Any = child.dataType match {
437+
case _: IntegralType => input =>
438+
Math.multiplyExact(input.asInstanceOf[Number].longValue(), MICROS_PER_SECOND)
439+
case _: DecimalType => input =>
440+
val operand = new java.math.BigDecimal(MICROS_PER_SECOND)
441+
input.asInstanceOf[Decimal].toJavaBigDecimal.multiply(operand).longValueExact()
442+
case _: FloatType => input =>
443+
val f = input.asInstanceOf[Float]
444+
if (f.isNaN || f.isInfinite) null else (f * MICROS_PER_SECOND).toLong
445+
case _: DoubleType => input =>
446+
val d = input.asInstanceOf[Double]
447+
if (d.isNaN || d.isInfinite) null else (d * MICROS_PER_SECOND).toLong
448+
}
449+
450+
override def nullSafeEval(input: Any): Any = evalFunc(input)
451+
452+
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = child.dataType match {
453+
case _: IntegralType =>
454+
defineCodeGen(ctx, ev, c => s"java.lang.Math.multiplyExact($c, ${MICROS_PER_SECOND}L)")
455+
case _: DecimalType =>
456+
val operand = s"new java.math.BigDecimal($MICROS_PER_SECOND)"
457+
defineCodeGen(ctx, ev, c => s"$c.toJavaBigDecimal().multiply($operand).longValueExact()")
458+
case other =>
459+
nullSafeCodeGen(ctx, ev, c => {
460+
val typeStr = CodeGenerator.boxedType(other)
461+
s"""
462+
|if ($typeStr.isNaN($c) || $typeStr.isInfinite($c)) {
463+
| ${ev.isNull} = true;
464+
|} else {
465+
| ${ev.value} = (long)($c * $MICROS_PER_SECOND);
466+
|}
467+
|""".stripMargin
468+
})
469+
}
424470

425471
override def prettyName: String = "timestamp_seconds"
426472
}
@@ -437,7 +483,7 @@ case class SecondsToTimestamp(child: Expression)
437483
since = "3.1.0")
438484
// scalastyle:on line.size.limit
439485
case class MillisToTimestamp(child: Expression)
440-
extends NumberToTimestampBase {
486+
extends IntegralToTimestampBase {
441487

442488
override def upScaleFactor: Long = MICROS_PER_MILLIS
443489

@@ -456,7 +502,7 @@ case class MillisToTimestamp(child: Expression)
456502
since = "3.1.0")
457503
// scalastyle:on line.size.limit
458504
case class MicrosToTimestamp(child: Expression)
459-
extends NumberToTimestampBase {
505+
extends IntegralToTimestampBase {
460506

461507
override def upScaleFactor: Long = 1L
462508

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala

Lines changed: 114 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,28 +1142,6 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
11421142
}
11431143
}
11441144

1145-
test("SPARK-31710:Adds TIMESTAMP_SECONDS, " +
1146-
"TIMESTAMP_MILLIS and TIMESTAMP_MICROS functions") {
1147-
checkEvaluation(SecondsToTimestamp(Literal(1230219000)), 1230219000L * MICROS_PER_SECOND)
1148-
checkEvaluation(SecondsToTimestamp(Literal(-1230219000)), -1230219000L * MICROS_PER_SECOND)
1149-
checkEvaluation(SecondsToTimestamp(Literal(null, IntegerType)), null)
1150-
checkEvaluation(MillisToTimestamp(Literal(1230219000123L)), 1230219000123L * MICROS_PER_MILLIS)
1151-
checkEvaluation(MillisToTimestamp(
1152-
Literal(-1230219000123L)), -1230219000123L * MICROS_PER_MILLIS)
1153-
checkEvaluation(MillisToTimestamp(Literal(null, IntegerType)), null)
1154-
checkEvaluation(MicrosToTimestamp(Literal(1230219000123123L)), 1230219000123123L)
1155-
checkEvaluation(MicrosToTimestamp(Literal(-1230219000123123L)), -1230219000123123L)
1156-
checkEvaluation(MicrosToTimestamp(Literal(null, IntegerType)), null)
1157-
checkExceptionInExpression[ArithmeticException](
1158-
SecondsToTimestamp(Literal(1230219000123123L)), "long overflow")
1159-
checkExceptionInExpression[ArithmeticException](
1160-
SecondsToTimestamp(Literal(-1230219000123123L)), "long overflow")
1161-
checkExceptionInExpression[ArithmeticException](
1162-
MillisToTimestamp(Literal(92233720368547758L)), "long overflow")
1163-
checkExceptionInExpression[ArithmeticException](
1164-
MillisToTimestamp(Literal(-92233720368547758L)), "long overflow")
1165-
}
1166-
11671145
test("Consistent error handling for datetime formatting and parsing functions") {
11681146

11691147
def checkException[T <: Exception : ClassTag](c: String): Unit = {
@@ -1194,4 +1172,118 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
11941172
new ParseToTimestamp(Literal("11:11 PM"), Literal("mm:ss a")).child,
11951173
Timestamp.valueOf("1970-01-01 12:11:11.0"))
11961174
}
1175+
1176+
def testIntegralInput(testFunc: Number => Unit): Unit = {
1177+
def checkResult(input: Long): Unit = {
1178+
if (input.toByte == input) {
1179+
testFunc(input.toByte)
1180+
} else if (input.toShort == input) {
1181+
testFunc(input.toShort)
1182+
} else if (input.toInt == input) {
1183+
testFunc(input.toInt)
1184+
} else {
1185+
testFunc(input)
1186+
}
1187+
}
1188+
checkResult(0)
1189+
checkResult(Byte.MaxValue)
1190+
checkResult(Byte.MinValue)
1191+
checkResult(Short.MaxValue)
1192+
checkResult(Short.MinValue)
1193+
checkResult(Int.MaxValue)
1194+
checkResult(Int.MinValue)
1195+
checkResult(Int.MaxValue.toLong + 100)
1196+
checkResult(Int.MinValue.toLong - 100)
1197+
}
1198+
1199+
test("TIMESTAMP_SECONDS") {
1200+
def testIntegralFunc(value: Number): Unit = {
1201+
checkEvaluation(
1202+
SecondsToTimestamp(Literal(value)),
1203+
Instant.ofEpochSecond(value.longValue()))
1204+
}
1205+
1206+
// test null input
1207+
checkEvaluation(
1208+
SecondsToTimestamp(Literal(null, IntegerType)),
1209+
null)
1210+
1211+
// test integral input
1212+
testIntegralInput(testIntegralFunc)
1213+
// test overflow
1214+
checkExceptionInExpression[ArithmeticException](
1215+
SecondsToTimestamp(Literal(Long.MaxValue, LongType)), EmptyRow, "long overflow")
1216+
1217+
def testFractionalInput(input: String): Unit = {
1218+
Seq(input.toFloat, input.toDouble, Decimal(input)).foreach { value =>
1219+
checkEvaluation(
1220+
SecondsToTimestamp(Literal(value)),
1221+
(input.toDouble * MICROS_PER_SECOND).toLong)
1222+
}
1223+
}
1224+
1225+
testFractionalInput("1.0")
1226+
testFractionalInput("-1.0")
1227+
testFractionalInput("1.234567")
1228+
testFractionalInput("-1.234567")
1229+
1230+
// test overflow for decimal input
1231+
checkExceptionInExpression[ArithmeticException](
1232+
SecondsToTimestamp(Literal(Decimal("9" * 38))), "Overflow"
1233+
)
1234+
// test truncation error for decimal input
1235+
checkExceptionInExpression[ArithmeticException](
1236+
SecondsToTimestamp(Literal(Decimal("0.1234567"))), "Rounding necessary"
1237+
)
1238+
1239+
// test NaN
1240+
checkEvaluation(
1241+
SecondsToTimestamp(Literal(Double.NaN)),
1242+
null)
1243+
checkEvaluation(
1244+
SecondsToTimestamp(Literal(Float.NaN)),
1245+
null)
1246+
// double input can truncate
1247+
checkEvaluation(
1248+
SecondsToTimestamp(Literal(123.456789123)),
1249+
Instant.ofEpochSecond(123, 456789000))
1250+
}
1251+
1252+
test("TIMESTAMP_MILLIS") {
1253+
def testIntegralFunc(value: Number): Unit = {
1254+
checkEvaluation(
1255+
MillisToTimestamp(Literal(value)),
1256+
Instant.ofEpochMilli(value.longValue()))
1257+
}
1258+
1259+
// test null input
1260+
checkEvaluation(
1261+
MillisToTimestamp(Literal(null, IntegerType)),
1262+
null)
1263+
1264+
// test integral input
1265+
testIntegralInput(testIntegralFunc)
1266+
// test overflow
1267+
checkExceptionInExpression[ArithmeticException](
1268+
MillisToTimestamp(Literal(Long.MaxValue, LongType)), EmptyRow, "long overflow")
1269+
}
1270+
1271+
test("TIMESTAMP_MICROS") {
1272+
def testIntegralFunc(value: Number): Unit = {
1273+
checkEvaluation(
1274+
MicrosToTimestamp(Literal(value)),
1275+
value.longValue())
1276+
}
1277+
1278+
// test null input
1279+
checkEvaluation(
1280+
MicrosToTimestamp(Literal(null, IntegerType)),
1281+
null)
1282+
1283+
// test integral input
1284+
testIntegralInput(testIntegralFunc)
1285+
// test max/min input
1286+
testIntegralFunc(Long.MaxValue)
1287+
testIntegralFunc(Long.MinValue)
1288+
}
11971289
}

sql/core/src/test/resources/sql-tests/inputs/datetime.sql

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,18 @@
22

33
-- [SPARK-31710] TIMESTAMP_SECONDS, TIMESTAMP_MILLISECONDS and TIMESTAMP_MICROSECONDS to timestamp transfer
44
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null);
5+
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d), TIMESTAMP_SECONDS(FLOAT(1.23));
56
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null);
67
select TIMESTAMP_MICROS(1230219000123123),TIMESTAMP_MICROS(-1230219000123123),TIMESTAMP_MICROS(null);
7-
-- overflow exception:
8+
-- overflow exception
89
select TIMESTAMP_SECONDS(1230219000123123);
910
select TIMESTAMP_SECONDS(-1230219000123123);
1011
select TIMESTAMP_MILLIS(92233720368547758);
1112
select TIMESTAMP_MILLIS(-92233720368547758);
13+
-- truncate exception
14+
select TIMESTAMP_SECONDS(0.1234567);
15+
-- truncation is OK for float/double
16+
select TIMESTAMP_SECONDS(0.1234567d), TIMESTAMP_SECONDS(FLOAT(0.1234567));
1217

1318
-- [SPARK-16836] current_date and current_timestamp literals
1419
select current_date = current_date(), current_timestamp = current_timestamp();

sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
-- Automatically generated by SQLQueryTestSuite
2-
-- Number of queries: 103
2+
-- Number of queries: 106
33

44

55
-- !query
66
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null)
77
-- !query schema
8-
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS INT)):timestamp>
8+
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS DOUBLE)):timestamp>
99
-- !query output
1010
2008-12-25 07:30:00 1931-01-07 00:30:00 NULL
1111

1212

13+
-- !query
14+
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d), TIMESTAMP_SECONDS(FLOAT(1.23))
15+
-- !query schema
16+
struct<timestamp_seconds(1.23):timestamp,timestamp_seconds(1.23):timestamp,timestamp_seconds(CAST(1.23 AS FLOAT)):timestamp>
17+
-- !query output
18+
1969-12-31 16:00:01.23 1969-12-31 16:00:01.23 1969-12-31 16:00:01.23
19+
20+
1321
-- !query
1422
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null)
1523
-- !query schema
@@ -62,6 +70,23 @@ java.lang.ArithmeticException
6270
long overflow
6371

6472

73+
-- !query
74+
select TIMESTAMP_SECONDS(0.1234567)
75+
-- !query schema
76+
struct<>
77+
-- !query output
78+
java.lang.ArithmeticException
79+
Rounding necessary
80+
81+
82+
-- !query
83+
select TIMESTAMP_SECONDS(0.1234567d), TIMESTAMP_SECONDS(FLOAT(0.1234567))
84+
-- !query schema
85+
struct<timestamp_seconds(0.1234567):timestamp,timestamp_seconds(CAST(0.1234567 AS FLOAT)):timestamp>
86+
-- !query output
87+
1969-12-31 16:00:00.123456 1969-12-31 16:00:00.123456
88+
89+
6590
-- !query
6691
select current_date = current_date(), current_timestamp = current_timestamp()
6792
-- !query schema

sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
-- Automatically generated by SQLQueryTestSuite
2-
-- Number of queries: 103
2+
-- Number of queries: 106
33

44

55
-- !query
66
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null)
77
-- !query schema
8-
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS INT)):timestamp>
8+
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS DOUBLE)):timestamp>
99
-- !query output
1010
2008-12-25 07:30:00 1931-01-07 00:30:00 NULL
1111

1212

13+
-- !query
14+
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d), TIMESTAMP_SECONDS(FLOAT(1.23))
15+
-- !query schema
16+
struct<timestamp_seconds(1.23):timestamp,timestamp_seconds(1.23):timestamp,timestamp_seconds(CAST(1.23 AS FLOAT)):timestamp>
17+
-- !query output
18+
1969-12-31 16:00:01.23 1969-12-31 16:00:01.23 1969-12-31 16:00:01.23
19+
20+
1321
-- !query
1422
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null)
1523
-- !query schema
@@ -62,6 +70,23 @@ java.lang.ArithmeticException
6270
long overflow
6371

6472

73+
-- !query
74+
select TIMESTAMP_SECONDS(0.1234567)
75+
-- !query schema
76+
struct<>
77+
-- !query output
78+
java.lang.ArithmeticException
79+
Rounding necessary
80+
81+
82+
-- !query
83+
select TIMESTAMP_SECONDS(0.1234567d), TIMESTAMP_SECONDS(FLOAT(0.1234567))
84+
-- !query schema
85+
struct<timestamp_seconds(0.1234567):timestamp,timestamp_seconds(CAST(0.1234567 AS FLOAT)):timestamp>
86+
-- !query output
87+
1969-12-31 16:00:00.123456 1969-12-31 16:00:00.123456
88+
89+
6590
-- !query
6691
select current_date = current_date(), current_timestamp = current_timestamp()
6792
-- !query schema

0 commit comments

Comments
 (0)