Conversation

@redsanket commented May 21, 2020

What changes were proposed in this pull request?

sc.union over pair RDDs raises a Py4JError, because PySpark always builds a JavaRDD[] array on the JVM side. The fix is to check the instance type of the underlying Java RDD before proceeding.

Why are the changes needed?

Changes are needed so that users do not hit errors when calling sc.union on RDDs whose underlying Java RDD is any type other than JavaRDD (e.g. JavaPairRDD or JavaDoubleRDD).

Does this PR introduce any user-facing change?

Yes

Before:

SparkSession available as 'spark'.
>>> rdd1 = sc.parallelize([1,2,3,4,5])
>>> rdd2 = sc.parallelize([6,7,8,9,10])
>>> pairRDD1 = rdd1.zip(rdd2)
>>> unionRDD1 = sc.union([pairRDD1, pairRDD1])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/gs/spark/latest/python/pyspark/context.py", line 870, in union
    jrdds[i] = rdds[i]._jrdd
  File "/home/gs/spark/latest/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 238, in __setitem__
  File "/home/gs/spark/latest/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 221, in __set_item
  File "/home/gs/spark/latest/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 332, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling None.None. Trace:
py4j.Py4JException: Cannot convert org.apache.spark.api.java.JavaPairRDD to org.apache.spark.api.java.JavaRDD
	at py4j.commands.ArrayCommand.convertArgument(ArrayCommand.java:166)
	at py4j.commands.ArrayCommand.setArray(ArrayCommand.java:144)
	at py4j.commands.ArrayCommand.execute(ArrayCommand.java:97)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

After:

>>> rdd2 = sc.parallelize([6,7,8,9,10])
>>> pairRDD1 = rdd1.zip(rdd2)
>>> unionRDD1 = sc.union([pairRDD1, pairRDD1])
>>> unionRDD1.collect()
[(1, 6), (2, 7), (3, 8), (4, 9), (5, 10), (1, 6), (2, 7), (3, 8), (4, 9), (5, 10)]

How was this patch tested?

Tested manually with the reproduction snippet above.
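The shape of the fix can be sketched in plain Python. Below, JavaRDD, JavaPairRDD, JavaDoubleRDD, and pick_jrdd_class are hypothetical stand-ins for illustration, not the real py4j proxy classes or any actual PySpark function:

```python
# Stand-in classes for the JVM-side RDD wrapper types.
class JavaRDD: pass
class JavaPairRDD: pass
class JavaDoubleRDD: pass

def pick_jrdd_class(first_jrdd):
    # Mirror of the if/elif chain in the fix: test each supported class
    # in turn and fail loudly on anything else, instead of unconditionally
    # assuming JavaRDD.
    for cls in (JavaRDD, JavaPairRDD, JavaDoubleRDD):
        if isinstance(first_jrdd, cls):
            return cls
    raise TypeError("Unsupported Java RDD class %s" % type(first_jrdd).__name__)
```

With this dispatch, a pair RDD selects JavaPairRDD rather than being forced into a JavaRDD slot.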

@HyukjinKwon (Member) commented May 22, 2020

Shall we also add a unit test? Also, please describe the behavior before and after this fix under "Does this PR introduce any user-facing change?". Technically, I think this IS a user-facing behaviour change, from an error to a working case.

@redsanket (Author)

ok

@HyukjinKwon (Member)

ok to test

@HyukjinKwon HyukjinKwon changed the title [SPARK-31788][CORE] Fix UnionRDD of PairRDDs [SPARK-31788][CORE][PYTHON] Fix UnionRDD of PairRDDs May 23, 2020
@SparkQA commented May 23, 2020

Test build #123023 has finished for PR 28603 at commit c65be0d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

Merged to master and branch-3.0.

HyukjinKwon pushed a commit that referenced this pull request May 25, 2020

Closes #28603 from redsanket/SPARK-31788.

Authored-by: schintap <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit a61911c)
Signed-off-by: HyukjinKwon <[email protected]>
@leewyang (Contributor)

@redsanket @HyukjinKwon I pulled the latest branch-3.0 today (which includes this patch), but I'm now seeing the following weird behavior:

>>> rdd1 = sc.parallelize([1,2,3,4,5])
>>> rdd2 = sc.parallelize([6,7,8,9,10])
>>> pairRDD1 = rdd1.zip(rdd2)
>>> unionRDD1 = sc.union([pairRDD1, pairRDD1])
>>> unionRDD1.collect()
[((1, 6), (2, 7)), ((3, 8), (4, 9)), ((5, 10), (1, 6)), ((2, 7), (3, 8)), ((4, 9), (5, 10))] 
>>> unionRDD1.count()
0 

... where Spark 2.4.5 produces:

>>> unionRDD1.collect()
[(1, 6), (2, 7), (3, 8), (4, 9), (5, 10), (1, 6), (2, 7), (3, 8), (4, 9), (5, 10)]
>>> unionRDD1.count()
10

So, the output is incorrect/unexpected and the count is zero.
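One way to see what may have gone wrong (an illustration only, not a claim about the actual Spark internals): the broken collect() output is exactly what results from re-reading the expected flat stream of ten pairs two items at a time, which is consistent with a serializer mismatch after the union rather than with the union producing wrong data:

```python
# The union result Spark 2.4.5 correctly returns: ten (k, v) pairs.
flat = [(1, 6), (2, 7), (3, 8), (4, 9), (5, 10)] * 2

# Consume the same stream two elements at a time, as a mismatched
# pair deserializer would.
regrouped = list(zip(flat[0::2], flat[1::2]))
print(regrouped)
# [((1, 6), (2, 7)), ((3, 8), (4, 9)), ((5, 10), (1, 6)), ((2, 7), (3, 8)), ((4, 9), (5, 10))]
```

The regrouped list matches the broken branch-3.0 output above item for item.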

@redsanket (Author) commented May 26, 2020

@leewyang you are right; the review changes suggested by @HyukjinKwon in 1d8d308 caused the change in behavior. It was working as expected prior to that. @HyukjinKwon, are we sure the mapping of JavaPairRDD to JavaRDD is the right approach here?

Original PR code snippet, for reference:

jrdd_cls = jvm.org.apache.spark.api.java.JavaRDD
pair_jrdd_cls = jvm.org.apache.spark.api.java.JavaPairRDD
double_jrdd_cls = jvm.org.apache.spark.api.java.JavaDoubleRDD
if is_instance_of(gw, rdds[0]._jrdd, jrdd_cls):
    cls = jrdd_cls
elif is_instance_of(gw, rdds[0]._jrdd, pair_jrdd_cls):
    cls = pair_jrdd_cls
elif is_instance_of(gw, rdds[0]._jrdd, double_jrdd_cls):
    cls = double_jrdd_cls
else:
    raise TypeError("Unsupported Java RDD class %s" % rdds[0]._jrdd)
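The failure mode this snippet guards against can be simulated in plain Python. The stand-in classes and the toy TypedArray below are hypothetical illustrations (not the real py4j API): a JVM array created for one class rejects elements of another class, so a JavaRDD[] cannot hold a JavaPairRDD, while an array built from the checked class can:

```python
# Stand-ins for the JVM wrapper classes.
class JavaRDD: pass
class JavaPairRDD: pass

class TypedArray:
    """Toy model of a typed JVM array: assignment checks the element type."""
    def __init__(self, cls, n):
        self.cls, self.items = cls, [None] * n
    def __setitem__(self, i, value):
        if not isinstance(value, self.cls):
            raise TypeError("Cannot convert %s to %s"
                            % (type(value).__name__, self.cls.__name__))
        self.items[i] = value

pair = JavaPairRDD()

bad = TypedArray(JavaRDD, 1)       # pre-fix behavior: always JavaRDD[]
try:
    bad[0] = pair                  # rejected, like the Py4JError above
except TypeError as e:
    print(e)                       # Cannot convert JavaPairRDD to JavaRDD

good = TypedArray(type(pair), 1)   # post-fix: array of the checked class
good[0] = pair                     # fits
```

Checking the instance type first means the array element type always matches the RDDs being unioned.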

@HyukjinKwon (Member) commented May 27, 2020

Okay, I just noticed f83fedc caused this problem, and this is a regression. I am going to revert this PR.

@HyukjinKwon (Member)

Yes, I think I rushed when reading it. Let me make another PR that uses your fix. I think we should also fix the streaming side, which was changed in f83fedc, together.
