diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 8441c2c481ec..2309aa42b80c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -17,9 +17,8 @@ package org.apache.spark.sql.catalyst.plans.logical -import java.util.concurrent.TimeUnit - import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.util.DateTimeUtils.MILLIS_PER_MONTH import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval @@ -28,9 +27,7 @@ object EventTimeWatermark { val delayKey = "spark.watermarkDelayMs" def getDelayMs(delay: CalendarInterval): Long = { - // We define month as `31 days` to simplify calculation. - val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 - delay.milliseconds + delay.months * millisPerMonth + delay.milliseconds + delay.months * MILLIS_PER_MONTH } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 34e8012106bb..79fc45ec8947 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -59,6 +59,15 @@ object DateTimeUtils { final val MILLIS_PER_MINUTE: Long = 60 * MILLIS_PER_SECOND final val MILLIS_PER_HOUR: Long = 60 * MILLIS_PER_MINUTE final val MILLIS_PER_DAY: Long = SECONDS_PER_DAY * MILLIS_PER_SECOND + // The average year of the Gregorian calendar is 365.2425 days long, see + // https://en.wikipedia.org/wiki/Gregorian_calendar + // Leap year occurs every 4 years, except 
for years that are divisible by 100 + // and not divisible by 400. So, the mean length of the Gregorian calendar year is: + // 1 mean year = (365 + 1/4 - 1/100 + 1/400) days = 365.2425 days + // The mean year length in seconds is: + // 60 * 60 * 24 * 365.2425 = 31556952.0 = 12 * 2629746 + final val SECONDS_PER_MONTH: Int = 2629746 + final val MILLIS_PER_MONTH: Long = SECONDS_PER_MONTH * MILLIS_PER_SECOND // number of days between 1.1.1970 and 1.1.2001 final val to2001 = -11323 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index dda9d41f630e..f459a2c1f8e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.execution.streaming import java.sql.Date -import java.util.concurrent.TimeUnit import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.MILLIS_PER_MONTH import org.apache.spark.sql.execution.streaming.GroupStateImpl._ import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout} import org.apache.spark.unsafe.types.CalendarInterval @@ -164,8 +164,7 @@ private[sql] class GroupStateImpl[S] private( throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") } - val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 - cal.milliseconds + cal.months * millisPerMonth + cal.milliseconds + cal.months * MILLIS_PER_MONTH } private def checkTimeoutTimestampAllowed(): Unit = {