Skip to content

Commit f258718

Browse files
leanken-zz and viirya
authored and committed
[SPARK-32678][SQL] Rename EmptyHashedRelationWithAllNullKeys and simplify NAAJ generated code
### What changes were proposed in this pull request? Some code refinements: 1. Rename EmptyHashedRelationWithAllNullKeys to HashedRelationWithAllNullKeys. 2. Simplify the generated code for BHJ NAAJ. ### Why are the changes needed? To refine the code and naming and avoid confusion. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #29503 from leanken/leanken-SPARK-32678. Authored-by: xuewei.linxuewei <[email protected]> Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent a4d785d commit f258718

File tree

4 files changed

+12
-24
lines changed

4 files changed

+12
-24
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@ package org.apache.spark.sql.execution.adaptive
2020
import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
2121
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
2222
import org.apache.spark.sql.catalyst.rules.Rule
23-
import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
23+
import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
2424

2525
/**
2626
* This optimization rule detects a NAAJ and converts it to an empty LocalRelation
27-
* when buildSide is EmptyHashedRelationWithAllNullKeys.
27+
* when buildSide is HashedRelationWithAllNullKeys.
2828
*/
2929
object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
3030

3131
private def canEliminate(plan: LogicalPlan): Boolean = plan match {
3232
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
33-
&& stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
33+
&& stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys => true
3434
case _ => false
3535
}
3636

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ case class BroadcastHashJoinExec(
146146
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
147147
if (hashed == EmptyHashedRelation) {
148148
streamedIter
149-
} else if (hashed == EmptyHashedRelationWithAllNullKeys) {
149+
} else if (hashed == HashedRelationWithAllNullKeys) {
150150
Iterator.empty
151151
} else {
152152
val keyGenerator = UnsafeProjection.create(
@@ -228,7 +228,6 @@ case class BroadcastHashJoinExec(
228228
if (isNullAwareAntiJoin) {
229229
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
230230
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
231-
val (matched, _, _) = getJoinCondition(ctx, input)
232231
val numOutput = metricTerm(ctx, "numOutputRows")
233232

234233
if (broadcastRelation.value == EmptyHashedRelation) {
@@ -237,26 +236,15 @@ case class BroadcastHashJoinExec(
237236
|$numOutput.add(1);
238237
|${consume(ctx, input)}
239238
""".stripMargin
240-
} else if (broadcastRelation.value == EmptyHashedRelationWithAllNullKeys) {
239+
} else if (broadcastRelation.value == HashedRelationWithAllNullKeys) {
241240
s"""
242241
|// If the right side contains any all-null key, NAAJ simply returns Nothing.
243242
""".stripMargin
244243
} else {
245-
val found = ctx.freshName("found")
246244
s"""
247-
|boolean $found = false;
248245
|// generate join key for stream side
249246
|${keyEv.code}
250-
|if ($anyNull) {
251-
| $found = true;
252-
|} else {
253-
| UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
254-
| if ($matched != null) {
255-
| $found = true;
256-
| }
257-
|}
258-
|
259-
|if (!$found) {
247+
|if (!$anyNull && $relationTerm.getValue(${keyEv.value}) == null) {
260248
| $numOutput.add(1);
261249
| ${consume(ctx, input)}
262250
|}

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ private[joins] object UnsafeHashedRelation {
472472
// scalastyle:on throwerror
473473
}
474474
} else if (isNullAware) {
475-
return EmptyHashedRelationWithAllNullKeys
475+
return HashedRelationWithAllNullKeys
476476
}
477477
}
478478

@@ -1056,7 +1056,7 @@ private[joins] object LongHashedRelation {
10561056
val key = rowKey.getLong(0)
10571057
map.append(key, unsafeRow)
10581058
} else if (isNullAware) {
1059-
return EmptyHashedRelationWithAllNullKeys
1059+
return HashedRelationWithAllNullKeys
10601060
}
10611061
}
10621062
map.optimize()
@@ -1067,7 +1067,7 @@ private[joins] object LongHashedRelation {
10671067
/**
10681068
* Common trait with dummy implementation for NAAJ special HashedRelation
10691069
* EmptyHashedRelation
1070-
* EmptyHashedRelationWithAllNullKeys
1070+
* HashedRelationWithAllNullKeys
10711071
*/
10721072
trait NullAwareHashedRelation extends HashedRelation with Externalizable {
10731073
override def get(key: InternalRow): Iterator[InternalRow] = {
@@ -1130,8 +1130,8 @@ object EmptyHashedRelation extends NullAwareHashedRelation {
11301130
* A special HashedRelation indicating that it was built from a non-empty input: Iterator[InternalRow],
11311131
* which contains a key whose columns are all null.
11321132
*/
1133-
object EmptyHashedRelationWithAllNullKeys extends NullAwareHashedRelation {
1134-
override def asReadOnlyCopy(): EmptyHashedRelationWithAllNullKeys.type = this
1133+
object HashedRelationWithAllNullKeys extends NullAwareHashedRelation {
1134+
override def asReadOnlyCopy(): HashedRelationWithAllNullKeys.type = this
11351135
}
11361136

11371137
/** The HashedRelationBroadcastMode requires that rows are broadcasted as a HashedRelation. */

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1168,7 +1168,7 @@ class AdaptiveQueryExecSuite
11681168
}
11691169
}
11701170

1171-
test("SPARK-32573: Eliminate NAAJ when BuildSide is EmptyHashedRelationWithAllNullKeys") {
1171+
test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {
11721172
withSQLConf(
11731173
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
11741174
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {

0 commit comments

Comments
 (0)