From 809e472761132d267255117e7bd8aea5f9a7040a Mon Sep 17 00:00:00 2001
From: "xuewei.linxuewei"
Date: Fri, 21 Aug 2020 15:17:06 +0800
Subject: [PATCH] Code refine.

1. rename EmptyHashedRelationWithAllNullKeys to HashedRelationWithAllNullKeys.
2. simplify generated code for BHJ NAAJ.

Change-Id: If91057b60898c0233f93cd9363dde718e3e1e49e
---
 .../adaptive/EliminateNullAwareAntiJoin.scala |  6 +++---
 .../joins/BroadcastHashJoinExec.scala         | 18 +++---------------
 .../sql/execution/joins/HashedRelation.scala  | 10 +++++-----
 .../adaptive/AdaptiveQueryExecSuite.scala     |  2 +-
 4 files changed, 12 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
index 4e0247e2f4bb5..afccde09040a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
@@ -20,17 +20,17 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
 
 /**
  * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * when buildSide is HashedRelationWithAllNullKeys.
  */
 object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
 
   private def canEliminate(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
-      && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
+      && stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys => true
     case _ => false
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 5df06f8e4d4fa..2a9e15851e9f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -146,7 +146,7 @@ case class BroadcastHashJoinExec(
       TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
       if (hashed == EmptyHashedRelation) {
         streamedIter
-      } else if (hashed == EmptyHashedRelationWithAllNullKeys) {
+      } else if (hashed == HashedRelationWithAllNullKeys) {
         Iterator.empty
       } else {
         val keyGenerator = UnsafeProjection.create(
@@ -228,7 +228,6 @@ case class BroadcastHashJoinExec(
     if (isNullAwareAntiJoin) {
       val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
       val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
-      val (matched, _, _) = getJoinCondition(ctx, input)
       val numOutput = metricTerm(ctx, "numOutputRows")
       if (broadcastRelation.value == EmptyHashedRelation) {
         s"""
            |$numOutput.add(1);
            |${consume(ctx, input)}
          """.stripMargin
-      } else if (broadcastRelation.value == EmptyHashedRelationWithAllNullKeys) {
+      } else if (broadcastRelation.value == HashedRelationWithAllNullKeys) {
         s"""
            |// If the right side contains any all-null key, NAAJ simply returns Nothing.
          """.stripMargin
       } else {
-        val found = ctx.freshName("found")
         s"""
-           |boolean $found = false;
            |// generate join key for stream side
            |${keyEv.code}
-           |if ($anyNull) {
-           |  $found = true;
-           |} else {
-           |  UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
-           |  if ($matched != null) {
-           |    $found = true;
-           |  }
-           |}
-           |
-           |if (!$found) {
+           |if (!$anyNull && $relationTerm.getValue(${keyEv.value}) == null) {
            |  $numOutput.add(1);
            |  ${consume(ctx, input)}
            |}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 183d8a3ad04d7..a2e062d880524 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -472,7 +472,7 @@ private[joins] object UnsafeHashedRelation {
           // scalastyle:on throwerror
         }
       } else if (isNullAware) {
-        return EmptyHashedRelationWithAllNullKeys
+        return HashedRelationWithAllNullKeys
       }
     }
 
@@ -1056,7 +1056,7 @@ private[joins] object LongHashedRelation {
         val key = rowKey.getLong(0)
         map.append(key, unsafeRow)
       } else if (isNullAware) {
-        return EmptyHashedRelationWithAllNullKeys
+        return HashedRelationWithAllNullKeys
       }
     }
     map.optimize()
@@ -1067,7 +1067,7 @@ private[joins] object LongHashedRelation {
 /**
  * Common trait with dummy implementation for NAAJ special HashedRelation
  * EmptyHashedRelation
- * EmptyHashedRelationWithAllNullKeys
+ * HashedRelationWithAllNullKeys
  */
 trait NullAwareHashedRelation extends HashedRelation with Externalizable {
   override def get(key: InternalRow): Iterator[InternalRow] = {
@@ -1130,8 +1130,8 @@ object EmptyHashedRelation extends NullAwareHashedRelation {
  * A special HashedRelation indicates it built from a non-empty input:Iterator[InternalRow],
  * which contains all null columns key.
  */
-object EmptyHashedRelationWithAllNullKeys extends NullAwareHashedRelation {
-  override def asReadOnlyCopy(): EmptyHashedRelationWithAllNullKeys.type = this
+object HashedRelationWithAllNullKeys extends NullAwareHashedRelation {
+  override def asReadOnlyCopy(): HashedRelationWithAllNullKeys.type = this
 }
 
 /** The HashedRelationBroadcastMode requires that rows are broadcasted as a HashedRelation. */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ff039ca9ad0a9..1dc239c0416f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1168,7 +1168,7 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("SPARK-32573: Eliminate NAAJ when BuildSide is EmptyHashedRelationWithAllNullKeys") {
+  test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
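
Note on the BroadcastHashJoinExec change: the generated per-row anti-join check collapses to a single condition, `!$anyNull && $relationTerm.getValue(${keyEv.value}) == null`. Below is a minimal Scala sketch of the semantics the three codegen branches encode (EmptyHashedRelation, HashedRelationWithAllNullKeys, and the simplified per-row check), written with plain collections for illustration only; the object name, the naaj helper, and the Option[Int] key model are assumptions for this sketch, not Spark APIs.

// Illustrative sketch only (not Spark code): models the three NAAJ branches
// from the generated BroadcastHashJoinExec code using plain Scala collections.
// A key of None stands for a null join key.
object NaajSemanticsSketch {
  def naaj(streamKeys: Seq[Option[Int]], buildKeys: Seq[Option[Int]]): Seq[Option[Int]] = {
    if (buildKeys.isEmpty) {
      // EmptyHashedRelation branch: nothing on the build side can match,
      // so every streamed row is emitted unchanged.
      streamKeys
    } else if (buildKeys.contains(None)) {
      // HashedRelationWithAllNullKeys branch: a null build-side key makes
      // "NOT IN" undecidable, so the anti join returns no rows at all.
      Seq.empty
    } else {
      // General branch, the simplified generated check: keep a row only when
      // its key is non-null and has no match in the build-side relation.
      val built = buildKeys.flatten.toSet
      streamKeys.filter(k => k.isDefined && !built.contains(k.get))
    }
  }

  def main(args: Array[String]): Unit = {
    println(naaj(Seq(Some(1), Some(2), None), Seq(Some(2), Some(3)))) // List(Some(1))
    println(naaj(Seq(Some(1), None), Seq(Some(2), None)))             // List()
    println(naaj(Seq(Some(1), None), Seq.empty))                      // List(Some(1), None)
  }
}

The replaced `found`-flag block built the same decision in two steps (mark matched on null key or on a hash hit, then negate); folding it into one condition keeps the generated Java shorter without changing which rows are emitted.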