Commit ba26cd1

Created separate parser for HQL. It pre-parses commands like
CACHE, UNCACHE, ADD JAR, etc., and then parses the remaining query with HiveQl.
1 parent dab1b0a commit ba26cd1

File tree: 6 files changed (+153, −6 lines)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 5 additions & 2 deletions
@@ -75,6 +75,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
  */
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>
+
+  @transient
+  protected[sql] val hiveParser = new HiveSqlParser

   // Change the default SQL dialect to HiveQL
   override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")

@@ -95,15 +98,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     if (dialect == "sql") {
       super.sql(sqlText)
     } else if (dialect == "hiveql") {
-      new SchemaRDD(this, HiveQl.parseSql(sqlText))
+      new SchemaRDD(this, hiveParser(sqlText))
     } else {
       sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'")
     }
   }

   @deprecated("hiveql() is deprecated as the sql function now parses using HiveQL by default. " +
     s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
-  def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
+  def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, hiveParser(hqlQuery))

   @deprecated("hql() is deprecated as the sql function now parses using HiveQL by default. " +
     s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSqlParser.scala (new file)

Lines changed: 138 additions & 0 deletions

@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.SqlLexical
+import scala.util.parsing.combinator.lexical.StdLexical
+
+/**
+ * A simple Hive SQL pre-parser. It parses commands like CACHE and UNCACHE itself;
+ * the remaining query is parsed by HiveQl.parseSql.
+ */
+class HiveSqlParser extends StandardTokenParsers with PackratParsers {
+
+  def apply(input: String): LogicalPlan = {
+    // Special-case out set commands since the value fields can be
+    // complex to handle without RegexParsers. Also this approach
+    // is clearer for the several possible cases of set commands.
+    if (input.trim.toLowerCase.startsWith("set")) {
+      input.trim.drop(3).split("=", 2).map(_.trim) match {
+        case Array("") => // "set"
+          SetCommand(None, None)
+        case Array(key) => // "set key"
+          SetCommand(Some(key), None)
+        case Array(key, value) => // "set key=value"
+          SetCommand(Some(key), Some(value))
+      }
+    } else if (input.trim.startsWith("!")) {
+      ShellCommand(input.drop(1))
+    } else {
+      phrase(query)(new lexical.Scanner(input)) match {
+        case Success(r, x) => r
+        case x => sys.error(x.toString)
+      }
+    }
+  }
+
+  protected case class Keyword(str: String)
+
+  protected val CACHE = Keyword("CACHE")
+  protected val SET = Keyword("SET")
+  protected val ADD = Keyword("ADD")
+  protected val JAR = Keyword("JAR")
+  protected val TABLE = Keyword("TABLE")
+  protected val AS = Keyword("AS")
+  protected val UNCACHE = Keyword("UNCACHE")
+  protected val FILE = Keyword("FILE")
+  protected val DFS = Keyword("DFS")
+  protected val SOURCE = Keyword("SOURCE")
+
+  protected implicit def asParser(k: Keyword): Parser[String] =
+    lexical.allCaseVersions(k.str).map(x => x: Parser[String]).reduce(_ | _)
+
+  protected def allCaseConverse(k: String): Parser[String] =
+    lexical.allCaseVersions(k).map(x => x: Parser[String]).reduce(_ | _)
+
+  protected val reservedWords =
+    this.getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+  override val lexical = new SqlLexical(reservedWords)
+
+  protected lazy val query: Parser[LogicalPlan] = (
+    cache | unCache | addJar | addFile | dfs | source | hiveQl
+  )
+
+  protected lazy val hiveQl: Parser[LogicalPlan] =
+    remainingQuery ^^ {
+      case r => HiveQl.parseSql(r.trim())
+    }
+
+  /** Returns the unparsed remainder of the input. */
+  protected lazy val remainingQuery: Parser[String] = new Parser[String] {
+    def apply(in: Input) = Success(in.source.subSequence(in.offset, in.source.length).toString,
+      in.drop(in.source.length()))
+  }
+
+  /** Returns the entire input, including any already-consumed prefix. */
+  protected lazy val allQuery: Parser[String] = new Parser[String] {
+    def apply(in: Input) = Success(in.source.toString,
+      in.drop(in.source.length()))
+  }
+
+  protected lazy val cache: Parser[LogicalPlan] =
+    CACHE ~ TABLE ~> ident ~ opt(AS ~> hiveQl) ^^ {
+      case tableName ~ None => CacheCommand(tableName, true)
+      case tableName ~ Some(plan) =>
+        CacheTableAsSelectCommand(tableName, plan)
+    }
+
+  protected lazy val unCache: Parser[LogicalPlan] =
+    UNCACHE ~ TABLE ~> ident ^^ {
+      case tableName => CacheCommand(tableName, false)
+    }
+
+  protected lazy val addJar: Parser[LogicalPlan] =
+    ADD ~ JAR ~> remainingQuery ^^ {
+      case rq => AddJar(rq.trim())
+    }
+
+  protected lazy val addFile: Parser[LogicalPlan] =
+    ADD ~ FILE ~> remainingQuery ^^ {
+      case rq => AddFile(rq.trim())
+    }
+
+  protected lazy val dfs: Parser[LogicalPlan] =
+    DFS ~> allQuery ^^ {
+      case aq => NativeCommand(aq.trim())
+    }
+
+  protected lazy val source: Parser[LogicalPlan] =
+    SOURCE ~> remainingQuery ^^ {
+      case rq => SourceCommand(rq.trim())
+    }
+}
+

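The pivot of this parser is remainingQuery: a hand-rolled Parser[String] that reads straight from the character source (in.source) instead of from tokens, so whatever follows a recognized prefix such as ADD JAR or SOURCE is handed to the next stage verbatim. A self-contained sketch of the same technique, assuming only the Scala standard parser combinators; every name in it is illustrative, not from the commit:

    import scala.util.parsing.combinator.syntactical.StandardTokenParsers

    // Illustrative pre-parser mirroring HiveSqlParser's structure:
    // recognize a GREET prefix ourselves, hand everything else to a fallback.
    object PrefixParser extends StandardTokenParsers {
      lexical.reserved += "GREET"

      // Succeeds immediately, returning the rest of the raw input.
      val rest: Parser[String] = new Parser[String] {
        def apply(in: Input) =
          Success(in.source.subSequence(in.offset, in.source.length).toString,
            in.drop(in.source.length()))
      }

      val command: Parser[String] =
        ("GREET" ~> rest ^^ (r => "greeting: " + r.trim)) |
        (rest ^^ (r => "fallback: " + r.trim))

      def parse(s: String): String =
        phrase(command)(new lexical.Scanner(s)) match {
          case Success(result, _) => result
          case failure => sys.error(failure.toString)
        }
    }

    // PrefixParser.parse("GREET hello world")  // "greeting: hello world"
    // PrefixParser.parse("SELECT 1")           // "fallback: SELECT 1"

Because rest never inspects tokens, the remainder does not have to lex cleanly against the pre-parser's grammar, which is what lets the real parser forward arbitrary HiveQL to HiveQl.parseSql.
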
sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   val describedTable = "DESCRIBE (\\w+)".r

   protected[hive] class HiveQLQueryExecution(hql: String) extends this.QueryExecution {
-    lazy val logical = HiveQl.parseSql(hql)
+    lazy val logical = hiveParser(hql)
     def hiveExec() = runSqlHive(hql)
     override def toString = hql + "\n" + super.toString
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa
     if (sqlContext.dialect == "sql") {
       super.sql(sqlText)
     } else if (sqlContext.dialect == "hiveql") {
-      new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText))
+      new JavaSchemaRDD(sqlContext, sqlContext.hiveParser(sqlText))
     } else {
       sys.error(s"Unsupported SQL dialect: ${sqlContext.dialect}. Try 'sql' or 'hiveql'")
     }

@@ -45,5 +45,5 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa
    */
   @Deprecated
   def hql(hqlQuery: String): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
+    new JavaSchemaRDD(sqlContext, sqlContext.hiveParser(hqlQuery))
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala

Lines changed: 6 additions & 0 deletions
@@ -88,4 +88,10 @@ class CachedTableSuite extends HiveComparisonTest {
     }
     assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
   }
+
+  test("'CACHE TABLE tableName AS SELECT ..'") {
+    TestHive.sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
+    assert(TestHive.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+    TestHive.uncacheTable("testCacheTable")
+  }
 }
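
Since the `cache` production maps the AS SELECT form to CacheTableAsSelectCommand, the parse result can also be checked directly, in the same style as the StatisticsSuite change below. A hedged sketch, not part of the commit:

    val parsed = new HiveSqlParser().apply("CACHE TABLE testCacheTable AS SELECT * FROM src")
    // Per the `cache` production, the AS SELECT form should yield this plan node.
    assert(parsed.isInstanceOf[CacheTableAsSelectCommand],
      "CACHE TABLE .. AS SELECT should parse to CacheTableAsSelectCommand")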

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {

   test("parse analyze commands") {
     def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
-      val parsed = HiveQl.parseSql(analyzeCommand)
+      val parsed = new HiveSqlParser().apply(analyzeCommand)
       val operators = parsed.collect {
         case a: AnalyzeTable => a
         case o => o
