Commit 90d3154

bersprockets authored and HyukjinKwon committed
[SPARK-40963][SQL] Set nullable correctly in project created by ExtractGenerator
### What changes were proposed in this pull request?

When creating the project list for the new projection in `ExtractGenerator`, take into account whether the generator is outer when setting nullable on generator-related output attributes.

### Why are the changes needed?

This PR fixes an issue that can produce either incorrect results or a `NullPointerException`. It's a somewhat obscure issue: I am hard-pressed to reproduce it without using a subquery that has an inline table.

Example:
```
select c1, explode(c4) as c5 from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(1, 2)),
    (2, array(2, 3)),
    (3, null)
    as data(c1, c2)
  )
);

+---+---+
|c1 |c5 |
+---+---+
|1  |1  |
|1  |2  |
|2  |2  |
|2  |3  |
|3  |0  |
+---+---+
```
In the last row, `c5` is 0, but should be `NULL`.

Another example:
```
select c1, exists(c4, x -> x is null) as c5 from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(1, 2)),
    (2, array(2, 3)),
    (3, array())
    as data(c1, c2)
  )
);

+---+-----+
|c1 |c5   |
+---+-----+
|1  |false|
|1  |false|
|2  |false|
|2  |false|
|3  |false|
+---+-----+
```
In the last row, `false` should be `true`.

In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s nullability is incorrect because the new projection created by `ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a projection list. `generatorOutput` doesn't take into account that `explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost.

`UpdateAttributeNullability` will eventually fix the nullable setting for attributes referring to `c3`, but it doesn't fix the `containsNull` setting for `c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` (from the second example).
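The stale `containsNull` problem described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ElemAttr`, `ArrayT`, `ArrayAttr`, and `createArray` are hypothetical stand-ins invented for illustration, not Catalyst classes, and the model assumes only what the description says, namely that the array's `containsNull` is derived from the element's nullability when the array expression is built.

```scala
// Toy stand-ins for Catalyst concepts. These are NOT Spark's real classes.
final case class ArrayT(containsNull: Boolean)
final case class ElemAttr(name: String, nullable: Boolean)
final case class ArrayAttr(name: String, dataType: ArrayT)

object ContainsNullSketch {
  // Models array(c3): the element's nullability is captured at construction
  // time and stored in the type carried by the resulting attribute.
  def createArray(name: String, element: ElemAttr): ArrayAttr =
    ArrayAttr(name, ArrayT(containsNull = element.nullable))

  def main(args: Array[String]): Unit = {
    // Before the fix: c3 from explode_outer(c2) is wrongly marked non-nullable.
    val c3Wrong = ElemAttr("c3", nullable = false)
    val c4 = createArray("c4", c3Wrong)

    // A later pass corrects c3 itself, but c4's type was already derived.
    val c3Fixed = c3Wrong.copy(nullable = true)
    println(s"c3 nullable after late fix: ${c3Fixed.nullable}")
    println(s"c4 containsNull (stale):    ${c4.dataType.containsNull}")

    // With the fix: c3 is nullable from the start, so c4 is derived correctly.
    val c4Correct = createArray("c4", ElemAttr("c3", nullable = true))
    println(s"c4 containsNull (correct):  ${c4Correct.dataType.containsNull}")
  }
}
```

In this model, correcting `c3` afterwards has no effect on the type already recorded for `c4`, which mirrors why downstream consumers such as `explode(c4)` can still treat the elements as non-null.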
This example fails with a `NullPointerException`:
```
select c1, inline_outer(c4) from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(named_struct('a', 1, 'b', 2))),
    (2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))),
    (3, array())
    as data(c1, c2)
  )
);

22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 14)
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

Closes apache#38440 from bersprockets/SPARK-40963.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 406d0e2 commit 90d3154

3 files changed: +29 -9 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
@@ -2910,7 +2910,7 @@ class Analyzer(override val catalogManager: CatalogManager)
               generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names),
               child)

-            (Some(g), res._2 ++ g.generatorOutput)
+            (Some(g), res._2 ++ g.nullableOutput)
           case other =>
             (res._1, res._2 :+ other)
         }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 9 additions & 8 deletions
@@ -273,16 +273,17 @@ case class Generate(

   override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)

+  def nullableOutput: Seq[Attribute] = {
+    generatorOutput.map { a =>
+      a.withNullability(outer || a.nullable)
+    }
+  }
+
   def qualifiedGeneratorOutput: Seq[Attribute] = {
-    val qualifiedOutput = qualifier.map { q =>
+    qualifier.map { q =>
       // prepend the new qualifier to the existed one
-      generatorOutput.map(a => a.withQualifier(Seq(q)))
-    }.getOrElse(generatorOutput)
-    val nullableOutput = qualifiedOutput.map {
-      // if outer, make all attributes nullable, otherwise keep existing nullability
-      a => a.withNullability(outer || a.nullable)
-    }
-    nullableOutput
+      nullableOutput.map(a => a.withQualifier(Seq(q)))
+    }.getOrElse(nullableOutput)
   }

   def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput
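
The refactoring in this hunk can be tried outside Spark with a minimal, self-contained model. The sketch below assumes simplified stand-ins: `Attr` and `GenerateSketch` are hypothetical names made up for illustration and carry only the fields the diff touches (`generatorOutput`, `outer`, `qualifier`). It is not the real `Generate` operator.

```scala
// Simplified stand-ins. NOT Spark's Attribute or Generate classes.
final case class Attr(
    name: String,
    nullable: Boolean,
    qualifier: Seq[String] = Nil) {
  def withNullability(n: Boolean): Attr = copy(nullable = n)
  def withQualifier(q: Seq[String]): Attr = copy(qualifier = q)
}

final case class GenerateSketch(
    generatorOutput: Seq[Attr],
    outer: Boolean,
    qualifier: Option[String]) {

  // If outer, every generator attribute becomes nullable;
  // otherwise the existing nullability is kept.
  def nullableOutput: Seq[Attr] =
    generatorOutput.map(a => a.withNullability(outer || a.nullable))

  // Same shape as the patched method: qualify the already-nullable output.
  def qualifiedGeneratorOutput: Seq[Attr] =
    qualifier
      .map(q => nullableOutput.map(a => a.withQualifier(Seq(q))))
      .getOrElse(nullableOutput)
}

object GenerateSketchDemo {
  def main(args: Array[String]): Unit = {
    val c3 = Attr("c3", nullable = false)
    val outerGen = GenerateSketch(Seq(c3), outer = true, qualifier = None)
    val innerGen = GenerateSketch(Seq(c3), outer = false, qualifier = Some("t"))

    // Raw generator output ignores `outer`; nullableOutput does not.
    println(outerGen.generatorOutput.map(_.nullable))
    println(outerGen.nullableOutput.map(_.nullable))
    println(innerGen.qualifiedGeneratorOutput)
  }
}
```

Exposing `nullableOutput` as its own method is what lets the `Analyzer.scala` change append outer-aware attributes to the project list, where it previously appended the raw `generatorOutput`.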

sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -425,6 +425,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
       testNullStruct
     }
   }
+
+  test("SPARK-40963: generator output has correct nullability") {
+    // This test does not check nullability directly. Before SPARK-40963,
+    // the below query got wrong results due to incorrect nullability.
+    val df = sql(
+      """select c1, explode(c4) as c5 from (
+        |  select c1, array(c3) as c4 from (
+        |    select c1, explode_outer(c2) as c3
+        |    from values
+        |    (1, array(1, 2)),
+        |    (2, array(2, 3)),
+        |    (3, null)
+        |    as data(c1, c2)
+        |  )
+        |)
+        |""".stripMargin)
+    checkAnswer(df,
+      Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
+  }
 }

 case class EmptyGenerator() extends Generator with LeafLike[Expression] {