Skip to content

Commit 6e9ff72

Browse files
MaxGekkcloud-fan
authored andcommitted
[SPARK-31984][SQL] Make micros rebasing functions via local timestamps pure
### What changes were proposed in this pull request? 1. Set the given time zone as the first parameter of `RebaseDateTime`.`rebaseJulianToGregorianMicros()` and `rebaseGregorianToJulianMicros()` to Java 7 `GregorianCalendar`. ```scala val cal = new Calendar.Builder() // `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems .setCalendarType("gregory") ... .setTimeZone(tz) .build() ``` This makes the instance of the calendar independent from the default JVM time zone. 2. Change type of the first parameter from `ZoneId` to `TimeZone`. This allows to avoid unnecessary conversion from `TimeZone` to `ZoneId`, for example in ```scala def rebaseJulianToGregorianMicros(micros: Long): Long = { ... if (rebaseRecord == null || micros < rebaseRecord.switches(0)) { rebaseJulianToGregorianMicros(timeZone.toZoneId, micros) ``` and back to `TimeZone` inside of `rebaseJulianToGregorianMicros(zoneId: ZoneId, ...)` 3. Modify tests in `RebaseDateTimeSuite`, and set the default JVM time zone only for functions that depend on it. ### Why are the changes needed? 1. Ignoring passed parameter and using a global variable is bad practice. 2. Dependency from the global state doesn't allow to run the functions in parallel otherwise there is non-zero probability that the functions may return wrong result if the default JVM is changed during their execution. 3. This open opportunity for parallelisation of JSON files generation `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json`. Currently, the tests `generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'` generate the JSON files by iterating over all time zones sequentially w/ step of 1 week. Due to the large step, we can miss some spikes in diffs between 2 calendars (Java 8 Gregorian and Java 7 hybrid calendars) as the PR apache#28787 fixed and apache#28816 should fix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running existing tests from `RebaseDateTimeSuite`. Closes apache#28824 from MaxGekk/pure-micros-rebasing. Authored-by: Max Gekk <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent d24d27f commit 6e9ff72

File tree

2 files changed

+94
-88
lines changed

2 files changed

+94
-88
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -320,13 +320,14 @@ object RebaseDateTime {
320320
* Julian calendar: 1582-01-01 00:00:00.123456 -> -12243196799876544
321321
* The code below converts -12244061221876544 to -12243196799876544.
322322
*
323-
* @param zoneId The time zone ID at which the rebasing should be performed.
323+
* @param tz The time zone at which the rebasing should be performed.
324324
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'
325325
* in Proleptic Gregorian calendar. It can be negative.
326326
* @return The rebased microseconds since the epoch in Julian calendar.
327327
*/
328-
private[sql] def rebaseGregorianToJulianMicros(zoneId: ZoneId, micros: Long): Long = {
328+
private[sql] def rebaseGregorianToJulianMicros(tz: TimeZone, micros: Long): Long = {
329329
val instant = microsToInstant(micros)
330+
val zoneId = tz.toZoneId
330331
val zonedDateTime = instant.atZone(zoneId)
331332
var ldt = zonedDateTime.toLocalDateTime
332333
if (ldt.isAfter(julianEndTs) && ldt.isBefore(gregorianStartTs)) {
@@ -337,6 +338,7 @@ object RebaseDateTime {
337338
.setCalendarType("gregory")
338339
.setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
339340
.setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
341+
.setTimeZone(tz)
340342
.build()
341343
// A local timestamp can have 2 instants in the cases of switching from:
342344
// 1. Summer to winter time.
@@ -379,7 +381,7 @@ object RebaseDateTime {
379381
val tzId = timeZone.getID
380382
val rebaseRecord = gregJulianRebaseMap.getOrNull(tzId)
381383
if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
382-
rebaseGregorianToJulianMicros(timeZone.toZoneId, micros)
384+
rebaseGregorianToJulianMicros(timeZone, micros)
383385
} else {
384386
rebaseMicros(rebaseRecord, micros)
385387
}
@@ -401,17 +403,17 @@ object RebaseDateTime {
401403
* Proleptic Gregorian calendar: 1582-01-01 00:00:00.123456 -> -12244061221876544
402404
* The code below converts -12243196799876544 to -12244061221876544.
403405
*
404-
* @param zoneId The time zone ID at which the rebasing should be performed.
406+
* @param tz The time zone at which the rebasing should be performed.
405407
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'
406408
* in the Julian calendar. It can be negative.
407409
* @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
408410
*/
409-
private[sql] def rebaseJulianToGregorianMicros(zoneId: ZoneId, micros: Long): Long = {
411+
private[sql] def rebaseJulianToGregorianMicros(tz: TimeZone, micros: Long): Long = {
410412
val cal = new Calendar.Builder()
411-
// `gregory` is a hybrid calendar that supports both
412-
// the Julian and Gregorian calendar systems
413+
// `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems
413414
.setCalendarType("gregory")
414415
.setInstant(microsToMillis(micros))
416+
.setTimeZone(tz)
415417
.build()
416418
val localDateTime = LocalDateTime.of(
417419
cal.get(YEAR),
@@ -427,6 +429,7 @@ object RebaseDateTime {
427429
(Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
428430
.`with`(ChronoField.ERA, cal.get(ERA))
429431
.plusDays(cal.get(DAY_OF_MONTH) - 1)
432+
val zoneId = tz.toZoneId
430433
val zonedDateTime = localDateTime.atZone(zoneId)
431434
// In the case of local timestamp overlapping, we need to choose the correct time instant
432435
// which is related to the original local timestamp. We look ahead of 1 day, and if the next
@@ -479,7 +482,7 @@ object RebaseDateTime {
479482
val tzId = timeZone.getID
480483
val rebaseRecord = julianGregRebaseMap.getOrNull(tzId)
481484
if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
482-
rebaseJulianToGregorianMicros(timeZone.toZoneId, micros)
485+
rebaseJulianToGregorianMicros(timeZone, micros)
483486
} else {
484487
rebaseMicros(rebaseRecord, micros)
485488
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala

Lines changed: 83 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -201,48 +201,48 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
201201
test("optimization of micros rebasing - Gregorian to Julian") {
202202
outstandingZoneIds.foreach { zid =>
203203
withClue(s"zone id = $zid") {
204-
withDefaultTimeZone(zid) {
205-
val start = instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
206-
.atZone(zid)
207-
.toInstant)
208-
val end = instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0)
209-
.atZone(zid)
210-
.toInstant)
211-
var micros = start
212-
do {
213-
val rebased = rebaseGregorianToJulianMicros(zid, micros)
214-
val rebasedAndOptimized = rebaseGregorianToJulianMicros(micros)
215-
assert(rebasedAndOptimized === rebased)
216-
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
217-
} while (micros <= end)
218-
}
204+
val start = instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
205+
.atZone(zid)
206+
.toInstant)
207+
val end = instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0)
208+
.atZone(zid)
209+
.toInstant)
210+
var micros = start
211+
do {
212+
val rebased = rebaseGregorianToJulianMicros(TimeZone.getTimeZone(zid), micros)
213+
val rebasedAndOptimized = withDefaultTimeZone(zid) {
214+
rebaseGregorianToJulianMicros(micros)
215+
}
216+
assert(rebasedAndOptimized === rebased)
217+
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
218+
} while (micros <= end)
219219
}
220220
}
221221
}
222222

223223
test("optimization of micros rebasing - Julian to Gregorian") {
224224
outstandingZoneIds.foreach { zid =>
225225
withClue(s"zone id = $zid") {
226-
withDefaultTimeZone(zid) {
227-
val start = rebaseGregorianToJulianMicros(
228-
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
229-
val end = rebaseGregorianToJulianMicros(
230-
instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0).atZone(zid).toInstant))
231-
var micros = start
232-
do {
233-
val rebased = rebaseJulianToGregorianMicros(zid, micros)
234-
val rebasedAndOptimized = rebaseJulianToGregorianMicros(micros)
235-
assert(rebasedAndOptimized === rebased)
236-
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
237-
} while (micros <= end)
238-
}
226+
val start = rebaseGregorianToJulianMicros(
227+
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
228+
val end = rebaseGregorianToJulianMicros(
229+
instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0).atZone(zid).toInstant))
230+
var micros = start
231+
do {
232+
val rebased = rebaseJulianToGregorianMicros(TimeZone.getTimeZone(zid), micros)
233+
val rebasedAndOptimized = withDefaultTimeZone(zid) {
234+
rebaseJulianToGregorianMicros(micros)
235+
}
236+
assert(rebasedAndOptimized === rebased)
237+
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
238+
} while (micros <= end)
239239
}
240240
}
241241
}
242242

243243
private def generateRebaseJson(
244-
adjustFunc: Long => Long,
245-
rebaseFunc: (ZoneId, Long) => Long,
244+
adjustFunc: (TimeZone, Long) => Long,
245+
rebaseFunc: (TimeZone, Long) => Long,
246246
dir: String,
247247
fileName: String): Unit = {
248248
import java.nio.file.{Files, Paths}
@@ -260,14 +260,15 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
260260
.sortBy(_.getId)
261261
.foreach { zid =>
262262
withDefaultTimeZone(zid) {
263-
val start = adjustFunc(instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
264-
.atZone(zid)
265-
.toInstant))
263+
val tz = TimeZone.getTimeZone(zid)
264+
val start = adjustFunc(
265+
tz,
266+
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
266267
// sun.util.calendar.ZoneInfo resolves DST after 2037 year incorrectly.
267268
// See https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8073446
268-
val end = adjustFunc(instantToMicros(LocalDateTime.of(2037, 1, 1, 0, 0, 0)
269-
.atZone(zid)
270-
.toInstant))
269+
val end = adjustFunc(
270+
tz,
271+
instantToMicros(LocalDateTime.of(2037, 1, 1, 0, 0, 0).atZone(zid).toInstant))
271272

272273
var micros = start
273274
var diff = Long.MaxValue
@@ -276,7 +277,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
276277
val switches = new ArrayBuffer[Long]()
277278
val diffs = new ArrayBuffer[Long]()
278279
while (micros < end) {
279-
val rebased = rebaseFunc(zid, micros)
280+
val rebased = rebaseFunc(tz, micros)
280281
val curDiff = rebased - micros
281282
if (curDiff != diff) {
282283
if (step > MICROS_PER_SECOND) {
@@ -308,7 +309,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
308309

309310
ignore("generate 'gregorian-julian-rebase-micros.json'") {
310311
generateRebaseJson(
311-
adjustFunc = identity[Long],
312+
adjustFunc = (_: TimeZone, micros: Long) => micros,
312313
rebaseFunc = rebaseGregorianToJulianMicros,
313314
dir = "/Users/maximgekk/tmp",
314315
fileName = "gregorian-julian-rebase-micros.json")
@@ -383,26 +384,27 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
383384

384385
test("rebase not-existed timestamps in the hybrid calendar") {
385386
outstandingZoneIds.foreach { zid =>
386-
withDefaultTimeZone(zid) {
387-
Seq(
388-
"1582-10-04T23:59:59.999999" -> "1582-10-04 23:59:59.999999",
389-
"1582-10-05T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
390-
"1582-10-06T01:02:03.000001" -> "1582-10-15 01:02:03.000001",
391-
"1582-10-07T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
392-
"1582-10-08T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
393-
"1582-10-09T23:59:59.001001" -> "1582-10-15 23:59:59.001001",
394-
"1582-10-10T00:11:22.334455" -> "1582-10-15 00:11:22.334455",
395-
"1582-10-11T11:12:13.111111" -> "1582-10-15 11:12:13.111111",
396-
"1582-10-12T10:11:12.131415" -> "1582-10-15 10:11:12.131415",
397-
"1582-10-13T00:00:00.123321" -> "1582-10-15 00:00:00.123321",
398-
"1582-10-14T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
399-
"1582-10-15T00:00:00.000000" -> "1582-10-15 00:00:00.000000"
400-
).foreach { case (gregTs, hybridTs) =>
401-
withClue(s"tz = ${zid.getId} greg ts = $gregTs hybrid ts = $hybridTs") {
402-
val hybridMicros = parseToJulianMicros(hybridTs)
403-
val gregorianMicros = parseToGregMicros(gregTs, zid)
404-
405-
assert(rebaseGregorianToJulianMicros(zid, gregorianMicros) === hybridMicros)
387+
Seq(
388+
"1582-10-04T23:59:59.999999" -> "1582-10-04 23:59:59.999999",
389+
"1582-10-05T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
390+
"1582-10-06T01:02:03.000001" -> "1582-10-15 01:02:03.000001",
391+
"1582-10-07T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
392+
"1582-10-08T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
393+
"1582-10-09T23:59:59.001001" -> "1582-10-15 23:59:59.001001",
394+
"1582-10-10T00:11:22.334455" -> "1582-10-15 00:11:22.334455",
395+
"1582-10-11T11:12:13.111111" -> "1582-10-15 11:12:13.111111",
396+
"1582-10-12T10:11:12.131415" -> "1582-10-15 10:11:12.131415",
397+
"1582-10-13T00:00:00.123321" -> "1582-10-15 00:00:00.123321",
398+
"1582-10-14T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
399+
"1582-10-15T00:00:00.000000" -> "1582-10-15 00:00:00.000000"
400+
).foreach { case (gregTs, hybridTs) =>
401+
withClue(s"tz = ${zid.getId} greg ts = $gregTs hybrid ts = $hybridTs") {
402+
val hybridMicros = withDefaultTimeZone(zid) { parseToJulianMicros(hybridTs) }
403+
val gregorianMicros = parseToGregMicros(gregTs, zid)
404+
405+
val tz = TimeZone.getTimeZone(zid)
406+
assert(rebaseGregorianToJulianMicros(tz, gregorianMicros) === hybridMicros)
407+
withDefaultTimeZone(zid) {
406408
assert(rebaseGregorianToJulianMicros(gregorianMicros) === hybridMicros)
407409
}
408410
}
@@ -416,38 +418,39 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
416418
// clocks were moved backward to become Sunday, 18 November, 1945 01:00:00 AM.
417419
// In this way, the overlap happened w/o Daylight Saving Time.
418420
val hkZid = getZoneId("Asia/Hong_Kong")
421+
var expected = "1945-11-18 01:30:00.0"
422+
var ldt = LocalDateTime.of(1945, 11, 18, 1, 30, 0)
423+
var earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
424+
var laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
425+
var overlapInterval = MICROS_PER_HOUR
426+
if (earlierMicros + overlapInterval != laterMicros) {
427+
// Old JDK might have an outdated time zone database.
428+
// See https://bugs.openjdk.java.net/browse/JDK-8228469: "Hong Kong ... Its 1945 transition
429+
// from JST to HKT was on 11-18 at 02:00, not 09-15 at 00:00"
430+
expected = "1945-09-14 23:30:00.0"
431+
ldt = LocalDateTime.of(1945, 9, 14, 23, 30, 0)
432+
earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
433+
laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
434+
// If time zone db doesn't have overlapping at all, set the overlap interval to zero.
435+
overlapInterval = laterMicros - earlierMicros
436+
}
437+
val hkTz = TimeZone.getTimeZone(hkZid)
438+
val rebasedEarlierMicros = rebaseGregorianToJulianMicros(hkTz, earlierMicros)
439+
val rebasedLaterMicros = rebaseGregorianToJulianMicros(hkTz, laterMicros)
440+
assert(rebasedEarlierMicros + overlapInterval === rebasedLaterMicros)
419441
withDefaultTimeZone(hkZid) {
420-
var expected = "1945-11-18 01:30:00.0"
421-
var ldt = LocalDateTime.of(1945, 11, 18, 1, 30, 0)
422-
var earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
423-
var laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
424-
var overlapInterval = MICROS_PER_HOUR
425-
if (earlierMicros + overlapInterval != laterMicros) {
426-
// Old JDK might have an outdated time zone database.
427-
// See https://bugs.openjdk.java.net/browse/JDK-8228469: "Hong Kong ... Its 1945 transition
428-
// from JST to HKT was on 11-18 at 02:00, not 09-15 at 00:00"
429-
expected = "1945-09-14 23:30:00.0"
430-
ldt = LocalDateTime.of(1945, 9, 14, 23, 30, 0)
431-
earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
432-
laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
433-
// If time zone db doesn't have overlapping at all, set the overlap interval to zero.
434-
overlapInterval = laterMicros - earlierMicros
435-
}
436-
val rebasedEarlierMicros = rebaseGregorianToJulianMicros(hkZid, earlierMicros)
437-
val rebasedLaterMicros = rebaseGregorianToJulianMicros(hkZid, laterMicros)
438442
def toTsStr(micros: Long): String = toJavaTimestamp(micros).toString
439443
assert(toTsStr(rebasedEarlierMicros) === expected)
440444
assert(toTsStr(rebasedLaterMicros) === expected)
441-
assert(rebasedEarlierMicros + overlapInterval === rebasedLaterMicros)
442445
// Check optimized rebasing
443446
assert(rebaseGregorianToJulianMicros(earlierMicros) === rebasedEarlierMicros)
444447
assert(rebaseGregorianToJulianMicros(laterMicros) === rebasedLaterMicros)
445448
// Check reverse rebasing
446449
assert(rebaseJulianToGregorianMicros(rebasedEarlierMicros) === earlierMicros)
447450
assert(rebaseJulianToGregorianMicros(rebasedLaterMicros) === laterMicros)
448-
// Check reverse not-optimized rebasing
449-
assert(rebaseJulianToGregorianMicros(hkZid, rebasedEarlierMicros) === earlierMicros)
450-
assert(rebaseJulianToGregorianMicros(hkZid, rebasedLaterMicros) === laterMicros)
451451
}
452+
// Check reverse not-optimized rebasing
453+
assert(rebaseJulianToGregorianMicros(hkTz, rebasedEarlierMicros) === earlierMicros)
454+
assert(rebaseJulianToGregorianMicros(hkTz, rebasedLaterMicros) === laterMicros)
452455
}
453456
}

0 commit comments

Comments
 (0)