[SPARK-7289] handle project -> limit -> sort efficiently #6780
Conversation
Test build #34755 has finished for PR 6780 at commit
LimitPushDown should run after ColumnPruning. For something like Limit(Project(Sort(...))), we should try to push down Project through Sort first.
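To make the ordering point concrete, here is a toy sketch (plain Scala, not Catalyst code; all type and rule names are made up for illustration): pruning first lets the Project sink below the Sort, after which the Limit sits directly on top of the Sort and can be recognized as a top-n.

```scala
// Toy plan algebra, only to show the rewrite order being discussed.
sealed trait Plan
case class Scan(cols: Seq[String]) extends Plan
case class Project(cols: Seq[String], child: Plan) extends Plan
case class Sort(by: String, child: Plan) extends Plan
case class Limit(n: Int, child: Plan) extends Plan

object RuleOrderDemo extends App {
  // Push a column-pruning Project below a Sort when the sort key survives.
  def pushProjectThroughSort(p: Plan): Plan = p match {
    case Project(cols, Sort(by, child)) if cols.contains(by) =>
      Sort(by, Project(cols, child))
    case other => other
  }

  val plan = Limit(10, Project(Seq("a"), Sort("a", Scan(Seq("a", "b")))))
  val pruned = plan match {
    case Limit(n, child) => Limit(n, pushProjectThroughSort(child))
    case other           => other
  }
  // Prints: Limit(10,Sort(a,Project(List(a),Scan(List(a, b)))))
  println(pruned)
}
```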
Rules within a single batch shouldn't be sensitive to execution order. Especially for FixedPoint batches.
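For reference, a minimal sketch of what a FixedPoint batch looks like in a Catalyst `RuleExecutor` (the rule bodies below are no-op stand-ins, not real optimizer rules): the executor keeps re-running every rule in the batch until the plan stops changing, which is why the listed order should not affect the final plan.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// No-op stand-ins for real rules such as ColumnPruning.
object RuleA extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan }
object RuleB extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan }

object SketchOptimizer extends RuleExecutor[LogicalPlan] {
  // FixedPoint(100): rerun RuleA and RuleB up to 100 times, stopping early
  // once an iteration leaves the plan unchanged.
  val batches = Batch("Sketch", FixedPoint(100), RuleA, RuleB) :: Nil
}
```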
Test build #34759 has finished for PR 6780 at commit
What's the relationship between this PR and SPARK-7289?
This is for SPARK-7289. As we discussed in #5821, we need an optimizer rule like this.
cc @adrian-wang
Test build #34890 has finished for PR 6780 at commit

Test build #34892 has finished for PR 6780 at commit

Test build #34901 has finished for PR 6780 at commit

Test build #34904 has finished for PR 6780 at commit
I'm confused that we have to add @transient here to avoid NotSerializableException. It looks to me like TakeOrderedAndProject should run on the driver side, and it doesn't pass its reference to RDD functions. Sorry if I missed something here.
How did you see errors here? I removed @transient and `sbt sql/test` still works for me.
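For readers following the thread, here is a self-contained sketch (plain Scala with Java serialization; the class names are made up, this is not Spark code) of what @transient changes: a transient field is skipped during serialization and comes back as null, which is why it can paper over a NotSerializableException.

```scala
import java.io._

// Stands in for a field whose type cannot be serialized.
class NonSerializableThing

// Without @transient on `helper`, writeObject below would throw
// NotSerializableException; with it, the field is skipped during
// serialization and restored as null on the other side.
class Operator(@transient val helper: NonSerializableThing, val limit: Int)
  extends Serializable

object TransientDemo extends App {
  val out = new ByteArrayOutputStream()
  new ObjectOutputStream(out).writeObject(new Operator(new NonSerializableThing, 10))

  val in = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
  val copy = in.readObject().asInstanceOf[Operator]
  println(copy.helper) // null: the transient field was not serialized
  println(copy.limit)  // 10
}
```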
How about the …

Hmm...this PR doesn't change the handling of …
Test build #35230 has finished for PR 6780 at commit
cc @marmbrus, …
Test build #35395 has finished for PR 6780 at commit

Test build #35400 has finished for PR 6780 at commit
retest this please.
Test build #35406 has finished for PR 6780 at commit

Test build #35408 has finished for PR 6780 at commit

Test build #35410 has finished for PR 6780 at commit
I made these changes to clean the closure and avoid referencing `$out`, so that we don't need to add a lot of @transient annotations. However, it fails the test `CliSuite.Commands using SerDe provided in --jars` with a timeout. Does anybody know the reason?
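A minimal sketch of the closure-cleaning pattern being described (the class is hypothetical; assumes Spark on the classpath): referencing a field inside an RDD closure captures the whole enclosing object, while copying the field to a local val first keeps the closure small and serializable.

```scala
import org.apache.spark.rdd.RDD

class SomeOperator(val limit: Int) {
  // Captures `this`: the closure reads the field through the enclosing
  // object, so the whole SomeOperator must be serializable.
  def capturesThis(rdd: RDD[Int]): RDD[Int] = rdd.map(_ % limit)

  // Captures only an Int: the local copy breaks the reference to `this`.
  def capturesLocal(rdd: RDD[Int]): RDD[Int] = {
    val localLimit = limit
    rdd.map(_ % localLimit)
  }
}
```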
It appears the change is breaking something in the serialization debugger.
```
15/06/22 22:25:41.020 main WARN SerializationDebugger: Exception in serialization debugger
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:248)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:107)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:166)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:107)
at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:66)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:319)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:312)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:139)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:114)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:186)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:125)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:263)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:89)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:89)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:88)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:986)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:986)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:143)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:127)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:50)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:786)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:61)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:283)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:423)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:218)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:621)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
```
That said, it would be good if you could limit the changes in a given PR to the task at hand, and do refactorings in a separate PR. It's okay when they are very minor (e.g., fixing spelling), but grouping them together makes the PR harder to review, harder to backport, and causes it to get blocked on unrelated problems (such as this serialization issue).
Test build #35558 has finished for PR 6780 at commit

Test build #35634 has finished for PR 6780 at commit
retest this please.

Test build #35652 has finished for PR 6780 at commit
Thanks! Merged to master.
… branch 1.4

The bug fixed by SPARK-7289 is a pretty serious one (Spark SQL generates wrong results). We should backport the fix to branch 1.4 (#6780). Also, we need to backport the fix of `TakeOrderedAndProject` as well (#8179).

Author: Wenchen Fan <[email protected]>
Author: Yin Huai <[email protected]>

Closes #8252 from yhuai/backport7289And9949.
Make the `TakeOrdered` strategy and operator more general, such that it can optionally handle a projection when necessary.
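Conceptually (a hedged sketch, not the operator's actual implementation; the function name and signature below are made up), the generalized operator amounts to fetching the top-n rows on the driver and then projecting only those rows, instead of projecting or sorting the full dataset:

```scala
import org.apache.spark.rdd.RDD

// Top-n by the implicit Ordering, then project just those n rows on the
// driver. When there is no projection, `identity` can serve as `project`.
def takeOrderedAndProject[T: Ordering, U](
    rdd: RDD[T],
    limit: Int,
    project: T => U): Seq[U] =
  rdd.takeOrdered(limit).toSeq.map(project)
```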