[SPARK-24721][SQL] Extract Python UDFs at the end of optimizer #22104
Conversation
Test build #94746 has finished for PR 22104 at commit
512f4b6 to 86d3b0c (force-pushed)
Test build #94747 has finished for PR 22104 at commit

retest please
python/pyspark/sql/tests.py
Outdated
Add another test case for arrow-based pandas udf?
Makes sense. Will add.
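For reference, a minimal sketch of the kind of arrow-based pandas UDF test case being suggested (this is not the test that was actually added; the path and column names are assumptions, and it assumes an active SparkSession `spark` plus pyarrow installed):

```python
from pyspark.sql.functions import pandas_udf

# Scalar (arrow-based) pandas UDF: operates on a pandas.Series per batch.
@pandas_udf('long')
def plus_one(v):
    return v + 1

# Read through a file source so the UDF ends up over a scanned relation,
# which is the code path this PR is exercising.
df = spark.read.parquet('/tmp/example_data')       # hypothetical path
result = df.withColumn('x', plus_one(df['id']))    # hypothetical column name
result.collect()
```

A similar test can apply the UDF in a filter, since Python UDF filters over a file source are where the original issue surfaced.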
@icexelloss Do we face the same issue for DataSourceStrategy?

@gatorsmile Possibly, let me see if I can create a test case.

@gatorsmile Can you advise how to create a DataFrame with a data source? All my attempts end up triggering FileSourceStrategy, not DataSourceStrategy.
I think another way to fix this is to move the logic to

@icexelloss we can implement a dummy data source v1/v2 on the Scala side and scan them in a PySpark test with
There's an example, #21007, that implements something in Scala and uses it in a Python-side test, and there are some fake source examples in

Thanks @HyukjinKwon and @cloud-fan! I will take a look.
Test build #94820 has finished for PR 22104 at commit
This is probably another bug I found while testing this: if the input node to EvalPythonExec doesn't produce UnsafeRow, the cast here will fail.
I found this in testing: when I pass in a test data source scan node, which produces GenericInternalRow, an exception is thrown here.
I am happy to submit this as a separate patch if people think it's necessary.
Ok.. This seems to break existing tests. Need to look into it.
I got rid of the logic previously in FileSourceStrategy that excluded PythonUDF from the filters, in favor of this fix - I think this fix is cleaner.
python/pyspark/sql/tests.py
Outdated
@gatorsmile Added tests for file source, data source and data source v2. I might need to move the pandas_udf tests into another test suite because of the Arrow requirement :(
Created separate tests for pandas_udf under ScalarPandasUDFTests
This requires some discussion.
This is probably another bug I found while testing this: if the input node to EvalPythonExec doesn't produce UnsafeRow, the cast here will fail. I don't know whether we require data sources to produce unsafe rows; if not, then this is a problem.
I also don't know whether this will introduce an additional copy when the input is already UnsafeRow - it seems like UnsafeProjection should be smart enough to skip the copy, but I am not sure that's actually the case.
Friendly ping @cloud-fan. Do you think forcing an UnsafeProjection here to deal with non-unsafe rows from data sources is correct? Is there a way to know whether the child nodes output unsafe rows, so as to avoid an unnecessary unsafe projection here?
Ideally all the operators will produce UnsafeRow. If the data source does not produce UnsafeRow, Spark will make sure there is a project above it to produce UnsafeRow, so we don't need to worry about it here and can safely assume the input is always UnsafeRow.
Thanks! I will remove this then.
@cloud-fan Sorry, I don't think I am being very clear...
If the data source does not produce UnsafeRow, Spark will make sure there will be a project
above it to produce UnsafeRow
I don't think this is happening for datasource V2 right now:
(Code running in pyspark test)
datasource_v2_df = self.spark.read \
.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
.load()
result = datasource_v2_df.withColumn('x', udf(lambda x: x, 'int')(datasource_v2_df['i']))
result.show()
The code above fails with:
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:127)
at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:126)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)
I think this is an issue with DataSourceV2 that probably should be addressed in another PR (DataSourceV1 works fine). @cloud-fan WDYT?
Test build #94826 has finished for PR 22104 at commit

Test build #94822 has finished for PR 22104 at commit

Test build #94848 has finished for PR 22104 at commit

Tests pass now. This comment https://github.com/apache/spark/pull/22104/files#r210414941 requires some attention. @cloud-fan Do you think this is the right way to handle GenericInternalRow inputs here?
python/pyspark/sql/tests.py
Outdated
This wouldn't work if the test classes are not compiled. I think we had better make another test suite that skips the test if the test classes don't exist.
@HyukjinKwon I'm actually not sure how PySpark finds these classes or how to check for their existence - do you have an example?
I can probably try to check for the existence of sql/core/target/scala-2.11/test-classes
Hmm... I think this is a bit fragile because of things like "scala-2.11" (the Scala version can change).
It seems a bit overcomplicated to do this properly - when do we expect the PySpark tests to run without compiling the Scala test classes?
Added checks to skip the tests if the Scala test classes are not compiled.
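For illustration, a hedged sketch of what such a skip check might look like (the helper name, paths, and suite name here are assumptions, not the code actually added in the PR), avoiding a hardcoded Scala version:

```python
import glob
import os
import unittest

def _scala_test_classes_built():
    # Look for compiled Scala test classes under any scala-* directory,
    # so the check does not break when the Scala version changes.
    spark_home = os.environ.get("SPARK_HOME", ".")
    pattern = os.path.join(spark_home, "sql", "core", "target", "scala-*", "test-classes")
    return len(glob.glob(pattern)) > 0

@unittest.skipIf(not _scala_test_classes_built(),
                 "Scala test classes not compiled; skipping data source UDF tests")
class DataSourceUDFTests(unittest.TestCase):
    def test_udf_over_datasource(self):
        # The real tests scan the dummy Scala data sources and apply a Python UDF.
        pass
```

Globbing over `scala-*` addresses the concern above about hardcoding "scala-2.11".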
dcf07fb to 6b7445c (force-pushed)
Test build #95169 has finished for PR 22104 at commit

@HyukjinKwon I addressed the comments. Do you mind taking another look?

Test build #95171 has finished for PR 22104 at commit

retest this please

Let me take another look today or tomorrow.

Test build #95246 has finished for PR 22104 at commit
3f0a97a to 2325a4f (force-pushed)
Test build #95317 has finished for PR 22104 at commit

thanks, merging to master!

LGTM

Test build #95312 has finished for PR 22104 at commit

Thanks all for the review!
## What changes were proposed in this pull request?

The PR excludes Python UDF filters in FileSourceStrategy so that they don't cause the ExtractPythonUDF rule to throw an exception. It doesn't make sense to pass Python UDF filters to FileSourceStrategy anyway, because they cannot be used as push-down filters.

## How was this patch tested?

Added a new regression test.

Closes apache#22104 from icexelloss/SPARK-24721-udf-filter.
Authored-by: Li Jin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Just realized that the PR title and description are not updated. @icexelloss can you update them? Thanks!

@cloud-fan Sure! Updated.
| Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ | ||
| Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ | ||
| Batch("Extract Python UDFs", Once, | ||
| Seq(ExtractPythonUDFFromAggregate, ExtractPythonUDFs): _*) :+ |
It looks weird to add this rule to our optimizer batches. We need at least some comments in the code to explain the reason.
but we already have ExtractPythonUDFFromAggregate here...
## What changes were proposed in this pull request?

This is a regression introduced by apache#22104 at Spark 2.4.0. When we have a Python UDF in a subquery, we hit an exception:

```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
  at scala.collection.immutable.Stream.map(Stream.scala:414)
  at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
  ...
```

apache#22104 turned `ExtractPythonUDFs` from a physical rule into an optimizer rule. However, there is a difference between a physical rule and an optimizer rule: a physical rule always runs once, while an optimizer rule may be applied twice to a query tree even if the rule is located in a batch that only runs once. For a subquery, the `OptimizeSubqueries` rule runs the entire optimizer on the query plan inside the subquery. Later on, the subquery is turned into joins, and the optimizer rules are applied to it again. Unfortunately, the `ExtractPythonUDFs` rule is not idempotent: when it is applied twice to a query plan inside a subquery, it produces a malformed plan, because it extracts Python UDFs from Python exec plans.

This PR proposes 2 changes to be doubly safe:
1. `ExtractPythonUDFs` should skip Python exec plans, to make the rule idempotent
2. `ExtractPythonUDFs` should skip subqueries

## How was this patch tested?

A new test.

Closes apache#23248 from cloud-fan/python.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
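As a rough illustration only, a hypothetical PySpark repro of the kind of query the regression fix above targets (the exact query in the new test may differ; the UDF name is an assumption, and an active SparkSession `spark` is assumed):

```python
# Register a trivial Python UDF for use in SQL.
spark.range(10).createOrReplaceTempView("t")
spark.udf.register("py_identity", lambda x: x, "bigint")

# A Python UDF inside a subquery: before the fix, ExtractPythonUDFs could be
# applied twice to the subquery plan (once via OptimizeSubqueries, once after
# the subquery is rewritten into a join) and produce a malformed plan.
spark.sql("""
    SELECT id FROM t
    WHERE id IN (SELECT id FROM t WHERE py_identity(id) > 3)
""").show()
```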
…uery

## What changes were proposed in this pull request?

This PR backports #23248, which seems to have mistakenly not been backported. It fixes the same regression described above: a Python UDF in a subquery hits a ClassCastException because `ExtractPythonUDFs`, turned into an optimizer rule by #22104, is not idempotent. The fix makes `ExtractPythonUDFs` skip Python exec plans and skip subqueries.

## How was this patch tested?

A new test.

Closes #27960 from HyukjinKwon/backport-SPARK-26293.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
What changes were proposed in this pull request?
In #12127, we moved the ExtractPythonUDFs rule to the physical phase, while there was another option: do ExtractPythonUDFs at the end of the optimizer.
Currently we hit 2 issues when extracting Python UDFs at the physical phase:
This PR proposes to move ExtractPythonUDFs to the end of the optimizer.
How was this patch tested?
Add a new regression test
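For context, a minimal hypothetical sketch of the kind of query affected - a Python UDF used as a filter over a file-source scan, which cannot be pushed down (the path and names are assumptions, not the actual regression test; an active SparkSession `spark` is assumed):

```python
from pyspark.sql.functions import udf

# Write and re-read through a file source so FileSourceStrategy plans the scan.
spark.range(10).write.mode("overwrite").parquet("/tmp/spark_24721_demo")  # hypothetical path
df = spark.read.parquet("/tmp/spark_24721_demo")

# A Python UDF filter cannot be used as a push-down filter; previously this
# kind of query could make the ExtractPythonUDF rule throw.
is_even = udf(lambda x: x % 2 == 0, "boolean")
df.filter(is_even(df["id"])).show()
```

This mirrors the description above: Python UDF filters cannot serve as push-down filters, so FileSourceStrategy should never have to handle them.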