
Conversation

@viirya (Member) commented Oct 12, 2016

What changes were proposed in this pull request?

This change is a follow-up to #15389, which calls _to_java_object_rdd() to solve this issue. Given the concern about the potentially expensive cost of that call, we can instead decrease the batch size to solve the issue.

Simple benchmark:

    import time
    num_partitions = 20000
    a = sc.parallelize(range(int(1e6)), 2)
    start = time.time()
    l = a.repartition(num_partitions).glom().map(len).collect()
    end = time.time()
    print(end - start)

Before: 419.447577953
_to_java_object_rdd(): 421.916361094
decreasing the batch size: 423.712255955

How was this patch tested?

Jenkins tests.

@SparkQA commented Oct 12, 2016

Test build #66789 has finished for PR 15445 at commit be6d153.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member)
Do you have benchmark on this change?

@davies (Contributor) commented Oct 12, 2016

A change related to performance should be verified by a benchmark, unless the improvement is obvious.

@davies (Contributor) commented Oct 12, 2016

Just saw the results from that PR (posting them here would be great). We may not need this PR if there is no noticeable difference (even for complicated types).

@viirya (Member, Author) commented Oct 13, 2016

@felixcheung I posted the benchmark in #15389. Posting it here too.

@viirya (Member, Author) commented Oct 13, 2016

@davies @felixcheung I ran another benchmark as follows:

    import time
    import random

    num_partitions = 20000
    a = sc.parallelize(map(lambda x: [random.randint(0,1000) for r in xrange(20)], range(20000)))
    start = time.time()
    l = a.repartition(num_partitions).glom().map(len).collect()
    end = time.time()
    print(end - start)

_to_java_object_rdd(): 424.308749914
decreasing the batch size: 425.877130032

The time difference is not significant.

However, when I ran another benchmark with NumPy arrays, I found that the _to_java_object_rdd() approach has another problem: unpickling custom Python objects on the Java side.

When running the following code:

    import time
    import numpy as np

    num_partitions = 20000
    a = sc.parallelize(map(lambda x: np.random.rand(20), range(20000)), 2)
    start = time.time()
    l = a.repartition(num_partitions).glom().map(len).collect()
    end = time.time()
    print(end - start)

_to_java_object_rdd() throws an exception:

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:152)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:151)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
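This failure can be illustrated in pure Python, without Spark: pickling an instance of a custom class records the class by name and rebuilds it via a GLOBAL/REDUCE step, which a JVM-side unpickler such as Pyrolite can only perform for names it has a registered constructor for. A minimal sketch (MyRecord below is a hypothetical stand-in for numpy.core.multiarray._reconstruct):

    import pickle
    import pickletools

    # A custom class the JVM unpickler knows nothing about.
    class MyRecord:
        def __init__(self, x):
            self.x = x

    # Pickle an instance; the payload references the class by name.
    payload = pickle.dumps(MyRecord(42), protocol=2)

    # Inspect the pickle opcodes: a GLOBAL (or STACK_GLOBAL) opcode
    # names the class, which is exactly what Pyrolite cannot resolve
    # unless a constructor for that name was registered on the JVM side.
    opcodes = {op.name for op, arg, pos in pickletools.genops(payload)}
    print(("GLOBAL" in opcodes) or ("STACK_GLOBAL" in opcodes))  # True

This is why _to_java_object_rdd() works for known SQL/ML types (which have registered constructors) but not for arbitrary objects like NumPy arrays.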

Considering the issue of unpickling Python objects when converting to a Java RDD, I think this PR is the better solution.
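The idea behind decreasing the batch size can be sketched in plain Python. This is a minimal illustration of the concept, not Spark's actual serializer code: pickling in smaller batches bounds the size of any single serialized payload, so elements can be spread more evenly across shuffle partitions.

    import pickle

    def serialize_in_batches(items, batch_size):
        """Pickle items in fixed-size batches; each batch is one payload.

        A smaller batch_size bounds the size of any single pickled
        payload, which is the idea behind shrinking the batch size
        during repartition.
        """
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) == batch_size:
                yield pickle.dumps(batch)
                batch = []
        if batch:
            yield pickle.dumps(batch)

    data = list(range(1000))
    big = list(serialize_in_batches(data, 1000))    # one large payload
    small = list(serialize_in_batches(data, 10))    # 100 small payloads
    print(len(big), len(small))                     # 1 100
    print(max(len(p) for p in small) < len(big[0])) # True

With one huge payload, a repartition shuffle can only move that payload as a unit; with many small payloads, the shuffle can distribute them evenly.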

@viirya (Member, Author) commented Oct 15, 2016

ping @davies @felixcheung Could you take a look to see if we want to apply this? Thanks!

@viirya (Member, Author) commented Oct 18, 2016

ping @davies @felixcheung Could you review this again? Thanks.

@davies (Contributor) commented Oct 18, 2016

@viirya That's a good point. javaToPython can only be used for known types that can be deserialized in Java (for example, some types in SQL and ML), so this PR makes better sense.

Merging this into master.

@asfgit closed this in 1e35e96 on Oct 18, 2016
@viirya (Member, Author) commented Oct 19, 2016

@davies @felixcheung Thanks!

robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
… in Highly Skewed Partition Sizes

## What changes were proposed in this pull request?

This change is a followup for apache#15389 which calls `_to_java_object_rdd()` to solve this issue. Due to the concern of the possible expensive cost of the call, we can choose to decrease the batch size to solve this issue too.

Simple benchmark:

    import time
    num_partitions = 20000
    a = sc.parallelize(range(int(1e6)), 2)
    start = time.time()
    l = a.repartition(num_partitions).glom().map(len).collect()
    end = time.time()
    print(end - start)

Before: 419.447577953
_to_java_object_rdd(): 421.916361094
decreasing the batch size: 423.712255955

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#15445 from viirya/repartition-batch-size.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
… in Highly Skewed Partition Sizes
@viirya deleted the repartition-batch-size branch on Dec 27, 2023