File tree Expand file tree Collapse file tree 3 files changed +8
-13
lines changed
catalyst/src/main/scala/org/apache/spark/sql/catalyst
core/src/main/scala/org/apache/spark/sql/execution/streaming Expand file tree Collapse file tree 3 files changed +8
-13
lines changed Original file line number Diff line number Diff line change 1717
1818package org .apache .spark .sql .catalyst .plans .logical
1919
20+ import java .util .concurrent .TimeUnit
21+
2022import org .apache .spark .sql .catalyst .expressions .Attribute
21- import org .apache .spark .sql .catalyst .util .DateTimeUtils .MILLIS_PER_MONTH
2223import org .apache .spark .sql .types .MetadataBuilder
2324import org .apache .spark .unsafe .types .CalendarInterval
2425
@@ -27,7 +28,9 @@ object EventTimeWatermark {
2728 val delayKey = " spark.watermarkDelayMs"
2829
2930 def getDelayMs (delay : CalendarInterval ): Long = {
30- delay.milliseconds + delay.months * MILLIS_PER_MONTH
31+ // We define month as `31 days` to simplify calculation.
32+ val millisPerMonth = TimeUnit .MICROSECONDS .toMillis(CalendarInterval .MICROS_PER_DAY ) * 31
33+ delay.milliseconds + delay.months * millisPerMonth
3134 }
3235}
3336
Original file line number Diff line number Diff line change @@ -59,15 +59,6 @@ object DateTimeUtils {
5959 final val MILLIS_PER_MINUTE : Long = 60 * MILLIS_PER_SECOND
6060 final val MILLIS_PER_HOUR : Long = 60 * MILLIS_PER_MINUTE
6161 final val MILLIS_PER_DAY : Long = SECONDS_PER_DAY * MILLIS_PER_SECOND
62- // The average year of the Gregorian calendar 365.2425 days long, see
63- // https://en.wikipedia.org/wiki/Gregorian_calendar
64- // Leap year occurs every 4 years, except for years that are divisible by 100
65- // and not divisible by 400. So, the mean length of of the Gregorian calendar year is:
66- // 1 mean year = (365 + 1/4 - 1/100 + 1/400) days = 365.2425 days
67- // The mean year length in seconds is:
68- // 60 * 60 * 24 * 365.2425 = 31556952.0 = 12 * 2629746
69- final val SECONDS_PER_MONTH : Int = 2629746
70- final val MILLIS_PER_MONTH : Long = SECONDS_PER_MONTH * MILLIS_PER_SECOND
7162
7263 // number of days between 1.1.1970 and 1.1.2001
7364 final val to2001 = - 11323
Original file line number Diff line number Diff line change 1818package org .apache .spark .sql .execution .streaming
1919
2020import java .sql .Date
21+ import java .util .concurrent .TimeUnit
2122
2223import org .apache .spark .sql .catalyst .plans .logical .{EventTimeTimeout , ProcessingTimeTimeout }
23- import org .apache .spark .sql .catalyst .util .DateTimeUtils .MILLIS_PER_MONTH
2424import org .apache .spark .sql .execution .streaming .GroupStateImpl ._
2525import org .apache .spark .sql .streaming .{GroupState , GroupStateTimeout }
2626import org .apache .spark .unsafe .types .CalendarInterval
@@ -164,7 +164,8 @@ private[sql] class GroupStateImpl[S] private(
164164 throw new IllegalArgumentException (s " Provided duration ( $duration) is not positive " )
165165 }
166166
167- cal.milliseconds + cal.months * MILLIS_PER_MONTH
167+ val millisPerMonth = TimeUnit .MICROSECONDS .toMillis(CalendarInterval .MICROS_PER_DAY ) * 31
168+ cal.milliseconds + cal.months * millisPerMonth
168169 }
169170
170171 private def checkTimeoutTimestampAllowed (): Unit = {
You can’t perform that action at this time.
0 commit comments