[SPARK-18189] [SQL] Fix serialization issue in KeyValueGroupedDataset #15706
Conversation
Ok to test.
This guy @transient must get a lot of emails...
Can you change the title to "[SQL]" rather than "[core]"?
LGTM pending Jenkins.
Test build #3383 has finished for PR 15706 at commit
Actually, is there a test case you can create for this? Make sure the test case would fail when the patch is not applied.
Test build #3385 has finished for PR 15706 at commit
Hi @rxin, I added a unit test in DatasetSuite but I wasn't able to repro the bug there; it only reproduces in spark-shell. That's why I moved the unit test to ReplSuite. I checked the history and it seems we are only adding unit tests to the Scala 2.11 ReplSuite now. If that's not the case, I can create the same one in the Scala 2.10 ReplSuite. Please let me know. Thanks.
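The ReplSuite approach described above might look roughly like the sketch below. It follows the shape of Spark's REPL test helpers (`runInterpreter`, `assertDoesNotContain`); treat the exact helper signatures, and the test body itself, as assumptions rather than the actual patch.

```scala
// Hypothetical ReplSuite-style test: run the Jira repro inside the
// interpreter and check that no serialization error is printed.
test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
  val output = runInterpreter("local",
    """
      |val grouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
      |val mapGroups = grouped.mapGroups((k, _) => (k, 1))
      |val broadcasted = sc.broadcast(1)
      |// Referencing the broadcast variable forces closure serialization,
      |// which dragged in the non-serializable QueryExecution before the fix.
      |mapGroups.rdd.map(_ => broadcasted.value).collect()
    """.stripMargin)
  assertDoesNotContain("error:", output)
  assertDoesNotContain("NotSerializableException", output)
}
```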
Thanks! LGTM pending Jenkins.
Test build #3388 has finished for PR 15706 at commit
Merging in master/branch-2.0. |
## What changes were proposed in this pull request?

Like [DataSet.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), KeyValueGroupedDataset should mark its queryExecution field as @transient.

As mentioned in the Jira ticket, without @transient we saw serialization issues like:

```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```

## How was this patch tested?

Run the query specified in the Jira ticket before and after the change:

```scala
val a = spark.createDataFrame(sc.parallelize(Seq((1, 2), (3, 4)))).as[(Int, Int)]
val grouped = a.groupByKey { x: (Int, Int) => x._1 }
val mappedGroups = grouped.mapGroups((k, x) => (k, 1))
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map { xx =>
  val simpley = yyy.value
  1
}
```

Author: Ergin Seyfe <[email protected]>

Closes #15706 from seyfe/keyvaluegrouped_serialization.

(cherry picked from commit 8a538c9)
Signed-off-by: Reynold Xin <[email protected]>
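For readers unfamiliar with why @transient helps here, the failure mode can be reproduced outside Spark with plain Java serialization. This is an illustrative sketch only: `QueryExecutionLike` and `GroupedLike` are made-up stand-ins for QueryExecution and KeyValueGroupedDataset, not Spark classes.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Made-up stand-in for QueryExecution: note it is NOT Serializable.
class QueryExecutionLike

// Without @transient this field would be pulled into the serialized graph and
// throw NotSerializableException; @transient tells Java serialization to skip
// it, mirroring the one-line fix in this PR.
class GroupedLike(@transient val queryExecution: QueryExecutionLike)
  extends Serializable

object TransientSketch extends App {
  def serialize(o: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(o) finally oos.close()
  }

  // Succeeds only because queryExecution is @transient.
  serialize(new GroupedLike(new QueryExecutionLike))
}
```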
I think this might be causing the master/2.0 builds (but only the Maven ones, weirdly) to fail. This includes the 2.0.2 RC2, which I built with Maven. See for example: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.2/2200/
@srowen that's weird - do you have time to dig into this a bit?
Yeah, will dig in; I may not have an answer until tomorrow. If you're looking to push ahead with 2.0.2, I'd just revert in branch-2.0 for now.
@seyfe so my guess is that, not surprisingly, this is some pretty arcane interaction between several of a) codegen, b) Netty's class hierarchy, c) SBT vs Maven classloading, and d) the spark-shell's classloading peculiarities. I strongly suspect it's an issue with the test, of course, not the underlying change. I wonder if we can just write a different test that doesn't trip this. Is it necessary to test this in the REPL? Can we just write the code it tests as a test directly?
Hi @srowen. I initially put the test in DatasetSuite, but the test was passing with and without my change. Since I was able to repro only in spark-shell, I moved the test to the REPL suite. I didn't run the test in branch-2.0, only in master. Let me check if we can fix it so it works on branch-2.0 as well.
I ran the test in branch-2.0 locally and it works fine for me.
I think it may only involve the same exception, but I am not sure it is the same issue. I don't like removing tests, of course, but I conclude that the issue you intend to fix is fixed; it is the test that has a problem. We do want to make the change, but we need a test that will pass. If there is no obvious workaround for what looks to me like a fairly obscure classloader issue, then we could make this test not use the REPL. It would still test something useful and catch some potential regressions. It evidently would not catch this one, though I suppose it is unlikely this exact issue would manifest again. I could live with a simpler test to make this work and get the change into 2.0.2, because otherwise it has to be reverted for now.
@srowen, that is fair. Let me move this test from ReplSuite into DatasetSuite for branch-2.0 and send a PR.
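The promised DatasetSuite variant could be sketched along these lines. This is a hypothetical test body, not the follow-up PR: it exercises the same broadcast-in-closure pattern from the Jira repro without the REPL, and it assumes the suite's usual fixtures (`spark`, `testImplicits`) are in scope.

```scala
// Hypothetical DatasetSuite-style test; relies on shared-session fixtures
// that the real suite provides.
test("SPARK-18189: KeyValueGroupedDataset serializes with broadcast vars") {
  import testImplicits._
  val grouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
  val mapped = grouped.mapGroups((k, _) => (k, 1))
  val broadcasted = spark.sparkContext.broadcast(1)
  // Referencing the broadcast forces the closure (and anything it captures,
  // including the dataset's lineage) through Java serialization.
  val result = mapped.rdd.map(_ => broadcasted.value).collect()
  assert(result.length == 2 && result.forall(_ == 1))
}
```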