
Commit d20c646

[SPARK-20857][SQL] Generic resolved hint node
## What changes were proposed in this pull request?

This patch renames BroadcastHint to ResolvedHint (and Hint to UnresolvedHint) so the hint framework is more generic and would allow us to introduce other hint types in the future without introducing new hint nodes.

## How was this patch tested?

Updated test cases.

Author: Reynold Xin <[email protected]>

Closes #18072 from rxin/SPARK-20857.

(cherry picked from commit 0d589ba)
Signed-off-by: Reynold Xin <[email protected]>
1 parent dbb068f commit d20c646

File tree: 20 files changed (+118, -80 lines)
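Not part of the commit itself — a minimal sketch of the user-facing path these nodes sit behind, assuming a local SparkSession and made-up data. `functions.broadcast` plants a broadcast hint on a DataFrame; after this patch the analyzed plan should show a ResolvedHint where a BroadcastHint used to appear (the Dataset/functions side of the rename is not among the hunks shown below, so treat that mapping as an assumption).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastHintSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("hint-sketch").getOrCreate()
    import spark.implicits._

    val small = Seq((1, "a"), (2, "b")).toDF("id", "label")
    val large = Seq((1, 10), (2, 20), (3, 30)).toDF("id", "value")

    // broadcast() marks `small` with a broadcast hint; with this patch the analyzed
    // plan should carry a ResolvedHint node where BroadcastHint used to appear.
    val joined = large.join(broadcast(small), "id")
    joined.explain(extended = true)

    spark.stop()
  }
}
```

explain(extended = true) prints the parsed, analyzed, optimized, and physical plans, which is the easiest place to spot the hint node.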


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
@@ -1311,7 +1311,7 @@ class Analyzer(

         // Category 1:
         // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
+        case _: ResolvedHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>

         // Category 2:
         // These operators can be anywhere in a correlated subquery.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 1 addition & 1 deletion
@@ -399,7 +399,7 @@ trait CheckAnalysis extends PredicateHelper {
             |in operator ${operator.simpleString}
           """.stripMargin)

-      case _: Hint =>
+      case _: UnresolvedHint =>
         throw new IllegalStateException(
           "Internal error: logical hint operator should have been removed during analysis")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 6 additions & 6 deletions
@@ -57,11 +57,11 @@ object ResolveHints {
     val newNode = CurrentOrigin.withOrigin(plan.origin) {
       plan match {
         case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
-          BroadcastHint(plan)
+          ResolvedHint(plan, isBroadcastable = Option(true))
         case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-          BroadcastHint(plan)
+          ResolvedHint(plan, isBroadcastable = Option(true))

-        case _: BroadcastHint | _: View | _: With | _: SubqueryAlias =>
+        case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
           // Don't traverse down these nodes.
           // For an existing broadcast hint, there is no point going down (if we do, we either
           // won't change the structure, or will introduce another broadcast hint that is useless.
@@ -85,10 +85,10 @@ object ResolveHints {
   }

   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+    case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
       if (h.parameters.isEmpty) {
         // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
-        BroadcastHint(h.child)
+        ResolvedHint(h.child, isBroadcastable = Option(true))
       } else {
         // Otherwise, find within the subtree query plans that should be broadcasted.
         applyBroadcastHint(h.child, h.parameters.toSet)
@@ -102,7 +102,7 @@ object ResolveHints {
    */
  object RemoveAllHints extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-     case h: Hint => h.child
+     case h: UnresolvedHint => h.child
    }
  }
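A hedged end-to-end sketch of the SQL path that ResolveBroadcastHints handles above: the parser turns the hint comment into UnresolvedHint(name, parameters, child) (see the AstBuilder change below), and this rule rewrites the named relation into ResolvedHint(..., isBroadcastable = Option(true)). The hint name BROADCAST is assumed to be in BROADCAST_HINT_NAMES, whose definition is not part of this diff; the tables and data are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object ResolveHintsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("resolve-hints-sketch").getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "b")).toDF("id", "label").createOrReplaceTempView("small")
    Seq((1, 10), (2, 20), (3, 30)).toDF("id", "value").createOrReplaceTempView("large")

    // The /*+ BROADCAST(small) */ comment parses into UnresolvedHint("BROADCAST", Seq("small"), ...);
    // ResolveBroadcastHints then wraps the plan for `small` in ResolvedHint(..., isBroadcastable = Option(true)).
    val df = spark.sql(
      "SELECT /*+ BROADCAST(small) */ * FROM large JOIN small ON large.id = small.id")
    df.explain(extended = true)

    spark.stop()
  }
}
```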

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 1 deletion
@@ -861,7 +861,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // Note that some operators (e.g. project, aggregate, union) are being handled separately
     // (earlier in this rule).
     case _: AppendColumns => true
-    case _: BroadcastHint => true
+    case _: ResolvedHint => true
     case _: Distinct => true
     case _: Generate => true
     case _: Pivot => true

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala

Lines changed: 1 addition & 1 deletion
@@ -477,7 +477,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
     case _: Distinct => true
     case _: AppendColumns => true
     case _: AppendColumnsWithObject => true
-    case _: BroadcastHint => true
+    case _: ResolvedHint => true
     case _: RepartitionByExpression => true
     case _: Repartition => true
     case _: Sort => true

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 2 additions & 2 deletions
@@ -532,13 +532,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   }

   /**
-   * Add a [[Hint]] to a logical plan.
+   * Add a [[UnresolvedHint]] to a logical plan.
    */
   private def withHints(
       ctx: HintContext,
       query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
     val stmt = ctx.hintStatement
-    Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
+    UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
   }

   /**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 2 additions & 2 deletions
@@ -65,8 +65,8 @@ object PhysicalOperation extends PredicateHelper {
       val substitutedCondition = substitute(aliases)(condition)
       (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)

-    case BroadcastHint(child) =>
-      collectProjectsAndFilters(child)
+    case h: ResolvedHint =>
+      collectProjectsAndFilters(h.child)

     case other =>
       (None, Nil, other, Map.empty)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala

Lines changed: 5 additions & 0 deletions
@@ -68,6 +68,11 @@ case class Statistics(
       s"isBroadcastable=$isBroadcastable"
     ).filter(_.nonEmpty).mkString(", ")
   }
+
+  /** Must be called when computing stats for a join operator to reset hints. */
+  def resetHintsForJoin(): Statistics = copy(
+    isBroadcastable = false
+  )
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 21 deletions
@@ -364,7 +364,7 @@ case class Join(
       case _ =>
         // Make sure we don't propagate isBroadcastable in other joins, because
         // they could explode the size.
-        super.computeStats(conf).copy(isBroadcastable = false)
+        super.computeStats(conf).resetHintsForJoin()
     }

     if (conf.cboEnabled) {
@@ -375,26 +375,6 @@
     }
   }

-/**
- * A hint for the optimizer that we should broadcast the `child` if used in a join operator.
- */
-case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-
-  // set isBroadcastable to true so the child will be broadcasted
-  override def computeStats(conf: SQLConf): Statistics =
-    child.stats(conf).copy(isBroadcastable = true)
-}
-
-/**
- * A general hint for the child. This node will be eliminated post analysis.
- * A pair of (name, parameters).
- */
-case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode {
-  override lazy val resolved: Boolean = false
-  override def output: Seq[Attribute] = child.output
-}
-
 /**
  * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
  * concrete implementations during analysis.
Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A general hint for the child that is not yet resolved. This node is generated by the parser and
+ * should be removed This node will be eliminated post analysis.
+ * A pair of (name, parameters).
+ */
+case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * A resolved hint node. The analyzer should convert all [[UnresolvedHint]] into [[ResolvedHint]].
+ */
+case class ResolvedHint(
+    child: LogicalPlan,
+    isBroadcastable: Option[Boolean] = None)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def computeStats(conf: SQLConf): Statistics = {
+    val stats = child.stats(conf)
+    isBroadcastable.map(x => stats.copy(isBroadcastable = x)).getOrElse(stats)
+  }
+}
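A rough sketch of how the new ResolvedHint node affects statistics, based only on the computeStats shown above. It leans on Catalyst's internal expression DSL and a bare SQLConf, which are not public API, so consider it illustrative rather than a supported usage.

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, ResolvedHint}
import org.apache.spark.sql.internal.SQLConf

object ResolvedHintStatsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SQLConf
    val relation = LocalRelation('a.int, 'b.string)

    // With isBroadcastable = Option(true), computeStats copies the child's statistics
    // and flips the isBroadcastable flag, which the join planner later reads.
    val hinted = ResolvedHint(relation, isBroadcastable = Option(true))
    println(hinted.stats(conf).isBroadcastable) // expected: true

    // With the default None, the child's statistics pass through unchanged.
    val passthrough = ResolvedHint(relation)
    println(passthrough.stats(conf).isBroadcastable) // expected: false for a plain LocalRelation
  }
}
```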
