
Conversation

@beliefer
Contributor

What changes were proposed in this pull request?

#33588 (comment) shows that Spark cannot read/write timestamp ntz and ltz correctly. Based on the discussion in #34712 (comment), this PR fixes reading/writing timestamp ntz from/to ORC by using UTC timestamps.

The root cause is that ORC writes and reads timestamps with the local timezone by default, and that timezone can differ between writer and reader.
If the ORC writer writes a timestamp with one local timezone (e.g. America/Los_Angeles) and the ORC reader reads it with another local timezone (e.g. Europe/Amsterdam), the timestamp value will be different.

If we let the ORC writer write the timestamp with the UTC timezone, and the ORC reader reads it with the UTC timezone too, the timestamp value will be correct.
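
As a rough sketch of the idea (assuming Spark's internal DateTimeUtils helpers; the exact call sites live in the patch), the NTZ value is shifted to the UTC wall clock before it is handed to ORC, and shifted back symmetrically on read:

import java.time.LocalDateTime
import java.util.TimeZone
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// A TIMESTAMP_NTZ value: a wall-clock date-time encoded as micros since the epoch, as if in UTC.
val micros = DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 11, 29, 10, 30, 0))

// Writer side: shift the wall clock from the JVM default zone to UTC before passing it to ORC.
val utcMicros = DateTimeUtils.toUTCTime(micros, TimeZone.getDefault.getID)

// Reader side: the symmetric shift back (here with the same JVM default zone) recovers the wall clock.
val restored = DateTimeUtils.fromUTCTime(utcMicros, TimeZone.getDefault.getID)
assert(restored == micros)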

The related ORC source:
https://github.com/apache/orc/blob/3f1e57cf1cebe58027c1bd48c09eef4e9717a9e3/java/core/src/java/org/apache/orc/impl/WriterImpl.java#L525

https://github.com/apache/orc/blob/1f68ac0c7f2ae804b374500dcf1b4d7abe30ffeb/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java#L1184

Why are the changes needed?

Fix the bug where reading/writing timestamp ntz from/to ORC across different time zones produces wrong values.

Does this PR introduce any user-facing change?

No. ORC timestamp ntz is a new feature that has not been released yet.

How was this patch tested?

New tests.

@github-actions github-actions bot added the SQL label Nov 29, 2021

SparkQA commented Nov 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50187/


SparkQA commented Nov 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50187/


SparkQA commented Nov 29, 2021

Test build #145717 has finished for PR 34741 at commit 1c6da02.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50191/


SparkQA commented Nov 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50191/


SparkQA commented Nov 29, 2021

Test build #145721 has finished for PR 34741 at commit f500cf3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Contributor Author

ping @cloud-fan

def toOrcNTZ(micros: Long): OrcTimestamp = {
val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
val utcMicros = DateTimeUtils.toUTCTime(micros, TimeZone.getDefault.getID)

bersprockets (Contributor) commented Nov 30, 2021

There is at least one issue with dates before 1883-11-17. Railway time zones didn't exist then, and java.time classes (which fromUTCTime/toUTCTime use) care about that.

Also, I surmise there will be an additional issue with pre-Gregorian values, since ORC assumes Julian when "shifting" values on read.

Even my POC has issues with pre-1883-11-17 values when the writer is in the Pacific/Pago_Pago time zone and the reader is in some other time zone, because it also uses the fromUTCTime/toUTCTime utility functions.

When dealing with hybrid Julian times, ORC doesn't have issues shifting values between time zones that didn't exist yet, since the old time-related classes didn't worry about that.

Contributor

While you're looking at this, I will also try to see if there is a way to safely shift the values before 1883-11-17 between time zones (probably will need julian<->gregorian rebases).

If there isn't a reasonable way, we might need to consider some other ORC datatype for storing these values (maybe Long?). Don't know if that's doable or reasonable...

Contributor

I'm trying to understand this issue better. From the ORC source code, it seems like:

  1. the ORC writer shifts the timestamp value w.r.t. the JVM local timezone, and records the timezone in the file footer;
  2. the ORC reader shifts the timestamp value w.r.t. both the JVM local timezone and the recorded writer timezone.

It seems like we only need to change the ORC reader to shift the timestamp value by the writer timezone?

bersprockets (Contributor) commented Nov 30, 2021

I'm trying to understand this issue better. From the ORC source code, it seems like:

Almost: for the first point, the ORC writer stores the timestamp value passed by the caller as-is, and records the timezone in the file footer (that's why we need to shift the value before passing it to ORC).

The devil, however, is in the details. I know enough to know there are issues with the above, but not enough to know every issue or solution.

For example, an offset for a particular time zone is not fixed. It changes depending on the point on the timeline.

When shifting, ORC determines the offsets for the two time zones based on the stored timestamp value (millis) (see here). Because ORC uses the old time APIs to do this, we should ensure we pass a correct Hybrid Julian epoch time to ORC (via a Timestamp object), or ORC could get the wrong offsets.
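
For reference, a rough paraphrase of that shifting logic in Scala (not ORC's exact code), just to make the dependence on the stored millis value explicit:

import java.util.TimeZone

// Both offsets are looked up from the stored millis via the legacy java.util.TimeZone API;
// a second lookup re-checks the reader offset at the adjusted instant, since it can differ near DST changes.
def shiftBetweenTimezones(writerTz: TimeZone, readerTz: TimeZone, millis: Long): Long = {
  val writerOffset = writerTz.getOffset(millis)
  val readerOffset = readerTz.getOffset(millis)
  val adjusted = millis + writerOffset - readerOffset
  millis + writerOffset - readerTz.getOffset(adjusted)
}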

Because of that, I don't think you can pass an arbitrary long value in the Timestamp object and hope to properly reconstruct it on read. You might not know how ORC shifted it.

In addition, the fromUTCTime/toUTCTime/convertTZ utility methods work correctly, but are not appropriate for our needs here. They use the new Java time APIs, which don't work as we'd like for timestamps before the introduction of Railway time (circa 1883-11-17).

Emulating what fromUTCTime does, you can see the issue:

scala> val ldtUtc = ZonedDateTime.of(1883, 11, 16, 0, 0, 0, 0, ZoneId.of("UTC")).toLocalDateTime
ldtUtc: java.time.LocalDateTime = 1883-11-16T00:00

scala> val zdtShifted = ldtUtc.atZone(ZoneId.of("America/Los_Angeles"))
zdtShifted: java.time.ZonedDateTime = 1883-11-16T00:00-07:52:58[America/Los_Angeles]

scala>

Note how the offset that the new APIs applied to the shifted value is -07:52:58, which is not evenly divisible by 1 hour or 30 minutes.

As a result, you end up with a timestamp like this:

scala> val ts = new Timestamp(zdtShifted.toInstant.toEpochMilli)
ts: java.sql.Timestamp = 1883-11-15 23:52:58.0

scala> 

In fact, you can see a compounded version of this with this PR:

scala> TimeZone.setDefault(TimeZone.getTimeZone("UTC"))

scala> sql("select timestamp_ntz'1883-11-16 00:00:00.0' as ts_ntz").write.mode("overwrite").orc("test")

scala> TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))

scala> spark.read.orc("test").show(false)
+-------------------+
|ts_ntz             |
+-------------------+
|1883-11-16 00:07:02|
+-------------------+

scala> 

Edit: I assume the above is because of pre-Railway shifting; I didn't verify that.

I updated my POC to attempt to accommodate these issues by:

  • Stealing the timestamp shifting technique from ORC code (thus avoiding fromUTCTime/toUTCTime)
  • Passing Hybrid Julian values to ORC on write, and assuming ORC retrieves a Hybrid Julian value on read.

That seems to have eliminated the pre-Railway and pre-Gregorian issues.
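
For the second bullet, a minimal sketch of the kind of rebase involved (assuming Spark's internal RebaseDateTime utility):

import java.time.LocalDateTime
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime}

// A pre-Gregorian wall clock, encoded as Proleptic Gregorian micros (Spark's internal representation).
val gregorianMicros = DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(1001, 1, 1, 0, 0, 0))

// Rebase to the hybrid Julian calendar that java.sql.Timestamp / java.util.TimeZone
// (and therefore ORC's shifting) assume, before building the Timestamp handed to ORC.
val julianMicros = RebaseDateTime.rebaseGregorianToJulianMicros(gregorianMicros)

// The read path would undo the shift and then apply RebaseDateTime.rebaseJulianToGregorianMicros.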

But there is more...

The ORC API accepts and returns Timestamp objects for writing and reading ORC timestamp fields. This alone introduces some oddities that will be noticeable to end users.

For example, not all timestamp_ntz values can be represented with Timestamp in all time zones. The date/time '2021-03-14 02:15:00.0' doesn't exist in the America/Los_Angeles time zone.

scala> TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))

scala> val ts = java.sql.Timestamp.valueOf("2021-03-14 02:15:00.0")
ts: java.sql.Timestamp = 2021-03-14 03:15:00.0

scala>

So we will have oddities like this (using my POC code):

scala> import java.util.TimeZone
import java.util.TimeZone

scala> TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))

scala> sql("select timestamp_ntz'2021-03-14 02:15:00.0' as ts_ntz").write.mode("overwrite").orc("test")

scala> spark.read.orc("test").show(false)
+-------------------+
|ts_ntz             |
+-------------------+
|2021-03-14 01:15:00|
+-------------------+


scala> 

With this PR, you actually see it not with "spring forward", but with "fall back" (not sure why):

scala> import java.util.TimeZone
import java.util.TimeZone

scala> TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))

scala> sql("select timestamp_ntz'1996-10-27T09:10:25.088353' as ts_ntz").write.mode("overwrite").orc("test")

scala> spark.read.orc("test").show(false)
+--------------------------+
|ts_ntz                    |
+--------------------------+
|1996-10-27 08:10:25.088353|
+--------------------------+


scala>

So to summarize:

  • ORC needs the correct epoch values (pre-shifted, rebased to Hybrid Julian) to determine the correct offsets when shifting on read.
  • You can't use fromUTCTime/toUTCTime (or convertTZ) for shifting pre-Railway datetime values.
  • For the ORC timestamp type, the ORC API receives and returns Timestamp objects, but Timestamp objects alone introduce oddities.
  • It would certainly be nice if we could save and retrieve a long value (à la useUTCTimestamp) without affecting timestamp_ltz.

Edit: caveat: I am no expert in the time APIs.

Contributor Author

@bersprockets Thank you for the good investigation. I referenced https://github.com/apache/orc/blob/334bf1f2c605f38c7e75ec81d1dab93c31fc8459/java/core/src/java/org/apache/orc/impl/SerializationUtils.java#L1444 and tried to reverse the conversion.

This PR handles the first and second examples successfully, but the last one, '1996-10-27T09:10:25.088353', still outputs '1996-10-27 08:10:25.088353'.

@beliefer
Contributor Author

ping @cloud-fan


SparkQA commented Nov 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50205/


SparkQA commented Nov 30, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50205/


SparkQA commented Nov 30, 2021

Test build #145735 has finished for PR 34741 at commit ee11bf4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class UDFBasicProfiler(BasicProfiler):
  • case class PrettyPythonUDF(


SparkQA commented Nov 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50243/


SparkQA commented Nov 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50245/


SparkQA commented Nov 30, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50243/


SparkQA commented Nov 30, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50245/


SparkQA commented Nov 30, 2021

Test build #145772 has finished for PR 34741 at commit 8e17559.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Dec 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50265/


SparkQA commented Dec 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50265/


SparkQA commented Dec 1, 2021

Test build #145792 has finished for PR 34741 at commit c289b97.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Dec 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50284/


SparkQA commented Dec 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50284/

@cloud-fan
Contributor

cloud-fan commented Dec 1, 2021

@bersprockets After reading more ORC code, I feel the timestamp implementation is quite messy in ORC. Not only the reader side, but also the writer side shifts the timestamp value according to the JVM local timezone: https://github.com/apache/orc/blob/1f68ac0c7f2ae804b374500dcf1b4d7abe30ffeb/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java#L112-L113

It seems like the ORC lib (in its default behavior) is designed for people who want to deal with java.sql.Timestamp directly, not for an engine like Spark that only treats ORC as a storage layer. Spark should have set useUTCTimestamp to true from the beginning, but now it's too late, as we need to support existing ORC files written by old Spark versions.

To fix the mistake in the storage layer, we probably need years to do a smooth migration. My proposal is:
Phase 1:

  1. Write TIMESTAMP_NTZ as ORC int64, with a column property to indicate it's TIMESTAMP_NTZ (writing TIMESTAMP_LTZ should add the column property as well); see the sketch below.
  2. Support reading ORC TIMESTAMP_INSTANT as Spark TIMESTAMP_LTZ.
  3. When reading ORC TIMESTAMP, check the column property to get the actual type (LTZ or NTZ).

Phase 2 (after Spark 3.3 becomes the oldest officially supported version):

  1. Write LTZ as ORC TIMESTAMP_INSTANT
  2. Write NTZ as ORC TIMESTAMP, with useUTCTimestamp set to true.
  3. Set useUTCTimestamp to true in the reader if the ORC file was written by the latest Spark version.

With this proposal, we can achieve:

  1. ORC files written by Spark 3.3 can be correctly read back by Spark 3.3
  2. Old Spark versions and other systems will read TIMESTAMP_NTZ as long (not a big issue)
  3. Old ORC files can still be correctly read by Spark 3.3
  4. When phase 2 is completed, all the supported Spark versions and other systems can read the ORC files written by Spark correctly.
  5. When phase 2 is completed, all the supported Spark versions can still read old ORC files correctly (we can look at the Spark version in the file footer to decide whether to set useUTCTimestamp to true for the reader)
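
A minimal sketch of phase 1, item 1 (assuming ORC's TypeDescription attribute API; the attribute key and value are illustrative, not necessarily what the follow-up PR uses):

import org.apache.orc.TypeDescription

// Store TIMESTAMP_NTZ as an ORC bigint column (micros since the epoch) and tag the column
// so readers can recover the Catalyst type instead of reading plain longs.
val schema = TypeDescription.createStruct()
val ntzColumn = TypeDescription.createLong()
ntzColumn.setAttribute("spark.sql.catalyst.type", "timestamp_ntz")
schema.addField("ts_ntz", ntzColumn)

// On read: a LONG column carrying this attribute is interpreted as TIMESTAMP_NTZ micros.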

also cc @gengliangwang @MaxGekk


SparkQA commented Dec 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50290/


SparkQA commented Dec 1, 2021

Test build #145810 has finished for PR 34741 at commit 6aa5053.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Dec 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50290/


SparkQA commented Dec 1, 2021

Test build #145815 has finished for PR 34741 at commit 03a7640.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member

@cloud-fan I have two concerns about your proposal:

  • Storing NTZ as ORC int64 will bring an extra burden of supporting this feature in future releases.
  • Phase 2 introduces behavior changes, and I don't think we can make them easily.

I would prefer storing it as ORC NTZ and shifting the time zones on read/write; e.g. currently LTZ actually works like NTZ, per the comment #33588 (comment).
Or, we can try setting useUTCTimestamp and see how to keep the current LTZ behavior.

@cloud-fan
Contributor

@gengliangwang please read the discussions in the previous PRs. We tried your proposals and none of them worked:

  1. ORC stores the timezone per "column chunk", not per file, and we can't read this timezone info in the row-based ORC reader to shift the timestamp values.
  2. useUTCTimestamp is a global conf. If we set it, we break TIMESTAMP_LTZ.

I don't think there is a better option. Phase 2 is not a breaking change if no one is using a Spark version < 3.3. It may take years, but it's still possible. Before that, we are still in good shape; the only problem is other systems reading ORC files written by Spark.

@gengliangwang
Member

@cloud-fan I see. Let's go with your proposal.

cloud-fan pushed a commit that referenced this pull request Mar 24, 2022
### What changes were proposed in this pull request?
#33588 (comment) shows that Spark cannot read/write timestamp ntz and ltz correctly. Based on the discussion in #34741 (comment), this PR fixes reading/writing timestamp ntz from/to ORC by using int64.

### Why are the changes needed?
Fix the bug where reading/writing timestamp ntz from/to ORC across different time zones produces wrong values.

### Does this PR introduce _any_ user-facing change?
Yes. Orc timestamp ntz is a new feature.

### How was this patch tested?
New tests.

Closes #34984 from beliefer/SPARK-37463-int64.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Mar 24, 2022
### What changes were proposed in this pull request?
#33588 (comment) shows that Spark cannot read/write timestamp ntz and ltz correctly. Based on the discussion in #34741 (comment), this PR fixes reading/writing timestamp ntz from/to ORC by using int64.

### Why are the changes needed?
Fix the bug where reading/writing timestamp ntz from/to ORC across different time zones produces wrong values.

### Does this PR introduce _any_ user-facing change?
Yes. Orc timestamp ntz is a new feature.

### How was this patch tested?
New tests.

Closes #34984 from beliefer/SPARK-37463-int64.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e410d98)
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan
Contributor

This is replaced by #34984
