-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-18634][PySpark][SQL] Corruption and Correctness issues with exploding Python UDFs #16120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #69572 has finished for PR 16120 at commit
|
|
Can you add WIP until this is ready? |
|
OK. WIP added. |
a31432a to
44aaf39
Compare
|
Test build #69608 has finished for PR 16120 at commit
|
44aaf39 to
a5594f7
Compare
|
Test build #69610 has started for PR 16120 at commit |
|
retest this please. |
|
Test build #69615 has finished for PR 16120 at commit
|
| // prepend the new qualifier to the existed one | ||
| generatorOutput.map(a => a.withQualifier(Some(q))) | ||
| ).getOrElse(generatorOutput) | ||
| val qualifiedGeneratorOutput: Seq[Attribute] = qualifier.map { q => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we qualify all the output attributes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm.... this is actually better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, actually this is what Generate did to prepare its output.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I did a scan in codes and I can't find anyplace to assign the qualifier parameter for Generate. I only see using None for it.
@hvanhovell Do you know any place we have specified qualifier for a Generate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did the same thing when I was looking at this. There is one place: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala#L522
I think the approach you took in this PR is actually the correct one, and that the current one had a latent bug which would be triggered by both the child and the generated producing an attribute with the same name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: shall we make it a method or lazy val?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan making it a method seems better. But it is merged. Should I submit a tiny follow-up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan I think the current one should be ok too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea it's fine to leave it.
|
LGTM - merging to master/2.1/2.0 (if possible). Thanks! |
…ploding Python UDFs
## What changes were proposed in this pull request?
As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL.
The following test code can reproduce it. Notice: the following test code is reported to return wrong results in the Jira. However, as I tested on master branch, it causes exception and so can't return any result.
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>>
>>> df = spark.range(10)
>>>
>>> def return_range(value):
... return [(i, str(i)) for i in range(value - 1, value + 1)]
...
>>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
... StructField("string_val", StringType())])))
>>>
>>> df.select("id", explode(range_udf(df.id))).show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
print(self._jdf.showString(n, 20))
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)
The cause of this issue is, in `ExtractPythonUDFs` we insert `BatchEvalPythonExec` to run PythonUDFs in batch. `BatchEvalPythonExec` will add extra outputs (e.g., `pythonUDF0`) to original plan. In above case, the original `Range` only has one output `id`. After `ExtractPythonUDFs`, the added `BatchEvalPythonExec` has two outputs `id` and `pythonUDF0`.
Because the output of `GenerateExec` is given after analysis phase, in above case, it is the combination of `id`, i.e., the output of `Range`, and `col`. But in planning phase, we change `GenerateExec`'s child plan to `BatchEvalPythonExec` with additional output attributes.
It will cause no problem in non wholestage codegen. Because when evaluating the additional attributes are projected out the final output of `GenerateExec`.
However, as `GenerateExec` now supports wholestage codegen, the framework will input all the outputs of the child plan to `GenerateExec`. Then when consuming `GenerateExec`'s output data (i.e., calling `consume`), the number of output attributes is different to the output variables in wholestage codegen.
To solve this issue, this patch only gives the generator's output to `GenerateExec` after analysis phase. `GenerateExec`'s output is the combination of its child plan's output and the generator's output. So when we change `GenerateExec`'s child, its output is still correct.
## How was this patch tested?
Added test cases to PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <[email protected]>
Closes #16120 from viirya/fix-py-udf-with-generator.
(cherry picked from commit 3ba69b6)
Signed-off-by: Herman van Hovell <[email protected]>
…ploding Python UDFs
## What changes were proposed in this pull request?
As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL.
The following test code can reproduce it. Notice: the following test code is reported to return wrong results in the Jira. However, as I tested on master branch, it causes exception and so can't return any result.
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>>
>>> df = spark.range(10)
>>>
>>> def return_range(value):
... return [(i, str(i)) for i in range(value - 1, value + 1)]
...
>>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
... StructField("string_val", StringType())])))
>>>
>>> df.select("id", explode(range_udf(df.id))).show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
print(self._jdf.showString(n, 20))
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)
The cause of this issue is, in `ExtractPythonUDFs` we insert `BatchEvalPythonExec` to run PythonUDFs in batch. `BatchEvalPythonExec` will add extra outputs (e.g., `pythonUDF0`) to original plan. In above case, the original `Range` only has one output `id`. After `ExtractPythonUDFs`, the added `BatchEvalPythonExec` has two outputs `id` and `pythonUDF0`.
Because the output of `GenerateExec` is given after analysis phase, in above case, it is the combination of `id`, i.e., the output of `Range`, and `col`. But in planning phase, we change `GenerateExec`'s child plan to `BatchEvalPythonExec` with additional output attributes.
It will cause no problem in non wholestage codegen. Because when evaluating the additional attributes are projected out the final output of `GenerateExec`.
However, as `GenerateExec` now supports wholestage codegen, the framework will input all the outputs of the child plan to `GenerateExec`. Then when consuming `GenerateExec`'s output data (i.e., calling `consume`), the number of output attributes is different to the output variables in wholestage codegen.
To solve this issue, this patch only gives the generator's output to `GenerateExec` after analysis phase. `GenerateExec`'s output is the combination of its child plan's output and the generator's output. So when we change `GenerateExec`'s child, its output is still correct.
## How was this patch tested?
Added test cases to PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <[email protected]>
Closes #16120 from viirya/fix-py-udf-with-generator.
(cherry picked from commit 3ba69b6)
Signed-off-by: Herman van Hovell <[email protected]>
|
Thank you! @hvanhovell |
## What changes were proposed in this pull request? I jumped the gun on merging #16120, and missed a tiny potential problem. This PR fixes that by changing a val into a def; this should prevent potential serialization/initialization weirdness from happening. ## How was this patch tested? Existing tests. Author: Herman van Hovell <[email protected]> Closes #16170 from hvanhovell/SPARK-18634. (cherry picked from commit 381ef4e) Signed-off-by: Herman van Hovell <[email protected]>
## What changes were proposed in this pull request? I jumped the gun on merging #16120, and missed a tiny potential problem. This PR fixes that by changing a val into a def; this should prevent potential serialization/initialization weirdness from happening. ## How was this patch tested? Existing tests. Author: Herman van Hovell <[email protected]> Closes #16170 from hvanhovell/SPARK-18634. (cherry picked from commit 381ef4e) Signed-off-by: Herman van Hovell <[email protected]>
## What changes were proposed in this pull request? I jumped the gun on merging #16120, and missed a tiny potential problem. This PR fixes that by changing a val into a def; this should prevent potential serialization/initialization weirdness from happening. ## How was this patch tested? Existing tests. Author: Herman van Hovell <[email protected]> Closes #16170 from hvanhovell/SPARK-18634.
…ploding Python UDFs
## What changes were proposed in this pull request?
As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL.
The following test code can reproduce it. Notice: the following test code is reported to return wrong results in the Jira. However, as I tested on master branch, it causes exception and so can't return any result.
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>>
>>> df = spark.range(10)
>>>
>>> def return_range(value):
... return [(i, str(i)) for i in range(value - 1, value + 1)]
...
>>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
... StructField("string_val", StringType())])))
>>>
>>> df.select("id", explode(range_udf(df.id))).show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
print(self._jdf.showString(n, 20))
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)
The cause of this issue is, in `ExtractPythonUDFs` we insert `BatchEvalPythonExec` to run PythonUDFs in batch. `BatchEvalPythonExec` will add extra outputs (e.g., `pythonUDF0`) to original plan. In above case, the original `Range` only has one output `id`. After `ExtractPythonUDFs`, the added `BatchEvalPythonExec` has two outputs `id` and `pythonUDF0`.
Because the output of `GenerateExec` is given after analysis phase, in above case, it is the combination of `id`, i.e., the output of `Range`, and `col`. But in planning phase, we change `GenerateExec`'s child plan to `BatchEvalPythonExec` with additional output attributes.
It will cause no problem in non wholestage codegen. Because when evaluating the additional attributes are projected out the final output of `GenerateExec`.
However, as `GenerateExec` now supports wholestage codegen, the framework will input all the outputs of the child plan to `GenerateExec`. Then when consuming `GenerateExec`'s output data (i.e., calling `consume`), the number of output attributes is different to the output variables in wholestage codegen.
To solve this issue, this patch only gives the generator's output to `GenerateExec` after analysis phase. `GenerateExec`'s output is the combination of its child plan's output and the generator's output. So when we change `GenerateExec`'s child, its output is still correct.
## How was this patch tested?
Added test cases to PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <[email protected]>
Closes apache#16120 from viirya/fix-py-udf-with-generator.
## What changes were proposed in this pull request? I jumped the gun on merging apache#16120, and missed a tiny potential problem. This PR fixes that by changing a val into a def; this should prevent potential serialization/initialization weirdness from happening. ## How was this patch tested? Existing tests. Author: Herman van Hovell <[email protected]> Closes apache#16170 from hvanhovell/SPARK-18634.
…ploding Python UDFs
## What changes were proposed in this pull request?
As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL.
The following test code can reproduce it. Notice: the following test code is reported to return wrong results in the Jira. However, as I tested on master branch, it causes exception and so can't return any result.
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>>
>>> df = spark.range(10)
>>>
>>> def return_range(value):
... return [(i, str(i)) for i in range(value - 1, value + 1)]
...
>>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
... StructField("string_val", StringType())])))
>>>
>>> df.select("id", explode(range_udf(df.id))).show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
print(self._jdf.showString(n, 20))
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)
The cause of this issue is, in `ExtractPythonUDFs` we insert `BatchEvalPythonExec` to run PythonUDFs in batch. `BatchEvalPythonExec` will add extra outputs (e.g., `pythonUDF0`) to original plan. In above case, the original `Range` only has one output `id`. After `ExtractPythonUDFs`, the added `BatchEvalPythonExec` has two outputs `id` and `pythonUDF0`.
Because the output of `GenerateExec` is given after analysis phase, in above case, it is the combination of `id`, i.e., the output of `Range`, and `col`. But in planning phase, we change `GenerateExec`'s child plan to `BatchEvalPythonExec` with additional output attributes.
It will cause no problem in non wholestage codegen. Because when evaluating the additional attributes are projected out the final output of `GenerateExec`.
However, as `GenerateExec` now supports wholestage codegen, the framework will input all the outputs of the child plan to `GenerateExec`. Then when consuming `GenerateExec`'s output data (i.e., calling `consume`), the number of output attributes is different to the output variables in wholestage codegen.
To solve this issue, this patch only gives the generator's output to `GenerateExec` after analysis phase. `GenerateExec`'s output is the combination of its child plan's output and the generator's output. So when we change `GenerateExec`'s child, its output is still correct.
## How was this patch tested?
Added test cases to PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <[email protected]>
Closes apache#16120 from viirya/fix-py-udf-with-generator.
## What changes were proposed in this pull request? I jumped the gun on merging apache#16120, and missed a tiny potential problem. This PR fixes that by changing a val into a def; this should prevent potential serialization/initialization weirdness from happening. ## How was this patch tested? Existing tests. Author: Herman van Hovell <[email protected]> Closes apache#16170 from hvanhovell/SPARK-18634.
What changes were proposed in this pull request?
As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL.
The following test code can reproduce it. Notice: the following test code is reported to return wrong results in the Jira. However, as I tested on master branch, it causes exception and so can't return any result.
The cause of this issue is, in
ExtractPythonUDFswe insertBatchEvalPythonExecto run PythonUDFs in batch.BatchEvalPythonExecwill add extra outputs (e.g.,pythonUDF0) to original plan. In above case, the originalRangeonly has one outputid. AfterExtractPythonUDFs, the addedBatchEvalPythonExechas two outputsidandpythonUDF0.Because the output of
GenerateExecis given after analysis phase, in above case, it is the combination ofid, i.e., the output ofRange, andcol. But in planning phase, we changeGenerateExec's child plan toBatchEvalPythonExecwith additional output attributes.It will cause no problem in non wholestage codegen. Because when evaluating the additional attributes are projected out the final output of
GenerateExec.However, as
GenerateExecnow supports wholestage codegen, the framework will input all the outputs of the child plan toGenerateExec. Then when consumingGenerateExec's output data (i.e., callingconsume), the number of output attributes is different to the output variables in wholestage codegen.To solve this issue, this patch only gives the generator's output to
GenerateExecafter analysis phase.GenerateExec's output is the combination of its child plan's output and the generator's output. So when we changeGenerateExec's child, its output is still correct.How was this patch tested?
Added test cases to PySpark.
Please review http://spark.apache.org/contributing.html before opening a pull request.