Skip to content

Conversation

@HyukjinKwon
Copy link
Member

@HyukjinKwon HyukjinKwon commented Nov 23, 2021

What changes were proposed in this pull request?

This PR proposes to replace Python's built-in CPickle to CPickle-based cloudpickle (requires Python 3.8+).
For Python 3.7 and below, it still uses the legacy built-in CPickle for the performance matter.

I did a bit of benchmark with basic cases, and I have seen no performance penalty (attached one of the benchmarks below).

Why are the changes needed?

To remove named tuple hack for the issues such as: SPARK-32079, SPARK-22674 and SPARK-27810.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Micro benchmark:

import time
import pickle
from pyspark import cloudpickle

def measure(f):
    start = time.time()
    f()
    end = time.time()
    print(end - start)

data = [123, "abc", (1, 2, 3), 2.2] * 100000000
measure(lambda: pickle.dumps(data))
measure(lambda: cloudpickle.dumps(data))
measure(lambda: pickle.loads(pickle.dumps(data)))
measure(lambda: cloudpickle.loads(cloudpickle.dumps(data)))
5.1765618324279785
5.2591071128845215
12.457043886184692
12.1910879611969
import time
import random
import pickle
from pyspark import cloudpickle

def measure(f):
    start = time.time()
    f()
    end = time.time()
    print(end - start)

rand_data = []

for _ in range(10000000):
    data = [
        random.randint(1, 100),
        str(random.randint(1, 100)),
        (random.randint(1, 100), random.randint(2, 200), random.randint(3, 300)),
        random.random()
    ]
    random.shuffle(data)
    rand_data.append(data)

measure(lambda: pickle.dumps(rand_data))
measure(lambda: cloudpickle.dumps(rand_data))
measure(lambda: pickle.loads(pickle.dumps(rand_data)))
measure(lambda: cloudpickle.loads(cloudpickle.dumps(rand_data)))
7.736639976501465
7.8458099365234375
20.306012868881226
17.787282943725586

E2E benchmark:

./bin/pyspark --conf spark.python.profile=true
import time
from collections import namedtuple
rdd = sc.parallelize([123] * 30000000)
rdd.count()  # init
start = time.time()
rdd.map(lambda x: x).count()
print(time.time() - start)
sc.show_profiles()

Before:

2.3216118812561035 (sec)

============================================================
Profile of RDD<id=2>
============================================================
         60264297 function calls (60264265 primitive calls) in 22.309 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 30000016   13.127    0.000   29.890    0.000 rdd.py:1291(<genexpr>)
       32    4.559    0.142   34.449    1.077 {built-in method builtins.sum}
 30000000    3.723    0.000    3.723    0.000 <stdin>:1(<lambda>)
    29297    0.699    0.000    0.699    0.000 {built-in method _pickle.loads}
    29313    0.059    0.000    0.874    0.000 serializers.py:151(_read_with_length)
    58610    0.045    0.000    0.045    0.000 {method 'read' of '_io.BufferedReader' objects}
    29313    0.035    0.000    0.057    0.000 serializers.py:567(read_int)
    29313    0.025    0.000    0.899    0.000 serializers.py:135(load_stream)
    29297    0.016    0.000    0.715    0.000 serializers.py:435(loads)
    29313    0.013    0.000    0.013    0.000 {built-in method _struct.unpack}
    29329    0.006    0.000    0.006    0.000 {built-in method builtins.len}
       16    0.000    0.000    0.000    0.000 rdd.py:409(func)
       16    0.000    0.000    0.001    0.000 serializers.py:256(dump_stream)
...

After:

2.279919147491455 (sec)

============================================================
Profile of RDD<id=2>
============================================================
         90264361 function calls (90264329 primitive calls) in 34.573 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 30000016   13.204    0.000   29.982    0.000 rdd.py:1291(<genexpr>)
 30000000   12.087    0.000   15.879    0.000 util.py:77(wrapper)
       32    4.588    0.143   34.571    1.080 {built-in method builtins.sum}
 30000000    3.792    0.000    3.792    0.000 <stdin>:1(<lambda>)
    29297    0.694    0.000    0.694    0.000 {built-in method _pickle.loads}
    29313    0.061    0.000    0.873    0.000 serializers.py:157(_read_with_length)
    58610    0.045    0.000    0.045    0.000 {method 'read' of '_io.BufferedReader' objects}
    29313    0.036    0.000    0.059    0.000 serializers.py:585(read_int)
    29313    0.026    0.000    0.900    0.000 serializers.py:141(load_stream)
    29297    0.018    0.000    0.712    0.000 serializers.py:463(loads)
    29313    0.013    0.000    0.013    0.000 {built-in method _struct.unpack}
    29329    0.007    0.000    0.007    0.000 {built-in method builtins.len}
       16    0.000    0.000   34.573    2.161 worker.py:665(process)
       16    0.000    0.000    0.000    0.000 rdd.py:409(func)
       16    0.000    0.000    0.001    0.000 serializers.py:262(dump_stream)
       16    0.000    0.000    0.001    0.000 cloudpickle_fast.py:59(dumps)
...

Existing test cases should cover all test cases.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@HyukjinKwon HyukjinKwon changed the title [WIP][SPARK-32079][PYTHON] Remove namedtuple hack by replace built-in pickle to cloudpickle [WIP][SPARK-32079][PYTHON] Remove namedtuple hack by replacing built-in pickle to cloudpickle Nov 23, 2021
@SparkQA

This comment has been minimized.

@HyukjinKwon HyukjinKwon changed the title [WIP][SPARK-32079][PYTHON] Remove namedtuple hack by replacing built-in pickle to cloudpickle [SPARK-32079][PYTHON] Remove namedtuple hack by replacing built-in pickle to cloudpickle Nov 23, 2021
@HyukjinKwon HyukjinKwon marked this pull request as ready for review November 23, 2021 09:47
@HyukjinKwon
Copy link
Member Author

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@HyukjinKwon
Copy link
Member Author

HyukjinKwon commented Nov 24, 2021

I added a couple of micro benchmarks to the PR description.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we kept same test coverage for two serializers?

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good if we've not missed current test coverage.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@HyukjinKwon
Copy link
Member Author

Rebased.

@HyukjinKwon
Copy link
Member Author

I'll merge this one in few days if there aren't any comment.

@SparkQA
Copy link

SparkQA commented Nov 25, 2021

Test build #145607 has finished for PR 34688 at commit 1c3c50a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CloudPickleSerializer(FramedSerializer):

@SparkQA
Copy link

SparkQA commented Nov 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50081/

@SparkQA
Copy link

SparkQA commented Nov 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50081/

@HyukjinKwon
Copy link
Member Author

Thanks, @viirya.

Merged to master.

@HyukjinKwon HyukjinKwon deleted the SPARK-32079 branch January 4, 2022 00:51
HyukjinKwon added a commit that referenced this pull request Nov 18, 2022
…ple hack

### What changes were proposed in this pull request?

This PR is a followup of #34688 that adds a switch to turn on and off the namedtuple hack.

### Why are the changes needed?

There are still behaviour differences between regular pickle and Cloudpickle e.g., bug fixes from the upstream. It's safer to have a switch to turn on and off for the time being.

### Does this PR introduce _any_ user-facing change?

This remains as an internal environment so ideally no. In fact the main change itself was the internal change too.

### How was this patch tested?

Manually tested.

Closes #38700 from HyukjinKwon/SPARK-41189.

Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…ple hack

### What changes were proposed in this pull request?

This PR is a followup of apache#34688 that adds a switch to turn on and off the namedtuple hack.

### Why are the changes needed?

There are still behaviour differences between regular pickle and Cloudpickle e.g., bug fixes from the upstream. It's safer to have a switch to turn on and off for the time being.

### Does this PR introduce _any_ user-facing change?

This remains as an internal environment so ideally no. In fact the main change itself was the internal change too.

### How was this patch tested?

Manually tested.

Closes apache#38700 from HyukjinKwon/SPARK-41189.

Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 15, 2022
…ple hack

### What changes were proposed in this pull request?

This PR is a followup of apache#34688 that adds a switch to turn on and off the namedtuple hack.

### Why are the changes needed?

There are still behaviour differences between regular pickle and Cloudpickle e.g., bug fixes from the upstream. It's safer to have a switch to turn on and off for the time being.

### Does this PR introduce _any_ user-facing change?

This remains as an internal environment so ideally no. In fact the main change itself was the internal change too.

### How was this patch tested?

Manually tested.

Closes apache#38700 from HyukjinKwon/SPARK-41189.

Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
…ple hack

### What changes were proposed in this pull request?

This PR is a followup of apache#34688 that adds a switch to turn on and off the namedtuple hack.

### Why are the changes needed?

There are still behaviour differences between regular pickle and Cloudpickle e.g., bug fixes from the upstream. It's safer to have a switch to turn on and off for the time being.

### Does this PR introduce _any_ user-facing change?

This remains as an internal environment so ideally no. In fact the main change itself was the internal change too.

### How was this patch tested?

Manually tested.

Closes apache#38700 from HyukjinKwon/SPARK-41189.

Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants