Skip to content

Commit bafd290

Browse files
rxin authored and cmonkey committed
[SPARK-16475][SQL] Broadcast hint for SQL Queries
## What changes were proposed in this pull request? This pull request introduces a simple hint infrastructure to SQL and implements a broadcast join hint using the infrastructure. The hint syntax looks like the following: ``` SELECT /*+ BROADCAST(t) */ * FROM t ``` For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of relation aliases can be specified in the hint. A broadcast hint plan node will be inserted on top of any relation (that is not aliased differently), subquery, or common table expression that matches the specified name. The hint resolution works by recursively traversing down the query plan to find a relation or subquery that matches one of the specified broadcast aliases. The traversal does not go past any existing broadcast hints or subquery aliases. This rule happens before common table expressions are substituted. Note that there was an earlier patch in apache#14426. This is a rewrite of that patch, with different semantics and simpler test cases. ## How was this patch tested? Added a new unit test suite for the broadcast hint rule (SubstituteHintsSuite) and new test cases for the parser change (in PlanParserSuite). Also added an end-to-end test case in BroadcastSuite. Author: Reynold Xin <[email protected]> Author: Dongjoon Hyun <[email protected]> Closes apache#16925 from rxin/SPARK-16475-broadcast-hint.
1 parent 2bb78f4 commit bafd290

File tree

10 files changed

+340
-4
lines changed

10 files changed

+340
-4
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ querySpecification
365365
(RECORDREADER recordReader=STRING)?
366366
fromClause?
367367
(WHERE where=booleanExpression)?)
368-
| ((kind=SELECT setQuantifier? namedExpressionSeq fromClause?
368+
| ((kind=SELECT hint? setQuantifier? namedExpressionSeq fromClause?
369369
| fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
370370
lateralView*
371371
(WHERE where=booleanExpression)?
@@ -374,6 +374,16 @@ querySpecification
374374
windows?)
375375
;
376376

377+
hint
378+
: '/*+' hintStatement '*/'
379+
;
380+
381+
hintStatement
382+
: hintName=identifier
383+
| hintName=identifier '(' parameters+=identifier parameters+=identifier ')'
384+
| hintName=identifier '(' parameters+=identifier (',' parameters+=identifier)* ')'
385+
;
386+
377387
fromClause
378388
: FROM relation (',' relation)* lateralView*
379389
;
@@ -1002,8 +1012,12 @@ SIMPLE_COMMENT
10021012
: '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
10031013
;
10041014

1015+
BRACKETED_EMPTY_COMMENT
1016+
: '/**/' -> channel(HIDDEN)
1017+
;
1018+
10051019
BRACKETED_COMMENT
1006-
: '/*' .*? '*/' -> channel(HIDDEN)
1020+
: '/*' ~[+] .*? '*/' -> channel(HIDDEN)
10071021
;
10081022

10091023
WS

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ class Analyzer(
114114
val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
115115

116116
lazy val batches: Seq[Batch] = Seq(
117+
Batch("Hints", fixedPoint,
118+
new SubstituteHints.SubstituteBroadcastHints(conf),
119+
SubstituteHints.RemoveAllHints),
117120
Batch("Substitution", fixedPoint,
118121
CTESubstitution,
119122
WindowsSubstitution,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,10 @@ trait CheckAnalysis extends PredicateHelper {
387387
|in operator ${operator.simpleString}
388388
""".stripMargin)
389389

390+
case _: Hint =>
391+
throw new IllegalStateException(
392+
"Internal error: logical hint operator should have been removed during analysis")
393+
390394
case _ => // Analysis successful!
391395
}
392396
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.CatalystConf
21+
import org.apache.spark.sql.catalyst.plans.logical._
22+
import org.apache.spark.sql.catalyst.rules.Rule
23+
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
24+
25+
26+
/**
27+
* Collection of rules related to hints. The only hint currently available is broadcast join hint.
28+
*
29+
* Note that this is separated into two rules because in the future we might introduce new hint
30+
* rules that have different ordering requirements from broadcast.
31+
*/
32+
object SubstituteHints {
33+
34+
/**
35+
* Substitute Hints.
36+
*
37+
* The only hint currently available is broadcast join hint.
38+
*
39+
* For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
40+
* relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
41+
* on top of any relation (that is not aliased differently), subquery, or common table expression
42+
* that matches the specified name.
43+
*
44+
* The hint resolution works by recursively traversing down the query plan to find a relation or
45+
* subquery that matches one of the specified broadcast aliases. The traversal does not go past
46+
* any existing broadcast hints or subquery aliases.
47+
*
48+
* This rule must happen before common table expressions.
49+
*/
50+
class SubstituteBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
51+
private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
52+
53+
def resolver: Resolver = conf.resolver
54+
55+
private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
56+
// Whether to continue recursing down the tree
57+
var recurse = true
58+
59+
val newNode = CurrentOrigin.withOrigin(plan.origin) {
60+
plan match {
61+
case r: UnresolvedRelation =>
62+
val alias = r.alias.getOrElse(r.tableIdentifier.table)
63+
if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
64+
case r: SubqueryAlias =>
65+
if (toBroadcast.exists(resolver(_, r.alias))) {
66+
BroadcastHint(plan)
67+
} else {
68+
// Don't recurse down subquery aliases if there is no match.
69+
recurse = false
70+
plan
71+
}
72+
case _: BroadcastHint =>
73+
// Found a broadcast hint; don't change the plan but also don't recurse down.
74+
recurse = false
75+
plan
76+
case _ =>
77+
plan
78+
}
79+
}
80+
81+
if ((plan fastEquals newNode) && recurse) {
82+
newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
83+
} else {
84+
newNode
85+
}
86+
}
87+
88+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
89+
case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
90+
applyBroadcastHint(h.child, h.parameters.toSet)
91+
}
92+
}
93+
94+
/**
95+
* Removes all the hints, used to remove invalid hints provided by the user.
96+
* This must be executed after all the other hint rules are executed.
97+
*/
98+
object RemoveAllHints extends Rule[LogicalPlan] {
99+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
100+
case h: Hint => h.child
101+
}
102+
}
103+
104+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
380380
}
381381

382382
// Window
383-
withDistinct.optionalMap(windows)(withWindows)
383+
val withWindow = withDistinct.optionalMap(windows)(withWindows)
384+
385+
// Hint
386+
withWindow.optionalMap(hint)(withHints)
384387
}
385388
}
386389

@@ -505,6 +508,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
505508
}
506509
}
507510

511+
/**
512+
* Add a Hint to a logical plan.
513+
*/
514+
private def withHints(
515+
ctx: HintContext,
516+
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
517+
val stmt = ctx.hintStatement
518+
Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
519+
}
520+
508521
/**
509522
* Add a [[Generate]] (Lateral View) to a logical plan.
510523
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,15 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
362362
super.computeStats(conf).copy(isBroadcastable = true)
363363
}
364364

365+
/**
366+
* A general hint for the child. This node will be eliminated post analysis.
367+
* A pair of (name, parameters).
368+
*/
369+
case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode {
370+
override lazy val resolved: Boolean = false
371+
override def output: Seq[Attribute] = child.output
372+
}
373+
365374
/**
366375
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
367376
* concrete implementations during analysis.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ trait AnalysisTest extends PlanTest {
3232
val conf = new SimpleCatalystConf(caseSensitive)
3333
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
3434
catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true)
35+
catalog.createTempView("TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
3536
new Analyzer(catalog, conf) {
3637
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
3738
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.TableIdentifier
21+
import org.apache.spark.sql.catalyst.dsl.expressions._
22+
import org.apache.spark.sql.catalyst.dsl.plans._
23+
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
24+
import org.apache.spark.sql.catalyst.plans.Inner
25+
import org.apache.spark.sql.catalyst.plans.logical._
26+
27+
class SubstituteHintsSuite extends AnalysisTest {
28+
import org.apache.spark.sql.catalyst.analysis.TestRelations._
29+
30+
test("invalid hints should be ignored") {
31+
checkAnalysis(
32+
Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
33+
testRelation,
34+
caseSensitive = false)
35+
}
36+
37+
test("case-sensitive or insensitive parameters") {
38+
checkAnalysis(
39+
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
40+
BroadcastHint(testRelation),
41+
caseSensitive = false)
42+
43+
checkAnalysis(
44+
Hint("MAPJOIN", Seq("table"), table("TaBlE")),
45+
BroadcastHint(testRelation),
46+
caseSensitive = false)
47+
48+
checkAnalysis(
49+
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
50+
BroadcastHint(testRelation),
51+
caseSensitive = true)
52+
53+
checkAnalysis(
54+
Hint("MAPJOIN", Seq("table"), table("TaBlE")),
55+
testRelation,
56+
caseSensitive = true)
57+
}
58+
59+
test("multiple broadcast hint aliases") {
60+
checkAnalysis(
61+
Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
62+
Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None),
63+
caseSensitive = false)
64+
}
65+
66+
test("do not traverse past existing broadcast hints") {
67+
checkAnalysis(
68+
Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))),
69+
BroadcastHint(testRelation.where('a > 1)).analyze,
70+
caseSensitive = false)
71+
}
72+
73+
test("should work for subqueries") {
74+
checkAnalysis(
75+
Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
76+
BroadcastHint(testRelation),
77+
caseSensitive = false)
78+
79+
checkAnalysis(
80+
Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
81+
BroadcastHint(testRelation),
82+
caseSensitive = false)
83+
84+
// Negative case: if the alias doesn't match, don't match the original table name.
85+
checkAnalysis(
86+
Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
87+
testRelation,
88+
caseSensitive = false)
89+
}
90+
91+
test("do not traverse past subquery alias") {
92+
checkAnalysis(
93+
Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
94+
testRelation.where('a > 1).analyze,
95+
caseSensitive = false)
96+
}
97+
98+
test("should work for CTE") {
99+
checkAnalysis(
100+
CatalystSqlParser.parsePlan(
101+
"""
102+
|WITH ctetable AS (SELECT * FROM table WHERE a > 1)
103+
|SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
104+
""".stripMargin
105+
),
106+
BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze,
107+
caseSensitive = false)
108+
}
109+
110+
test("should not traverse down CTE") {
111+
checkAnalysis(
112+
CatalystSqlParser.parsePlan(
113+
"""
114+
|WITH ctetable AS (SELECT * FROM table WHERE a > 1)
115+
|SELECT /*+ BROADCAST(table) */ * FROM ctetable
116+
""".stripMargin
117+
),
118+
testRelation.where('a > 1).select('a).select('a).analyze,
119+
caseSensitive = false)
120+
}
121+
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,4 +493,46 @@ class PlanParserSuite extends PlanTest {
493493
assertEqual("select a, b from db.c where x !> 1",
494494
table("db", "c").where('x <= 1).select('a, 'b))
495495
}
496+
497+
test("select hint syntax") {
498+
// Hive compatibility: Missing parameter raises ParseException.
499+
val m = intercept[ParseException] {
500+
parsePlan("SELECT /*+ HINT() */ * FROM t")
501+
}.getMessage
502+
assert(m.contains("no viable alternative at input"))
503+
504+
// Hive compatibility: No database.
505+
val m2 = intercept[ParseException] {
506+
parsePlan("SELECT /*+ MAPJOIN(default.t) */ * from default.t")
507+
}.getMessage
508+
assert(m2.contains("no viable alternative at input"))
509+
510+
comparePlans(
511+
parsePlan("SELECT /*+ HINT */ * FROM t"),
512+
Hint("HINT", Seq.empty, table("t").select(star())))
513+
514+
comparePlans(
515+
parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"),
516+
Hint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
517+
518+
comparePlans(
519+
parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"),
520+
Hint("MAPJOIN", Seq("u"), table("t").select(star())))
521+
522+
comparePlans(
523+
parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"),
524+
Hint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))
525+
526+
comparePlans(
527+
parsePlan("SELECT /*+ INDEX(t emp_job_ix) */ * FROM t"),
528+
Hint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))
529+
530+
comparePlans(
531+
parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"),
532+
Hint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))
533+
534+
comparePlans(
535+
parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"),
536+
Hint("MAPJOIN", Seq("t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
537+
}
496538
}

0 commit comments

Comments
 (0)