Skip to content

Commit 51a73d5

Browse files
committed
Separate rule.
1 parent 617f8a2 commit 51a73d5

File tree

3 files changed

+72
-45
lines changed

3 files changed

+72
-45
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,10 @@ class Analyzer(
114114
val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
115115

116116
lazy val batches: Seq[Batch] = Seq(
117+
Batch("Hints", fixedPoint,
118+
new SubstituteHints.SubstituteBroadcastHints(conf),
119+
SubstituteHints.RemoveAllHints),
117120
Batch("Substitution", fixedPoint,
118-
new SubstituteHints(conf),
119121
CTESubstitution,
120122
WindowsSubstitution,
121123
EliminateUnions,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala

Lines changed: 62 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -24,62 +24,80 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin
2424

2525

2626
/**
27-
* Substitute Hints.
28-
* - BROADCAST/BROADCASTJOIN/MAPJOIN match the closest table with the given name parameters.
27+
* Collection of rules related to hints. The only hint currently available is the broadcast join hint.
2928
*
30-
* In the case of broadcast hint, we find the frontier of
31-
*
32-
* This rule substitutes `UnresolvedRelation`s in `Substitute` batch before `ResolveRelations`
33-
* rule is applied. Here are two reasons.
34-
* - To support `MetastoreRelation` in Hive module.
35-
* - To reduce the effect of `Hint` on the other rules.
36-
*
37-
* After this rule, it is guaranteed that there exists no unknown `Hint` in the plan.
38-
* All new `Hint`s should be transformed into concrete Hint classes `BroadcastHint` here.
29+
* Note that this is separated into two rules because in the future we might introduce new hint
30+
* rules that have different ordering requirements from broadcast.
3931
*/
40-
class SubstituteHints(conf: CatalystConf) extends Rule[LogicalPlan] {
41-
private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
32+
object SubstituteHints {
33+
34+
/**
35+
* Substitute Hints.
36+
*
37+
* The only hint currently available is the broadcast join hint.
38+
*
39+
* For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
40+
* relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
41+
* on top of any relation (that is not aliased differently), subquery, or common table expression
42+
* that matches the specified name.
43+
*
44+
* The hint resolution works by recursively traversing down the query plan to find a relation or
45+
* subquery that matches one of the specified broadcast aliases. The traversal does not go past
46+
* any existing broadcast hints or subquery aliases.
47+
*
48+
* This rule must run before common table expression (CTE) substitution.
49+
*/
50+
class SubstituteBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
51+
private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
4252

43-
def resolver: Resolver = conf.resolver
53+
def resolver: Resolver = conf.resolver
4454

45-
private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
46-
// Whether to continue recursing down the tree
47-
var recurse = true
55+
private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
56+
// Whether to continue recursing down the tree
57+
var recurse = true
4858

49-
val newNode = CurrentOrigin.withOrigin(plan.origin) {
50-
plan match {
51-
case r: UnresolvedRelation =>
52-
val alias = r.alias.getOrElse(r.tableIdentifier.table)
53-
if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
54-
case r: SubqueryAlias =>
55-
if (toBroadcast.exists(resolver(_, r.alias))) {
56-
BroadcastHint(plan)
57-
} else {
58-
// Don't recurse down subquery aliases if there are no match.
59+
val newNode = CurrentOrigin.withOrigin(plan.origin) {
60+
plan match {
61+
case r: UnresolvedRelation =>
62+
val alias = r.alias.getOrElse(r.tableIdentifier.table)
63+
if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
64+
case r: SubqueryAlias =>
65+
if (toBroadcast.exists(resolver(_, r.alias))) {
66+
BroadcastHint(plan)
67+
} else {
68+
// Don't recurse down subquery aliases if there is no match.
69+
recurse = false
70+
plan
71+
}
72+
case _: BroadcastHint =>
73+
// Found a broadcast hint; don't change the plan but also don't recurse down.
5974
recurse = false
6075
plan
61-
}
62-
case _: BroadcastHint =>
63-
// Found a broadcast hint; don't change the plan but also don't recurse down.
64-
recurse = false
65-
plan
66-
case _ =>
67-
plan
76+
case _ =>
77+
plan
78+
}
79+
}
80+
81+
if ((plan fastEquals newNode) && recurse) {
82+
newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
83+
} else {
84+
newNode
6885
}
6986
}
7087

71-
if ((plan fastEquals newNode) && recurse) {
72-
newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
73-
} else {
74-
newNode
88+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
89+
case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
90+
applyBroadcastHint(h.child, h.parameters.toSet)
7591
}
7692
}
7793

78-
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
79-
case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
80-
applyBroadcastHint(h.child, h.parameters.toSet)
81-
82-
// Remove unrecognized hints
83-
case h: Hint => h.child
94+
/**
95+
* Removes all the hints. This must be executed after all the other hint rules are executed.
96+
*/
97+
object RemoveAllHints extends Rule[LogicalPlan] {
98+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
99+
case h: Hint => h.child
100+
}
84101
}
102+
85103
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ import org.apache.spark.sql.catalyst.plans.logical._
2727
class SubstituteHintsSuite extends AnalysisTest {
2828
import org.apache.spark.sql.catalyst.analysis.TestRelations._
2929

30+
test("invalid hints should be ignored") {
31+
checkAnalysis(
32+
Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
33+
testRelation,
34+
caseSensitive = false)
35+
}
36+
3037
test("case-sensitive or insensitive parameters") {
3138
checkAnalysis(
3239
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),

0 commit comments

Comments
 (0)