@beliefer beliefer commented Nov 25, 2021

What changes were proposed in this pull request?

This PR fixes the issue discussed in #33588 (comment).

The root cause is that ORC writes and reads timestamps with the local timezone by default, and the local timezone may differ between the write and the read.
If the ORC writer writes a timestamp with one local timezone (e.g. America/Los_Angeles) and the ORC reader reads it with another local timezone (e.g. Europe/Amsterdam), the timestamp value will be different.

If the ORC writer writes the timestamp with the UTC timezone and the ORC reader also reads it with the UTC timezone, the timestamp value will be correct.

This PR makes ORC write/read timestamps with the UTC timezone by calling useUTCTimestamp(true) on the readers and writers.

The related ORC source:
https://github.com/apache/orc/blob/3f1e57cf1cebe58027c1bd48c09eef4e9717a9e3/java/core/src/java/org/apache/orc/impl/WriterImpl.java#L525

https://github.com/apache/orc/blob/1f68ac0c7f2ae804b374500dcf1b4d7abe30ffeb/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java#L1184
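
To illustrate the idea, here is a minimal sketch (not the exact Spark code) of enabling the UTC behavior on the ORC writer and reader options; it assumes the org.apache.orc core API and an illustrative one-column schema:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.orc.{OrcFile, TypeDescription}

val conf = new Configuration()
// Illustrative schema with a single timestamp column.
val schema = TypeDescription.fromString("struct<ts:timestamp>")

// Writer side: serialize timestamp values relative to UTC rather than the JVM default timezone.
val writerOptions = OrcFile.writerOptions(conf)
  .setSchema(schema)
  .useUTCTimestamp(true)

// Reader side: interpret the stored values as UTC as well, so the round trip is timezone-independent.
val readerOptions = OrcFile.readerOptions(conf)
  .useUTCTimestamp(true)
```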

Another problem is when Spark 3.3 or newer reads an ORC file written by Spark 3.2 or earlier. Because older Spark versions write timestamps with the local timezone, such files should not be read with the UTC timezone; otherwise an incorrect timestamp value occurs.

Why are the changes needed?

Fix the bug for ORC timestamps.

Does this PR introduce any user-facing change?

ORC TIMESTAMP_NTZ is a new feature that has not been released yet.

How was this patch tested?

New tests.

@github-actions github-actions bot added the SQL label Nov 25, 2021
@beliefer beliefer changed the title from "[SPARK-37463][SQL] read/write Timestamp ntz or ltz to Orc uses UTC timestamp" to "[SPARK-37463][SQL] Read/Write Timestamp ntz or ltz to Orc uses UTC timestamp" on Nov 25, 2021

SparkQA commented Nov 25, 2021

Test build #145630 has finished for PR 34712 at commit 8c01499.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  : Option[TypeDescription] = {
    val fs = file.getFileSystem(conf)
-   val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
+   val readerOptions = OrcFile.readerOptions(conf).filesystem(fs).useUTCTimestamp(true)

Do we need this? It just reads the footer to get the schema.


    val fs = filePath.getFileSystem(conf)
-   val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
+   val readerOptions = OrcFile.readerOptions(conf).filesystem(fs).useUTCTimestamp(true)

this seems not needed as it's only used to do column pruning

for (tz <- tzs) {
  TimeZone.setDefault(TimeZone.getTimeZone(tz))
  sql(s"set spark.sql.session.timeZone = $tz")
  sql(queryOrc).collect().equals(sql(queryParquet).collect())

let's test with withAllOrcReaders

}

test("SPARK-37463: read/write Timestamp ntz or ltz to Orc uses UTC timestamp") {
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))

It's global. Please set it back at the end of the test.
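
A minimal sketch of what the reviewer is asking for, assuming the test keeps using java.util.TimeZone: save the JVM default and restore it in a finally block so the test does not leak global state:

```scala
import java.util.TimeZone

val originalTz = TimeZone.getDefault
try {
  TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
  // ... run the ORC read/write assertions here ...
} finally {
  // Restore the JVM default timezone for subsequent tests.
  TimeZone.setDefault(originalTz)
}
```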


test("SPARK-37463: read/write Timestamp ntz or ltz to Orc uses UTC timestamp") {
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
sql("set spark.sql.session.timeZone = America/Los_Angeles")
@cloud-fan cloud-fan Nov 25, 2021


IIUC this ORC bug is only about the JVM timezone, not the Spark session timezone, so we don't need to set it.

sql("select timestamp_ntz '2021-06-01 00:00:00' ts_ntz, timestamp '2021-06-01 00:00:00' ts")

df.write.mode("overwrite").orc("ts_ntz_orc")
df.write.mode("overwrite").parquet("ts_ntz_parquet")

Can we check against the actual result, instead of comparing with Parquet?
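
A hedged sketch of what checking against literal values could look like (checkAnswer is the standard Spark SQL test helper; the path and expected values below are illustrative and follow the select above, ts_ntz first, then ts):

```scala
// Externally, TIMESTAMP_NTZ maps to java.time.LocalDateTime and TIMESTAMP to java.sql.Timestamp.
checkAnswer(
  spark.read.orc("ts_ntz_orc"),
  Row(
    java.time.LocalDateTime.parse("2021-06-01T00:00:00"),
    java.sql.Timestamp.valueOf("2021-06-01 00:00:00")))
```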


SparkQA commented Nov 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50101/


SparkQA commented Nov 25, 2021

Test build #145631 has finished for PR 34712 at commit 7c343a7.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50101/


SparkQA commented Nov 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50102/


SparkQA commented Nov 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50102/


SparkQA commented Nov 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50105/


SparkQA commented Nov 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50106/


SparkQA commented Nov 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50106/


SparkQA commented Nov 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50105/


SparkQA commented Nov 25, 2021

Test build #145634 has finished for PR 34712 at commit 92d7b8f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 25, 2021

Test build #145635 has finished for PR 34712 at commit 998e93d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val split = inputSplit.asInstanceOf[FileSplit]
val conf = taskAttemptContext.getConfiguration()
val readOptions = OrcFile.readerOptions(conf)
  .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)).useUTCTimestamp(true)

While useUTCTimestamp(true) makes TIMESTAMP_NTZ consistent between ORC and Parquet/Avro, it also makes TIMESTAMP consistent between ORC and Parquet/Avro. While that seems like a good thing, it's a behavior change between 3.2 (which is consistent with earlier versions, including 2.x) and the upcoming 3.3.

(Up to and including 3.2, TIMESTAMP with ORC has behaved more like TIMESTAMP_NTZ).

This also seems to break reading ORC files written by earlier Spark versions.

For example, in Spark 3.2.0, in the America/Los_Angeles timezone (or some other non-UTC timezone), write a timestamp to ORC:

sql("select timestamp '2021-06-01 00:00:00' ts").write.mode("overwrite").format("orc").save("/tmp/testdata/ts_orc_spark32")

In Spark with this PR, running in the same timezone as above, read the timestamp:

scala> sql("select * from `orc`.`/tmp/testdata/ts_orc_spark32`").show(false)
+-------------------+
|ts                 |
+-------------------+
|2021-05-31 17:00:00| 
+-------------------+

scala> 

(Note that the above will appear to work correctly if your local timezone is set to UTC).

I think the fix to TIMESTAMP might need to be done with a feature flag, and the fix for TIMESTAMP_NTZ might need to be more targeted to just TIMESTAMP_NTZ.
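
As a purely hypothetical sketch of the feature-flag idea (the config name, description, and default below are illustrative, not part of this PR), a SQLConf entry gating the UTC behavior could look like:

```scala
// Hypothetical flag: keep the pre-3.3 TIMESTAMP semantics unless explicitly enabled.
val ORC_USE_UTC_TIMESTAMP =
  buildConf("spark.sql.orc.useUTCTimestamp.enabled")
    .doc("When true, ORC readers and writers use UTC for TIMESTAMP values " +
      "instead of the JVM default timezone.")
    .version("3.3.0")
    .booleanConf
    .createWithDefault(false)
```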


@bersprockets Thank you for your investigation.

…ckle to cloudpickle

### What changes were proposed in this pull request?

This PR proposes to replace Python's built-in CPickle with the CPickle-based cloudpickle (requires Python 3.8+).
For Python 3.7 and below, it still uses the legacy built-in CPickle for performance reasons.

I did a bit of benchmarking with basic cases and saw no performance penalty (one of the benchmarks is attached below).

### Why are the changes needed?

To remove the namedtuple hack for issues such as SPARK-32079, SPARK-22674 and SPARK-27810.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

#### Micro benchmark:

```python
import time
import pickle
from pyspark import cloudpickle

def measure(f):
    start = time.time()
    f()
    end = time.time()
    print(end - start)

data = [123, "abc", (1, 2, 3), 2.2] * 100000000
measure(lambda: pickle.dumps(data))
measure(lambda: cloudpickle.dumps(data))
measure(lambda: pickle.loads(pickle.dumps(data)))
measure(lambda: cloudpickle.loads(cloudpickle.dumps(data)))
```

```
5.1765618324279785
5.2591071128845215
12.457043886184692
12.1910879611969
```

```python
import time
import random
import pickle
from pyspark import cloudpickle

def measure(f):
    start = time.time()
    f()
    end = time.time()
    print(end - start)

rand_data = []

for _ in range(10000000):
    data = [
        random.randint(1, 100),
        str(random.randint(1, 100)),
        (random.randint(1, 100), random.randint(2, 200), random.randint(3, 300)),
        random.random()
    ]
    random.shuffle(data)
    rand_data.append(data)

measure(lambda: pickle.dumps(rand_data))
measure(lambda: cloudpickle.dumps(rand_data))
measure(lambda: pickle.loads(pickle.dumps(rand_data)))
measure(lambda: cloudpickle.loads(cloudpickle.dumps(rand_data)))
```

```
7.736639976501465
7.8458099365234375
20.306012868881226
17.787282943725586
```

#### E2E benchmark:

```bash
./bin/pyspark --conf spark.python.profile=true
```

```python
import time
from collections import namedtuple
rdd = sc.parallelize([123] * 30000000)
rdd.count()  # init
start = time.time()
rdd.map(lambda x: x).count()
print(time.time() - start)
sc.show_profiles()
```

Before:

2.3216118812561035 (sec)

```
============================================================
Profile of RDD<id=2>
============================================================
         60264297 function calls (60264265 primitive calls) in 22.309 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 30000016   13.127    0.000   29.890    0.000 rdd.py:1291(<genexpr>)
       32    4.559    0.142   34.449    1.077 {built-in method builtins.sum}
 30000000    3.723    0.000    3.723    0.000 <stdin>:1(<lambda>)
    29297    0.699    0.000    0.699    0.000 {built-in method _pickle.loads}
    29313    0.059    0.000    0.874    0.000 serializers.py:151(_read_with_length)
    58610    0.045    0.000    0.045    0.000 {method 'read' of '_io.BufferedReader' objects}
    29313    0.035    0.000    0.057    0.000 serializers.py:567(read_int)
    29313    0.025    0.000    0.899    0.000 serializers.py:135(load_stream)
    29297    0.016    0.000    0.715    0.000 serializers.py:435(loads)
    29313    0.013    0.000    0.013    0.000 {built-in method _struct.unpack}
    29329    0.006    0.000    0.006    0.000 {built-in method builtins.len}
       16    0.000    0.000    0.000    0.000 rdd.py:409(func)
       16    0.000    0.000    0.001    0.000 serializers.py:256(dump_stream)
...
```

After:

2.279919147491455 (sec)

```
============================================================
Profile of RDD<id=2>
============================================================
         90264361 function calls (90264329 primitive calls) in 34.573 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 30000016   13.204    0.000   29.982    0.000 rdd.py:1291(<genexpr>)
 30000000   12.087    0.000   15.879    0.000 util.py:77(wrapper)
       32    4.588    0.143   34.571    1.080 {built-in method builtins.sum}
 30000000    3.792    0.000    3.792    0.000 <stdin>:1(<lambda>)
    29297    0.694    0.000    0.694    0.000 {built-in method _pickle.loads}
    29313    0.061    0.000    0.873    0.000 serializers.py:157(_read_with_length)
    58610    0.045    0.000    0.045    0.000 {method 'read' of '_io.BufferedReader' objects}
    29313    0.036    0.000    0.059    0.000 serializers.py:585(read_int)
    29313    0.026    0.000    0.900    0.000 serializers.py:141(load_stream)
    29297    0.018    0.000    0.712    0.000 serializers.py:463(loads)
    29313    0.013    0.000    0.013    0.000 {built-in method _struct.unpack}
    29329    0.007    0.000    0.007    0.000 {built-in method builtins.len}
       16    0.000    0.000   34.573    2.161 worker.py:665(process)
       16    0.000    0.000    0.000    0.000 rdd.py:409(func)
       16    0.000    0.000    0.001    0.000 serializers.py:262(dump_stream)
       16    0.000    0.000    0.001    0.000 cloudpickle_fast.py:59(dumps)
...
```

Existing test cases should cover this change.

Closes apache#34688 from HyukjinKwon/SPARK-32079.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>

SparkQA commented Dec 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50287/


SparkQA commented Dec 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50288/


SparkQA commented Dec 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50287/


SparkQA commented Dec 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50288/


SparkQA commented Dec 1, 2021

Test build #145813 has finished for PR 34712 at commit aec499f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
