Skip to content

Commit a095df3

Browse files
committed
Move rule out to its own file.
1 parent c702e3e commit a095df3

File tree

2 files changed

+84
-58
lines changed

2 files changed

+84
-58
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class Analyzer(
119119
WindowsSubstitution,
120120
EliminateUnions,
121121
new SubstituteUnresolvedOrdinals(conf),
122-
SubstituteHints),
122+
new SubstituteHints(conf)),
123123
Batch("Resolution", fixedPoint,
124124
ResolveTableValuedFunctions ::
125125
ResolveRelations ::
@@ -2088,63 +2088,6 @@ class Analyzer(
20882088
}
20892089
}
20902090

2091-
/**
2092-
* Substitute Hints.
2093-
* - BROADCAST/BROADCASTJOIN/MAPJOIN match the closest table with the given name parameters.
2094-
*
2095-
* This rule substitutes `UnresolvedRelation`s in `Substitute` batch before `ResolveRelations`
2096-
* rule is applied. Here are two reasons.
2097-
* - To support `MetastoreRelation` in Hive module.
2098-
* - To reduce the effect of `Hint` on the other rules.
2099-
*
2100-
* After this rule, it is guaranteed that there exists no unknown `Hint` in the plan.
2101-
* All new `Hint`s should be transformed into concrete Hint classes `BroadcastHint` here.
2102-
*/
2103-
// Rule object that rewrites `Hint` nodes: for a broadcast hint, the matching
// `UnresolvedRelation`s under it are wrapped in `BroadcastHint` and the hint node is
// replaced by its (rewritten) child; any other hint is simply replaced by its child.
object SubstituteHints extends Rule[LogicalPlan] {
2104-
// Declared before the `mutable.Set` import below, so `Set` here is still the default
// (immutable) `Set`. Looked up with `name.toUpperCase`, i.e. hint names are matched
// case-insensitively.
val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
2105-
2106-
// From this point on, the bare name `Set` refers to `scala.collection.mutable.Set`.
import scala.collection.mutable.Set
2107-
// Adds `plan` and, recursively, every node beneath it to `set`.
private def appendAllDescendant(set: Set[LogicalPlan], plan: LogicalPlan): Unit = {
2108-
set += plan
2109-
plan.children.foreach { child => appendAllDescendant(set, child) }
2110-
}
2111-
2112-
// Entry point of the rule: an outer `transform` whose only case runs a `transformDown`
// over each visited node, handling the two `Hint` cases below.
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
2113-
case logical: LogicalPlan => logical transformDown {
2114-
// Broadcast hint: process the table-name parameters one at a time, each pass
// rewriting the result of the previous one.
case h @ Hint(name, parameters, child) if BROADCAST_HINT_NAMES.contains(name.toUpperCase) =>
2115-
var resolvedChild = child
2116-
for (table <- parameters) {
2117-
// Set once the first relation matching `table` has been handled; later matches in
// the same pass are left untouched.
var stop = false
2118-
// Nodes that this pass must return unchanged (see the `Project` case below).
val skipNodeSet = scala.collection.mutable.Set.empty[LogicalPlan]
2119-
resolvedChild = resolvedChild.transformDown {
2120-
// A node registered as skipped is returned as-is and removed from the set.
case n if skipNodeSet.contains(n) =>
2121-
skipNodeSet -= n
2122-
n
2123-
// A `Project` other than the root of the tree being rewritten: register everything
// beneath it as skipped, so relations below it are not hinted by this pass.
case p @ Project(_, _) if p != resolvedChild =>
2124-
appendAllDescendant(skipNodeSet, p)
2125-
skipNodeSet -= p
2126-
p
2127-
// The matching relation already carries a `BroadcastHint`: keep it, just stop searching.
// NOTE(review): `resolver` is not defined in this object; presumably it is a member of
// the enclosing `Analyzer` -- confirm.
case r @ BroadcastHint(UnresolvedRelation(t, _))
2128-
if !stop && resolver(t.table, table) =>
2129-
stop = true
2130-
r
2131-
// First matching unhinted relation: wrap it in `BroadcastHint`.
case r @ UnresolvedRelation(t, alias) if !stop && resolver(t.table, table) =>
2132-
stop = true
2133-
// With an alias, the alias is moved out of the relation into a `SubqueryAlias`
// placed above the `BroadcastHint`.
if (alias.isDefined) {
2134-
SubqueryAlias(alias.get, BroadcastHint(r.copy(alias = None)), None)
2135-
} else {
2136-
BroadcastHint(r)
2137-
}
2138-
}
2139-
}
2140-
// The `Hint` node itself is dropped; only its rewritten child remains.
resolvedChild
2141-
2142-
// Remove unrecognized hints
2143-
case Hint(name, _, child) => child
2144-
}
2145-
}
2146-
}
2147-
21482091
/**
21492092
* Check and add proper window frames for all window functions.
21502093
*/
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import scala.collection.{immutable, mutable}
21+
22+
import org.apache.spark.sql.catalyst.CatalystConf
23+
import org.apache.spark.sql.catalyst.plans.logical._
24+
import org.apache.spark.sql.catalyst.rules.Rule
25+
26+
27+
/**
 * Analyzer rule that replaces every `Hint` node of a logical plan.
 *
 *  - A hint named BROADCAST, BROADCASTJOIN or MAPJOIN (compared case-insensitively) is removed
 *    and, for each of its table-name parameters, the closest matching `UnresolvedRelation`
 *    found by a top-down walk of its child is wrapped in a `BroadcastHint`. Subtrees below a
 *    nested `Project` are left untouched by that walk.
 *  - Any other hint is dropped and replaced by its child.
 *
 * The rule rewrites `UnresolvedRelation`s in the `Substitute` batch, i.e. before the
 * `ResolveRelations` rule is applied, for two reasons:
 *  - to support `MetastoreRelation` in the Hive module;
 *  - to reduce the effect of `Hint` on the other rules.
 *
 * After this rule it is guaranteed that no unknown `Hint` remains in the plan. All new `Hint`s
 * should be transformed into concrete hint classes such as `BroadcastHint` here.
 *
 * @param conf configuration supplying the resolver used to compare table names
 */
class SubstituteHints(conf: CatalystConf) extends Rule[LogicalPlan] {
  private val BROADCAST_HINT_NAMES = immutable.Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

  /** Name-comparison function taken from the configuration. */
  def resolver: Resolver = conf.resolver

  /** Adds `plan` and, recursively, every node beneath it to `set`. */
  private def appendAllDescendant(set: mutable.Set[LogicalPlan], plan: LogicalPlan): Unit = {
    set += plan
    plan.children.foreach(appendAllDescendant(set, _))
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case node: LogicalPlan => node transformDown {
      case Hint(hintName, tableNames, hinted)
          if BROADCAST_HINT_NAMES.contains(hintName.toUpperCase) =>
        // One pass per table name; each pass rewrites the result of the previous one.
        tableNames.foldLeft(hinted) { (current, tableName) =>
          // Becomes true once the first relation matching `tableName` has been handled.
          var found = false
          // Nodes this pass must hand back unchanged.
          val skipped = mutable.Set.empty[LogicalPlan]
          current.transformDown {
            case visited if skipped.contains(visited) =>
              skipped -= visited
              visited

            // A nested `Project` shields everything below it from this hint.
            case project: Project if project != current =>
              appendAllDescendant(skipped, project)
              skipped -= project
              project

            // Already broadcast-hinted: nothing to rewrite, just stop searching.
            case hintedRelation @ BroadcastHint(UnresolvedRelation(ident, _))
                if !found && resolver(ident.table, tableName) =>
              found = true
              hintedRelation

            case relation @ UnresolvedRelation(ident, alias)
                if !found && resolver(ident.table, tableName) =>
              found = true
              alias match {
                // Lift the alias above the hint so the hint wraps the bare relation.
                case Some(aliasName) =>
                  SubqueryAlias(aliasName, BroadcastHint(relation.copy(alias = None)), None)
                case None =>
                  BroadcastHint(relation)
              }
          }
        }

      // Remove unrecognized hints
      case Hint(_, _, hinted) => hinted
    }
  }
}

0 commit comments

Comments
 (0)