Commit bde1d41

viirya authored and hvanhovell committed
[SPARK-18634][PYSPARK][SQL] Corruption and Correctness issues with exploding Python UDFs
## What changes were proposed in this pull request?

As reported in the Jira, there are correctness issues with exploding Python UDFs in Spark SQL. The following test code reproduces the problem. Note: the Jira reports wrong results, but as tested on the master branch, the same code raises an exception and cannot return any result at all:

```python
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>>
>>> df = spark.range(10)
>>>
>>> def return_range(value):
...     return [(i, str(i)) for i in range(value - 1, value + 1)]
...
>>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
...                                                     StructField("string_val", StringType())])))
>>>
>>> df.select("id", explode(range_udf(df.id))).show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
    print(self._jdf.showString(n, 20))
  File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.
: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
    at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)
```

The cause of this issue: in `ExtractPythonUDFs` we insert a `BatchEvalPythonExec` node to run Python UDFs in batch. `BatchEvalPythonExec` adds extra output attributes (e.g., `pythonUDF0`) to the original plan. In the case above, the original `Range` has a single output, `id`; after `ExtractPythonUDFs`, the inserted `BatchEvalPythonExec` has two outputs, `id` and `pythonUDF0`.

Because the output of `GenerateExec` is fixed during the analysis phase, in the case above it is the combination of `id` (the output of `Range`) and `col` (the generator's output). In the planning phase, however, we replace `GenerateExec`'s child plan with the `BatchEvalPythonExec`, which carries the additional output attributes.

This causes no problem without whole-stage codegen, because during evaluation the additional attributes are projected out of `GenerateExec`'s final output. However, now that `GenerateExec` supports whole-stage codegen, the framework feeds all of the child plan's outputs into `GenerateExec`. When `GenerateExec`'s output data is consumed (i.e., when `consume` is called), the number of output attributes no longer matches the number of output variables in the generated code, which trips the assertion seen above.

To solve this, this patch gives only the generator's output to `GenerateExec` after the analysis phase. `GenerateExec`'s output is then computed as the combination of its child plan's output and the generator's output, so when we change `GenerateExec`'s child, its output remains correct.

## How was this patch tested?

Added test cases to PySpark.

Author: Liang-Chi Hsieh <[email protected]>

Closes #16120 from viirya/fix-py-udf-with-generator.

(cherry picked from commit 3ba69b6)
Signed-off-by: Herman van Hovell <[email protected]>
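With this fix, the reproduction above completes instead of failing the codegen assertion. A sketch of the expected result (Row reprs approximate; the values follow from `range(value - 1, value + 1)` and explode's default `col` name):

```python
>>> rows = df.select("id", explode(range_udf(df.id))).collect()
>>> rows[:4]  # two generated structs per input id
[Row(id=0, col=Row(integer_val=-1, string_val='-1')),
 Row(id=0, col=Row(integer_val=0, string_val='0')),
 Row(id=1, col=Row(integer_val=0, string_val='0')),
 Row(id=1, col=Row(integer_val=1, string_val='1'))]
```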
1 parent dc61ed4 · commit bde1d41

File tree

4 files changed (+40, -10 lines)


python/pyspark/sql/tests.py

Lines changed: 20 additions & 0 deletions
```diff
@@ -379,6 +379,26 @@ def test_udf_in_generate(self):
         row = df.select(explode(f(*df))).groupBy().sum().first()
         self.assertEqual(row[0], 10)
 
+        df = self.spark.range(3)
+        res = df.select("id", explode(f(df.id))).collect()
+        self.assertEqual(res[0][0], 1)
+        self.assertEqual(res[0][1], 0)
+        self.assertEqual(res[1][0], 2)
+        self.assertEqual(res[1][1], 0)
+        self.assertEqual(res[2][0], 2)
+        self.assertEqual(res[2][1], 1)
+
+        range_udf = udf(lambda value: list(range(value - 1, value + 1)), ArrayType(IntegerType()))
+        res = df.select("id", explode(range_udf(df.id))).collect()
+        self.assertEqual(res[0][0], 0)
+        self.assertEqual(res[0][1], -1)
+        self.assertEqual(res[1][0], 0)
+        self.assertEqual(res[1][1], 0)
+        self.assertEqual(res[2][0], 1)
+        self.assertEqual(res[2][1], 0)
+        self.assertEqual(res[3][0], 1)
+        self.assertEqual(res[3][1], 1)
+
     def test_udf_with_order_by_and_limit(self):
         from pyspark.sql.functions import udf
         my_copy = udf(lambda x: x, IntegerType())
```
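For reference, the `range_udf` assertions above correspond to output like the following (a sketch assuming a live `spark` session; table formatting approximate, `col` is explode's default column name):

```python
>>> from pyspark.sql.functions import explode, udf
>>> from pyspark.sql.types import ArrayType, IntegerType
>>> df = spark.range(3)
>>> range_udf = udf(lambda value: list(range(value - 1, value + 1)), ArrayType(IntegerType()))
>>> df.select("id", explode(range_udf(df.id))).show()
+---+---+
| id|col|
+---+---+
|  0| -1|
|  0|  0|
|  1|  0|
|  1|  1|
|  2|  1|
|  2|  2|
+---+---+
```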
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 6 additions & 6 deletions
```diff
@@ -91,13 +91,13 @@ case class Generate(
 
   override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
 
-  def output: Seq[Attribute] = {
-    val qualified = qualifier.map(q =>
-      // prepend the new qualifier to the existed one
-      generatorOutput.map(a => a.withQualifier(Some(q)))
-    ).getOrElse(generatorOutput)
+  val qualifiedGeneratorOutput: Seq[Attribute] = qualifier.map { q =>
+    // prepend the new qualifier to the existed one
+    generatorOutput.map(a => a.withQualifier(Some(q)))
+  }.getOrElse(generatorOutput)
 
-    if (join) child.output ++ qualified else qualified
+  def output: Seq[Attribute] = {
+    if (join) child.output ++ qualifiedGeneratorOutput else qualifiedGeneratorOutput
   }
 }
```
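The `qualifier` here comes from generator aliases such as a `LATERAL VIEW` alias, which lets the generated columns be referenced in qualified form. A hedged PySpark illustration (table and alias names are made up for the example):

```python
# Build a table with an array column, then explode it through a
# LATERAL VIEW whose alias qualifies the generated column.
spark.range(3).selectExpr("id", "array(id, id + 1) AS arr").createOrReplaceTempView("t")

spark.sql("""
    SELECT t.id, exploded.value   -- generator output, qualified by the view alias
    FROM t
    LATERAL VIEW explode(t.arr) exploded AS value
""").show()
```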
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala

Lines changed: 12 additions & 3 deletions
```diff
@@ -44,17 +44,26 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
  * it.
  * @param outer when true, each input row will be output at least once, even if the output of the
  *              given `generator` is empty. `outer` has no effect when `join` is false.
- * @param output the output attributes of this node, which constructed in analysis phase,
- *               and we can not change it, as the parent node bound with it already.
+ * @param generatorOutput the qualified output attributes of the generator of this node, which
+ *                        constructed in analysis phase, and we can not change it, as the
+ *                        parent node bound with it already.
  */
 case class GenerateExec(
     generator: Generator,
     join: Boolean,
     outer: Boolean,
-    output: Seq[Attribute],
+    generatorOutput: Seq[Attribute],
     child: SparkPlan)
   extends UnaryExecNode {
 
+  override def output: Seq[Attribute] = {
+    if (join) {
+      child.output ++ generatorOutput
+    } else {
+      generatorOutput
+    }
+  }
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
```
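The `join` flag decides whether the child's columns are carried alongside the generated ones, which is exactly what the new `output` method computes. A minimal PySpark illustration of the two cases (a sketch assuming a live `spark` session; `col` is explode's default output name):

```python
from pyspark.sql.functions import array, col, explode

df = spark.range(2).withColumn("arr", array(col("id"), col("id") + 1))

# join = true: child output is kept, so the result has both "id" and "col"
df.select("id", explode("arr")).printSchema()

# join = false: only the generator output "col" survives
df.select(explode("arr")).printSchema()
```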
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -399,7 +399,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.UnionExec(unionChildren.map(planLater)) :: Nil
       case g @ logical.Generate(generator, join, outer, _, _, child) =>
         execution.GenerateExec(
-          generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
+          generator, join = join, outer = outer, g.qualifiedGeneratorOutput,
+          planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
       case r : logical.Range =>
```
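Because the strategy now passes only `qualifiedGeneratorOutput`, `GenerateExec.output` stays consistent even after `ExtractPythonUDFs` swaps in a `BatchEvalPythonExec` child. One hedged way to see both operators from PySpark (exact plan text varies across versions):

```python
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType

range_udf = udf(lambda value: list(range(value - 1, value + 1)),
                ArrayType(IntegerType()))
df = spark.range(3)

# The physical plan should show a Generate node on top of BatchEvalPython:
# the Python UDF is evaluated in batch first, and its result column
# (e.g. pythonUDF0) is exploded afterwards.
df.select("id", explode(range_udf(df.id))).explain()
```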
