Skip to content

Commit c7ef560

Browse files
bersprocketsHyukjinKwon
authored andcommitted
[SPARK-40963][SQL] Set nullable correctly in project created by ExtractGenerator
### What changes were proposed in this pull request? When creating the project list for the new projection In `ExtractGenerator`, take into account whether the generator is outer when setting nullable on generator-related output attributes. ### Why are the changes needed? This PR fixes an issue that can produce either incorrect results or a `NullPointerException`. It's a bit of an obscure issue in that I am hard-pressed to reproduce without using a subquery that has a inline table. Example: ``` select c1, explode(c4) as c5 from ( select c1, array(c3) as c4 from ( select c1, explode_outer(c2) as c3 from values (1, array(1, 2)), (2, array(2, 3)), (3, null) as data(c1, c2) ) ); +---+---+ |c1 |c5 | +---+---+ |1 |1 | |1 |2 | |2 |2 | |2 |3 | |3 |0 | +---+---+ ``` In the last row, `c5` is 0, but should be `NULL`. Another example: ``` select c1, exists(c4, x -> x is null) as c5 from ( select c1, array(c3) as c4 from ( select c1, explode_outer(c2) as c3 from values (1, array(1, 2)), (2, array(2, 3)), (3, array()) as data(c1, c2) ) ); +---+-----+ |c1 |c5 | +---+-----+ |1 |false| |1 |false| |2 |false| |2 |false| |3 |false| +---+-----+ ``` In the last row, `false` should be `true`. In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s nullability is incorrect because the new projection created by `ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a projection list. `generatorOutput` doesn't take into account that `explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost. `UpdateAttributeNullability` will eventually fix the nullable setting for attributes referring to `c3`, but it doesn't fix the `containsNull` setting for `c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` (from the second example). This example fails with a `NullPointerException`: ``` select c1, inline_outer(c4) from ( select c1, array(c3) as c4 from ( select c1, explode_outer(c2) as c3 from values (1, array(named_struct('a', 1, 'b', 2))), (2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))), (3, array()) as data(c1, c2) ) ); 22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 14) java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit test. Closes apache#38440 from bersprockets/SPARK-40963. Authored-by: Bruce Robbins <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 90d3154) Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 0f234d9 commit c7ef560

File tree

3 files changed

+29
-9
lines changed

3 files changed

+29
-9
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2826,7 +2826,7 @@ class Analyzer(override val catalogManager: CatalogManager)
28262826
generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names),
28272827
child)
28282828

2829-
(Some(g), res._2 ++ g.generatorOutput)
2829+
(Some(g), res._2 ++ g.nullableOutput)
28302830
case other =>
28312831
(res._1, res._2 :+ other)
28322832
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,17 @@ case class Generate(
143143

144144
override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
145145

146+
def nullableOutput: Seq[Attribute] = {
147+
generatorOutput.map { a =>
148+
a.withNullability(outer || a.nullable)
149+
}
150+
}
151+
146152
def qualifiedGeneratorOutput: Seq[Attribute] = {
147-
val qualifiedOutput = qualifier.map { q =>
153+
qualifier.map { q =>
148154
// prepend the new qualifier to the existed one
149-
generatorOutput.map(a => a.withQualifier(Seq(q)))
150-
}.getOrElse(generatorOutput)
151-
val nullableOutput = qualifiedOutput.map {
152-
// if outer, make all attributes nullable, otherwise keep existing nullability
153-
a => a.withNullability(outer || a.nullable)
154-
}
155-
nullableOutput
155+
nullableOutput.map(a => a.withQualifier(Seq(q)))
156+
}.getOrElse(nullableOutput)
156157
}
157158

158159
def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput

sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
425425
testNullStruct
426426
}
427427
}
428+
429+
test("SPARK-40963: generator output has correct nullability") {
430+
// This test does not check nullability directly. Before SPARK-40963,
431+
// the below query got wrong results due to incorrect nullability.
432+
val df = sql(
433+
"""select c1, explode(c4) as c5 from (
434+
| select c1, array(c3) as c4 from (
435+
| select c1, explode_outer(c2) as c3
436+
| from values
437+
| (1, array(1, 2)),
438+
| (2, array(2, 3)),
439+
| (3, null)
440+
| as data(c1, c2)
441+
| )
442+
|)
443+
|""".stripMargin)
444+
checkAnswer(df,
445+
Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
446+
}
428447
}
429448

430449
case class EmptyGenerator() extends Generator with LeafLike[Expression] {

0 commit comments

Comments
 (0)