Skip to content

Commit 1a68e03

Browse files
committed
poc of time interval calculation
1 parent c506661 commit 1a68e03

File tree

3 files changed

+135
-1
lines changed

3 files changed

+135
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
2626
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
2727
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2828
import org.apache.spark.sql.types._
29-
import org.apache.spark.unsafe.types.UTF8String
29+
import org.apache.spark.unsafe.types.{Interval, UTF8String}
3030

3131
/**
3232
* Returns the current date at the start of query evaluation.
@@ -305,3 +305,59 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
305305
})
306306
}
307307
}
308+
309+
/**
 * Adds an interval to a date or timestamp.
 *
 * The left child (`start`) must be a date or a timestamp and the right child
 * (`interval`) must be an interval. The result is always a timestamp, even when
 * the input is a date.
 */
case class TimeAdd(start: Expression, interval: Expression)
  extends BinaryExpression with ExpectsInputTypes with CodegenFallback {

  override def left: Expression = start
  override def right: Expression = interval

  override def toString: String = s"$left + $right"
  override def inputTypes: Seq[AbstractDataType] =
    Seq(TypeCollection(DateType, TimestampType), IntervalType)

  override def dataType: DataType = TimestampType

  /**
   * @param start the evaluated left child: an Int for a date input, a Long for a
   *              timestamp input
   * @param inter the evaluated right child, an [[Interval]]
   * @return the shifted value as produced by the matching DateTimeUtils helper
   */
  override def nullSafeEval(start: Any, inter: Any): Any = {
    val itvl = inter.asInstanceOf[Interval]
    // Dispatch on the type of the *input* expression. `dataType` is the output
    // type and is always TimestampType, so matching on it would never reach the
    // date branch and would cast a date's Int value to Long.
    left.dataType match {
      case DateType =>
        DateTimeUtils.dateAddFullInterval(
          start.asInstanceOf[Int], itvl.months, itvl.microseconds)
      case TimestampType =>
        DateTimeUtils.timestampAddFullInterval(
          start.asInstanceOf[Long], itvl.months, itvl.microseconds)
    }
  }
}
336+
337+
/**
 * Subtracts an interval from a date or timestamp.
 *
 * The left child (`start`) must be a date or a timestamp and the right child
 * (`interval`) must be an interval. The result is always a timestamp, even when
 * the input is a date.
 */
case class TimeSub(start: Expression, interval: Expression)
  extends BinaryExpression with ExpectsInputTypes with CodegenFallback {

  override def left: Expression = start
  override def right: Expression = interval

  override def toString: String = s"$left - $right"
  override def inputTypes: Seq[AbstractDataType] =
    Seq(TypeCollection(DateType, TimestampType), IntervalType)

  override def dataType: DataType = TimestampType

  /**
   * @param start the evaluated left child: an Int for a date input, a Long for a
   *              timestamp input
   * @param inter the evaluated right child, an [[Interval]]
   * @return the shifted value as produced by the matching DateTimeUtils helper
   */
  override def nullSafeEval(start: Any, inter: Any): Any = {
    val itvl = inter.asInstanceOf[Interval]
    // Subtraction is addition of the negated interval components.
    val negMonths = 0 - itvl.months
    val negMicros = 0 - itvl.microseconds
    // Dispatch on the type of the *input* expression. `dataType` is the output
    // type and is always TimestampType, so matching on it would never reach the
    // date branch and would cast a date's Int value to Long.
    left.dataType match {
      case DateType =>
        DateTimeUtils.dateAddFullInterval(start.asInstanceOf[Int], negMonths, negMicros)
      case TimestampType =>
        DateTimeUtils.timestampAddFullInterval(start.asInstanceOf[Long], negMonths, negMicros)
    }
  }
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,4 +573,61 @@ object DateTimeUtils {
573573
dayInYear - 334
574574
}
575575
}
576+
577+
/**
 * Returns the date value for the first day of the given month.
 * The month is expressed in months since 1.1.1970, starting from 0
 * (0 is January 1970, -1 is December 1969).
 *
 * @param months zero-based month count relative to January 1970; may be negative
 * @return the first day of that month, expressed in days since 1.1.1970
 */
def getDaysFromMonths(months: Int): Int = {
  // Floor-style modulo/division so that negative month counts land in the right
  // year: -1 is month 11 of year -1, and -12 is month 0 of year -1.
  val monthInYearFromZero = ((months % 12) + 12) % 12
  val yearSinceEpoch = (months - monthInYearFromZero) / 12
  val febDays = if (isLeapYear(1970 + yearSinceEpoch)) 29 else 28
  val daysForMonths = Seq(31, febDays, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
  // Only the months *before* the requested one contribute; counting the requested
  // month itself would return the first day of the following month.
  getDaysFromYears(yearSinceEpoch) + daysForMonths.take(monthInYearFromZero).sum
}
589+
590+
/**
 * Returns the date value for January 1 of the given year.
 * The year is expressed in years since 1.1.1970, starting from 0.
 *
 * @param years year offset relative to 1970; may be negative
 * @return January 1 of year (1970 + years), expressed in days since 1.1.1970
 */
def getDaysFromYears(years: Int): Int = {
  // Reduce to an offset inside one 400-year Gregorian cycle so that the leap-year
  // counting below only has to deal with non-negative offsets.
  val remainYear = (years % 400 + 400) % 400
  val cycles = (years - remainYear) / 400
  // Leap years in [1970, 1970 + remainYear) by the divisible-by-4 rule alone.
  // The first one is 1972 (offset 2), hence the + 1 before dividing.
  val divisibleByFour = (remainYear + 1) / 4
  // Century years that are not leap years within the cycle: 2100, 2200 and 2300
  // (offsets 130, 230, 330). Each one only counts once January 1 of the
  // *following* year has been reached, i.e. for offsets strictly greater.
  val skippedCenturies =
    if (remainYear <= 130) 0
    else if (remainYear <= 230) 1
    else if (remainYear <= 330) 2
    else 3
  val numLeaps = divisibleByFour - skippedCenturies
  // A full 400-year Gregorian cycle contains 97 leap days (146097 days in total).
  cycles * (365 * 400 + 97) + remainYear * 365 + numLeaps
}
608+
609+
/**
 * Add date and year-month interval.
 * Returns a date value, expressed in days since 1.1.1970.
 *
 * If the original day of month does not exist in the target month (for example
 * January 31 plus one month), the result is clamped to the last day of the
 * target month.
 *
 * @param days   the start date, in days since 1.1.1970
 * @param months number of months to add; may be negative
 */
def dateAddYearMonthInterval(days: Int, months: Int): Int = {
  // NOTE(review): assumes getYear returns the calendar year (e.g. 2016), getMonth
  // returns 1..12 and getDayOfMonth returns 1..31 -- confirm against their
  // definitions. getDaysFromMonths is documented as taking a zero-based month
  // count since January 1970, so year and month are rebased before the call.
  val targetMonth = (getYear(days) - 1970) * 12 + (getMonth(days) - 1) + months
  val firstOfTargetMonth = getDaysFromMonths(targetMonth)
  val targetMonthLength = getDaysFromMonths(targetMonth + 1) - firstOfTargetMonth
  // Convert the 1-based day of month to an offset from the first of the month,
  // never running past the end of the target month.
  val dayOffset = math.min(getDayOfMonth(days), targetMonthLength) - 1
  firstOfTargetMonth + dayOffset
}
616+
617+
/**
 * Shifts a date by a full interval (a month part plus a microsecond part).
 *
 * @param days         the start date, in days since 1.1.1970
 * @param months       month component of the interval
 * @param microseconds microsecond component of the interval
 * @return a timestamp value, expressed in microseconds since 1.1.1970 00:00:00
 */
def dateAddFullInterval(days: Int, months: Int, microseconds: Long): Long = {
  // Apply the month component first, while still working at day granularity.
  val shiftedDays = dateAddYearMonthInterval(days, months)
  // Milliseconds -> microseconds, then apply the sub-month component.
  val shiftedMicros = daysToMillis(shiftedDays) * 1000
  shiftedMicros + microseconds
}
624+
625+
/**
 * Add timestamp and full interval.
 * Returns a timestamp value, expressed in microseconds since 1.1.1970 00:00:00.
 *
 * @param micros       the start timestamp, in microseconds since 1.1.1970 00:00:00
 * @param months       month component of the interval
 * @param microseconds microsecond component of the interval
 */
def timestampAddFullInterval(micros: Long, months: Int, microseconds: Long): Long = {
  val startDay = millisToDays(micros / 1000)
  // Time elapsed since the start of `startDay`. It is derived from daysToMillis
  // rather than a plain modulo so that it stays consistent with however
  // millisToDays/daysToMillis define day boundaries.
  val microsIntoDay = micros - daysToMillis(startDay) * 1000L
  val shiftedDay = dateAddYearMonthInterval(startDay, months)
  // Convert the shifted day back to microseconds before combining; a day count
  // cannot be added to microsecond values directly.
  daysToMillis(shiftedDay) * 1000L + microsIntoDay + microseconds
}
576633
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.util.Calendar
2424
import org.apache.spark.SparkFunSuite
2525
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2626
import org.apache.spark.sql.types.{IntegerType, StringType, TimestampType, DateType}
27+
import org.apache.spark.unsafe.types.Interval
2728

2829
class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
2930

@@ -279,4 +280,24 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
279280
checkResult(
280281
DateSub(Literal("2015-01-01"), Literal.create(null, IntegerType)), null)
281282
}
283+
284+
test("time_add") {
  // NOTE(review): assumes Interval's constructor is (months, microseconds), which
  // is how TimeAdd reads it via itvl.months / itvl.microseconds -- confirm.
  // TimeAdd always produces a timestamp, so the expected values are timestamps
  // even when the input is a date.
  checkResult(
    TimeAdd(Literal(Date.valueOf("2016-02-28")), Literal(new Interval(1, 0))),
    DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-03-28 00:00:00")))
  checkResult(
    TimeAdd(Literal(Date.valueOf("2016-03-01")), Literal(new Interval(1, 2000000.toLong))),
    DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-04-01 00:00:02")))
}
292+
293+
test("time_sub") {
  // NOTE(review): assumes Interval's constructor is (months, microseconds), which
  // is how TimeSub reads it via itvl.months / itvl.microseconds -- confirm.
  checkResult(
    TimeSub(Literal(Timestamp.valueOf("2016-02-28 10:00:00")), Literal(new Interval(1, 0))),
    DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-01-28 10:00:00")))
  // One month and two seconds before 2016-03-01 00:00:02.
  checkResult(
    TimeSub(
      Literal(Timestamp.valueOf("2016-03-01 00:00:02")),
      Literal(new Interval(1, 2000000.toLong))),
    DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-02-01 00:00:00")))
}
282303
}

0 commit comments

Comments
 (0)